1
0
mirror of https://github.com/libuv/libuv synced 2025-03-28 21:13:16 +00:00

windows: make uv_read_stop immediately stop reading

This implements locking around the blocking call to ReadFile to get
around a Windows kernel bug where a blocking ReadFile operation on a
stream can deadlock the thread. This allows uv_read_stop to immediately
cancel a pending IO operation, and allows uv_pipe_getsockname to
"pause" any pending read (from libuv) while it retrieves the
sockname information.

If unsupported by the OS (pre-Vista), this reverts to the old
(e.g. deadlock-prone) behavior

Closes #1313
This commit is contained in:
Jameson Nash 2014-07-15 01:49:11 -04:00 committed by Saúl Ibarra Corretgé
parent 02e1ebd40b
commit 837c62c71b
8 changed files with 180 additions and 4 deletions

View File

@ -443,6 +443,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
int queue_len; \
} pending_ipc_info; \
uv_write_t* non_overlapped_writes_tail; \
uv_mutex_t readfile_mutex; \
volatile HANDLE readfile_thread; \
void* reserved;
#define UV_PIPE_PRIVATE_FIELDS \

View File

@ -100,6 +100,7 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled;
/* Only used by uv_pipe_t handles. */
#define UV_HANDLE_NON_OVERLAPPED_PIPE 0x01000000
#define UV_HANDLE_PIPESERVER 0x02000000
#define UV_HANDLE_PIPE_READ_CANCELABLE 0x04000000
/* Only used by uv_tty_t handles. */
#define UV_HANDLE_TTY_READABLE 0x01000000
@ -181,6 +182,9 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t* send_handle,
uv_write_cb cb);
void uv__pipe_pause_read(uv_pipe_t* handle);
void uv__pipe_unpause_read(uv_pipe_t* handle);
void uv__pipe_stop_read(uv_pipe_t* handle);
void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_req_t* req);

View File

@ -101,6 +101,7 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
handle->pending_ipc_info.queue_len = 0;
handle->ipc = ipc;
handle->non_overlapped_writes_tail = NULL;
handle->readfile_thread = NULL;
uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
@ -112,6 +113,12 @@ static void uv_pipe_connection_init(uv_pipe_t* handle) {
uv_connection_init((uv_stream_t*) handle);
handle->read_req.data = handle;
handle->eof_timer = NULL;
assert(!(handle->flags & UV_HANDLE_PIPESERVER));
if (pCancelSynchronousIo &&
handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
uv_mutex_init(&handle->readfile_mutex);
handle->flags |= UV_HANDLE_PIPE_READ_CANCELABLE;
}
}
@ -321,6 +328,11 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
FILE_PIPE_LOCAL_INFORMATION pipe_info;
uv__ipc_queue_item_t* item;
if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
handle->flags &= ~UV_HANDLE_PIPE_READ_CANCELABLE;
uv_mutex_destroy(&handle->readfile_mutex);
}
if ((handle->flags & UV_HANDLE_CONNECTION) &&
handle->shutdown_req != NULL &&
handle->write_reqs_pending == 0) {
@ -658,12 +670,49 @@ error:
}
void uv__pipe_pause_read(uv_pipe_t* handle) {
if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
/* Pause the ReadFile task briefly, to work
around the Windows kernel bug that causes
any access to a NamedPipe to deadlock if
any process has called ReadFile */
HANDLE h;
uv_mutex_lock(&handle->readfile_mutex);
h = handle->readfile_thread;
while (h) {
/* spinlock: we expect this to finish quickly,
or we are probably about to deadlock anyways
(in the kernel), so it doesn't matter */
pCancelSynchronousIo(h);
SwitchToThread(); /* yield thread control briefly */
h = handle->readfile_thread;
}
}
}
void uv__pipe_unpause_read(uv_pipe_t* handle) {
if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
uv_mutex_unlock(&handle->readfile_mutex);
}
}
void uv__pipe_stop_read(uv_pipe_t* handle) {
handle->flags &= ~UV_HANDLE_READING;
uv__pipe_pause_read((uv_pipe_t*)handle);
uv__pipe_unpause_read((uv_pipe_t*)handle);
}
/* Cleans up uv_pipe_t (server or connection) and all resources associated */
/* with it. */
void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
int i;
HANDLE pipeHandle;
uv__pipe_stop_read(handle);
if (handle->name) {
free(handle->name);
handle->name = NULL;
@ -689,6 +738,7 @@ void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
CloseHandle(handle->handle);
handle->handle = INVALID_HANDLE_VALUE;
}
}
@ -867,19 +917,61 @@ static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
uv_read_t* req = (uv_read_t*) parameter;
uv_pipe_t* handle = (uv_pipe_t*) req->data;
uv_loop_t* loop = handle->loop;
HANDLE hThread = NULL;
DWORD err;
uv_mutex_t *m = &handle->readfile_mutex;
assert(req != NULL);
assert(req->type == UV_READ);
assert(handle->type == UV_NAMED_PIPE);
if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
uv_mutex_lock(m); /* mutex controls *setting* of readfile_thread */
if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(),
GetCurrentProcess(), &hThread,
0, TRUE, DUPLICATE_SAME_ACCESS)) {
handle->readfile_thread = hThread;
} else {
hThread = NULL;
}
uv_mutex_unlock(m);
}
restart_readfile:
result = ReadFile(handle->handle,
&uv_zero_,
0,
&bytes,
NULL);
if (!result) {
err = GetLastError();
if (err == ERROR_OPERATION_ABORTED &&
handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) {
if (handle->flags & UV_HANDLE_READING) {
/* just a brief break to do something else */
handle->readfile_thread = NULL;
/* resume after it is finished */
uv_mutex_lock(m);
handle->readfile_thread = hThread;
uv_mutex_unlock(m);
goto restart_readfile;
} else {
result = 1; /* successfully stopped reading */
}
}
}
if (hThread) {
assert(hThread == handle->readfile_thread);
/* mutex does not control clearing readfile_thread */
handle->readfile_thread = NULL;
uv_mutex_lock(m);
/* only when we hold the mutex lock is it safe to
open or close the handle */
CloseHandle(hThread);
uv_mutex_unlock(m);
}
if (!result) {
SET_REQ_ERROR(req, GetLastError());
SET_REQ_ERROR(req, err);
}
POST_COMPLETION_FOR_REQ(loop, req);
@ -1836,6 +1928,8 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) {
return UV_EINVAL;
}
uv__pipe_pause_read((uv_pipe_t*)handle); /* cast away const warning */
nt_status = pNtQueryInformationFile(handle->handle,
&io_status,
&tmp_name_info,
@ -1846,7 +1940,8 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) {
name_info = malloc(name_size);
if (!name_info) {
*len = 0;
return UV_ENOMEM;
err = UV_ENOMEM;
goto cleanup;
}
nt_status = pNtQueryInformationFile(handle->handle,
@ -1918,10 +2013,14 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) {
buf[addrlen++] = '\0';
*len = addrlen;
return 0;
err = 0;
goto cleanup;
error:
free(name_info);
cleanup:
uv__pipe_unpause_read((uv_pipe_t*)handle); /* cast away const warning */
return err;
}

View File

@ -106,7 +106,11 @@ int uv_read_stop(uv_stream_t* handle) {
if (handle->type == UV_TTY) {
err = uv_tty_read_stop((uv_tty_t*) handle);
} else {
handle->flags &= ~UV_HANDLE_READING;
if (handle->type == UV_NAMED_PIPE) {
uv__pipe_stop_read((uv_pipe_t*) handle);
} else {
handle->flags &= ~UV_HANDLE_READING;
}
DECREASE_ACTIVE_COUNT(handle->loop, handle);
}

View File

@ -51,6 +51,7 @@ sSleepConditionVariableCS pSleepConditionVariableCS;
sSleepConditionVariableSRW pSleepConditionVariableSRW;
sWakeAllConditionVariable pWakeAllConditionVariable;
sWakeConditionVariable pWakeConditionVariable;
sCancelSynchronousIo pCancelSynchronousIo;
void uv_winapi_init() {
@ -156,4 +157,7 @@ void uv_winapi_init() {
pWakeConditionVariable = (sWakeConditionVariable)
GetProcAddress(kernel32_module, "WakeConditionVariable");
pCancelSynchronousIo = (sCancelSynchronousIo)
GetProcAddress(kernel32_module, "CancelSynchronousIo");
}

View File

@ -4617,6 +4617,8 @@ typedef VOID (WINAPI* sWakeAllConditionVariable)
typedef VOID (WINAPI* sWakeConditionVariable)
(PCONDITION_VARIABLE ConditionVariable);
typedef BOOL (WINAPI* sCancelSynchronousIo)
(HANDLE hThread);
/* Ntdll function pointers */
extern sRtlNtStatusToDosError pRtlNtStatusToDosError;
@ -4644,5 +4646,6 @@ extern sSleepConditionVariableCS pSleepConditionVariableCS;
extern sSleepConditionVariableSRW pSleepConditionVariableSRW;
extern sWakeAllConditionVariable pWakeAllConditionVariable;
extern sWakeConditionVariable pWakeConditionVariable;
extern sCancelSynchronousIo pCancelSynchronousIo;
#endif /* UV_WIN_WINAPI_H_ */

View File

@ -109,6 +109,7 @@ TEST_DECLARE (pipe_connect_bad_name)
TEST_DECLARE (pipe_connect_to_file)
TEST_DECLARE (pipe_getsockname)
TEST_DECLARE (pipe_getsockname_abstract)
TEST_DECLARE (pipe_getsockname_blocking)
TEST_DECLARE (pipe_sendmsg)
TEST_DECLARE (pipe_server_close)
TEST_DECLARE (connection_fail)
@ -402,6 +403,7 @@ TASK_LIST_START
TEST_ENTRY (pipe_listen_without_bind)
TEST_ENTRY (pipe_getsockname)
TEST_ENTRY (pipe_getsockname_abstract)
TEST_ENTRY (pipe_getsockname_blocking)
TEST_ENTRY (pipe_sendmsg)
TEST_ENTRY (connection_fail)

View File

@ -32,6 +32,8 @@
#ifndef _WIN32
# include <unistd.h> /* close */
#else
# include <fcntl.h>
#endif
@ -120,3 +122,59 @@ TEST_IMPL(pipe_getsockname_abstract) {
#endif
}
TEST_IMPL(pipe_getsockname_blocking) {
#ifdef _WIN32
uv_pipe_t reader;
HANDLE readh, writeh;
int readfd;
char buf1[1024], buf2[1024];
size_t len1, len2;
int r;
r = CreatePipe(&readh, &writeh, NULL, 65536);
ASSERT(r != 0);
r = uv_pipe_init(uv_default_loop(), &reader, 0);
ASSERT(r == 0);
readfd = _open_osfhandle((intptr_t)readh, _O_RDONLY);
ASSERT(r != -1);
r = uv_pipe_open(&reader, readfd);
ASSERT(r == 0);
r = uv_read_start((uv_stream_t*)&reader, NULL, NULL);
ASSERT(r == 0);
Sleep(100);
r = uv_read_stop((uv_stream_t*)&reader);
ASSERT(r == 0);
len1 = sizeof buf1;
r = uv_pipe_getsockname(&reader, buf1, &len1);
ASSERT(r == 0);
r = uv_read_start((uv_stream_t*)&reader, NULL, NULL);
ASSERT(r == 0);
Sleep(100);
len2 = sizeof buf2;
r = uv_pipe_getsockname(&reader, buf2, &len2);
ASSERT(r == 0);
r = uv_read_stop((uv_stream_t*)&reader);
ASSERT(r == 0);
ASSERT(len1 == len2);
ASSERT(memcmp(buf1, buf2, len1) == 0);
close_cb_called = 0;
uv_close((uv_handle_t*)&reader, close_cb);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
ASSERT(close_cb_called == 1);
_close(readfd);
CloseHandle(writeh);
#endif
MAKE_VALGRIND_HAPPY();
return 0;
}