mirror of https://github.com/libuv/libuv synced 2025-03-28 21:13:16 +00:00

linux: use io_uring to batch epoll_ctl calls (#3979)

This work was sponsored by ISC, the Internet Systems Consortium.
Ben Noordhuis, 2023-05-01 09:00:08 +02:00, committed by GitHub
parent f272082240
commit 6e073ef5da
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
2 changed files with 288 additions and 124 deletions
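The idea behind the change: instead of issuing one epoll_ctl(2) syscall per watcher, the loop now queues EPOLL_CTL_ADD/MOD/DEL operations as IORING_OP_EPOLL_CTL submissions on a dedicated ring and flushes them with a single io_uring_enter(2) before polling. A minimal standalone sketch of that batching pattern follows; it uses the liburing helper library rather than libuv's raw syscall wrappers, and the pipes, batch size, and build line are illustrative assumptions, not part of this commit. It assumes a liburing recent enough to provide io_uring_prep_epoll_ctl and a Linux 5.6+ kernel.

/* Sketch: batch several epoll_ctl operations through one io_uring submit.
 * Not libuv code; build with: cc demo.c -luring */
#include <liburing.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(void) {
  struct epoll_event ev[4];  /* must stay alive until the CQEs arrive */
  struct io_uring ring;
  struct io_uring_cqe* cqe;
  struct io_uring_sqe* sqe;
  int fds[4][2];
  int epollfd;
  int i;

  epollfd = epoll_create1(EPOLL_CLOEXEC);
  if (epollfd == -1 || io_uring_queue_init(8, &ring, 0) < 0)
    abort();

  /* Queue one EPOLL_CTL_ADD per pipe; nothing hits the kernel yet. */
  for (i = 0; i < 4; i++) {
    if (pipe(fds[i]))
      abort();
    memset(&ev[i], 0, sizeof(ev[i]));
    ev[i].events = EPOLLIN;
    ev[i].data.fd = fds[i][0];
    sqe = io_uring_get_sqe(&ring);
    if (sqe == NULL)
      abort();
    io_uring_prep_epoll_ctl(sqe, epollfd, fds[i][0], EPOLL_CTL_ADD, &ev[i]);
    sqe->user_data = i;
  }

  /* One syscall submits all four control operations. */
  if (io_uring_submit(&ring) != 4)
    abort();

  /* Reap the completions; res is 0 on success, -errno on failure. */
  for (i = 0; i < 4; i++) {
    if (io_uring_wait_cqe(&ring, &cqe))
      abort();
    printf("epoll_ctl #%llu -> %d\n",
           (unsigned long long) cqe->user_data, cqe->res);
    io_uring_cqe_seen(&ring, cqe);
  }

  io_uring_queue_exit(&ring);
  close(epollfd);
  return 0;
}

The diff below does the same thing by hand: uv__epoll_ctl_prep() fills SQEs on a second ring (ctl), and uv__epoll_ctl_flush() submits and drains them before epoll_pwait() runs.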


@@ -149,9 +149,11 @@ enum {
UV__IORING_OP_OPENAT = 18,
UV__IORING_OP_CLOSE = 19,
UV__IORING_OP_STATX = 21,
UV__IORING_OP_EPOLL_CTL = 29,
};
enum {
UV__IORING_ENTER_GETEVENTS = 1u,
UV__IORING_ENTER_SQ_WAKEUP = 2u,
};
@@ -248,6 +250,10 @@ STATIC_ASSERT(40 + 40 + 40 == sizeof(struct uv__io_uring_params));
STATIC_ASSERT(40 == offsetof(struct uv__io_uring_params, sq_off));
STATIC_ASSERT(80 == offsetof(struct uv__io_uring_params, cq_off));
STATIC_ASSERT(EPOLL_CTL_ADD < 4);
STATIC_ASSERT(EPOLL_CTL_DEL < 4);
STATIC_ASSERT(EPOLL_CTL_MOD < 4);
struct watcher_list {
RB_ENTRY(watcher_list) entry;
QUEUE watchers;
@@ -269,6 +275,17 @@ static int compare_watchers(const struct watcher_list* a,
static void maybe_free_watcher_list(struct watcher_list* w,
uv_loop_t* loop);
static void uv__epoll_ctl_flush(int epollfd,
struct uv__iou* ctl,
struct epoll_event (*events)[256]);
static void uv__epoll_ctl_prep(int epollfd,
struct uv__iou* ctl,
struct epoll_event (*events)[256],
int op,
int fd,
struct epoll_event* e);
RB_GENERATE_STATIC(watcher_root, watcher_list, entry, compare_watchers)
@@ -384,45 +401,40 @@ static int uv__use_io_uring(void) {
}
int uv__platform_loop_init(uv_loop_t* loop) {
static void uv__iou_init(int epollfd,
struct uv__iou* iou,
uint32_t entries,
uint32_t flags) {
struct uv__io_uring_params params;
struct epoll_event e;
struct uv__iou* iou;
size_t cqlen;
size_t sqlen;
size_t maxlen;
size_t sqelen;
uint32_t i;
char* sq;
char* sqe;
int ringfd;
iou = &uv__get_internal_fields(loop)->iou;
iou->ringfd = -1;
loop->inotify_watchers = NULL;
loop->inotify_fd = -1;
loop->backend_fd = epoll_create1(O_CLOEXEC);
if (loop->backend_fd == -1)
return UV__ERR(errno);
if (!uv__use_io_uring())
return 0;
sq = MAP_FAILED;
sqe = MAP_FAILED;
if (!uv__use_io_uring())
return;
/* SQPOLL required CAP_SYS_NICE until linux v5.12 relaxed that requirement.
* Mostly academic because we check for a v5.13 kernel afterwards anyway.
*/
memset(&params, 0, sizeof(params));
params.flags = UV__IORING_SETUP_SQPOLL;
params.sq_thread_idle = 10; /* milliseconds */
params.flags = flags;
if (flags & UV__IORING_SETUP_SQPOLL)
params.sq_thread_idle = 10; /* milliseconds */
/* Kernel returns a file descriptor with O_CLOEXEC flag set. */
ringfd = uv__io_uring_setup(64, &params);
ringfd = uv__io_uring_setup(entries, &params);
if (ringfd == -1)
return 0; /* Not an error, falls back to thread pool. */
return;
/* IORING_FEAT_RSRC_TAGS is used to detect linux v5.13 but what we're
* actually detecting is whether IORING_OP_STATX works with SQPOLL.
@@ -461,6 +473,18 @@ int uv__platform_loop_init(uv_loop_t* loop) {
if (sq == MAP_FAILED || sqe == MAP_FAILED)
goto fail;
if (flags & UV__IORING_SETUP_SQPOLL) {
/* Only interested in completion events. To get notified when
* the kernel pulls items from the submission ring, add POLLOUT.
*/
memset(&e, 0, sizeof(e));
e.events = POLLIN;
e.data.fd = ringfd;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, ringfd, &e))
goto fail;
}
iou->sqhead = (uint32_t*) (sq + params.sq_off.head);
iou->sqtail = (uint32_t*) (sq + params.sq_off.tail);
iou->sqmask = *(uint32_t*) (sq + params.sq_off.ring_mask);
@@ -479,17 +503,10 @@ int uv__platform_loop_init(uv_loop_t* loop) {
iou->ringfd = ringfd;
iou->in_flight = 0;
/* Only interested in completion events. To get notified when
* the kernel pulls items from the submission ring, add POLLOUT.
*/
memset(&e, 0, sizeof(e));
e.events = POLLIN;
e.data.fd = ringfd;
for (i = 0; i <= iou->sqmask; i++)
iou->sqarray[i] = i; /* Slot -> sqe identity mapping. */
if (epoll_ctl(loop->backend_fd, EPOLL_CTL_ADD, ringfd, &e))
goto fail;
return 0;
return;
fail:
if (sq != MAP_FAILED)
@@ -499,8 +516,37 @@ fail:
munmap(sqe, sqelen);
uv__close(ringfd);
}
return 0; /* Not an error, falls back to thread pool. */
static void uv__iou_delete(struct uv__iou* iou) {
if (iou->ringfd != -1) {
munmap(iou->sq, iou->maxlen);
munmap(iou->sqe, iou->sqelen);
uv__close(iou->ringfd);
iou->ringfd = -1;
}
}
int uv__platform_loop_init(uv_loop_t* loop) {
uv__loop_internal_fields_t* lfields;
lfields = uv__get_internal_fields(loop);
lfields->ctl.ringfd = -1;
lfields->iou.ringfd = -1;
loop->inotify_watchers = NULL;
loop->inotify_fd = -1;
loop->backend_fd = epoll_create1(O_CLOEXEC);
if (loop->backend_fd == -1)
return UV__ERR(errno);
uv__iou_init(loop->backend_fd, &lfields->iou, 64, UV__IORING_SETUP_SQPOLL);
uv__iou_init(loop->backend_fd, &lfields->ctl, 256, 0);
return 0;
}
@@ -525,54 +571,62 @@ int uv__io_fork(uv_loop_t* loop) {
void uv__platform_loop_delete(uv_loop_t* loop) {
struct uv__iou* iou;
uv__loop_internal_fields_t* lfields;
iou = &uv__get_internal_fields(loop)->iou;
lfields = uv__get_internal_fields(loop);
uv__iou_delete(&lfields->ctl);
uv__iou_delete(&lfields->iou);
if (loop->inotify_fd != -1) {
uv__io_stop(loop, &loop->inotify_read_watcher, POLLIN);
uv__close(loop->inotify_fd);
loop->inotify_fd = -1;
}
if (iou->ringfd != -1) {
munmap(iou->sq, iou->maxlen);
munmap(iou->sqe, iou->sqelen);
uv__close(iou->ringfd);
iou->ringfd = -1;
}
}
void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) {
struct uv__invalidate {
struct epoll_event (*prep)[256];
struct epoll_event* events;
int nfds;
};
void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) {
uv__loop_internal_fields_t* lfields;
struct uv__invalidate* inv;
struct epoll_event dummy;
uintptr_t i;
uintptr_t nfds;
int i;
assert(loop->watchers != NULL);
assert(fd >= 0);
lfields = uv__get_internal_fields(loop);
inv = lfields->inv;
events = (struct epoll_event*) loop->watchers[loop->nwatchers];
nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1];
if (events != NULL)
/* Invalidate events with same file descriptor */
for (i = 0; i < nfds; i++)
if (events[i].data.fd == fd)
events[i].data.fd = -1;
/* Invalidate events with same file descriptor */
if (inv != NULL)
for (i = 0; i < inv->nfds; i++)
if (inv->events[i].data.fd == fd)
inv->events[i].data.fd = -1;
/* Remove the file descriptor from the epoll.
* This avoids a problem where the same file description remains open
* in another process, causing repeated junk epoll events.
*
* We pass in a dummy epoll_event, to work around a bug in old kernels.
*
* Work around a bug in kernels 3.10 to 3.19 where passing a struct that
* has the EPOLLWAKEUP flag set generates spurious audit syslog warnings.
*/
if (loop->backend_fd >= 0) {
/* Work around a bug in kernels 3.10 to 3.19 where passing a struct that
* has the EPOLLWAKEUP flag set generates spurious audit syslog warnings.
*/
memset(&dummy, 0, sizeof(dummy));
memset(&dummy, 0, sizeof(dummy));
if (inv == NULL) {
epoll_ctl(loop->backend_fd, EPOLL_CTL_DEL, fd, &dummy);
} else {
uv__epoll_ctl_prep(loop->backend_fd,
&lfields->ctl,
inv->prep,
EPOLL_CTL_DEL,
fd,
&dummy);
}
}
@@ -620,8 +674,6 @@ static struct uv__io_uring_sqe* uv__iou_get_sqe(struct uv__iou* iou,
return NULL; /* No room in ring buffer. TODO(bnoordhuis) maybe flush it? */
slot = tail & mask;
iou->sqarray[slot] = slot; /* Identity mapping of index -> sqe. */
sqe = iou->sqe;
sqe = &sqe[slot];
memset(sqe, 0, sizeof(*sqe));
@@ -652,7 +704,7 @@ static void uv__iou_submit(struct uv__iou* iou) {
if (flags & UV__IORING_SQ_NEED_WAKEUP)
if (uv__io_uring_enter(iou->ringfd, 0, 0, UV__IORING_ENTER_SQ_WAKEUP))
perror("libuv: io_uring_enter"); /* Can't happen. */
perror("libuv: io_uring_enter(wakeup)"); /* Can't happen. */
}
@@ -885,11 +937,135 @@ static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) {
}
static void uv__epoll_ctl_prep(int epollfd,
struct uv__iou* ctl,
struct epoll_event (*events)[256],
int op,
int fd,
struct epoll_event* e) {
struct uv__io_uring_sqe* sqe;
struct epoll_event* pe;
uint32_t mask;
uint32_t slot;
if (ctl->ringfd == -1) {
if (!epoll_ctl(epollfd, op, fd, e))
return;
if (op == EPOLL_CTL_DEL)
return; /* Ignore errors, may be racing with another thread. */
if (op != EPOLL_CTL_ADD)
abort();
if (errno != EEXIST)
abort();
/* File descriptor that's been watched before, update event mask. */
if (!epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, e))
return;
abort();
} else {
mask = ctl->sqmask;
slot = (*ctl->sqtail)++ & mask;
pe = &(*events)[slot];
*pe = *e;
sqe = ctl->sqe;
sqe = &sqe[slot];
memset(sqe, 0, sizeof(*sqe));
sqe->addr = (uintptr_t) pe;
sqe->fd = epollfd;
sqe->len = op;
sqe->off = fd;
sqe->opcode = UV__IORING_OP_EPOLL_CTL;
sqe->user_data = op | slot << 2 | (int64_t) fd << 32;
if ((*ctl->sqhead & mask) == (*ctl->sqtail & mask))
uv__epoll_ctl_flush(epollfd, ctl, events);
}
}
static void uv__epoll_ctl_flush(int epollfd,
struct uv__iou* ctl,
struct epoll_event (*events)[256]) {
struct epoll_event oldevents[256];
struct uv__io_uring_cqe* cqe;
uint32_t oldslot;
uint32_t slot;
uint32_t n;
int fd;
int op;
int rc;
STATIC_ASSERT(sizeof(oldevents) == sizeof(*events));
assert(ctl->ringfd != -1);
assert(*ctl->sqhead != *ctl->sqtail);
n = *ctl->sqtail - *ctl->sqhead;
do
rc = uv__io_uring_enter(ctl->ringfd, n, n, UV__IORING_ENTER_GETEVENTS);
while (rc == -1 && errno == EINTR);
if (rc < 0)
perror("libuv: io_uring_enter(getevents)"); /* Can't happen. */
if (rc != (int) n)
abort();
assert(*ctl->sqhead == *ctl->sqtail);
memcpy(oldevents, *events, sizeof(*events));
/* Failed submissions are either EPOLL_CTL_DEL commands for file descriptors
* that have been closed, or EPOLL_CTL_ADD commands for file descriptors
* that we are already watching. Ignore the former and retry the latter
* with EPOLL_CTL_MOD.
*/
while (*ctl->cqhead != *ctl->cqtail) {
slot = (*ctl->cqhead)++ & ctl->cqmask;
cqe = ctl->cqe;
cqe = &cqe[slot];
if (cqe->res == 0)
continue;
fd = cqe->user_data >> 32;
op = 3 & cqe->user_data;
oldslot = 255 & (cqe->user_data >> 2);
if (op == EPOLL_CTL_DEL)
continue;
if (op != EPOLL_CTL_ADD)
abort();
if (cqe->res != -EEXIST)
abort();
uv__epoll_ctl_prep(epollfd,
ctl,
events,
EPOLL_CTL_MOD,
fd,
&oldevents[oldslot]);
}
}
void uv__io_poll(uv_loop_t* loop, int timeout) {
uv__loop_internal_fields_t* lfields;
struct epoll_event events[1024];
struct epoll_event prep[256];
struct uv__invalidate inv;
struct epoll_event* pe;
struct epoll_event e;
struct uv__iou* ctl;
struct uv__iou* iou;
int real_timeout;
QUEUE* q;
@@ -900,6 +1076,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
int have_iou_events;
int have_signals;
int nevents;
int epollfd;
int count;
int nfds;
int fd;
@@ -909,45 +1086,9 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
int reset_timeout;
lfields = uv__get_internal_fields(loop);
ctl = &lfields->ctl;
iou = &lfields->iou;
memset(&e, 0, sizeof(e));
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
q = QUEUE_HEAD(&loop->watcher_queue);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, watcher_queue);
assert(w->pevents != 0);
assert(w->fd >= 0);
assert(w->fd < (int) loop->nwatchers);
e.events = w->pevents;
e.data.fd = w->fd;
if (w->events == 0)
op = EPOLL_CTL_ADD;
else
op = EPOLL_CTL_MOD;
/* XXX Future optimization: do EPOLL_CTL_MOD lazily if we stop watching
* events, skip the syscall and squelch the events after epoll_wait().
*/
if (epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
if (errno != EEXIST)
abort();
assert(op == EPOLL_CTL_ADD);
/* We've reactivated a file descriptor that's been watched before. */
if (epoll_ctl(loop->backend_fd, EPOLL_CTL_MOD, w->fd, &e))
abort();
}
w->events = w->pevents;
}
sigmask = NULL;
if (loop->flags & UV_LOOP_BLOCK_SIGPROF) {
sigemptyset(&sigset);
@@ -969,10 +1110,42 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
user_timeout = 0;
}
epollfd = loop->backend_fd;
memset(&e, 0, sizeof(e));
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
q = QUEUE_HEAD(&loop->watcher_queue);
w = QUEUE_DATA(q, uv__io_t, watcher_queue);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
op = EPOLL_CTL_MOD;
if (w->events == 0)
op = EPOLL_CTL_ADD;
w->events = w->pevents;
e.events = w->pevents;
e.data.fd = w->fd;
uv__epoll_ctl_prep(epollfd, ctl, &prep, op, w->fd, &e);
}
inv.events = events;
inv.prep = &prep;
inv.nfds = -1;
for (;;) {
if (loop->nfds == 0)
if (iou->in_flight == 0)
return;
break;
/* All event mask mutations should be visible to the kernel before
* we enter epoll_pwait().
*/
if (ctl->ringfd != -1)
while (*ctl->sqhead != *ctl->sqtail)
uv__epoll_ctl_flush(epollfd, ctl, &prep);
/* Only need to set the provider_entry_time if timeout != 0. The function
* will return early if the loop isn't configured with UV_METRICS_IDLE_TIME.
@@ -986,11 +1159,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
*/
lfields->current_timeout = timeout;
nfds = epoll_pwait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout,
sigmask);
nfds = epoll_pwait(epollfd, events, ARRAY_SIZE(events), timeout, sigmask);
/* Update loop->time unconditionally. It's tempting to skip the update when
* timeout == 0 (i.e. non-blocking poll) but there is no guarantee that the
@@ -1010,7 +1179,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
continue;
if (timeout == 0)
return;
break;
/* We may have been inside the system call for longer than |timeout|
* milliseconds so we need to update the timestamp to avoid drift.
@@ -1031,7 +1200,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
continue;
if (timeout == 0)
return;
break;
/* Interrupted by a signal. Update timeout and poll again. */
goto update_timeout;
@@ -1041,18 +1210,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
have_signals = 0;
nevents = 0;
{
/* Squelch a -Waddress-of-packed-member warning with gcc >= 9. */
union {
struct epoll_event* events;
uv__io_t* watchers;
} x;
x.events = events;
assert(loop->watchers != NULL);
loop->watchers[loop->nwatchers] = x.watchers;
loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
}
inv.nfds = nfds;
lfields->inv = &inv;
for (i = 0; i < nfds; i++) {
pe = events + i;
@@ -1079,7 +1238,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
* Ignore all errors because we may be racing with another thread
* when the file descriptor is closed.
*/
epoll_ctl(loop->backend_fd, EPOLL_CTL_DEL, fd, pe);
uv__epoll_ctl_prep(epollfd, ctl, &prep, EPOLL_CTL_DEL, fd, pe);
continue;
}
@@ -1136,14 +1295,13 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
loop->signal_io_watcher.cb(loop, &loop->signal_io_watcher, POLLIN);
}
loop->watchers[loop->nwatchers] = NULL;
loop->watchers[loop->nwatchers + 1] = NULL;
lfields->inv = NULL;
if (have_iou_events != 0)
return; /* Event loop should cycle now so don't poll again. */
break; /* Event loop should cycle now so don't poll again. */
if (have_signals != 0)
return; /* Event loop should cycle now so don't poll again. */
break; /* Event loop should cycle now so don't poll again. */
if (nevents != 0) {
if (nfds == ARRAY_SIZE(events) && --count != 0) {
@@ -1151,11 +1309,11 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
timeout = 0;
continue;
}
return;
break;
}
if (timeout == 0)
return;
break;
if (timeout == -1)
continue;
@@ -1165,10 +1323,14 @@ update_timeout:
real_timeout -= (loop->time - base);
if (real_timeout <= 0)
return;
break;
timeout = real_timeout;
}
if (ctl->ringfd != -1)
while (*ctl->sqhead != *ctl->sqtail)
uv__epoll_ctl_flush(epollfd, ctl, &prep);
}
uint64_t uv__hrtime(uv_clocktype_t type) {
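One detail from the hunks above worth spelling out is the user_data packing shared by uv__epoll_ctl_prep() and uv__epoll_ctl_flush(): the epoll op sits in the low two bits (the new STATIC_ASSERTs guarantee EPOLL_CTL_ADD/DEL/MOD are all below 4), the slot into the 256-entry prep array sits in the next eight bits, and the watched file descriptor sits in the upper 32 bits. A tiny self-contained round-trip check of those exact expressions; the sample op, slot, and fd values are made up for illustration.

/* Round-trip the op | slot | fd packing used for IORING_OP_EPOLL_CTL. */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <sys/epoll.h>

int main(void) {
  uint32_t slot = 200;      /* index into the 256-entry prep array */
  int op = EPOLL_CTL_ADD;   /* < 4, so it fits in two bits */
  int fd = 1000;            /* watched file descriptor */
  uint64_t user_data;

  /* Encode, as in uv__epoll_ctl_prep(). */
  user_data = op | slot << 2 | (int64_t) fd << 32;

  /* Decode, as in uv__epoll_ctl_flush(). */
  assert((int) (user_data >> 32) == fd);
  assert((int) (3 & user_data) == op);
  assert((255 & (user_data >> 2)) == slot);

  printf("user_data = 0x%016llx\n", (unsigned long long) user_data);
  return 0;
}

The flush path relies on this: a completion that failed with -EEXIST carries enough information (op, original slot, fd) to resubmit the same epoll_event as an EPOLL_CTL_MOD without keeping any side tables.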


@@ -423,7 +423,9 @@ struct uv__loop_internal_fields_s {
uv__loop_metrics_t loop_metrics;
int current_timeout;
#ifdef __linux__
struct uv__iou ctl;
struct uv__iou iou;
void* inv; /* used by uv__platform_invalidate_fd() */
#endif /* __linux__ */
};
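The header hunk above adds the per-loop storage the Linux code now relies on: a second ring (ctl) for the batched epoll_ctl submissions, plus an inv pointer through which uv__platform_invalidate_fd() can reach the epoll_event batches that uv__io_poll() keeps on its stack, replacing the old trick of stashing a pointer and count in loop->watchers[nwatchers] and [nwatchers + 1]. A reduced standalone model of that hand-off; mock_loop, mock_invalidate, and mock_invalidate_fd are names invented for this sketch, and libuv's real structures carry far more state.

/* Model of the inv hand-off: the poller publishes its stacked event
 * array, fd closers blank matching entries, and the dispatch loop
 * later skips entries whose fd was invalidated to -1. */
#include <stdio.h>
#include <sys/epoll.h>

struct mock_invalidate {
  struct epoll_event (*prep)[256];  /* pending EPOLL_CTL_* submissions */
  struct epoll_event* events;       /* results from epoll_pwait() */
  int nfds;
};

struct mock_loop {
  struct mock_invalidate* inv;      /* NULL outside the poll loop */
};

static void mock_invalidate_fd(struct mock_loop* loop, int fd) {
  int i;

  if (loop->inv == NULL)
    return;

  for (i = 0; i < loop->inv->nfds; i++)
    if (loop->inv->events[i].data.fd == fd)
      loop->inv->events[i].data.fd = -1;  /* dispatch loop ignores it */
}

int main(void) {
  struct epoll_event prep[256];  /* unused here; mirrors the real struct */
  struct epoll_event events[3] = {
    { .events = EPOLLIN, .data.fd = 7 },
    { .events = EPOLLIN, .data.fd = 8 },
    { .events = EPOLLIN, .data.fd = 7 },
  };
  struct mock_invalidate inv = { &prep, events, 3 };
  struct mock_loop loop = { &inv };
  int i;

  mock_invalidate_fd(&loop, 7);   /* e.g. a watcher closed fd 7 */

  for (i = 0; i < 3; i++)
    if (events[i].data.fd == -1)
      printf("slot %d invalidated\n", i);
    else
      printf("slot %d still fd %d\n", i, events[i].data.fd);

  return 0;
}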