diff --git a/RATIONALE.md b/RATIONALE.md index e3955995..0d505912 100644 --- a/RATIONALE.md +++ b/RATIONALE.md @@ -196,3 +196,20 @@ means that we have 1 GiB size of slice which seems large enough for most applica ## JIT engine implementation See [wasm/jit/RATIONALE.md](internal/wasm/jit/RATIONALE.md). + +## Golang patterns + +### Lock-free, cross-goroutine observations of updates + +How to achieve cross-goroutine reads of a variable are not explicitly defined in https://go.dev/ref/mem. wazero uses +atomics to implement this following unofficial practice. For example, a `Close` operation can be guarded to happen only +once via compare-and-swap (CAS) against a zero value. When we use this pattern, we consistently use atomics to both +read and update the same numeric field. + +In lieu of formal documentation, we infer this pattern works from other sources (besides tests): + * `sync.WaitGroup` by definition must support calling `Add` from other goroutines. Internally, it uses atomics. + * rsc in golang/go#5045 writes "atomics guarantee sequential consistency among the atomic variables". + +See https://github.com/golang/go/blob/011fd002457da0823da5f06b099fcf6e21444b00/src/sync/waitgroup.go#L64 +See https://github.com/golang/go/issues/5045#issuecomment-252730563 +See https://www.youtube.com/watch?v=VmrEG-3bWyM diff --git a/tests/engine/concurrency_test.go b/tests/engine/concurrency_test.go index f0b58d9c..4052fd9c 100644 --- a/tests/engine/concurrency_test.go +++ b/tests/engine/concurrency_test.go @@ -1,13 +1,15 @@ package adhoc import ( + "errors" "sync" + "sync/atomic" "testing" - "time" "github.com/stretchr/testify/require" "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/wasm" ) func TestJITConcurrency(t *testing.T) { @@ -58,101 +60,176 @@ func runAdhocTestUnderHighConcurrency(t *testing.T, newRuntimeConfig func() *waz } func singleModuleHighConcurrency(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) { + t.Run("close importing module while in use", func(t *testing.T) { + closeImportingModuleWhileInUse(t, newRuntimeConfig) + + }) + t.Run("close imported module while in use", func(t *testing.T) { + closeImportedModuleWhileInUse(t, newRuntimeConfig) + }) +} + +type blocker struct { + running chan bool + unblocked chan bool + // closed should panic if fn is called when the value is 1. + // + // Note: Exclusively reading and updating this with atomics guarantees cross-goroutine observations. + // See /RATIONALE.md + closed uint32 +} + +func (b *blocker) close() { + atomic.StoreUint32(&b.closed, 1) +} + +// fn sends into the running channel, then blocks until it can receive from unblocked one. +func (b *blocker) fn(input uint32) uint32 { + if atomic.LoadUint32(&b.closed) == 1 { + panic(errors.New("closed")) + } + b.running <- true // Signal the goroutine is running + <-b.unblocked // Await until unblocked + return input +} + +var blockAfterAddSource = []byte(`(module + (import "host" "block" (func $block (param i32) (result i32))) + (func $block_after_add (param i32) (param i32) (result i32) + local.get 0 + local.get 1 + i32.add + call $block + ) + (export "block_after_add" (func $block_after_add)) +)`) + +func closeImportingModuleWhileInUse(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) { args := []uint64{1, 123} exp := args[0] + args[1] const goroutines = 1000 - const delay = 100 * time.Millisecond - t.Run("single module", func(t *testing.T) { - r := wazero.NewRuntimeWithConfig(newRuntimeConfig()) - imported, err := r.NewModuleBuilder("host").ExportFunctions(map[string]interface{}{ - "delay": func() { time.Sleep(delay) }, - }).Instantiate() - require.NoError(t, err) + r := wazero.NewRuntimeWithConfig(newRuntimeConfig()) - source := []byte(`(module - (import "host" "delay" (func $delay )) - (func $delay_add - (param $value_1 i32) (param $value_2 i32) - (result i32) - local.get 0 - local.get 1 - i32.add - call $delay - ) - (export "delay_add" (func $delay_add)) - )`) + running := make(chan bool) + unblocked := make(chan bool) + b := &blocker{running: running, unblocked: unblocked} - module, err := r.InstantiateModuleFromSource(source) - require.NoError(t, err) + imported, err := r.NewModuleBuilder("host").ExportFunction("block", b.fn).Instantiate() + require.NoError(t, err) + defer imported.Close() - t.Run("close importing module while in use", func(t *testing.T) { - fn := module.ExportedFunction("delay_add") - require.NotNil(t, fn) + importing, err := r.InstantiateModuleFromSource(blockAfterAddSource) + require.NoError(t, err) + defer importing.Close() - var wg sync.WaitGroup - wg.Add(goroutines) - for i := 0; i < goroutines; i++ { - if i == 200 { - // Close the importing module. - module.Close() - } else if i == 400 { - // Re-instantiate the importing module, and swap the function. - module, err = r.InstantiateModuleFromSource(source) - require.NoError(t, err) - fn = module.ExportedFunction("delay_add") - require.NotNil(t, fn) - } - go func() { - defer wg.Done() - res, err := fn.Call(nil, args...) - require.NoError(t, err) - require.Equal(t, exp, res[0]) - }() - } - wg.Wait() - }) - t.Run("close imported module while in use", func(t *testing.T) { - fn := module.ExportedFunction("delay_add") - require.NotNil(t, fn) + var wg sync.WaitGroup + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp) + }() + } - var newImportedModuleShouldPanic bool = true - var wg sync.WaitGroup - wg.Add(goroutines) - for i := 0; i < goroutines; i++ { - if i == 200 { - // Close the imported module. - imported.Close() - // Re-instantiate the imported module but at this point importing module should - // not use the new one. - imported, err = r.NewModuleBuilder("host").ExportFunctions(map[string]interface{}{ - "delay": func() { - time.Sleep(delay) - if newImportedModuleShouldPanic { - panic("unreachable") - } - }, - }).Instantiate() - require.NoError(t, err) - } else if i == 400 { - // Re-instantiate the importing module, and swap the function which will use - // the new imported module. - module.Close() - module, err = r.InstantiateModuleFromSource(source) - require.NoError(t, err) - fn = module.ExportedFunction("delay_add") - require.NotNil(t, fn) - // The new imported module is now in use. - newImportedModuleShouldPanic = false - } - go func() { - defer wg.Done() - res, err := fn.Call(nil, 1, 123) - require.NoError(t, err) - require.Equal(t, exp, res[0]) - }() - } - wg.Wait() - }) - }) + // Wait until all goroutines are running. + for i := 0; i < goroutines; i++ { + <-running + } + // Close the module that exported blockAfterAdd (noting calls to this are in-flight). + require.NoError(t, importing.Close()) + + // Prove a module can be redefined even with in-flight calls. + importing, err = r.InstantiateModuleFromSource(blockAfterAddSource) + require.NoError(t, err) + defer importing.Close() + + // If unloading worked properly, a new function call should route to the newly instantiated module. + wg.Add(1) + go func() { + defer wg.Done() + requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp) + }() + <-running // Wait for the above function to be in-flight + + // Unblock the other goroutines to ensure they don't err on the return path of a closed module. + for i := 0; i < goroutines+1; i++ { + unblocked <- true + } + wg.Wait() // Wait for all goroutines to finish. +} + +func closeImportedModuleWhileInUse(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) { + args := []uint64{1, 123} + exp := args[0] + args[1] + const goroutines = 1000 + + r := wazero.NewRuntimeWithConfig(newRuntimeConfig()) + + running := make(chan bool) + unblocked := make(chan bool) + b := &blocker{running: running, unblocked: unblocked} + + imported, err := r.NewModuleBuilder("host").ExportFunction("block", b.fn).Instantiate() + require.NoError(t, err) + defer imported.Close() + + importing, err := r.InstantiateModuleFromSource(blockAfterAddSource) + require.NoError(t, err) + defer importing.Close() + + var wg sync.WaitGroup + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp) + }() + } + + // Wait until all goroutines are running. + for i := 0; i < goroutines; i++ { + <-running + } + // Close the module that exported the host function (noting calls to this are in-flight). + require.NoError(t, imported.Close()) + // Close the underlying host function, which causes future calls to it to fail. + b.close() + require.Panics(t, func() { + b.fn(1) // validate it would fail if accidentally called + }) + // Close the importing module + require.NoError(t, importing.Close()) + + // Prove a host module can be redefined even with in-flight calls. + b1 := &blocker{running: running, unblocked: unblocked} // New instance, so not yet closed! + imported, err = r.NewModuleBuilder("host").ExportFunction("block", b1.fn).Instantiate() + require.NoError(t, err) + defer imported.Close() + + // Redefine the importing module, which should link to the redefined host module. + importing, err = r.InstantiateModuleFromSource(blockAfterAddSource) + require.NoError(t, err) + defer importing.Close() + + // If unloading worked properly, a new function call should route to the newly instantiated host module. + wg.Add(1) + go func() { + defer wg.Done() + requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp) + }() + <-running // Wait for the above function to be in-flight + + // Unblock the other goroutines to ensure they don't err on the return path of a closed module. + for i := 0; i < goroutines+1; i++ { + unblocked <- true + } + wg.Wait() // Wait for all goroutines to finish. +} + +func requireFunctionCall(t *testing.T, fn wasm.Function, args []uint64, exp uint64) { + res, err := fn.Call(nil, args...) + // We don't expect an error because there's currently no functionality to detect or fail on a closed module. + require.NoError(t, err) + require.Equal(t, exp, res[0]) }