Added demand-spawning of threads, and prespawn_threads config argument

This commit is contained in:
Jeremy Friesner 2023-05-13 18:40:46 -07:00
parent 5dcd7e25e4
commit 4f89995634
6 changed files with 114 additions and 42 deletions

View File

@ -213,10 +213,15 @@ about web server instance:
When `mg_start()` returns, all initialization is guaranteed to be complete
(e.g. listening ports are opened, SSL is initialized, etc). `mg_start()` starts
some threads: a master thread, that accepts new connections, and several
worker threads, that process accepted connections. The number of worker threads
is configurable via `num_threads` configuration option. That number puts a
some threads: a master thread, that accepts new connections, and optionally some
worker threads, that process accepted connections. The maximum number of worker
threads is configurable via `num_threads` configuration option. That number puts a
limit on number of simultaneous requests that can be handled by CivetWeb.
The number of worker threads to be pre-spawned at startup is specified via the
'prespawn_threads' configuration option; worker threads that are not pre-spawned
will instead be demand-created the first time they are needed.
If you embed CivetWeb into a program that uses SSL outside CivetWeb as well,
you may need to initialize SSL before calling `mg_start()`, and set the pre-
processor define `SSL_ALREADY_INITIALIZED`. This is not required if SSL is

View File

@ -558,8 +558,8 @@ The configuration value is approximate, the real limit might be a few bytes off.
The minimum is 1024 (1 kB).
### num\_threads `50`
Number of worker threads. CivetWeb handles each incoming connection in a
separate thread. Therefore, the value of this option is effectively the number
Maximum number of worker threads allowed. CivetWeb handles each incoming connection
in a separate thread. Therefore, the value of this option is effectively the number
of concurrent HTTP connections CivetWeb can handle.
If there are more simultaneous requests (connection attempts), they are queued.
@ -572,6 +572,15 @@ In case the clients are web browsers, it is recommended to use `num_threads` of
at least 5, since browsers often establish multiple connections to load a single
web page, including all linked documents (CSS, JavaScript, images, ...).
### prespawn\_threads '0'
Number of worker threads that should be pre-spawned by mg_start(). Defaults to
0, meaning no worker threads will be pre-spawned at startup; rather, worker threads
will be spawned when a new connection comes in and there aren't currently any
idle worker threads available to handle it (if we haven't already reached the
maximum worker-thread count as specified by num_threads). If this value is
specified less than zero, or greater than the value of num_threads, it will be
treated as if it was specified to be equal to the value of num_threads.
### listen\_backlog `200`
Maximum number of connections waiting to be accepted by the server operating system.
Internally, this parameter is passed to the "listen" socket/system call.
@ -876,8 +885,8 @@ All port, socket, process and thread specific parameters are per server:
`enable_http2`, `enable_keep_alive`, `enable_websocket_ping_pong`,
`keep_alive_timeout_ms`, `linger_timeout_ms`, `listen_backlog`,
`listening_ports`, `lua_background_script`, `lua_background_script_params`,
`max_request_size`, `num_threads`, `request_timeout_ms`, `run_as_user`,
`tcp_nodelay`, `throttle`, `websocket_timeout_ms` + all options from `main.c`.
`max_request_size`, `num_threads`, 'prespawn_threads', `request_timeout_ms`,
`run_as_user`, `tcp_nodelay`, `throttle`, `websocket_timeout_ms` + all options from `main.c`.
All other options can be set per domain. In particular
`authentication_domain`, `document_root` and (for HTTPS) `ssl_certificate`

View File

@ -26,6 +26,7 @@ listening_ports 8080
# extra_mime_types
# ssl_certificate
# num_threads 50
# prespawn_threads 0
# run_as_user
# url_rewrite_patterns
# hide_files_patterns

View File

@ -1934,6 +1934,7 @@ enum {
/* Once for each server */
LISTENING_PORTS,
NUM_THREADS,
PRESPAWN_THREADS,
RUN_AS_USER,
CONFIG_TCP_NODELAY, /* Prepended CONFIG_ to avoid conflict with the
* socket option typedef TCP_NODELAY. */
@ -2081,6 +2082,7 @@ static const struct mg_option config_options[] = {
/* Once for each server */
{"listening_ports", MG_CONFIG_TYPE_STRING_LIST, "8080"},
{"num_threads", MG_CONFIG_TYPE_NUMBER, "50"},
{"prespawn_threads", MG_CONFIG_TYPE_NUMBER, "0"},
{"run_as_user", MG_CONFIG_TYPE_STRING, NULL},
{"tcp_nodelay", MG_CONFIG_TYPE_NUMBER, "0"},
{"max_request_size", MG_CONFIG_TYPE_NUMBER, "16384"},
@ -2394,7 +2396,14 @@ struct mg_context {
pthread_t masterthreadid; /* The master thread ID */
unsigned int
cfg_worker_threads; /* The number of configured worker threads. */
cfg_max_worker_threads; /* How many worker-threads we are allowed to create, total */
unsigned int
spawned_worker_threads; /* How many worker-threads currently exist (modified by master thread) */
unsigned int
idle_worker_thread_count; /* How many worker-threads are currently sitting around with nothing to do */
/* Access to this value MUST be synchronized by thread_mutex */
pthread_t *worker_threadids; /* The worker thread IDs */
unsigned long starter_thread_idx; /* thread index which called mg_start */
@ -17997,7 +18006,7 @@ mg_close_connection(struct mg_connection *conn)
* timeouts, we will just wait a few seconds in mg_join_thread. */
/* join worker thread */
for (i = 0; i < conn->phys_ctx->cfg_worker_threads; i++) {
for (i = 0; i < conn->phys_ctx->spawned_worker_threads; i++) {
mg_join_thread(conn->phys_ctx->worker_threadids[i]);
}
}
@ -19207,7 +19216,8 @@ mg_connect_websocket_client_impl(const struct mg_client_options *client_options,
/* Now upgrade to ws/wss client context */
conn->phys_ctx->user_data = user_data;
conn->phys_ctx->context_type = CONTEXT_WS_CLIENT;
conn->phys_ctx->cfg_worker_threads = 1; /* one worker thread */
conn->phys_ctx->cfg_max_worker_threads = 1; /* one worker thread */
conn->phys_ctx->spawned_worker_threads = 1; /* one worker thread */
/* Start a thread to read the websocket client connection
* This thread will automatically stop when mg_disconnect is
@ -19216,7 +19226,7 @@ mg_connect_websocket_client_impl(const struct mg_client_options *client_options,
thread_data,
conn->phys_ctx->worker_threadids)
!= 0) {
conn->phys_ctx->cfg_worker_threads = 0;
conn->phys_ctx->spawned_worker_threads = 0;
mg_free(thread_data);
mg_close_connection(conn);
conn = NULL;
@ -19587,6 +19597,7 @@ process_new_connection(struct mg_connection *conn)
#endif
}
static int mg_start_worker_thread(struct mg_context *ctx, int only_if_no_idle_threads); /* forward declaration */
#if defined(ALTERNATIVE_QUEUE)
@ -19595,8 +19606,10 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
{
unsigned int i;
(void)mg_start_worker_thread(ctx, 1); /* will start a worker-thread only if there aren't currently any idle worker-threads */
while (!ctx->stop_flag) {
for (i = 0; i < ctx->cfg_worker_threads; i++) {
for (i = 0; i < ctx->spawned_worker_threads; i++) {
/* find a free worker slot and signal it */
if (ctx->client_socks[i].in_use == 2) {
(void)pthread_mutex_lock(&ctx->thread_mutex);
@ -19621,10 +19634,13 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
static int
consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index, int counter_was_preincremented)
{
DEBUG_TRACE("%s", "going idle");
(void)pthread_mutex_lock(&ctx->thread_mutex);
if (counter_was_preincremented == 0) { /* first call only: the master-thread pre-incremented this before he spawned us */
ctx->idle_worker_thread_count++;
}
ctx->client_socks[thread_index].in_use = 2;
(void)pthread_mutex_unlock(&ctx->thread_mutex);
@ -19641,6 +19657,7 @@ consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
}
return 0;
}
ctx->idle_worker_thread_count--;
(void)pthread_mutex_unlock(&ctx->thread_mutex);
if (sp->in_use == 1) {
DEBUG_TRACE("grabbed socket %d, going busy", sp->sock);
@ -19655,12 +19672,15 @@ consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
/* Worker threads take accepted socket from the queue */
static int
consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index, int counter_was_preincremented)
{
(void)thread_index;
(void)pthread_mutex_lock(&ctx->thread_mutex);
DEBUG_TRACE("%s", "going idle");
(void)pthread_mutex_lock(&ctx->thread_mutex);
if (counter_was_preincremented == 0) { /* first call only: the master-thread pre-incremented this before he spawned us */
ctx->idle_worker_thread_count++;
}
/* If the queue is empty, wait. We're idle at this point. */
while ((ctx->sq_head == ctx->sq_tail)
@ -19684,6 +19704,8 @@ consume_socket(struct mg_context *ctx, struct socket *sp, int thread_index)
}
(void)pthread_cond_signal(&ctx->sq_empty);
ctx->idle_worker_thread_count--;
(void)pthread_mutex_unlock(&ctx->thread_mutex);
return STOP_FLAG_IS_ZERO(&ctx->stop_flag);
@ -19730,6 +19752,8 @@ produce_socket(struct mg_context *ctx, const struct socket *sp)
(void)pthread_cond_signal(&ctx->sq_full);
(void)pthread_mutex_unlock(&ctx->thread_mutex);
(void)mg_start_worker_thread(ctx, 1); /* will start a worker-thread only if there aren't currently any idle worker-threads */
}
#endif /* ALTERNATIVE_QUEUE */
@ -19740,6 +19764,7 @@ worker_thread_run(struct mg_connection *conn)
struct mg_context *ctx = conn->phys_ctx;
int thread_index;
struct mg_workerTLS tls;
int first_call_to_consume_socket = 1;
mg_set_thread_name("worker");
@ -19765,7 +19790,7 @@ worker_thread_run(struct mg_connection *conn)
/* Connection structure has been pre-allocated */
thread_index = (int)(conn - ctx->worker_connections);
if ((thread_index < 0)
|| ((unsigned)thread_index >= (unsigned)ctx->cfg_worker_threads)) {
|| ((unsigned)thread_index >= (unsigned)ctx->cfg_max_worker_threads)) {
mg_cry_ctx_internal(ctx,
"Internal error: Invalid worker index %i",
thread_index);
@ -19806,7 +19831,8 @@ worker_thread_run(struct mg_connection *conn)
/* Call consume_socket() even when ctx->stop_flag > 0, to let it
* signal sq_empty condvar to wake up the master waiting in
* produce_socket() */
while (consume_socket(ctx, &conn->client, thread_index)) {
while (consume_socket(ctx, &conn->client, thread_index, first_call_to_consume_socket)) {
first_call_to_consume_socket = 0;
/* New connections must start with new protocol negotiation */
tls.alpn_proto = NULL;
@ -20188,7 +20214,7 @@ master_thread_run(struct mg_context *ctx)
/* Wakeup workers that are waiting for connections to handle. */
#if defined(ALTERNATIVE_QUEUE)
for (i = 0; i < ctx->cfg_worker_threads; i++) {
for (i = 0; i < ctx->spawned_worker_threads; i++) {
event_signal(ctx->client_wait_events[i]);
}
#else
@ -20198,7 +20224,7 @@ master_thread_run(struct mg_context *ctx)
#endif
/* Join all worker threads to avoid leaking threads. */
workerthreadcount = ctx->cfg_worker_threads;
workerthreadcount = ctx->spawned_worker_threads;
for (i = 0; i < workerthreadcount; i++) {
if (ctx->worker_threadids[i] != 0) {
mg_join_thread(ctx->worker_threadids[i]);
@ -20302,7 +20328,7 @@ free_context(struct mg_context *ctx)
#if defined(ALTERNATIVE_QUEUE)
mg_free(ctx->client_socks);
if (ctx->client_wait_events != NULL) {
for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) {
for (i = 0; (unsigned)i < ctx->spawned_worker_threads; i++) {
event_destroy(ctx->client_wait_events[i]);
}
mg_free(ctx->client_wait_events);
@ -20494,12 +20520,41 @@ legacy_init(const char **options)
}
static int mg_start_worker_thread(struct mg_context *ctx, int only_if_no_idle_threads) {
const unsigned int i = ctx->spawned_worker_threads;
if (i >= ctx->cfg_max_worker_threads) {
return -1; /* Oops, we hit our worker-thread limit! No more worker threads, ever! */
}
(void)pthread_mutex_lock(&ctx->thread_mutex);
if ((only_if_no_idle_threads)&&(ctx->idle_worker_thread_count > 0)) {
(void)pthread_mutex_unlock(&ctx->thread_mutex);
return -2; /* There are idle threads available, so no need to spawn a new worker thread now */
}
ctx->idle_worker_thread_count++; /* we do this here to avoid a race condition while the thread is starting up */
(void)pthread_mutex_unlock(&ctx->thread_mutex);
ctx->worker_connections[i].phys_ctx = ctx;
int ret = mg_start_thread_with_id(worker_thread,
&ctx->worker_connections[i],
&ctx->worker_threadids[i]);
if (ret == 0) {
ctx->spawned_worker_threads++; /* note that we've filled another slot in the table */
DEBUG_TRACE("Started worker_thread #%i", ctx->spawned_worker_threads);
} else {
(void)pthread_mutex_lock(&ctx->thread_mutex);
ctx->idle_worker_thread_count--; /* whoops, roll-back on error */
(void)pthread_mutex_unlock(&ctx->thread_mutex);
}
return ret;
}
CIVETWEB_API struct mg_context *
mg_start2(struct mg_init_data *init, struct mg_error_data *error)
{
struct mg_context *ctx;
const char *name, *value, *default_value;
int idx, ok, workerthreadcount;
int idx, ok, prespawnthreadcount, workerthreadcount;
unsigned int i;
int itmp;
void (*exit_callback)(const struct mg_context *ctx) = 0;
@ -20742,7 +20797,12 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
#endif
/* Worker thread count option */
workerthreadcount = atoi(ctx->dd.config[NUM_THREADS]);
workerthreadcount = atoi(ctx->dd.config[NUM_THREADS]);
prespawnthreadcount = atoi(ctx->dd.config[PRESPAWN_THREADS]);
if ((prespawnthreadcount < 0)||(prespawnthreadcount > workerthreadcount)) {
prespawnthreadcount = workerthreadcount; /* can't prespawn more than all of them! */
}
if ((workerthreadcount > MAX_WORKER_THREADS) || (workerthreadcount <= 0)) {
if (workerthreadcount <= 0) {
@ -21007,8 +21067,8 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
return NULL;
}
ctx->cfg_worker_threads = ((unsigned int)(workerthreadcount));
ctx->worker_threadids = (pthread_t *)mg_calloc_ctx(ctx->cfg_worker_threads,
ctx->cfg_max_worker_threads = ((unsigned int)(workerthreadcount));
ctx->worker_threadids = (pthread_t *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
sizeof(pthread_t),
ctx);
@ -21019,7 +21079,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
if (error != NULL) {
error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
error->code_sub =
(unsigned)ctx->cfg_worker_threads * (unsigned)sizeof(pthread_t);
(unsigned)ctx->cfg_max_worker_threads * (unsigned)sizeof(pthread_t);
mg_snprintf(NULL,
NULL, /* No truncation check for error buffers */
error->text,
@ -21033,7 +21093,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
return NULL;
}
ctx->worker_connections =
(struct mg_connection *)mg_calloc_ctx(ctx->cfg_worker_threads,
(struct mg_connection *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
sizeof(struct mg_connection),
ctx);
if (ctx->worker_connections == NULL) {
@ -21043,7 +21103,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
if (error != NULL) {
error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
error->code_sub = (unsigned)ctx->cfg_worker_threads
error->code_sub = (unsigned)ctx->cfg_max_worker_threads
* (unsigned)sizeof(struct mg_connection);
mg_snprintf(NULL,
NULL, /* No truncation check for error buffers */
@ -21060,7 +21120,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
#if defined(ALTERNATIVE_QUEUE)
ctx->client_wait_events =
(void **)mg_calloc_ctx(ctx->cfg_worker_threads,
(void **)mg_calloc_ctx(ctx->cfg_max_worker_threads,
sizeof(ctx->client_wait_events[0]),
ctx);
if (ctx->client_wait_events == NULL) {
@ -21070,7 +21130,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
if (error != NULL) {
error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
error->code_sub = (unsigned)ctx->cfg_worker_threads
error->code_sub = (unsigned)ctx->cfg_max_worker_threads
* (unsigned)sizeof(ctx->client_wait_events[0]);
mg_snprintf(NULL,
NULL, /* No truncation check for error buffers */
@ -21086,7 +21146,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
}
ctx->client_socks =
(struct socket *)mg_calloc_ctx(ctx->cfg_worker_threads,
(struct socket *)mg_calloc_ctx(ctx->cfg_max_worker_threads,
sizeof(ctx->client_socks[0]),
ctx);
if (ctx->client_socks == NULL) {
@ -21097,7 +21157,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
if (error != NULL) {
error->code = MG_ERROR_DATA_CODE_OUT_OF_MEMORY;
error->code_sub = (unsigned)ctx->cfg_worker_threads
error->code_sub = (unsigned)ctx->cfg_max_worker_threads
* (unsigned)sizeof(ctx->client_socks[0]);
mg_snprintf(NULL,
NULL, /* No truncation check for error buffers */
@ -21112,7 +21172,7 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
return NULL;
}
for (i = 0; (unsigned)i < ctx->cfg_worker_threads; i++) {
for (i = 0; (unsigned)i < ctx->cfg_max_worker_threads; i++) {
ctx->client_wait_events[i] = event_create();
if (ctx->client_wait_events[i] == 0) {
const char *err_msg = "Error creating worker event %i";
@ -21176,23 +21236,18 @@ mg_start2(struct mg_init_data *init, struct mg_error_data *error)
ctx->context_type = CONTEXT_SERVER; /* server context */
/* Start worker threads */
for (i = 0; i < ctx->cfg_worker_threads; i++) {
for (int i = 0; i < prespawnthreadcount; i++) {
/* worker_thread sets up the other fields */
ctx->worker_connections[i].phys_ctx = ctx;
if (mg_start_thread_with_id(worker_thread,
&ctx->worker_connections[i],
&ctx->worker_threadids[i])
!= 0) {
if (mg_start_worker_thread(ctx, 0) != 0) {
long error_no = (long)ERRNO;
/* thread was not created */
if (i > 0) {
if (ctx->spawned_worker_threads > 0) {
/* If the second, third, ... thread cannot be created, set a
* warning, but keep running. */
mg_cry_ctx_internal(ctx,
"Cannot start worker thread %i: error %ld",
i + 1,
ctx->spawned_worker_threads + 1,
error_no);
/* If the server initialization should stop here, all
@ -22122,7 +22177,7 @@ mg_get_connection_info(const struct mg_context *ctx,
return 0;
}
if ((unsigned)idx >= ctx->cfg_worker_threads) {
if ((unsigned)idx >= ctx->cfg_max_worker_threads) {
/* Out of range */
return 0;
}

View File

@ -22,6 +22,7 @@ opts = [
"fallback_document_root",
"ssl_certificate",
"num_threads",
"prespawn_threads",
"run_as_user",
"url_rewrite_patterns",
"hide_files_patterns",

View File

@ -1626,6 +1626,7 @@ START_TEST(test_config_options)
ck_assert_str_eq("ssl_certificate_chain",
config_options[SSL_CERTIFICATE_CHAIN].name);
ck_assert_str_eq("num_threads", config_options[NUM_THREADS].name);
ck_assert_str_eq("prespawn_threads", config_options[PRESPAWN_THREADS].name);
ck_assert_str_eq("run_as_user", config_options[RUN_AS_USER].name);
ck_assert_str_eq("url_rewrite_patterns",
config_options[URL_REWRITE_PATTERN].name);