Refactors concurrent close tests (#411)

Signed-off-by: Adrian Cole <adrian@tetrate.io>
Author: Crypt Keeper
Date: 2022-03-26 15:01:47 +08:00
Committed by: GitHub
Parent: a1dc1f56a0
Commit: cde7ce6e4a
2 changed files with 183 additions and 89 deletions


@@ -196,3 +196,20 @@ means that we have 1 GiB size of slice which seems large enough for most applica
## JIT engine implementation
See [wasm/jit/RATIONALE.md](internal/wasm/jit/RATIONALE.md).
## Golang patterns
### Lock-free, cross-goroutine observations of updates
How to achieve cross-goroutine reads of a variable is not explicitly defined in https://go.dev/ref/mem. wazero uses
atomics for this, following unofficial practice. For example, a `Close` operation can be guarded to happen only
once via compare-and-swap (CAS) against a zero value. When we use this pattern, we consistently use atomics for both
reads and updates of the same numeric field (see the sketch after the references below).
In lieu of formal documentation, we infer this pattern works from other sources (besides tests):
* `sync.WaitGroup` by definition must support calling `Add` from other goroutines. Internally, it uses atomics.
* rsc in golang/go#5045 writes "atomics guarantee sequential consistency among the atomic variables".
See https://github.com/golang/go/blob/011fd002457da0823da5f06b099fcf6e21444b00/src/sync/waitgroup.go#L64
See https://github.com/golang/go/issues/5045#issuecomment-252730563
See https://www.youtube.com/watch?v=VmrEG-3bWyM
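
Below is a minimal sketch of the pattern, not wazero's implementation: the `resource` type and its `Close`/`Do`
methods are hypothetical names. It shows the CAS guard for a once-only close, and that the same field is only ever
read and updated via `sync/atomic`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// resource is a hypothetical type illustrating the pattern: the closed field
// is only ever read and updated via sync/atomic.
type resource struct {
	closed uint32 // 0 = open, 1 = closed
}

// Close is safe to call from any goroutine; the CAS against the zero value
// guarantees the cleanup below runs at most once.
func (r *resource) Close() {
	if !atomic.CompareAndSwapUint32(&r.closed, 0, 1) {
		return // another goroutine already closed it
	}
	// release underlying resources here
}

// Do observes the flag with an atomic load, never a plain read, so it sees a
// Close performed on another goroutine.
func (r *resource) Do() error {
	if atomic.LoadUint32(&r.closed) == 1 {
		return errors.New("closed")
	}
	return nil
}

func main() {
	r := &resource{}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); r.Close() }() // concurrent closes are safe
	}
	wg.Wait()
	fmt.Println(r.Do()) // prints "closed"
}
```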


@@ -1,13 +1,15 @@
package adhoc

import (
	"errors"
	"sync"
	"sync/atomic"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/tetratelabs/wazero"
	"github.com/tetratelabs/wazero/wasm"
)

func TestJITConcurrency(t *testing.T) {
@@ -58,101 +60,176 @@ func runAdhocTestUnderHighConcurrency(t *testing.T, newRuntimeConfig func() *waz
}

func singleModuleHighConcurrency(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
	t.Run("close importing module while in use", func(t *testing.T) {
		closeImportingModuleWhileInUse(t, newRuntimeConfig)
	})
	t.Run("close imported module while in use", func(t *testing.T) {
		closeImportedModuleWhileInUse(t, newRuntimeConfig)
	})
}

type blocker struct {
	running   chan bool
	unblocked chan bool
	// closed should panic if fn is called when the value is 1.
	//
	// Note: Exclusively reading and updating this with atomics guarantees cross-goroutine observations.
	// See /RATIONALE.md
	closed uint32
}

func (b *blocker) close() {
	atomic.StoreUint32(&b.closed, 1)
}

// fn sends into the running channel, then blocks until it can receive from the unblocked one.
func (b *blocker) fn(input uint32) uint32 {
	if atomic.LoadUint32(&b.closed) == 1 {
		panic(errors.New("closed"))
	}
	b.running <- true // Signal the goroutine is running
	<-b.unblocked     // Await until unblocked
	return input
}

var blockAfterAddSource = []byte(`(module
	(import "host" "block" (func $block (param i32) (result i32)))
	(func $block_after_add (param i32) (param i32) (result i32)
		local.get 0
		local.get 1
		i32.add
		call $block
	)
	(export "block_after_add" (func $block_after_add))
)`)

func closeImportingModuleWhileInUse(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
	args := []uint64{1, 123}
	exp := args[0] + args[1]
	const goroutines = 1000

	r := wazero.NewRuntimeWithConfig(newRuntimeConfig())

	running := make(chan bool)
	unblocked := make(chan bool)
	b := &blocker{running: running, unblocked: unblocked}

	imported, err := r.NewModuleBuilder("host").ExportFunction("block", b.fn).Instantiate()
	require.NoError(t, err)
	defer imported.Close()

	importing, err := r.InstantiateModuleFromSource(blockAfterAddSource)
	require.NoError(t, err)
	defer importing.Close()

	var wg sync.WaitGroup
	wg.Add(goroutines)
	for i := 0; i < goroutines; i++ {
		go func() {
			defer wg.Done()
			requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp)
		}()
	}

	// Wait until all goroutines are running.
	for i := 0; i < goroutines; i++ {
		<-running
	}

	// Close the module that exported blockAfterAdd (noting calls to this are in-flight).
	require.NoError(t, importing.Close())

	// Prove a module can be redefined even with in-flight calls.
	importing, err = r.InstantiateModuleFromSource(blockAfterAddSource)
	require.NoError(t, err)
	defer importing.Close()

	// If unloading worked properly, a new function call should route to the newly instantiated module.
	wg.Add(1)
	go func() {
		defer wg.Done()
		requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp)
	}()
	<-running // Wait for the above function to be in-flight

	// Unblock the other goroutines to ensure they don't err on the return path of a closed module.
	for i := 0; i < goroutines+1; i++ {
		unblocked <- true
	}
	wg.Wait() // Wait for all goroutines to finish.
}

func closeImportedModuleWhileInUse(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
	args := []uint64{1, 123}
	exp := args[0] + args[1]
	const goroutines = 1000

	r := wazero.NewRuntimeWithConfig(newRuntimeConfig())

	running := make(chan bool)
	unblocked := make(chan bool)
	b := &blocker{running: running, unblocked: unblocked}

	imported, err := r.NewModuleBuilder("host").ExportFunction("block", b.fn).Instantiate()
	require.NoError(t, err)
	defer imported.Close()

	importing, err := r.InstantiateModuleFromSource(blockAfterAddSource)
	require.NoError(t, err)
	defer importing.Close()

	var wg sync.WaitGroup
	wg.Add(goroutines)
	for i := 0; i < goroutines; i++ {
		go func() {
			defer wg.Done()
			requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp)
		}()
	}

	// Wait until all goroutines are running.
	for i := 0; i < goroutines; i++ {
		<-running
	}

	// Close the module that exported the host function (noting calls to this are in-flight).
	require.NoError(t, imported.Close())

	// Close the underlying host function, which causes future calls to it to fail.
	b.close()
	require.Panics(t, func() {
		b.fn(1) // validate it would fail if accidentally called
	})

	// Close the importing module
	require.NoError(t, importing.Close())

	// Prove a host module can be redefined even with in-flight calls.
	b1 := &blocker{running: running, unblocked: unblocked} // New instance, so not yet closed!
	imported, err = r.NewModuleBuilder("host").ExportFunction("block", b1.fn).Instantiate()
	require.NoError(t, err)
	defer imported.Close()

	// Redefine the importing module, which should link to the redefined host module.
	importing, err = r.InstantiateModuleFromSource(blockAfterAddSource)
	require.NoError(t, err)
	defer importing.Close()

	// If unloading worked properly, a new function call should route to the newly instantiated host module.
	wg.Add(1)
	go func() {
		defer wg.Done()
		requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp)
	}()
	<-running // Wait for the above function to be in-flight

	// Unblock the other goroutines to ensure they don't err on the return path of a closed module.
	for i := 0; i < goroutines+1; i++ {
		unblocked <- true
	}
	wg.Wait() // Wait for all goroutines to finish.
}

func requireFunctionCall(t *testing.T, fn wasm.Function, args []uint64, exp uint64) {
	res, err := fn.Call(nil, args...)
	// We don't expect an error because there's currently no functionality to detect or fail on a closed module.
	require.NoError(t, err)
	require.Equal(t, exp, res[0])
}