1
0
mirror of https://github.com/libuv/libuv synced 2025-03-28 21:13:16 +00:00

Merge 75279c308ebb91c2794d418241aeb17ab644dcbc into c1a9f01f226133e3b8539ce678871fe1c0a68270

This commit is contained in:
Juan José 2025-03-27 08:49:40 +01:00 committed by GitHub
commit 6c30553972
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 282 additions and 0 deletions

View File

@ -647,6 +647,7 @@ if(LIBUV_BUILD_TESTS)
test/test-tcp-connect-timeout.c
test/test-tcp-connect6-error.c
test/test-tcp-create-socket-early.c
test/test-tcp-exportimport.c
test/test-tcp-flags.c
test/test-tcp-oob.c
test/test-tcp-open.c

View File

@ -273,6 +273,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-tcp-connect-error.c \
test/test-tcp-connect-timeout.c \
test/test-tcp-connect6-error.c \
test/test-tcp-exportimport.c \
test/test-tcp-flags.c \
test/test-tcp-open.c \
test/test-tcp-read-stop.c \

View File

@ -637,6 +637,26 @@ UV_EXTERN int uv_tcp_connect(uv_connect_t* req,
const struct sockaddr* addr,
uv_connect_cb cb);
/*
* Exports a uv_tcp_t handle by duplicating its underlying file descriptor.
*
* This allows the socket to be safely imported and used by another
* libuv event loop or thread using `uv_tcp_import()`.
*/
UV_EXTERN int uv_tcp_export(uv_tcp_t* stream, int* fd);
/*
* Imports a TCP socket file descriptor into a libuv TCP handle.
*
* This function initializes a user-provided `uv_tcp_t` structure and binds it
* to an existing, valid file descriptor (mostly obtained via `uv_tcp_export`)
*/
UV_EXTERN int uv_tcp_import(uv_loop_t* loop,
int fd,
uv_tcp_t* out,
unsigned int flags);
/* uv_connect_t is a subclass of uv_req_t. */
struct uv_connect_s {
UV_REQ_FIELDS

View File

@ -28,6 +28,7 @@
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
@ -1477,6 +1478,50 @@ int uv_read_stop(uv_stream_t* stream) {
}
int uv_tcp_export(uv_tcp_t* stream, int* fd) {
#ifndef F_DUPFD_CLOEXEC /* POSIX 2008 */
int err;
#endif
if (stream->type != UV_TCP)
return UV_EINVAL;
/* Clone the inner fd. Start from a safe number (3). */
#ifdef F_DUPFD_CLOEXEC /* POSIX 2008 */
*fd = fcntl(stream->io_watcher.fd, F_DUPFD_CLOEXEC, 3);
#else
*fd = fcntl(stream->io_watcher.fd, F_DUPFD, 3);
#endif
if (*fd == -1)
return UV__ERR(errno);
#ifndef F_DUPFD_CLOEXEC /* POSIX 2008 */
err = uv__cloexec(fd, 1);
if (err != 0) {
uv__close(fd);
return err;
}
#endif
return 0;
}
int uv_tcp_import(uv_loop_t* loop, int fd, uv_tcp_t* out, unsigned int flags) {
int err;
err = uv_tcp_init_ex(loop, out, flags);
if (err)
return err;
err = uv_tcp_open(out, fd);
if (err) {
uv_close((uv_handle_t*)out, NULL);
return err;
}
return 0;
}
int uv_is_readable(const uv_stream_t* stream) {
return !!(stream->flags & UV_HANDLE_READABLE);
}

View File

@ -250,3 +250,13 @@ int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
return 0;
}
int uv_tcp_export(uv_tcp_t* stream, int* fd) {
return UV_ENOSYS;
}
int uv_tcp_import(uv_loop_t* loop, int fd, uv_tcp_t* out, unsigned int flags) {
return UV_ENOSYS;
}

View File

@ -148,6 +148,8 @@ TEST_DECLARE (tcp_create_early_accept)
#ifndef _WIN32
TEST_DECLARE (tcp_close_accept)
TEST_DECLARE (tcp_oob)
TEST_DECLARE (tcp_exportimport_listen_after_write)
TEST_DECLARE (tcp_exportimport_listen_before_write)
#endif
TEST_DECLARE (tcp_flags)
TEST_DECLARE (tcp_write_to_half_open_connection)
@ -770,6 +772,8 @@ TASK_LIST_START
#ifndef _WIN32
TEST_ENTRY (tcp_close_accept)
TEST_ENTRY (tcp_oob)
TEST_ENTRY (tcp_exportimport_listen_after_write)
TEST_ENTRY (tcp_exportimport_listen_before_write)
#endif
TEST_ENTRY (tcp_flags)
TEST_ENTRY (tcp_write_to_half_open_connection)

View File

@ -0,0 +1,201 @@
/* Copyright libuv project contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
/* Win implementation is not done yet. */
#ifndef _WIN32
#include "uv.h"
#include "runner.h"
#include "task.h"
typedef struct {
uv_loop_t* loop;
uv_thread_t thread;
uv_async_t* recv_channel;
uv_async_t* send_channel;
uv_tcp_t server;
uv_tcp_t conn;
int connection_accepted;
int close_cb_called;
} worker_t;
static uv_async_t send_channel;
static uv_async_t recv_channel;
static worker_t parent;
static worker_t child;
static int dup_fd_handle = -1;
typedef struct {
uv_connect_t conn_req;
uv_tcp_t conn;
} tcp_conn;
#define CONN_COUNT 100
static tcp_conn conns[CONN_COUNT];
static void close_cb(uv_handle_t* handle) {
worker_t* worker = handle->data;
ASSERT_NOT_NULL(worker);
worker->close_cb_called++;
}
static void on_connection(uv_stream_t* server, int status) {
worker_t* worker = container_of(server, worker_t, server);
ASSERT_NOT_NULL(worker);
ASSERT(worker == &parent || worker == &child);
if (!worker->connection_accepted) {
/* Accept the connection and close it. */
ASSERT_OK(status);
ASSERT_OK(uv_tcp_init(server->loop, &worker->conn));
worker->conn.data = worker;
ASSERT_OK(uv_accept(server, (uv_stream_t*)&worker->conn));
worker->connection_accepted = 1;
uv_close((uv_handle_t*)worker->recv_channel, close_cb);
uv_close((uv_handle_t*)&worker->conn, close_cb);
uv_close((uv_handle_t*)server, close_cb);
}
}
static void connect_cb(uv_connect_t* req, int status) {
uv_close((uv_handle_t*)req->handle, NULL);
}
static void make_many_connections(void) {
tcp_conn* conn;
struct sockaddr_in addr;
int i;
for (i = 0; i < (int)ARRAY_SIZE(conns); i++) {
conn = &conns[i];
ASSERT_OK(uv_tcp_init(uv_default_loop(), &conn->conn));
ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
ASSERT_OK(uv_tcp_connect(&conn->conn_req,
(uv_tcp_t*)&conn->conn,
(struct sockaddr*) &addr,
connect_cb));
conn->conn.data = conn;
}
}
void on_parent_msg(uv_async_t* handle) {
parent.server.data = &parent;
/* Import the shared TCP server, and start listening on it. */
ASSERT_OK(uv_tcp_import(parent.loop, dup_fd_handle, &parent.server, 0));
ASSERT_OK(uv_listen((uv_stream_t*)&parent.server, 12, on_connection));
ASSERT_EQ(parent.loop, parent.server.loop);
/* Create a bunch of connections to get both servers to accept. */
make_many_connections();
}
void on_child_msg(uv_async_t* handle) {
ASSERT(!"no");
}
static void child_thread_entry(void* arg) {
int listen_after_write = *(int*) arg;
struct sockaddr_in addr;
ASSERT_OK(uv_tcp_init(child.loop, &child.server));
child.server.data = &child;
ASSERT_OK(uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
ASSERT_OK(uv_tcp_bind(&child.server, (struct sockaddr*) &addr, 0));
if (!listen_after_write)
ASSERT_OK(uv_listen((uv_stream_t*)&child.server, 12, on_connection));
ASSERT_OK(uv_tcp_export(&child.server, &dup_fd_handle));
ASSERT_GT(dup_fd_handle, -1);
ASSERT_OK(uv_async_send(child.send_channel));
if (listen_after_write)
ASSERT_OK(uv_listen((uv_stream_t*)&child.server, 12, on_connection));
ASSERT_OK(uv_run(child.loop, UV_RUN_DEFAULT));
ASSERT(child.connection_accepted == 1);
ASSERT(child.close_cb_called == 3);
}
static void run_tcp_exportimport_test(int listen_after_write) {
parent.send_channel = &send_channel;
parent.recv_channel = &recv_channel;
child.send_channel = &recv_channel;
child.recv_channel = &send_channel;
parent.loop = uv_default_loop();
child.loop = uv_loop_new();
ASSERT(child.loop);
ASSERT_OK(uv_async_init(parent.loop, parent.recv_channel, on_parent_msg));
parent.recv_channel->data = &parent;
ASSERT_OK(uv_async_init(child.loop, child.recv_channel, on_child_msg));
child.recv_channel->data = &child;
ASSERT_OK(uv_thread_create(&child.thread,
child_thread_entry,
&listen_after_write));
ASSERT_OK(uv_run(parent.loop, UV_RUN_DEFAULT));
MAKE_VALGRIND_HAPPY(parent.loop);
ASSERT_EQ(parent.connection_accepted, 1);
ASSERT_EQ(parent.close_cb_called, 3);
ASSERT_OK(uv_thread_join(&child.thread));
MAKE_VALGRIND_HAPPY(child.loop);
}
TEST_IMPL(tcp_exportimport_listen_after_write) {
run_tcp_exportimport_test(1);
return 0;
}
TEST_IMPL(tcp_exportimport_listen_before_write) {
run_tcp_exportimport_test(0);
return 0;
}
#endif /* !_WIN32 */