mirror of https://github.com/libuv/libuv synced 2025-03-28 21:13:16 +00:00

src: fix events/events_waiting metrics counter (#3957)

The worker pool runs all of its completed callbacks from a single queued
event, so the value of nevents doesn't properly reflect how many callbacks
were actually processed. Increment the events counter directly from the
worker pool's callback to correct this.

To properly determine whether the events_waiting counter needs to be
incremented, store the timeout value that was in effect when the event
provider was called.
This commit is contained in:
Trevor Norris 2023-04-17 12:48:39 -06:00
parent b60f08e782
commit e02642cf3b
10 changed files with 240 additions and 7 deletions
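For context, here is a minimal standalone sketch of how an application observes the counters this commit corrects. It is not part of the commit: it assumes libuv 1.45.0 or newer (where uv_metrics_info() and the events/events_waiting fields exist) and omits error handling. With this fix, the after-work callback below is reflected in metrics.events instead of being folded into the single uv_async wakeup that drained the worker-pool queue.

#include <stdio.h>
#include <uv.h>

static void work_cb(uv_work_t* req) {
  /* Runs on a worker-pool thread. */
  (void) req;
}

static void after_work_cb(uv_work_t* req, int status) {
  /* Runs back on the loop thread when the worker-pool queue is drained. */
  (void) req;
  (void) status;
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_work_t req;
  uv_metrics_t metrics;

  uv_queue_work(loop, &req, work_cb, after_work_cb);
  uv_run(loop, UV_RUN_DEFAULT);

  /* Read the loop metrics after the work item has completed. */
  uv_metrics_info(loop, &metrics);
  printf("loop_count=%llu events=%llu events_waiting=%llu\n",
         (unsigned long long) metrics.loop_count,
         (unsigned long long) metrics.events,
         (unsigned long long) metrics.events_waiting);

  uv_loop_close(loop);
  return 0;
}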

View File

@@ -311,12 +311,15 @@ void uv__work_done(uv_async_t* handle) {
QUEUE* q;
QUEUE wq;
int err;
int nevents;
loop = container_of(handle, uv_loop_t, wq_async);
uv_mutex_lock(&loop->wq_mutex);
QUEUE_MOVE(&loop->wq, &wq);
uv_mutex_unlock(&loop->wq_mutex);
nevents = 0;
while (!QUEUE_EMPTY(&wq)) {
q = QUEUE_HEAD(&wq);
QUEUE_REMOVE(q);
@@ -324,6 +327,20 @@ void uv__work_done(uv_async_t* handle) {
w = container_of(q, struct uv__work, wq);
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
w->done(w, err);
nevents++;
}
/* This check accomplishes 2 things:
* 1. Even if the queue was empty, the call to uv__work_done() should count
* as an event, which the event loop will already have added when it
* invoked this callback.
* 2. Prevents accidental wrap around in case nevents == 0, where passing
* nevents - 1 below would underflow the counter.
*/
if (nevents > 1) {
/* Subtract 1 to counter the call to uv__work_done(). */
uv__metrics_inc_events(loop, nevents - 1);
if (uv__get_internal_fields(loop)->current_timeout == 0)
uv__metrics_inc_events_waiting(loop, nevents - 1);
}
}
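The arithmetic in the hunk above can be restated in isolation. The sketch below is illustrative only (count_pool_events() is a hypothetical helper, not a libuv function): the uv_async wakeup that invoked uv__work_done() has already been counted as one event by the loop, so only nevents - 1 additional events are added, and they count as waiting only when the event provider was entered with a zero timeout, i.e. the completed work was already pending when poll was called.

#include <stdint.h>

/* Illustrative only: mirrors the metrics arithmetic from uv__work_done().
 * count_pool_events() is a hypothetical helper, not a libuv API. */
static void count_pool_events(uint64_t* events,
                              uint64_t* events_waiting,
                              int nevents,
                              int current_timeout) {
  if (nevents <= 1)
    return;  /* The uv_async wakeup itself was already counted by the loop. */

  *events += (uint64_t) (nevents - 1);

  /* A zero timeout means the provider did not block, so these callbacks
   * were already waiting when it was called. */
  if (current_timeout == 0)
    *events_waiting += (uint64_t) (nevents - 1);
}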

View File

@@ -131,6 +131,7 @@ int uv__io_check_fd(uv_loop_t* loop, int fd) {
void uv__io_poll(uv_loop_t* loop, int timeout) {
uv__loop_internal_fields_t* lfields;
struct pollfd events[1024];
struct pollfd pqry;
struct pollfd* pe;
@@ -154,6 +155,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
return;
}
lfields = uv__get_internal_fields(loop);
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
q = QUEUE_HEAD(&loop->watcher_queue);
QUEUE_REMOVE(q);
@@ -217,7 +220,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
base = loop->time;
count = 48; /* Benchmarks suggest this gives the best throughput. */
if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) {
if (lfields->flags & UV_METRICS_IDLE_TIME) {
reset_timeout = 1;
user_timeout = timeout;
timeout = 0;
@@ -232,6 +235,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
if (timeout != 0)
uv__metrics_set_provider_entry_time(loop);
/* Store the current timeout in a location that's globally accessible so
* other locations like uv__work_done() can determine whether the queue
* of events in the callback were waiting when poll was called.
*/
lfields->current_timeout = timeout;
nfds = pollset_poll(loop->backend_fd,
events,
ARRAY_SIZE(events),

View File

@@ -127,6 +127,7 @@ static void uv__kqueue_delete(int kqfd, const struct kevent *ev) {
void uv__io_poll(uv_loop_t* loop, int timeout) {
uv__loop_internal_fields_t* lfields;
struct kevent events[1024];
struct kevent* ev;
struct timespec spec;
@@ -155,6 +156,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
return;
}
lfields = uv__get_internal_fields(loop);
nevents = 0;
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
@@ -222,7 +224,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
base = loop->time;
count = 48; /* Benchmarks suggest this gives the best throughput. */
if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) {
if (lfields->flags & UV_METRICS_IDLE_TIME) {
reset_timeout = 1;
user_timeout = timeout;
timeout = 0;
@@ -245,6 +247,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
if (pset != NULL)
pthread_sigmask(SIG_BLOCK, pset, NULL);
/* Store the current timeout in a location that's globally accessible so
* other locations like uv__work_done() can determine whether the queue
* of events in the callback were waiting when poll was called.
*/
lfields->current_timeout = timeout;
nfds = kevent(loop->backend_fd,
events,
nevents,

View File

@@ -834,12 +834,14 @@ static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) {
uint32_t tail;
uint32_t mask;
uint32_t i;
int nevents;
head = *iou->cqhead;
tail = atomic_load_explicit((_Atomic uint32_t*) iou->cqtail,
memory_order_acquire);
mask = iou->cqmask;
cqe = iou->cqe;
nevents = 0;
for (i = head; i != tail; i++) {
e = &cqe[i & mask];
@@ -865,13 +867,16 @@ static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) {
uv__metrics_update_idle_time(loop);
req->cb(req);
nevents++;
}
atomic_store_explicit((_Atomic uint32_t*) iou->cqhead,
tail,
memory_order_release);
uv__metrics_inc_events(loop, 1);
uv__metrics_inc_events(loop, nevents);
if (uv__get_internal_fields(loop)->current_timeout == 0)
uv__metrics_inc_events_waiting(loop, nevents);
}
@@ -970,6 +975,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
if (timeout != 0)
uv__metrics_set_provider_entry_time(loop);
/* Store the current timeout in a location that's globally accessible so
* other locations like uv__work_done() can determine whether the queue
* of events in the callback were waiting when poll was called.
*/
lfields->current_timeout = timeout;
nfds = epoll_pwait(loop->backend_fd,
events,
ARRAY_SIZE(events),

View File

@@ -808,6 +808,7 @@ static int os390_message_queue_handler(uv__os390_epoll* ep) {
void uv__io_poll(uv_loop_t* loop, int timeout) {
static const int max_safe_timeout = 1789569;
uv__loop_internal_fields_t* lfields;
struct epoll_event events[1024];
struct epoll_event* pe;
struct epoll_event e;
@@ -830,6 +831,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
return;
}
lfields = uv__get_internal_fields(loop);
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
uv_stream_t* stream;
@@ -877,7 +880,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
int nevents = 0;
have_signals = 0;
if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) {
if (lfields->flags & UV_METRICS_IDLE_TIME) {
reset_timeout = 1;
user_timeout = timeout;
timeout = 0;
@@ -896,6 +899,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
if (sizeof(int32_t) == sizeof(long) && timeout >= max_safe_timeout)
timeout = max_safe_timeout;
/* Store the current timeout in a location that's globally accessible so
* other locations like uv__work_done() can determine whether the queue
* of events in the callback were waiting when poll was called.
*/
lfields->current_timeout = timeout;
nfds = epoll_wait(loop->ep, events,
ARRAY_SIZE(events), timeout);

View File

@@ -132,6 +132,7 @@ static void uv__pollfds_del(uv_loop_t* loop, int fd) {
void uv__io_poll(uv_loop_t* loop, int timeout) {
uv__loop_internal_fields_t* lfields;
sigset_t* pset;
sigset_t set;
uint64_t time_base;
@@ -152,6 +153,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
return;
}
lfields = uv__get_internal_fields(loop);
/* Take queued watchers and add their fds to our poll fds array. */
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
q = QUEUE_HEAD(&loop->watcher_queue);
@@ -179,7 +182,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
assert(timeout >= -1);
time_base = loop->time;
if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) {
if (lfields->flags & UV_METRICS_IDLE_TIME) {
reset_timeout = 1;
user_timeout = timeout;
timeout = 0;
@@ -198,6 +201,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
if (timeout != 0)
uv__metrics_set_provider_entry_time(loop);
/* Store the current timeout in a location that's globally accessible so
* other locations like uv__work_done() can determine whether the queue
* of events in the callback were waiting when poll was called.
*/
lfields->current_timeout = timeout;
if (pset != NULL)
if (pthread_sigmask(SIG_BLOCK, pset, NULL))
abort();

View File

@@ -421,6 +421,7 @@ struct uv__iou {
struct uv__loop_internal_fields_s {
unsigned int flags;
uv__loop_metrics_t loop_metrics;
int current_timeout;
#ifdef __linux__
struct uv__iou iou;
#endif /* __linux__ */

View File

@@ -424,6 +424,7 @@ int uv_backend_timeout(const uv_loop_t* loop) {
static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) {
uv__loop_internal_fields_t* lfields;
DWORD bytes;
ULONG_PTR key;
OVERLAPPED* overlapped;
@@ -433,9 +434,10 @@ static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) {
uint64_t user_timeout;
int reset_timeout;
lfields = uv__get_internal_fields(loop);
timeout_time = loop->time + timeout;
if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) {
if (lfields->flags & UV_METRICS_IDLE_TIME) {
reset_timeout = 1;
user_timeout = timeout;
timeout = 0;
@@ -450,6 +452,12 @@ static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) {
if (timeout != 0)
uv__metrics_set_provider_entry_time(loop);
/* Store the current timeout in a location that's globally accessible so
* other locations like uv__work_done() can determine whether the queue
* of events in the callback were waiting when poll was called.
*/
lfields->current_timeout = timeout;
GetQueuedCompletionStatus(loop->iocp,
&bytes,
&key,
@@ -507,6 +515,7 @@ static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) {
static void uv__poll(uv_loop_t* loop, DWORD timeout) {
uv__loop_internal_fields_t* lfields;
BOOL success;
uv_req_t* req;
OVERLAPPED_ENTRY overlappeds[128];
@@ -518,9 +527,10 @@ static void uv__poll(uv_loop_t* loop, DWORD timeout) {
uint64_t actual_timeout;
int reset_timeout;
lfields = uv__get_internal_fields(loop);
timeout_time = loop->time + timeout;
if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) {
if (lfields->flags & UV_METRICS_IDLE_TIME) {
reset_timeout = 1;
user_timeout = timeout;
timeout = 0;
@@ -537,6 +547,12 @@ static void uv__poll(uv_loop_t* loop, DWORD timeout) {
if (timeout != 0)
uv__metrics_set_provider_entry_time(loop);
/* Store the current timeout in a location that's globally accessible so
* other locations like uv__work_done() can determine whether the queue
* of events in the callback were waiting when poll was called.
*/
lfields->current_timeout = timeout;
success = pGetQueuedCompletionStatusEx(loop->iocp,
overlappeds,
ARRAY_SIZE(overlappeds),

View File

@@ -556,6 +556,7 @@ TEST_DECLARE (utf8_decode1_overrun)
TEST_DECLARE (uname)
TEST_DECLARE (metrics_info_check)
TEST_DECLARE (metrics_pool_events)
TEST_DECLARE (metrics_idle_time)
TEST_DECLARE (metrics_idle_time_thread)
TEST_DECLARE (metrics_idle_time_zero)
@@ -1192,6 +1193,7 @@ TASK_LIST_START
TEST_HELPER (readable_on_eof, tcp4_echo_server)
TEST_ENTRY (metrics_info_check)
TEST_ENTRY (metrics_pool_events)
TEST_ENTRY (metrics_idle_time)
TEST_ENTRY (metrics_idle_time_thread)
TEST_ENTRY (metrics_idle_time_zero)

View File

@@ -34,6 +34,7 @@ typedef struct {
static uint64_t last_events_count;
static char test_buf[] = "test-buffer\n";
static fs_reqs_t fs_reqs;
static int pool_events_counter;
static void timer_spin_cb(uv_timer_t* handle) {
@@ -239,3 +240,153 @@ TEST_IMPL(metrics_info_check) {
MAKE_VALGRIND_HAPPY(uv_default_loop());
return 0;
}
static void fs_prepare_cb(uv_prepare_t* handle) {
uv_metrics_t metrics;
ASSERT_OK(uv_metrics_info(uv_default_loop(), &metrics));
if (pool_events_counter == 1)
ASSERT_EQ(metrics.events, metrics.events_waiting);
if (pool_events_counter < 7)
return;
uv_prepare_stop(handle);
pool_events_counter = -42;
}
static void fs_stat_cb(uv_fs_t* req) {
uv_fs_req_cleanup(req);
pool_events_counter++;
}
static void fs_work_cb(uv_work_t* req) {
}
static void fs_after_work_cb(uv_work_t* req, int status) {
free(req);
pool_events_counter++;
}
static void fs_write_cb(uv_fs_t* req) {
uv_work_t* work1 = malloc(sizeof(*work1));
uv_work_t* work2 = malloc(sizeof(*work2));
pool_events_counter++;
uv_fs_req_cleanup(req);
ASSERT_OK(uv_queue_work(uv_default_loop(),
work1,
fs_work_cb,
fs_after_work_cb));
ASSERT_OK(uv_queue_work(uv_default_loop(),
work2,
fs_work_cb,
fs_after_work_cb));
}
static void fs_random_cb(uv_random_t* req, int status, void* buf, size_t len) {
pool_events_counter++;
}
static void fs_addrinfo_cb(uv_getaddrinfo_t* req,
int status,
struct addrinfo* res) {
uv_freeaddrinfo(req->addrinfo);
pool_events_counter++;
}
TEST_IMPL(metrics_pool_events) {
uv_buf_t iov;
uv_fs_t open_req;
uv_fs_t stat1_req;
uv_fs_t stat2_req;
uv_fs_t unlink_req;
uv_fs_t write_req;
uv_getaddrinfo_t addrinfo_req;
uv_metrics_t metrics;
uv_prepare_t prepare;
uv_random_t random_req;
int fd;
char rdata;
ASSERT_OK(uv_loop_configure(uv_default_loop(), UV_METRICS_IDLE_TIME));
uv_fs_unlink(NULL, &unlink_req, "test_file", NULL);
uv_fs_req_cleanup(&unlink_req);
ASSERT_OK(uv_prepare_init(uv_default_loop(), &prepare));
ASSERT_OK(uv_prepare_start(&prepare, fs_prepare_cb));
pool_events_counter = 0;
fd = uv_fs_open(NULL,
&open_req,
"test_file",
O_WRONLY | O_CREAT,
S_IRUSR | S_IWUSR,
NULL);
ASSERT_GT(fd, 0);
uv_fs_req_cleanup(&open_req);
iov = uv_buf_init(test_buf, sizeof(test_buf));
ASSERT_OK(uv_fs_write(uv_default_loop(),
&write_req,
fd,
&iov,
1,
0,
fs_write_cb));
ASSERT_OK(uv_fs_stat(uv_default_loop(),
&stat1_req,
"test_file",
fs_stat_cb));
ASSERT_OK(uv_fs_stat(uv_default_loop(),
&stat2_req,
"test_file",
fs_stat_cb));
ASSERT_OK(uv_random(uv_default_loop(),
&random_req,
&rdata,
1,
0,
fs_random_cb));
ASSERT_OK(uv_getaddrinfo(uv_default_loop(),
&addrinfo_req,
fs_addrinfo_cb,
"example.invalid",
NULL,
NULL));
/* Sleep for a moment to hopefully force the events to complete before
* entering the event loop. */
uv_sleep(100);
ASSERT_OK(uv_run(uv_default_loop(), UV_RUN_DEFAULT));
ASSERT_OK(uv_metrics_info(uv_default_loop(), &metrics));
/* It's possible for uv__work_done() to execute one extra time even though the
* QUEUE has already been cleared out. This has to do with the way a uv_async
* is used to tell the event loop thread to process the worker pool QUEUE. */
ASSERT_GE(metrics.events, 7);
/* It's possible one of the other events also got stuck in the event queue, so
* check GE instead of EQ. The reason for 4 instead of 5 is that the call to
* uv_getaddrinfo() is racy and slow, so there is no guarantee it will always
* complete before the sleep finishes. */
ASSERT_GE(metrics.events_waiting, 4);
ASSERT_EQ(pool_events_counter, -42);
uv_fs_unlink(NULL, &unlink_req, "test_file", NULL);
uv_fs_req_cleanup(&unlink_req);
MAKE_VALGRIND_HAPPY(uv_default_loop());
return 0;
}