threads: implement threads proposal in interpreter (#1915)
Signed-off-by: Anuraag Agrawal <anuraaga@gmail.com>
This commit is contained in:
@@ -3897,6 +3897,361 @@ func (ce *callEngine) callNativeFunc(ctx context.Context, m *wasm.ModuleInstance
|
||||
ce.pushValue(retLo)
|
||||
ce.pushValue(retHi)
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicMemoryWait:
|
||||
timeout := int64(ce.popValue())
|
||||
exp := ce.popValue()
|
||||
offset := ce.popMemoryOffset(op)
|
||||
// Runtime instead of validation error because the spec intends to allow binaries to include
|
||||
// such instructions as long as they are not executed.
|
||||
if !memoryInst.Shared {
|
||||
panic(wasmruntime.ErrRuntimeExpectedSharedMemory)
|
||||
}
|
||||
|
||||
switch wazeroir.UnsignedType(op.B1) {
|
||||
case wazeroir.UnsignedTypeI32:
|
||||
if offset%4 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
if int(offset) > len(memoryInst.Buffer)-4 {
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
ce.pushValue(memoryInst.Wait32(offset, uint32(exp), timeout, func(mem *wasm.MemoryInstance, offset uint32) uint32 {
|
||||
mem.Mux.Lock()
|
||||
defer mem.Mux.Unlock()
|
||||
value, _ := mem.ReadUint32Le(offset)
|
||||
return value
|
||||
}))
|
||||
case wazeroir.UnsignedTypeI64:
|
||||
if offset%8 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
if int(offset) > len(memoryInst.Buffer)-8 {
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
ce.pushValue(memoryInst.Wait64(offset, exp, timeout, func(mem *wasm.MemoryInstance, offset uint32) uint64 {
|
||||
mem.Mux.Lock()
|
||||
defer mem.Mux.Unlock()
|
||||
value, _ := mem.ReadUint64Le(offset)
|
||||
return value
|
||||
}))
|
||||
}
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicMemoryNotify:
|
||||
count := ce.popValue()
|
||||
offset := ce.popMemoryOffset(op)
|
||||
if offset%4 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
// Just a bounds check
|
||||
if offset >= memoryInst.Size() {
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
res := memoryInst.Notify(offset, uint32(count))
|
||||
ce.pushValue(uint64(res))
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicFence:
|
||||
// Memory not required for fence only
|
||||
if memoryInst != nil {
|
||||
// An empty critical section can be used as a synchronization primitive, which is what
|
||||
// fence is. Probably, there are no spectests or defined behavior to confirm this yet.
|
||||
memoryInst.Mux.Lock()
|
||||
memoryInst.Mux.Unlock() //nolint:staticcheck
|
||||
}
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicLoad:
|
||||
offset := ce.popMemoryOffset(op)
|
||||
switch wazeroir.UnsignedType(op.B1) {
|
||||
case wazeroir.UnsignedTypeI32:
|
||||
if offset%4 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
val, ok := memoryInst.ReadUint32Le(offset)
|
||||
memoryInst.Mux.Unlock()
|
||||
if !ok {
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
ce.pushValue(uint64(val))
|
||||
case wazeroir.UnsignedTypeI64:
|
||||
if offset%8 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
val, ok := memoryInst.ReadUint64Le(offset)
|
||||
memoryInst.Mux.Unlock()
|
||||
if !ok {
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
ce.pushValue(val)
|
||||
}
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicLoad8:
|
||||
offset := ce.popMemoryOffset(op)
|
||||
memoryInst.Mux.Lock()
|
||||
val, ok := memoryInst.ReadByte(offset)
|
||||
memoryInst.Mux.Unlock()
|
||||
if !ok {
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
ce.pushValue(uint64(val))
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicLoad16:
|
||||
offset := ce.popMemoryOffset(op)
|
||||
if offset%2 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
val, ok := memoryInst.ReadUint16Le(offset)
|
||||
memoryInst.Mux.Unlock()
|
||||
if !ok {
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
ce.pushValue(uint64(val))
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicStore:
|
||||
val := ce.popValue()
|
||||
offset := ce.popMemoryOffset(op)
|
||||
switch wazeroir.UnsignedType(op.B1) {
|
||||
case wazeroir.UnsignedTypeI32:
|
||||
if offset%4 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
ok := memoryInst.WriteUint32Le(offset, uint32(val))
|
||||
memoryInst.Mux.Unlock()
|
||||
if !ok {
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
case wazeroir.UnsignedTypeI64:
|
||||
if offset%8 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
ok := memoryInst.WriteUint64Le(offset, val)
|
||||
memoryInst.Mux.Unlock()
|
||||
if !ok {
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
}
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicStore8:
|
||||
val := byte(ce.popValue())
|
||||
offset := ce.popMemoryOffset(op)
|
||||
memoryInst.Mux.Lock()
|
||||
ok := memoryInst.WriteByte(offset, val)
|
||||
memoryInst.Mux.Unlock()
|
||||
if !ok {
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicStore16:
|
||||
val := uint16(ce.popValue())
|
||||
offset := ce.popMemoryOffset(op)
|
||||
if offset%2 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
ok := memoryInst.WriteUint16Le(offset, val)
|
||||
memoryInst.Mux.Unlock()
|
||||
if !ok {
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicRMW:
|
||||
val := ce.popValue()
|
||||
offset := ce.popMemoryOffset(op)
|
||||
switch wazeroir.UnsignedType(op.B1) {
|
||||
case wazeroir.UnsignedTypeI32:
|
||||
if offset%4 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
old, ok := memoryInst.ReadUint32Le(offset)
|
||||
if !ok {
|
||||
memoryInst.Mux.Unlock()
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
var newVal uint32
|
||||
switch wazeroir.AtomicArithmeticOp(op.B2) {
|
||||
case wazeroir.AtomicArithmeticOpAdd:
|
||||
newVal = old + uint32(val)
|
||||
case wazeroir.AtomicArithmeticOpSub:
|
||||
newVal = old - uint32(val)
|
||||
case wazeroir.AtomicArithmeticOpAnd:
|
||||
newVal = old & uint32(val)
|
||||
case wazeroir.AtomicArithmeticOpOr:
|
||||
newVal = old | uint32(val)
|
||||
case wazeroir.AtomicArithmeticOpXor:
|
||||
newVal = old ^ uint32(val)
|
||||
case wazeroir.AtomicArithmeticOpNop:
|
||||
newVal = uint32(val)
|
||||
}
|
||||
memoryInst.WriteUint32Le(offset, newVal)
|
||||
memoryInst.Mux.Unlock()
|
||||
ce.pushValue(uint64(old))
|
||||
case wazeroir.UnsignedTypeI64:
|
||||
if offset%8 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
old, ok := memoryInst.ReadUint64Le(offset)
|
||||
if !ok {
|
||||
memoryInst.Mux.Unlock()
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
var newVal uint64
|
||||
switch wazeroir.AtomicArithmeticOp(op.B2) {
|
||||
case wazeroir.AtomicArithmeticOpAdd:
|
||||
newVal = old + val
|
||||
case wazeroir.AtomicArithmeticOpSub:
|
||||
newVal = old - val
|
||||
case wazeroir.AtomicArithmeticOpAnd:
|
||||
newVal = old & val
|
||||
case wazeroir.AtomicArithmeticOpOr:
|
||||
newVal = old | val
|
||||
case wazeroir.AtomicArithmeticOpXor:
|
||||
newVal = old ^ val
|
||||
case wazeroir.AtomicArithmeticOpNop:
|
||||
newVal = val
|
||||
}
|
||||
memoryInst.WriteUint64Le(offset, newVal)
|
||||
memoryInst.Mux.Unlock()
|
||||
ce.pushValue(old)
|
||||
}
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicRMW8:
|
||||
val := ce.popValue()
|
||||
offset := ce.popMemoryOffset(op)
|
||||
memoryInst.Mux.Lock()
|
||||
old, ok := memoryInst.ReadByte(offset)
|
||||
if !ok {
|
||||
memoryInst.Mux.Unlock()
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
arg := byte(val)
|
||||
var newVal byte
|
||||
switch wazeroir.AtomicArithmeticOp(op.B2) {
|
||||
case wazeroir.AtomicArithmeticOpAdd:
|
||||
newVal = old + arg
|
||||
case wazeroir.AtomicArithmeticOpSub:
|
||||
newVal = old - arg
|
||||
case wazeroir.AtomicArithmeticOpAnd:
|
||||
newVal = old & arg
|
||||
case wazeroir.AtomicArithmeticOpOr:
|
||||
newVal = old | arg
|
||||
case wazeroir.AtomicArithmeticOpXor:
|
||||
newVal = old ^ arg
|
||||
case wazeroir.AtomicArithmeticOpNop:
|
||||
newVal = arg
|
||||
}
|
||||
memoryInst.WriteByte(offset, newVal)
|
||||
memoryInst.Mux.Unlock()
|
||||
ce.pushValue(uint64(old))
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicRMW16:
|
||||
val := ce.popValue()
|
||||
offset := ce.popMemoryOffset(op)
|
||||
if offset%2 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
old, ok := memoryInst.ReadUint16Le(offset)
|
||||
if !ok {
|
||||
memoryInst.Mux.Unlock()
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
arg := uint16(val)
|
||||
var newVal uint16
|
||||
switch wazeroir.AtomicArithmeticOp(op.B2) {
|
||||
case wazeroir.AtomicArithmeticOpAdd:
|
||||
newVal = old + arg
|
||||
case wazeroir.AtomicArithmeticOpSub:
|
||||
newVal = old - arg
|
||||
case wazeroir.AtomicArithmeticOpAnd:
|
||||
newVal = old & arg
|
||||
case wazeroir.AtomicArithmeticOpOr:
|
||||
newVal = old | arg
|
||||
case wazeroir.AtomicArithmeticOpXor:
|
||||
newVal = old ^ arg
|
||||
case wazeroir.AtomicArithmeticOpNop:
|
||||
newVal = arg
|
||||
}
|
||||
memoryInst.WriteUint16Le(offset, newVal)
|
||||
memoryInst.Mux.Unlock()
|
||||
ce.pushValue(uint64(old))
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicRMWCmpxchg:
|
||||
rep := ce.popValue()
|
||||
exp := ce.popValue()
|
||||
offset := ce.popMemoryOffset(op)
|
||||
switch wazeroir.UnsignedType(op.B1) {
|
||||
case wazeroir.UnsignedTypeI32:
|
||||
if offset%4 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
old, ok := memoryInst.ReadUint32Le(offset)
|
||||
if !ok {
|
||||
memoryInst.Mux.Unlock()
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
if old == uint32(exp) {
|
||||
memoryInst.WriteUint32Le(offset, uint32(rep))
|
||||
}
|
||||
memoryInst.Mux.Unlock()
|
||||
ce.pushValue(uint64(old))
|
||||
case wazeroir.UnsignedTypeI64:
|
||||
if offset%8 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
old, ok := memoryInst.ReadUint64Le(offset)
|
||||
if !ok {
|
||||
memoryInst.Mux.Unlock()
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
if old == exp {
|
||||
memoryInst.WriteUint64Le(offset, rep)
|
||||
}
|
||||
memoryInst.Mux.Unlock()
|
||||
ce.pushValue(old)
|
||||
}
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicRMW8Cmpxchg:
|
||||
rep := byte(ce.popValue())
|
||||
exp := byte(ce.popValue())
|
||||
offset := ce.popMemoryOffset(op)
|
||||
memoryInst.Mux.Lock()
|
||||
old, ok := memoryInst.ReadByte(offset)
|
||||
if !ok {
|
||||
memoryInst.Mux.Unlock()
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
if old == exp {
|
||||
memoryInst.WriteByte(offset, rep)
|
||||
}
|
||||
memoryInst.Mux.Unlock()
|
||||
ce.pushValue(uint64(old))
|
||||
frame.pc++
|
||||
case wazeroir.OperationKindAtomicRMW16Cmpxchg:
|
||||
rep := uint16(ce.popValue())
|
||||
exp := uint16(ce.popValue())
|
||||
offset := ce.popMemoryOffset(op)
|
||||
if offset%2 != 0 {
|
||||
panic(wasmruntime.ErrRuntimeUnalignedAtomic)
|
||||
}
|
||||
memoryInst.Mux.Lock()
|
||||
old, ok := memoryInst.ReadUint16Le(offset)
|
||||
if !ok {
|
||||
memoryInst.Mux.Unlock()
|
||||
panic(wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess)
|
||||
}
|
||||
if old == exp {
|
||||
memoryInst.WriteUint16Le(offset, rep)
|
||||
}
|
||||
memoryInst.Mux.Unlock()
|
||||
ce.pushValue(uint64(old))
|
||||
frame.pc++
|
||||
default:
|
||||
frame.pc++
|
||||
}
|
||||
|
||||
@@ -287,6 +287,8 @@ func (c command) expectedError() (err error) {
|
||||
panic("unreachable")
|
||||
}
|
||||
switch c.Text {
|
||||
case "expected shared memory":
|
||||
err = wasmruntime.ErrRuntimeExpectedSharedMemory
|
||||
case "out of bounds memory access":
|
||||
err = wasmruntime.ErrRuntimeOutOfBoundsMemoryAccess
|
||||
case "indirect call type mismatch", "indirect call":
|
||||
@@ -299,6 +301,8 @@ func (c command) expectedError() (err error) {
|
||||
err = wasmruntime.ErrRuntimeInvalidConversionToInteger
|
||||
case "integer divide by zero":
|
||||
err = wasmruntime.ErrRuntimeIntegerDivideByZero
|
||||
case "unaligned atomic":
|
||||
err = wasmruntime.ErrRuntimeUnalignedAtomic
|
||||
case "unreachable":
|
||||
err = wasmruntime.ErrRuntimeUnreachable
|
||||
default:
|
||||
@@ -336,7 +340,7 @@ func Run(t *testing.T, testDataFS embed.FS, ctx context.Context, config wazero.R
|
||||
|
||||
// If the go:embed path resolution was wrong, this fails.
|
||||
// https://github.com/tetratelabs/wazero/issues/247
|
||||
require.True(t, len(caseNames) > 1, "len(caseNames)=%d (not greater than one)", len(caseNames))
|
||||
require.True(t, len(caseNames) > 0, "len(caseNames)=%d (not greater than zero)", len(caseNames))
|
||||
|
||||
for _, f := range caseNames {
|
||||
RunCase(t, testDataFS, f, ctx, config, -1, 0, math.MaxInt)
|
||||
|
||||
@@ -26,6 +26,5 @@ func TestCompiler(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestInterpreter(t *testing.T) {
|
||||
t.Skip() // TODO: Delete after implementing interpreter support
|
||||
spectest.Run(t, testcases, context.Background(), wazero.NewRuntimeConfigInterpreter().WithCoreFeatures(enabledFeatures))
|
||||
}
|
||||
|
||||
@@ -1,14 +1,18 @@
|
||||
package wasm
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/tetratelabs/wazero/api"
|
||||
"github.com/tetratelabs/wazero/internal/internalapi"
|
||||
"github.com/tetratelabs/wazero/internal/wasmruntime"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -26,6 +30,11 @@ const (
|
||||
// compile-time check to ensure MemoryInstance implements api.Memory
|
||||
var _ api.Memory = &MemoryInstance{}
|
||||
|
||||
type waiters struct {
|
||||
mux sync.Mutex
|
||||
l *list.List
|
||||
}
|
||||
|
||||
// MemoryInstance represents a memory instance in a store, and implements api.Memory.
|
||||
//
|
||||
// Note: In WebAssembly 1.0 (20191205), there may be up to one Memory per store, which means the precise memory is always
|
||||
@@ -39,6 +48,14 @@ type MemoryInstance struct {
|
||||
Shared bool
|
||||
// definition is known at compile time.
|
||||
definition api.MemoryDefinition
|
||||
|
||||
// Mux is used in interpreter mode to prevent overlapping calls to atomic instructions,
|
||||
// introduced with WebAssembly threads proposal.
|
||||
Mux sync.Mutex
|
||||
|
||||
// waiters implements atomic wait and notify. It is implemented similarly to golang.org/x/sync/semaphore,
|
||||
// with a fixed weight of 1 and no spurious notifications.
|
||||
waiters sync.Map
|
||||
}
|
||||
|
||||
// NewMemoryInstance creates a new instance based on the parameters in the SectionIDMemory.
|
||||
@@ -279,3 +296,103 @@ func (m *MemoryInstance) writeUint64Le(offset uint32, v uint64) bool {
|
||||
binary.LittleEndian.PutUint64(m.Buffer[offset:], v)
|
||||
return true
|
||||
}
|
||||
|
||||
// Wait32 suspends the caller until the offset is notified by a different agent.
|
||||
func (m *MemoryInstance) Wait32(offset uint32, exp uint32, timeout int64, reader func(mem *MemoryInstance, offset uint32) uint32) uint64 {
|
||||
w := m.getWaiters(offset)
|
||||
w.mux.Lock()
|
||||
|
||||
cur := reader(m, offset)
|
||||
if cur != exp {
|
||||
w.mux.Unlock()
|
||||
return 1
|
||||
}
|
||||
|
||||
return m.wait(w, timeout)
|
||||
}
|
||||
|
||||
// Wait64 suspends the caller until the offset is notified by a different agent.
|
||||
func (m *MemoryInstance) Wait64(offset uint32, exp uint64, timeout int64, reader func(mem *MemoryInstance, offset uint32) uint64) uint64 {
|
||||
w := m.getWaiters(offset)
|
||||
w.mux.Lock()
|
||||
|
||||
cur := reader(m, offset)
|
||||
if cur != exp {
|
||||
w.mux.Unlock()
|
||||
return 1
|
||||
}
|
||||
|
||||
return m.wait(w, timeout)
|
||||
}
|
||||
|
||||
func (m *MemoryInstance) wait(w *waiters, timeout int64) uint64 {
|
||||
if w.l == nil {
|
||||
w.l = list.New()
|
||||
}
|
||||
|
||||
// The specification requires a trap if the number of existing waiters + 1 == 2^32, so we add a check here.
|
||||
// In practice, it is unlikely the application would ever accumulate such a large number of waiters as it
|
||||
// indicates several GB of RAM used just for the list of waiters.
|
||||
// https://github.com/WebAssembly/threads/blob/main/proposals/threads/Overview.md#wait
|
||||
if uint64(w.l.Len()+1) == 1<<32 {
|
||||
w.mux.Unlock()
|
||||
panic(wasmruntime.ErrRuntimeTooManyWaiters)
|
||||
}
|
||||
|
||||
ready := make(chan struct{})
|
||||
elem := w.l.PushBack(ready)
|
||||
w.mux.Unlock()
|
||||
|
||||
if timeout < 0 {
|
||||
<-ready
|
||||
return 0
|
||||
} else {
|
||||
select {
|
||||
case <-ready:
|
||||
return 0
|
||||
case <-time.After(time.Duration(timeout)):
|
||||
// While we could see if the channel completed by now and ignore the timeout, similar to x/sync/semaphore,
|
||||
// the Wasm spec doesn't specify this behavior, so we keep things simple by prioritizing the timeout.
|
||||
w.mux.Lock()
|
||||
w.l.Remove(elem)
|
||||
w.mux.Unlock()
|
||||
return 2
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MemoryInstance) getWaiters(offset uint32) *waiters {
|
||||
wAny, ok := m.waiters.Load(offset)
|
||||
if !ok {
|
||||
// The first time an address is waited on, simultaneous waits will cause extra allocations.
|
||||
// Further operations will be loaded above, which is also the general pattern of usage with
|
||||
// mutexes.
|
||||
wAny, _ = m.waiters.LoadOrStore(offset, &waiters{})
|
||||
}
|
||||
|
||||
return wAny.(*waiters)
|
||||
}
|
||||
|
||||
// Notify wakes up at most count waiters at the given offset.
|
||||
func (m *MemoryInstance) Notify(offset uint32, count uint32) uint32 {
|
||||
wAny, ok := m.waiters.Load(offset)
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
w := wAny.(*waiters)
|
||||
|
||||
w.mux.Lock()
|
||||
defer w.mux.Unlock()
|
||||
if w.l == nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
res := uint32(0)
|
||||
for num := w.l.Len(); num > 0 && res < count; num = w.l.Len() {
|
||||
w := w.l.Remove(w.l.Front()).(chan struct{})
|
||||
close(w)
|
||||
res++
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/tetratelabs/wazero/api"
|
||||
@@ -797,3 +798,143 @@ func BenchmarkWriteString(b *testing.B) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMemoryInstance_WaitNotifyOnce(t *testing.T) {
|
||||
reader := func(mem *MemoryInstance, offset uint32) uint32 {
|
||||
val, _ := mem.ReadUint32Le(offset)
|
||||
return val
|
||||
}
|
||||
t.Run("no waiters", func(t *testing.T) {
|
||||
mem := &MemoryInstance{Buffer: []byte{0, 0, 0, 0, 16, 0, 0, 0}, Min: 1, Shared: true}
|
||||
|
||||
notifyWaiters(t, mem, 0, 1, 0)
|
||||
})
|
||||
|
||||
t.Run("single wait, notify", func(t *testing.T) {
|
||||
mem := &MemoryInstance{Buffer: []byte{0, 0, 0, 0, 16, 0, 0, 0}, Min: 1, Shared: true}
|
||||
|
||||
ch := make(chan string)
|
||||
// Reuse same offset 3 times to verify reuse
|
||||
for i := 0; i < 3; i++ {
|
||||
go func() {
|
||||
res := mem.Wait32(0, 0, -1, reader)
|
||||
propagateWaitResult(t, ch, res)
|
||||
}()
|
||||
|
||||
requireChannelEmpty(t, ch)
|
||||
notifyWaiters(t, mem, 0, 1, 1)
|
||||
require.Equal(t, "", <-ch)
|
||||
|
||||
notifyWaiters(t, mem, 0, 1, 0)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("multiple waiters, notify all", func(t *testing.T) {
|
||||
mem := &MemoryInstance{Buffer: []byte{0, 0, 0, 0, 16, 0, 0, 0}, Min: 1, Shared: true}
|
||||
|
||||
ch := make(chan string)
|
||||
go func() {
|
||||
res := mem.Wait32(0, 0, -1, reader)
|
||||
propagateWaitResult(t, ch, res)
|
||||
}()
|
||||
go func() {
|
||||
res := mem.Wait32(0, 0, -1, reader)
|
||||
propagateWaitResult(t, ch, res)
|
||||
}()
|
||||
|
||||
requireChannelEmpty(t, ch)
|
||||
|
||||
notifyWaiters(t, mem, 0, 2, 2)
|
||||
require.Equal(t, "", <-ch)
|
||||
require.Equal(t, "", <-ch)
|
||||
})
|
||||
|
||||
t.Run("multiple waiters, notify one", func(t *testing.T) {
|
||||
mem := &MemoryInstance{Buffer: []byte{0, 0, 0, 0, 16, 0, 0, 0}, Min: 1, Shared: true}
|
||||
|
||||
ch := make(chan string)
|
||||
go func() {
|
||||
res := mem.Wait32(0, 0, -1, reader)
|
||||
propagateWaitResult(t, ch, res)
|
||||
}()
|
||||
go func() {
|
||||
res := mem.Wait32(0, 0, -1, reader)
|
||||
propagateWaitResult(t, ch, res)
|
||||
}()
|
||||
|
||||
requireChannelEmpty(t, ch)
|
||||
notifyWaiters(t, mem, 0, 1, 1)
|
||||
require.Equal(t, "", <-ch)
|
||||
requireChannelEmpty(t, ch)
|
||||
notifyWaiters(t, mem, 0, 1, 1)
|
||||
require.Equal(t, "", <-ch)
|
||||
})
|
||||
|
||||
t.Run("multiple offsets", func(t *testing.T) {
|
||||
mem := &MemoryInstance{Buffer: []byte{0, 0, 0, 0, 16, 0, 0, 0}, Min: 1, Shared: true}
|
||||
|
||||
ch := make(chan string)
|
||||
go func() {
|
||||
res := mem.Wait32(0, 0, -1, reader)
|
||||
propagateWaitResult(t, ch, res)
|
||||
}()
|
||||
go func() {
|
||||
res := mem.Wait32(1, 268435456, -1, reader)
|
||||
propagateWaitResult(t, ch, res)
|
||||
}()
|
||||
|
||||
requireChannelEmpty(t, ch)
|
||||
notifyWaiters(t, mem, 0, 2, 1)
|
||||
require.Equal(t, "", <-ch)
|
||||
requireChannelEmpty(t, ch)
|
||||
notifyWaiters(t, mem, 1, 2, 1)
|
||||
require.Equal(t, "", <-ch)
|
||||
})
|
||||
|
||||
t.Run("timeout", func(t *testing.T) {
|
||||
mem := &MemoryInstance{Buffer: []byte{0, 0, 0, 0, 16, 0, 0, 0}, Min: 1, Shared: true}
|
||||
|
||||
ch := make(chan string)
|
||||
go func() {
|
||||
res := mem.Wait32(0, 0, 10 /* ns */, reader)
|
||||
propagateWaitResult(t, ch, res)
|
||||
}()
|
||||
|
||||
require.Equal(t, "timeout", <-ch)
|
||||
})
|
||||
}
|
||||
|
||||
func notifyWaiters(t *testing.T, mem *MemoryInstance, offset, count, exp int) {
|
||||
t.Helper()
|
||||
cur := 0
|
||||
tries := 0
|
||||
for cur < exp {
|
||||
if tries > 100 {
|
||||
t.Fatal("too many tries waiting for wait and notify to converge")
|
||||
}
|
||||
n := mem.Notify(uint32(offset), uint32(count))
|
||||
cur += int(n)
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
tries++
|
||||
}
|
||||
}
|
||||
|
||||
func propagateWaitResult(t *testing.T, ch chan string, res uint64) {
|
||||
t.Helper()
|
||||
switch res {
|
||||
case 2:
|
||||
ch <- "timeout"
|
||||
default:
|
||||
ch <- ""
|
||||
}
|
||||
}
|
||||
|
||||
func requireChannelEmpty(t *testing.T, ch chan string) {
|
||||
t.Helper()
|
||||
select {
|
||||
case <-ch:
|
||||
t.Fatal("channel should be empty")
|
||||
default:
|
||||
// fallthrough
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,12 @@ var (
|
||||
ErrRuntimeInvalidTableAccess = New("invalid table access")
|
||||
// ErrRuntimeIndirectCallTypeMismatch indicates that the type check failed during call_indirect.
|
||||
ErrRuntimeIndirectCallTypeMismatch = New("indirect call type mismatch")
|
||||
// ErrRuntimeUnalignedAtomic indicates that an atomic operation was made with incorrect memory alignment.
|
||||
ErrRuntimeUnalignedAtomic = New("unaligned atomic")
|
||||
// ErrRuntimeExpectedSharedMemory indicates that an operation was made against unshared memory when not allowed.
|
||||
ErrRuntimeExpectedSharedMemory = New("expected shared memory")
|
||||
// ErrRuntimeTooManyWaiters indicates that atomic.wait was called with too many waiters.
|
||||
ErrRuntimeTooManyWaiters = New("too many waiters")
|
||||
)
|
||||
|
||||
// Error is returned by a wasm.Engine during the execution of Wasm functions, and they indicate that the Wasm runtime
|
||||
|
||||
Reference in New Issue
Block a user