wasi: nonblocking pipes on Windows (#1570)

Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com>
This commit is contained in:
Edoardo Vacchi
2023-07-10 00:50:26 +02:00
committed by GitHub
parent 0d0db0e12f
commit d3f09bdcff
9 changed files with 128 additions and 46 deletions

View File

@@ -498,18 +498,42 @@ func Test_Stdin(t *testing.T) {
}
func testStdin(t *testing.T, bin []byte) {
r, w, err := os.Pipe()
stdinReader, stdinWriter, err := os.Pipe()
require.NoError(t, err)
stdoutReader, stdoutWriter, err := os.Pipe()
require.NoError(t, err)
defer func() {
stdinReader.Close()
stdinWriter.Close()
stdoutReader.Close()
stdoutReader.Close()
}()
require.NoError(t, err)
moduleConfig := wazero.NewModuleConfig().
WithSysNanotime(). // poll_oneoff requires nanotime.
WithArgs("wasi", "stdin").
WithStdin(r)
ch := make(chan string, 1)
WithStdin(stdinReader).
WithStdout(stdoutWriter)
ch := make(chan struct{}, 1)
go func() {
ch <- compileAndRun(t, testCtx, moduleConfig, bin)
defer close(ch)
r := wazero.NewRuntime(testCtx)
defer r.Close(testCtx)
_, err := wasi_snapshot_preview1.Instantiate(testCtx, r)
require.NoError(t, err)
_, err = r.InstantiateWithConfig(testCtx, bin, moduleConfig)
require.NoError(t, err)
}()
time.Sleep(1 * time.Second)
_, _ = w.WriteString("foo")
s := <-ch
require.Equal(t, "waiting for stdin...\nfoo", s)
buf := make([]byte, 21)
_, _ = stdoutReader.Read(buf)
require.Equal(t, "waiting for stdin...\n", string(buf))
_, _ = stdinWriter.WriteString("foo")
_ = stdinWriter.Close()
buf = make([]byte, 3)
_, _ = stdoutReader.Read(buf)
require.Equal(t, "foo", string(buf))
<-ch
}

View File

@@ -87,11 +87,7 @@ func TestReadFdNonblock(t *testing.T) {
// Read from the file without ever writing to it should not block.
buf := make([]byte, 8)
_, e := readFd(fd, buf)
if runtime.GOOS == "windows" {
require.EqualErrno(t, syscall.ENOSYS, e)
} else {
require.EqualErrno(t, syscall.EAGAIN, e)
}
}
func TestFileSetAppend(t *testing.T) {

View File

@@ -1,4 +1,4 @@
//go:build !unix && !linux && !darwin
//go:build !unix && !linux && !darwin && !windows
package sysfs

View File

@@ -0,0 +1,61 @@
package sysfs
import (
"syscall"
"unsafe"
"github.com/tetratelabs/wazero/internal/platform"
)
const NonBlockingFileIoSupported = true
var kernel32 = syscall.NewLazyDLL("kernel32.dll")
// procPeekNamedPipe is the syscall.LazyProc in kernel32 for PeekNamedPipe
var procPeekNamedPipe = kernel32.NewProc("PeekNamedPipe")
// readFd returns ENOSYS on unsupported platforms.
//
// PeekNamedPipe: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe
// "GetFileType can assist in determining what device type the handle refers to. A console handle presents as FILE_TYPE_CHAR."
// https://learn.microsoft.com/en-us/windows/console/console-handles
func readFd(fd uintptr, buf []byte) (int, syscall.Errno) {
handle := syscall.Handle(fd)
fileType, err := syscall.GetFileType(syscall.Stdin)
if err != nil {
return 0, platform.UnwrapOSError(err)
}
if fileType&syscall.FILE_TYPE_CHAR == 0 {
return -1, syscall.ENOSYS
}
n, err := peekNamedPipe(handle)
if err != nil {
errno := platform.UnwrapOSError(err)
if errno == syscall.ERROR_BROKEN_PIPE {
return 0, 0
}
}
if n == 0 {
return -1, syscall.EAGAIN
}
un, err := syscall.Read(handle, buf[0:n])
return un, platform.UnwrapOSError(err)
}
// peekNamedPipe partially exposes PeekNamedPipe from the Win32 API
// see https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe
func peekNamedPipe(handle syscall.Handle) (uint32, error) {
var totalBytesAvail uint32
totalBytesPtr := unsafe.Pointer(&totalBytesAvail)
_, _, err := procPeekNamedPipe.Call(
uintptr(handle), // [in] HANDLE hNamedPipe,
0, // [out, optional] LPVOID lpBuffer,
0, // [in] DWORD nBufferSize,
0, // [out, optional] LPDWORD lpBytesRead
uintptr(totalBytesPtr), // [out, optional] LPDWORD lpTotalBytesAvail,
0) // [out, optional] LPDWORD lpBytesLeftThisMessage
if err == syscall.Errno(0) {
return totalBytesAvail, nil
}
return totalBytesAvail, err
}

View File

@@ -2,8 +2,16 @@
package sysfs
import "syscall"
import (
"syscall"
"github.com/tetratelabs/wazero/internal/fsapi"
)
func setNonblock(fd uintptr, enable bool) error {
return syscall.SetNonblock(int(fd), enable)
}
func isNonblock(f *osFile) bool {
return f.flag&fsapi.O_NONBLOCK == fsapi.O_NONBLOCK
}

View File

@@ -2,8 +2,24 @@
package sysfs
import "syscall"
import (
"io/fs"
"syscall"
"github.com/tetratelabs/wazero/internal/fsapi"
)
func setNonblock(fd uintptr, enable bool) error {
// We invoke the syscall, but this is currently no-op.
return syscall.SetNonblock(syscall.Handle(fd), enable)
}
func isNonblock(f *osFile) bool {
// On Windows, we support non-blocking reads only on named pipes.
isValid := false
st, errno := f.Stat()
if errno == 0 {
isValid = st.Mode&fs.ModeNamedPipe != 0
}
return isValid && f.flag&fsapi.O_NONBLOCK == fsapi.O_NONBLOCK
}

View File

@@ -106,7 +106,7 @@ func (f *osFile) reopen() (errno syscall.Errno) {
// IsNonblock implements the same method as documented on fsapi.File
func (f *osFile) IsNonblock() bool {
return f.flag&fsapi.O_NONBLOCK == fsapi.O_NONBLOCK
return isNonblock(f)
}
// SetNonblock implements the same method as documented on fsapi.File

View File

@@ -4,7 +4,6 @@ import (
"context"
"syscall"
"time"
"unsafe"
"github.com/tetratelabs/wazero/internal/platform"
)
@@ -16,11 +15,6 @@ const wasiFdStdin = 0
// pollInterval is the interval between each calls to peekNamedPipe in pollNamedPipe
const pollInterval = 100 * time.Millisecond
var kernel32 = syscall.NewLazyDLL("kernel32.dll")
// procPeekNamedPipe is the syscall.LazyProc in kernel32 for PeekNamedPipe
var procPeekNamedPipe = kernel32.NewProc("PeekNamedPipe")
// syscall_select emulates the select syscall on Windows for two, well-known cases, returns syscall.ENOSYS for all others.
// If r contains fd 0, and it is a regular file, then it immediately returns 1 (data ready on stdin)
// and r will have the fd 0 bit set.
@@ -72,7 +66,8 @@ func syscall_select(n int, r, w, e *platform.FdSet, timeout *time.Duration) (int
func pollNamedPipe(ctx context.Context, pipeHandle syscall.Handle, duration *time.Duration) (bool, error) {
// Short circuit when the duration is zero.
if duration != nil && *duration == time.Duration(0) {
return peekNamedPipe(pipeHandle)
bytes, err := peekNamedPipe(pipeHandle)
return bytes > 0, err
}
// Ticker that emits at every pollInterval.
@@ -101,27 +96,9 @@ func pollNamedPipe(ctx context.Context, pipeHandle syscall.Handle, duration *tim
if err != nil {
return false, err
}
if res {
if res > 0 {
return true, nil
}
}
}
}
// peekNamedPipe partially exposes PeekNamedPipe from the Win32 API
// see https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe
func peekNamedPipe(handle syscall.Handle) (bool, error) {
var totalBytesAvail uint32
totalBytesPtr := unsafe.Pointer(&totalBytesAvail)
_, _, err := procPeekNamedPipe.Call(
uintptr(handle), // [in] HANDLE hNamedPipe,
0, // [out, optional] LPVOID lpBuffer,
0, // [in] DWORD nBufferSize,
0, // [out, optional] LPDWORD lpBytesRead
uintptr(totalBytesPtr), // [out, optional] LPDWORD lpTotalBytesAvail,
0) // [out, optional] LPDWORD lpBytesLeftThisMessage
if err == syscall.Errno(0) {
return totalBytesAvail > 0, nil
}
return totalBytesAvail > 0, err
}

View File

@@ -33,9 +33,9 @@ func TestSelect_Windows(t *testing.T) {
wh := syscall.Handle(w.Fd())
// Ensure the pipe has data.
hasData, err := peekNamedPipe(rh)
n, err := peekNamedPipe(rh)
require.NoError(t, err)
require.False(t, hasData)
require.NotEqual(t, 0, n)
// Write to the channel.
msg, err := syscall.ByteSliceFromString("test\n")
@@ -44,9 +44,9 @@ func TestSelect_Windows(t *testing.T) {
require.NoError(t, err)
// Ensure the pipe has data.
hasData, err = peekNamedPipe(rh)
n, err = peekNamedPipe(rh)
require.NoError(t, err)
require.True(t, hasData)
require.NotEqual(t, 0, n)
})
t.Run("pollNamedPipe should return immediately when duration is nil (no data)", func(t *testing.T) {