wasi: fix nonblocking sockets on *NIX (gotip net/http) (#1503)

Signed-off-by: Adrian Cole <adrian@tetrate.io>
Co-authored-by: Edoardo Vacchi <evacchi@users.noreply.github.com>
This commit is contained in:
Crypt Keeper
2023-06-13 04:51:32 +08:00
committed by GitHub
parent b4d97e5e69
commit f3778cae08
10 changed files with 471 additions and 186 deletions

View File

@@ -556,9 +556,9 @@ func Test_fdFdstatSetFlags(t *testing.T) {
// Let's remove O_APPEND. // Let's remove O_APPEND.
requireErrnoResult(t, wasip1.ErrnoSuccess, mod, wasip1.FdFdstatSetFlagsName, uint64(fd), uint64(0)) requireErrnoResult(t, wasip1.ErrnoSuccess, mod, wasip1.FdFdstatSetFlagsName, uint64(fd), uint64(0))
require.Equal(t, ` require.Equal(t, `
==> wasi_snapshot_preview1.fd_fdstat_set_flags(fd=4,flags=0) ==> wasi_snapshot_preview1.fd_fdstat_set_flags(fd=4,flags=)
<== errno=ESUCCESS <== errno=ESUCCESS
`, "\n"+log.String()) `, "\n"+log.String()) // FIXME? flags==0 prints 'flags='
log.Reset() log.Reset()
// Without O_APPEND flag, the data is written at the beginning. // Without O_APPEND flag, the data is written at the beginning.
@@ -568,9 +568,9 @@ func Test_fdFdstatSetFlags(t *testing.T) {
// Restore the O_APPEND flag. // Restore the O_APPEND flag.
requireErrnoResult(t, wasip1.ErrnoSuccess, mod, wasip1.FdFdstatSetFlagsName, uint64(fd), uint64(wasip1.FD_APPEND)) requireErrnoResult(t, wasip1.ErrnoSuccess, mod, wasip1.FdFdstatSetFlagsName, uint64(fd), uint64(wasip1.FD_APPEND))
require.Equal(t, ` require.Equal(t, `
==> wasi_snapshot_preview1.fd_fdstat_set_flags(fd=4,flags=1) ==> wasi_snapshot_preview1.fd_fdstat_set_flags(fd=4,flags=APPEND)
<== errno=ESUCCESS <== errno=ESUCCESS
`, "\n"+log.String()) `, "\n"+log.String()) // FIXME? flags==1 prints 'flags=APPEND'
log.Reset() log.Reset()
// with O_APPEND flag, the data is appended to buffer. // with O_APPEND flag, the data is appended to buffer.

View File

@@ -2,8 +2,11 @@ package main
import ( import (
"fmt" "fmt"
"io"
"net" "net"
"net/http"
"os" "os"
"syscall"
) )
func main() { func main() {
@@ -12,9 +15,14 @@ func main() {
if err := mainSock(); err != nil { if err := mainSock(); err != nil {
panic(err) panic(err)
} }
case "http":
if err := mainHTTP(); err != nil {
panic(err)
}
} }
} }
// mainSock is an explicit test of a blocking socket.
func mainSock() error { func mainSock() error {
// Get a listener from the pre-opened file descriptor. // Get a listener from the pre-opened file descriptor.
// The listener is the first pre-open, with a file-descriptor of 3. // The listener is the first pre-open, with a file-descriptor of 3.
@@ -43,3 +51,52 @@ func mainSock() error {
fmt.Println(string(buf[:n])) fmt.Println(string(buf[:n]))
return nil return nil
} }
// mainHTTP implicitly tests non-blocking sockets, as they are needed for
// middleware.
func mainHTTP() error {
// Get the file representing a pre-opened TCP socket.
// The socket (listener) is the first pre-open, with a file-descriptor of
// 3 because the host didn't add any pre-opened files.
listenerFD := 3
f := os.NewFile(uintptr(listenerFD), "")
// Wasm runs similarly to GOMAXPROCS=1, so multiple goroutines cannot work
// in parallel. non-blocking allows the poller to park the go-routine
// accepting connections while work is done on one.
if err := syscall.SetNonblock(listenerFD, true); err != nil {
return err
}
// Convert the file representing the pre-opened socket to a listener, so
// that we can integrate it with HTTP middleware.
ln, err := net.FileListener(f)
defer f.Close()
if err != nil {
return err
}
defer ln.Close()
// Serve middleware that echos the request body to the response once, then quits.
h := &echoOnce{ch: make(chan struct{}, 1)}
go http.Serve(ln, h)
<-h.ch
return nil
}
type echoOnce struct {
ch chan struct{}
}
func (e echoOnce) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Copy up to 32 bytes from the request to the response, appending a newline.
// Note: the test should write: "wazero", so that's all we should read.
var buf [32]byte
if n, err := r.Body.Read(buf[:]); err != nil && err != io.EOF {
panic(err)
} else if n, err = w.Write(append(buf[:n], '\n')); err != nil {
panic(err)
}
// Once one request was served, close the channel.
close(e.ch)
}

View File

@@ -7,6 +7,8 @@ import (
"io" "io"
"io/fs" "io/fs"
"net" "net"
"net/http"
"runtime"
"strconv" "strconv"
"strings" "strings"
"testing" "testing"
@@ -23,6 +25,10 @@ import (
"github.com/tetratelabs/wazero/sys" "github.com/tetratelabs/wazero/sys"
) )
// sleepALittle directly slows down test execution. So, use this sparingly and
// only when so where proper signals are unavailable.
var sleepALittle = func() { time.Sleep(500 * time.Millisecond) }
// This file ensures that the behavior we've implemented not only the wasi // This file ensures that the behavior we've implemented not only the wasi
// spec, but also at least two compilers use of sdks. // spec, but also at least two compilers use of sdks.
@@ -383,7 +389,7 @@ func testSock(t *testing.T, bin []byte) {
tcpAddr := <-tcpAddrCh tcpAddr := <-tcpAddrCh
// Give a little time for _start to complete // Give a little time for _start to complete
time.Sleep(800 * time.Millisecond) sleepALittle()
// Now dial to the initial address, which should be now held by wazero. // Now dial to the initial address, which should be now held by wazero.
conn, err := net.Dial("tcp", tcpAddr.String()) conn, err := net.Dial("tcp", tcpAddr.String())
@@ -396,3 +402,58 @@ func testSock(t *testing.T, bin []byte) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, "wazero\n", console) require.Equal(t, "wazero\n", console)
} }
func Test_HTTP(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("syscall.Nonblocking() is not supported on wasip1+windows.")
}
toolchains := map[string][]byte{}
if wasmGotip != nil {
toolchains["gotip"] = wasmGotip
}
for toolchain, bin := range toolchains {
toolchain := toolchain
bin := bin
t.Run(toolchain, func(t *testing.T) {
testHTTP(t, bin)
})
}
}
func testHTTP(t *testing.T, bin []byte) {
sockCfg := experimentalsock.NewConfig().WithTCPListener("127.0.0.1", 0)
ctx := experimentalsock.WithConfig(testCtx, sockCfg)
moduleConfig := wazero.NewModuleConfig().
WithSysWalltime().WithSysNanotime(). // HTTP middleware uses both clocks
WithArgs("wasi", "http")
tcpAddrCh := make(chan *net.TCPAddr, 1)
ch := make(chan string, 1)
go func() {
ch <- compileAndRunWithPreStart(t, ctx, moduleConfig, bin, func(t *testing.T, mod api.Module) {
tcpAddrCh <- requireTCPListenerAddr(t, mod)
})
}()
tcpAddr := <-tcpAddrCh
// Give a little time for _start to complete
sleepALittle()
// Now, send a POST to the address which we had pre-opened.
body := bytes.NewReader([]byte("wazero"))
req, err := http.NewRequest(http.MethodPost, "http://"+tcpAddr.String(), body)
require.NoError(t, err)
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, 200, resp.StatusCode)
b, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, "wazero\n", string(b))
console := <-ch
require.Equal(t, "", console)
}

View File

@@ -7,14 +7,13 @@ import (
"strings" "strings"
"syscall" "syscall"
"testing" "testing"
"time"
"github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/api" "github.com/tetratelabs/wazero/api"
"github.com/tetratelabs/wazero/internal/testing/require" "github.com/tetratelabs/wazero/internal/testing/require"
) )
func Test_Nonblock(t *testing.T) { func Test_NonblockingFile(t *testing.T) {
const fifo = "/test-fifo" const fifo = "/test-fifo"
tempDir := t.TempDir() tempDir := t.TempDir()
fifoAbsPath := tempDir + fifo fifoAbsPath := tempDir + fifo
@@ -38,9 +37,9 @@ func Test_Nonblock(t *testing.T) {
// Wait for the dummy value, then start the sleep. // Wait for the dummy value, then start the sleep.
require.Equal(t, "ready", <-ch) require.Equal(t, "ready", <-ch)
// The test writes a few dots on the console until the pipe has data ready for reading, // The test writes a few dots on the console until the pipe has data ready
// so we wait for a little to ensure those dots are printed. // for reading. So, so we wait to ensure those dots are printed.
time.Sleep(500 * time.Millisecond) sleepALittle()
f, err := os.OpenFile(fifoAbsPath, os.O_APPEND|os.O_WRONLY, 0) f, err := os.OpenFile(fifoAbsPath, os.O_APPEND|os.O_WRONLY, 0)
require.NoError(t, err) require.NoError(t, err)

View File

@@ -6,149 +6,32 @@ import (
"syscall" "syscall"
"github.com/tetratelabs/wazero/internal/fsapi" "github.com/tetratelabs/wazero/internal/fsapi"
"github.com/tetratelabs/wazero/internal/platform"
socketapi "github.com/tetratelabs/wazero/internal/sock" socketapi "github.com/tetratelabs/wazero/internal/sock"
) )
// NewTCPListenerFile creates a socketapi.TCPSock for a given *net.TCPListener.
func NewTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock { func NewTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock {
return &tcpListenerFile{tl: tl} return newTCPListenerFile(tl)
} }
var _ socketapi.TCPSock = (*tcpListenerFile)(nil) // baseSockFile implements base behavior for all TCPSock, TCPConn files,
// regardless the platform.
type tcpListenerFile struct { type baseSockFile struct {
fsapi.UnimplementedFile fsapi.UnimplementedFile
tl *net.TCPListener
} }
// Accept implements the same method as documented on socketapi.TCPSock var _ fsapi.File = (*baseSockFile)(nil)
func (f *tcpListenerFile) Accept() (socketapi.TCPConn, syscall.Errno) {
conn, err := f.tl.Accept()
if err != nil {
return nil, platform.UnwrapOSError(err)
}
return &tcpConnFile{tc: conn.(*net.TCPConn)}, 0
}
// IsDir implements the same method as documented on File.IsDir // IsDir implements the same method as documented on File.IsDir
func (*tcpListenerFile) IsDir() (bool, syscall.Errno) { func (*baseSockFile) IsDir() (bool, syscall.Errno) {
// We need to override this method because WASI-libc prestats the FD // We need to override this method because WASI-libc prestats the FD
// and the default impl returns ENOSYS otherwise. // and the default impl returns ENOSYS otherwise.
return false, 0 return false, 0
} }
// Stat implements the same method as documented on File.Stat // Stat implements the same method as documented on File.Stat
func (f *tcpListenerFile) Stat() (fs fsapi.Stat_t, errno syscall.Errno) { func (f *baseSockFile) Stat() (fs fsapi.Stat_t, errno syscall.Errno) {
// The mode is not really important, but it should be neither a regular file nor a directory. // The mode is not really important, but it should be neither a regular file nor a directory.
fs.Mode = os.ModeIrregular fs.Mode = os.ModeIrregular
return return
} }
// Close implements the same method as documented on fsapi.File
func (f *tcpListenerFile) Close() syscall.Errno {
return platform.UnwrapOSError(f.tl.Close())
}
// Addr is exposed for testing.
func (f *tcpListenerFile) Addr() *net.TCPAddr {
return f.tl.Addr().(*net.TCPAddr)
}
var _ socketapi.TCPConn = (*tcpConnFile)(nil)
type tcpConnFile struct {
fsapi.UnimplementedFile
tc *net.TCPConn
// closed is true when closed was called. This ensures proper syscall.EBADF
closed bool
}
// IsDir implements the same method as documented on File.IsDir
func (*tcpConnFile) IsDir() (bool, syscall.Errno) {
// We need to override this method because WASI-libc prestats the FD
// and the default impl returns ENOSYS otherwise.
return false, 0
}
// Stat implements the same method as documented on File.Stat
func (f *tcpConnFile) Stat() (fs fsapi.Stat_t, errno syscall.Errno) {
// The mode is not really important, but it should be neither a regular file nor a directory.
fs.Mode = os.ModeIrregular
return
}
// SetNonblock implements the same method as documented on fsapi.File
func (f *tcpConnFile) SetNonblock(enabled bool) (errno syscall.Errno) {
syscallConn, err := f.tc.SyscallConn()
if err != nil {
return platform.UnwrapOSError(err)
}
// Prioritize the error from setNonblock over Control
if controlErr := syscallConn.Control(func(fd uintptr) {
errno = platform.UnwrapOSError(setNonblock(fd, enabled))
}); errno == 0 {
errno = platform.UnwrapOSError(controlErr)
}
return
}
// Read implements the same method as documented on fsapi.File
func (f *tcpConnFile) Read(buf []byte) (n int, errno syscall.Errno) {
if n, errno = read(f.tc, buf); errno != 0 {
// Defer validation overhead until we've already had an error.
errno = fileError(f, f.closed, errno)
}
return
}
// Write implements the same method as documented on fsapi.File
func (f *tcpConnFile) Write(buf []byte) (n int, errno syscall.Errno) {
if n, errno = write(f.tc, buf); errno != 0 {
// Defer validation overhead until we've alwritey had an error.
errno = fileError(f, f.closed, errno)
}
return
}
// Recvfrom implements the same method as documented on socketapi.TCPConn
func (f *tcpConnFile) Recvfrom(p []byte, flags int) (n int, errno syscall.Errno) {
if flags != MSG_PEEK {
errno = syscall.EINVAL
return
}
return recvfromPeek(f.tc, p)
}
// Shutdown implements the same method as documented on fsapi.Conn
func (f *tcpConnFile) Shutdown(how int) syscall.Errno {
// FIXME: can userland shutdown listeners?
var err error
switch how {
case syscall.SHUT_RD:
err = f.tc.CloseRead()
case syscall.SHUT_WR:
err = f.tc.CloseWrite()
case syscall.SHUT_RDWR:
return f.close()
default:
return syscall.EINVAL
}
return platform.UnwrapOSError(err)
}
// Close implements the same method as documented on fsapi.File
func (f *tcpConnFile) Close() syscall.Errno {
return f.close()
}
func (f *tcpConnFile) close() syscall.Errno {
if f.closed {
return 0
}
f.closed = true
return f.Shutdown(syscall.SHUT_RDWR)
}

View File

@@ -2,7 +2,9 @@ package sysfs
import ( import (
"net" "net"
"syscall"
"testing" "testing"
"time"
"github.com/tetratelabs/wazero/internal/testing/require" "github.com/tetratelabs/wazero/internal/testing/require"
) )
@@ -18,10 +20,18 @@ func TestTcpConnFile_Write(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer tcp.Close() //nolint defer tcp.Close() //nolint
file := tcpConnFile{tc: tcp} file := newTcpConn(tcp)
n, errno := file.Write([]byte("wazero")) errno := syscall.Errno(0)
// Ensure we don't interrupt until we get a non-zero errno,
// and we retry on EAGAIN (i.e. when nonblocking is true).
for {
_, errno = file.Write([]byte("wazero"))
if errno != syscall.EAGAIN {
break
}
time.Sleep(100 * time.Millisecond)
}
require.Zero(t, errno) require.Zero(t, errno)
require.NotEqual(t, 0, n)
conn, err := listen.Accept() conn, err := listen.Accept()
require.NoError(t, err) require.NoError(t, err)
@@ -29,7 +39,7 @@ func TestTcpConnFile_Write(t *testing.T) {
bytes := make([]byte, 4) bytes := make([]byte, 4)
n, err = conn.Read(bytes) n, err := conn.Read(bytes)
require.NoError(t, err) require.NoError(t, err)
require.NotEqual(t, 0, n) require.NotEqual(t, 0, n)
@@ -57,11 +67,20 @@ func TestTcpConnFile_Read(t *testing.T) {
bytes := make([]byte, 4) bytes := make([]byte, 4)
file := tcpConnFile{tc: conn.(*net.TCPConn)} require.NoError(t, err)
n, errno := file.Read(bytes) errno := syscall.Errno(0)
file := newTcpConn(conn.(*net.TCPConn))
// Ensure we don't interrupt until we get a non-zero errno,
// and we retry on EAGAIN (i.e. when nonblocking is true).
for {
_, errno = file.Read(bytes)
if errno != syscall.EAGAIN {
break
}
time.Sleep(100 * time.Millisecond)
}
require.Zero(t, errno) require.Zero(t, errno)
require.NotEqual(t, 0, n) require.NoError(t, err)
require.Equal(t, "waze", string(bytes)) require.Equal(t, "waze", string(bytes))
} }
@@ -80,7 +99,7 @@ func TestTcpConnFile_Stat(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer conn.Close() defer conn.Close()
file := tcpConnFile{tc: conn.(*net.TCPConn)} file := newTcpConn(tcp)
_, errno := file.Stat() _, errno := file.Stat()
require.Zero(t, errno, "Stat should not fail") require.Zero(t, errno, "Stat should not fail")
} }

View File

@@ -7,24 +7,149 @@ import (
"syscall" "syscall"
"github.com/tetratelabs/wazero/internal/platform" "github.com/tetratelabs/wazero/internal/platform"
socketapi "github.com/tetratelabs/wazero/internal/sock"
) )
// MSG_PEEK is the constant syscall.MSG_PEEK
const MSG_PEEK = syscall.MSG_PEEK const MSG_PEEK = syscall.MSG_PEEK
// recvfromPeek exposes syscall.Recvfrom with flag MSG_PEEK on POSIX systems. // newTCPListenerFile is a constructor for a socketapi.TCPSock.
func recvfromPeek(conn *net.TCPConn, p []byte) (n int, errno syscall.Errno) { //
syscallConn, err := conn.SyscallConn() // Note: the implementation of socketapi.TCPSock goes straight
// to the syscall layer, bypassing most of the Go library.
// For an alternative approach, consider winTcpListenerFile
// where most APIs are implemented with regular Go std-lib calls.
func newTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock {
conn, err := tl.File()
if err != nil { if err != nil {
return 0, platform.UnwrapOSError(err) panic(err)
} }
fd := conn.Fd()
// Prioritize the error from Recvfrom over Control // We need to duplicate this file handle, or the lifecycle will be tied
if controlErr := syscallConn.Control(func(fd uintptr) { // to the TCPListener. We rely on the TCPListener only to set up
var recvfromErr error // the connection correctly and parse/resolve the TCP Address
n, _, recvfromErr = syscall.Recvfrom(int(fd), p, MSG_PEEK) // (notice we actually rely on the listener in the Windows implementation).
errno = platform.UnwrapOSError(recvfromErr) sysfd, err := syscall.Dup(int(fd))
}); errno == 0 { if err != nil {
errno = platform.UnwrapOSError(controlErr) panic(err)
} }
return return &tcpListenerFile{fd: uintptr(sysfd), addr: tl.Addr().(*net.TCPAddr)}
}
var _ socketapi.TCPSock = (*tcpListenerFile)(nil)
type tcpListenerFile struct {
baseSockFile
fd uintptr
addr *net.TCPAddr
}
// Accept implements the same method as documented on socketapi.TCPSock
func (f *tcpListenerFile) Accept() (socketapi.TCPConn, syscall.Errno) {
nfd, _, err := syscall.Accept(int(f.fd))
errno := platform.UnwrapOSError(err)
if errno != 0 {
return nil, errno
}
return &tcpConnFile{fd: uintptr(nfd)}, 0
}
// SetNonblock implements the same method as documented on fsapi.File
func (f *tcpListenerFile) SetNonblock(enabled bool) syscall.Errno {
return platform.UnwrapOSError(setNonblock(f.fd, enabled))
}
// Close implements the same method as documented on fsapi.File
func (f *tcpListenerFile) Close() syscall.Errno {
return platform.UnwrapOSError(syscall.Close(int(f.fd)))
}
// Addr is exposed for testing.
func (f *tcpListenerFile) Addr() *net.TCPAddr {
return f.addr
}
var _ socketapi.TCPConn = (*tcpConnFile)(nil)
type tcpConnFile struct {
baseSockFile
fd uintptr
// closed is true when closed was called. This ensures proper syscall.EBADF
closed bool
}
func newTcpConn(tc *net.TCPConn) socketapi.TCPConn {
f, err := tc.File()
if err != nil {
panic(err)
}
return &tcpConnFile{fd: f.Fd()}
}
// SetNonblock implements the same method as documented on fsapi.File
func (f *tcpConnFile) SetNonblock(enabled bool) (errno syscall.Errno) {
return platform.UnwrapOSError(setNonblock(f.fd, enabled))
}
// Read implements the same method as documented on fsapi.File
func (f *tcpConnFile) Read(buf []byte) (n int, errno syscall.Errno) {
n, err := syscall.Read(int(f.fd), buf)
if err != nil {
// Defer validation overhead until we've already had an error.
errno = platform.UnwrapOSError(err)
errno = fileError(f, f.closed, errno)
}
return n, errno
}
// Write implements the same method as documented on fsapi.File
func (f *tcpConnFile) Write(buf []byte) (n int, errno syscall.Errno) {
n, err := syscall.Write(int(f.fd), buf)
if err != nil {
// Defer validation overhead until we've already had an error.
errno = platform.UnwrapOSError(err)
errno = fileError(f, f.closed, errno)
}
return n, errno
}
// Recvfrom implements the same method as documented on socketapi.TCPConn
func (f *tcpConnFile) Recvfrom(p []byte, flags int) (n int, errno syscall.Errno) {
if flags != MSG_PEEK {
errno = syscall.EINVAL
return
}
n, _, recvfromErr := syscall.Recvfrom(int(f.fd), p, MSG_PEEK)
errno = platform.UnwrapOSError(recvfromErr)
return n, errno
}
// Shutdown implements the same method as documented on fsapi.Conn
func (f *tcpConnFile) Shutdown(how int) syscall.Errno {
var err error
switch how {
case syscall.SHUT_RD, syscall.SHUT_WR:
err = syscall.Shutdown(int(f.fd), how)
case syscall.SHUT_RDWR:
return f.close()
default:
return syscall.EINVAL
}
return platform.UnwrapOSError(err)
}
// Close implements the same method as documented on fsapi.File
func (f *tcpConnFile) Close() syscall.Errno {
return f.close()
}
func (f *tcpConnFile) close() syscall.Errno {
if f.closed {
return 0
}
f.closed = true
return platform.UnwrapOSError(syscall.Shutdown(int(f.fd), syscall.SHUT_RDWR))
} }

View File

@@ -5,11 +5,22 @@ package sysfs
import ( import (
"net" "net"
"syscall" "syscall"
socketapi "github.com/tetratelabs/wazero/internal/sock"
) )
// MSG_PEEK is a filler value // MSG_PEEK is a filler value.
const MSG_PEEK = 0x2 const MSG_PEEK = 0x2
func recvfromPeek(conn *net.TCPConn, p []byte) (n int, errno syscall.Errno) { func newTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock {
return 0, syscall.ENOSYS return &unsupportedSockFile{}
}
type unsupportedSockFile struct {
baseSockFile
}
// Accept implements the same method as documented on socketapi.TCPSock
func (f *unsupportedSockFile) Accept() (socketapi.TCPConn, syscall.Errno) {
return nil, syscall.ENOSYS
} }

View File

@@ -8,31 +8,13 @@ import (
"unsafe" "unsafe"
"github.com/tetratelabs/wazero/internal/platform" "github.com/tetratelabs/wazero/internal/platform"
socketapi "github.com/tetratelabs/wazero/internal/sock"
) )
// MSG_PEEK is the flag PEEK for syscall.Recvfrom on Windows. // MSG_PEEK is the flag PEEK for syscall.Recvfrom on Windows.
// This constant is not exported on this platform. // This constant is not exported on this platform.
const MSG_PEEK = 0x2 const MSG_PEEK = 0x2
// recvfromPeek exposes syscall.Recvfrom with flag MSG_PEEK on Windows.
func recvfromPeek(conn *net.TCPConn, p []byte) (n int, errno syscall.Errno) {
syscallConn, err := conn.SyscallConn()
if err != nil {
errno = platform.UnwrapOSError(err)
return
}
// Prioritize the error from recvfrom over Control
if controlErr := syscallConn.Control(func(fd uintptr) {
var recvfromErr error
n, recvfromErr = recvfrom(syscall.Handle(fd), p, MSG_PEEK)
errno = platform.UnwrapOSError(recvfromErr)
}); errno == 0 {
errno = platform.UnwrapOSError(controlErr)
}
return
}
var ( var (
// modws2_32 is WinSock. // modws2_32 is WinSock.
modws2_32 = syscall.NewLazyDLL("ws2_32.dll") modws2_32 = syscall.NewLazyDLL("ws2_32.dll")
@@ -61,3 +43,150 @@ func recvfrom(s syscall.Handle, buf []byte, flags int32) (n int, errno syscall.E
0) // fromlen *int (optional) 0) // fromlen *int (optional)
return int(r0), e1 return int(r0), e1
} }
// newTCPListenerFile is a constructor for a socketapi.TCPSock.
//
// Note: currently the Windows implementation of socketapi.TCPSock
// returns a winTcpListenerFile, which is a specialized TCPSock
// that delegates to a .net.TCPListener.
// The current strategy is to delegate most behavior to the Go
// standard library, instead of invoke syscalls/Win32 APIs
// because they are sensibly different from Unix's.
func newTCPListenerFile(tl *net.TCPListener) socketapi.TCPSock {
return &winTcpListenerFile{tl: tl}
}
var _ socketapi.TCPSock = (*winTcpListenerFile)(nil)
type winTcpListenerFile struct {
baseSockFile
tl *net.TCPListener
}
// Accept implements the same method as documented on socketapi.TCPSock
func (f *winTcpListenerFile) Accept() (socketapi.TCPConn, syscall.Errno) {
conn, err := f.tl.Accept()
if err != nil {
return nil, platform.UnwrapOSError(err)
}
return &winTcpConnFile{tc: conn.(*net.TCPConn)}, 0
}
// SetNonblock implements the same method as documented on fsapi.File
func (f *winTcpListenerFile) SetNonblock(enabled bool) syscall.Errno {
return 0 // setNonblock() is a no-op on Windows
}
// Close implements the same method as documented on fsapi.File
func (f *winTcpListenerFile) Close() syscall.Errno {
return platform.UnwrapOSError(f.tl.Close())
}
// Addr is exposed for testing.
func (f *winTcpListenerFile) Addr() *net.TCPAddr {
return f.tl.Addr().(*net.TCPAddr)
}
var _ socketapi.TCPConn = (*winTcpConnFile)(nil)
type winTcpConnFile struct {
baseSockFile
tc *net.TCPConn
// closed is true when closed was called. This ensures proper syscall.EBADF
closed bool
}
func newTcpConn(tc *net.TCPConn) socketapi.TCPConn {
return &winTcpConnFile{tc: tc}
}
// SetNonblock implements the same method as documented on fsapi.File
func (f *winTcpConnFile) SetNonblock(enabled bool) (errno syscall.Errno) {
syscallConn, err := f.tc.SyscallConn()
if err != nil {
return platform.UnwrapOSError(err)
}
// Prioritize the error from setNonblock over Control
if controlErr := syscallConn.Control(func(fd uintptr) {
errno = platform.UnwrapOSError(setNonblock(fd, enabled))
}); errno == 0 {
errno = platform.UnwrapOSError(controlErr)
}
return
}
// Read implements the same method as documented on fsapi.File
func (f *winTcpConnFile) Read(buf []byte) (n int, errno syscall.Errno) {
if n, errno = read(f.tc, buf); errno != 0 {
// Defer validation overhead until we've already had an error.
errno = fileError(f, f.closed, errno)
}
return
}
// Write implements the same method as documented on fsapi.File
func (f *winTcpConnFile) Write(buf []byte) (n int, errno syscall.Errno) {
if n, errno = write(f.tc, buf); errno != 0 {
// Defer validation overhead until we've already had an error.
errno = fileError(f, f.closed, errno)
}
return
}
// Recvfrom implements the same method as documented on socketapi.TCPConn
func (f *winTcpConnFile) Recvfrom(p []byte, flags int) (n int, errno syscall.Errno) {
if flags != MSG_PEEK {
errno = syscall.EINVAL
return
}
conn := f.tc
syscallConn, err := conn.SyscallConn()
if err != nil {
errno = platform.UnwrapOSError(err)
return
}
// Prioritize the error from recvfrom over Control
if controlErr := syscallConn.Control(func(fd uintptr) {
var recvfromErr error
n, recvfromErr = recvfrom(syscall.Handle(fd), p, MSG_PEEK)
errno = platform.UnwrapOSError(recvfromErr)
}); errno == 0 {
errno = platform.UnwrapOSError(controlErr)
}
return
}
// Shutdown implements the same method as documented on fsapi.Conn
func (f *winTcpConnFile) Shutdown(how int) syscall.Errno {
// FIXME: can userland shutdown listeners?
var err error
switch how {
case syscall.SHUT_RD:
err = f.tc.CloseRead()
case syscall.SHUT_WR:
err = f.tc.CloseWrite()
case syscall.SHUT_RDWR:
return f.close()
default:
return syscall.EINVAL
}
return platform.UnwrapOSError(err)
}
// Close implements the same method as documented on fsapi.File
func (f *winTcpConnFile) Close() syscall.Errno {
return f.close()
}
func (f *winTcpConnFile) close() syscall.Errno {
if f.closed {
return 0
}
f.closed = true
return f.Shutdown(syscall.SHUT_RDWR)
}

View File

@@ -131,19 +131,20 @@ func Config(fnd api.FunctionDefinition) (pSampler logging.ParamSampler, pLoggers
switch name { switch name {
case "id": case "id":
logger = logClockId(idx).Log logger = logClockId(idx).Log
pLoggers = append(pLoggers, logger)
case "result.resolution": case "result.resolution":
name = resultParamName(name) name = resultParamName(name)
logger = logMemI32(idx).Log logger = logMemI32(idx).Log
rLoggers = append(rLoggers, resultParamLogger(name, logger)) rLoggers = append(rLoggers, resultParamLogger(name, logger))
continue
case "result.timestamp": case "result.timestamp":
name = resultParamName(name) name = resultParamName(name)
logger = logMemI64(idx).Log logger = logMemI64(idx).Log
rLoggers = append(rLoggers, resultParamLogger(name, logger)) rLoggers = append(rLoggers, resultParamLogger(name, logger))
continue
default: default:
logger = logging.NewParamLogger(idx, name, types[idx]) logger = logging.NewParamLogger(idx, name, types[idx])
pLoggers = append(pLoggers, logger)
} }
pLoggers = append(pLoggers, logger)
continue continue
} }
@@ -151,33 +152,33 @@ func Config(fnd api.FunctionDefinition) (pSampler logging.ParamSampler, pLoggers
switch name { switch name {
case "flags": case "flags":
logger = logFlags(idx).Log logger = logFlags(idx).Log
pLoggers = append(pLoggers, logger)
case "ri_flags": case "ri_flags":
logger = logRiFlags(idx).Log logger = logRiFlags(idx).Log
pLoggers = append(pLoggers, logger)
case "si_flags": case "si_flags":
logger = logSiFlags(idx).Log logger = logSiFlags(idx).Log
pLoggers = append(pLoggers, logger)
case "how": case "how":
logger = logSdFlags(idx).Log logger = logSdFlags(idx).Log
pLoggers = append(pLoggers, logger)
case "result.fd", "result.ro_datalen", "result.so_datalen": case "result.fd", "result.ro_datalen", "result.so_datalen":
name = resultParamName(name) name = resultParamName(name)
logger = logMemI32(idx).Log logger = logMemI32(idx).Log
rLoggers = append(rLoggers, resultParamLogger(name, logger)) rLoggers = append(rLoggers, resultParamLogger(name, logger))
continue
case "result.ro_flags": case "result.ro_flags":
logger = logRoFlags(idx).Log logger = logRoFlags(idx).Log
rLoggers = append(rLoggers, resultParamLogger("ro_flags", logger)) rLoggers = append(rLoggers, resultParamLogger("ro_flags", logger))
continue
default: default:
logger = logging.NewParamLogger(idx, name, types[idx]) logger = logging.NewParamLogger(idx, name, types[idx])
pLoggers = append(pLoggers, logger)
} }
pLoggers = append(pLoggers, logger)
continue continue
} }
switch name { switch name {
case "fdflags": case "fdflags":
logger = logFdflags(idx).Log logger = logFdflags(idx).Log
case "flags":
logger = logFlags(idx).Log
case "fst_flags": case "fst_flags":
logger = logFstflags(idx).Log logger = logFstflags(idx).Log
case "oflags": case "oflags":