1
0
mirror of https://github.com/libuv/libuv synced 2025-03-28 21:13:16 +00:00

allocate windows pipe handles on demand

This commit is contained in:
Igor Zinkovsky 2011-07-13 12:35:16 -07:00 committed by Ryan Dahl
parent 9aff11026b
commit f5ff869488
7 changed files with 154 additions and 132 deletions

View File

@ -46,13 +46,18 @@ typedef struct uv_buf_t {
* Private uv_pipe_instance state.
*/
typedef enum {
UV_PIPEINSTANCE_DISCONNECTED = 0,
UV_PIPEINSTANCE_PENDING,
UV_PIPEINSTANCE_WAITING,
UV_PIPEINSTANCE_ACCEPTED,
UV_PIPEINSTANCE_CONNECTED = 0,
UV_PIPEINSTANCE_DISCONNECTED,
UV_PIPEINSTANCE_ACTIVE
} uv_pipeinstance_state;
/* Used to store active pipe instances inside a linked list. */
typedef struct uv_pipe_instance_s {
HANDLE handle;
uv_pipeinstance_state state;
struct uv_pipe_instance_s* next;
} uv_pipe_instance_t;
#define UV_REQ_PRIVATE_FIELDS \
union { \
/* Used by I/O operations */ \
@ -93,23 +98,14 @@ typedef enum {
#define uv_pipe_server_fields \
char* name; \
int connectionCount; \
uv_pipe_instance_t* connections; \
uv_pipe_instance_t* acceptConnection; \
uv_pipe_instance_t connectionsBuffer[4];
struct uv_req_s accept_reqs[4];
#define uv_pipe_connection_fields \
uv_pipe_t* server; \
uv_pipe_instance_t* connection; \
uv_pipe_instance_t clientConnection;
#define UV_PIPE_PRIVATE_TYPEDEF \
typedef struct uv_pipe_instance_s { \
HANDLE handle; \
uv_pipeinstance_state state; \
uv_req_t accept_req; \
} uv_pipe_instance_t;
#define UV_PIPE_PRIVATE_FIELDS \
union { \
struct { uv_pipe_server_fields }; \

View File

@ -298,19 +298,17 @@ int uv_getsockname(uv_tcp_t* handle, struct sockaddr* name, int* namelen);
/*
* A subclass of uv_stream_t representing a pipe stream or pipe server.
*/
UV_PIPE_PRIVATE_TYPEDEF
struct uv_pipe_s {
UV_HANDLE_FIELDS
UV_STREAM_FIELDS
UV_PIPE_PRIVATE_FIELDS
struct uv_pipe_s {
UV_HANDLE_FIELDS
UV_STREAM_FIELDS
UV_PIPE_PRIVATE_FIELDS
};
int uv_pipe_init(uv_pipe_t* handle);
int uv_pipe_bind(uv_pipe_t* handle, const char* name);
int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb);
int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb);
int uv_pipe_connect(uv_req_t* req, const char* name);
@ -494,6 +492,7 @@ extern uint64_t uv_hrtime(void);
/* the presence of this union forces similar struct layout */
union uv_any_handle {
uv_tcp_t tcp;
uv_pipe_t pipe;
uv_prepare_t prepare;
uv_check_t check;
uv_idle_t idle;

View File

@ -29,6 +29,8 @@
#include "uv.h"
#define COUNTOF(a) (sizeof(a) / sizeof(a[0]))
/*
* Subclass of uv_handle_t. Used for integration of c-ares.
*/

View File

@ -1629,7 +1629,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
}
int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) {
int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
assert(0 && "implement me");
}

View File

@ -377,6 +377,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) {
case ERROR_INVALID_PARAMETER: return UV_EINVAL;
case ERROR_NO_UNICODE_TRANSLATION: return UV_ECHARSET;
case ERROR_BROKEN_PIPE: return UV_EOF;
case ERROR_PIPE_BUSY: return UV_EBUSY;
default: return UV_UNKNOWN;
}
}
@ -531,11 +532,6 @@ static uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped) {
}
static uv_pipe_instance_t* uv_req_to_pipeinstance(uv_req_t* req) {
return CONTAINING_RECORD(req, uv_pipe_instance_t, accept_req);
}
static void uv_insert_pending_req(uv_req_t* req) {
req->next_req = NULL;
if (uv_pending_reqs_tail_) {
@ -1009,29 +1005,38 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
static void uv_pipe_queue_accept(uv_pipe_t* handle) {
uv_req_t* req;
uv_pipe_instance_t* instance;
HANDLE pipeHandle;
int i;
assert(handle->flags & UV_HANDLE_LISTENING);
/* This loop goes through every pipe instance and calls ConnectNamedPipe for every pending instance.
* TODO: Make this faster (we could maintain a linked list of pending instances).
*/
for (i = 0; i < handle->connectionCount; i++) {
instance = &handle->connections[i];
for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
req = &handle->accept_reqs[i];
if (!(req->flags & UV_REQ_PENDING)) {
pipeHandle = CreateNamedPipe(handle->name,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES,
65536,
65536,
0,
NULL);
if (instance->state == UV_PIPEINSTANCE_PENDING) {
/* Prepare the uv_req structure. */
req = &instance->accept_req;
uv_req_init(req, (uv_handle_t*)handle, NULL);
assert(!(req->flags & UV_REQ_PENDING));
req->type = UV_ACCEPT;
req->flags |= UV_REQ_PENDING;
if (pipeHandle == INVALID_HANDLE_VALUE) {
continue;
}
if (CreateIoCompletionPort(pipeHandle,
uv_iocp_,
(ULONG_PTR)handle,
0) == NULL) {
continue;
}
/* Prepare the overlapped structure. */
memset(&(req->overlapped), 0, sizeof(req->overlapped));
if (!ConnectNamedPipe(instance->handle, &req->overlapped) &&
if (!ConnectNamedPipe(pipeHandle, &req->overlapped) &&
GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) {
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(GetLastError());
@ -1040,9 +1045,9 @@ static void uv_pipe_queue_accept(uv_pipe_t* handle) {
continue;
}
instance->state = UV_PIPEINSTANCE_WAITING;
handle->reqs_pending++;
req->data = pipeHandle;
req->flags |= UV_REQ_PENDING;
handle->reqs_pending++;
}
}
}
@ -1174,16 +1179,28 @@ static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) {
assert(server->acceptConnection);
uv_pipe_instance_t* connection = server->connections;
/* Find a connection instance that has been connected, but not yet accepted. */
while (connection) {
if (connection->state == UV_PIPEINSTANCE_CONNECTED) {
break;
}
connection = connection->next;
}
if (!connection) {
/* No valid connections found, so we error out. */
uv_set_sys_error(UV_ENOTCONN);
return -1;
}
/* Make the connection instance active */
server->acceptConnection->state = UV_PIPEINSTANCE_ACTIVE;
/* Move the connection instance from server to client */
client->connection = server->acceptConnection;
server->acceptConnection = NULL;
/* Remember the server */
connection->state = UV_PIPEINSTANCE_ACTIVE;
/* Assign the connection to the client. */
client->connection = connection;
client->server = server;
uv_init_connection((uv_stream_t*)client);
@ -1858,15 +1875,30 @@ static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_req_t* req) {
static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* req) {
uv_pipe_instance_t* pipeInstance;
assert(handle->type == UV_NAMED_PIPE);
/* Mark the request non-pending */
req->flags &= ~UV_REQ_PENDING;
if (req->error.code == UV_OK) {
/* Put the connection instance into accept state */
handle->acceptConnection = uv_req_to_pipeinstance(req);
handle->acceptConnection->state = UV_PIPEINSTANCE_ACCEPTED;
assert(req->data);
/* Create the connection instance and add it to the connections list. */
pipeInstance = (uv_pipe_instance_t*)malloc(sizeof(uv_pipe_instance_t));
if (!pipeInstance) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
pipeInstance->handle = req->data;
pipeInstance->state = UV_PIPEINSTANCE_CONNECTED;
pipeInstance->next = handle->connections;
handle->connections = pipeInstance;
/* Clear the request. */
req->data = NULL;
req->flags = 0;
if (handle->connection_cb) {
handle->connection_cb((uv_handle_t*)handle, 0);
@ -2985,13 +3017,23 @@ int uv_pipe_init(uv_pipe_t* handle) {
/* Creates a pipe server. */
/* TODO: make this work with UTF8 name */
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
int i;
if (!name) {
return -1;
}
handle->connections = NULL;
handle->acceptConnection = NULL;
handle->connectionCount = 0;
/* Initialize accept requests. */
for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
handle->accept_reqs[i].flags = 0;
handle->accept_reqs[i].type = UV_ACCEPT;
handle->accept_reqs[i].handle = (uv_handle_t*)handle;
handle->accept_reqs[i].cb = NULL;
handle->accept_reqs[i].data = NULL;
uv_counters()->req_init++;
}
/* Make our own copy of the pipe name */
handle->name = (char*)malloc(MAX_PIPENAME_LEN);
@ -3007,7 +3049,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
/* Starts listening for connections for the given pipe. */
int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) {
int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
int i, maxInstances, errno;
HANDLE pipeHandle;
uv_pipe_instance_t* pipeInstance;
@ -3023,61 +3065,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) {
return -1;
}
if (instanceCount <= sizeof(handle->connectionsBuffer)) {
/* Use preallocated connections buffer */
handle->connections = handle->connectionsBuffer;
} else {
handle->connections = (uv_pipe_instance_t*)malloc(instanceCount * sizeof(uv_pipe_instance_t));
if (!handle->connections) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
}
maxInstances = instanceCount >= PIPE_UNLIMITED_INSTANCES ? PIPE_UNLIMITED_INSTANCES : instanceCount;
for (i = 0; i < instanceCount; i++) {
pipeHandle = CreateNamedPipe(handle->name,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
maxInstances,
65536,
65536,
0,
NULL);
if (pipeHandle == INVALID_HANDLE_VALUE) {
errno = GetLastError();
goto error;
}
if (CreateIoCompletionPort(pipeHandle,
uv_iocp_,
(ULONG_PTR)handle,
0) == NULL) {
errno = GetLastError();
goto error;
}
pipeInstance = &handle->connections[i];
pipeInstance->handle = pipeHandle;
pipeInstance->state = UV_PIPEINSTANCE_PENDING;
}
/* We don't need the pipe name anymore. */
free(handle->name);
handle->name = NULL;
handle->connectionCount = instanceCount;
handle->flags |= UV_HANDLE_LISTENING;
handle->connection_cb = cb;
uv_pipe_queue_accept(handle);
return 0;
error:
close_pipe(handle, NULL, NULL);
uv_set_sys_error(errno);
return -1;
}
/* TODO: make this work with UTF8 name */
@ -3140,28 +3132,46 @@ error:
/* Cleans up uv_pipe_t (server or connection) and all resources associated with it */
static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
uv_pipe_instance_t* connection;
uv_pipe_instance_t* connection, *next, *cur, **prev;
HANDLE pipeHandle;
int i;
if (handle->flags & UV_HANDLE_PIPESERVER) {
if (handle->flags & UV_HANDLE_CONNECTION) {
/*
* The handle is for a connection instance on the pipe server.
* To clean-up, we call DisconnectNamedPipe, and return the instance to pending state,
* which will be ready to accept another pipe connection in uv_pipe_queue_accept.
* To clean-up, we call DisconnectNamedPipe, and then uv_pipe_queue_accept will cleanup the allocated uv_pipe_instance_t.
*/
connection = handle->connection;
if (connection && connection->state != UV_PIPEINSTANCE_PENDING && connection->handle != INVALID_HANDLE_VALUE) {
/* Disconnect the connection intance and return it to pending state */
if (connection && connection->handle != INVALID_HANDLE_VALUE) {
/* Disconnect the connection intance and return it to pending state. */
if (DisconnectNamedPipe(connection->handle)) {
connection->state = UV_PIPEINSTANCE_PENDING;
handle->connection = NULL;
if (status) *status = 0;
} else {
if (status) *status = -1;
if (err) *err = uv_new_sys_error(GetLastError());
}
connection->state = UV_PIPEINSTANCE_DISCONNECTED;
connection->handle = NULL;
cur = handle->connections;
handle->connection = NULL;
prev = &handle->server->connections;
/* Remove the connection from the list. */
while (connection) {
if (cur == connection) {
*prev = connection->next;
free(connection);
break;
} else {
prev = &connection->next;
connection = connection->next;
}
}
/* Queue accept now that the instance is in pending state. */
if (!(handle->server->flags & UV_HANDLE_CLOSING)) {
uv_pipe_queue_accept(handle->server);
@ -3170,7 +3180,7 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
} else {
/*
* The handle is for the pipe server.
* To clean-up we close every connection instance that was made in uv_pipe_listen.
* To clean-up we close every active and pending connection instance.
*/
if (handle->name) {
@ -3178,23 +3188,31 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
handle->name = NULL;
}
if (handle->connections) {
/* Go through the list of connections, and close each one with CloseHandle. */
for (i = 0; i < handle->connectionCount; i++) {
connection = &handle->connections[i];
if (connection->state != UV_PIPEINSTANCE_DISCONNECTED && connection->handle != INVALID_HANDLE_VALUE) {
CloseHandle(connection->handle);
connection->state = UV_PIPEINSTANCE_DISCONNECTED;
connection->handle = INVALID_HANDLE_VALUE;
}
connection = handle->connections;
while (connection) {
pipeHandle = connection->handle;
if (pipeHandle) {
DisconnectNamedPipe(pipeHandle);
CloseHandle(pipeHandle);
}
/* Free the connections buffer. */
if (handle->connections != handle->connectionsBuffer) {
free(handle->connections);
}
next = connection->next;
free(connection);
connection = next;
}
handle->connections = NULL;
handle->connections = NULL;
for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
if (handle->accept_reqs[i].flags & UV_REQ_PENDING) {
pipeHandle = handle->accept_reqs[i].data;
assert(pipeHandle);
DisconnectNamedPipe(pipeHandle);
CloseHandle(pipeHandle);
handle->accept_reqs[i].flags = 0;
handle->reqs_pending--;
}
}
if (status) *status = 0;
@ -3202,7 +3220,7 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
} else {
/*
* The handle is for a connection instance on the pipe client.
* To clean-up
* To clean-up we close the pipe handle.
*/
connection = handle->connection;
if (connection && connection->handle != INVALID_HANDLE_VALUE) {

View File

@ -265,6 +265,13 @@ static void maybe_connect_some() {
uv_req_init(req, (uv_handle_t*)pipe, connect_cb);
r = uv_pipe_connect(req, TEST_PIPENAME);
ASSERT(r == 0);
#ifdef _WIN32
/* HACK: This is temporary to give the pipes server enough time to create new handles.
* This will go away once uv_pipe_connect can deal with UV_EBUSY.
*/
Sleep(1);
#endif
}
}
}
@ -403,7 +410,7 @@ HELPER_IMPL(pipe_pump_server) {
ASSERT(r == 0);
r = uv_pipe_bind(&pipeServer, TEST_PIPENAME);
ASSERT(r == 0);
r = uv_pipe_listen(&pipeServer, MAX_WRITE_HANDLES, connection_cb);
r = uv_pipe_listen(&pipeServer, connection_cb);
ASSERT(r == 0);
uv_run();

View File

@ -247,7 +247,7 @@ static int pipe_echo_start(char* pipeName) {
return 1;
}
r = uv_pipe_listen(&pipeServer, 1, on_connection);
r = uv_pipe_listen(&pipeServer, on_connection);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Listen error on IPv6\n");