1
0
mirror of https://github.com/libuv/libuv synced 2025-03-28 21:13:16 +00:00

win,pipe: fix race with concurrent readers (#4470)

This fixes a race condition if multiple threads are reading from the
same NamedPipe, which could previously lead to a deadlock situation. We
also substantially improve performance now also, since the PeekFile
call is unnecessary overhead with this change. This API was added in
Windows Vista.

Related to #4467, though doesn't address any of the problems there. I
believe that someone could now implement uv__pipe_try_write using
this same code pattern however.
This commit is contained in:
Jameson Nash 2024-08-02 10:50:32 -04:00 committed by GitHub
parent 4e310d0f90
commit 727ee7237e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -667,15 +667,10 @@ void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
}
handle->pipe.conn.ipc_xfer_queue_length = 0;
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(handle->read_req.wait_handle);
handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
}
if (handle->read_req.event_handle != NULL) {
CloseHandle(handle->read_req.event_handle);
handle->read_req.event_handle = NULL;
}
assert(handle->read_req.wait_handle == INVALID_HANDLE_VALUE);
if (handle->read_req.event_handle != NULL) {
CloseHandle(handle->read_req.event_handle);
handle->read_req.event_handle = NULL;
}
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
@ -1417,13 +1412,12 @@ static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (req->wait_handle == INVALID_HANDLE_VALUE) {
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_completion_read_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
SET_REQ_ERROR(req, GetLastError());
goto error;
}
assert(req->wait_handle == INVALID_HANDLE_VALUE);
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_completion_read_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
SET_REQ_ERROR(req, GetLastError());
goto error;
}
}
}
@ -1451,16 +1445,16 @@ int uv__pipe_read_start(uv_pipe_t* handle,
handle->read_cb = read_cb;
handle->alloc_cb = alloc_cb;
if (handle->read_req.event_handle == NULL) {
handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
if (handle->read_req.event_handle == NULL) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
}
/* If reading was stopped and then started again, there could still be a read
* request pending. */
if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
handle->read_req.event_handle == NULL) {
handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
if (handle->read_req.event_handle == NULL) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
}
uv__pipe_queue_read(loop, handle);
}
@ -1713,7 +1707,7 @@ static int uv__pipe_write_data(uv_loop_t* loop,
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->event_handle, post_completion_write_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) {
return GetLastError();
}
}
@ -1889,7 +1883,7 @@ static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
int error, uv_buf_t buf) {
DWORD error, uv_buf_t buf) {
if (error == ERROR_BROKEN_PIPE) {
uv__pipe_read_eof(loop, handle, buf);
} else {
@ -1919,17 +1913,25 @@ static void uv__pipe_queue_ipc_xfer_info(
/* Read an exact number of bytes from a pipe. If an error or end-of-file is
* encountered before the requested number of bytes are read, an error is
* returned. */
static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
DWORD bytes_read, bytes_read_now;
static DWORD uv__pipe_read_exactly(uv_pipe_t* handle, void* buffer, DWORD count) {
uv_read_t* req;
DWORD bytes_read;
DWORD bytes_read_now;
bytes_read = 0;
while (bytes_read < count) {
if (!ReadFile(h,
req = &handle->read_req;
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
if (!ReadFile(handle->handle,
(char*) buffer + bytes_read,
count - bytes_read,
&bytes_read_now,
NULL)) {
return GetLastError();
&req->u.io.overlapped)) {
if (GetLastError() != ERROR_IO_PENDING)
return GetLastError();
if (!GetOverlappedResult(handle->handle, &req->u.io.overlapped, &bytes_read_now, TRUE))
return GetLastError();
}
bytes_read += bytes_read_now;
@ -1940,16 +1942,17 @@ static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
}
static DWORD uv__pipe_read_data(uv_loop_t* loop,
uv_pipe_t* handle,
DWORD suggested_bytes,
DWORD max_bytes) {
DWORD bytes_read;
static int uv__pipe_read_data(uv_loop_t* loop,
uv_pipe_t* handle,
DWORD* bytes_read,
DWORD max_bytes) {
uv_buf_t buf;
uv_read_t* req;
DWORD r;
/* Ask the user for a buffer to read data into. */
buf = uv_buf_init(NULL, 0);
handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
handle->alloc_cb((uv_handle_t*) handle, *bytes_read, &buf);
if (buf.base == NULL || buf.len == 0) {
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
return 0; /* Break out of read loop. */
@ -1962,29 +1965,50 @@ static DWORD uv__pipe_read_data(uv_loop_t* loop,
if (max_bytes > buf.len)
max_bytes = buf.len;
/* Read into the user buffer. */
if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
return 0; /* Break out of read loop. */
/* Read into the user buffer.
* Prepare an Event so that we can cancel if it doesn't complete immediately.
*/
req = &handle->read_req;
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
if (ReadFile(handle->handle, buf.base, max_bytes, bytes_read, &req->u.io.overlapped)) {
r = ERROR_SUCCESS;
} else {
r = GetLastError();
*bytes_read = 0;
if (r == ERROR_IO_PENDING) {
r = CancelIoEx(handle->handle, &req->u.io.overlapped);
assert(r || GetLastError() == ERROR_NOT_FOUND);
if (!GetOverlappedResult(handle->handle, &req->u.io.overlapped, bytes_read, TRUE)) {
r = GetLastError();
*bytes_read = 0;
}
}
}
/* Call the read callback. */
handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);
if (r == ERROR_SUCCESS || r == ERROR_OPERATION_ABORTED)
handle->read_cb((uv_stream_t*) handle, *bytes_read, &buf);
else
uv__pipe_read_error_or_eof(loop, handle, r, buf);
return bytes_read;
return *bytes_read == max_bytes;
}
static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
int err;
static int uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
uint32_t* data_remaining;
DWORD err;
DWORD more;
DWORD bytes_read;
data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
if (*data_remaining > 0) {
/* Read frame data payload. */
DWORD bytes_read =
uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
bytes_read = *data_remaining;
more = uv__pipe_read_data(loop, handle, &bytes_read, bytes_read);
*data_remaining -= bytes_read;
return bytes_read;
} else {
/* Start of a new IPC frame. */
@ -1995,7 +2019,7 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
/* Read the IPC frame header. */
err = uv__pipe_read_exactly(
handle->handle, &frame_header, sizeof frame_header);
handle, &frame_header, sizeof frame_header);
if (err)
goto error;
@ -2031,21 +2055,28 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
/* If no socket xfer info follows, return here. Data will be read in a
* subsequent invocation of uv__pipe_read_ipc(). */
if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
return sizeof frame_header; /* Number of bytes read. */
if (xfer_type != UV__IPC_SOCKET_XFER_NONE) {
/* Read transferred socket information. */
err = uv__pipe_read_exactly(handle, &xfer_info, sizeof xfer_info);
if (err)
goto error;
/* Read transferred socket information. */
err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
if (err)
goto error;
/* Store the pending socket info. */
uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
}
/* Store the pending socket info. */
uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
/* Return number of bytes read. */
return sizeof frame_header + sizeof xfer_info;
more = 1;
}
/* Return whether the caller should immediately try another read call to get
* more data. */
if (more && *data_remaining == 0) {
/* TODO: use PeekNamedPipe to see if it is really worth trying to do
* another ReadFile call. */
}
return more;
invalid:
/* Invalid frame. */
err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
@ -2059,12 +2090,20 @@ error:
void uv__process_pipe_read_req(uv_loop_t* loop,
uv_pipe_t* handle,
uv_req_t* req) {
DWORD err;
DWORD more;
DWORD bytes_requested;
assert(handle->type == UV_NAMED_PIPE);
handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
DECREASE_PENDING_REQ_COUNT(handle);
eof_timer_stop(handle);
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(handle->read_req.wait_handle);
handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
}
/* At this point, we're done with bookkeeping. If the user has stopped
* reading the pipe in the meantime, there is nothing left to do, since there
* is no callback that we can call. */
@ -2073,7 +2112,7 @@ void uv__process_pipe_read_req(uv_loop_t* loop,
if (!REQ_SUCCESS(req)) {
/* An error occurred doing the zero-read. */
DWORD err = GET_REQ_ERROR(req);
err = GET_REQ_ERROR(req);
/* If the read was cancelled by uv__pipe_interrupt_read(), the request may
* indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
@ -2084,34 +2123,18 @@ void uv__process_pipe_read_req(uv_loop_t* loop,
} else {
/* The zero-read completed without error, indicating there is data
* available in the kernel buffer. */
DWORD avail;
/* Get the number of bytes available. */
avail = 0;
if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
/* Read until we've either read all the bytes available, or the 'reading'
* flag is cleared. */
while (avail > 0 && handle->flags & UV_HANDLE_READING) {
while (handle->flags & UV_HANDLE_READING) {
bytes_requested = 65536;
/* Depending on the type of pipe, read either IPC frames or raw data. */
DWORD bytes_read =
handle->ipc ? uv__pipe_read_ipc(loop, handle)
: uv__pipe_read_data(loop, handle, avail, (DWORD) -1);
if (handle->ipc)
more = uv__pipe_read_ipc(loop, handle);
else
more = uv__pipe_read_data(loop, handle, &bytes_requested, INT32_MAX);
/* If no bytes were read, treat this as an indication that an error
* occurred, and break out of the read loop. */
if (bytes_read == 0)
if (more == 0)
break;
/* It is possible that more bytes were read than we thought were
* available. To prevent `avail` from underflowing, break out of the loop
* if this is the case. */
if (bytes_read > avail)
break;
/* Recompute the number of bytes available. */
avail -= bytes_read;
}
}
@ -2134,15 +2157,13 @@ void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
UNREGISTER_HANDLE_REQ(loop, handle);
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (req->wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(req->wait_handle);
req->wait_handle = INVALID_HANDLE_VALUE;
}
if (req->event_handle) {
CloseHandle(req->event_handle);
req->event_handle = NULL;
}
if (req->wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(req->wait_handle);
req->wait_handle = INVALID_HANDLE_VALUE;
}
if (req->event_handle) {
CloseHandle(req->event_handle);
req->event_handle = NULL;
}
err = GET_REQ_ERROR(req);