mirror of
https://github.com/libuv/libuv
synced 2025-03-28 21:13:16 +00:00
windows: relay TCP bind errors via ipc
This is the libuv side of the fix for Node's cluster module on Windows. https://github.com/joyent/node/issues/7691 Windows and Unix return certain socket errors (i.e. EADDRINUSE) at different times: bind on Windows, and listen on Unix. In an effort to hide this difference, libuv on Windows stores such errors in the bind_error field of uv_tcp_t, to defer raising it at listen time. This worked fine except for the case in which a socket is shared in a Node cluster and a bind error occurs. A previous attempt to fix this (d1e6be1460
3da36fe00e
) was flawed becaused in an attempt to relay the error at the JS level it caused the master to start accepting connections. With this new approach, libuv itself is relaying the bind errors, providing for a uniform behavior of uv_tcp_listen.
This commit is contained in:
parent
3d73d55627
commit
6d3a051eb4
@ -409,7 +409,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
|
||||
|
||||
#define UV_TCP_PRIVATE_FIELDS \
|
||||
SOCKET socket; \
|
||||
int bind_error; \
|
||||
int delayed_error; \
|
||||
union { \
|
||||
struct { uv_tcp_server_fields }; \
|
||||
struct { uv_tcp_connection_fields }; \
|
||||
|
@ -75,7 +75,6 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled;
|
||||
/* Used by streams and UDP handles. */
|
||||
#define UV_HANDLE_READING 0x00000100
|
||||
#define UV_HANDLE_BOUND 0x00000200
|
||||
#define UV_HANDLE_BIND_ERROR 0x00000400
|
||||
#define UV_HANDLE_LISTENING 0x00000800
|
||||
#define UV_HANDLE_CONNECTION 0x00001000
|
||||
#define UV_HANDLE_CONNECTED 0x00002000
|
||||
@ -125,6 +124,12 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled;
|
||||
/*
|
||||
* TCP
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
WSAPROTOCOL_INFOW socket_info;
|
||||
int delayed_error;
|
||||
} uv__ipc_socket_info_ex;
|
||||
|
||||
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
|
||||
int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client);
|
||||
int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb,
|
||||
@ -143,7 +148,7 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
|
||||
void uv_tcp_close(uv_loop_t* loop, uv_tcp_t* tcp);
|
||||
void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle);
|
||||
|
||||
int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
|
||||
int uv_tcp_import(uv_tcp_t* tcp, uv__ipc_socket_info_ex* socket_info_ex,
|
||||
int tcp_connection);
|
||||
|
||||
int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
|
||||
|
@ -35,10 +35,10 @@ typedef struct uv__ipc_queue_item_s uv__ipc_queue_item_t;
|
||||
|
||||
struct uv__ipc_queue_item_s {
|
||||
/*
|
||||
* NOTE: It is important for socket_info to be the first field,
|
||||
* NOTE: It is important for socket_info_ex to be the first field,
|
||||
* because we will we assigning it to the pending_ipc_info.socket_info
|
||||
*/
|
||||
WSAPROTOCOL_INFOW socket_info;
|
||||
uv__ipc_socket_info_ex socket_info_ex;
|
||||
QUEUE member;
|
||||
int tcp_connection;
|
||||
};
|
||||
@ -73,7 +73,7 @@ typedef struct {
|
||||
/* IPC frame, which contains an imported TCP socket stream. */
|
||||
typedef struct {
|
||||
uv_ipc_frame_header_t header;
|
||||
WSAPROTOCOL_INFOW socket_info;
|
||||
uv__ipc_socket_info_ex socket_info_ex;
|
||||
} uv_ipc_frame_uv_stream;
|
||||
|
||||
static void eof_timer_init(uv_pipe_t* pipe);
|
||||
@ -408,7 +408,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
|
||||
socket = WSASocketW(FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
&item->socket_info,
|
||||
&item->socket_info_ex.socket_info,
|
||||
0,
|
||||
WSA_FLAG_OVERLAPPED);
|
||||
free(item);
|
||||
@ -787,7 +787,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
|
||||
item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);
|
||||
|
||||
err = uv_tcp_import((uv_tcp_t*)client,
|
||||
&item->socket_info,
|
||||
&item->socket_info_ex,
|
||||
item->tcp_connection);
|
||||
if (err != 0)
|
||||
return err;
|
||||
@ -1132,10 +1132,13 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
|
||||
tcp_send_handle = (uv_tcp_t*)send_handle;
|
||||
|
||||
err = uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
|
||||
&ipc_frame.socket_info);
|
||||
&ipc_frame.socket_info_ex.socket_info);
|
||||
if (err) {
|
||||
return err;
|
||||
}
|
||||
|
||||
ipc_frame.socket_info_ex.delayed_error = tcp_send_handle->delayed_error;
|
||||
|
||||
ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
|
||||
|
||||
if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
|
||||
@ -1395,7 +1398,7 @@ static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
|
||||
|
||||
void uv__pipe_insert_pending_socket(uv_pipe_t* handle,
|
||||
WSAPROTOCOL_INFOW* info,
|
||||
uv__ipc_socket_info_ex* info,
|
||||
int tcp_connection) {
|
||||
uv__ipc_queue_item_t* item;
|
||||
|
||||
@ -1403,7 +1406,7 @@ void uv__pipe_insert_pending_socket(uv_pipe_t* handle,
|
||||
if (item == NULL)
|
||||
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
|
||||
|
||||
memcpy(&item->socket_info, info, sizeof(item->socket_info));
|
||||
memcpy(&item->socket_info_ex, info, sizeof(item->socket_info_ex));
|
||||
item->tcp_connection = tcp_connection;
|
||||
QUEUE_INSERT_TAIL(&handle->pending_ipc_info.queue, &item->member);
|
||||
handle->pending_ipc_info.queue_len++;
|
||||
@ -1469,11 +1472,11 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
|
||||
if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
|
||||
assert(avail - sizeof(ipc_frame.header) >=
|
||||
sizeof(ipc_frame.socket_info));
|
||||
sizeof(ipc_frame.socket_info_ex));
|
||||
|
||||
/* Read the TCP socket info. */
|
||||
if (!ReadFile(handle->handle,
|
||||
&ipc_frame.socket_info,
|
||||
&ipc_frame.socket_info_ex,
|
||||
sizeof(ipc_frame) - sizeof(ipc_frame.header),
|
||||
&bytes,
|
||||
NULL)) {
|
||||
@ -1487,7 +1490,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
/* Store the pending socket info. */
|
||||
uv__pipe_insert_pending_socket(
|
||||
handle,
|
||||
&ipc_frame.socket_info,
|
||||
&ipc_frame.socket_info_ex,
|
||||
ipc_frame.header.flags & UV_IPC_TCP_CONNECTION);
|
||||
}
|
||||
|
||||
|
@ -156,6 +156,7 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
|
||||
handle->func_acceptex = NULL;
|
||||
handle->func_connectex = NULL;
|
||||
handle->processed_accepts = 0;
|
||||
handle->delayed_error = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -302,8 +303,7 @@ static int uv_tcp_try_bind(uv_tcp_t* handle,
|
||||
err = WSAGetLastError();
|
||||
if (err == WSAEADDRINUSE) {
|
||||
/* Some errors are not to be reported until connect() or listen() */
|
||||
handle->bind_error = err;
|
||||
handle->flags |= UV_HANDLE_BIND_ERROR;
|
||||
handle->delayed_error = err;
|
||||
} else {
|
||||
return err;
|
||||
}
|
||||
@ -528,8 +528,8 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
|
||||
return WSAEISCONN;
|
||||
}
|
||||
|
||||
if (handle->flags & UV_HANDLE_BIND_ERROR) {
|
||||
return handle->bind_error;
|
||||
if (handle->delayed_error) {
|
||||
return handle->delayed_error;
|
||||
}
|
||||
|
||||
if (!(handle->flags & UV_HANDLE_BOUND)) {
|
||||
@ -539,8 +539,8 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
|
||||
0);
|
||||
if (err)
|
||||
return err;
|
||||
if (handle->flags & UV_HANDLE_BIND_ERROR)
|
||||
return handle->bind_error;
|
||||
if (handle->delayed_error)
|
||||
return handle->delayed_error;
|
||||
}
|
||||
|
||||
if (!handle->func_acceptex) {
|
||||
@ -712,8 +712,8 @@ static int uv_tcp_try_connect(uv_connect_t* req,
|
||||
DWORD bytes;
|
||||
int err;
|
||||
|
||||
if (handle->flags & UV_HANDLE_BIND_ERROR) {
|
||||
return handle->bind_error;
|
||||
if (handle->delayed_error) {
|
||||
return handle->delayed_error;
|
||||
}
|
||||
|
||||
if (!(handle->flags & UV_HANDLE_BOUND)) {
|
||||
@ -727,8 +727,8 @@ static int uv_tcp_try_connect(uv_connect_t* req,
|
||||
err = uv_tcp_try_bind(handle, bind_addr, addrlen, 0);
|
||||
if (err)
|
||||
return err;
|
||||
if (handle->flags & UV_HANDLE_BIND_ERROR)
|
||||
return handle->bind_error;
|
||||
if (handle->delayed_error)
|
||||
return handle->delayed_error;
|
||||
}
|
||||
|
||||
if (!handle->func_connectex) {
|
||||
@ -777,8 +777,8 @@ int uv_tcp_getsockname(const uv_tcp_t* handle,
|
||||
return UV_EINVAL;
|
||||
}
|
||||
|
||||
if (handle->flags & UV_HANDLE_BIND_ERROR) {
|
||||
return uv_translate_sys_error(handle->bind_error);
|
||||
if (handle->delayed_error) {
|
||||
return uv_translate_sys_error(handle->delayed_error);
|
||||
}
|
||||
|
||||
result = getsockname(handle->socket, name, namelen);
|
||||
@ -799,8 +799,8 @@ int uv_tcp_getpeername(const uv_tcp_t* handle,
|
||||
return UV_EINVAL;
|
||||
}
|
||||
|
||||
if (handle->flags & UV_HANDLE_BIND_ERROR) {
|
||||
return uv_translate_sys_error(handle->bind_error);
|
||||
if (handle->delayed_error) {
|
||||
return uv_translate_sys_error(handle->delayed_error);
|
||||
}
|
||||
|
||||
result = getpeername(handle->socket, name, namelen);
|
||||
@ -1117,14 +1117,13 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
|
||||
}
|
||||
|
||||
|
||||
int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
|
||||
int uv_tcp_import(uv_tcp_t* tcp, uv__ipc_socket_info_ex* socket_info_ex,
|
||||
int tcp_connection) {
|
||||
int err;
|
||||
|
||||
SOCKET socket = WSASocketW(FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
socket_protocol_info,
|
||||
&socket_info_ex->socket_info,
|
||||
0,
|
||||
WSA_FLAG_OVERLAPPED);
|
||||
|
||||
@ -1141,7 +1140,7 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
|
||||
err = uv_tcp_set_socket(tcp->loop,
|
||||
tcp,
|
||||
socket,
|
||||
socket_protocol_info->iAddressFamily,
|
||||
socket_info_ex->socket_info.iAddressFamily,
|
||||
1);
|
||||
if (err) {
|
||||
closesocket(socket);
|
||||
@ -1156,6 +1155,8 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
|
||||
tcp->flags |= UV_HANDLE_BOUND;
|
||||
tcp->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
|
||||
|
||||
tcp->delayed_error = socket_info_ex->delayed_error;
|
||||
|
||||
tcp->loop->active_tcp_streams++;
|
||||
return 0;
|
||||
}
|
||||
@ -1216,13 +1217,10 @@ int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
|
||||
return ERROR_INVALID_PARAMETER;
|
||||
}
|
||||
|
||||
/* Report any deferred bind errors now. */
|
||||
if (handle->flags & UV_HANDLE_BIND_ERROR) {
|
||||
return handle->bind_error;
|
||||
}
|
||||
|
||||
if (listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
|
||||
return WSAGetLastError();
|
||||
if (!(handle->delayed_error)) {
|
||||
if (listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
|
||||
handle->delayed_error = WSAGetLastError();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -39,6 +39,7 @@
|
||||
int ipc_helper(int listen_after_write);
|
||||
int ipc_helper_tcp_connection(void);
|
||||
int ipc_send_recv_helper(void);
|
||||
int ipc_helper_bind_twice(void);
|
||||
int stdio_over_pipes_helper(void);
|
||||
|
||||
static int maybe_run_test(int argc, char **argv);
|
||||
@ -82,6 +83,10 @@ static int maybe_run_test(int argc, char **argv) {
|
||||
return ipc_helper_tcp_connection();
|
||||
}
|
||||
|
||||
if (strcmp(argv[1], "ipc_helper_bind_twice") == 0) {
|
||||
return ipc_helper_bind_twice();
|
||||
}
|
||||
|
||||
if (strcmp(argv[1], "stdio_over_pipes_helper") == 0) {
|
||||
return stdio_over_pipes_helper();
|
||||
}
|
||||
|
125
test/test-ipc.c
125
test/test-ipc.c
@ -27,6 +27,7 @@
|
||||
|
||||
static uv_pipe_t channel;
|
||||
static uv_tcp_t tcp_server;
|
||||
static uv_tcp_t tcp_server2;
|
||||
static uv_tcp_t tcp_connection;
|
||||
|
||||
static int exit_cb_called;
|
||||
@ -38,8 +39,6 @@ static int local_conn_accepted;
|
||||
static int remote_conn_accepted;
|
||||
static int tcp_server_listening;
|
||||
static uv_write_t write_req;
|
||||
static uv_pipe_t channel;
|
||||
static uv_tcp_t tcp_server;
|
||||
static uv_write_t conn_notify_req;
|
||||
static int close_cb_called;
|
||||
static int connection_accepted;
|
||||
@ -205,6 +204,71 @@ static void on_read(uv_stream_t* handle,
|
||||
free(buf->base);
|
||||
}
|
||||
|
||||
#ifdef _WIN32
|
||||
static void on_read_listen_after_bound_twice(uv_stream_t* handle,
|
||||
ssize_t nread,
|
||||
const uv_buf_t* buf) {
|
||||
int r;
|
||||
uv_pipe_t* pipe;
|
||||
uv_handle_type pending;
|
||||
|
||||
pipe = (uv_pipe_t*) handle;
|
||||
|
||||
if (nread == 0) {
|
||||
/* Everything OK, but nothing read. */
|
||||
free(buf->base);
|
||||
return;
|
||||
}
|
||||
|
||||
if (nread < 0) {
|
||||
if (nread == UV_EOF) {
|
||||
free(buf->base);
|
||||
return;
|
||||
}
|
||||
|
||||
printf("error recving on channel: %s\n", uv_strerror(nread));
|
||||
abort();
|
||||
}
|
||||
|
||||
fprintf(stderr, "got %d bytes\n", (int)nread);
|
||||
|
||||
ASSERT(uv_pipe_pending_count(pipe) > 0);
|
||||
pending = uv_pipe_pending_type(pipe);
|
||||
ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
|
||||
read_cb_called++;
|
||||
|
||||
if (read_cb_called == 1) {
|
||||
/* Accept the first TCP server, and start listening on it. */
|
||||
ASSERT(pending == UV_TCP);
|
||||
r = uv_tcp_init(uv_default_loop(), &tcp_server);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_listen((uv_stream_t*)&tcp_server, 12, on_connection);
|
||||
ASSERT(r == 0);
|
||||
} else if (read_cb_called == 2) {
|
||||
/* Accept the second TCP server, and start listening on it. */
|
||||
ASSERT(pending == UV_TCP);
|
||||
r = uv_tcp_init(uv_default_loop(), &tcp_server2);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server2);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_listen((uv_stream_t*)&tcp_server2, 12, on_connection);
|
||||
ASSERT(r == UV_EADDRINUSE);
|
||||
|
||||
uv_close((uv_handle_t*)&tcp_server, NULL);
|
||||
uv_close((uv_handle_t*)&tcp_server2, NULL);
|
||||
ASSERT(0 == uv_pipe_pending_count(pipe));
|
||||
uv_close((uv_handle_t*)&channel, NULL);
|
||||
}
|
||||
|
||||
free(buf->base);
|
||||
}
|
||||
#endif
|
||||
|
||||
void spawn_helper(uv_pipe_t* channel,
|
||||
uv_process_t* process,
|
||||
@ -424,6 +488,13 @@ TEST_IMPL(listen_no_simultaneous_accepts) {
|
||||
MAKE_VALGRIND_HAPPY();
|
||||
return 0;
|
||||
}
|
||||
|
||||
TEST_IMPL(ipc_listen_after_bind_twice) {
|
||||
int r = run_ipc_test("ipc_helper_bind_twice", on_read_listen_after_bound_twice);
|
||||
ASSERT(read_cb_called == 2);
|
||||
ASSERT(exit_cb_called == 1);
|
||||
return r;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
@ -608,7 +679,7 @@ int ipc_helper(int listen_after_write) {
|
||||
|
||||
int ipc_helper_tcp_connection(void) {
|
||||
/*
|
||||
* This is launched from test-ipc.c. stdin is a duplex channel that we
|
||||
* This is launched from test-ipc.c. stdin is a duplex channel
|
||||
* over which a handle will be transmitted.
|
||||
*/
|
||||
|
||||
@ -657,3 +728,51 @@ int ipc_helper_tcp_connection(void) {
|
||||
MAKE_VALGRIND_HAPPY();
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ipc_helper_bind_twice(void) {
|
||||
/*
|
||||
* This is launched from test-ipc.c. stdin is a duplex channel
|
||||
* over which two handles will be transmitted.
|
||||
*/
|
||||
struct sockaddr_in addr;
|
||||
uv_write_t write_req;
|
||||
uv_write_t write_req2;
|
||||
int r;
|
||||
uv_buf_t buf;
|
||||
|
||||
ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
|
||||
|
||||
r = uv_pipe_init(uv_default_loop(), &channel, 1);
|
||||
ASSERT(r == 0);
|
||||
|
||||
uv_pipe_open(&channel, 0);
|
||||
|
||||
ASSERT(1 == uv_is_readable((uv_stream_t*) &channel));
|
||||
ASSERT(1 == uv_is_writable((uv_stream_t*) &channel));
|
||||
ASSERT(0 == uv_is_closing((uv_handle_t*) &channel));
|
||||
|
||||
buf = uv_buf_init("hello\n", 6);
|
||||
|
||||
r = uv_tcp_init(uv_default_loop(), &tcp_server);
|
||||
ASSERT(r == 0);
|
||||
r = uv_tcp_init(uv_default_loop(), &tcp_server2);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
|
||||
ASSERT(r == 0);
|
||||
r = uv_tcp_bind(&tcp_server2, (const struct sockaddr*) &addr, 0);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
|
||||
(uv_stream_t*)&tcp_server, NULL);
|
||||
ASSERT(r == 0);
|
||||
r = uv_write2(&write_req2, (uv_stream_t*)&channel, &buf, 1,
|
||||
(uv_stream_t*)&tcp_server2, NULL);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
||||
ASSERT(r == 0);
|
||||
|
||||
MAKE_VALGRIND_HAPPY();
|
||||
return 0;
|
||||
}
|
||||
|
@ -258,6 +258,7 @@ TEST_DECLARE (listen_with_simultaneous_accepts)
|
||||
TEST_DECLARE (listen_no_simultaneous_accepts)
|
||||
TEST_DECLARE (fs_stat_root)
|
||||
TEST_DECLARE (spawn_with_an_odd_path)
|
||||
TEST_DECLARE (ipc_listen_after_bind_twice)
|
||||
#else
|
||||
TEST_DECLARE (emfile)
|
||||
TEST_DECLARE (close_fd)
|
||||
@ -526,6 +527,7 @@ TASK_LIST_START
|
||||
TEST_ENTRY (listen_no_simultaneous_accepts)
|
||||
TEST_ENTRY (fs_stat_root)
|
||||
TEST_ENTRY (spawn_with_an_odd_path)
|
||||
TEST_ENTRY (ipc_listen_after_bind_twice)
|
||||
#else
|
||||
TEST_ENTRY (emfile)
|
||||
TEST_ENTRY (close_fd)
|
||||
|
Loading…
x
Reference in New Issue
Block a user