wasi: add more test cases to poll_oneoff, cleanup impl (#1612)

Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com>
This commit is contained in:
Edoardo Vacchi
2023-08-05 15:22:15 +02:00
committed by GitHub
parent 90f58bce75
commit edb7bc2b10
5 changed files with 177 additions and 76 deletions

View File

@@ -46,7 +46,6 @@ type event struct {
eventType byte
userData []byte
errno wasip1.Errno
outOffset uint32
}
func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno {
@@ -90,16 +89,16 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
var blockingStdinSubs []*event
// The timeout is initialized at max Duration, the loop will find the minimum.
var timeout time.Duration = 1<<63 - 1
// Count of all the clock subscribers that have been already written back to outBuf.
clockEvents := uint32(0)
// Count of all the non-clock subscribers that have been already written back to outBuf.
readySubs := uint32(0)
// Count of all the subscriptions that have been already written back to outBuf.
// nevents*32 returns at all times the offset where the next event should be written:
// this way we ensure that there are no gaps between records.
nevents := uint32(0)
// Layout is subscription_u: Union
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#subscription_u
for i := uint32(0); i < nsubscriptions; i++ {
inOffset := i * 48
outOffset := i * 32
outOffset := nevents * 32
eventType := inBuf[inOffset+8] // +8 past userdata
// +8 past userdata +8 contents_offset
@@ -110,12 +109,10 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
eventType: eventType,
userData: userData,
errno: wasip1.ErrnoSuccess,
outOffset: outOffset,
}
switch eventType {
case wasip1.EventTypeClock: // handle later
clockEvents++
newTimeout, err := processClockEvent(argBuf)
if err != 0 {
return err
@@ -125,7 +122,8 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
timeout = newTimeout
}
// Ack the clock event to the outBuf.
writeEvent(outBuf, evt)
writeEvent(outBuf[outOffset:], evt)
nevents++
case wasip1.EventTypeFdRead:
fd := int32(le.Uint32(argBuf))
if fd < 0 {
@@ -133,16 +131,15 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
}
if file, ok := fsc.LookupFile(fd); !ok {
evt.errno = wasip1.ErrnoBadf
writeEvent(outBuf, evt)
readySubs++
continue
} else if fd == internalsys.FdStdin && !file.File.IsNonblock() {
// if the fd is Stdin, and it is in non-blocking mode,
writeEvent(outBuf[outOffset:], evt)
nevents++
} else if fd != internalsys.FdStdin && file.File.IsNonblock() {
writeEvent(outBuf[outOffset:], evt)
nevents++
} else {
// if the fd is Stdin, and it is in blocking mode,
// do not ack yet, append to a slice for delayed evaluation.
blockingStdinSubs = append(blockingStdinSubs, evt)
} else {
writeEvent(outBuf, evt)
readySubs++
}
case wasip1.EventTypeFdWrite:
fd := int32(le.Uint32(argBuf))
@@ -154,47 +151,46 @@ func pollOneoffFn(_ context.Context, mod api.Module, params []uint64) sys.Errno
} else {
evt.errno = wasip1.ErrnoBadf
}
readySubs++
writeEvent(outBuf, evt)
nevents++
writeEvent(outBuf[outOffset:], evt)
default:
return sys.EINVAL
}
}
// If there are subscribers with data ready, we have already written them to outBuf,
// and we don't need to wait for the timeout: clear it.
if readySubs != 0 {
timeout = 0
sysCtx := mod.(*wasm.ModuleInstance).Sys
if nevents == nsubscriptions {
// We already wrote back all the results. We already wrote this number
// earlier to offset `resultNevents`.
// We only need to observe the timeout (nonzero if there are clock subscriptions)
// and return.
if timeout > 0 {
sysCtx.Nanosleep(int64(timeout))
}
return 0
}
// If there are blocking stdin subscribers, check for data with given timeout.
if len(blockingStdinSubs) > 0 {
stdin, ok := fsc.LookupFile(internalsys.FdStdin)
if !ok {
return sys.EBADF
stdin, ok := fsc.LookupFile(internalsys.FdStdin)
if !ok {
return sys.EBADF
}
// Wait for the timeout to expire, or for some data to become available on Stdin.
if stdinReady, errno := stdin.File.Poll(sys.POLLIN, int32(timeout.Milliseconds())); errno != 0 {
return errno
} else if stdinReady {
// stdin has data ready to for reading, write back all the events
for i := range blockingStdinSubs {
evt := blockingStdinSubs[i]
evt.errno = 0
writeEvent(outBuf[nevents*32:], evt)
nevents++
}
// Wait for the timeout to expire, or for some data to become available on Stdin.
stdinReady, errno := stdin.File.Poll(sys.POLLIN, int32(timeout.Milliseconds()))
if errno != 0 {
return errno
}
if stdinReady {
// stdin has data ready to for reading, write back all the events
for i := range blockingStdinSubs {
readySubs++
evt := blockingStdinSubs[i]
evt.errno = 0
writeEvent(outBuf, evt)
}
}
} else {
// No subscribers, just wait for the given timeout.
sysCtx := mod.(*wasm.ModuleInstance).Sys
sysCtx.Nanosleep(int64(timeout))
}
if readySubs != nsubscriptions {
if !mod.Memory().WriteUint32Le(resultNevents, readySubs+clockEvents) {
if nevents != nsubscriptions {
if !mod.Memory().WriteUint32Le(resultNevents, nevents) {
return sys.EFAULT
}
}
@@ -234,9 +230,9 @@ func processClockEvent(inBuf []byte) (time.Duration, sys.Errno) {
// writeEvent writes the event corresponding to the processed subscription.
// https://github.com/WebAssembly/WASI/blob/snapshot-01/phases/snapshot/docs.md#-event-struct
func writeEvent(outBuf []byte, evt *event) {
copy(outBuf[evt.outOffset:], evt.userData) // userdata
outBuf[evt.outOffset+8] = byte(evt.errno) // uint16, but safe as < 255
outBuf[evt.outOffset+9] = 0
le.PutUint32(outBuf[evt.outOffset+10:], uint32(evt.eventType))
copy(outBuf, evt.userData) // userdata
outBuf[8] = byte(evt.errno) // uint16, but safe as < 255
outBuf[9] = 0
le.PutUint32(outBuf[10:], uint32(evt.eventType))
// TODO: When FD events are supported, write outOffset+16
}

View File

@@ -2,6 +2,7 @@ package wasi_snapshot_preview1_test
import (
"io/fs"
"os"
"strings"
"testing"
"time"
@@ -150,6 +151,12 @@ func Test_pollOneoff_Errors(t *testing.T) {
}
func Test_pollOneoff_Stdin(t *testing.T) {
w, r, err := os.Pipe()
require.NoError(t, err)
defer w.Close()
defer r.Close()
_, _ = w.Write([]byte("wazero"))
tests := []struct {
name string
in, out, nsubscriptions, resultNevents uint32
@@ -192,7 +199,6 @@ func Test_pollOneoff_Stdin(t *testing.T) {
mem: concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
expectedErrno: wasip1.ErrnoSuccess,
out: 128, // past in
@@ -227,7 +233,6 @@ func Test_pollOneoff_Stdin(t *testing.T) {
mem: concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
expectedErrno: wasip1.ErrnoSuccess,
out: 128, // past in
@@ -262,7 +267,6 @@ func Test_pollOneoff_Stdin(t *testing.T) {
mem: concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
expectedErrno: wasip1.ErrnoSuccess,
out: 128, // past in
@@ -297,7 +301,6 @@ func Test_pollOneoff_Stdin(t *testing.T) {
mem: concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
expectedErrno: wasip1.ErrnoSuccess,
out: 128, // past in
@@ -332,7 +335,6 @@ func Test_pollOneoff_Stdin(t *testing.T) {
mem: concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
expectedErrno: wasip1.ErrnoSuccess,
@@ -357,6 +359,52 @@ func Test_pollOneoff_Stdin(t *testing.T) {
expectedLog: `
==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=2)
<== (nevents=1,errno=ESUCCESS)
`,
},
{
name: "pollable pipe, multiple subs, events returned out of order",
nsubscriptions: 3,
expectedNevents: 3,
mem: concat(
fdReadSub,
clockNsSub(20*1000*1000),
// Illegal file fd with custom user data to recognize it in the event buffer.
fdReadSubFdWithUserData(100, []byte{0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77})),
stdin: &sys.StdinFile{Reader: w},
expectedErrno: wasip1.ErrnoSuccess,
out: 128, // past in
resultNevents: 512, // past out
expectedMem: []byte{
// Clock is acknowledged first.
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata
byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit
wasip1.EventTypeClock, 0x0, 0x0, 0x0, // 4 bytes for type enum
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0,
// Then an illegal file with custom user data.
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, // userdata
byte(wasip1.ErrnoBadf), 0x0, // errno is 16 bit
wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0,
// Stdin pipes are delayed to invoke sysfs.poll
// thus, they are written back last.
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata
byte(wasip1.ErrnoSuccess), 0x0, // errno is 16 bit
wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, // 4 bytes for type enum
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0,
'?', // stopped after encoding
},
expectedLog: `
==> wasi_snapshot_preview1.poll_oneoff(in=0,out=128,nsubscriptions=3)
<== (nevents=3,errno=ESUCCESS)
`,
},
}
@@ -420,7 +468,6 @@ func Test_pollOneoff_Zero(t *testing.T) {
concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
)
@@ -460,7 +507,6 @@ func Test_pollOneoff_Zero(t *testing.T) {
concat(
clockNsSub(20*1000*1000),
fdReadSub,
singleton('?'),
),
)
@@ -491,10 +537,6 @@ func Test_pollOneoff_Zero(t *testing.T) {
require.Equal(t, uint32(1), nevents)
}
func singleton(b byte) []byte {
return []byte{b}
}
func concat(bytes ...[]byte) []byte {
var res []byte
for i := range bytes {
@@ -522,9 +564,26 @@ func fdReadSubFd(fd byte) []byte {
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, // userdata
wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
fd, 0x0, 0x0, 0x0, // valid readable FD
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, // pad to 32 bytes
}
}
func fdReadSubFdWithUserData(fd byte, userdata []byte) []byte {
return concat(
userdata,
[]byte{
wasip1.EventTypeFdRead, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
fd, 0x0, 0x0, 0x0, // valid readable FD
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0,
0x0, 0x0, 0x0, 0x0, // pad to 32 bytes
})
}
// subscription for an EventTypeFdRead on stdin
var fdReadSub = fdReadSubFd(byte(sys.FdStdin))

View File

@@ -103,3 +103,30 @@ func TestTcpConnFile_Stat(t *testing.T) {
_, errno := file.Stat()
require.Zero(t, errno, "Stat should not fail")
}
func TestTcpConnFile_SetNonblock(t *testing.T) {
listen, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer listen.Close()
lf := newTCPListenerFile(listen.(*net.TCPListener))
tcpAddr, err := net.ResolveTCPAddr("tcp", listen.Addr().String())
require.NoError(t, err)
tcp, err := net.DialTCP("tcp", nil, tcpAddr)
require.NoError(t, err)
defer tcp.Close() //nolint
errno := lf.SetNonblock(true)
require.EqualErrno(t, 0, errno)
require.True(t, lf.IsNonblock())
conn, errno := lf.Accept()
require.EqualErrno(t, 0, errno)
defer conn.Close()
file := newTcpConn(tcp)
errno = file.SetNonblock(true)
require.EqualErrno(t, 0, errno)
require.True(t, file.IsNonblock())
}

View File

@@ -41,8 +41,9 @@ var _ socketapi.TCPSock = (*tcpListenerFile)(nil)
type tcpListenerFile struct {
baseSockFile
fd uintptr
addr *net.TCPAddr
fd uintptr
addr *net.TCPAddr
nonblock bool
}
// Accept implements the same method as documented on socketapi.TCPSock
@@ -57,9 +58,14 @@ func (f *tcpListenerFile) Accept() (socketapi.TCPConn, sys.Errno) {
// SetNonblock implements the same method as documented on sys.File
func (f *tcpListenerFile) SetNonblock(enabled bool) sys.Errno {
f.nonblock = enabled
return sys.UnwrapOSError(setNonblock(f.fd, enabled))
}
func (f *tcpListenerFile) IsNonblock() bool {
return f.nonblock
}
// Close implements the same method as documented on sys.File
func (f *tcpListenerFile) Close() sys.Errno {
return sys.UnwrapOSError(syscall.Close(int(f.fd)))
@@ -75,7 +81,8 @@ var _ socketapi.TCPConn = (*tcpConnFile)(nil)
type tcpConnFile struct {
baseSockFile
fd uintptr
fd uintptr
nonblock bool
// closed is true when closed was called. This ensures proper sys.EBADF
closed bool
@@ -91,9 +98,15 @@ func newTcpConn(tc *net.TCPConn) socketapi.TCPConn {
// SetNonblock implements the same method as documented on sys.File
func (f *tcpConnFile) SetNonblock(enabled bool) (errno sys.Errno) {
f.nonblock = enabled
return sys.UnwrapOSError(setNonblock(f.fd, enabled))
}
// IsNonblock implements the same method as documented on sys.File
func (f *tcpConnFile) IsNonblock() bool {
return f.nonblock
}
// Read implements the same method as documented on sys.File
func (f *tcpConnFile) Read(buf []byte) (n int, errno sys.Errno) {
n, err := syscall.Read(int(f.fd), buf)

View File

@@ -90,6 +90,16 @@ func syscallConnControl(conn syscall.Conn, fn func(fd uintptr) (int, sys.Errno))
return
}
func _pollSock(conn syscall.Conn, flag sys.Pflag, timeoutMillis int32) (bool, sys.Errno) {
if flag != sys.POLLIN {
return false, sys.ENOTSUP
}
n, errno := syscallConnControl(conn, func(fd uintptr) (int, sys.Errno) {
return _poll([]pollFd{newPollFd(fd, _POLLIN, 0)}, timeoutMillis)
})
return n > 0, errno
}
// newTCPListenerFile is a constructor for a socketapi.TCPSock.
//
// Note: currently the Windows implementation of socketapi.TCPSock
@@ -99,9 +109,7 @@ func syscallConnControl(conn syscall.Conn, fn func(fd uintptr) (int, sys.Errno))
// standard library, instead of invoke syscalls/Win32 APIs
// because they are sensibly different from Unix's.
func newTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock {
w := &winTcpListenerFile{tl: tl}
_ = w.SetNonblock(true)
return w
return &winTcpListenerFile{tl: tl}
}
var _ socketapi.TCPSock = (*winTcpListenerFile)(nil)
@@ -116,14 +124,11 @@ type winTcpListenerFile struct {
// Accept implements the same method as documented on socketapi.TCPSock
func (f *winTcpListenerFile) Accept() (socketapi.TCPConn, sys.Errno) {
// Ensure we have an incoming connection using winsock_select.
n, errno := syscallConnControl(f.tl, func(fd uintptr) (int, sys.Errno) {
return _poll([]pollFd{newPollFd(fd, _POLLIN, 0)}, 0)
})
// Otherwise return immediately.
if n == 0 || errno != 0 {
return nil, sys.EAGAIN
// Ensure we have an incoming connection using winsock_select, otherwise return immediately.
if f.nonblock {
if ready, errno := _pollSock(f.tl, sys.POLLIN, 0); !ready || errno != 0 {
return nil, sys.EAGAIN
}
}
// Accept normally blocks goroutines, but we
@@ -186,6 +191,7 @@ func newTcpConn(tc *net.TCPConn) socketapi.TCPConn {
// SetNonblock implements the same method as documented on sys.File
func (f *winTcpConnFile) SetNonblock(enabled bool) (errno sys.Errno) {
f.nonblock = true
_, errno = syscallConnControl(f.tc, func(fd uintptr) (int, sys.Errno) {
return 0, sys.UnwrapOSError(setNonblockSocket(syscall.Handle(fd), enabled))
})