1
0
mirror of https://github.com/libuv/libuv synced 2025-03-28 21:13:16 +00:00

unix: refactor udp sendmsg code

Shuffle around and DRY the sendmsg logic in preparation for
uv_udp_try_send2(). NFC barring bugs.

This work was sponsored by ISC, the Internet Systems Consortium.
This commit is contained in:
Ben Noordhuis 2024-12-10 23:36:32 +01:00
parent acebb97490
commit 7b4cf04a91
2 changed files with 188 additions and 186 deletions

View File

@ -271,7 +271,10 @@ typedef struct {
#define UV_UDP_SEND_PRIVATE_FIELDS \
struct uv__queue queue; \
struct sockaddr_storage addr; \
union { \
struct sockaddr addr; \
struct sockaddr_storage storage; \
} u; \
unsigned int nbufs; \
uv_buf_t* bufs; \
ssize_t status; \

View File

@ -47,6 +47,10 @@ static void uv__udp_sendmsg(uv_udp_t* handle);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
int domain,
unsigned int flags);
static int uv__udp_sendmsg1(int fd,
const uv_buf_t* bufs,
unsigned int nbufs,
const struct sockaddr* addr);
void uv__udp_close(uv_udp_t* handle) {
@ -282,169 +286,6 @@ static void uv__udp_recvmsg(uv_udp_t* handle) {
&& handle->recv_cb != NULL);
}
static void uv__udp_sendmsg_one(uv_udp_t* handle, uv_udp_send_t* req) {
struct uv__queue* q;
struct msghdr h;
ssize_t size;
for (;;) {
memset(&h, 0, sizeof h);
if (req->addr.ss_family == AF_UNSPEC) {
h.msg_name = NULL;
h.msg_namelen = 0;
} else {
h.msg_name = &req->addr;
if (req->addr.ss_family == AF_INET6)
h.msg_namelen = sizeof(struct sockaddr_in6);
else if (req->addr.ss_family == AF_INET)
h.msg_namelen = sizeof(struct sockaddr_in);
else if (req->addr.ss_family == AF_UNIX)
h.msg_namelen = sizeof(struct sockaddr_un);
else {
assert(0 && "unsupported address family");
abort();
}
}
h.msg_iov = (struct iovec*) req->bufs;
h.msg_iovlen = req->nbufs;
do
size = sendmsg(handle->io_watcher.fd, &h, 0);
while (size == -1 && errno == EINTR);
if (size == -1)
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
return;
req->status = (size == -1 ? UV__ERR(errno) : size);
/* Sending a datagram is an atomic operation: either all data
* is written or nothing is (and EMSGSIZE is raised). That is
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
uv__io_feed(handle->loop, &handle->io_watcher);
if (uv__queue_empty(&handle->write_queue))
return;
q = uv__queue_head(&handle->write_queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
}
}
#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
static void uv__udp_sendmsg_many(uv_udp_t* handle) {
uv_udp_send_t* req;
struct mmsghdr h[20];
struct mmsghdr* p;
struct uv__queue* q;
ssize_t npkts;
size_t pkts;
size_t i;
write_queue_drain:
for (pkts = 0, q = uv__queue_head(&handle->write_queue);
pkts < ARRAY_SIZE(h) && q != &handle->write_queue;
++pkts, q = uv__queue_head(q)) {
req = uv__queue_data(q, uv_udp_send_t, queue);
p = &h[pkts];
memset(p, 0, sizeof(*p));
if (req->addr.ss_family == AF_UNSPEC) {
p->msg_hdr.msg_name = NULL;
p->msg_hdr.msg_namelen = 0;
} else {
p->msg_hdr.msg_name = &req->addr;
if (req->addr.ss_family == AF_INET6)
p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
else if (req->addr.ss_family == AF_INET)
p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
else if (req->addr.ss_family == AF_UNIX)
p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un);
else {
assert(0 && "unsupported address family");
abort();
}
}
h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs;
h[pkts].msg_hdr.msg_iovlen = req->nbufs;
}
#if defined(__APPLE__)
do
npkts = sendmsg_x(handle->io_watcher.fd, h, pkts, MSG_DONTWAIT);
while (npkts == -1 && errno == EINTR);
#else
do
npkts = sendmmsg(handle->io_watcher.fd, h, pkts, 0);
while (npkts == -1 && errno == EINTR);
#endif
if (npkts < 1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
return;
for (i = 0, q = uv__queue_head(&handle->write_queue);
i < pkts && q != &handle->write_queue;
++i, q = uv__queue_head(&handle->write_queue)) {
req = uv__queue_data(q, uv_udp_send_t, queue);
req->status = UV__ERR(errno);
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
}
uv__io_feed(handle->loop, &handle->io_watcher);
return;
}
/* Safety: npkts known to be >0 below. Hence cast from ssize_t
* to size_t safe.
*/
for (i = 0, q = uv__queue_head(&handle->write_queue);
i < (size_t)npkts && q != &handle->write_queue;
++i, q = uv__queue_head(&handle->write_queue)) {
req = uv__queue_data(q, uv_udp_send_t, queue);
req->status = req->bufs[0].len;
/* Sending a datagram is an atomic operation: either all data
* is written or nothing is (and EMSGSIZE is raised). That is
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
}
/* couldn't batch everything, continue sending (jump to avoid stack growth) */
if (!uv__queue_empty(&handle->write_queue))
goto write_queue_drain;
uv__io_feed(handle->loop, &handle->io_watcher);
}
#endif /* __linux__ || ____FreeBSD__ || __APPLE__ */
static void uv__udp_sendmsg(uv_udp_t* handle) {
struct uv__queue* q;
uv_udp_send_t* req;
if (uv__queue_empty(&handle->write_queue))
return;
q = uv__queue_head(&handle->write_queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
/* Use sendmmsg() if this send request contains more than one datagram OR
* there is more than one send request (because that automatically implies
* there is more than one datagram.)
*/
if (req->nbufs != 1 || &handle->write_queue != uv__queue_next(&req->queue))
return uv__udp_sendmsg_many(handle);
#endif
return uv__udp_sendmsg_one(handle, req);
}
/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
* refinements for programs that use multicast. Therefore we preferentially
@ -743,11 +584,11 @@ int uv__udp_send(uv_udp_send_t* req,
empty_queue = (handle->send_queue_count == 0);
uv__req_init(handle->loop, req, UV_UDP_SEND);
assert(addrlen <= sizeof(req->addr));
assert(addrlen <= sizeof(req->u.storage));
if (addr == NULL)
req->addr.ss_family = AF_UNSPEC;
req->u.storage.ss_family = AF_UNSPEC;
else
memcpy(&req->addr, addr, addrlen);
memcpy(&req->u.storage, addr, addrlen);
req->send_cb = send_cb;
req->handle = handle;
req->nbufs = nbufs;
@ -790,8 +631,6 @@ int uv__udp_try_send(uv_udp_t* handle,
const struct sockaddr* addr,
unsigned int addrlen) {
int err;
struct msghdr h;
ssize_t size;
if (nbufs < 1)
return UV_EINVAL;
@ -808,24 +647,11 @@ int uv__udp_try_send(uv_udp_t* handle,
assert(handle->flags & UV_HANDLE_UDP_CONNECTED);
}
memset(&h, 0, sizeof h);
h.msg_name = (struct sockaddr*) addr;
h.msg_namelen = addrlen;
h.msg_iov = (struct iovec*) bufs;
h.msg_iovlen = nbufs;
err = uv__udp_sendmsg1(handle->io_watcher.fd, bufs, nbufs, addr);
if (err > 0)
return uv__count_bufs(bufs, nbufs);
do {
size = sendmsg(handle->io_watcher.fd, &h, 0);
} while (size == -1 && errno == EINTR);
if (size == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
return UV_EAGAIN;
else
return UV__ERR(errno);
}
return size;
return err;
}
@ -1402,3 +1228,176 @@ int uv__udp_recv_stop(uv_udp_t* handle) {
return 0;
}
static int uv__udp_prep_pkt(struct msghdr* h,
const uv_buf_t* bufs,
const unsigned int nbufs,
const struct sockaddr* addr) {
memset(h, 0, sizeof(*h));
h->msg_name = (void*) addr;
h->msg_iov = (void*) bufs;
h->msg_iovlen = nbufs;
if (addr == NULL)
return 0;
switch (addr->sa_family) {
case AF_INET:
h->msg_namelen = sizeof(struct sockaddr_in);
return 0;
case AF_INET6:
h->msg_namelen = sizeof(struct sockaddr_in6);
return 0;
case AF_UNIX:
h->msg_namelen = sizeof(struct sockaddr_un);
return 0;
case AF_UNSPEC:
h->msg_name = NULL;
return 0;
}
return UV_EINVAL;
}
static int uv__udp_sendmsg1(int fd,
const uv_buf_t* bufs,
unsigned int nbufs,
const struct sockaddr* addr) {
struct msghdr h;
int r;
if ((r = uv__udp_prep_pkt(&h, bufs, nbufs, addr)))
return r;
do
r = sendmsg(fd, &h, 0);
while (r == -1 && errno == EINTR);
if (r < 0) {
r = UV__ERR(errno);
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
r = UV_EAGAIN;
return r;
}
/* UDP sockets don't EOF so we don't have to handle r=0 specially,
* that only happens when the input was a zero-sized buffer.
*/
return 1;
}
static int uv__udp_sendmsgv(int fd,
unsigned int count,
uv_buf_t* bufs[/*count*/],
unsigned int nbufs[/*count*/],
struct sockaddr* addrs[/*count*/]) {
unsigned int i;
int nsent;
int r;
r = 0;
nsent = 0;
#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__)
if (count > 1) {
for (i = 0; i < count; /*empty*/) {
struct mmsghdr m[20];
unsigned int n;
for (n = 0; i < count && n < ARRAY_SIZE(m); i++, n++)
if ((r = uv__udp_prep_pkt(&m[n].msg_hdr, bufs[i], nbufs[i], addrs[i])))
goto exit;
do
#if defined(__APPLE__)
r = sendmsg_x(fd, m, n, MSG_DONTWAIT);
#else
r = sendmmsg(fd, m, n, 0);
#endif
while (r == -1 && errno == EINTR);
if (r < 1)
goto exit;
nsent += r;
i += r;
}
goto exit;
}
#endif /* defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) */
for (i = 0; i < count; i++, nsent++)
if ((r = uv__udp_sendmsg1(fd, bufs[i], nbufs[i], addrs[i])))
goto exit; /* goto to avoid unused label warning. */
exit:
if (nsent > 0)
return nsent;
if (r < 0) {
r = UV__ERR(errno);
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
r = UV_EAGAIN;
}
return r;
}
static void uv__udp_sendmsg(uv_udp_t* handle) {
static const int N = 20;
struct sockaddr* addrs[N];
unsigned int nbufs[N];
uv_buf_t* bufs[N];
struct uv__queue* q;
uv_udp_send_t* req;
int n;
if (uv__queue_empty(&handle->write_queue))
return;
again:
n = 0;
q = uv__queue_head(&handle->write_queue);
do {
req = uv__queue_data(q, uv_udp_send_t, queue);
addrs[n] = &req->u.addr;
nbufs[n] = req->nbufs;
bufs[n] = req->bufs;
q = uv__queue_next(q);
n++;
} while (n < N && q != &handle->write_queue);
n = uv__udp_sendmsgv(handle->io_watcher.fd, n, bufs, nbufs, addrs);
while (n > 0) {
q = uv__queue_head(&handle->write_queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
req->status = uv__count_bufs(req->bufs, req->nbufs);
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
n--;
}
if (n == 0) {
if (uv__queue_empty(&handle->write_queue))
goto feed;
goto again;
}
if (n == UV_EAGAIN)
return;
/* Register the error against first request in queue because that
* is the request that uv__udp_sendmsgv tried but failed to send,
* because if it did send any requests, it won't return an error.
*/
q = uv__queue_head(&handle->write_queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
req->status = n;
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
feed:
uv__io_feed(handle->loop, &handle->io_watcher);
}