diff --git a/RATIONALE.md b/RATIONALE.md
index f54d9eed..6d19cfd6 100644
--- a/RATIONALE.md
+++ b/RATIONALE.md
@@ -211,49 +211,19 @@ Here is an annotated description of the key pieces of a hammer test:
    * When in doubt, try 1000 or 100 if `testing.Short`
    * Remember, there are multiple hammer tests and CI nodes are slow. Slower tests hurt feedback loops.
 3. `defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P/2))` makes goroutines switch cores, testing visibility of shared data.
-4. Track goroutines progress via `c := make(chan int)` where each goroutine in `P` defers `c <- 1`.
-   1. Tests use `require.XXX`, so `recover()` into `t.Fail` in a `defer` function before `c <- 1`.
+4. To ensure goroutines execute at the same time, block them with `sync.WaitGroup`, initialized to `Add(P)`.
+   * `sync.WaitGroup` internally uses `runtime_Semacquire`, which is not available in any other library.
+   * `sync.WaitGroup.Add` with a negative value can unblock many goroutines at the same time, e.g. without a for loop.
+5. Track goroutines' progress via `finished := make(chan int)` where each goroutine in `P` defers `finished <- 1`.
+   1. Tests use `require.XXX`, so `recover()` into `t.Fail` in a `defer` function before `finished <- 1`.
      * This makes it easier to spot larger concurrency problems as you see each failure, not just the first.
-   2. After the `defer` function run the stateful function `N` times in a normal loop.
+   2. After the `defer` function, wait until unblocked, then run the stateful function `N` times in a normal loop.
      * This loop should trigger shared state problems as locks or atomics are contended by `P` goroutines.
-5. After all `P` goroutines launch, block the runner by blocking on the channel (`<-c`) for each `P`.
-6. When all goroutines complete, `return` if `t.Failed()`, otherwise perform follow-up state checks.
+6. After all `P` goroutines launch, atomically release all of them with `WaitGroup.Add(-P)`.
+7. Block the runner until the goroutines complete, by receiving (`<-finished`) once for each of the `P` goroutines.
+8. When all goroutines complete, `return` if `t.Failed()`, otherwise perform follow-up state checks.
 
-Here's an example:
-```go
-P := 8    // 1. count of goroutines
-N := 1000 // 2. count of work per goroutine
-if testing.Short() {
-	P = 4   // 1. count of goroutines if `-test.short`
-	N = 100 // 2. count of work per goroutine if `-test.short`
-}
-defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P/2)) // 3. Ensure goroutines switch cores
-
-c := make(chan int) // 4. tracking channel
-for p := 0; p < P; p++ {
-	p := p // pin p, so it is stable inside the goroutine.
-
-	go func() { // Launch goroutine 'p'
-		defer func() {
-			if recovered := recover(); recovered != nil {
-				t.Error(recovered.(string)) // 4.1. Accumulate error instead of failing on first.
-			}
-			c <- 1 // 4.1 count down regardless of error
-		}()
-
-		for n := 0; n < N; n++ { // 4.2 loop the test N times
-			test(p, n)
-		}
-	}()
-}
-
-for i := 0; i < P; i++ { // 5. Block until all goroutines finish
-	<-c
-}
-if t.Failed() { // 6. Return early if there are concurrency errors.
-	return
-}
-```
+This is implemented in wazero in [hammer.go](internal/testing/hammer/hammer.go).
 
 ### Lock-free, cross-goroutine observations of updates
 
diff --git a/internal/testing/hammer/hammer.go b/internal/testing/hammer/hammer.go
new file mode 100644
index 00000000..b5c08fbe
--- /dev/null
+++ b/internal/testing/hammer/hammer.go
@@ -0,0 +1,105 @@
+package hammer
+
+import (
+	"fmt"
+	"runtime"
+	"sync"
+	"testing"
+)
+
+// Hammer invokes a test concurrently in P goroutines N times per goroutine.
+//
+// Ex.
+//	P := 8    // max count of goroutines
+//	N := 1000 // work per goroutine
+//	if testing.Short() { // Adjust down if `-test.short`
+//		P = 4
+//		N = 100
+//	}
+//
+//	hammer.NewHammer(t, P, N).Run(func(name string) {
+//		// Do test using name if something needs to be unique.
+//	}, nil)
+//
+//	if t.Failed() {
+//		return // At least one test failed, so return now.
+//	}
+//
+// See /RATIONALE.md
+type Hammer interface {
+	// Run invokes a concurrency test, as described in /RATIONALE.md.
+	//
+	// * test is concurrently run in P goroutines, each looping N times.
+	// * name is unique within the hammer.
+	// * onRunning is any function to run after all goroutines are running, but before test executes.
+	//
+	// On completion, return early if there's a failure like this:
+	//	if t.Failed() {
+	//		return
+	//	}
+	Run(test func(name string), onRunning func())
+}
+
+// NewHammer returns a Hammer initialized to the indicated count of goroutines (P) and iterations per goroutine (N).
+// As discussed in /RATIONALE.md, optimize for Hammer.Run completing in 0.1 seconds on a modern laptop.
+func NewHammer(t *testing.T, P, N int) Hammer {
+	return &hammer{t: t, P: P, N: N}
+}
+
+// hammer implements Hammer
+type hammer struct {
+	// t is the calling test
+	t *testing.T
+	// P is the max count of goroutines
+	P int
+	// N is the work per goroutine
+	N int
+}
+
+// Run implements Hammer.Run
+func (h *hammer) Run(test func(name string), onRunning func()) {
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(h.P / 2)) // Ensure goroutines have to switch cores.
+
+	// running tracks when each goroutine has started.
+	running := make(chan int)
+	// unblock needs to happen atomically, so we need to use a WaitGroup
+	var unblocked sync.WaitGroup
+	finished := make(chan int)
+
+	unblocked.Add(h.P) // P goroutines will be unblocked by the current goroutine.
+	for p := 0; p < h.P; p++ {
+		p := p // pin p, so it is stable inside the goroutine.
+
+		go func() { // Launch goroutine 'p'
+			defer func() { // Ensure each require.XX failure is visible on hammer test fail.
+				if recovered := recover(); recovered != nil {
+					h.t.Error(recovered.(string))
+				}
+				finished <- 1
+			}()
+			running <- 1
+
+			unblocked.Wait()           // Wait to be unblocked
+			for n := 0; n < h.N; n++ { // Invoke one test
+				test(fmt.Sprintf("%s:%d-%d", h.t.Name(), p, n))
+			}
+		}()
+	}
+
+	// Block until P goroutines are running.
+	for i := 0; i < h.P; i++ {
+		<-running
+	}
+
+	if onRunning != nil {
+		onRunning()
+	}
+
+	// Release all goroutines at the same time.
+	unblocked.Add(-h.P)
+
+	// Block until P goroutines finish.
+	for i := 0; i < h.P; i++ {
+		<-finished
+	}
+}
diff --git a/internal/wasm/store_test.go b/internal/wasm/store_test.go
index f65f3af8..fd32d38f 100644
--- a/internal/wasm/store_test.go
+++ b/internal/wasm/store_test.go
@@ -6,12 +6,12 @@ import (
 	"errors"
 	"fmt"
 	"math"
-	"runtime"
 	"strconv"
 	"testing"
 
 	"github.com/stretchr/testify/require"
 
+	"github.com/tetratelabs/wazero/internal/testing/hammer"
 	"github.com/tetratelabs/wazero/wasm"
 )
 
@@ -201,12 +201,17 @@ func TestStore_hammer(t *testing.T) {
 	// Concurrent instantiate, close should test if locks work on the store. If they don't, we should see leaked modules
 	// after all of these complete, or an error raised.
-	hammer(t, func(p, n int) {
-		moduleName := fmt.Sprintf("%s:%d-%d", t.Name(), p, n)
-		_, instantiateErr := s.Instantiate(context.Background(), importingModule, moduleName, DefaultSysContext())
+	P := 8    // max count of goroutines
+	N := 1000 // work per goroutine
+	if testing.Short() { // Adjust down if `-test.short`
+		P = 4
+		N = 100
+	}
+	hammer.NewHammer(t, P, N).Run(func(name string) {
+		_, instantiateErr := s.Instantiate(context.Background(), importingModule, name, DefaultSysContext())
 		require.NoError(t, instantiateErr)
-		require.NoError(t, s.CloseModule(moduleName))
-	})
+		require.NoError(t, s.CloseModule(name))
+	}, nil)
 	if t.Failed() {
 		return // At least one test failed, so return now.
 	}
 
@@ -739,38 +744,3 @@ func TestModuleInstance_applyData(t *testing.T) {
 	})
 	require.Equal(t, []byte{0xa, 0xf, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5}, m.Memory.Buffer)
 }
-
-// hammer is a concurrency test described in /RATIONALE.md.
-func hammer(t *testing.T, test func(p, n int)) {
-	P := 8    // max count of goroutines
-	N := 1000 // work per goroutine
-	if testing.Short() { // Adjust down if `-test.short`
-		P = 4
-		N = 100
-	}
-	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P / 2)) // Ensure goroutines have to switch cores.
-
-	// Add channel that tracks P goroutines.
-	c := make(chan int)
-	for p := 0; p < P; p++ {
-		p := p // pin p, so it is stable inside the goroutine.
-
-		go func() { // Launch goroutine 'p'
-			defer func() { // Ensure each require.XX failure is visible on hammer test fail.
-				if recovered := recover(); recovered != nil {
-					t.Error(recovered.(string))
-				}
-				c <- 1
-			}()
-
-			for n := 0; n < N; n++ { // Invoke one test
-				test(p, n)
-			}
-		}()
-	}
-
-	// Block until P goroutines finish.
-	for i := 0; i < P; i++ {
-		<-c
-	}
-}
diff --git a/tests/engine/hammer_test.go b/tests/engine/hammer_test.go
index 483d3174..f67a1e2e 100644
--- a/tests/engine/hammer_test.go
+++ b/tests/engine/hammer_test.go
@@ -3,6 +3,7 @@ package adhoc
 import (
 	"errors"
 	"runtime"
+	"sync"
 	"sync/atomic"
 	"testing"
 
@@ -30,8 +31,9 @@ func TestEngineInterpreter_hammer(t *testing.T) {
 }
 
 type blocker struct {
-	running   chan bool
-	unblocked chan bool
+	running chan bool
+	// unblocked uses a WaitGroup as it allows releasing all goroutines at the same time.
+	unblocked sync.WaitGroup
 	// closed should panic if fn is called when the value is 1.
 	//
 	// Note: Exclusively reading and updating this with atomics guarantees cross-goroutine observations.
@@ -48,8 +50,8 @@ func (b *blocker) fn(input uint32) uint32 {
 	if atomic.LoadUint32(&b.closed) == 1 {
 		panic(errors.New("closed"))
 	}
-	b.running <- true // Signal the goroutine is running
-	<-b.unblocked     // Await until unblocked
+	b.running <- true  // Signal the goroutine is running
+	b.unblocked.Wait() // Await until unblocked
 	return input
 }
 
@@ -75,8 +77,8 @@ func closeImportingModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P / 2)) // Ensure goroutines have to switch cores.
 
 	running := make(chan bool)
-	unblocked := make(chan bool)
-	b := &blocker{running: running, unblocked: unblocked}
+	b := &blocker{running: running}
+	b.unblocked.Add(P)
 
 	imported, err := r.NewModuleBuilder("host").ExportFunction("block", b.fn).Instantiate()
 	require.NoError(t, err)
@@ -111,6 +113,7 @@ func closeImportingModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	defer importing.Close()
 
 	// If unloading worked properly, a new function call should route to the newly instantiated module.
+	b.unblocked.Add(1)
 	go func() {
 		defer completeGoroutine(t, done)
 
@@ -120,8 +123,10 @@ func closeImportingModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	P++
 
 	// Unblock the functions to ensure they don't err on the return path of a closed module.
+	b.unblocked.Add(-P)
+
+	// Wait for all goroutines to complete
 	for i := 0; i < P; i++ {
-		unblocked <- true
 		<-done
 	}
 }
@@ -137,8 +142,8 @@ func closeImportedModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P / 2)) // Ensure goroutines have to switch cores.
 
 	running := make(chan bool)
-	unblocked := make(chan bool)
-	b := &blocker{running: running, unblocked: unblocked}
+	b := &blocker{running: running}
+	b.unblocked.Add(P)
 
 	imported, err := r.NewModuleBuilder("host").ExportFunction("block", b.fn).Instantiate()
 	require.NoError(t, err)
@@ -175,7 +180,7 @@ func closeImportedModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	require.NoError(t, importing.Close())
 
 	// Prove a host module can be redefined even with in-flight calls.
-	b1 := &blocker{running: running, unblocked: unblocked} // New instance, so not yet closed!
+	b1 := &blocker{running: running} // New instance, so not yet closed!
 	imported, err = r.NewModuleBuilder("host").ExportFunction("block", b1.fn).Instantiate()
 	require.NoError(t, err)
 	defer imported.Close()
@@ -186,17 +191,20 @@ func closeImportedModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	defer importing.Close()
 
 	// If unloading worked properly, a new function call should route to the newly instantiated module.
+	b1.unblocked.Add(1)
 	go func() {
 		defer completeGoroutine(t, done)
 
 		requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp)
 	}()
 	<-running // Wait for the above function to be in-flight
-	P++
 
 	// Unblock the functions to ensure they don't err on the return path of a closed module.
-	for i := 0; i < P; i++ {
-		unblocked <- true
+	b.unblocked.Add(-P)
+	b1.unblocked.Add(-1)
+
+	// Wait for all goroutines to complete
+	for i := 0; i < P+1; i++ {
 		<-done
 	}
 }
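
Side note, not part of the patch: the key trick throughout is that a single `sync.WaitGroup.Add(-P)` releases every goroutine blocked in `Wait()` at once, which is what both `hammer.Run` and the reworked `blocker` rely on. A minimal standalone sketch of just that release pattern, with illustrative names, looks like this:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const P = 4

	var unblocked sync.WaitGroup
	unblocked.Add(P) // Each goroutine below blocks in Wait until this counter reaches zero.

	running := make(chan int)  // signals that a goroutine has started
	finished := make(chan int) // signals that a goroutine has finished

	for p := 0; p < P; p++ {
		p := p // pin p, so it is stable inside the goroutine
		go func() {
			defer func() { finished <- 1 }()
			running <- 1
			unblocked.Wait() // block here until released below
			fmt.Println("goroutine", p, "released")
		}()
	}

	for i := 0; i < P; i++ { // wait until all P goroutines have started
		<-running
	}

	unblocked.Add(-P) // one call releases all P goroutines at the same time

	for i := 0; i < P; i++ { // wait until all P goroutines finish
		<-finished
	}
}
```

This is the same structure `hammer.Run` uses, with the test body in place of the `Println`.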