From 7b4cf04a914116d998c5ebcbf2d53b6848b2f65e Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Tue, 10 Dec 2024 23:36:32 +0100 Subject: [PATCH] unix: refactor udp sendmsg code Shuffle around and DRY the sendmsg logic in preparation for uv_udp_try_send2(). NFC barring bugs. This work was sponsored by ISC, the Internet Systems Consortium. --- include/uv/unix.h | 5 +- src/unix/udp.c | 369 +++++++++++++++++++++++----------------------- 2 files changed, 188 insertions(+), 186 deletions(-) diff --git a/include/uv/unix.h b/include/uv/unix.h index 538f98b6..7c972026 100644 --- a/include/uv/unix.h +++ b/include/uv/unix.h @@ -271,7 +271,10 @@ typedef struct { #define UV_UDP_SEND_PRIVATE_FIELDS \ struct uv__queue queue; \ - struct sockaddr_storage addr; \ + union { \ + struct sockaddr addr; \ + struct sockaddr_storage storage; \ + } u; \ unsigned int nbufs; \ uv_buf_t* bufs; \ ssize_t status; \ diff --git a/src/unix/udp.c b/src/unix/udp.c index c3e30314..045e5fe8 100644 --- a/src/unix/udp.c +++ b/src/unix/udp.c @@ -47,6 +47,10 @@ static void uv__udp_sendmsg(uv_udp_t* handle); static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain, unsigned int flags); +static int uv__udp_sendmsg1(int fd, + const uv_buf_t* bufs, + unsigned int nbufs, + const struct sockaddr* addr); void uv__udp_close(uv_udp_t* handle) { @@ -282,169 +286,6 @@ static void uv__udp_recvmsg(uv_udp_t* handle) { && handle->recv_cb != NULL); } -static void uv__udp_sendmsg_one(uv_udp_t* handle, uv_udp_send_t* req) { - struct uv__queue* q; - struct msghdr h; - ssize_t size; - - for (;;) { - memset(&h, 0, sizeof h); - if (req->addr.ss_family == AF_UNSPEC) { - h.msg_name = NULL; - h.msg_namelen = 0; - } else { - h.msg_name = &req->addr; - if (req->addr.ss_family == AF_INET6) - h.msg_namelen = sizeof(struct sockaddr_in6); - else if (req->addr.ss_family == AF_INET) - h.msg_namelen = sizeof(struct sockaddr_in); - else if (req->addr.ss_family == AF_UNIX) - h.msg_namelen = sizeof(struct sockaddr_un); - else { - assert(0 && "unsupported address family"); - abort(); - } - } - h.msg_iov = (struct iovec*) req->bufs; - h.msg_iovlen = req->nbufs; - - do - size = sendmsg(handle->io_watcher.fd, &h, 0); - while (size == -1 && errno == EINTR); - - if (size == -1) - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) - return; - - req->status = (size == -1 ? UV__ERR(errno) : size); - - /* Sending a datagram is an atomic operation: either all data - * is written or nothing is (and EMSGSIZE is raised). That is - * why we don't handle partial writes. Just pop the request - * off the write queue and onto the completed queue, done. - */ - uv__queue_remove(&req->queue); - uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); - uv__io_feed(handle->loop, &handle->io_watcher); - - if (uv__queue_empty(&handle->write_queue)) - return; - - q = uv__queue_head(&handle->write_queue); - req = uv__queue_data(q, uv_udp_send_t, queue); - } -} - -#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) -static void uv__udp_sendmsg_many(uv_udp_t* handle) { - uv_udp_send_t* req; - struct mmsghdr h[20]; - struct mmsghdr* p; - struct uv__queue* q; - ssize_t npkts; - size_t pkts; - size_t i; - -write_queue_drain: - for (pkts = 0, q = uv__queue_head(&handle->write_queue); - pkts < ARRAY_SIZE(h) && q != &handle->write_queue; - ++pkts, q = uv__queue_head(q)) { - req = uv__queue_data(q, uv_udp_send_t, queue); - - p = &h[pkts]; - memset(p, 0, sizeof(*p)); - if (req->addr.ss_family == AF_UNSPEC) { - p->msg_hdr.msg_name = NULL; - p->msg_hdr.msg_namelen = 0; - } else { - p->msg_hdr.msg_name = &req->addr; - if (req->addr.ss_family == AF_INET6) - p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6); - else if (req->addr.ss_family == AF_INET) - p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in); - else if (req->addr.ss_family == AF_UNIX) - p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un); - else { - assert(0 && "unsupported address family"); - abort(); - } - } - h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs; - h[pkts].msg_hdr.msg_iovlen = req->nbufs; - } - -#if defined(__APPLE__) - do - npkts = sendmsg_x(handle->io_watcher.fd, h, pkts, MSG_DONTWAIT); - while (npkts == -1 && errno == EINTR); -#else - do - npkts = sendmmsg(handle->io_watcher.fd, h, pkts, 0); - while (npkts == -1 && errno == EINTR); -#endif - - if (npkts < 1) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) - return; - for (i = 0, q = uv__queue_head(&handle->write_queue); - i < pkts && q != &handle->write_queue; - ++i, q = uv__queue_head(&handle->write_queue)) { - req = uv__queue_data(q, uv_udp_send_t, queue); - req->status = UV__ERR(errno); - uv__queue_remove(&req->queue); - uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); - } - uv__io_feed(handle->loop, &handle->io_watcher); - return; - } - - /* Safety: npkts known to be >0 below. Hence cast from ssize_t - * to size_t safe. - */ - for (i = 0, q = uv__queue_head(&handle->write_queue); - i < (size_t)npkts && q != &handle->write_queue; - ++i, q = uv__queue_head(&handle->write_queue)) { - req = uv__queue_data(q, uv_udp_send_t, queue); - req->status = req->bufs[0].len; - - /* Sending a datagram is an atomic operation: either all data - * is written or nothing is (and EMSGSIZE is raised). That is - * why we don't handle partial writes. Just pop the request - * off the write queue and onto the completed queue, done. - */ - uv__queue_remove(&req->queue); - uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); - } - - /* couldn't batch everything, continue sending (jump to avoid stack growth) */ - if (!uv__queue_empty(&handle->write_queue)) - goto write_queue_drain; - - uv__io_feed(handle->loop, &handle->io_watcher); -} -#endif /* __linux__ || ____FreeBSD__ || __APPLE__ */ - -static void uv__udp_sendmsg(uv_udp_t* handle) { - struct uv__queue* q; - uv_udp_send_t* req; - - if (uv__queue_empty(&handle->write_queue)) - return; - - q = uv__queue_head(&handle->write_queue); - req = uv__queue_data(q, uv_udp_send_t, queue); - -#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) - /* Use sendmmsg() if this send request contains more than one datagram OR - * there is more than one send request (because that automatically implies - * there is more than one datagram.) - */ - if (req->nbufs != 1 || &handle->write_queue != uv__queue_next(&req->queue)) - return uv__udp_sendmsg_many(handle); -#endif - - return uv__udp_sendmsg_one(handle, req); -} /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional * refinements for programs that use multicast. Therefore we preferentially @@ -743,11 +584,11 @@ int uv__udp_send(uv_udp_send_t* req, empty_queue = (handle->send_queue_count == 0); uv__req_init(handle->loop, req, UV_UDP_SEND); - assert(addrlen <= sizeof(req->addr)); + assert(addrlen <= sizeof(req->u.storage)); if (addr == NULL) - req->addr.ss_family = AF_UNSPEC; + req->u.storage.ss_family = AF_UNSPEC; else - memcpy(&req->addr, addr, addrlen); + memcpy(&req->u.storage, addr, addrlen); req->send_cb = send_cb; req->handle = handle; req->nbufs = nbufs; @@ -790,8 +631,6 @@ int uv__udp_try_send(uv_udp_t* handle, const struct sockaddr* addr, unsigned int addrlen) { int err; - struct msghdr h; - ssize_t size; if (nbufs < 1) return UV_EINVAL; @@ -808,24 +647,11 @@ int uv__udp_try_send(uv_udp_t* handle, assert(handle->flags & UV_HANDLE_UDP_CONNECTED); } - memset(&h, 0, sizeof h); - h.msg_name = (struct sockaddr*) addr; - h.msg_namelen = addrlen; - h.msg_iov = (struct iovec*) bufs; - h.msg_iovlen = nbufs; + err = uv__udp_sendmsg1(handle->io_watcher.fd, bufs, nbufs, addr); + if (err > 0) + return uv__count_bufs(bufs, nbufs); - do { - size = sendmsg(handle->io_watcher.fd, &h, 0); - } while (size == -1 && errno == EINTR); - - if (size == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) - return UV_EAGAIN; - else - return UV__ERR(errno); - } - - return size; + return err; } @@ -1402,3 +1228,176 @@ int uv__udp_recv_stop(uv_udp_t* handle) { return 0; } + + +static int uv__udp_prep_pkt(struct msghdr* h, + const uv_buf_t* bufs, + const unsigned int nbufs, + const struct sockaddr* addr) { + memset(h, 0, sizeof(*h)); + h->msg_name = (void*) addr; + h->msg_iov = (void*) bufs; + h->msg_iovlen = nbufs; + if (addr == NULL) + return 0; + switch (addr->sa_family) { + case AF_INET: + h->msg_namelen = sizeof(struct sockaddr_in); + return 0; + case AF_INET6: + h->msg_namelen = sizeof(struct sockaddr_in6); + return 0; + case AF_UNIX: + h->msg_namelen = sizeof(struct sockaddr_un); + return 0; + case AF_UNSPEC: + h->msg_name = NULL; + return 0; + } + return UV_EINVAL; +} + + +static int uv__udp_sendmsg1(int fd, + const uv_buf_t* bufs, + unsigned int nbufs, + const struct sockaddr* addr) { + struct msghdr h; + int r; + + if ((r = uv__udp_prep_pkt(&h, bufs, nbufs, addr))) + return r; + + do + r = sendmsg(fd, &h, 0); + while (r == -1 && errno == EINTR); + + if (r < 0) { + r = UV__ERR(errno); + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) + r = UV_EAGAIN; + return r; + } + + /* UDP sockets don't EOF so we don't have to handle r=0 specially, + * that only happens when the input was a zero-sized buffer. + */ + return 1; +} + + +static int uv__udp_sendmsgv(int fd, + unsigned int count, + uv_buf_t* bufs[/*count*/], + unsigned int nbufs[/*count*/], + struct sockaddr* addrs[/*count*/]) { + unsigned int i; + int nsent; + int r; + + r = 0; + nsent = 0; + +#if defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) + if (count > 1) { + for (i = 0; i < count; /*empty*/) { + struct mmsghdr m[20]; + unsigned int n; + + for (n = 0; i < count && n < ARRAY_SIZE(m); i++, n++) + if ((r = uv__udp_prep_pkt(&m[n].msg_hdr, bufs[i], nbufs[i], addrs[i]))) + goto exit; + + do +#if defined(__APPLE__) + r = sendmsg_x(fd, m, n, MSG_DONTWAIT); +#else + r = sendmmsg(fd, m, n, 0); +#endif + while (r == -1 && errno == EINTR); + + if (r < 1) + goto exit; + + nsent += r; + i += r; + } + + goto exit; + } +#endif /* defined(__linux__) || defined(__FreeBSD__) || defined(__APPLE__) */ + + for (i = 0; i < count; i++, nsent++) + if ((r = uv__udp_sendmsg1(fd, bufs[i], nbufs[i], addrs[i]))) + goto exit; /* goto to avoid unused label warning. */ + +exit: + + if (nsent > 0) + return nsent; + + if (r < 0) { + r = UV__ERR(errno); + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) + r = UV_EAGAIN; + } + + return r; +} + + +static void uv__udp_sendmsg(uv_udp_t* handle) { + static const int N = 20; + struct sockaddr* addrs[N]; + unsigned int nbufs[N]; + uv_buf_t* bufs[N]; + struct uv__queue* q; + uv_udp_send_t* req; + int n; + + if (uv__queue_empty(&handle->write_queue)) + return; + +again: + n = 0; + q = uv__queue_head(&handle->write_queue); + do { + req = uv__queue_data(q, uv_udp_send_t, queue); + addrs[n] = &req->u.addr; + nbufs[n] = req->nbufs; + bufs[n] = req->bufs; + q = uv__queue_next(q); + n++; + } while (n < N && q != &handle->write_queue); + + n = uv__udp_sendmsgv(handle->io_watcher.fd, n, bufs, nbufs, addrs); + while (n > 0) { + q = uv__queue_head(&handle->write_queue); + req = uv__queue_data(q, uv_udp_send_t, queue); + req->status = uv__count_bufs(req->bufs, req->nbufs); + uv__queue_remove(&req->queue); + uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); + n--; + } + + if (n == 0) { + if (uv__queue_empty(&handle->write_queue)) + goto feed; + goto again; + } + + if (n == UV_EAGAIN) + return; + + /* Register the error against first request in queue because that + * is the request that uv__udp_sendmsgv tried but failed to send, + * because if it did send any requests, it won't return an error. + */ + q = uv__queue_head(&handle->write_queue); + req = uv__queue_data(q, uv_udp_send_t, queue); + req->status = n; + uv__queue_remove(&req->queue); + uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); +feed: + uv__io_feed(handle->loop, &handle->io_watcher); +}