Extracts hammer.Hammer and adds atomic release of goroutines (#416)

This extracts `hammer.Hammer` as a utility for re-use, notably adding a
feature that ensures all tests run concurrently. Before, tests started in
a loop, so goroutines launched later could be delayed by goroutine
scheduling.

Signed-off-by: Adrian Cole <adrian@tetrate.io>
Authored by Crypt Keeper on 2022-03-30 10:18:16 +08:00; committed by GitHub.
parent 146615d94b, commit 1e20fe3bfe
4 changed files with 147 additions and 94 deletions
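
The "atomic release" in the title refers to the negative `sync.WaitGroup.Add` trick used by the new hammer.go below: every goroutine parks on one `WaitGroup`, and a single `Add(-P)` drops the counter to zero so all of them wake together, rather than being unblocked one at a time by a channel send in a loop. A minimal, self-contained sketch of the idea (not wazero code; names invented for illustration):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const P = 4

	var unblocked sync.WaitGroup
	unblocked.Add(P) // waiters park in Wait until the counter reaches zero

	running := make(chan struct{}) // confirms each goroutine actually started
	done := make(chan struct{})

	for p := 0; p < P; p++ {
		p := p // pin p, so it is stable inside the goroutine
		go func() {
			running <- struct{}{}
			unblocked.Wait() // park until released
			fmt.Println("goroutine", p, "released")
			done <- struct{}{}
		}()
	}

	for i := 0; i < P; i++ { // wait until every goroutine is running
		<-running
	}

	unblocked.Add(-P) // one call releases all P goroutines at once

	for i := 0; i < P; i++ {
		<-done
	}
}
```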

RATIONALE.md

@@ -211,49 +211,19 @@ Here is an annotated description of the key pieces of a hammer test:
    * When in doubt, try 1000 or 100 if `testing.Short`
    * Remember, there are multiple hammer tests and CI nodes are slow. Slower tests hurt feedback loops.
 3. `defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P/2))` makes goroutines switch cores, testing visibility of shared data.
-4. Track goroutines progress via `c := make(chan int)` where each goroutine in `P` defers `c <- 1`.
-   1. Tests use `require.XXX`, so `recover()` into `t.Fail` in a `defer` function before `c <- 1`.
+4. To ensure goroutines execute at the same time, block them with `sync.WaitGroup`, initialized to `Add(P)`.
+   * `sync.WaitGroup` internally uses `runtime_Semacquire`, which is not available in any other library.
+   * `sync.WaitGroup.Add` with a negative value can unblock many goroutines at the same time, e.g. without a for loop.
+5. Track goroutines progress via `finished := make(chan int)` where each goroutine in `P` defers `finished <- 1`.
+   1. Tests use `require.XXX`, so `recover()` into `t.Fail` in a `defer` function before `finished <- 1`.
       * This makes it easier to spot larger concurrency problems as you see each failure, not just the first.
-   2. After the `defer` function run the stateful function `N` times in a normal loop.
+   2. After the `defer` function, await unblocked, then run the stateful function `N` times in a normal loop.
      * This loop should trigger shared state problems as locks or atomics are contended by `P` goroutines.
-5. After all `P` goroutines launch, block the runner by blocking on the channel (`<-c`) for each `P`.
-6. When all goroutines complete, `return` if `t.Failed()`, otherwise perform follow-up state checks.
+6. After all `P` goroutines launch, atomically release all of them with `WaitGroup.Add(-P)`.
+7. Block the runner until the goroutines complete, by receiving (`<-finished`) for each `P`.
+8. When all goroutines complete, `return` if `t.Failed()`, otherwise perform follow-up state checks.
 
-Here's an example:
-```go
-P := 8    // 1. count of goroutines
-N := 1000 // 2. count of work per goroutine
-if testing.Short() {
-	P = 4   // 1. count of goroutines if `-test.short`
-	N = 100 // 2. count of work per goroutine if `-test.short`
-}
-defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P/2)) // 3. Ensure goroutines switch cores
-c := make(chan int) // 4. tracking channel
-for p := 0; p < P; p++ {
-	p := p // pin p, so it is stable inside the goroutine.
-	go func() { // Launch goroutine 'p'
-		defer func() {
-			if recovered := recover(); recovered != nil {
-				t.Error(recovered.(string)) // 4.1. Accumulate error instead of failing on first.
-			}
-			c <- 1 // 4.1 count down regardless of error
-		}()
-		for n := 0; n < N; n++ { // 4.2 loop the test N times
-			test(p, n)
-		}
-	}()
-}
-for i := 0; i < P; i++ { // 5. Block until all goroutines finish
-	<-c
-}
-if t.Failed() { // 6. Return early if there are concurrency errors.
-	return
-}
-```
+This is implemented in wazero in [hammer.go](internal/testing/hammer/hammer.go).
 
 ### Lock-free, cross-goroutine observations of updates

internal/testing/hammer/hammer.go (new file)

@@ -0,0 +1,105 @@
package hammer

import (
	"fmt"
	"runtime"
	"sync"
	"testing"
)

// Hammer invokes a test concurrently in P goroutines N times per goroutine.
//
// Ex.
//	P := 8    // max count of goroutines
//	N := 1000 // work per goroutine
//	if testing.Short() { // Adjust down if `-test.short`
//		P = 4
//		N = 100
//	}
//
//	hammer.NewHammer(t, P, N).Run(func(name string) {
//		// Do test using name if something needs to be unique.
//	}, nil)
//
//	if t.Failed() {
//		return // At least one test failed, so return now.
//	}
//
// See /RATIONALE.md
type Hammer interface {
	// Run invokes a concurrency test, as described in /RATIONALE.md.
	//
	//	* test is concurrently run in P goroutines, each looping N times.
	//	* name is unique within the hammer.
	//	* onRunning is any function to run after all goroutines are running, but before test executes.
	//
	// On completion, return early if there's a failure like this:
	//	if t.Failed() {
	//		return
	//	}
	Run(test func(name string), onRunning func())
}

// NewHammer returns a Hammer initialized to the indicated count of goroutines (P) and iterations per goroutine (N).
// As discussed in /RATIONALE.md, optimize for Hammer.Run completing in .1 second on a modern laptop.
func NewHammer(t *testing.T, P, N int) Hammer {
	return &hammer{t: t, P: P, N: N}
}

// hammer implements Hammer
type hammer struct {
	// t is the calling test
	t *testing.T
	// P is the max count of goroutines
	P int
	// N is the work per goroutine
	N int
}

// Run implements Hammer.Run
func (h *hammer) Run(test func(name string), onRunning func()) {
	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(h.P / 2)) // Ensure goroutines have to switch cores.

	// running tracks when each goroutine starts, so the runner can block until all are running.
	running := make(chan int)
	// unblock needs to happen atomically, so we need to use a WaitGroup.
	var unblocked sync.WaitGroup
	finished := make(chan int)

	unblocked.Add(h.P) // P goroutines will be unblocked by the current goroutine.
	for p := 0; p < h.P; p++ {
		p := p      // pin p, so it is stable inside the goroutine.
		go func() { // Launch goroutine 'p'
			defer func() { // Ensure each require.XX failure is visible on hammer test fail.
				if recovered := recover(); recovered != nil {
					h.t.Error(recovered.(string))
				}
				finished <- 1
			}()

			running <- 1
			unblocked.Wait()           // Wait to be unblocked
			for n := 0; n < h.N; n++ { // Invoke one test
				test(fmt.Sprintf("%s:%d-%d", h.t.Name(), p, n))
			}
		}()
	}

	// Block until P goroutines are running.
	for i := 0; i < h.P; i++ {
		<-running
	}

	if onRunning != nil {
		onRunning()
	}

	// Release all goroutines at the same time.
	unblocked.Add(-h.P)

	// Block until P goroutines finish.
	for i := 0; i < h.P; i++ {
		<-finished
	}
}
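
For illustration, a hypothetical test (not part of this commit; the test name and assertion are invented) showing the `onRunning` hook. Because `Run` calls `onRunning` after every goroutine has signalled it is running, but before any is released, state mutated there is visible to every test iteration. Such a test would have to live inside the wazero module, since the hammer package is internal:

```go
package example_test

import (
	"sync/atomic"
	"testing"

	"github.com/tetratelabs/wazero/internal/testing/hammer"
)

// TestHammer_onRunning is hypothetical: it shows that onRunning executes after
// all P goroutines are parked, but before any of them runs the test body.
func TestHammer_onRunning(t *testing.T) {
	P, N := 8, 1000 // goroutines and iterations per goroutine
	if testing.Short() {
		P, N = 4, 100
	}

	var flag uint32
	hammer.NewHammer(t, P, N).Run(func(name string) {
		// Every iteration observes the value set in onRunning, because the
		// WaitGroup release happens after onRunning returns.
		if atomic.LoadUint32(&flag) != 1 {
			t.Error("expected flag to be set before any test iteration")
		}
	}, func() {
		atomic.StoreUint32(&flag, 1) // all goroutines running, none released yet
	})
}
```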

store_test.go

@@ -6,12 +6,12 @@ import (
 	"errors"
 	"fmt"
 	"math"
-	"runtime"
 	"strconv"
 	"testing"
 
 	"github.com/stretchr/testify/require"
+	"github.com/tetratelabs/wazero/internal/testing/hammer"
 	"github.com/tetratelabs/wazero/wasm"
 )

@@ -201,12 +201,17 @@ func TestStore_hammer(t *testing.T) {
 	// Concurrent instantiate, close should test if locks work on the store. If they don't, we should see leaked modules
 	// after all of these complete, or an error raised.
-	hammer(t, func(p, n int) {
-		moduleName := fmt.Sprintf("%s:%d-%d", t.Name(), p, n)
-		_, instantiateErr := s.Instantiate(context.Background(), importingModule, moduleName, DefaultSysContext())
+	P := 8    // max count of goroutines
+	N := 1000 // work per goroutine
+	if testing.Short() { // Adjust down if `-test.short`
+		P = 4
+		N = 100
+	}
+
+	hammer.NewHammer(t, P, N).Run(func(name string) {
+		_, instantiateErr := s.Instantiate(context.Background(), importingModule, name, DefaultSysContext())
 		require.NoError(t, instantiateErr)
-		require.NoError(t, s.CloseModule(moduleName))
-	})
+		require.NoError(t, s.CloseModule(name))
+	}, nil)
 	if t.Failed() {
 		return // At least one test failed, so return now.
 	}
@@ -739,38 +744,3 @@ func TestModuleInstance_applyData(t *testing.T) {
 	})
 	require.Equal(t, []byte{0xa, 0xf, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5}, m.Memory.Buffer)
 }
-
-// hammer is a concurrency test described in /RATIONALE.md.
-func hammer(t *testing.T, test func(p, n int)) {
-	P := 8    // max count of goroutines
-	N := 1000 // work per goroutine
-	if testing.Short() { // Adjust down if `-test.short`
-		P = 4
-		N = 100
-	}
-	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P / 2)) // Ensure goroutines have to switch cores.
-
-	// Add channel that tracks P goroutines.
-	c := make(chan int)
-	for p := 0; p < P; p++ {
-		p := p // pin p, so it is stable inside the goroutine.
-		go func() { // Launch goroutine 'p'
-			defer func() { // Ensure each require.XX failure is visible on hammer test fail.
-				if recovered := recover(); recovered != nil {
-					t.Error(recovered.(string))
-				}
-				c <- 1
-			}()
-			for n := 0; n < N; n++ { // Invoke one test
-				test(p, n)
-			}
-		}()
-	}
-
-	// Block until P goroutines finish.
-	for i := 0; i < P; i++ {
-		<-c
-	}
-}

adhoc_test.go

@@ -3,6 +3,7 @@ package adhoc
 import (
 	"errors"
 	"runtime"
+	"sync"
 	"sync/atomic"
 	"testing"

@@ -30,8 +31,9 @@ func TestEngineInterpreter_hammer(t *testing.T) {
 }
 
 type blocker struct {
 	running chan bool
-	unblocked chan bool
+	// unblocked uses a WaitGroup as it allows releasing all goroutines at the same time.
+	unblocked sync.WaitGroup
 
 	// closed should panic if fn is called when the value is 1.
 	//
 	// Note: Exclusively reading and updating this with atomics guarantees cross-goroutine observations.

@@ -48,8 +50,8 @@ func (b *blocker) fn(input uint32) uint32 {
 	if atomic.LoadUint32(&b.closed) == 1 {
 		panic(errors.New("closed"))
 	}
-	b.running <- true // Signal the goroutine is running
-	<-b.unblocked     // Await until unblocked
+	b.running <- true  // Signal the goroutine is running
+	b.unblocked.Wait() // Await until unblocked
 	return input
 }
@@ -75,8 +77,8 @@ func closeImportingModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P / 2)) // Ensure goroutines have to switch cores.
 
 	running := make(chan bool)
-	unblocked := make(chan bool)
-	b := &blocker{running: running, unblocked: unblocked}
+	b := &blocker{running: running}
+	b.unblocked.Add(P)
 
 	imported, err := r.NewModuleBuilder("host").ExportFunction("block", b.fn).Instantiate()
 	require.NoError(t, err)

@@ -111,6 +113,7 @@ func closeImportingModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	defer importing.Close()
 
 	// If unloading worked properly, a new function call should route to the newly instantiated module.
+	b.unblocked.Add(1)
 	go func() {
 		defer completeGoroutine(t, done)

@@ -120,8 +123,10 @@ func closeImportingModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	P++
 
 	// Unblock the functions to ensure they don't err on the return path of a closed module.
+	b.unblocked.Add(-P)
+
+	// Wait for all goroutines to complete
 	for i := 0; i < P; i++ {
-		unblocked <- true
 		<-done
 	}
 }

@@ -137,8 +142,8 @@ func closeImportedModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P / 2)) // Ensure goroutines have to switch cores.
 
 	running := make(chan bool)
-	unblocked := make(chan bool)
-	b := &blocker{running: running, unblocked: unblocked}
+	b := &blocker{running: running}
+	b.unblocked.Add(P)
 
 	imported, err := r.NewModuleBuilder("host").ExportFunction("block", b.fn).Instantiate()
 	require.NoError(t, err)

@@ -175,7 +180,7 @@ func closeImportedModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	require.NoError(t, importing.Close())
 
 	// Prove a host module can be redefined even with in-flight calls.
-	b1 := &blocker{running: running, unblocked: unblocked} // New instance, so not yet closed!
+	b1 := &blocker{running: running} // New instance, so not yet closed!
 	imported, err = r.NewModuleBuilder("host").ExportFunction("block", b1.fn).Instantiate()
 	require.NoError(t, err)
 	defer imported.Close()

@@ -186,17 +191,20 @@ func closeImportedModuleWhileInUse(t *testing.T, r wazero.Runtime) {
 	defer importing.Close()
 
 	// If unloading worked properly, a new function call should route to the newly instantiated module.
+	b1.unblocked.Add(1)
 	go func() {
 		defer completeGoroutine(t, done)
 
 		requireFunctionCall(t, importing.ExportedFunction("block_after_add"), args, exp)
 	}()
 	<-running // Wait for the above function to be in-flight
-	P++
 
 	// Unblock the functions to ensure they don't err on the return path of a closed module.
-	for i := 0; i < P; i++ {
-		unblocked <- true
+	b.unblocked.Add(-P)
+	b1.unblocked.Add(-1)
+
+	// Wait for all goroutines to complete
+	for i := 0; i < P+1; i++ {
 		<-done
 	}
 }