sysfs: implements PollRead with poll instead of select (#1596)

Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com>
This commit is contained in:
Edoardo Vacchi
2023-07-28 10:49:17 +02:00
committed by GitHub
parent d5d7ac82f1
commit 023a38349a
26 changed files with 525 additions and 1095 deletions

View File

@@ -1270,19 +1270,18 @@ as for regular file descriptors: data is assumed to be present and
a success is written back to the result buffer.
However, if the reader is detected to read from `os.Stdin`,
a special code path is followed, invoking `platform.Select()`.
a special code path is followed, invoking `sysfs.poll()`.
`platform.Select()` is a wrapper for `select(2)` on POSIX systems,
`sysfs.poll()` is a wrapper for `poll(2)` on POSIX systems,
and it is emulated on Windows.
### Select on POSIX
### Poll on POSIX
On POSIX systems,`select(2)` allows to wait for incoming data on a file
On POSIX systems, `poll(2)` allows to wait for incoming data on a file
descriptor, and block until either data becomes available or the timeout
expires. It is not surprising that `select(2)` and `poll(2)` have lot in common:
the main difference is how the file descriptor parameters are passed.
expires.
Usage of `platform.Select()` is only reserved for the standard input case, because
Usage of `syfs.poll()` is currently only reserved for standard input, because
1. it is really only necessary to handle interactive input: otherwise,
there is no way in Go to peek from Standard Input without actually
@@ -1291,11 +1290,11 @@ Usage of `platform.Select()` is only reserved for the standard input case, becau
2. if `Stdin` is connected to a pipe, it is ok in most cases to return
with success immediately;
3. `platform.Select()` is currently a blocking call, irrespective of goroutines,
3. `syfs.poll()` is currently a blocking call, irrespective of goroutines,
because the underlying syscall is; thus, it is better to limit its usage.
So, if the subscription is for `os.Stdin` and the handle is detected
to correspond to an interactive session, then `platform.Select()` will be
to correspond to an interactive session, then `sysfs.poll()` will be
invoked with a the `Stdin` handle *and* the timeout.
This also means that in this specific case, the timeout is uninterruptible,
@@ -1303,7 +1302,7 @@ unless data becomes available on `Stdin` itself.
### Select on Windows
On Windows `platform.Select()` cannot be delegated to a single
On Windows `sysfs.poll()` cannot be delegated to a single
syscall, because there is no single syscall to handle sockets,
pipes and regular files.
@@ -1313,14 +1312,13 @@ of interest.
- For regular files, we _always_ report them as ready, as
[most operating systems do anyway][async-io-windows].
- For pipes, we iterate on the given `readfds`
and we invoke [`PeekNamedPipe`][peeknamedpipe]. We currently ignore
`writefds` and `exceptfds` for pipes. In particular,
`Stdin`, when present, is set to the `readfds` FdSet.
- For pipes, we invoke [`PeekNamedPipe`][peeknamedpipe]
for each file handle we detect is a pipe open for reading.
We currently ignore pipes open for writing.
- Notably, we include also support for sockets using the [WinSock
implementation of `select`][winsock-select], but instead
of relying on the timeout argument of the `select` function,
implementation of `poll`][wsapoll], but instead
of relying on the timeout argument of the `WSAPoll` function,
we set a 0-duration timeout so that it behaves like a peek.
This way, we can check for regular files all at once,
@@ -1328,10 +1326,22 @@ at the beginning of the function, then we poll pipes and
sockets periodically using a cancellable `time.Tick`,
which plays nicely with the rest of the Go runtime.
### Impact of blocking
Because this is a blocking syscall, it will also block the carrier thread of
the goroutine, preventing any means to support context cancellation directly.
There are ways to obviate this issue. We outline here one idea, that is however
not currently implemented. A common approach to support context cancellation is
to add a signal file descriptor to the set, e.g. the read-end of a pipe or an
eventfd on Linux. When the context is canceled, we may unblock a Select call by
writing to the fd, causing it to return immediately. This however requires to
do a bit of housekeeping to hide the "special" FD from the end-user.
[poll_oneoff]: https://github.com/WebAssembly/wasi-poll#why-is-the-function-called-poll_oneoff
[async-io-windows]: https://tinyclouds.org/iocp_links
[peeknamedpipe]: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe
[winsock-select]: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
[wsapoll]: https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsapoll
## Signed encoding of integer global constant initializers

View File

@@ -214,11 +214,14 @@ type File interface {
// - zero returns immediately
// - any negative value blocks any amount of time
//
// # Errors
// # Results
//
// A zero sys.Errno is success. The below are expected otherwise:
// `ready` means there was data ready to read or false if not or when
// `errno` is not zero.
//
// A zero `errno` is success. The below are expected otherwise:
// - sys.ENOSYS: the implementation does not support this function.
// - sys.EINTR: the call was interrupted prior to data being readable.
// - sys.EINTR: the call was interrupted prior to an event.
//
// # Notes
//
@@ -226,6 +229,7 @@ type File interface {
// See https://pubs.opengroup.org/onlinepubs/9699919799/functions/poll.html
// - No-op files, such as those which read from /dev/null, should return
// immediately true, as data will never become readable.
// - See /RATIONALE.md for detailed notes including impact of blocking.
PollRead(timeoutMillis int32) (ready bool, errno experimentalsys.Errno)
// Readdir reads the contents of the directory associated with file and

View File

@@ -1,25 +0,0 @@
//go:build !windows
package platform
// Set adds the given fd to the set.
func (f *FdSet) Set(fd int) {
f.Bits[fd/nfdbits] |= (1 << (uintptr(fd) % nfdbits))
}
// Clear removes the given fd from the set.
func (f *FdSet) Clear(fd int) {
f.Bits[fd/nfdbits] &^= (1 << (uintptr(fd) % nfdbits))
}
// IsSet returns true when fd is in the set.
func (f *FdSet) IsSet(fd int) bool {
return f.Bits[fd/nfdbits]&(1<<(uintptr(fd)%nfdbits)) != 0
}
// Zero clears the set.
func (f *FdSet) Zero() {
for i := range f.Bits {
f.Bits[i] = 0
}
}

View File

@@ -1,8 +0,0 @@
package platform
import "syscall"
const nfdbits = 0x20
// FdSet re-exports syscall.FdSet with utility methods.
type FdSet syscall.FdSet

View File

@@ -1,8 +0,0 @@
package platform
import "syscall"
const nfdbits = 0x40
// FdSet re-exports syscall.FdSet with utility methods.
type FdSet syscall.FdSet

View File

@@ -1,106 +0,0 @@
//go:build !windows
package platform
import (
"testing"
"github.com/tetratelabs/wazero/internal/testing/require"
)
func TestFdSet(t *testing.T) {
allBitsSetAtIndex0 := FdSet{}
allBitsSetAtIndex0.Bits[0] = -1
tests := []struct {
name string
init FdSet
exec func(fdSet *FdSet)
expected FdSet
}{
{
name: "all bits set",
exec: func(fdSet *FdSet) {
for fd := 0; fd < nfdbits; fd++ {
fdSet.Set(fd)
}
},
expected: allBitsSetAtIndex0,
},
{
name: "all bits cleared",
init: allBitsSetAtIndex0,
exec: func(fdSet *FdSet) {
for fd := 0; fd < nfdbits; fd++ {
fdSet.Clear(fd)
}
},
expected: FdSet{},
},
{
name: "zero should clear all bits",
init: allBitsSetAtIndex0,
exec: func(fdSet *FdSet) {
fdSet.Zero()
},
expected: FdSet{},
},
{
name: "is-set should return true for all bits",
init: allBitsSetAtIndex0,
exec: func(fdSet *FdSet) {
for i := range fdSet.Bits {
require.True(t, fdSet.IsSet(i))
}
},
expected: allBitsSetAtIndex0,
},
{
name: "is-set should return true for all odd bits",
init: FdSet{},
exec: func(fdSet *FdSet) {
for fd := 1; fd < nfdbits; fd += 2 {
fdSet.Set(fd)
}
for fd := 0; fd < nfdbits; fd++ {
isSet := fdSet.IsSet(fd)
if fd&0x1 == 0x1 {
require.True(t, isSet)
} else {
require.False(t, isSet)
}
}
fdSet.Zero()
},
expected: FdSet{},
},
{
name: "should clear all even bits",
init: allBitsSetAtIndex0,
exec: func(fdSet *FdSet) {
for fd := 0; fd < nfdbits; fd += 2 {
fdSet.Clear(fd)
}
for fd := 0; fd < nfdbits; fd++ {
isSet := fdSet.IsSet(fd)
if fd&0x1 == 0x1 {
require.True(t, isSet)
} else {
require.False(t, isSet)
}
}
fdSet.Zero()
},
expected: FdSet{},
},
}
for _, tt := range tests {
tc := tt
t.Run(tc.name, func(t *testing.T) {
x := tc.init
tc.exec(&x)
require.Equal(t, tc.expected, x)
})
}
}

View File

@@ -1,10 +0,0 @@
//go:build !darwin && !linux && !windows
package platform
const nfdbits = 0x40
// FdSet mocks syscall.FdSet on systems that do not support it.
type FdSet struct {
Bits [16]int64
}

View File

@@ -1,239 +0,0 @@
package platform
import (
"syscall"
"unsafe"
)
var procGetNamedPipeInfo = kernel32.NewProc("GetNamedPipeInfo")
// Maximum number of fds in a WinSockFdSet.
const _FD_SETSIZE = 64
// WinSockFdSet implements the FdSet representation that is used internally by WinSock.
//
// Note: this representation is quite different from the one used in most POSIX implementations
// where a bitfield is usually implemented; instead on Windows we have a simpler array+count pair.
// Notice that because it keeps a count of the inserted handles, the first argument of select
// in WinSock is actually ignored.
//
// The implementation of the Set, Clear, IsSet, Zero, methods follows exactly
// the real implementation found in WinSock2.h, e.g. see:
// https://github.com/microsoft/win32metadata/blob/ef7725c75c6b39adfdc13ba26fb1d89ac954449a/generation/WinSDK/RecompiledIdlHeaders/um/WinSock2.h#L124-L175
type WinSockFdSet struct {
// count is the number of used slots used in the handles slice.
count uint64
// handles is the array of handles. This is called "array" in the WinSock implementation
// and it has a fixed length of _FD_SETSIZE.
handles [_FD_SETSIZE]syscall.Handle
}
// FdSet implements the same methods provided on other plaforms.
//
// Note: the implementation is very different from POSIX; Windows provides
// POSIX select only for sockets. We emulate a select for other APIs in the sysfs
// package, but we still want to use the "real" select in the case of sockets.
// So, we keep separate FdSets for sockets, pipes and regular files, so that we can
// handle them separately. For instance sockets can be used directly in winsock select.
type FdSet struct {
sockets WinSockFdSet
pipes WinSockFdSet
regular WinSockFdSet
}
// SetAll overwrites all the fields in f with the fields in g.
func (f *FdSet) SetAll(g *FdSet) {
if f == nil {
return
}
f.sockets = g.sockets
f.pipes = g.pipes
f.regular = g.regular
}
// Sockets returns a WinSockFdSet containing the handles in this FdSet that are sockets.
func (f *FdSet) Sockets() *WinSockFdSet {
if f == nil {
return nil
}
return &f.sockets
}
// Regular returns a WinSockFdSet containing the handles in this FdSet that are regular files.
func (f *FdSet) Regular() *WinSockFdSet {
if f == nil {
return nil
}
return &f.regular
}
// Pipes returns a WinSockFdSet containing the handles in this FdSet that are pipes.
func (f *FdSet) Pipes() *WinSockFdSet {
if f == nil {
return nil
}
return &f.pipes
}
// getFdSetFor returns a pointer to the right fd set for the given fd.
// It checks the type for fd and returns the field for pipes, regular or sockets
// to simplify code.
//
// For instance, if fd is a socket and it must be set if f.pipes, instead
// of writing:
//
// if isSocket(fd) {
// f.sockets.Set(fd)
// }
//
// It is possible to write:
//
// f.getFdSetFor(fd).Set(fd)
func (f *FdSet) getFdSetFor(fd int) *WinSockFdSet {
h := syscall.Handle(fd)
t, err := syscall.GetFileType(h)
if err != nil {
return nil
}
switch t {
case syscall.FILE_TYPE_CHAR, syscall.FILE_TYPE_DISK:
return &f.regular
case syscall.FILE_TYPE_PIPE:
if isSocket(h) {
return &f.sockets
} else {
return &f.pipes
}
default:
return nil
}
}
// Set adds the given fd to the set.
func (f *FdSet) Set(fd int) {
if s := f.getFdSetFor(fd); s != nil {
s.Set(fd)
}
}
// Clear removes the given fd from the set.
func (f *FdSet) Clear(fd int) {
if s := f.getFdSetFor(fd); s != nil {
s.Clear(fd)
}
}
// IsSet returns true when fd is in the set.
func (f *FdSet) IsSet(fd int) bool {
if s := f.getFdSetFor(fd); s != nil {
return s.IsSet(fd)
}
return false
}
// Copy returns a copy of this FdSet. It returns nil, if the FdSet is nil.
func (f *FdSet) Copy() *FdSet {
if f == nil {
return nil
}
return &FdSet{
sockets: f.sockets,
pipes: f.pipes,
regular: f.regular,
}
}
// Zero clears the set. It returns 0 if the FdSet is nil.
func (f *FdSet) Count() int {
if f == nil {
return 0
}
return f.sockets.Count() + f.regular.Count() + f.pipes.Count()
}
// Zero clears the set.
func (f *FdSet) Zero() {
if f == nil {
return
}
f.sockets.Zero()
f.regular.Zero()
f.pipes.Zero()
}
// Set adds the given fd to the set.
func (f *WinSockFdSet) Set(fd int) {
if f.count < _FD_SETSIZE {
f.handles[f.count] = syscall.Handle(fd)
f.count++
}
}
// Clear removes the given fd from the set.
func (f *WinSockFdSet) Clear(fd int) {
h := syscall.Handle(fd)
for i := uint64(0); i < f.count; i++ {
if f.handles[i] == h {
for ; i < f.count-1; i++ {
f.handles[i] = f.handles[i+1]
}
f.count--
break
}
}
}
// IsSet returns true when fd is in the set.
func (f *WinSockFdSet) IsSet(fd int) bool {
h := syscall.Handle(fd)
for i := uint64(0); i < f.count; i++ {
if f.handles[i] == h {
return true
}
}
return false
}
// Zero clears the set.
func (f *WinSockFdSet) Zero() {
if f == nil {
return
}
f.handles = [64]syscall.Handle{}
f.count = 0
}
// Count returns the number of values that are set in the fd set.
func (f *WinSockFdSet) Count() int {
if f == nil {
return 0
}
return int(f.count)
}
// Copy returns a copy of the fd set or nil if it is nil.
func (f *WinSockFdSet) Copy() *WinSockFdSet {
if f == nil {
return nil
}
copy := *f
return &copy
}
// Get returns the handle at the given index.
func (f *WinSockFdSet) Get(index int) syscall.Handle {
return f.handles[index]
}
// isSocket returns true if the given file handle
// is a pipe.
func isSocket(fd syscall.Handle) bool {
r, _, errno := syscall.SyscallN(
procGetNamedPipeInfo.Addr(),
uintptr(fd),
uintptr(unsafe.Pointer(nil)),
uintptr(unsafe.Pointer(nil)),
uintptr(unsafe.Pointer(nil)),
uintptr(unsafe.Pointer(nil)))
return r == 0 || errno != 0
}

View File

@@ -1,156 +0,0 @@
package platform
import (
"net"
"os"
"syscall"
"testing"
"github.com/tetratelabs/wazero/internal/testing/require"
)
func TestWinSockFdSet(t *testing.T) {
allSet := WinSockFdSet{
count: _FD_SETSIZE,
}
for i := 0; i < _FD_SETSIZE; i++ {
allSet.handles[i] = syscall.Handle(i)
}
shiftedFields := WinSockFdSet{
count: _FD_SETSIZE - 1,
}
for i := 0; i < _FD_SETSIZE; i++ {
shiftedFields.handles[i] = syscall.Handle(i)
}
for i := _FD_SETSIZE / 2; i < _FD_SETSIZE-1; i++ {
shiftedFields.handles[i] = syscall.Handle(i + 1)
}
tests := []struct {
name string
init WinSockFdSet
exec func(fdSet *WinSockFdSet)
expected WinSockFdSet
}{
{
name: "all fields set",
exec: func(fdSet *WinSockFdSet) {
for fd := 0; fd < _FD_SETSIZE; fd++ {
fdSet.Set(fd)
}
},
expected: allSet,
},
{
name: "clear should shift all fields by one position",
init: allSet,
exec: func(fdSet *WinSockFdSet) {
fdSet.Clear(_FD_SETSIZE / 2)
},
expected: shiftedFields,
},
{
name: "zero should clear all fields",
init: allSet,
exec: func(fdSet *WinSockFdSet) {
fdSet.Zero()
},
expected: WinSockFdSet{},
},
{
name: "is-set should return true for all fields",
init: allSet,
exec: func(fdSet *WinSockFdSet) {
for i := 0; i < fdSet.Count(); i++ {
require.True(t, fdSet.IsSet(i))
}
},
expected: allSet,
},
{
name: "is-set should return true for all odd bits",
init: WinSockFdSet{},
exec: func(fdSet *WinSockFdSet) {
for fd := 1; fd < _FD_SETSIZE; fd += 2 {
fdSet.Set(fd)
}
for fd := 0; fd < _FD_SETSIZE; fd++ {
isSet := fdSet.IsSet(fd)
if fd&0x1 == 0x1 {
require.True(t, isSet)
} else {
require.False(t, isSet)
}
}
fdSet.Zero()
},
expected: WinSockFdSet{},
},
{
name: "should clear all even bits",
init: allSet,
exec: func(fdSet *WinSockFdSet) {
for fd := 0; fd < _FD_SETSIZE; fd += 2 {
fdSet.Clear(fd)
}
for fd := 0; fd < _FD_SETSIZE; fd++ {
isSet := fdSet.IsSet(fd)
if fd&0x1 == 0x1 {
require.True(t, isSet)
} else {
require.False(t, isSet)
}
}
fdSet.Zero()
},
expected: WinSockFdSet{},
},
}
for _, tt := range tests {
tc := tt
t.Run(tc.name, func(t *testing.T) {
x := tc.init
tc.exec(&x)
require.Equal(t, tc.expected, x)
})
}
}
func TestFdSet(t *testing.T) {
t.Run("A pipe should be set in FdSet.Pipe", func(t *testing.T) {
r, _, _ := os.Pipe()
defer r.Close()
fdSet := FdSet{}
fdSet.Set(int(r.Fd()))
require.Equal(t, syscall.Handle(r.Fd()), fdSet.Pipes().Get(0))
})
t.Run("A regular file should be set in FdSet.Regular", func(t *testing.T) {
f, err := os.CreateTemp(t.TempDir(), "test")
require.NoError(t, err)
defer f.Close()
fdSet := FdSet{}
fdSet.Set(int(f.Fd()))
require.Equal(t, syscall.Handle(f.Fd()), fdSet.Regular().Get(0))
})
t.Run("A socket should be set in FdSet.Socket", func(t *testing.T) {
listen, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer listen.Close()
conn, err := listen.(*net.TCPListener).SyscallConn()
require.NoError(t, err)
conn.Control(func(fd uintptr) {
fdSet := FdSet{}
fdSet.Set(int(fd))
require.Equal(t, syscall.Handle(fd), fdSet.Sockets().Get(0))
})
})
}

View File

@@ -9,7 +9,6 @@ import (
experimentalsys "github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/internal/fsapi"
"github.com/tetratelabs/wazero/internal/platform"
"github.com/tetratelabs/wazero/sys"
)
@@ -183,21 +182,7 @@ func (f *osFile) Seek(offset int64, whence int) (newOffset int64, errno experime
// PollRead implements the same method as documented on fsapi.File
func (f *osFile) PollRead(timeoutMillis int32) (ready bool, errno experimentalsys.Errno) {
fdSet := platform.FdSet{}
fd := int(f.fd)
fdSet.Set(fd)
nfds := fd + 1 // See https://man7.org/linux/man-pages/man2/select.2.html#:~:text=condition%20has%20occurred.-,nfds,-This%20argument%20should
// Coerce negative timeout to -1 as that's defined in POSIX
if timeoutMillis < 0 {
timeoutMillis = -1
}
ready, err := _select(nfds, &fdSet, nil, nil, timeoutMillis)
if errno = experimentalsys.UnwrapOSError(err); errno != 0 {
// Defer validation overhead until we've already had an error.
errno = fileError(f, f.closed, errno)
}
return ready, errno
return pollRead(f.fd, timeoutMillis)
}
// Readdir implements File.Readdir. Notably, this uses "Readdir", not

13
internal/sysfs/poll.go Normal file
View File

@@ -0,0 +1,13 @@
//go:build windows || linux || darwin
package sysfs
import "github.com/tetratelabs/wazero/experimental/sys"
// pollRead implements `PollRead` as documented on fsapi.File via a file
// descriptor.
func pollRead(fd uintptr, timeoutMillis int32) (ready bool, errno sys.Errno) {
fds := []pollFd{newPollFd(fd, _POLLIN, 0)}
count, errno := poll(fds, timeoutMillis)
return count > 0, errno
}

View File

@@ -0,0 +1,55 @@
package sysfs
import (
"unsafe"
"github.com/tetratelabs/wazero/experimental/sys"
)
// pollFd is the struct to query for file descriptor events using poll.
type pollFd struct {
// fd is the file descriptor.
fd int32
// events is a bitmap containing the requested events.
events int16
// revents is a bitmap containing the returned events.
revents int16
}
// newPollFd is a constructor for pollFd that abstracts the platform-specific type of file descriptors.
func newPollFd(fd uintptr, events, revents int16) pollFd {
return pollFd{fd: int32(fd), events: events, revents: revents}
}
// _POLLIN subscribes a notification when any readable data is available.
const _POLLIN = 0x0001
// poll implements poll on Darwin via the corresponding libc function.
func poll(fds []pollFd, timeoutMillis int32) (n int, errno sys.Errno) {
var fdptr *pollFd
nfds := len(fds)
if nfds > 0 {
fdptr = &fds[0]
}
n1, _, err := syscall_syscall6(
libc_poll_trampoline_addr,
uintptr(unsafe.Pointer(fdptr)),
uintptr(nfds),
uintptr(int(timeoutMillis)),
uintptr(unsafe.Pointer(nil)),
uintptr(unsafe.Pointer(nil)),
uintptr(unsafe.Pointer(nil)))
return int(n1), sys.UnwrapOSError(err)
}
// libc_poll_trampoline_addr is the address of the
// `libc_poll_trampoline` symbol, defined in `poll_darwin.s`.
//
// We use this to invoke the syscall through syscall_syscall6 imported below.
var libc_poll_trampoline_addr uintptr
// Imports the select symbol from libc as `libc_poll`.
//
// Note: CGO mechanisms are used in darwin regardless of the CGO_ENABLED value
// or the "cgo" build flag. See /RATIONALE.md for why.
//go:cgo_import_dynamic libc_poll poll "/usr/lib/libSystem.B.dylib"

View File

@@ -0,0 +1,8 @@
// lifted from golang.org/x/sys unix
#include "textflag.h"
TEXT libc_poll_trampoline<>(SB), NOSPLIT, $0-0
JMP libc_poll(SB)
GLOBL ·libc_poll_trampoline_addr(SB), RODATA, $8
DATA ·libc_poll_trampoline_addr(SB)/8, $libc_poll_trampoline<>(SB)

View File

@@ -0,0 +1,57 @@
package sysfs
import (
"syscall"
"time"
"unsafe"
"github.com/tetratelabs/wazero/experimental/sys"
)
// pollFd is the struct to query for file descriptor events using poll.
type pollFd struct {
// fd is the file descriptor.
fd int32
// events is a bitmap containing the requested events.
events int16
// revents is a bitmap containing the returned events.
revents int16
}
// newPollFd is a constructor for pollFd that abstracts the platform-specific type of file descriptors.
func newPollFd(fd uintptr, events, revents int16) pollFd {
return pollFd{fd: int32(fd), events: events, revents: revents}
}
// _POLLIN subscribes a notification when any readable data is available.
const _POLLIN = 0x0001
// poll implements poll on Linux via ppoll.
func poll(fds []pollFd, timeoutMillis int32) (n int, errno sys.Errno) {
var ts syscall.Timespec
if timeoutMillis >= 0 {
ts = syscall.NsecToTimespec(int64(time.Duration(timeoutMillis) * time.Millisecond))
}
return ppoll(fds, &ts)
}
// ppoll is a poll variant that allows to subscribe to a mask of signals.
// However, we do not need such mask, so the corresponding argument is always nil.
func ppoll(fds []pollFd, timespec *syscall.Timespec) (n int, err sys.Errno) {
var fdptr *pollFd
nfd := len(fds)
if nfd != 0 {
fdptr = &fds[0]
}
n1, _, errno := syscall.Syscall6(
uintptr(syscall.SYS_PPOLL),
uintptr(unsafe.Pointer(fdptr)),
uintptr(nfd),
uintptr(unsafe.Pointer(timespec)),
uintptr(unsafe.Pointer(nil)), // sigmask is currently always ignored
uintptr(unsafe.Pointer(nil)),
uintptr(unsafe.Pointer(nil)))
return int(n1), sys.UnwrapOSError(errno)
}

View File

@@ -0,0 +1,60 @@
package sysfs
import (
"os"
"runtime"
"testing"
"time"
"github.com/tetratelabs/wazero/internal/testing/require"
)
func TestPoll(t *testing.T) {
t.Run("should return immediately with no fds and duration 0", func(t *testing.T) {
n, err := poll([]pollFd{}, 0)
require.EqualErrno(t, 0, err)
require.Equal(t, 0, n)
})
t.Run("should wait for the given duration", func(t *testing.T) {
dur := int32(250)
var took time.Duration
// On some platforms (e.g. Linux), the passed-in timeval is
// updated by select(2). We are not accounting for this
// in our implementation.
start := time.Now()
n, err := poll([]pollFd{}, dur)
took = time.Since(start)
require.EqualErrno(t, 0, err)
require.Equal(t, 0, n)
// On some platforms the actual timeout might be arbitrarily
// less than requested.
if took < time.Duration(dur)*time.Millisecond {
if runtime.GOOS == "linux" {
// Linux promises to only return early if a file descriptor
// becomes ready (not applicable here), or the call
// is interrupted by a signal handler (explicitly retried in the loop above),
// or the timeout expires.
t.Errorf("Select: slept for %v, expected %v", took, dur)
} else {
t.Logf("Select: slept for %v, requested %v", took, dur)
}
}
})
t.Run("should return 1 if a given FD has data", func(t *testing.T) {
rr, ww, err := os.Pipe()
require.NoError(t, err)
defer rr.Close()
defer ww.Close()
_, err = ww.Write([]byte("TEST"))
require.NoError(t, err)
fds := []pollFd{newPollFd(rr.Fd(), _POLLIN, 0)}
n, err := poll(fds, 0)
require.EqualErrno(t, 0, err)
require.Equal(t, 1, n)
})
}

View File

@@ -0,0 +1,11 @@
//go:build !linux && !darwin && !windows
package sysfs
import "github.com/tetratelabs/wazero/experimental/sys"
// pollRead implements `PollRead` as documented on fsapi.File via a file
// descriptor.
func pollRead(fd uintptr, timeoutMillis int32) (ready bool, errno sys.Errno) {
return false, sys.ENOSYS
}

View File

@@ -0,0 +1,227 @@
package sysfs
import (
"syscall"
"time"
"unsafe"
"github.com/tetratelabs/wazero/experimental/sys"
)
var (
procWSAPoll = modws2_32.NewProc("WSAPoll")
procGetNamedPipeInfo = kernel32.NewProc("GetNamedPipeInfo")
)
const (
// _POLLRDNORM subscribes to normal data for read.
_POLLRDNORM = 0x0100
// _POLLRDBAND subscribes to priority band (out-of-band) data for read.
_POLLRDBAND = 0x0200
// _POLLIN subscribes a notification when any readable data is available.
_POLLIN = (_POLLRDNORM | _POLLRDBAND)
)
// pollFd is the struct to query for file descriptor events using poll.
type pollFd struct {
// fd is the file descriptor.
fd uintptr
// events is a bitmap containing the requested events.
events int16
// revents is a bitmap containing the returned events.
revents int16
}
// newPollFd is a constructor for pollFd that abstracts the platform-specific type of file descriptors.
func newPollFd(fd uintptr, events, revents int16) pollFd {
return pollFd{fd: fd, events: events, revents: revents}
}
// pollInterval is the interval between each calls to peekNamedPipe in selectAllHandles
const pollInterval = 100 * time.Millisecond
// poll implements poll on Windows, for a subset of cases.
//
// pollWithContext emulates the behavior of POSIX poll(2) on Windows, for a subset of cases,
// and it supports context cancellation.
//
// fds may contain any number of file handles, but regular files and pipes are only processed for _POLLIN.
// Stdin is a pipe, thus it is checked for readiness when present. Pipes are checked using PeekNamedPipe.
// Regular files always immediately reported as ready, regardless their actual state and timeouts.
//
// If n==0 it will wait for the given timeout duration, but it will return sys.ENOSYS if timeout is nil,
// i.e. it won't block indefinitely. The given ctx is used to allow for cancellation,
// and it is currently used only in tests.
//
// The implementation actually polls every 100 milliseconds (pollInterval) until it reaches the
// given timeout (in millis).
//
// The duration may be negative, in which case it will wait indefinitely. The given ctx is
// used to allow for cancellation, and it is currently used only in tests.
func poll(fds []pollFd, timeoutMillis int32) (n int, errno sys.Errno) {
if fds == nil {
return -1, sys.ENOSYS
}
regular, pipes, sockets, errno := partionByFtype(fds)
nregular := len(regular)
if errno != 0 {
return -1, errno
}
// Ticker that emits at every pollInterval.
tick := time.NewTicker(pollInterval)
tickCh := tick.C
defer tick.Stop()
// Timer that expires after the given duration.
// Initialize afterCh as nil: the select below will wait forever.
var afterCh <-chan time.Time
if timeoutMillis >= 0 {
// If duration is not nil, instantiate the timer.
after := time.NewTimer(time.Duration(timeoutMillis) * time.Millisecond)
defer after.Stop()
afterCh = after.C
}
npipes, nsockets, errno := peekAll(pipes, sockets)
if errno != 0 {
return -1, errno
}
count := nregular + npipes + nsockets
if count > 0 {
return count, 0
}
for {
select {
case <-afterCh:
return 0, 0
case <-tickCh:
npipes, nsockets, errno := peekAll(pipes, sockets)
if errno != 0 {
return -1, errno
}
count = nregular + npipes + nsockets
if count > 0 {
return count, 0
}
}
}
}
func peekAll(pipes, sockets []pollFd) (npipes, nsockets int, errno sys.Errno) {
npipes, errno = peekPipes(pipes)
if errno != 0 {
return
}
// Invoke wsaPoll with a 0-timeout to avoid blocking.
// Timeouts are handled in pollWithContext instead.
nsockets, errno = wsaPoll(sockets, 0)
if errno != 0 {
return
}
count := npipes + nsockets
if count > 0 {
return
}
return
}
func peekPipes(fds []pollFd) (n int, errno sys.Errno) {
for _, fd := range fds {
bytes, errno := peekNamedPipe(syscall.Handle(fd.fd))
if errno != 0 {
return -1, sys.UnwrapOSError(errno)
}
if bytes > 0 {
n++
}
}
return
}
// wsaPoll is the WSAPoll function from winsock2.
//
// See https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsapoll
func wsaPoll(fds []pollFd, timeout int) (n int, errno sys.Errno) {
if len(fds) > 0 {
sockptr := &fds[0]
ns, _, e := syscall.SyscallN(
procWSAPoll.Addr(),
uintptr(unsafe.Pointer(sockptr)),
uintptr(len(fds)),
uintptr(timeout))
if e != 0 {
return -1, sys.UnwrapOSError(e)
}
n = int(ns)
}
return
}
// ftype is a type of file that can be handled by poll.
type ftype uint8
const (
ftype_regular ftype = iota
ftype_pipe
ftype_socket
)
// partionByFtype checks the type of each fd in fds and returns 3 distinct partitions
// for regular files, named pipes and sockets.
func partionByFtype(fds []pollFd) (regular, pipe, socket []pollFd, errno sys.Errno) {
for _, pfd := range fds {
t, errno := ftypeOf(pfd.fd)
if errno != 0 {
return nil, nil, nil, errno
}
switch t {
case ftype_regular:
regular = append(regular, pfd)
case ftype_pipe:
pipe = append(pipe, pfd)
case ftype_socket:
socket = append(socket, pfd)
}
}
return
}
// ftypeOf checks the type of fd and return the corresponding ftype.
func ftypeOf(fd uintptr) (ftype, sys.Errno) {
h := syscall.Handle(fd)
t, err := syscall.GetFileType(h)
if err != nil {
return 0, sys.UnwrapOSError(err)
}
switch t {
case syscall.FILE_TYPE_CHAR, syscall.FILE_TYPE_DISK:
return ftype_regular, 0
case syscall.FILE_TYPE_PIPE:
if isSocket(h) {
return ftype_socket, 0
} else {
return ftype_pipe, 0
}
default:
return ftype_regular, 0
}
}
// isSocket returns true if the given file handle
// is a pipe.
func isSocket(fd syscall.Handle) bool {
r, _, errno := syscall.SyscallN(
procGetNamedPipeInfo.Addr(),
uintptr(fd),
uintptr(unsafe.Pointer(nil)),
uintptr(unsafe.Pointer(nil)),
uintptr(unsafe.Pointer(nil)),
uintptr(unsafe.Pointer(nil)))
return r == 0 || errno != 0
}

View File

@@ -1,7 +1,6 @@
package sysfs
import (
"context"
"net"
"os"
"syscall"
@@ -9,47 +8,27 @@ import (
"time"
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/internal/platform"
"github.com/tetratelabs/wazero/internal/testing/require"
)
func TestSelect_Windows(t *testing.T) {
func TestPoll_Windows(t *testing.T) {
type result struct {
n int
fdSet platform.FdSet
err sys.Errno
n int
err sys.Errno
}
testCtx, cancel := context.WithCancel(context.Background())
defer cancel()
handleAsFdSet := func(readHandle syscall.Handle) *platform.FdSet {
var fdSet platform.FdSet
fdSet.Set(int(readHandle))
return &fdSet
}
pollToChannel := func(readHandle syscall.Handle, duration *time.Duration, ch chan result) {
pollToChannel := func(fd uintptr, timeoutMillis int32, ch chan result) {
r := result{}
fdSet := handleAsFdSet(readHandle)
r.n, r.err = selectAllHandles(testCtx, fdSet, nil, nil, duration)
r.fdSet = *fdSet
fds := []pollFd{{fd: fd, events: _POLLIN}}
r.n, r.err = poll(fds, timeoutMillis)
ch <- r
close(ch)
}
t.Run("syscall_select returns sys.ENOSYS when n == 0 and timeoutNanos is negative", func(t *testing.T) {
ready, errno := syscall_select(0, nil, nil, nil, -1)
t.Run("poll returns sys.ENOSYS when n == 0 and timeoutMillis is negative", func(t *testing.T) {
n, errno := poll(nil, -1)
require.Equal(t, -1, n)
require.EqualErrno(t, sys.ENOSYS, errno)
require.False(t, ready)
})
t.Run("syscall_select propagates error when peekAllPipes returns an negative", func(t *testing.T) {
fdSet := platform.FdSet{}
fdSet.Pipes().Set(-1)
ready, errno := syscall_select(0, &fdSet, nil, nil, -1)
require.EqualErrno(t, sys.ENOSYS, errno)
require.False(t, ready)
})
t.Run("peekNamedPipe should report the correct state of incoming data in the pipe", func(t *testing.T) {
@@ -75,63 +54,51 @@ func TestSelect_Windows(t *testing.T) {
require.Equal(t, 6, int(n))
})
t.Run("peekAllPipes should return an error on invalid handle", func(t *testing.T) {
fdSet := platform.WinSockFdSet{}
fdSet.Set(int(-1))
err := peekAllPipes(&fdSet)
t.Run("peekPipes should return an error on invalid handle", func(t *testing.T) {
fds := []pollFd{{fd: uintptr(syscall.InvalidHandle)}}
_, err := peekPipes(fds)
require.EqualErrno(t, sys.EBADF, err)
})
t.Run("peekAllHandles should return an error on invalid handle", func(t *testing.T) {
fdSet := platform.FdSet{}
fdSet.Pipes().Set(-1)
n, err := peekAllHandles(&fdSet, nil, nil)
t.Run("peekAll should return an error on invalid handle", func(t *testing.T) {
fds := []pollFd{{fd: uintptr(syscall.InvalidHandle)}}
_, _, err := peekAll(fds, nil)
require.EqualErrno(t, sys.EBADF, err)
require.Equal(t, 0, n)
fdSet.Pipes().Zero()
fdSet.Sockets().Set(-1)
n, err = peekAllHandles(&fdSet, nil, nil)
require.EqualErrno(t, sys.EBADF, err)
require.Equal(t, 0, n)
})
t.Run("peekAllHandles should return successfully with a regular file", func(t *testing.T) {
t.Run("poll should return successfully with a regular file", func(t *testing.T) {
f, err := os.CreateTemp(t.TempDir(), "test")
require.NoError(t, err)
defer f.Close()
fdSet := platform.FdSet{}
fdSet.Set(int(f.Fd()))
fds := []pollFd{{fd: f.Fd()}}
n, errno := peekAllHandles(&fdSet, nil, nil)
n, errno := poll(fds, 0)
require.Zero(t, errno)
require.Equal(t, 1, n)
require.Equal(t, syscall.Handle(f.Fd()), fdSet.Regular().Get(0))
})
t.Run("peekAllHandles should return successfully with a pipe", func(t *testing.T) {
t.Run("peekAll should return successfully with a pipe", func(t *testing.T) {
r, w, err := os.Pipe()
require.NoError(t, err)
defer r.Close()
defer w.Close()
fdSet := platform.FdSet{}
fdSet.Set(int(r.Fd()))
fds := []pollFd{{fd: r.Fd()}}
n, errno := peekAllHandles(&fdSet, nil, nil)
npipes, nsockets, errno := peekAll(fds, nil)
require.Zero(t, errno)
require.Equal(t, 0, n)
require.Equal(t, 0, fdSet.Pipes().Count())
require.Equal(t, 0, npipes)
require.Equal(t, 0, nsockets)
w.Write([]byte("wazero"))
fdSet.Set(int(r.Fd()))
n, errno = peekAllHandles(&fdSet, nil, nil)
npipes, nsockets, errno = peekAll(fds, nil)
require.Zero(t, errno)
require.Equal(t, 1, n)
require.Equal(t, syscall.Handle(r.Fd()), fdSet.Pipes().Get(0))
require.Equal(t, 1, npipes)
require.Equal(t, 0, nsockets)
})
t.Run("peekAllHandles should return successfully with a socket", func(t *testing.T) {
t.Run("peekAll should return successfully with a socket", func(t *testing.T) {
listen, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer listen.Close()
@@ -139,15 +106,15 @@ func TestSelect_Windows(t *testing.T) {
conn, err := listen.(*net.TCPListener).SyscallConn()
require.NoError(t, err)
fdSet := platform.FdSet{}
fds := []pollFd{}
conn.Control(func(fd uintptr) {
fdSet.Set(int(fd))
fds = append(fds, pollFd{fd: fd, events: _POLLIN})
})
n, errno := peekAllHandles(&fdSet, nil, nil)
npipes, nsockets, errno := peekAll(nil, fds)
require.Zero(t, errno)
require.Equal(t, 0, n)
require.Equal(t, 0, fdSet.Sockets().Count())
require.Equal(t, 0, npipes)
require.Equal(t, 0, nsockets)
tcpAddr, err := net.ResolveTCPAddr("tcp", listen.Addr().String())
require.NoError(t, err)
@@ -156,32 +123,27 @@ func TestSelect_Windows(t *testing.T) {
tcp.Write([]byte("wazero"))
conn.Control(func(fd uintptr) {
fdSet.Set(int(fd))
fds[0].fd = fd
})
n, errno = peekAllHandles(&fdSet, nil, nil)
npipes, nsockets, errno = peekAll(nil, fds)
require.Zero(t, errno)
require.Equal(t, 1, n)
conn.Control(func(fd uintptr) {
require.Equal(t, syscall.Handle(fd), fdSet.Sockets().Get(0))
})
require.Equal(t, 0, npipes)
require.Equal(t, 1, nsockets)
})
t.Run("selectAllHandles should return immediately when duration is zero (no data)", func(t *testing.T) {
t.Run("poll should return immediately when duration is zero (no data)", func(t *testing.T) {
r, _, err := os.Pipe()
require.NoError(t, err)
rh := syscall.Handle(r.Fd())
d := time.Duration(0)
fdSet := handleAsFdSet(rh)
n, err := selectAllHandles(testCtx, fdSet, nil, nil, &d)
fds := []pollFd{{fd: r.Fd(), events: _POLLIN}}
n, err := poll(fds, 0)
require.Zero(t, err)
require.Zero(t, n)
require.Zero(t, fdSet.Pipes().Count())
})
t.Run("selectAllHandles should return immediately when duration is zero (data)", func(t *testing.T) {
t.Run("poll should return immediately when duration is zero (data)", func(t *testing.T) {
r, w, err := os.Pipe()
require.NoError(t, err)
rh := handleAsFdSet(syscall.Handle(r.Fd()))
fds := []pollFd{{fd: r.Fd(), events: _POLLIN}}
wh := syscall.Handle(w.Fd())
// Write to the channel immediately.
@@ -191,34 +153,30 @@ func TestSelect_Windows(t *testing.T) {
require.NoError(t, err)
// Verify that the write is reported.
d := time.Duration(0)
n, err := selectAllHandles(testCtx, rh, nil, nil, &d)
n, err := poll(fds, 0)
require.Zero(t, err)
require.NotEqual(t, 0, n)
require.Equal(t, syscall.Handle(r.Fd()), rh.Pipes().Get(0))
require.Equal(t, 1, n)
})
t.Run("selectAllHandles should wait forever when duration is nil (no writes)", func(t *testing.T) {
t.Run("poll should wait forever when duration is nil (no writes)", func(t *testing.T) {
r, _, err := os.Pipe()
require.NoError(t, err)
rh := syscall.Handle(r.Fd())
ch := make(chan result, 1)
go pollToChannel(rh, nil, ch)
go pollToChannel(r.Fd(), -1, ch)
// Wait a little, then ensure no writes occurred.
<-time.After(500 * time.Millisecond)
require.Equal(t, 0, len(ch))
})
t.Run("selectAllHandles should wait forever when duration is nil", func(t *testing.T) {
t.Run("poll should wait forever when duration is nil", func(t *testing.T) {
r, w, err := os.Pipe()
require.NoError(t, err)
rh := syscall.Handle(r.Fd())
wh := syscall.Handle(w.Fd())
ch := make(chan result, 1)
go pollToChannel(rh, nil, ch)
go pollToChannel(r.Fd(), -1, ch)
// Wait a little, then ensure no writes occurred.
<-time.After(100 * time.Millisecond)
@@ -237,19 +195,16 @@ func TestSelect_Windows(t *testing.T) {
case r := <-ch:
require.Zero(t, r.err)
require.NotEqual(t, 0, r.n)
require.Equal(t, rh, r.fdSet.Pipes().Get(0))
}
})
t.Run("selectAllHandles should wait for the given duration", func(t *testing.T) {
t.Run("poll should wait for the given duration", func(t *testing.T) {
r, w, err := os.Pipe()
require.NoError(t, err)
rh := syscall.Handle(r.Fd())
wh := syscall.Handle(w.Fd())
d := 500 * time.Millisecond
ch := make(chan result, 1)
go pollToChannel(rh, &d, ch)
go pollToChannel(r.Fd(), 500, ch)
// Wait a little, then ensure no writes occurred.
<-time.After(100 * time.Millisecond)
@@ -268,18 +223,15 @@ func TestSelect_Windows(t *testing.T) {
case r := <-ch:
require.Zero(t, r.err)
require.Equal(t, 1, r.n)
require.Equal(t, rh, r.fdSet.Pipes().Get(0))
}
})
t.Run("selectAllHandles should timeout after the given duration", func(t *testing.T) {
t.Run("poll should timeout after the given duration", func(t *testing.T) {
r, _, err := os.Pipe()
require.NoError(t, err)
rh := syscall.Handle(r.Fd())
d := 200 * time.Millisecond
ch := make(chan result, 1)
go pollToChannel(rh, &d, ch)
go pollToChannel(r.Fd(), 200, ch)
// Wait a little, then ensure a message has been written to the channel.
<-time.After(300 * time.Millisecond)
@@ -289,18 +241,15 @@ func TestSelect_Windows(t *testing.T) {
res := <-ch
require.Zero(t, res.err)
require.Zero(t, res.n)
require.Zero(t, res.fdSet.Pipes().Count())
})
t.Run("selectAllHandles should return when a write occurs before the given duration", func(t *testing.T) {
t.Run("poll should return when a write occurs before the given duration", func(t *testing.T) {
r, w, err := os.Pipe()
require.NoError(t, err)
rh := syscall.Handle(r.Fd())
wh := syscall.Handle(w.Fd())
d := 600 * time.Millisecond
ch := make(chan result, 1)
go pollToChannel(rh, &d, ch)
go pollToChannel(r.Fd(), 600, ch)
<-time.After(300 * time.Millisecond)
require.Equal(t, 0, len(ch))
@@ -313,19 +262,15 @@ func TestSelect_Windows(t *testing.T) {
res := <-ch
require.Zero(t, res.err)
require.Equal(t, 1, res.n)
require.Equal(t, rh, res.fdSet.Pipes().Get(0))
})
t.Run("selectAllHandles should return when a regular file is given", func(t *testing.T) {
t.Run("poll should return when a regular file is given", func(t *testing.T) {
f, err := os.CreateTemp(t.TempDir(), "ex")
defer f.Close()
require.NoError(t, err)
fh := syscall.Handle(f.Fd())
fdSet := handleAsFdSet(fh)
d := time.Duration(0)
n, errno := selectAllHandles(testCtx, fdSet, nil, nil, &d)
fds := []pollFd{{fd: f.Fd(), events: _POLLIN}}
n, errno := poll(fds, 0)
require.Zero(t, errno)
require.Equal(t, 1, n)
require.Equal(t, fh, fdSet.Regular().Get(0))
})
}

View File

@@ -1,42 +0,0 @@
package sysfs
import (
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/internal/platform"
)
// _select waits until one or more of the file descriptors become ready for
// reading or writing.
//
// # Parameters
//
// The `timeoutMillis` parameter is how long to block for an event, or
// interrupted, in milliseconds. There are two special values:
// - zero returns immediately
// - any negative value blocks any amount of time
//
// A zero sys.Errno is success. The below are expected otherwise:
// - sys.ENOSYS: the implementation does not support this function.
// - sys.EINTR: the call was interrupted prior to an event.
//
// # Impact of blocking
//
// Because this is a blocking syscall, it will also block the carrier thread of the goroutine,
// preventing any means to support context cancellation directly.
//
// There are ways to obviate this issue. We outline here one idea, that is however not currently implemented.
// A common approach to support context cancellation is to add a signal file descriptor to the set,
// e.g. the read-end of a pipe or an eventfd on Linux.
// When the context is canceled, we may unblock a Select call by writing to the fd, causing it to return immediately.
// This however requires to do a bit of housekeeping to hide the "special" FD from the end-user.
//
// # Notes
//
// - This is like `select` in POSIX except it returns if any are ready
// instead of a specific file descriptor. See
// https://pubs.opengroup.org/onlinepubs/9699919799/functions/select.html
// - This is named _select to avoid collision on the select keyword, while
// not exporting the function.
func _select(n int, r, w, e *platform.FdSet, timeoutNanos int32) (ready bool, errno sys.Errno) {
return syscall_select(n, r, w, e, timeoutNanos)
}

View File

@@ -1,42 +0,0 @@
package sysfs
import (
"syscall"
"unsafe"
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/internal/platform"
)
// syscall_select implements _select on Darwin
//
// Note: We implement our own version instead of relying on syscall.Select
// because the latter only returns the error and discards the result.
func syscall_select(n int, r, w, e *platform.FdSet, timeoutNanos int32) (ready bool, errno sys.Errno) {
var t *syscall.Timeval
if timeoutNanos >= 0 {
tv := syscall.NsecToTimeval(int64(timeoutNanos))
t = &tv
}
r1, _, err := syscall_syscall6(
libc_select_trampoline_addr,
uintptr(n),
uintptr(unsafe.Pointer(r)),
uintptr(unsafe.Pointer(w)),
uintptr(unsafe.Pointer(e)),
uintptr(unsafe.Pointer(t)),
0)
return r1 > 0, sys.UnwrapOSError(err)
}
// libc_select_trampoline_addr is the address of the
// `libc_select_trampoline` symbol, defined in `select_darwin.s`.
//
// We use this to invoke the syscall through syscall_syscall6 imported below.
var libc_select_trampoline_addr uintptr
// Imports the select symbol from libc as `libc_select`.
//
// Note: CGO mechanisms are used in darwin regardless of the CGO_ENABLED value
// or the "cgo" build flag. See /RATIONALE.md for why.
//go:cgo_import_dynamic libc_select select "/usr/lib/libSystem.B.dylib"

View File

@@ -1,8 +0,0 @@
// lifted from golang.org/x/sys unix
#include "textflag.h"
TEXT libc_select_trampoline<>(SB), NOSPLIT, $0-0
JMP libc_select(SB)
GLOBL ·libc_select_trampoline_addr(SB), RODATA, $8
DATA ·libc_select_trampoline_addr(SB)/8, $libc_select_trampoline<>(SB)

View File

@@ -1,19 +0,0 @@
package sysfs
import (
"syscall"
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/internal/platform"
)
// syscall_select implements _select on Linux
func syscall_select(n int, r, w, e *platform.FdSet, timeoutNanos int32) (bool, sys.Errno) {
var t *syscall.Timeval
if timeoutNanos >= 0 {
tv := syscall.NsecToTimeval(int64(timeoutNanos))
t = &tv
}
n, err := syscall.Select(n, (*syscall.FdSet)(r), (*syscall.FdSet)(w), (*syscall.FdSet)(e), t)
return n > 0, sys.UnwrapOSError(err)
}

View File

@@ -1,87 +0,0 @@
package sysfs
import (
"os"
"runtime"
"testing"
"time"
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/internal/platform"
"github.com/tetratelabs/wazero/internal/testing/require"
)
func TestSelect(t *testing.T) {
t.Run("should return immediately with no fds and timeoutNanos 0", func(t *testing.T) {
for {
timeoutNanos := int32(0)
ready, errno := _select(0, nil, nil, nil, timeoutNanos)
if errno == sys.EINTR {
t.Logf("Select interrupted")
continue
}
require.EqualErrno(t, 0, errno)
require.False(t, ready)
break
}
})
t.Run("should wait for the given duration", func(t *testing.T) {
timeoutNanos := int32(250 * time.Millisecond)
var took time.Duration
for {
// On some platforms (e.g. Linux), the passed-in timeval is
// updated by select(2). We are not accounting for this
// in our implementation.
start := time.Now()
ready, errno := _select(0, nil, nil, nil, timeoutNanos)
took = time.Since(start)
if errno == sys.EINTR {
t.Logf("Select interrupted after %v", took)
continue
}
require.EqualErrno(t, 0, errno)
require.False(t, ready)
break
}
// On some platforms the actual timeout might be arbitrarily
// less than requested.
if tookNanos := int32(took.Nanoseconds()); tookNanos < timeoutNanos {
if runtime.GOOS == "linux" {
// Linux promises to only return early if a file descriptor
// becomes ready (not applicable here), or the call
// is interrupted by a signal handler (explicitly retried in the loop above),
// or the timeout expires.
t.Errorf("Select: slept for %v, expected %v", tookNanos, timeoutNanos)
} else {
t.Logf("Select: slept for %v, requested %v", tookNanos, timeoutNanos)
}
}
})
t.Run("should return 1 if a given FD has data", func(t *testing.T) {
rr, ww, err := os.Pipe()
require.NoError(t, err)
defer rr.Close()
defer ww.Close()
_, err = ww.Write([]byte("TEST"))
require.NoError(t, err)
rFdSet := &platform.FdSet{}
fd := int(rr.Fd())
rFdSet.Set(fd)
for {
ready, errno := _select(fd+1, rFdSet, nil, nil, -1)
if errno == sys.EINTR {
t.Log("Select interrupted")
continue
}
require.EqualErrno(t, 0, errno)
require.True(t, ready)
break
}
})
}

View File

@@ -1,12 +0,0 @@
//go:build !darwin && !linux && !windows
package sysfs
import (
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/internal/platform"
)
func syscall_select(n int, r, w, e *platform.FdSet, timeoutNanos int32) (ready bool, errno sys.Errno) {
return false, sys.ENOSYS
}

View File

@@ -1,178 +0,0 @@
package sysfs
import (
"context"
"syscall"
"time"
"unsafe"
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/internal/platform"
)
// pollInterval is the interval between each calls to peekNamedPipe in selectAllHandles
const pollInterval = 100 * time.Millisecond
// zeroDuration is the zero value for time.Duration. It is used in selectAllHandles.
var zeroDuration = time.Duration(0)
// syscall_select implements _select on Windows, for a subset of cases.
//
// r, w, e may contain any number of file handles, but regular files and pipes are only processed for r (Read).
// Stdin is a pipe, thus it is checked for readiness when present. Pipes are checked using PeekNamedPipe.
// Regular files always immediately report as ready, regardless their actual state and timeouts.
//
// If n==0 it will wait for the given timeout duration, but it will return sys.ENOSYS if timeout is nil,
// i.e. it won't block indefinitely.
//
// Note: ideas taken from https://stackoverflow.com/questions/6839508/test-if-stdin-has-input-for-c-windows-and-or-linux
// PeekNamedPipe: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe
func syscall_select(n int, r, w, e *platform.FdSet, timeoutNanos int32) (ready bool, errno sys.Errno) {
var timeout *time.Duration
if timeoutNanos >= 0 {
duration := time.Duration(timeoutNanos)
timeout = &duration
}
// TODO: This impl was left alone because it will change soon.
// See https://github.com/tetratelabs/wazero/pull/1596
if n == 0 {
// Don't block indefinitely.
if timeout == nil {
return false, sys.ENOSYS
}
time.Sleep(*timeout)
return false, 0
}
n, errno = selectAllHandles(context.TODO(), r, w, e, timeout)
return n > 0, errno
}
// selectAllHandles emulates a general-purpose POSIX select on Windows.
//
// The implementation actually polls every 100 milliseconds until it reaches the given duration.
// The duration may be nil, in which case it will wait undefinely. The given ctx is
// used to allow for cancellation, and it is currently used only in tests.
//
// As indicated in the man page for select [1], r, w, e are modified upon completion:
//
// "Upon successful completion, the pselect() or select() function shall modify the objects pointed to by the readfds,
// writefds, and errorfds arguments to indicate which file descriptors are ready for reading, ready for writing,
// or have an error condition pending, respectively, and shall return the total number of ready descriptors in all the output sets"
//
// However, for our purposes, this may be pedantic because currently we do not check the values of r, w, e
// after the invocation of select; thus, this behavior may be subject to change in the future for the sake of simplicity.
//
// [1]: https://linux.die.net/man/3/select
func selectAllHandles(ctx context.Context, r, w, e *platform.FdSet, duration *time.Duration) (n int, errno sys.Errno) {
r2, w2, e2 := r.Copy(), w.Copy(), e.Copy()
n, errno = peekAllHandles(r2, w2, e2)
// Short circuit when there is an error, there is data or the duration is zero.
if errno != 0 || n > 0 || (duration != nil && *duration == time.Duration(0)) {
r.SetAll(r2)
w.SetAll(w2)
e.SetAll(e2)
return
}
// Ticker that emits at every pollInterval.
tick := time.NewTicker(pollInterval)
tickCh := tick.C
defer tick.Stop()
// Timer that expires after the given duration.
// Initialize afterCh as nil: the select below will wait forever.
var afterCh <-chan time.Time
if duration != nil {
// If duration is not nil, instantiate the timer.
after := time.NewTimer(*duration)
defer after.Stop()
afterCh = after.C
}
for {
select {
case <-ctx.Done():
r.Zero()
w.Zero()
e.Zero()
return
case <-afterCh:
r.Zero()
w.Zero()
e.Zero()
return
case <-tickCh:
r2, w2, e2 = r.Copy(), w.Copy(), e.Copy()
n, errno = peekAllHandles(r2, w2, e2)
if errno != 0 || n > 0 {
r.SetAll(r2)
w.SetAll(w2)
e.SetAll(e2)
return
}
}
}
}
func peekAllHandles(r, w, e *platform.FdSet) (int, sys.Errno) {
// pipes are not checked on w, e
w.Pipes().Zero()
e.Pipes().Zero()
// peek pipes only for reading
errno := peekAllPipes(r.Pipes())
if errno != 0 {
return 0, errno
}
_, errno = winsock_select(r.Sockets(), w.Sockets(), e.Sockets(), &zeroDuration)
if errno != 0 {
return 0, errno
}
return r.Count() + w.Count() + e.Count(), 0
}
func peekAllPipes(pipeHandles *platform.WinSockFdSet) sys.Errno {
ready := &platform.WinSockFdSet{}
for i := 0; i < pipeHandles.Count(); i++ {
h := pipeHandles.Get(i)
bytes, errno := peekNamedPipe(h)
if bytes > 0 {
ready.Set(int(h))
}
if errno != 0 {
return sys.UnwrapOSError(errno)
}
}
*pipeHandles = *ready
return 0
}
func winsock_select(r, w, e *platform.WinSockFdSet, timeout *time.Duration) (int, sys.Errno) {
if r.Count() == 0 && w.Count() == 0 && e.Count() == 0 {
return 0, 0
}
var t *syscall.Timeval
if timeout != nil {
tv := syscall.NsecToTimeval(timeout.Nanoseconds())
t = &tv
}
rp := unsafe.Pointer(r)
wp := unsafe.Pointer(w)
ep := unsafe.Pointer(e)
tp := unsafe.Pointer(t)
r0, _, err := syscall.SyscallN(
procselect.Addr(),
uintptr(0), // the first argument is ignored and exists only for compat with BSD sockets.
uintptr(rp),
uintptr(wp),
uintptr(ep),
uintptr(tp))
return int(r0), sys.UnwrapOSError(err)
}

View File

@@ -5,11 +5,9 @@ package sysfs
import (
"net"
"syscall"
"time"
"unsafe"
"github.com/tetratelabs/wazero/experimental/sys"
"github.com/tetratelabs/wazero/internal/platform"
socketapi "github.com/tetratelabs/wazero/internal/sock"
)
@@ -120,10 +118,7 @@ type winTcpListenerFile struct {
func (f *winTcpListenerFile) Accept() (socketapi.TCPConn, sys.Errno) {
// Ensure we have an incoming connection using winsock_select.
n, errno := syscallConnControl(f.tl, func(fd uintptr) (int, sys.Errno) {
fdSet := platform.WinSockFdSet{}
fdSet.Set(int(fd))
t := time.Duration(0)
return winsock_select(&fdSet, nil, nil, &t)
return poll([]pollFd{newPollFd(fd, _POLLIN, 0)}, 0)
})
// Otherwise return immediately.