1
0
mirror of https://github.com/libuv/libuv synced 2025-03-28 21:13:16 +00:00

unix, windows: use the same threadpool implementation

This commit is contained in:
Saúl Ibarra Corretgé 2014-06-17 22:07:19 +02:00
parent fe8322d27c
commit 8d11aacb79
16 changed files with 206 additions and 268 deletions

View File

@ -17,7 +17,7 @@ ACLOCAL_AMFLAGS = -I m4
AM_CPPFLAGS = -I$(top_srcdir)/include \
-I$(top_srcdir)/src
include_HEADERS=include/uv.h include/uv-errno.h include/uv-version.h
include_HEADERS=include/uv.h include/uv-errno.h include/uv-threadpool.h include/uv-version.h
CLEANFILES =
@ -28,6 +28,7 @@ libuv_la_SOURCES = src/fs-poll.c \
src/heap-inl.h \
src/inet.c \
src/queue.h \
src/threadpool.c \
src/uv-common.c \
src/uv-common.h \
src/version.c
@ -67,7 +68,6 @@ libuv_la_SOURCES += src/win/async.c \
src/win/stream-inl.h \
src/win/tcp.c \
src/win/thread.c \
src/win/threadpool.c \
src/win/timer.c \
src/win/tty.c \
src/win/udp.c \
@ -99,7 +99,6 @@ libuv_la_SOURCES += src/unix/async.c \
src/unix/stream.c \
src/unix/tcp.c \
src/unix/thread.c \
src/unix/threadpool.c \
src/unix/timer.c \
src/unix/tty.c \
src/unix/udp.c

View File

@ -26,6 +26,7 @@ CFLAGS += -Wall \
INCLUDES = include/stdint-msvc2008.h \
include/tree.h \
include/uv-errno.h \
include/uv-threadpool.h \
include/uv-version.h \
include/uv-win.h \
include/uv.h \
@ -42,6 +43,7 @@ INCLUDES = include/stdint-msvc2008.h \
OBJS = src/fs-poll.o \
src/inet.o \
src/threadpool.o \
src/uv-common.o \
src/version.o \
src/win/async.o \
@ -63,7 +65,6 @@ OBJS = src/fs-poll.o \
src/win/stream.o \
src/win/tcp.o \
src/win/thread.o \
src/win/threadpool.o \
src/win/timer.o \
src/win/tty.o \
src/win/udp.o \

37
include/uv-threadpool.h Normal file
View File

@ -0,0 +1,37 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
/*
* This file is private to libuv. It provides common functionality to both
* Windows and Unix backends.
*/
#ifndef UV_THREADPOOL_H_
#define UV_THREADPOOL_H_
struct uv__work {
void (*work)(struct uv__work *w);
void (*done)(struct uv__work *w, int status);
struct uv_loop_s* loop;
void* wq[2];
};
#endif /* UV_THREADPOOL_H_ */

View File

@ -42,6 +42,8 @@
#endif
#include <signal.h>
#include "uv-threadpool.h"
#if defined(__linux__)
# include "uv-linux.h"
#elif defined(__sun)
@ -96,13 +98,6 @@ struct uv__async {
int wfd;
};
struct uv__work {
void (*work)(struct uv__work *w);
void (*done)(struct uv__work *w, int status);
struct uv_loop_s* loop;
void* wq[2];
};
#ifndef UV_PLATFORM_SEM_T
# define UV_PLATFORM_SEM_T sem_t
#endif

View File

@ -45,6 +45,7 @@ typedef intptr_t ssize_t;
#endif
#include "tree.h"
#include "uv-threadpool.h"
#define MAX_PIPENAME_LEN 256
@ -307,7 +308,11 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
/* Counter to keep track of active udp streams */ \
unsigned int active_udp_streams; \
/* Counter to started timer */ \
uint64_t timer_counter;
uint64_t timer_counter; \
/* Threadpool */ \
void* wq[2]; \
uv_mutex_t wq_mutex; \
uv_async_t wq_async;
#define UV_REQ_TYPE_PRIVATE \
/* TODO: remove the req suffix */ \
@ -520,6 +525,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
unsigned int flags;
#define UV_GETADDRINFO_PRIVATE_FIELDS \
struct uv__work work_req; \
uv_getaddrinfo_cb getaddrinfo_cb; \
void* alloc; \
WCHAR* node; \
@ -529,6 +535,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
int retcode;
#define UV_GETNAMEINFO_PRIVATE_FIELDS \
struct uv__work work_req; \
uv_getnameinfo_cb getnameinfo_cb; \
struct sockaddr_storage storage; \
int flags; \
@ -547,6 +554,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
volatile char exit_cb_pending;
#define UV_FS_PRIVATE_FIELDS \
struct uv__work work_req; \
int flags; \
DWORD sys_errno_; \
union { \
@ -572,6 +580,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
};
#define UV_WORK_PRIVATE_FIELDS \
struct uv__work work_req;
#define UV_FS_EVENT_PRIVATE_FIELDS \
struct uv_fs_event_req_s { \

View File

@ -19,7 +19,24 @@
* IN THE SOFTWARE.
*/
#include "internal.h"
#include "uv-common.h"
#if !defined(_WIN32)
# include "unix/internal.h"
#else
# include "win/req-inl.h"
/* TODO(saghul): unify internal req functions */
static void uv__req_init(uv_loop_t* loop,
uv_req_t* req,
uv_req_type type) {
uv_req_init(loop, req);
req->type = type;
uv__req_register(loop, req);
}
# define uv__req_init(loop, req, type) \
uv__req_init((loop), (uv_req_t*)(req), (type))
#endif
#include <stdlib.h>
#define MAX_THREADPOOL_SIZE 128
@ -91,6 +108,30 @@ static void post(QUEUE* q) {
}
static void cleanup(void) {
unsigned int i;
if (initialized == 0)
return;
post(&exit_message);
for (i = 0; i < nthreads; i++)
if (uv_thread_join(threads + i))
abort();
if (threads != default_threads)
free(threads);
uv_mutex_destroy(&mutex);
uv_cond_destroy(&cond);
threads = NULL;
nthreads = 0;
initialized = 0;
}
static void init_once(void) {
unsigned int i;
const char* val;
@ -126,30 +167,7 @@ static void init_once(void) {
abort();
initialized = 1;
}
UV_DESTRUCTOR(static void cleanup(void)) {
unsigned int i;
if (initialized == 0)
return;
post(&exit_message);
for (i = 0; i < nthreads; i++)
if (uv_thread_join(threads + i))
abort();
if (threads != default_threads)
free(threads);
uv_mutex_destroy(&mutex);
uv_cond_destroy(&cond);
threads = NULL;
nthreads = 0;
initialized = 0;
atexit(cleanup);
}
@ -179,7 +197,7 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
uv_mutex_unlock(&mutex);
if (!cancelled)
return -EBUSY;
return UV_EBUSY;
w->work = uv__cancelled;
uv_mutex_lock(&loop->wq_mutex);
@ -213,7 +231,7 @@ void uv__work_done(uv_async_t* handle) {
QUEUE_REMOVE(q);
w = container_of(q, struct uv__work, wq);
err = (w->work == uv__cancelled) ? -ECANCELED : 0;
err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
w->done(w, err);
}
}
@ -244,7 +262,7 @@ int uv_queue_work(uv_loop_t* loop,
uv_work_cb work_cb,
uv_after_work_cb after_work_cb) {
if (work_cb == NULL)
return -EINVAL;
return UV_EINVAL;
uv__req_init(loop, req, UV_WORK);
req->loop = loop;
@ -277,7 +295,7 @@ int uv_cancel(uv_req_t* req) {
wreq = &((uv_work_t*) req)->work_req;
break;
default:
return -EINVAL;
return UV_EINVAL;
}
return uv__work_cancel(loop, req, wreq);

View File

@ -210,13 +210,6 @@ void uv__signal_close(uv_signal_t* handle);
void uv__signal_global_once_init(void);
void uv__signal_loop_cleanup(uv_loop_t* loop);
/* thread pool */
void uv__work_submit(uv_loop_t* loop,
struct uv__work *w,
void (*work)(struct uv__work *w),
void (*done)(struct uv__work *w, int status));
void uv__work_done(uv_async_t* handle);
/* platform specific */
uint64_t uv__hrtime(uv_clocktype_t type);
int uv__kqueue_init(uv_loop_t* loop);

View File

@ -92,6 +92,13 @@ void uv__fs_poll_close(uv_fs_poll_t* handle);
int uv__getaddrinfo_translate_error(int sys_err); /* EAI_* error. */
void uv__work_submit(uv_loop_t* loop,
struct uv__work *w,
void (*work)(struct uv__work *w),
void (*done)(struct uv__work *w, int status));
void uv__work_done(uv_async_t* handle);
#define uv__has_active_reqs(loop) \
(QUEUE_EMPTY(&(loop)->active_reqs) == 0)

View File

@ -131,6 +131,7 @@ int uv_loop_init(uv_loop_t* loop) {
loop->last_tick_count = 0;
uv_update_time(loop);
QUEUE_INIT(&loop->wq);
QUEUE_INIT(&loop->handle_queue);
QUEUE_INIT(&loop->active_reqs);
loop->active_handles = 0;
@ -157,6 +158,15 @@ int uv_loop_init(uv_loop_t* loop) {
loop->timer_counter = 0;
loop->stop_flag = 0;
if (uv_mutex_init(&loop->wq_mutex))
abort();
if (uv_async_init(loop, &loop->wq_async, uv__work_done))
abort();
uv__handle_unref(&loop->wq_async);
loop->wq_async.flags |= UV__HANDLE_INTERNAL;
return 0;
}

View File

@ -44,12 +44,8 @@
#define QUEUE_FS_TP_JOB(loop, req) \
do { \
if (!QueueUserWorkItem(&uv_fs_thread_proc, \
req, \
WT_EXECUTEDEFAULT)) { \
return uv_translate_sys_error(GetLastError()); \
} \
uv__req_register(loop, req); \
uv__work_submit((loop), &(req)->work_req, uv__fs_work, uv__fs_done); \
} while (0)
#define SET_REQ_RESULT(req, result_value) \
@ -232,11 +228,7 @@ INLINE static void uv_fs_req_init(uv_loop_t* loop, uv_fs_t* req,
req->result = 0;
req->ptr = NULL;
req->path = NULL;
if (cb != NULL) {
req->cb = cb;
memset(&req->overlapped, 0, sizeof(req->overlapped));
}
req->cb = cb;
}
@ -1510,11 +1502,10 @@ static void fs__fchown(uv_fs_t* req) {
}
static DWORD WINAPI uv_fs_thread_proc(void* parameter) {
uv_fs_t* req = (uv_fs_t*) parameter;
uv_loop_t* loop = req->loop;
static void uv__fs_work(struct uv__work* w) {
uv_fs_t* req;
assert(req != NULL);
req = container_of(w, uv_fs_t, work_req);
assert(req->type == UV_FS);
#define XX(uc, lc) case UV_FS_##uc: fs__##lc(req); break;
@ -1547,9 +1538,41 @@ static DWORD WINAPI uv_fs_thread_proc(void* parameter) {
default:
assert(!"bad uv_fs_type");
}
}
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
static void uv__fs_done(struct uv__work* w, int status) {
uv_fs_t* req;
req = container_of(w, uv_fs_t, work_req);
uv__req_unregister(req->loop, req);
if (status == UV_ECANCELED) {
assert(req->result == 0);
req->result = UV_ECANCELED;
}
if (req->cb != NULL)
req->cb(req);
}
void uv_fs_req_cleanup(uv_fs_t* req) {
if (req->flags & UV_FS_CLEANEDUP)
return;
if (req->flags & UV_FS_FREE_PATHS)
free(req->pathw);
if (req->flags & UV_FS_FREE_PTR)
free(req->ptr);
req->path = NULL;
req->pathw = NULL;
req->new_pathw = NULL;
req->ptr = NULL;
req->flags |= UV_FS_CLEANEDUP;
}
@ -2064,30 +2087,3 @@ int uv_fs_futime(uv_loop_t* loop, uv_fs_t* req, uv_file fd, double atime,
return req->result;
}
}
void uv_process_fs_req(uv_loop_t* loop, uv_fs_t* req) {
assert(req->cb);
uv__req_unregister(loop, req);
req->cb(req);
}
void uv_fs_req_cleanup(uv_fs_t* req) {
if (req->flags & UV_FS_CLEANEDUP)
return;
if (req->flags & UV_FS_FREE_PATHS)
free(req->pathw);
if (req->flags & UV_FS_FREE_PTR)
free(req->ptr);
req->path = NULL;
req->pathw = NULL;
req->new_pathw = NULL;
req->ptr = NULL;
req->flags |= UV_FS_CLEANEDUP;
}

View File

@ -56,25 +56,13 @@
#define ALIGNED_SIZE(X) ((((X) + 3) >> 2) << 2)
/* getaddrinfo worker thread implementation */
static DWORD WINAPI getaddrinfo_thread_proc(void* parameter) {
uv_getaddrinfo_t* req = (uv_getaddrinfo_t*) parameter;
uv_loop_t* loop = req->loop;
int ret;
static void uv__getaddrinfo_work(struct uv__work* w) {
uv_getaddrinfo_t* req;
int err;
assert(req != NULL);
/* call OS function on this thread */
ret = GetAddrInfoW(req->node,
req->service,
req->hints,
&req->res);
req->retcode = ret;
/* post getaddrinfo completed */
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
req = container_of(w, uv_getaddrinfo_t, work_req);
err = GetAddrInfoW(req->node, req->service, req->hints, &req->res);
req->retcode = uv__getaddrinfo_translate_error(err);
}
@ -87,7 +75,8 @@ static DWORD WINAPI getaddrinfo_thread_proc(void* parameter) {
* and copy all structs and referenced strings into the one block.
* Each size calculation is adjusted to avoid unaligned pointers.
*/
void uv_process_getaddrinfo_req(uv_loop_t* loop, uv_getaddrinfo_t* req) {
static void uv__getaddrinfo_done(struct uv__work* w, int status) {
uv_getaddrinfo_t* req;
int addrinfo_len = 0;
int name_len = 0;
size_t addrinfo_struct_len = ALIGNED_SIZE(sizeof(struct addrinfo));
@ -95,7 +84,8 @@ void uv_process_getaddrinfo_req(uv_loop_t* loop, uv_getaddrinfo_t* req) {
struct addrinfo* addrinfo_ptr;
char* alloc_ptr = NULL;
char* cur_ptr = NULL;
int err = 0;
req = container_of(w, uv_getaddrinfo_t, work_req);
/* release input parameter memory */
if (req->alloc != NULL) {
@ -103,6 +93,16 @@ void uv_process_getaddrinfo_req(uv_loop_t* loop, uv_getaddrinfo_t* req) {
req->alloc = NULL;
}
if (status == UV_ECANCELED) {
assert(req->retcode == 0);
req->retcode = UV_EAI_CANCELED;
if (req->res != NULL) {
FreeAddrInfoW(req->res);
req->res = NULL;
}
goto complete;
}
if (req->retcode == 0) {
/* convert addrinfoW to addrinfo */
/* first calculate required length */
@ -113,7 +113,7 @@ void uv_process_getaddrinfo_req(uv_loop_t* loop, uv_getaddrinfo_t* req) {
if (addrinfow_ptr->ai_canonname != NULL) {
name_len = uv_utf16_to_utf8(addrinfow_ptr->ai_canonname, -1, NULL, 0);
if (name_len == 0) {
err = uv_translate_sys_error(GetLastError());
req->retcode = uv_translate_sys_error(GetLastError());
goto complete;
}
addrinfo_len += ALIGNED_SIZE(name_len);
@ -178,11 +178,8 @@ void uv_process_getaddrinfo_req(uv_loop_t* loop, uv_getaddrinfo_t* req) {
}
}
} else {
err = UV_EAI_MEMORY;
req->retcode = UV_EAI_MEMORY;
}
} else {
/* GetAddrInfo failed */
err = uv__getaddrinfo_translate_error(req->retcode);
}
/* return memory to system */
@ -192,10 +189,10 @@ void uv_process_getaddrinfo_req(uv_loop_t* loop, uv_getaddrinfo_t* req) {
}
complete:
uv__req_unregister(loop, req);
uv__req_unregister(req->loop, req);
/* finally do callback with converted result */
req->getaddrinfo_cb(req, err, (struct addrinfo*)alloc_ptr);
req->getaddrinfo_cb(req, req->retcode, (struct addrinfo*)alloc_ptr);
}
@ -246,6 +243,7 @@ int uv_getaddrinfo(uv_loop_t* loop,
req->res = NULL;
req->type = UV_GETADDRINFO;
req->loop = loop;
req->retcode = 0;
/* calculate required memory size for all input values */
if (node != NULL) {
@ -323,13 +321,10 @@ int uv_getaddrinfo(uv_loop_t* loop,
req->hints = NULL;
}
/* Ask thread to run. Treat this as a long operation */
if (QueueUserWorkItem(&getaddrinfo_thread_proc,
req,
WT_EXECUTELONGFUNCTION) == 0) {
err = GetLastError();
goto error;
}
uv__work_submit(loop,
&req->work_req,
uv__getaddrinfo_work,
uv__getaddrinfo_done);
uv__req_register(loop, req);

View File

@ -28,16 +28,13 @@
#include "req-inl.h"
/* getnameinfo worker thread implementation */
static DWORD WINAPI getnameinfo_thread_proc(void* parameter) {
uv_getnameinfo_t* req = (uv_getnameinfo_t*)parameter;
uv_loop_t* loop = req->loop;
static void uv__getnameinfo_work(struct uv__work* w) {
uv_getnameinfo_t* req;
WCHAR host[NI_MAXHOST];
WCHAR service[NI_MAXSERV];
int ret = 0;
assert(req != NULL);
req = container_of(w, uv_getnameinfo_t, work_req);
ret = GetNameInfoW((struct sockaddr*)&req->storage,
sizeof(req->storage),
host,
@ -65,30 +62,29 @@ static DWORD WINAPI getnameinfo_thread_proc(void* parameter) {
sizeof(req->service),
NULL,
NULL);
/* post getnameinfo completed */
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
/*
* Called from uv_run when complete.
*/
void uv_process_getnameinfo_req(uv_loop_t* loop, uv_getnameinfo_t* req) {
static void uv__getnameinfo_done(struct uv__work* w, int status) {
uv_getnameinfo_t* req;
char* host;
char* service;
if (req->retcode == 0) {
req = container_of(w, uv_getnameinfo_t, work_req);
uv__req_unregister(req->loop, req);
host = service = NULL;
if (status == UV_ECANCELED) {
assert(req->retcode == 0);
req->retcode = UV_EAI_CANCELED;
} else if (req->retcode == 0) {
host = req->host;
service = req->service;
} else {
host = NULL;
service = NULL;
}
uv__req_unregister(loop, req);
req->getnameinfo_cb(req, req->retcode, host, service);
}
@ -119,20 +115,18 @@ int uv_getnameinfo(uv_loop_t* loop,
}
uv_req_init(loop, (uv_req_t*)req);
uv__req_register(loop, req);
req->getnameinfo_cb = getnameinfo_cb;
req->flags = flags;
req->type = UV_GETNAMEINFO;
req->loop = loop;
req->retcode = 0;
/* Ask thread to run. Treat this as a long operation. */
if (QueueUserWorkItem(&getnameinfo_thread_proc,
req,
WT_EXECUTELONGFUNCTION) == 0) {
return uv_translate_sys_error(GetLastError());
}
uv__req_register(loop, req);
uv__work_submit(loop,
&req->work_req,
uv__getnameinfo_work,
uv__getnameinfo_done);
return 0;
}

View File

@ -288,29 +288,10 @@ void uv_process_endgame(uv_loop_t* loop, uv_process_t* handle);
int uv_translate_sys_error(int sys_errno);
/*
* Getaddrinfo
*/
void uv_process_getaddrinfo_req(uv_loop_t* loop, uv_getaddrinfo_t* req);
/*
* Getnameinfo
*/
void uv_process_getnameinfo_req(uv_loop_t* loop, uv_getnameinfo_t* req);
/*
* FS
*/
void uv_fs_init();
void uv_process_fs_req(uv_loop_t* loop, uv_fs_t* req);
/*
* Threadpool
*/
void uv_process_work_req(uv_loop_t* loop, uv_work_t* req);
/*

View File

@ -195,26 +195,10 @@ INLINE static void uv_process_reqs(uv_loop_t* loop) {
uv_process_poll_req(loop, (uv_poll_t*) req->data, req);
break;
case UV_GETADDRINFO:
uv_process_getaddrinfo_req(loop, (uv_getaddrinfo_t*) req);
break;
case UV_GETNAMEINFO:
uv_process_getnameinfo_req(loop, (uv_getnameinfo_t*)req);
break;
case UV_PROCESS_EXIT:
uv_process_proc_exit(loop, (uv_process_t*) req->data);
break;
case UV_FS:
uv_process_fs_req(loop, (uv_fs_t*) req);
break;
case UV_WORK:
uv_process_work_req(loop, (uv_work_t*) req);
break;
case UV_FS_EVENT_REQ:
uv_process_fs_event_req(loop, req, (uv_fs_event_t*) req->data);
break;

View File

@ -1,81 +0,0 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include <assert.h>
#include "uv.h"
#include "internal.h"
#include "req-inl.h"
static void uv_work_req_init(uv_loop_t* loop, uv_work_t* req,
uv_work_cb work_cb, uv_after_work_cb after_work_cb) {
uv_req_init(loop, (uv_req_t*) req);
req->type = UV_WORK;
req->loop = loop;
req->work_cb = work_cb;
req->after_work_cb = after_work_cb;
memset(&req->overlapped, 0, sizeof(req->overlapped));
}
static DWORD WINAPI uv_work_thread_proc(void* parameter) {
uv_work_t* req = (uv_work_t*)parameter;
uv_loop_t* loop = req->loop;
assert(req != NULL);
assert(req->type == UV_WORK);
assert(req->work_cb);
req->work_cb(req);
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb,
uv_after_work_cb after_work_cb) {
if (work_cb == NULL)
return UV_EINVAL;
uv_work_req_init(loop, req, work_cb, after_work_cb);
if (!QueueUserWorkItem(&uv_work_thread_proc, req, WT_EXECUTELONGFUNCTION)) {
return uv_translate_sys_error(GetLastError());
}
uv__req_register(loop, req);
return 0;
}
int uv_cancel(uv_req_t* req) {
return UV_ENOSYS;
}
void uv_process_work_req(uv_loop_t* loop, uv_work_t* req) {
uv__req_unregister(loop, req);
if(req->after_work_cb)
req->after_work_cb(req, 0);
}

4
uv.gyp
View File

@ -61,11 +61,13 @@
'include/uv.h',
'include/tree.h',
'include/uv-errno.h',
'include/uv-threadpool.h',
'include/uv-version.h',
'src/fs-poll.c',
'src/heap-inl.h',
'src/inet.c',
'src/queue.h',
'src/threadpool.c',
'src/uv-common.c',
'src/uv-common.h',
'src/version.c'
@ -103,7 +105,6 @@
'src/win/stream-inl.h',
'src/win/tcp.c',
'src/win/tty.c',
'src/win/threadpool.c',
'src/win/timer.c',
'src/win/udp.c',
'src/win/util.c',
@ -154,7 +155,6 @@
'src/unix/stream.c',
'src/unix/tcp.c',
'src/unix/thread.c',
'src/unix/threadpool.c',
'src/unix/timer.c',
'src/unix/tty.c',
'src/unix/udp.c',