From 1ee1063402c7af60b26661bf16cb525e4a08dd0a Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Thu, 30 May 2024 15:53:44 +0800 Subject: [PATCH] test: fix the flaky test-tcp-reuseport Start connecting to the peers after all threads to poll for accepting connections. Ref: #4407 --- test/test-tcp-reuseport.c | 110 ++++++++++++++++++++++++-------------- 1 file changed, 69 insertions(+), 41 deletions(-) diff --git a/test/test-tcp-reuseport.c b/test/test-tcp-reuseport.c index 296301b8..2e5adec2 100644 --- a/test/test-tcp-reuseport.c +++ b/test/test-tcp-reuseport.c @@ -53,31 +53,35 @@ TEST_IMPL(tcp_reuseport) { #else +#define NUM_LISTENING_THREADS 2 #define MAX_TCP_CLIENTS 10 static uv_tcp_t tcp_connect_handles[MAX_TCP_CLIENTS]; static uv_connect_t tcp_connect_requests[MAX_TCP_CLIENTS]; -static unsigned int main_loop_accepted; -static unsigned int thread_loop_accepted; -static unsigned int connected; +static uv_sem_t semaphore; static uv_mutex_t mutex; static unsigned int accepted; +static unsigned int thread_loop1_accepted; +static unsigned int thread_loop2_accepted; +static unsigned int connected; + static uv_loop_t* main_loop; -static uv_loop_t* thread_loop; -static uv_tcp_t main_handle; -static uv_tcp_t thread_handle; -static uv_timer_t main_timer_handle; -static uv_timer_t thread_timer_handle; +static uv_loop_t* thread_loop1; +static uv_loop_t* thread_loop2; +static uv_tcp_t thread_handle1; +static uv_tcp_t thread_handle2; +static uv_timer_t thread_timer_handle1; +static uv_timer_t thread_timer_handle2; static void on_close(uv_handle_t* handle) { free(handle); } static void ticktack(uv_timer_t* timer) { - ASSERT(timer == &main_timer_handle || timer == &thread_timer_handle); + ASSERT(timer == &thread_timer_handle1 || timer == &thread_timer_handle2); int done = 0; uv_mutex_lock(&mutex); @@ -88,29 +92,29 @@ static void ticktack(uv_timer_t* timer) { if (done) { uv_close((uv_handle_t*) timer, NULL); - if (timer->loop == main_loop) - uv_close((uv_handle_t*) &main_handle, NULL); - if (timer->loop == thread_loop) - uv_close((uv_handle_t*) &thread_handle, NULL); + if (timer->loop == thread_loop1) + uv_close((uv_handle_t*) &thread_handle1, NULL); + if (timer->loop == thread_loop2) + uv_close((uv_handle_t*) &thread_handle2, NULL); } } static void on_connection(uv_stream_t* server, int status) { ASSERT_OK(status); - ASSERT(server == (uv_stream_t*) &main_handle || \ - server == (uv_stream_t*) &thread_handle); + ASSERT(server == (uv_stream_t*) &thread_handle1 || \ + server == (uv_stream_t*) &thread_handle2); uv_tcp_t *client = malloc(sizeof(uv_tcp_t)); ASSERT_OK(uv_tcp_init(server->loop, client)); ASSERT_OK(uv_accept(server, (uv_stream_t*) client)); uv_close((uv_handle_t*) client, on_close); - if (server->loop == main_loop) - main_loop_accepted++; + if (server->loop == thread_loop1) + thread_loop1_accepted++; - if (server->loop == thread_loop) - thread_loop_accepted++; + if (server->loop == thread_loop2) + thread_loop2_accepted++; uv_mutex_lock(&mutex); accepted++; @@ -129,8 +133,10 @@ static void on_connect(uv_connect_t* req, int status) { static void run_event_loop(void* arg) { int r; uv_loop_t* loop = (uv_loop_t*) arg; - ASSERT_PTR_EQ(loop, thread_loop); + ASSERT(loop == thread_loop1 || loop == thread_loop2); + /* Notify the main thread to start connecting. */ + uv_sem_post(&semaphore); r = uv_run(loop, UV_RUN_DEFAULT); ASSERT_OK(r); } @@ -154,26 +160,47 @@ static void create_listener(uv_loop_t* loop, uv_tcp_t* handle) { TEST_IMPL(tcp_reuseport) { struct sockaddr_in addr; int r; + int i; r = uv_mutex_init(&mutex); - /* Create listener per event loop. */ + r = uv_sem_init(&semaphore, 0); + ASSERT_OK(r); + main_loop = uv_default_loop(); ASSERT_NOT_NULL(main_loop); - create_listener(main_loop, &main_handle); - uv_timer_init(main_loop, &main_timer_handle); - uv_timer_start(&main_timer_handle, ticktack, 0, 10); - thread_loop = uv_loop_new(); - ASSERT_NOT_NULL(thread_loop); - create_listener(thread_loop, &thread_handle); - uv_timer_init(thread_loop, &thread_timer_handle); - uv_timer_start(&thread_timer_handle, ticktack, 0, 10); + /* Create listener per event loop. */ - /* Connect to the peers. */ + thread_loop1 = uv_loop_new(); + ASSERT_NOT_NULL(thread_loop1); + create_listener(thread_loop1, &thread_handle1); + uv_timer_init(thread_loop1, &thread_timer_handle1); + uv_timer_start(&thread_timer_handle1, ticktack, 0, 10); + + thread_loop2 = uv_loop_new(); + ASSERT_NOT_NULL(thread_loop2); + create_listener(thread_loop2, &thread_handle2); + uv_timer_init(thread_loop2, &thread_timer_handle2); + uv_timer_start(&thread_timer_handle2, ticktack, 0, 10); + + /* Run event loops of listeners in separate threads. */ + uv_thread_t thread_loop_id1; + uv_thread_t thread_loop_id2; + uv_thread_create(&thread_loop_id1, run_event_loop, thread_loop1); + uv_thread_create(&thread_loop_id2, run_event_loop, thread_loop2); + + /* Wait until all threads to poll for accepting connections + * before we start to connect. Otherwise the incoming connections + * might not be distributed across all listening threads. */ + for (i = 0; i < NUM_LISTENING_THREADS; i++) + uv_sem_wait(&semaphore); + /* Now we know all threads are up and entering the uv_run(), + * but we still sleep a little bit just for dual fail-safe. */ + uv_sleep(100); + + /* Start connecting to the peers. */ ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); - - int i; for (i = 0; i < MAX_TCP_CLIENTS; i++) { r = uv_tcp_init(main_loop, &tcp_connect_handles[i]); ASSERT_OK(r); @@ -184,14 +211,12 @@ TEST_IMPL(tcp_reuseport) { ASSERT_OK(r); } - /* Run event loops and wait for them to exit. */ - uv_thread_t thread_loop_id; - uv_thread_create(&thread_loop_id, run_event_loop, thread_loop); - r = uv_run(main_loop, UV_RUN_DEFAULT); ASSERT_OK(r); - uv_thread_join(&thread_loop_id); + /* Wait for all threads to exit. */ + uv_thread_join(&thread_loop_id1); + uv_thread_join(&thread_loop_id2); /* Verify if each listener per event loop accepted connections * and the amount of accepted connections matches the one of @@ -199,14 +224,17 @@ TEST_IMPL(tcp_reuseport) { */ ASSERT_EQ(accepted, MAX_TCP_CLIENTS); ASSERT_EQ(connected, MAX_TCP_CLIENTS); - ASSERT_GT(main_loop_accepted, 0); - ASSERT_GT(thread_loop_accepted, 0); - ASSERT_EQ(main_loop_accepted + thread_loop_accepted, connected); + ASSERT_GT(thread_loop1_accepted, 0); + ASSERT_GT(thread_loop2_accepted, 0); + ASSERT_EQ(thread_loop1_accepted + thread_loop2_accepted, connected); /* Clean up. */ uv_mutex_destroy(&mutex); - uv_loop_delete(thread_loop); + uv_sem_destroy(&semaphore); + + uv_loop_delete(thread_loop1); + uv_loop_delete(thread_loop2); MAKE_VALGRIND_HAPPY(main_loop); return 0;