1
0
mirror of https://github.com/libuv/libuv synced 2025-03-28 21:13:16 +00:00

Split pump bench into two processes.

And add a pump test with 1 client.
This commit is contained in:
Ryan Dahl 2011-05-27 01:32:38 -07:00
parent e3923b76b4
commit f0ebf0c7fc
2 changed files with 141 additions and 77 deletions

View File

@ -21,7 +21,9 @@
BENCHMARK_DECLARE (sizes)
BENCHMARK_DECLARE (ping_pongs)
BENCHMARK_DECLARE (pump)
BENCHMARK_DECLARE (pump100_client)
BENCHMARK_DECLARE (pump1_client)
HELPER_DECLARE (pump_server)
HELPER_DECLARE (echo_server)
TASK_LIST_START
@ -30,5 +32,9 @@ TASK_LIST_START
BENCHMARK_ENTRY (ping_pongs)
BENCHMARK_HELPER (ping_pongs, echo_server)
BENCHMARK_ENTRY (pump)
BENCHMARK_ENTRY (pump100_client)
BENCHMARK_HELPER (pump100_client, pump_server)
BENCHMARK_ENTRY (pump1_client)
BENCHMARK_HELPER (pump1_client, pump_server)
TASK_LIST_END

View File

@ -26,7 +26,7 @@
#include <stdio.h>
#define TARGET_CONNECTIONS 100
static int TARGET_CONNECTIONS;
#define WRITE_BUFFER_SIZE 8192
#define MAX_SIMULTANEOUS_CONNECTS 100
@ -45,12 +45,14 @@ static uv_buf_t buf_alloc(uv_handle_t* handle, size_t size);
static void buf_free(uv_buf_t uv_buf_t);
static uv_handle_t server;
static struct sockaddr_in listen_addr;
static struct sockaddr_in connect_addr;
static int64_t start_time;
static int max_connect_socket = 0;
static int max_read_sockets = 0;
static int read_sockets = 0;
static int write_sockets = 0;
@ -63,8 +65,10 @@ static int stats_left = 0;
static char write_buffer[WRITE_BUFFER_SIZE];
static uv_handle_t read_handles[TARGET_CONNECTIONS];
static uv_handle_t write_handles[TARGET_CONNECTIONS];
/* Make this as large as you need. */
#define MAX_WRITE_HANDLES 1000
static uv_handle_t write_handles[MAX_WRITE_HANDLES];
static uv_handle_t timer_handle;
@ -79,9 +83,8 @@ static void show_stats(uv_handle_t *handle, int status) {
int64_t diff;
#if PRINT_STATS
LOGF("connections: %d, read: %.1f gbit/s, write: %.1f gbit/s\n",
read_sockets,
gbit(nrecv, STATS_INTERVAL),
LOGF("connections: %d, write: %.1f gbit/s\n",
write_sockets,
gbit(nsent, STATS_INTERVAL));
#endif
@ -91,8 +94,8 @@ static void show_stats(uv_handle_t *handle, int status) {
uv_update_time();
diff = uv_now() - start_time;
LOGF("pump_%d: %.1f gbit/s\n", read_sockets,
gbit(nrecv_total, diff));
LOGF("pump%d_client: %.1f gbit/s\n", write_sockets,
gbit(nsent_total, diff));
exit(0);
}
@ -103,8 +106,38 @@ static void show_stats(uv_handle_t *handle, int status) {
}
void close_cb(uv_handle_t* handle, int status) {
static void read_show_stats() {
int64_t diff;
uv_update_time();
diff = uv_now() - start_time;
LOGF("pump%d_server: %.1f gbit/s\n", max_read_sockets,
gbit(nrecv_total, diff));
}
void write_sockets_close_cb(uv_handle_t* handle, int status) {
ASSERT(status == 0);
/* If any client closes, the process is done. */
exit(0);
}
void read_sockets_close_cb(uv_handle_t* handle, int status) {
ASSERT(status == 0);
free(handle);
read_sockets--;
/* If it's past the first second and everyone has closed their connection
* Then print stats.
*/
if (uv_now() - start_time > 1000 && read_sockets == 0) {
read_show_stats();
uv_close(&server);
}
}
@ -114,15 +147,27 @@ static void start_stats_collection() {
/* Show-stats timer */
stats_left = STATS_COUNT;
r = uv_timer_init(&timer_handle, close_cb, NULL);
r = uv_timer_init(&timer_handle, NULL, NULL);
ASSERT(r == 0);
r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL);
ASSERT(r == 0);
uv_update_time();
start_time = uv_now();
}
static void read_cb(uv_handle_t* handle, int bytes, uv_buf_t buf) {
ASSERT(bytes >= 0);
if (nrecv_total == 0) {
ASSERT(start_time == 0);
uv_update_time();
start_time = uv_now();
}
if (bytes < 0) {
uv_close(handle);
return;
}
buf_free(buf);
@ -162,11 +207,19 @@ static void do_write(uv_handle_t* handle) {
}
}
static void maybe_start_writing() {
static void connect_cb(uv_req_t* req, int status) {
int i;
if (read_sockets == TARGET_CONNECTIONS &&
write_sockets == TARGET_CONNECTIONS) {
if (status) LOG(uv_strerror(uv_last_error()));
ASSERT(status == 0);
write_sockets++;
req_free(req);
maybe_connect_some();
if (write_sockets == TARGET_CONNECTIONS) {
start_stats_collection();
/* Yay! start writing */
@ -177,86 +230,42 @@ static void maybe_start_writing() {
}
static void connect_cb(uv_req_t* req, int status) {
if (status) LOG(uv_strerror(uv_last_error()));
ASSERT(status == 0);
write_sockets++;
req_free(req);
maybe_connect_some();
maybe_start_writing();
}
static void do_connect(uv_handle_t* handle, struct sockaddr* addr) {
static void maybe_connect_some() {
uv_req_t* req;
uv_handle_t* handle;
int r;
r = uv_tcp_init(handle, close_cb, NULL);
ASSERT(r == 0);
req = req_alloc();
uv_req_init(req, handle, connect_cb);
r = uv_connect(req, addr);
ASSERT(r == 0);
}
static void maybe_connect_some() {
while (max_connect_socket < TARGET_CONNECTIONS &&
max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) {
do_connect(&write_handles[max_connect_socket++],
(struct sockaddr*) &connect_addr);
handle = &write_handles[max_connect_socket++];
r = uv_tcp_init(handle, write_sockets_close_cb, NULL);
ASSERT(r == 0);
req = req_alloc();
uv_req_init(req, handle, connect_cb);
r = uv_connect(req, (struct sockaddr*) &connect_addr);
ASSERT(r == 0);
}
}
static void accept_cb(uv_handle_t* server) {
static void accept_cb(uv_handle_t* s) {
uv_handle_t* handle;
int r;
ASSERT(read_sockets < TARGET_CONNECTIONS);
handle = &read_handles[read_sockets];
ASSERT(&server == s);
r = uv_accept(server, handle, close_cb, NULL);
handle = malloc(sizeof(uv_handle_t));
r = uv_accept(s, handle, read_sockets_close_cb, NULL);
ASSERT(r == 0);
r = uv_read_start(handle, read_cb);
ASSERT(r == 0);
read_sockets++;
maybe_start_writing();
}
BENCHMARK_IMPL(pump) {
uv_handle_t server;
int r;
uv_init(buf_alloc);
listen_addr = uv_ip4_addr("0.0.0.0", TEST_PORT);
connect_addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
/* Server */
r = uv_tcp_init(&server, close_cb, NULL);
ASSERT(r == 0);
r = uv_bind(&server, (struct sockaddr*) &listen_addr);
ASSERT(r == 0);
r = uv_listen(&server, TARGET_CONNECTIONS, accept_cb);
ASSERT(r == 0);
uv_update_time();
start_time = uv_now();
/* Start making connections */
maybe_connect_some();
uv_run();
return 0;
max_read_sockets++;
}
@ -331,3 +340,52 @@ static void buf_free(uv_buf_t uv_buf_t) {
buf->next = buf_freelist;
buf_freelist = buf;
}
HELPER_IMPL(pump_server) {
int r;
uv_init(buf_alloc);
listen_addr = uv_ip4_addr("0.0.0.0", TEST_PORT);
/* Server */
r = uv_tcp_init(&server, NULL, NULL);
ASSERT(r == 0);
r = uv_bind(&server, (struct sockaddr*) &listen_addr);
ASSERT(r == 0);
r = uv_listen(&server, MAX_WRITE_HANDLES, accept_cb);
ASSERT(r == 0);
uv_run();
return 0;
}
void pump(int n) {
int r;
ASSERT(n <= MAX_WRITE_HANDLES);
TARGET_CONNECTIONS = n;
uv_init(buf_alloc);
connect_addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
/* Start making connections */
maybe_connect_some();
uv_run();
}
BENCHMARK_IMPL(pump100_client) {
pump(100);
return 0;
}
BENCHMARK_IMPL(pump1_client) {
pump(1);
return 0;
}