1
0
mirror of https://github.com/libuv/libuv synced 2025-03-28 21:13:16 +00:00

linux: don't use sendmmsg() for single datagrams (#4366)

Benchmarking shows that sendmsg() is persistently around 1% faster for
single datagrams, and that kind of stands to reason because there is
less setup overhead, and the kernel has to copy in less data.

Fixes: https://github.com/libuv/libuv/issues/4320
This commit is contained in:
Ben Noordhuis 2024-03-25 12:45:23 +01:00 committed by GitHub
parent d05ed869bb
commit 77e4cd5b18
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -275,8 +275,61 @@ static void uv__udp_recvmsg(uv_udp_t* handle) {
&& handle->recv_cb != NULL);
}
static void uv__udp_sendmsg(uv_udp_t* handle) {
static void uv__udp_sendmsg_one(uv_udp_t* handle, uv_udp_send_t* req) {
struct uv__queue* q;
struct msghdr h;
ssize_t size;
for (;;) {
memset(&h, 0, sizeof h);
if (req->addr.ss_family == AF_UNSPEC) {
h.msg_name = NULL;
h.msg_namelen = 0;
} else {
h.msg_name = &req->addr;
if (req->addr.ss_family == AF_INET6)
h.msg_namelen = sizeof(struct sockaddr_in6);
else if (req->addr.ss_family == AF_INET)
h.msg_namelen = sizeof(struct sockaddr_in);
else if (req->addr.ss_family == AF_UNIX)
h.msg_namelen = sizeof(struct sockaddr_un);
else {
assert(0 && "unsupported address family");
abort();
}
}
h.msg_iov = (struct iovec*) req->bufs;
h.msg_iovlen = req->nbufs;
do
size = sendmsg(handle->io_watcher.fd, &h, 0);
while (size == -1 && errno == EINTR);
if (size == -1)
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
return;
req->status = (size == -1 ? UV__ERR(errno) : size);
/* Sending a datagram is an atomic operation: either all data
* is written or nothing is (and EMSGSIZE is raised). That is
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
uv__io_feed(handle->loop, &handle->io_watcher);
if (uv__queue_empty(&handle->write_queue))
return;
q = uv__queue_head(&handle->write_queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
}
}
#if defined(__linux__) || defined(__FreeBSD__)
static void uv__udp_sendmsg_many(uv_udp_t* handle) {
uv_udp_send_t* req;
struct mmsghdr h[20];
struct mmsghdr* p;
@ -285,16 +338,11 @@ static void uv__udp_sendmsg(uv_udp_t* handle) {
size_t pkts;
size_t i;
if (uv__queue_empty(&handle->write_queue))
return;
write_queue_drain:
for (pkts = 0, q = uv__queue_head(&handle->write_queue);
pkts < ARRAY_SIZE(h) && q != &handle->write_queue;
++pkts, q = uv__queue_head(q)) {
assert(q != NULL);
req = uv__queue_data(q, uv_udp_send_t, queue);
assert(req != NULL);
p = &h[pkts];
memset(p, 0, sizeof(*p));
@ -328,10 +376,7 @@ write_queue_drain:
for (i = 0, q = uv__queue_head(&handle->write_queue);
i < pkts && q != &handle->write_queue;
++i, q = uv__queue_head(&handle->write_queue)) {
assert(q != NULL);
req = uv__queue_data(q, uv_udp_send_t, queue);
assert(req != NULL);
req->status = UV__ERR(errno);
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
@ -346,10 +391,7 @@ write_queue_drain:
for (i = 0, q = uv__queue_head(&handle->write_queue);
i < (size_t)npkts && q != &handle->write_queue;
++i, q = uv__queue_head(&handle->write_queue)) {
assert(q != NULL);
req = uv__queue_data(q, uv_udp_send_t, queue);
assert(req != NULL);
req->status = req->bufs[0].len;
/* Sending a datagram is an atomic operation: either all data
@ -364,61 +406,31 @@ write_queue_drain:
/* couldn't batch everything, continue sending (jump to avoid stack growth) */
if (!uv__queue_empty(&handle->write_queue))
goto write_queue_drain;
uv__io_feed(handle->loop, &handle->io_watcher);
#else /* __linux__ || ____FreeBSD__ */
uv_udp_send_t* req;
struct msghdr h;
struct uv__queue* q;
ssize_t size;
while (!uv__queue_empty(&handle->write_queue)) {
q = uv__queue_head(&handle->write_queue);
assert(q != NULL);
req = uv__queue_data(q, uv_udp_send_t, queue);
assert(req != NULL);
memset(&h, 0, sizeof h);
if (req->addr.ss_family == AF_UNSPEC) {
h.msg_name = NULL;
h.msg_namelen = 0;
} else {
h.msg_name = &req->addr;
if (req->addr.ss_family == AF_INET6)
h.msg_namelen = sizeof(struct sockaddr_in6);
else if (req->addr.ss_family == AF_INET)
h.msg_namelen = sizeof(struct sockaddr_in);
else if (req->addr.ss_family == AF_UNIX)
h.msg_namelen = sizeof(struct sockaddr_un);
else {
assert(0 && "unsupported address family");
abort();
}
}
h.msg_iov = (struct iovec*) req->bufs;
h.msg_iovlen = req->nbufs;
do {
size = sendmsg(handle->io_watcher.fd, &h, 0);
} while (size == -1 && errno == EINTR);
if (size == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
break;
}
req->status = (size == -1 ? UV__ERR(errno) : size);
/* Sending a datagram is an atomic operation: either all data
* is written or nothing is (and EMSGSIZE is raised). That is
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
uv__io_feed(handle->loop, &handle->io_watcher);
}
}
#endif /* __linux__ || ____FreeBSD__ */
static void uv__udp_sendmsg(uv_udp_t* handle) {
struct uv__queue* q;
uv_udp_send_t* req;
if (uv__queue_empty(&handle->write_queue))
return;
q = uv__queue_head(&handle->write_queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
#if defined(__linux__) || defined(__FreeBSD__)
/* Use sendmmsg() if this send request contains more than one datagram OR
* there is more than one send request (because that automatically implies
* there is more than one datagram.)
*/
if (req->nbufs != 1 || &handle->write_queue != uv__queue_next(&req->queue))
return uv__udp_sendmsg_many(handle);
#endif
return uv__udp_sendmsg_one(handle, req);
}
/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional