Refactors concurrency tests as "hammer" tests and adjusts style (#415)

This refactors tests that hammer shared state protected by locks or atomics, so that they are consistent with each
other and follow the practice used internally by Go itself. Notably, these tests now also honor the `-test.short` flag.

Signed-off-by: Adrian Cole <adrian@tetrate.io>
Co-authored-by: Takeshi Yoneda <takeshi@tetrate.io>
Crypt Keeper authored on 2022-03-29 16:23:39 +08:00, committed by GitHub
parent 52fdc073dc
commit 146615d94b
4 changed files with 331 additions and 257 deletions

@@ -199,6 +199,62 @@ See [wasm/jit/RATIONALE.md](internal/wasm/jit/RATIONALE.md).
## Golang patterns
### Hammer tests
Code that uses concurrency primitives, such as locks or atomics, should include "hammer tests", which run large loops
in a bounded number of goroutines, scheduled on half that many `GOMAXPROCS`. These tests are consistently named
"hammer", so they are easy to find. The name comes from existing tests in [golang/go](https://github.com/golang/go/search?q=hammer&type=code).
Here is an annotated description of the key pieces of a hammer test:
1. `P` declares the count of goroutines to use, defaulting to 8, or 4 if `testing.Short`.
   * Half this number is used for `GOMAXPROCS`, and 4 is fewer cores than a modern laptop has, which allows multiple "hammer" tests to run in parallel.
2. `N` declares the scale of work (loop iterations) per goroutine, defaulting to a value that finishes in ~0.1s on a modern laptop.
   * When in doubt, try 1000, or 100 if `testing.Short`.
   * Remember, there are multiple hammer tests and CI nodes are slow. Slower tests hurt feedback loops.
3. `defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P/2))` makes goroutines switch cores, testing visibility of shared data.
4. Track goroutine progress via `c := make(chan int)`, where each of the `P` goroutines defers `c <- 1`.
   1. Tests use `require.XXX`, so `recover()` any panic into `t.Error` in a `defer` function before `c <- 1`.
      * This makes it easier to spot larger concurrency problems, as you see each failure, not just the first.
   2. After the `defer` function, run the stateful function `N` times in a normal loop.
      * This loop should trigger shared-state problems as locks or atomics are contended by the `P` goroutines.
5. After all `P` goroutines launch, block the test by receiving from the channel (`<-c`) once per goroutine.
6. When all goroutines complete, `return` if `t.Failed()`; otherwise, perform follow-up state checks.
Here's an example:
```go
P := 8    // 1. count of goroutines
N := 1000 // 2. count of work per goroutine
if testing.Short() {
	P = 4   // 1. count of goroutines if `-test.short`
	N = 100 // 2. count of work per goroutine if `-test.short`
}
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P/2)) // 3. Ensure goroutines switch cores.
c := make(chan int) // 4. tracking channel
for p := 0; p < P; p++ {
	p := p      // pin p, so it is stable inside the goroutine.
	go func() { // Launch goroutine 'p'
		defer func() {
			if recovered := recover(); recovered != nil {
				t.Error(recovered.(string)) // 4.1. Accumulate error instead of failing on first.
			}
			c <- 1 // 4.1. count down regardless of error
		}()
		for n := 0; n < N; n++ { // 4.2. loop the test N times
			test(p, n)
		}
	}()
}
for i := 0; i < P; i++ { // 5. Block until all goroutines finish.
	<-c
}
if t.Failed() { // 6. Return early if there are concurrency errors.
	return
}
```
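For concreteness, here is a minimal, hypothetical sketch of the pattern applied end to end: a hammer over a counter
shared via `sync/atomic`, with a follow-up state check. It is not code from this commit; the package and test names
(`counter_test`, `TestCounter_hammer`) are made up for illustration.
```go
package counter_test

import (
	"runtime"
	"sync/atomic"
	"testing"
)

// TestCounter_hammer launches P goroutines that each add to a shared counter N
// times, then checks that no increments were lost.
func TestCounter_hammer(t *testing.T) {
	P := 8    // count of goroutines
	N := 1000 // work per goroutine
	if testing.Short() { // Adjust down if `-test.short`
		P = 4
		N = 100
	}
	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P / 2)) // Ensure goroutines switch cores.

	var counter int64 // shared state contended by all goroutines

	c := make(chan int) // tracking channel
	for p := 0; p < P; p++ {
		go func() {
			defer func() {
				if recovered := recover(); recovered != nil {
					t.Error(recovered) // accumulate errors instead of failing on the first
				}
				c <- 1 // count down regardless of error
			}()
			for n := 0; n < N; n++ {
				atomic.AddInt64(&counter, 1) // the stateful operation under test
			}
		}()
	}
	for i := 0; i < P; i++ { // Block until all goroutines finish.
		<-c
	}
	if t.Failed() {
		return
	}
	// Follow-up state check: all increments must be visible.
	if got := atomic.LoadInt64(&counter); got != int64(P*N) {
		t.Errorf("expected %d, got %d", P*N, got)
	}
}
```
Running this with `go test -short` exercises the reduced `P`/`N` path that this commit adds to the real hammer tests.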
### Lock-free, cross-goroutine observations of updates
How to achieve cross-goroutine reads of a variable is not explicitly defined in https://go.dev/ref/mem. wazero uses

@@ -6,8 +6,8 @@ import (
"errors"
"fmt"
"math"
"runtime"
"strconv"
"sync"
"testing"
"github.com/stretchr/testify/require"
@@ -174,20 +174,17 @@ func TestStore_CloseModule(t *testing.T) {
}
}
func TestStore_concurrent(t *testing.T) {
func TestStore_hammer(t *testing.T) {
const importedModuleName = "imported"
const goroutines = 1000
m, err := NewHostModule(importedModuleName, map[string]interface{}{"fn": func(wasm.Module) {}})
require.NoError(t, err)
var wg sync.WaitGroup
s := newStore()
_, err = s.Instantiate(context.Background(), m, importedModuleName, nil)
imported, err := s.Instantiate(context.Background(), m, importedModuleName, nil)
require.NoError(t, err)
hm, ok := s.modules[importedModuleName]
_, ok := s.modules[imported.module.Name]
require.True(t, ok)
importingModule := &Module{
@@ -202,29 +199,20 @@ func TestStore_concurrent(t *testing.T) {
},
}
// Concurrent instantiation.
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func(i int) {
defer wg.Done()
_, err := s.Instantiate(context.Background(), importingModule, strconv.Itoa(i), nil)
require.NoError(t, err)
}(i)
// Concurrent instantiate and close should test whether locks work on the store. If they don't, we should see leaked
// modules after all of these complete, or an error raised.
hammer(t, func(p, n int) {
moduleName := fmt.Sprintf("%s:%d-%d", t.Name(), p, n)
_, instantiateErr := s.Instantiate(context.Background(), importingModule, moduleName, DefaultSysContext())
require.NoError(t, instantiateErr)
require.NoError(t, s.CloseModule(moduleName))
})
if t.Failed() {
return // At least one test failed, so return now.
}
wg.Wait()
// Concurrent release.
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func(i int) {
defer wg.Done()
require.NoError(t, s.CloseModule(strconv.Itoa(i)))
require.NoError(t, err)
}(i)
}
wg.Wait()
require.NoError(t, s.CloseModule(hm.Name))
// Close the imported module.
require.NoError(t, s.CloseModule(imported.module.Name))
// All instances are freed.
require.Len(t, s.modules, 0)
@@ -751,3 +739,38 @@ func TestModuleInstance_applyData(t *testing.T) {
})
require.Equal(t, []byte{0xa, 0xf, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5}, m.Memory.Buffer)
}
// hammer is a concurrency test described in /RATIONALE.md.
func hammer(t *testing.T, test func(p, n int)) {
P := 8 // max count of goroutines
N := 1000 // work per goroutine
if testing.Short() { // Adjust down if `-test.short`
P = 4
N = 100
}
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P / 2)) // Ensure goroutines have to switch cores.
// Add channel that tracks P goroutines.
c := make(chan int)
for p := 0; p < P; p++ {
p := p // pin p, so it is stable inside the goroutine.
go func() { // Launch goroutine 'p'
defer func() { // Ensure each require.XX failure is visible on hammer test fail.
if recovered := recover(); recovered != nil {
t.Error(recovered.(string))
}
c <- 1
}()
for n := 0; n < N; n++ { // Invoke one test
test(p, n)
}
}()
}
// Block until P goroutines finish.
for i := 0; i < P; i++ {
<-c
}
}

@@ -14,15 +14,39 @@ import (
publicwasm "github.com/tetratelabs/wazero/wasm"
)
func TestJITAdhoc(t *testing.T) {
var tests = map[string]func(t *testing.T, r wazero.Runtime){
"huge stack": testHugeStack,
"unreachable": testUnreachable,
"recursive entry": testRecursiveEntry,
"imported-and-exported func": testImportedAndExportedFunc,
"host function with context parameter": testHostFunctionContextParameter,
"host function with numeric parameter": testHostFunctionNumericParameter,
"close module with in-flight calls": testCloseInFlight,
"close imported module with in-flight calls": testCloseImportedInFlight,
}
func TestEngineJIT(t *testing.T) {
if !wazero.JITSupported {
t.Skip()
}
runAdhocTests(t, wazero.NewRuntimeConfigJIT)
runAllTests(t, tests, wazero.NewRuntimeConfigJIT())
}
func TestInterpreterAdhoc(t *testing.T) {
runAdhocTests(t, wazero.NewRuntimeConfigInterpreter)
func TestEngineInterpreter(t *testing.T) {
runAllTests(t, tests, wazero.NewRuntimeConfigInterpreter())
}
type configContextKey string
var configContext = context.WithValue(context.Background(), configContextKey("wa"), "zero")
func runAllTests(t *testing.T, tests map[string]func(t *testing.T, r wazero.Runtime), config *wazero.RuntimeConfig) {
for name, testf := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()
testf(t, wazero.NewRuntimeWithConfig(config.WithContext(configContext)))
})
}
}
var (
@@ -34,29 +58,7 @@ var (
hugestackWasm []byte
)
func runAdhocTests(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
t.Run("huge stack", func(t *testing.T) {
testHugeStack(t, newRuntimeConfig)
})
t.Run("unreachable", func(t *testing.T) {
testUnreachable(t, newRuntimeConfig)
})
t.Run("recursive entry", func(t *testing.T) {
testRecursiveEntry(t, newRuntimeConfig)
})
t.Run("imported-and-exported func", func(t *testing.T) {
testImportedAndExportedFunc(t, newRuntimeConfig)
})
t.Run("host function with float type", func(t *testing.T) {
testHostFunctions(t, newRuntimeConfig)
})
t.Run("close with outstanding calls", func(t *testing.T) {
testAdhocCloseWhileExecution(t, newRuntimeConfig)
})
}
func testHugeStack(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
r := wazero.NewRuntimeWithConfig(newRuntimeConfig())
func testHugeStack(t *testing.T, r wazero.Runtime) {
module, err := r.InstantiateModuleFromSource(hugestackWasm)
require.NoError(t, err)
defer module.Close()
@@ -68,13 +70,11 @@ func testHugeStack(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig)
require.NoError(t, err)
}
func testUnreachable(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
func testUnreachable(t *testing.T, r wazero.Runtime) {
callUnreachable := func(nil publicwasm.Module) {
panic("panic in host function")
}
r := wazero.NewRuntimeWithConfig(newRuntimeConfig())
_, err := r.NewModuleBuilder("host").ExportFunction("cause_unreachable", callUnreachable).Instantiate()
require.NoError(t, err)
@@ -92,14 +92,12 @@ wasm backtrace:
require.Equal(t, exp, err.Error())
}
func testRecursiveEntry(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
func testRecursiveEntry(t *testing.T, r wazero.Runtime) {
hostfunc := func(mod publicwasm.Module) {
_, err := mod.ExportedFunction("called_by_host_func").Call(nil)
require.NoError(t, err)
}
r := wazero.NewRuntimeWithConfig(newRuntimeConfig())
_, err := r.NewModuleBuilder("env").ExportFunction("host_func", hostfunc).Instantiate()
require.NoError(t, err)
@@ -113,7 +111,7 @@ func testRecursiveEntry(t *testing.T, newRuntimeConfig func() *wazero.RuntimeCon
// testImportedAndExportedFunc fails if the engine cannot call an "imported-and-then-exported-back" function
// Notably, this uses memory, which ensures wasm.Module is valid in both interpreter and JIT engines.
func testImportedAndExportedFunc(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
func testImportedAndExportedFunc(t *testing.T, r wazero.Runtime) {
var memory *wasm.MemoryInstance
storeInt := func(nil publicwasm.Module, offset uint32, val uint64) uint32 {
if !nil.Memory().WriteUint64Le(offset, val) {
@@ -124,8 +122,6 @@ func testImportedAndExportedFunc(t *testing.T, newRuntimeConfig func() *wazero.R
return 0
}
r := wazero.NewRuntimeWithConfig(newRuntimeConfig())
_, err := r.NewModuleBuilder("").ExportFunction("store_int", storeInt).Instantiate()
require.NoError(t, err)
@@ -149,105 +145,122 @@ func testImportedAndExportedFunc(t *testing.T, newRuntimeConfig func() *wazero.R
require.Equal(t, []byte{0x0, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x0}, memory.Buffer[0:10])
}
func TestHostFunctions(t *testing.T) {
testHostFunctions(t, func() *wazero.RuntimeConfig {
return wazero.NewRuntimeConfig()
})
}
// testHostFunctionContextParameter ensures arg0 is optionally a context.
func testHostFunctionContextParameter(t *testing.T, r wazero.Runtime) {
importedName := t.Name() + "-imported"
importingName := t.Name() + "-importing"
// testHostFunctions ensures arg0 is optionally a context, and fails if a float parameter corrupts a host function value
func testHostFunctions(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
var m publicwasm.Module
floatFuncs := map[string]interface{}{
"identity_f32": func(value float32) float32 {
return value
var importing publicwasm.Module
fns := map[string]interface{}{
"no_context": func(p uint32) uint32 {
return p + 1
},
"identity_f64": func(value float64) float64 {
return value
}}
floatFuncsGoContext := map[string]interface{}{
"identity_f32": func(ctx context.Context, value float32) float32 {
require.Equal(t, context.Background(), ctx)
return value
"go_context": func(ctx context.Context, p uint32) uint32 {
require.Equal(t, configContext, ctx)
return p + 1
},
"identity_f64": func(ctx context.Context, value float64) float64 {
require.Equal(t, context.Background(), ctx)
return value
}}
floatFuncsModule := map[string]interface{}{
"identity_f32": func(ctx publicwasm.Module, value float32) float32 {
require.Equal(t, m, ctx)
return value
"module_context": func(module publicwasm.Module, p uint32) uint32 {
require.Equal(t, importing, module)
return p + 1
},
"identity_f64": func(ctx publicwasm.Module, value float64) float64 {
require.Equal(t, m, ctx)
return value
}}
}
for k, v := range map[string]map[string]interface{}{
"": floatFuncs,
" - context.Context": floatFuncsGoContext,
" - wasm.Module": floatFuncsModule,
} {
r := wazero.NewRuntimeWithConfig(newRuntimeConfig())
imported, err := r.NewModuleBuilder(importedName).ExportFunctions(fns).Instantiate()
require.NoError(t, err)
defer imported.Close()
_, err := r.NewModuleBuilder("host").ExportFunctions(v).Instantiate()
require.NoError(t, err)
m, err = r.InstantiateModuleFromSource([]byte(`(module $test
;; these imports return the input param
(import "host" "identity_f32" (func $test.identity_f32 (param f32) (result f32)))
(import "host" "identity_f64" (func $test.identity_f64 (param f64) (result f64)))
;; 'call->test.identity_fXX' proxies 'test.identity_fXX' to test floats aren't corrupted through OpCodeCall
(func $call->test.identity_f32 (param f32) (result f32)
local.get 0
call $test.identity_f32
)
(export "call->test.identity_f32" (func $call->test.identity_f32))
(func $call->test.identity_f64 (param f64) (result f64)
local.get 0
call $test.identity_f64
)
(export "call->test.identity_f64" (func $call->test.identity_f64))
)`))
require.NoError(t, err)
defer m.Close()
t.Run(fmt.Sprintf("host function with f32 param%s", k), func(t *testing.T) {
name := "call->test.identity_f32"
input := float32(math.MaxFloat32)
results, err := m.ExportedFunction(name).Call(nil, publicwasm.EncodeF32(input)) // float bits are a uint32 value, call requires uint64
for test := range fns {
t.Run(test, func(t *testing.T) {
// Instantiate a module that uses Wasm code to call the host function.
importing, err = r.InstantiateModuleFromSource([]byte(fmt.Sprintf(`(module $%[1]s
(import "%[2]s" "%[3]s" (func $%[3]s (param i32) (result i32)))
(func $call_%[3]s (param i32) (result i32) local.get 0 call $%[3]s)
(export "call->%[3]s" (func $call_%[3]s))
)`, importingName, importedName, test)))
require.NoError(t, err)
require.Equal(t, input, publicwasm.DecodeF32(results[0]))
})
defer importing.Close()
t.Run(fmt.Sprintf("host function with f64 param%s", k), func(t *testing.T) {
name := "call->test.identity_f64"
input := math.MaxFloat64
results, err := m.ExportedFunction(name).Call(nil, publicwasm.EncodeF64(input))
results, err := importing.ExportedFunction("call->"+test).Call(nil, math.MaxUint32-1)
require.NoError(t, err)
require.Equal(t, input, publicwasm.DecodeF64(results[0]))
require.Equal(t, uint64(math.MaxUint32), results[0])
})
}
}
// testAdhocCloseWhileExecution ensures that calling Module.Close with outstanding calls is safe.
func testAdhocCloseWhileExecution(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
t.Run("singleton", func(t *testing.T) {
r := wazero.NewRuntimeWithConfig(newRuntimeConfig())
var moduleCloser func() error
_, err := r.NewModuleBuilder("host").ExportFunctions(map[string]interface{}{
"close_module": func() { _ = moduleCloser() }, // Closing while executing itself.
}).Instantiate()
require.NoError(t, err)
// testHostFunctionNumericParameter ensures numeric parameters aren't corrupted
func testHostFunctionNumericParameter(t *testing.T, r wazero.Runtime) {
importedName := t.Name() + "-imported"
importingName := t.Name() + "-importing"
m, err := r.InstantiateModuleFromSource([]byte(`(module $test
fns := map[string]interface{}{
"i32": func(p uint32) uint32 {
return p + 1
},
"i64": func(p uint64) uint64 {
return p + 1
},
"f32": func(p float32) float32 {
return p + 1
},
"f64": func(p float64) float64 {
return p + 1
},
}
imported, err := r.NewModuleBuilder(importedName).ExportFunctions(fns).Instantiate()
require.NoError(t, err)
defer imported.Close()
for _, test := range []struct {
name string
input, expected uint64
}{
{
name: "i32",
input: math.MaxUint32 - 1,
expected: math.MaxUint32,
},
{
name: "i64",
input: math.MaxUint64 - 1,
expected: math.MaxUint64,
},
{
name: "f32",
input: publicwasm.EncodeF32(math.MaxFloat32 - 1),
expected: publicwasm.EncodeF32(math.MaxFloat32),
},
{
name: "f64",
input: publicwasm.EncodeF64(math.MaxFloat64 - 1),
expected: publicwasm.EncodeF64(math.MaxFloat64),
},
} {
t.Run(test.name, func(t *testing.T) {
// Instantiate a module that uses Wasm code to call the host function.
importing, err := r.InstantiateModuleFromSource([]byte(fmt.Sprintf(`(module $%[1]s
(import "%[2]s" "%[3]s" (func $%[3]s (param %[3]s) (result %[3]s)))
(func $call_%[3]s (param %[3]s) (result %[3]s) local.get 0 call $%[3]s)
(export "call->%[3]s" (func $call_%[3]s))
)`, importingName, importedName, test.name)))
require.NoError(t, err)
defer importing.Close()
results, err := importing.ExportedFunction("call->"+test.name).Call(nil, test.input)
require.NoError(t, err)
require.Equal(t, test.expected, results[0])
})
}
}
func testCloseInFlight(t *testing.T, r wazero.Runtime) {
var moduleCloser func() error
_, err := r.NewModuleBuilder("host").ExportFunctions(map[string]interface{}{
"close_module": func() { _ = moduleCloser() }, // Closing while executing itself.
}).Instantiate()
require.NoError(t, err)
m, err := r.InstantiateModuleFromSource([]byte(`(module $test
(import "host" "close_module" (func $close_module ))
(func $close_while_execution
@@ -255,21 +268,22 @@ func testAdhocCloseWhileExecution(t *testing.T, newRuntimeConfig func() *wazero.
)
(export "close_while_execution" (func $close_while_execution))
)`))
require.NoError(t, err)
require.NoError(t, err)
moduleCloser = m.Close
moduleCloser = m.Close
_, err = m.ExportedFunction("close_while_execution").Call(nil)
require.NoError(t, err)
})
t.Run("close imported module", func(t *testing.T) {
r := wazero.NewRuntimeWithConfig(newRuntimeConfig())
importedModule, err := r.NewModuleBuilder("host").ExportFunctions(map[string]interface{}{
"already_closed": func() {},
}).Instantiate()
require.NoError(t, err)
_, err = m.ExportedFunction("close_while_execution").Call(nil)
require.NoError(t, err)
m, err := r.InstantiateModuleFromSource([]byte(`(module $test
}
func testCloseImportedInFlight(t *testing.T, r wazero.Runtime) {
importedModule, err := r.NewModuleBuilder("host").ExportFunctions(map[string]interface{}{
"already_closed": func() {},
}).Instantiate()
require.NoError(t, err)
m, err := r.InstantiateModuleFromSource([]byte(`(module $test
(import "host" "already_closed" (func $already_closed ))
(func $close_parent_before_execution
@@ -277,21 +291,20 @@ func testAdhocCloseWhileExecution(t *testing.T, newRuntimeConfig func() *wazero.
)
(export "close_parent_before_execution" (func $close_parent_before_execution))
)`))
require.NoError(t, err)
require.NoError(t, err)
// Closing the imported module before making a call should also be safe.
require.NoError(t, importedModule.Close())
// Closing the imported module before making a call should also be safe.
require.NoError(t, importedModule.Close())
// We can even re-instantiate a module with the same name.
importedModuleNew, err := r.NewModuleBuilder("host").ExportFunctions(map[string]interface{}{
"already_closed": func() {
panic("unreachable") // The new module's function must not be called.
},
}).Instantiate()
require.NoError(t, err)
defer importedModuleNew.Close() // nolint
// We can even re-instantiate a module with the same name.
importedModuleNew, err := r.NewModuleBuilder("host").ExportFunctions(map[string]interface{}{
"already_closed": func() {
panic("unreachable") // The new module's function must not be called.
},
}).Instantiate()
require.NoError(t, err)
defer importedModuleNew.Close() // nolint
_, err = m.ExportedFunction("close_parent_before_execution").Call(nil)
require.NoError(t, err)
})
_, err = m.ExportedFunction("close_parent_before_execution").Call(nil)
require.NoError(t, err)
}

@@ -2,7 +2,7 @@ package adhoc
import (
"errors"
"sync"
"runtime"
"sync/atomic"
"testing"
@@ -12,61 +12,21 @@ import (
"github.com/tetratelabs/wazero/wasm"
)
func TestJITConcurrency(t *testing.T) {
var hammers = map[string]func(t *testing.T, r wazero.Runtime){
// Tests here are similar to what's described in /RATIONALE.md, but deviate as they involve blocking functions.
"close importing module while in use": closeImportingModuleWhileInUse,
"close imported module while in use": closeImportedModuleWhileInUse,
}
func TestEngineJIT_hammer(t *testing.T) {
if !wazero.JITSupported {
t.Skip()
}
runAdhocTestsUnderHighConcurrency(t, wazero.NewRuntimeConfigJIT)
singleModuleHighConcurrency(t, wazero.NewRuntimeConfigJIT)
runAllTests(t, hammers, wazero.NewRuntimeConfigJIT())
}
func TestInterpreterConcurrency(t *testing.T) {
runAdhocTestsUnderHighConcurrency(t, wazero.NewRuntimeConfigInterpreter)
singleModuleHighConcurrency(t, wazero.NewRuntimeConfigInterpreter)
}
func runAdhocTestsUnderHighConcurrency(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
t.Run("huge stack", func(t *testing.T) {
t.Parallel()
runAdhocTestUnderHighConcurrency(t, newRuntimeConfig, testHugeStack)
})
t.Run("unreachable", func(t *testing.T) {
t.Parallel()
runAdhocTestUnderHighConcurrency(t, newRuntimeConfig, testUnreachable)
})
t.Run("recursive entry", func(t *testing.T) {
t.Parallel()
runAdhocTestUnderHighConcurrency(t, newRuntimeConfig, testRecursiveEntry)
})
t.Run("imported-and-exported func", func(t *testing.T) {
t.Parallel()
runAdhocTestUnderHighConcurrency(t, newRuntimeConfig, testImportedAndExportedFunc)
})
}
// runAdhocTestUnderHighConcurrency runs a test case in adhoc_test.go with multiple goroutines.
func runAdhocTestUnderHighConcurrency(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig,
adhocTest func(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig)) {
const goroutinesPerCase = 1000
var wg sync.WaitGroup
wg.Add(goroutinesPerCase)
for i := 0; i < goroutinesPerCase; i++ {
go func() {
defer wg.Done()
adhocTest(t, newRuntimeConfig)
}()
}
wg.Wait()
}
func singleModuleHighConcurrency(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
t.Run("close importing module while in use", func(t *testing.T) {
closeImportingModuleWhileInUse(t, newRuntimeConfig)
})
t.Run("close imported module while in use", func(t *testing.T) {
closeImportedModuleWhileInUse(t, newRuntimeConfig)
})
func TestEngineInterpreter_hammer(t *testing.T) {
runAllTests(t, hammers, wazero.NewRuntimeConfigInterpreter())
}
type blocker struct {
@@ -104,12 +64,15 @@ var blockAfterAddSource = []byte(`(module
(export "block_after_add" (func $block_after_add))
)`)
func closeImportingModuleWhileInUse(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
func closeImportingModuleWhileInUse(t *testing.T, r wazero.Runtime) {
args := []uint64{1, 123}
exp := args[0] + args[1]
const goroutines = 1000
r := wazero.NewRuntimeWithConfig(newRuntimeConfig())
P := 8 // P+1 == max count of goroutines/in-flight function calls
if testing.Short() { // Adjust down if `-test.short`
P = 4
}
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P / 2)) // Ensure goroutines have to switch cores.
running := make(chan bool)
unblocked := make(chan bool)
@@ -123,19 +86,22 @@ func closeImportingModuleWhileInUse(t *testing.T, newRuntimeConfig func() *wazer
require.NoError(t, err)
defer importing.Close()
var wg sync.WaitGroup
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func() {
defer wg.Done()
// Add channel that tracks P goroutines.
done := make(chan int)
for p := 0; p < P; p++ {
go func() { // Launch goroutine 'p'
defer completeGoroutine(t, done)
// As this is a blocking function, we can only run 1 function per goroutine.
requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp)
}()
}
// Wait until all goroutines are running.
for i := 0; i < goroutines; i++ {
// Wait until all functions are in-flight.
for i := 0; i < P; i++ {
<-running
}
// Close the module that exported blockAfterAdd (noting calls to this are in-flight).
require.NoError(t, importing.Close())
@@ -145,26 +111,30 @@ func closeImportingModuleWhileInUse(t *testing.T, newRuntimeConfig func() *wazer
defer importing.Close()
// If unloading worked properly, a new function call should route to the newly instantiated module.
wg.Add(1)
go func() {
defer wg.Done()
defer completeGoroutine(t, done)
requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp)
}()
<-running // Wait for the above function to be in-flight
P++
// Unblock the other goroutines to ensure they don't err on the return path of a closed module.
for i := 0; i < goroutines+1; i++ {
// Unblock the functions to ensure they don't err on the return path of a closed module.
for i := 0; i < P; i++ {
unblocked <- true
<-done
}
wg.Wait() // Wait for all goroutines to finish.
}
func closeImportedModuleWhileInUse(t *testing.T, newRuntimeConfig func() *wazero.RuntimeConfig) {
func closeImportedModuleWhileInUse(t *testing.T, r wazero.Runtime) {
args := []uint64{1, 123}
exp := args[0] + args[1]
const goroutines = 1000
r := wazero.NewRuntimeWithConfig(newRuntimeConfig())
P := 8 // P+1 == max count of goroutines/in-flight function calls
if testing.Short() { // Adjust down if `-test.short`
P = 4
}
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P / 2)) // Ensure goroutines have to switch cores.
running := make(chan bool)
unblocked := make(chan bool)
@@ -178,19 +148,22 @@ func closeImportedModuleWhileInUse(t *testing.T, newRuntimeConfig func() *wazero
require.NoError(t, err)
defer importing.Close()
var wg sync.WaitGroup
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func() {
defer wg.Done()
// Add channel that tracks P goroutines.
done := make(chan int)
for p := 0; p < P; p++ {
go func() { // Launch goroutine 'p'
defer completeGoroutine(t, done)
// As this is a blocking function, we can only run 1 function per goroutine.
requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp)
}()
}
// Wait until all goroutines are running.
for i := 0; i < goroutines; i++ {
// Wait until all functions are in-flight.
for i := 0; i < P; i++ {
<-running
}
// Close the module that exported the host function (noting calls to this are in-flight).
require.NoError(t, imported.Close())
// Close the underlying host function, which causes future calls to it to fail.
@@ -212,19 +185,20 @@ func closeImportedModuleWhileInUse(t *testing.T, newRuntimeConfig func() *wazero
require.NoError(t, err)
defer importing.Close()
// If unloading worked properly, a new function call should route to the newly instantiated host module.
wg.Add(1)
// If unloading worked properly, a new function call should route to the newly instantiated module.
go func() {
defer wg.Done()
defer completeGoroutine(t, done)
requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp)
}()
<-running // Wait for the above function to be in-flight
P++
// Unblock the other goroutines to ensure they don't err on the return path of a closed module.
for i := 0; i < goroutines+1; i++ {
// Unblock the functions to ensure they don't err on the return path of a closed module.
for i := 0; i < P; i++ {
unblocked <- true
<-done
}
wg.Wait() // Wait for all goroutines to finish.
}
func requireFunctionCall(t *testing.T, fn wasm.Function, args []uint64, exp uint64) {
@@ -233,3 +207,11 @@ func requireFunctionCall(t *testing.T, fn wasm.Function, args []uint64, exp uint
require.NoError(t, err)
require.Equal(t, exp, res[0])
}
func completeGoroutine(t *testing.T, c chan int) {
// Ensure each require.XX failure is visible on hammer test fail.
if recovered := recover(); recovered != nil {
t.Error(recovered.(string))
}
c <- 1
}