mirror of https://github.com/libuv/libuv
synced 2025-03-28 21:13:16 +00:00

unix,win: replace QUEUE with struct uv__queue (#4022)

Recent versions of gcc have started emitting warnings about the liberal
type casting inside the QUEUE macros. Although the warnings are false
positives, let's use them as the impetus to switch to a type-safer and
arguably cleaner approach.

Fixes: https://github.com/libuv/libuv/issues/4019

This commit is contained in:
parent 8f32a14afa
commit 1b01b786c0

include/uv.h (10 lines changed)
@@ -59,6 +59,12 @@ extern "C" {
 #include <stdio.h>
 #include <stdint.h>
 
+/* Internal type, do not use. */
+struct uv__queue {
+  struct uv__queue* next;
+  struct uv__queue* prev;
+};
+
 #if defined(_WIN32)
 # include "uv/win.h"
 #else
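For context on why this is the fix: the old QUEUE (removed from src/queue.h later in this diff) was an untyped two-pointer array, so every traversal cast through incompatible pointer types; with named struct members the same operation is an ordinary, type-checked field access. A minimal sketch of the contrast — next_of is a hypothetical helper, not part of the patch:

    /* Old representation: two anonymous void* slots. Reading the "next"
     * link required the kind of cast recent gcc releases warn about. */
    typedef void *QUEUE[2];
    #define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0]))

    /* New representation: the links are typed members, no casts needed. */
    struct uv__queue {
      struct uv__queue* next;
      struct uv__queue* prev;
    };

    static inline struct uv__queue* next_of(struct uv__queue* q) {
      return q->next;  /* plain member access, checked by the compiler */
    }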
@@ -459,7 +465,7 @@ struct uv_shutdown_s {
   uv_handle_type type; \
   /* private */ \
   uv_close_cb close_cb; \
-  void* handle_queue[2]; \
+  struct uv__queue handle_queue; \
   union { \
     int fd; \
     void* reserved[4]; \

@@ -1849,7 +1855,7 @@ struct uv_loop_s {
   void* data;
   /* Loop reference counting. */
   unsigned int active_handles;
-  void* handle_queue[2];
+  struct uv__queue handle_queue;
   union {
     void* unused;
     unsigned int count;

@@ -40,7 +40,7 @@
   void* cf_state; \
   uv_mutex_t cf_mutex; \
   uv_sem_t cf_sem; \
-  void* cf_signals[2]; \
+  struct uv__queue cf_signals; \
 
 #define UV_PLATFORM_FS_EVENT_FIELDS \
   uv__io_t event_watcher; \

@@ -48,8 +48,8 @@
   int realpath_len; \
   int cf_flags; \
   uv_async_t* cf_cb; \
-  void* cf_events[2]; \
-  void* cf_member[2]; \
+  struct uv__queue cf_events; \
+  struct uv__queue cf_member; \
   int cf_error; \
   uv_mutex_t cf_mutex; \

@@ -28,7 +28,7 @@
   int inotify_fd; \
 
 #define UV_PLATFORM_FS_EVENT_FIELDS \
-  void* watchers[2]; \
+  struct uv__queue watchers; \
   int wd; \
 
 #endif /* UV_LINUX_H */

@@ -31,7 +31,7 @@ struct uv__work {
   void (*work)(struct uv__work *w);
   void (*done)(struct uv__work *w, int status);
   struct uv_loop_s* loop;
-  void* wq[2];
+  struct uv__queue wq;
 };
 
 #endif /* UV_THREADPOOL_H_ */

@@ -92,8 +92,8 @@ typedef struct uv__io_s uv__io_t;
 
 struct uv__io_s {
   uv__io_cb cb;
-  void* pending_queue[2];
-  void* watcher_queue[2];
+  struct uv__queue pending_queue;
+  struct uv__queue watcher_queue;
   unsigned int pevents; /* Pending event mask i.e. mask at next tick. */
   unsigned int events;  /* Current event mask. */
   int fd;

@@ -220,21 +220,21 @@ typedef struct {
 #define UV_LOOP_PRIVATE_FIELDS \
   unsigned long flags; \
   int backend_fd; \
-  void* pending_queue[2]; \
-  void* watcher_queue[2]; \
+  struct uv__queue pending_queue; \
+  struct uv__queue watcher_queue; \
   uv__io_t** watchers; \
   unsigned int nwatchers; \
   unsigned int nfds; \
-  void* wq[2]; \
+  struct uv__queue wq; \
   uv_mutex_t wq_mutex; \
   uv_async_t wq_async; \
   uv_rwlock_t cloexec_lock; \
   uv_handle_t* closing_handles; \
-  void* process_handles[2]; \
-  void* prepare_handles[2]; \
-  void* check_handles[2]; \
-  void* idle_handles[2]; \
-  void* async_handles[2]; \
+  struct uv__queue process_handles; \
+  struct uv__queue prepare_handles; \
+  struct uv__queue check_handles; \
+  struct uv__queue idle_handles; \
+  struct uv__queue async_handles; \
   void (*async_unused)(void);  /* TODO(bnoordhuis) Remove in libuv v2. */ \
   uv__io_t async_io_watcher; \
   int async_wfd; \

@@ -257,7 +257,7 @@ typedef struct {
 #define UV_PRIVATE_REQ_TYPES /* empty */
 
 #define UV_WRITE_PRIVATE_FIELDS \
-  void* queue[2]; \
+  struct uv__queue queue; \
   unsigned int write_index; \
   uv_buf_t* bufs; \
   unsigned int nbufs; \

@@ -265,12 +265,12 @@ typedef struct {
   uv_buf_t bufsml[4]; \
 
 #define UV_CONNECT_PRIVATE_FIELDS \
-  void* queue[2]; \
+  struct uv__queue queue; \
 
 #define UV_SHUTDOWN_PRIVATE_FIELDS /* empty */
 
 #define UV_UDP_SEND_PRIVATE_FIELDS \
-  void* queue[2]; \
+  struct uv__queue queue; \
   struct sockaddr_storage addr; \
   unsigned int nbufs; \
   uv_buf_t* bufs; \

@@ -286,8 +286,8 @@ typedef struct {
   uv_connect_t *connect_req; \
   uv_shutdown_t *shutdown_req; \
   uv__io_t io_watcher; \
-  void* write_queue[2]; \
-  void* write_completed_queue[2]; \
+  struct uv__queue write_queue; \
+  struct uv__queue write_completed_queue; \
   uv_connection_cb connection_cb; \
   int delayed_error; \
   int accepted_fd; \

@@ -300,8 +300,8 @@ typedef struct {
   uv_alloc_cb alloc_cb; \
   uv_udp_recv_cb recv_cb; \
   uv__io_t io_watcher; \
-  void* write_queue[2]; \
-  void* write_completed_queue[2]; \
+  struct uv__queue write_queue; \
+  struct uv__queue write_completed_queue; \
 
 #define UV_PIPE_PRIVATE_FIELDS \
   const char* pipe_fname; /* strdup'ed */

@@ -311,19 +311,19 @@ typedef struct {
 
 #define UV_PREPARE_PRIVATE_FIELDS \
   uv_prepare_cb prepare_cb; \
-  void* queue[2]; \
+  struct uv__queue queue; \
 
 #define UV_CHECK_PRIVATE_FIELDS \
   uv_check_cb check_cb; \
-  void* queue[2]; \
+  struct uv__queue queue; \
 
 #define UV_IDLE_PRIVATE_FIELDS \
   uv_idle_cb idle_cb; \
-  void* queue[2]; \
+  struct uv__queue queue; \
 
 #define UV_ASYNC_PRIVATE_FIELDS \
   uv_async_cb async_cb; \
-  void* queue[2]; \
+  struct uv__queue queue; \
   int pending; \
 
 #define UV_TIMER_PRIVATE_FIELDS \

@@ -352,7 +352,7 @@ typedef struct {
   int retcode;
 
 #define UV_PROCESS_PRIVATE_FIELDS \
-  void* queue[2]; \
+  struct uv__queue queue; \
   int status; \
 
 #define UV_FS_PRIVATE_FIELDS \

@@ -357,7 +357,7 @@ typedef struct {
   /* Counter to started timer */ \
   uint64_t timer_counter; \
   /* Threadpool */ \
-  void* wq[2]; \
+  struct uv__queue wq; \
   uv_mutex_t wq_mutex; \
   uv_async_t wq_async;

@@ -486,7 +486,7 @@ typedef struct {
     uint32_t payload_remaining; \
     uint64_t dummy; /* TODO: retained for ABI compat; remove this in v2.x. */ \
   } ipc_data_frame; \
-  void* ipc_xfer_queue[2]; \
+  struct uv__queue ipc_xfer_queue; \
   int ipc_xfer_queue_length; \
   uv_write_t* non_overlapped_writes_tail; \
   CRITICAL_SECTION readfile_thread_lock; \
src/queue.h (132 lines changed)
@@ -18,91 +18,73 @@
 
 #include <stddef.h>
 
-typedef void *QUEUE[2];
+#define uv__queue_data(pointer, type, field) \
+  ((type*) ((char*) (pointer) - offsetof(type, field)))
 
-/* Private macros. */
-#define QUEUE_NEXT(q)       (*(QUEUE **) &((*(q))[0]))
-#define QUEUE_PREV(q)       (*(QUEUE **) &((*(q))[1]))
-#define QUEUE_PREV_NEXT(q)  (QUEUE_NEXT(QUEUE_PREV(q)))
-#define QUEUE_NEXT_PREV(q)  (QUEUE_PREV(QUEUE_NEXT(q)))
+#define uv__queue_foreach(q, h) \
+  for ((q) = (h)->next; (q) != (h); (q) = (q)->next)
 
-/* Public macros. */
-#define QUEUE_DATA(ptr, type, field) \
-  ((type *) ((char *) (ptr) - offsetof(type, field)))
+static inline void uv__queue_init(struct uv__queue* q) {
+  q->next = q;
+  q->prev = q;
+}
 
-/* Important note: mutating the list while QUEUE_FOREACH is
- * iterating over its elements results in undefined behavior.
- */
-#define QUEUE_FOREACH(q, h) \
-  for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
+static inline int uv__queue_empty(const struct uv__queue* q) {
+  return q == q->next;
+}
 
-#define QUEUE_EMPTY(q) \
-  ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q))
+static inline struct uv__queue* uv__queue_head(const struct uv__queue* q) {
+  return q->next;
+}
 
-#define QUEUE_HEAD(q) \
-  (QUEUE_NEXT(q))
+static inline struct uv__queue* uv__queue_next(const struct uv__queue* q) {
+  return q->next;
+}
 
-#define QUEUE_INIT(q) \
-  do { \
-    QUEUE_NEXT(q) = (q); \
-    QUEUE_PREV(q) = (q); \
-  } \
-  while (0)
+static inline void uv__queue_add(struct uv__queue* h, struct uv__queue* n) {
+  h->prev->next = n->next;
+  n->next->prev = h->prev;
+  h->prev = n->prev;
+  h->prev->next = h;
+}
 
-#define QUEUE_ADD(h, n) \
-  do { \
-    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
-    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
-    QUEUE_PREV(h) = QUEUE_PREV(n); \
-    QUEUE_PREV_NEXT(h) = (h); \
-  } \
-  while (0)
+static inline void uv__queue_split(struct uv__queue* h,
+                                   struct uv__queue* q,
+                                   struct uv__queue* n) {
+  n->prev = h->prev;
+  n->prev->next = n;
+  n->next = q;
+  h->prev = q->prev;
+  h->prev->next = h;
+  q->prev = n;
+}
 
-#define QUEUE_SPLIT(h, q, n) \
-  do { \
-    QUEUE_PREV(n) = QUEUE_PREV(h); \
-    QUEUE_PREV_NEXT(n) = (n); \
-    QUEUE_NEXT(n) = (q); \
-    QUEUE_PREV(h) = QUEUE_PREV(q); \
-    QUEUE_PREV_NEXT(h) = (h); \
-    QUEUE_PREV(q) = (n); \
-  } \
-  while (0)
+static inline void uv__queue_move(struct uv__queue* h, struct uv__queue* n) {
+  if (uv__queue_empty(h))
+    uv__queue_init(n);
+  else
+    uv__queue_split(h, h->next, n);
+}
 
-#define QUEUE_MOVE(h, n) \
-  do { \
-    if (QUEUE_EMPTY(h)) \
-      QUEUE_INIT(n); \
-    else { \
-      QUEUE* q = QUEUE_HEAD(h); \
-      QUEUE_SPLIT(h, q, n); \
-    } \
-  } \
-  while (0)
+static inline void uv__queue_insert_head(struct uv__queue* h,
+                                         struct uv__queue* q) {
+  q->next = h->next;
+  q->prev = h;
+  q->next->prev = q;
+  h->next = q;
+}
 
-#define QUEUE_INSERT_HEAD(h, q) \
-  do { \
-    QUEUE_NEXT(q) = QUEUE_NEXT(h); \
-    QUEUE_PREV(q) = (h); \
-    QUEUE_NEXT_PREV(q) = (q); \
-    QUEUE_NEXT(h) = (q); \
-  } \
-  while (0)
+static inline void uv__queue_insert_tail(struct uv__queue* h,
+                                         struct uv__queue* q) {
+  q->next = h;
+  q->prev = h->prev;
+  q->prev->next = q;
+  h->prev = q;
+}
 
-#define QUEUE_INSERT_TAIL(h, q) \
-  do { \
-    QUEUE_NEXT(q) = (h); \
-    QUEUE_PREV(q) = QUEUE_PREV(h); \
-    QUEUE_PREV_NEXT(q) = (q); \
-    QUEUE_PREV(h) = (q); \
-  } \
-  while (0)
-
-#define QUEUE_REMOVE(q) \
-  do { \
-    QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \
-    QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \
-  } \
-  while (0)
+static inline void uv__queue_remove(struct uv__queue* q) {
+  q->prev->next = q->next;
+  q->next->prev = q->prev;
+}
 
 #endif /* QUEUE_H_ */
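To make the new API concrete, here is a minimal, self-contained sketch of how an intrusive queue in this style is used. The struct uv__queue definition and the helpers mirror the diff above; my_item and main are hypothetical scaffolding, not libuv code:

    #include <stddef.h>
    #include <stdio.h>

    struct uv__queue { struct uv__queue* next; struct uv__queue* prev; };

    #define uv__queue_data(pointer, type, field) \
      ((type*) ((char*) (pointer) - offsetof(type, field)))
    #define uv__queue_foreach(q, h) \
      for ((q) = (h)->next; (q) != (h); (q) = (q)->next)

    static void uv__queue_init(struct uv__queue* q) { q->next = q; q->prev = q; }

    static void uv__queue_insert_tail(struct uv__queue* h, struct uv__queue* q) {
      q->next = h;
      q->prev = h->prev;
      q->prev->next = q;
      h->prev = q;
    }

    /* Hypothetical container type: the queue node is embedded in the
     * payload, so list membership costs no separate allocation. */
    struct my_item {
      int value;
      struct uv__queue node;
    };

    int main(void) {
      struct uv__queue head;
      struct uv__queue* q;
      struct my_item a = {1, {0, 0}}, b = {2, {0, 0}};

      uv__queue_init(&head);
      uv__queue_insert_tail(&head, &a.node);
      uv__queue_insert_tail(&head, &b.node);

      uv__queue_foreach(q, &head) {
        /* Recover the containing struct from the embedded node. */
        struct my_item* it = uv__queue_data(q, struct my_item, node);
        printf("%d\n", it->value);  /* prints 1 then 2 */
      }
      return 0;
    }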
@@ -37,10 +37,10 @@ static unsigned int slow_io_work_running;
 static unsigned int nthreads;
 static uv_thread_t* threads;
 static uv_thread_t default_threads[4];
-static QUEUE exit_message;
-static QUEUE wq;
-static QUEUE run_slow_work_message;
-static QUEUE slow_io_pending_wq;
+static struct uv__queue exit_message;
+static struct uv__queue wq;
+static struct uv__queue run_slow_work_message;
+static struct uv__queue slow_io_pending_wq;
 
 static unsigned int slow_work_thread_threshold(void) {
   return (nthreads + 1) / 2;

@@ -56,7 +56,7 @@ static void uv__cancelled(struct uv__work* w) {
  */
 static void worker(void* arg) {
   struct uv__work* w;
-  QUEUE* q;
+  struct uv__queue* q;
   int is_slow_work;
 
   uv_sem_post((uv_sem_t*) arg);

@@ -68,49 +68,49 @@ static void worker(void* arg) {
 
     /* Keep waiting while either no work is present or only slow I/O
        and we're at the threshold for that. */
-    while (QUEUE_EMPTY(&wq) ||
-           (QUEUE_HEAD(&wq) == &run_slow_work_message &&
-            QUEUE_NEXT(&run_slow_work_message) == &wq &&
+    while (uv__queue_empty(&wq) ||
+           (uv__queue_head(&wq) == &run_slow_work_message &&
+            uv__queue_next(&run_slow_work_message) == &wq &&
             slow_io_work_running >= slow_work_thread_threshold())) {
       idle_threads += 1;
      uv_cond_wait(&cond, &mutex);
       idle_threads -= 1;
     }
 
-    q = QUEUE_HEAD(&wq);
+    q = uv__queue_head(&wq);
     if (q == &exit_message) {
       uv_cond_signal(&cond);
       uv_mutex_unlock(&mutex);
       break;
     }
 
-    QUEUE_REMOVE(q);
-    QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */
+    uv__queue_remove(q);
+    uv__queue_init(q);  /* Signal uv_cancel() that the work req is executing. */
 
     is_slow_work = 0;
     if (q == &run_slow_work_message) {
       /* If we're at the slow I/O threshold, re-schedule until after all
          other work in the queue is done. */
       if (slow_io_work_running >= slow_work_thread_threshold()) {
-        QUEUE_INSERT_TAIL(&wq, q);
+        uv__queue_insert_tail(&wq, q);
         continue;
       }
 
       /* If we encountered a request to run slow I/O work but there is none
          to run, that means it's cancelled => Start over. */
-      if (QUEUE_EMPTY(&slow_io_pending_wq))
+      if (uv__queue_empty(&slow_io_pending_wq))
         continue;
 
       is_slow_work = 1;
       slow_io_work_running++;
 
-      q = QUEUE_HEAD(&slow_io_pending_wq);
-      QUEUE_REMOVE(q);
-      QUEUE_INIT(q);
+      q = uv__queue_head(&slow_io_pending_wq);
+      uv__queue_remove(q);
+      uv__queue_init(q);
 
       /* If there is more slow I/O work, schedule it to be run as well. */
-      if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
-        QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
+      if (!uv__queue_empty(&slow_io_pending_wq)) {
+        uv__queue_insert_tail(&wq, &run_slow_work_message);
         if (idle_threads > 0)
           uv_cond_signal(&cond);
       }

@@ -118,13 +118,13 @@ static void worker(void* arg) {
 
     uv_mutex_unlock(&mutex);
 
-    w = QUEUE_DATA(q, struct uv__work, wq);
+    w = uv__queue_data(q, struct uv__work, wq);
     w->work(w);
 
     uv_mutex_lock(&w->loop->wq_mutex);
     w->work = NULL;  /* Signal uv_cancel() that the work req is done
                         executing. */
-    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
+    uv__queue_insert_tail(&w->loop->wq, &w->wq);
     uv_async_send(&w->loop->wq_async);
     uv_mutex_unlock(&w->loop->wq_mutex);
 

@@ -139,12 +139,12 @@ static void worker(void* arg) {
 }
 
 
-static void post(QUEUE* q, enum uv__work_kind kind) {
+static void post(struct uv__queue* q, enum uv__work_kind kind) {
   uv_mutex_lock(&mutex);
   if (kind == UV__WORK_SLOW_IO) {
     /* Insert into a separate queue. */
-    QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
-    if (!QUEUE_EMPTY(&run_slow_work_message)) {
+    uv__queue_insert_tail(&slow_io_pending_wq, q);
+    if (!uv__queue_empty(&run_slow_work_message)) {
       /* Running slow I/O tasks is already scheduled => Nothing to do here.
          The worker that runs said other task will schedule this one as well. */
       uv_mutex_unlock(&mutex);

@@ -153,7 +153,7 @@ static void post(QUEUE* q, enum uv__work_kind kind) {
     q = &run_slow_work_message;
   }
 
-  QUEUE_INSERT_TAIL(&wq, q);
+  uv__queue_insert_tail(&wq, q);
   if (idle_threads > 0)
     uv_cond_signal(&cond);
   uv_mutex_unlock(&mutex);

@@ -220,9 +220,9 @@ static void init_threads(void) {
   if (uv_mutex_init(&mutex))
     abort();
 
-  QUEUE_INIT(&wq);
-  QUEUE_INIT(&slow_io_pending_wq);
-  QUEUE_INIT(&run_slow_work_message);
+  uv__queue_init(&wq);
+  uv__queue_init(&slow_io_pending_wq);
+  uv__queue_init(&run_slow_work_message);
 
   if (uv_sem_init(&sem, 0))
     abort();

@@ -285,9 +285,9 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
   uv_mutex_lock(&mutex);
   uv_mutex_lock(&w->loop->wq_mutex);
 
-  cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
+  cancelled = !uv__queue_empty(&w->wq) && w->work != NULL;
   if (cancelled)
-    QUEUE_REMOVE(&w->wq);
+    uv__queue_remove(&w->wq);
 
   uv_mutex_unlock(&w->loop->wq_mutex);
   uv_mutex_unlock(&mutex);

@@ -297,7 +297,7 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
 
   w->work = uv__cancelled;
   uv_mutex_lock(&loop->wq_mutex);
-  QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
+  uv__queue_insert_tail(&loop->wq, &w->wq);
   uv_async_send(&loop->wq_async);
   uv_mutex_unlock(&loop->wq_mutex);
 

@@ -308,21 +308,21 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
 void uv__work_done(uv_async_t* handle) {
   struct uv__work* w;
   uv_loop_t* loop;
-  QUEUE* q;
-  QUEUE wq;
+  struct uv__queue* q;
+  struct uv__queue wq;
   int err;
   int nevents;
 
   loop = container_of(handle, uv_loop_t, wq_async);
   uv_mutex_lock(&loop->wq_mutex);
-  QUEUE_MOVE(&loop->wq, &wq);
+  uv__queue_move(&loop->wq, &wq);
   uv_mutex_unlock(&loop->wq_mutex);
 
   nevents = 0;
 
-  while (!QUEUE_EMPTY(&wq)) {
-    q = QUEUE_HEAD(&wq);
-    QUEUE_REMOVE(q);
+  while (!uv__queue_empty(&wq)) {
+    q = uv__queue_head(&wq);
+    uv__queue_remove(q);
 
     w = container_of(q, struct uv__work, wq);
     err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;
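All of the threadpool hunks above lean on one idiom: detach the entire shared list with uv__queue_move while holding the mutex, then drain the private copy with the lock released. A hedged sketch of just that pattern — lock, unlock, and handle_one are stand-ins, not libuv functions:

    /* Sketch: drain a shared queue with minimal lock hold time.
     * Assumes the queue.h helpers from the diff above. */
    static void drain(struct uv__queue* shared) {
      struct uv__queue wq;   /* private head, lives on the stack */
      struct uv__queue* q;

      lock();
      uv__queue_move(shared, &wq);   /* shared list is now empty */
      unlock();

      while (!uv__queue_empty(&wq)) {
        q = uv__queue_head(&wq);
        uv__queue_remove(q);         /* unlink before invoking the callback */
        handle_one(q);               /* may safely re-enqueue onto `shared` */
      }
    }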
@@ -136,7 +136,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
   struct pollfd pqry;
   struct pollfd* pe;
   struct poll_ctl pc;
-  QUEUE* q;
+  struct uv__queue* q;
   uv__io_t* w;
   uint64_t base;
   uint64_t diff;

@@ -151,18 +151,18 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
   int reset_timeout;
 
   if (loop->nfds == 0) {
-    assert(QUEUE_EMPTY(&loop->watcher_queue));
+    assert(uv__queue_empty(&loop->watcher_queue));
     return;
   }
 
   lfields = uv__get_internal_fields(loop);
 
-  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
-    q = QUEUE_HEAD(&loop->watcher_queue);
-    QUEUE_REMOVE(q);
-    QUEUE_INIT(q);
+  while (!uv__queue_empty(&loop->watcher_queue)) {
+    q = uv__queue_head(&loop->watcher_queue);
+    uv__queue_remove(q);
+    uv__queue_init(q);
 
-    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
+    w = uv__queue_data(q, uv__io_t, watcher_queue);
     assert(w->pevents != 0);
     assert(w->fd >= 0);
     assert(w->fd < (int) loop->nwatchers);
@@ -55,7 +55,7 @@ int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
   handle->pending = 0;
   handle->u.fd = 0; /* This will be used as a busy flag. */
 
-  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
+  uv__queue_insert_tail(&loop->async_handles, &handle->queue);
   uv__handle_start(handle);
 
   return 0;

@@ -124,7 +124,7 @@ static void uv__async_spin(uv_async_t* handle) {
 
 void uv__async_close(uv_async_t* handle) {
   uv__async_spin(handle);
-  QUEUE_REMOVE(&handle->queue);
+  uv__queue_remove(&handle->queue);
   uv__handle_stop(handle);
 }
 

@@ -132,8 +132,8 @@ void uv__async_close(uv_async_t* handle) {
 static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
   char buf[1024];
   ssize_t r;
-  QUEUE queue;
-  QUEUE* q;
+  struct uv__queue queue;
+  struct uv__queue* q;
   uv_async_t* h;
   _Atomic int *pending;
 

@@ -157,13 +157,13 @@ static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
     abort();
   }
 
-  QUEUE_MOVE(&loop->async_handles, &queue);
-  while (!QUEUE_EMPTY(&queue)) {
-    q = QUEUE_HEAD(&queue);
-    h = QUEUE_DATA(q, uv_async_t, queue);
+  uv__queue_move(&loop->async_handles, &queue);
+  while (!uv__queue_empty(&queue)) {
+    q = uv__queue_head(&queue);
+    h = uv__queue_data(q, uv_async_t, queue);
 
-    QUEUE_REMOVE(q);
-    QUEUE_INSERT_TAIL(&loop->async_handles, q);
+    uv__queue_remove(q);
+    uv__queue_insert_tail(&loop->async_handles, q);
 
     /* Atomically fetch and clear pending flag */
     pending = (_Atomic int*) &h->pending;

@@ -241,8 +241,8 @@ static int uv__async_start(uv_loop_t* loop) {
 
 
 void uv__async_stop(uv_loop_t* loop) {
-  QUEUE queue;
-  QUEUE* q;
+  struct uv__queue queue;
+  struct uv__queue* q;
   uv_async_t* h;
 
   if (loop->async_io_watcher.fd == -1)

@@ -251,13 +251,13 @@ void uv__async_stop(uv_loop_t* loop) {
   /* Make sure no other thread is accessing the async handle fd after the loop
    * cleanup.
    */
-  QUEUE_MOVE(&loop->async_handles, &queue);
-  while (!QUEUE_EMPTY(&queue)) {
-    q = QUEUE_HEAD(&queue);
-    h = QUEUE_DATA(q, uv_async_t, queue);
+  uv__queue_move(&loop->async_handles, &queue);
+  while (!uv__queue_empty(&queue)) {
+    q = uv__queue_head(&queue);
+    h = uv__queue_data(q, uv_async_t, queue);
 
-    QUEUE_REMOVE(q);
-    QUEUE_INSERT_TAIL(&loop->async_handles, q);
+    uv__queue_remove(q);
+    uv__queue_insert_tail(&loop->async_handles, q);
 
     uv__async_spin(h);
   }

@@ -275,20 +275,20 @@ void uv__async_stop(uv_loop_t* loop) {
 
 
 int uv__async_fork(uv_loop_t* loop) {
-  QUEUE queue;
-  QUEUE* q;
+  struct uv__queue queue;
+  struct uv__queue* q;
   uv_async_t* h;
 
   if (loop->async_io_watcher.fd == -1) /* never started */
     return 0;
 
-  QUEUE_MOVE(&loop->async_handles, &queue);
-  while (!QUEUE_EMPTY(&queue)) {
-    q = QUEUE_HEAD(&queue);
-    h = QUEUE_DATA(q, uv_async_t, queue);
+  uv__queue_move(&loop->async_handles, &queue);
+  while (!uv__queue_empty(&queue)) {
+    q = uv__queue_head(&queue);
+    h = uv__queue_data(q, uv_async_t, queue);
 
-    QUEUE_REMOVE(q);
-    QUEUE_INSERT_TAIL(&loop->async_handles, q);
+    uv__queue_remove(q);
+    uv__queue_insert_tail(&loop->async_handles, q);
 
     /* The state of any thread that set pending is now likely corrupt in this
      * child because the user called fork, so just clear these flags and move
@@ -344,7 +344,7 @@ static void uv__finish_close(uv_handle_t* handle) {
   }
 
   uv__handle_unref(handle);
-  QUEUE_REMOVE(&handle->handle_queue);
+  uv__queue_remove(&handle->handle_queue);
 
   if (handle->close_cb) {
     handle->close_cb(handle);

@@ -380,7 +380,7 @@ int uv_backend_fd(const uv_loop_t* loop) {
 static int uv__loop_alive(const uv_loop_t* loop) {
   return uv__has_active_handles(loop) ||
          uv__has_active_reqs(loop) ||
-         !QUEUE_EMPTY(&loop->pending_queue) ||
+         !uv__queue_empty(&loop->pending_queue) ||
         loop->closing_handles != NULL;
 }
 

@@ -389,8 +389,8 @@ static int uv__backend_timeout(const uv_loop_t* loop) {
   if (loop->stop_flag == 0 &&
       /* uv__loop_alive(loop) && */
       (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) &&
-      QUEUE_EMPTY(&loop->pending_queue) &&
-      QUEUE_EMPTY(&loop->idle_handles) &&
+      uv__queue_empty(&loop->pending_queue) &&
+      uv__queue_empty(&loop->idle_handles) &&
      (loop->flags & UV_LOOP_REAP_CHILDREN) == 0 &&
       loop->closing_handles == NULL)
     return uv__next_timeout(loop);

@@ -399,7 +399,7 @@ static int uv__backend_timeout(const uv_loop_t* loop) {
 
 
 int uv_backend_timeout(const uv_loop_t* loop) {
-  if (QUEUE_EMPTY(&loop->watcher_queue))
+  if (uv__queue_empty(&loop->watcher_queue))
     return uv__backend_timeout(loop);
   /* Need to call uv_run to update the backend fd state. */
   return 0;

@@ -432,7 +432,8 @@ int uv_run(uv_loop_t* loop, uv_run_mode mode) {
 
   while (r != 0 && loop->stop_flag == 0) {
     can_sleep =
-        QUEUE_EMPTY(&loop->pending_queue) && QUEUE_EMPTY(&loop->idle_handles);
+        uv__queue_empty(&loop->pending_queue) &&
+        uv__queue_empty(&loop->idle_handles);
 
     uv__run_pending(loop);
     uv__run_idle(loop);

@@ -448,7 +449,7 @@ int uv_run(uv_loop_t* loop, uv_run_mode mode) {
 
     /* Process immediate callbacks (e.g. write_cb) a small fixed number of
      * times to avoid loop starvation.*/
-    for (r = 0; r < 8 && !QUEUE_EMPTY(&loop->pending_queue); r++)
+    for (r = 0; r < 8 && !uv__queue_empty(&loop->pending_queue); r++)
       uv__run_pending(loop);
 
     /* Run one final update on the provider_idle_time in case uv__io_poll

@@ -827,17 +828,17 @@ int uv_fileno(const uv_handle_t* handle, uv_os_fd_t* fd) {
 
 
 static void uv__run_pending(uv_loop_t* loop) {
-  QUEUE* q;
-  QUEUE pq;
+  struct uv__queue* q;
+  struct uv__queue pq;
   uv__io_t* w;
 
-  QUEUE_MOVE(&loop->pending_queue, &pq);
+  uv__queue_move(&loop->pending_queue, &pq);
 
-  while (!QUEUE_EMPTY(&pq)) {
-    q = QUEUE_HEAD(&pq);
-    QUEUE_REMOVE(q);
-    QUEUE_INIT(q);
-    w = QUEUE_DATA(q, uv__io_t, pending_queue);
+  while (!uv__queue_empty(&pq)) {
+    q = uv__queue_head(&pq);
+    uv__queue_remove(q);
+    uv__queue_init(q);
+    w = uv__queue_data(q, uv__io_t, pending_queue);
     w->cb(loop, w, POLLOUT);
   }
 }

@@ -892,8 +893,8 @@ static void maybe_resize(uv_loop_t* loop, unsigned int len) {
 void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
   assert(cb != NULL);
   assert(fd >= -1);
-  QUEUE_INIT(&w->pending_queue);
-  QUEUE_INIT(&w->watcher_queue);
+  uv__queue_init(&w->pending_queue);
+  uv__queue_init(&w->watcher_queue);
   w->cb = cb;
   w->fd = fd;
   w->events = 0;

@@ -919,8 +920,8 @@ void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
     return;
 #endif
 
-  if (QUEUE_EMPTY(&w->watcher_queue))
-    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
+  if (uv__queue_empty(&w->watcher_queue))
+    uv__queue_insert_tail(&loop->watcher_queue, &w->watcher_queue);
 
   if (loop->watchers[w->fd] == NULL) {
     loop->watchers[w->fd] = w;

@@ -945,8 +946,8 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
   w->pevents &= ~events;
 
   if (w->pevents == 0) {
-    QUEUE_REMOVE(&w->watcher_queue);
-    QUEUE_INIT(&w->watcher_queue);
+    uv__queue_remove(&w->watcher_queue);
+    uv__queue_init(&w->watcher_queue);
     w->events = 0;
 
     if (w == loop->watchers[w->fd]) {

@@ -955,14 +956,14 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
       loop->nfds--;
     }
   }
-  else if (QUEUE_EMPTY(&w->watcher_queue))
-    QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
+  else if (uv__queue_empty(&w->watcher_queue))
+    uv__queue_insert_tail(&loop->watcher_queue, &w->watcher_queue);
 }
 
 
 void uv__io_close(uv_loop_t* loop, uv__io_t* w) {
   uv__io_stop(loop, w, POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI);
-  QUEUE_REMOVE(&w->pending_queue);
+  uv__queue_remove(&w->pending_queue);
 
   /* Remove stale events for this file descriptor */
   if (w->fd != -1)

@@ -971,8 +972,8 @@ void uv__io_close(uv_loop_t* loop, uv__io_t* w) {
 
 
 void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
-  if (QUEUE_EMPTY(&w->pending_queue))
-    QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue);
+  if (uv__queue_empty(&w->pending_queue))
+    uv__queue_insert_tail(&loop->pending_queue, &w->pending_queue);
 }
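A detail the uv__io_start/uv__io_stop/uv__io_feed hunks rely on: a node initialized with uv__queue_init points at itself, so uv__queue_empty on the node doubles as "not currently on any list", which makes "insert only once" a single check. An illustrative fragment, assuming the queue.h definitions above; some_list is hypothetical:

    #include <assert.h>

    /* Assumes struct uv__queue and the inline helpers from src/queue.h. */
    static void example(struct uv__queue* some_list) {
      struct uv__queue node;

      uv__queue_init(&node);            /* node.next == node.prev == &node */
      assert(uv__queue_empty(&node));   /* "empty" here means "not linked" */

      uv__queue_insert_tail(some_list, &node);
      assert(!uv__queue_empty(&node));  /* next now points into the list */

      uv__queue_remove(&node);          /* unlink; node is left stale... */
      uv__queue_init(&node);            /* ...so re-init, as the diff does after remove */
    }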
@@ -80,13 +80,13 @@ enum uv__cf_loop_signal_type_e {
 typedef enum uv__cf_loop_signal_type_e uv__cf_loop_signal_type_t;
 
 struct uv__cf_loop_signal_s {
-  QUEUE member;
+  struct uv__queue member;
   uv_fs_event_t* handle;
   uv__cf_loop_signal_type_t type;
 };
 
 struct uv__fsevents_event_s {
-  QUEUE member;
+  struct uv__queue member;
   int events;
   char path[1];
 };

@@ -98,7 +98,7 @@ struct uv__cf_loop_state_s {
   FSEventStreamRef fsevent_stream;
   uv_sem_t fsevent_sem;
   uv_mutex_t fsevent_mutex;
-  void* fsevent_handles[2];
+  struct uv__queue fsevent_handles;
   unsigned int fsevent_handle_count;
 };
 

@@ -150,22 +150,22 @@ static void (*pFSEventStreamStop)(FSEventStreamRef);
 
 #define UV__FSEVENTS_PROCESS(handle, block) \
     do { \
-      QUEUE events; \
-      QUEUE* q; \
+      struct uv__queue events; \
+      struct uv__queue* q; \
       uv__fsevents_event_t* event; \
       int err; \
       uv_mutex_lock(&(handle)->cf_mutex); \
       /* Split-off all events and empty original queue */ \
-      QUEUE_MOVE(&(handle)->cf_events, &events); \
+      uv__queue_move(&(handle)->cf_events, &events); \
       /* Get error (if any) and zero original one */ \
       err = (handle)->cf_error; \
      (handle)->cf_error = 0; \
       uv_mutex_unlock(&(handle)->cf_mutex); \
       /* Loop through events, deallocating each after processing */ \
-      while (!QUEUE_EMPTY(&events)) { \
-        q = QUEUE_HEAD(&events); \
-        event = QUEUE_DATA(q, uv__fsevents_event_t, member); \
-        QUEUE_REMOVE(q); \
+      while (!uv__queue_empty(&events)) { \
+        q = uv__queue_head(&events); \
+        event = uv__queue_data(q, uv__fsevents_event_t, member); \
+        uv__queue_remove(q); \
         /* NOTE: Checking uv__is_active() is required here, because handle \
          * callback may close handle and invoking it after it will lead to \
          * incorrect behaviour */ \

@@ -193,14 +193,14 @@ static void uv__fsevents_cb(uv_async_t* cb) {
 
 /* Runs in CF thread, pushed event into handle's event list */
 static void uv__fsevents_push_event(uv_fs_event_t* handle,
-                                    QUEUE* events,
+                                    struct uv__queue* events,
                                     int err) {
   assert(events != NULL || err != 0);
   uv_mutex_lock(&handle->cf_mutex);
 
   /* Concatenate two queues */
   if (events != NULL)
-    QUEUE_ADD(&handle->cf_events, events);
+    uv__queue_add(&handle->cf_events, events);
 
   /* Propagate error */
   if (err != 0)
|
||||
char* path;
|
||||
char* pos;
|
||||
uv_fs_event_t* handle;
|
||||
QUEUE* q;
|
||||
struct uv__queue* q;
|
||||
uv_loop_t* loop;
|
||||
uv__cf_loop_state_t* state;
|
||||
uv__fsevents_event_t* event;
|
||||
FSEventStreamEventFlags flags;
|
||||
QUEUE head;
|
||||
struct uv__queue head;
|
||||
|
||||
loop = info;
|
||||
state = loop->cf_state;
|
||||
@ -238,9 +238,9 @@ static void uv__fsevents_event_cb(const FSEventStreamRef streamRef,
|
||||
|
||||
/* For each handle */
|
||||
uv_mutex_lock(&state->fsevent_mutex);
|
||||
QUEUE_FOREACH(q, &state->fsevent_handles) {
|
||||
handle = QUEUE_DATA(q, uv_fs_event_t, cf_member);
|
||||
QUEUE_INIT(&head);
|
||||
uv__queue_foreach(q, &state->fsevent_handles) {
|
||||
handle = uv__queue_data(q, uv_fs_event_t, cf_member);
|
||||
uv__queue_init(&head);
|
||||
|
||||
/* Process and filter out events */
|
||||
for (i = 0; i < numEvents; i++) {
|
||||
@ -318,10 +318,10 @@ static void uv__fsevents_event_cb(const FSEventStreamRef streamRef,
|
||||
event->events = UV_CHANGE;
|
||||
}
|
||||
|
||||
QUEUE_INSERT_TAIL(&head, &event->member);
|
||||
uv__queue_insert_tail(&head, &event->member);
|
||||
}
|
||||
|
||||
if (!QUEUE_EMPTY(&head))
|
||||
if (!uv__queue_empty(&head))
|
||||
uv__fsevents_push_event(handle, &head, 0);
|
||||
}
|
||||
uv_mutex_unlock(&state->fsevent_mutex);
|
||||
@ -403,7 +403,7 @@ static void uv__fsevents_destroy_stream(uv__cf_loop_state_t* state) {
|
||||
static void uv__fsevents_reschedule(uv__cf_loop_state_t* state,
|
||||
uv_loop_t* loop,
|
||||
uv__cf_loop_signal_type_t type) {
|
||||
QUEUE* q;
|
||||
struct uv__queue* q;
|
||||
uv_fs_event_t* curr;
|
||||
CFArrayRef cf_paths;
|
||||
CFStringRef* paths;
|
||||
@ -446,9 +446,9 @@ static void uv__fsevents_reschedule(uv__cf_loop_state_t* state,
|
||||
|
||||
q = &state->fsevent_handles;
|
||||
for (; i < path_count; i++) {
|
||||
q = QUEUE_NEXT(q);
|
||||
q = uv__queue_next(q);
|
||||
assert(q != &state->fsevent_handles);
|
||||
curr = QUEUE_DATA(q, uv_fs_event_t, cf_member);
|
||||
curr = uv__queue_data(q, uv_fs_event_t, cf_member);
|
||||
|
||||
assert(curr->realpath != NULL);
|
||||
paths[i] =
|
||||
@ -486,8 +486,8 @@ final:
|
||||
|
||||
/* Broadcast error to all handles */
|
||||
uv_mutex_lock(&state->fsevent_mutex);
|
||||
QUEUE_FOREACH(q, &state->fsevent_handles) {
|
||||
curr = QUEUE_DATA(q, uv_fs_event_t, cf_member);
|
||||
uv__queue_foreach(q, &state->fsevent_handles) {
|
||||
curr = uv__queue_data(q, uv_fs_event_t, cf_member);
|
||||
uv__fsevents_push_event(curr, NULL, err);
|
||||
}
|
||||
uv_mutex_unlock(&state->fsevent_mutex);
|
||||
@ -606,7 +606,7 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) {
|
||||
if (err)
|
||||
goto fail_sem_init;
|
||||
|
||||
QUEUE_INIT(&loop->cf_signals);
|
||||
uv__queue_init(&loop->cf_signals);
|
||||
|
||||
err = uv_sem_init(&state->fsevent_sem, 0);
|
||||
if (err)
|
||||
@ -616,7 +616,7 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) {
|
||||
if (err)
|
||||
goto fail_fsevent_mutex_init;
|
||||
|
||||
QUEUE_INIT(&state->fsevent_handles);
|
||||
uv__queue_init(&state->fsevent_handles);
|
||||
state->fsevent_need_reschedule = 0;
|
||||
state->fsevent_handle_count = 0;
|
||||
|
||||
@ -675,7 +675,7 @@ fail_mutex_init:
|
||||
void uv__fsevents_loop_delete(uv_loop_t* loop) {
|
||||
uv__cf_loop_signal_t* s;
|
||||
uv__cf_loop_state_t* state;
|
||||
QUEUE* q;
|
||||
struct uv__queue* q;
|
||||
|
||||
if (loop->cf_state == NULL)
|
||||
return;
|
||||
@ -688,10 +688,10 @@ void uv__fsevents_loop_delete(uv_loop_t* loop) {
|
||||
uv_mutex_destroy(&loop->cf_mutex);
|
||||
|
||||
/* Free any remaining data */
|
||||
while (!QUEUE_EMPTY(&loop->cf_signals)) {
|
||||
q = QUEUE_HEAD(&loop->cf_signals);
|
||||
s = QUEUE_DATA(q, uv__cf_loop_signal_t, member);
|
||||
QUEUE_REMOVE(q);
|
||||
while (!uv__queue_empty(&loop->cf_signals)) {
|
||||
q = uv__queue_head(&loop->cf_signals);
|
||||
s = uv__queue_data(q, uv__cf_loop_signal_t, member);
|
||||
uv__queue_remove(q);
|
||||
uv__free(s);
|
||||
}
|
||||
|
||||
@ -735,22 +735,22 @@ static void* uv__cf_loop_runner(void* arg) {
|
||||
static void uv__cf_loop_cb(void* arg) {
|
||||
uv_loop_t* loop;
|
||||
uv__cf_loop_state_t* state;
|
||||
QUEUE* item;
|
||||
QUEUE split_head;
|
||||
struct uv__queue* item;
|
||||
struct uv__queue split_head;
|
||||
uv__cf_loop_signal_t* s;
|
||||
|
||||
loop = arg;
|
||||
state = loop->cf_state;
|
||||
|
||||
uv_mutex_lock(&loop->cf_mutex);
|
||||
QUEUE_MOVE(&loop->cf_signals, &split_head);
|
||||
uv__queue_move(&loop->cf_signals, &split_head);
|
||||
uv_mutex_unlock(&loop->cf_mutex);
|
||||
|
||||
while (!QUEUE_EMPTY(&split_head)) {
|
||||
item = QUEUE_HEAD(&split_head);
|
||||
QUEUE_REMOVE(item);
|
||||
while (!uv__queue_empty(&split_head)) {
|
||||
item = uv__queue_head(&split_head);
|
||||
uv__queue_remove(item);
|
||||
|
||||
s = QUEUE_DATA(item, uv__cf_loop_signal_t, member);
|
||||
s = uv__queue_data(item, uv__cf_loop_signal_t, member);
|
||||
|
||||
/* This was a termination signal */
|
||||
if (s->handle == NULL)
|
||||
@ -778,7 +778,7 @@ int uv__cf_loop_signal(uv_loop_t* loop,
|
||||
item->type = type;
|
||||
|
||||
uv_mutex_lock(&loop->cf_mutex);
|
||||
QUEUE_INSERT_TAIL(&loop->cf_signals, &item->member);
|
||||
uv__queue_insert_tail(&loop->cf_signals, &item->member);
|
||||
|
||||
state = loop->cf_state;
|
||||
assert(state != NULL);
|
||||
@ -807,7 +807,7 @@ int uv__fsevents_init(uv_fs_event_t* handle) {
|
||||
handle->realpath_len = strlen(handle->realpath);
|
||||
|
||||
/* Initialize event queue */
|
||||
QUEUE_INIT(&handle->cf_events);
|
||||
uv__queue_init(&handle->cf_events);
|
||||
handle->cf_error = 0;
|
||||
|
||||
/*
|
||||
@ -832,7 +832,7 @@ int uv__fsevents_init(uv_fs_event_t* handle) {
|
||||
/* Insert handle into the list */
|
||||
state = handle->loop->cf_state;
|
||||
uv_mutex_lock(&state->fsevent_mutex);
|
||||
QUEUE_INSERT_TAIL(&state->fsevent_handles, &handle->cf_member);
|
||||
uv__queue_insert_tail(&state->fsevent_handles, &handle->cf_member);
|
||||
state->fsevent_handle_count++;
|
||||
state->fsevent_need_reschedule = 1;
|
||||
uv_mutex_unlock(&state->fsevent_mutex);
|
||||
@ -872,7 +872,7 @@ int uv__fsevents_close(uv_fs_event_t* handle) {
|
||||
/* Remove handle from the list */
|
||||
state = handle->loop->cf_state;
|
||||
uv_mutex_lock(&state->fsevent_mutex);
|
||||
QUEUE_REMOVE(&handle->cf_member);
|
||||
uv__queue_remove(&handle->cf_member);
|
||||
state->fsevent_handle_count--;
|
||||
state->fsevent_need_reschedule = 1;
|
||||
uv_mutex_unlock(&state->fsevent_mutex);
|
||||
|
@@ -133,7 +133,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
   struct timespec spec;
   unsigned int nevents;
   unsigned int revents;
-  QUEUE* q;
+  struct uv__queue* q;
   uv__io_t* w;
   uv_process_t* process;
   sigset_t* pset;

@@ -152,19 +152,19 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
   int reset_timeout;
 
   if (loop->nfds == 0) {
-    assert(QUEUE_EMPTY(&loop->watcher_queue));
+    assert(uv__queue_empty(&loop->watcher_queue));
     return;
   }
 
   lfields = uv__get_internal_fields(loop);
   nevents = 0;
 
-  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
-    q = QUEUE_HEAD(&loop->watcher_queue);
-    QUEUE_REMOVE(q);
-    QUEUE_INIT(q);
+  while (!uv__queue_empty(&loop->watcher_queue)) {
+    q = uv__queue_head(&loop->watcher_queue);
+    uv__queue_remove(q);
+    uv__queue_init(q);
 
-    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
+    w = uv__queue_data(q, uv__io_t, watcher_queue);
     assert(w->pevents != 0);
     assert(w->fd >= 0);
     assert(w->fd < (int) loop->nwatchers);

@@ -307,8 +307,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
 
       /* Handle kevent NOTE_EXIT results */
       if (ev->filter == EVFILT_PROC) {
-        QUEUE_FOREACH(q, &loop->process_handles) {
-          process = QUEUE_DATA(q, uv_process_t, queue);
+        uv__queue_foreach(q, &loop->process_handles) {
+          process = uv__queue_data(q, uv_process_t, queue);
           if (process->pid == fd) {
             process->flags |= UV_HANDLE_REAP;
             loop->flags |= UV_LOOP_REAP_CHILDREN;
@@ -266,7 +266,7 @@ STATIC_ASSERT(EPOLL_CTL_MOD < 4);
 
 struct watcher_list {
   RB_ENTRY(watcher_list) entry;
-  QUEUE watchers;
+  struct uv__queue watchers;
   int iterating;
   char* path;
   int wd;

@@ -701,7 +701,7 @@ static struct uv__io_uring_sqe* uv__iou_get_sqe(struct uv__iou* iou,
   req->work_req.loop = loop;
   req->work_req.work = NULL;
   req->work_req.done = NULL;
-  QUEUE_INIT(&req->work_req.wq);
+  uv__queue_init(&req->work_req.wq);
 
   uv__req_register(loop, req);
   iou->in_flight++;

@@ -1224,7 +1224,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
   struct uv__iou* ctl;
   struct uv__iou* iou;
   int real_timeout;
-  QUEUE* q;
+  struct uv__queue* q;
   uv__io_t* w;
   sigset_t* sigmask;
   sigset_t sigset;

@@ -1270,11 +1270,11 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
 
   memset(&e, 0, sizeof(e));
 
-  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
-    q = QUEUE_HEAD(&loop->watcher_queue);
-    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
-    QUEUE_REMOVE(q);
-    QUEUE_INIT(q);
+  while (!uv__queue_empty(&loop->watcher_queue)) {
+    q = uv__queue_head(&loop->watcher_queue);
+    w = uv__queue_data(q, uv__io_t, watcher_queue);
+    uv__queue_remove(q);
+    uv__queue_init(q);
 
     op = EPOLL_CTL_MOD;
     if (w->events == 0)

@@ -2229,8 +2229,8 @@ static int uv__inotify_fork(uv_loop_t* loop, struct watcher_list* root) {
   struct watcher_list* tmp_watcher_list_iter;
   struct watcher_list* watcher_list;
   struct watcher_list tmp_watcher_list;
-  QUEUE queue;
-  QUEUE* q;
+  struct uv__queue queue;
+  struct uv__queue* q;
   uv_fs_event_t* handle;
   char* tmp_path;
 

@@ -2242,41 +2242,41 @@ static int uv__inotify_fork(uv_loop_t* loop, struct watcher_list* root) {
    */
   loop->inotify_watchers = root;
 
-  QUEUE_INIT(&tmp_watcher_list.watchers);
+  uv__queue_init(&tmp_watcher_list.watchers);
   /* Note that the queue we use is shared with the start and stop()
-   * functions, making QUEUE_FOREACH unsafe to use. So we use the
-   * QUEUE_MOVE trick to safely iterate. Also don't free the watcher
+   * functions, making uv__queue_foreach unsafe to use. So we use the
+   * uv__queue_move trick to safely iterate. Also don't free the watcher
    * list until we're done iterating. c.f. uv__inotify_read.
    */
   RB_FOREACH_SAFE(watcher_list, watcher_root,
                   uv__inotify_watchers(loop), tmp_watcher_list_iter) {
     watcher_list->iterating = 1;
-    QUEUE_MOVE(&watcher_list->watchers, &queue);
-    while (!QUEUE_EMPTY(&queue)) {
-      q = QUEUE_HEAD(&queue);
-      handle = QUEUE_DATA(q, uv_fs_event_t, watchers);
+    uv__queue_move(&watcher_list->watchers, &queue);
+    while (!uv__queue_empty(&queue)) {
+      q = uv__queue_head(&queue);
+      handle = uv__queue_data(q, uv_fs_event_t, watchers);
       /* It's critical to keep a copy of path here, because it
        * will be set to NULL by stop() and then deallocated by
        * maybe_free_watcher_list
        */
       tmp_path = uv__strdup(handle->path);
       assert(tmp_path != NULL);
-      QUEUE_REMOVE(q);
-      QUEUE_INSERT_TAIL(&watcher_list->watchers, q);
+      uv__queue_remove(q);
+      uv__queue_insert_tail(&watcher_list->watchers, q);
       uv_fs_event_stop(handle);
 
-      QUEUE_INSERT_TAIL(&tmp_watcher_list.watchers, &handle->watchers);
+      uv__queue_insert_tail(&tmp_watcher_list.watchers, &handle->watchers);
       handle->path = tmp_path;
     }
     watcher_list->iterating = 0;
     maybe_free_watcher_list(watcher_list, loop);
   }
 
-  QUEUE_MOVE(&tmp_watcher_list.watchers, &queue);
-  while (!QUEUE_EMPTY(&queue)) {
-    q = QUEUE_HEAD(&queue);
-    QUEUE_REMOVE(q);
-    handle = QUEUE_DATA(q, uv_fs_event_t, watchers);
+  uv__queue_move(&tmp_watcher_list.watchers, &queue);
+  while (!uv__queue_empty(&queue)) {
+    q = uv__queue_head(&queue);
+    uv__queue_remove(q);
+    handle = uv__queue_data(q, uv_fs_event_t, watchers);
     tmp_path = handle->path;
     handle->path = NULL;
     err = uv_fs_event_start(handle, handle->cb, tmp_path, 0);

@@ -2298,7 +2298,7 @@ static struct watcher_list* find_watcher(uv_loop_t* loop, int wd) {
 
 static void maybe_free_watcher_list(struct watcher_list* w, uv_loop_t* loop) {
   /* if the watcher_list->watchers is being iterated over, we can't free it. */
-  if ((!w->iterating) && QUEUE_EMPTY(&w->watchers)) {
+  if ((!w->iterating) && uv__queue_empty(&w->watchers)) {
     /* No watchers left for this path. Clean up. */
     RB_REMOVE(watcher_root, uv__inotify_watchers(loop), w);
     inotify_rm_watch(loop->inotify_fd, w->wd);

@@ -2313,8 +2313,8 @@ static void uv__inotify_read(uv_loop_t* loop,
   const struct inotify_event* e;
   struct watcher_list* w;
   uv_fs_event_t* h;
-  QUEUE queue;
-  QUEUE* q;
+  struct uv__queue queue;
+  struct uv__queue* q;
   const char* path;
   ssize_t size;
   const char *p;

@@ -2357,7 +2357,7 @@ static void uv__inotify_read(uv_loop_t* loop,
    * What can go wrong?
    * A callback could call uv_fs_event_stop()
    * and the queue can change under our feet.
-   * So, we use QUEUE_MOVE() trick to safely iterate over the queue.
+   * So, we use uv__queue_move() trick to safely iterate over the queue.
    * And we don't free the watcher_list until we're done iterating.
    *
    * First,

@@ -2365,13 +2365,13 @@ static void uv__inotify_read(uv_loop_t* loop,
    * not to free watcher_list.
    */
   w->iterating = 1;
-  QUEUE_MOVE(&w->watchers, &queue);
-  while (!QUEUE_EMPTY(&queue)) {
-    q = QUEUE_HEAD(&queue);
-    h = QUEUE_DATA(q, uv_fs_event_t, watchers);
+  uv__queue_move(&w->watchers, &queue);
+  while (!uv__queue_empty(&queue)) {
+    q = uv__queue_head(&queue);
+    h = uv__queue_data(q, uv_fs_event_t, watchers);
 
-    QUEUE_REMOVE(q);
-    QUEUE_INSERT_TAIL(&w->watchers, q);
+    uv__queue_remove(q);
+    uv__queue_insert_tail(&w->watchers, q);
 
     h->cb(h, path, events, 0);
   }

@@ -2433,13 +2433,13 @@ int uv_fs_event_start(uv_fs_event_t* handle,
 
     w->wd = wd;
     w->path = memcpy(w + 1, path, len);
-    QUEUE_INIT(&w->watchers);
+    uv__queue_init(&w->watchers);
     w->iterating = 0;
     RB_INSERT(watcher_root, uv__inotify_watchers(loop), w);
 
 no_insert:
   uv__handle_start(handle);
-  QUEUE_INSERT_TAIL(&w->watchers, &handle->watchers);
+  uv__queue_insert_tail(&w->watchers, &handle->watchers);
   handle->path = w->path;
   handle->cb = cb;
   handle->wd = wd;

@@ -2460,7 +2460,7 @@ int uv_fs_event_stop(uv_fs_event_t* handle) {
   handle->wd = -1;
   handle->path = NULL;
   uv__handle_stop(handle);
-  QUEUE_REMOVE(&handle->watchers);
+  uv__queue_remove(&handle->watchers);
 
   maybe_free_watcher_list(w, handle->loop);
@@ -32,7 +32,7 @@
   int uv_##name##_start(uv_##name##_t* handle, uv_##name##_cb cb) { \
     if (uv__is_active(handle)) return 0; \
    if (cb == NULL) return UV_EINVAL; \
-    QUEUE_INSERT_HEAD(&handle->loop->name##_handles, &handle->queue); \
+    uv__queue_insert_head(&handle->loop->name##_handles, &handle->queue); \
     handle->name##_cb = cb; \
     uv__handle_start(handle); \
     return 0; \

@@ -40,21 +40,21 @@
   \
   int uv_##name##_stop(uv_##name##_t* handle) { \
     if (!uv__is_active(handle)) return 0; \
-    QUEUE_REMOVE(&handle->queue); \
+    uv__queue_remove(&handle->queue); \
     uv__handle_stop(handle); \
     return 0; \
   } \
   \
   void uv__run_##name(uv_loop_t* loop) { \
     uv_##name##_t* h; \
-    QUEUE queue; \
-    QUEUE* q; \
-    QUEUE_MOVE(&loop->name##_handles, &queue); \
-    while (!QUEUE_EMPTY(&queue)) { \
-      q = QUEUE_HEAD(&queue); \
-      h = QUEUE_DATA(q, uv_##name##_t, queue); \
-      QUEUE_REMOVE(q); \
-      QUEUE_INSERT_TAIL(&loop->name##_handles, q); \
+    struct uv__queue queue; \
+    struct uv__queue* q; \
+    uv__queue_move(&loop->name##_handles, &queue); \
+    while (!uv__queue_empty(&queue)) { \
+      q = uv__queue_head(&queue); \
+      h = uv__queue_data(q, uv_##name##_t, queue); \
+      uv__queue_remove(q); \
+      uv__queue_insert_tail(&loop->name##_handles, q); \
       h->name##_cb(h); \
     } \
   } \
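This macro also shows the project's standard answer to "the callback may stop() a handle and unlink it mid-walk": move everything to a private head, then pop one node at a time, re-insert it into the live list, and only then invoke the callback. The same shape appears in uv__async_io and uv__inotify_read above. A condensed sketch; loop_handles and fire_cb are hypothetical stand-ins:

    /* Assumes the queue.h helpers above. uv__queue_foreach would break
     * if a callback unlinked nodes mid-walk, so iterate a private copy. */
    struct uv__queue queue;
    struct uv__queue* q;

    uv__queue_move(&loop_handles, &queue);      /* detach every node */
    while (!uv__queue_empty(&queue)) {
      q = uv__queue_head(&queue);
      uv__queue_remove(q);                      /* off the private copy */
      uv__queue_insert_tail(&loop_handles, q);  /* back on the live list */
      fire_cb(q);  /* may call stop(), which just unlinks q again; safe */
    }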
@@ -50,20 +50,20 @@ int uv_loop_init(uv_loop_t* loop) {
          sizeof(lfields->loop_metrics.metrics));
 
   heap_init((struct heap*) &loop->timer_heap);
-  QUEUE_INIT(&loop->wq);
-  QUEUE_INIT(&loop->idle_handles);
-  QUEUE_INIT(&loop->async_handles);
-  QUEUE_INIT(&loop->check_handles);
-  QUEUE_INIT(&loop->prepare_handles);
-  QUEUE_INIT(&loop->handle_queue);
+  uv__queue_init(&loop->wq);
+  uv__queue_init(&loop->idle_handles);
+  uv__queue_init(&loop->async_handles);
+  uv__queue_init(&loop->check_handles);
+  uv__queue_init(&loop->prepare_handles);
+  uv__queue_init(&loop->handle_queue);
 
   loop->active_handles = 0;
   loop->active_reqs.count = 0;
   loop->nfds = 0;
   loop->watchers = NULL;
   loop->nwatchers = 0;
-  QUEUE_INIT(&loop->pending_queue);
-  QUEUE_INIT(&loop->watcher_queue);
+  uv__queue_init(&loop->pending_queue);
+  uv__queue_init(&loop->watcher_queue);
 
   loop->closing_handles = NULL;
   uv__update_time(loop);

@@ -85,7 +85,7 @@ int uv_loop_init(uv_loop_t* loop) {
   err = uv__process_init(loop);
   if (err)
     goto fail_signal_init;
-  QUEUE_INIT(&loop->process_handles);
+  uv__queue_init(&loop->process_handles);
 
   err = uv_rwlock_init(&loop->cloexec_lock);
   if (err)

@@ -152,9 +152,9 @@ int uv_loop_fork(uv_loop_t* loop) {
     if (w == NULL)
      continue;
 
-    if (w->pevents != 0 && QUEUE_EMPTY(&w->watcher_queue)) {
+    if (w->pevents != 0 && uv__queue_empty(&w->watcher_queue)) {
       w->events = 0; /* Force re-registration in uv__io_poll. */
-      QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
+      uv__queue_insert_tail(&loop->watcher_queue, &w->watcher_queue);
     }
   }
 

@@ -180,7 +180,7 @@ void uv__loop_close(uv_loop_t* loop) {
   }
 
   uv_mutex_lock(&loop->wq_mutex);
-  assert(QUEUE_EMPTY(&loop->wq) && "thread pool work queue not empty!");
+  assert(uv__queue_empty(&loop->wq) && "thread pool work queue not empty!");
   assert(!uv__has_active_reqs(loop));
   uv_mutex_unlock(&loop->wq_mutex);
   uv_mutex_destroy(&loop->wq_mutex);

@@ -192,8 +192,8 @@ void uv__loop_close(uv_loop_t* loop) {
   uv_rwlock_destroy(&loop->cloexec_lock);
 
 #if 0
-  assert(QUEUE_EMPTY(&loop->pending_queue));
-  assert(QUEUE_EMPTY(&loop->watcher_queue));
+  assert(uv__queue_empty(&loop->pending_queue));
+  assert(uv__queue_empty(&loop->watcher_queue));
   assert(loop->nfds == 0);
 #endif
@@ -27,7 +27,7 @@
 #include <termios.h>
 #include <sys/msg.h>
 
-static QUEUE global_epoll_queue;
+static struct uv__queue global_epoll_queue;
 static uv_mutex_t global_epoll_lock;
 static uv_once_t once = UV_ONCE_INIT;
 

@@ -178,18 +178,18 @@ static void after_fork(void) {
 
 
 static void child_fork(void) {
-  QUEUE* q;
+  struct uv__queue* q;
   uv_once_t child_once = UV_ONCE_INIT;
 
   /* reset once */
   memcpy(&once, &child_once, sizeof(child_once));
 
   /* reset epoll list */
-  while (!QUEUE_EMPTY(&global_epoll_queue)) {
+  while (!uv__queue_empty(&global_epoll_queue)) {
     uv__os390_epoll* lst;
-    q = QUEUE_HEAD(&global_epoll_queue);
-    QUEUE_REMOVE(q);
-    lst = QUEUE_DATA(q, uv__os390_epoll, member);
+    q = uv__queue_head(&global_epoll_queue);
+    uv__queue_remove(q);
+    lst = uv__queue_data(q, uv__os390_epoll, member);
     uv__free(lst->items);
     lst->items = NULL;
     lst->size = 0;

@@ -201,7 +201,7 @@ static void child_fork(void) {
 
 
 static void epoll_init(void) {
-  QUEUE_INIT(&global_epoll_queue);
+  uv__queue_init(&global_epoll_queue);
   if (uv_mutex_init(&global_epoll_lock))
     abort();
 

@@ -225,7 +225,7 @@ uv__os390_epoll* epoll_create1(int flags) {
     lst->items[lst->size - 1].revents = 0;
     uv_once(&once, epoll_init);
     uv_mutex_lock(&global_epoll_lock);
-    QUEUE_INSERT_TAIL(&global_epoll_queue, &lst->member);
+    uv__queue_insert_tail(&global_epoll_queue, &lst->member);
     uv_mutex_unlock(&global_epoll_lock);
   }
 

@@ -352,14 +352,14 @@ int epoll_wait(uv__os390_epoll* lst, struct epoll_event* events,
 
 
 int epoll_file_close(int fd) {
-  QUEUE* q;
+  struct uv__queue* q;
 
   uv_once(&once, epoll_init);
   uv_mutex_lock(&global_epoll_lock);
-  QUEUE_FOREACH(q, &global_epoll_queue) {
+  uv__queue_foreach(q, &global_epoll_queue) {
     uv__os390_epoll* lst;
 
-    lst = QUEUE_DATA(q, uv__os390_epoll, member);
+    lst = uv__queue_data(q, uv__os390_epoll, member);
     if (fd < lst->size && lst->items != NULL && lst->items[fd].fd != -1)
       lst->items[fd].fd = -1;
   }

@@ -371,7 +371,7 @@ int epoll_file_close(int fd) {
 void epoll_queue_close(uv__os390_epoll* lst) {
   /* Remove epoll instance from global queue */
   uv_mutex_lock(&global_epoll_lock);
-  QUEUE_REMOVE(&lst->member);
+  uv__queue_remove(&lst->member);
   uv_mutex_unlock(&global_epoll_lock);
 
   /* Free resources */

@@ -45,7 +45,7 @@ struct epoll_event {
 };
 
 typedef struct {
-  QUEUE member;
+  struct uv__queue member;
   struct pollfd* items;
   unsigned long size;
   int msg_queue;
@@ -815,7 +815,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
   uv__os390_epoll* ep;
   int have_signals;
   int real_timeout;
-  QUEUE* q;
+  struct uv__queue* q;
   uv__io_t* w;
   uint64_t base;
   int count;

@@ -827,19 +827,19 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
   int reset_timeout;
 
   if (loop->nfds == 0) {
-    assert(QUEUE_EMPTY(&loop->watcher_queue));
+    assert(uv__queue_empty(&loop->watcher_queue));
     return;
   }
 
   lfields = uv__get_internal_fields(loop);
 
-  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
+  while (!uv__queue_empty(&loop->watcher_queue)) {
     uv_stream_t* stream;
 
-    q = QUEUE_HEAD(&loop->watcher_queue);
-    QUEUE_REMOVE(q);
-    QUEUE_INIT(q);
-    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
+    q = uv__queue_head(&loop->watcher_queue);
+    uv__queue_remove(q);
+    uv__queue_init(q);
+    w = uv__queue_data(q, uv__io_t, watcher_queue);
 
     assert(w->pevents != 0);
     assert(w->fd >= 0);
@@ -230,7 +230,7 @@ out:
   uv__req_init(handle->loop, req, UV_CONNECT);
   req->handle = (uv_stream_t*)handle;
   req->cb = cb;
-  QUEUE_INIT(&req->queue);
+  uv__queue_init(&req->queue);
 
   /* Force callback to run on next tick in case of error. */
   if (err)
@@ -137,7 +137,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
   sigset_t set;
   uint64_t time_base;
   uint64_t time_diff;
-  QUEUE* q;
+  struct uv__queue* q;
   uv__io_t* w;
   size_t i;
   unsigned int nevents;

@@ -149,19 +149,19 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
   int reset_timeout;
 
   if (loop->nfds == 0) {
-    assert(QUEUE_EMPTY(&loop->watcher_queue));
+    assert(uv__queue_empty(&loop->watcher_queue));
     return;
   }
 
   lfields = uv__get_internal_fields(loop);
 
   /* Take queued watchers and add their fds to our poll fds array. */
-  while (!QUEUE_EMPTY(&loop->watcher_queue)) {
-    q = QUEUE_HEAD(&loop->watcher_queue);
-    QUEUE_REMOVE(q);
-    QUEUE_INIT(q);
+  while (!uv__queue_empty(&loop->watcher_queue)) {
+    q = uv__queue_head(&loop->watcher_queue);
+    uv__queue_remove(q);
+    uv__queue_init(q);
 
-    w = QUEUE_DATA(q, uv__io_t, watcher_queue);
+    w = uv__queue_data(q, uv__io_t, watcher_queue);
     assert(w->pevents != 0);
     assert(w->fd >= 0);
     assert(w->fd < (int) loop->nwatchers);
@ -108,17 +108,17 @@ void uv__wait_children(uv_loop_t* loop) {
int status;
int options;
pid_t pid;
QUEUE pending;
QUEUE* q;
QUEUE* h;
struct uv__queue pending;
struct uv__queue* q;
struct uv__queue* h;

QUEUE_INIT(&pending);
uv__queue_init(&pending);

h = &loop->process_handles;
q = QUEUE_HEAD(h);
q = uv__queue_head(h);
while (q != h) {
process = QUEUE_DATA(q, uv_process_t, queue);
q = QUEUE_NEXT(q);
process = uv__queue_data(q, uv_process_t, queue);
q = uv__queue_next(q);

#ifndef UV_USE_SIGCHLD
if ((process->flags & UV_HANDLE_REAP) == 0)

@ -149,18 +149,18 @@ void uv__wait_children(uv_loop_t* loop) {

assert(pid == process->pid);
process->status = status;
QUEUE_REMOVE(&process->queue);
QUEUE_INSERT_TAIL(&pending, &process->queue);
uv__queue_remove(&process->queue);
uv__queue_insert_tail(&pending, &process->queue);
}

h = &pending;
q = QUEUE_HEAD(h);
q = uv__queue_head(h);
while (q != h) {
process = QUEUE_DATA(q, uv_process_t, queue);
q = QUEUE_NEXT(q);
process = uv__queue_data(q, uv_process_t, queue);
q = uv__queue_next(q);

QUEUE_REMOVE(&process->queue);
QUEUE_INIT(&process->queue);
uv__queue_remove(&process->queue);
uv__queue_init(&process->queue);
uv__handle_stop(process);

if (process->exit_cb == NULL)

@ -176,7 +176,7 @@ void uv__wait_children(uv_loop_t* loop) {

process->exit_cb(process, exit_status, term_signal);
}
assert(QUEUE_EMPTY(&pending));
assert(uv__queue_empty(&pending));
}

/*
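
`uv__wait_children()` shows the two-phase variant of the idiom: reaped children are first spliced onto a stack-local `pending` queue, and only after the scan finishes do their exit callbacks run, so a callback that closes or spawns a process cannot corrupt the traversal of `loop->process_handles`. A reduced sketch of the shape — `item` and `on_done` are hypothetical names, not from the patch:

/* Two-phase traversal: collect first, then run callbacks. The local
 * `pending` head lives on the stack; nodes are moved over to it. */
struct item {
  int value;
  struct uv__queue queue;  /* intrusive link */
};

static void collect_then_notify(struct uv__queue* source,
                                void (*on_done)(struct item*)) {
  struct uv__queue pending;
  struct uv__queue* q;
  struct item* it;

  uv__queue_init(&pending);

  /* Phase 1: walk `source`, moving items to `pending`. Advance `q`
   * before unlinking so the removal cannot break the walk. */
  q = uv__queue_head(source);
  while (q != source) {
    it = uv__queue_data(q, struct item, queue);
    q = uv__queue_next(q);
    uv__queue_remove(&it->queue);
    uv__queue_insert_tail(&pending, &it->queue);
  }

  /* Phase 2: drain `pending`; callbacks may touch `source` freely. */
  while (!uv__queue_empty(&pending)) {
    q = uv__queue_head(&pending);
    it = uv__queue_data(q, struct item, queue);
    uv__queue_remove(q);
    uv__queue_init(q);
    on_done(it);
  }
}
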
@ -978,7 +978,7 @@ int uv_spawn(uv_loop_t* loop,
UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS)));

uv__handle_init(loop, (uv_handle_t*)process, UV_PROCESS);
QUEUE_INIT(&process->queue);
uv__queue_init(&process->queue);
process->status = 0;

stdio_count = options->stdio_count;

@ -1041,7 +1041,7 @@ int uv_spawn(uv_loop_t* loop,

process->pid = pid;
process->exit_cb = options->exit_cb;
QUEUE_INSERT_TAIL(&loop->process_handles, &process->queue);
uv__queue_insert_tail(&loop->process_handles, &process->queue);
uv__handle_start(process);
}

@ -1103,10 +1103,10 @@ int uv_kill(int pid, int signum) {


void uv__process_close(uv_process_t* handle) {
QUEUE_REMOVE(&handle->queue);
uv__queue_remove(&handle->queue);
uv__handle_stop(handle);
#ifdef UV_USE_SIGCHLD
if (QUEUE_EMPTY(&handle->loop->process_handles))
if (uv__queue_empty(&handle->loop->process_handles))
uv_signal_stop(&handle->loop->child_watcher);
#endif
}
@ -291,16 +291,16 @@ int uv__signal_loop_fork(uv_loop_t* loop) {


void uv__signal_loop_cleanup(uv_loop_t* loop) {
QUEUE* q;
struct uv__queue* q;

/* Stop all the signal watchers that are still attached to this loop. This
* ensures that the (shared) signal tree doesn't contain any invalid
* entries, and that signal handlers are removed when appropriate.
* It's safe to use QUEUE_FOREACH here because the handles and the handle
* It's safe to use uv__queue_foreach here because the handles and the handle
* queue are not modified by uv__signal_stop().
*/
QUEUE_FOREACH(q, &loop->handle_queue) {
uv_handle_t* handle = QUEUE_DATA(q, uv_handle_t, handle_queue);
uv__queue_foreach(q, &loop->handle_queue) {
uv_handle_t* handle = uv__queue_data(q, uv_handle_t, handle_queue);

if (handle->type == UV_SIGNAL)
uv__signal_stop((uv_signal_t*) handle);
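
As the comment notes, `uv__queue_foreach` (like the `QUEUE_FOREACH` it replaces) is only safe while the list's links stay put: it caches no "next" pointer. A plausible macro shape and a read-only walk — illustrative only, the patch defines the real macro:

/* A straightforward foreach over a circular intrusive list. Unlinking
 * the current node from inside the body would derail the walk, which is
 * why the signal cleanup above relies on uv__signal_stop() leaving the
 * handle queue untouched. */
#define uv__queue_foreach_sketch(q, h) \
  for ((q) = (h)->next; (q) != (h); (q) = (q)->next)

static int count_items(struct uv__queue* head) {
  struct uv__queue* q;
  int n = 0;

  uv__queue_foreach_sketch(q, head)
    n++;  /* read-only body: no unlinking while iterating */

  return n;
}
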
@ -94,8 +94,8 @@ void uv__stream_init(uv_loop_t* loop,
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
uv__queue_init(&stream->write_queue);
uv__queue_init(&stream->write_completed_queue);
stream->write_queue_size = 0;

if (loop->emfile_fd == -1) {

@ -439,15 +439,15 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) {

void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
uv_write_t* req;
QUEUE* q;
while (!QUEUE_EMPTY(&stream->write_queue)) {
q = QUEUE_HEAD(&stream->write_queue);
QUEUE_REMOVE(q);
struct uv__queue* q;
while (!uv__queue_empty(&stream->write_queue)) {
q = uv__queue_head(&stream->write_queue);
uv__queue_remove(q);

req = QUEUE_DATA(q, uv_write_t, queue);
req = uv__queue_data(q, uv_write_t, queue);
req->error = error;

QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
}
}

@ -627,7 +627,7 @@ static void uv__drain(uv_stream_t* stream) {
uv_shutdown_t* req;
int err;

assert(QUEUE_EMPTY(&stream->write_queue));
assert(uv__queue_empty(&stream->write_queue));
if (!(stream->flags & UV_HANDLE_CLOSING)) {
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);

@ -714,7 +714,7 @@ static void uv__write_req_finish(uv_write_t* req) {
uv_stream_t* stream = req->handle;

/* Pop the req off tcp->write_queue. */
QUEUE_REMOVE(&req->queue);
uv__queue_remove(&req->queue);

/* Only free when there was no error. On error, we touch up write_queue_size
* right before making the callback. The reason we don't do that right away

@ -731,7 +731,7 @@ static void uv__write_req_finish(uv_write_t* req) {
/* Add it to the write_completed_queue where it will have its
* callback called in the near future.
*/
QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
uv__queue_insert_tail(&stream->write_completed_queue, &req->queue);
uv__io_feed(stream->loop, &stream->io_watcher);
}

@ -837,7 +837,7 @@ static int uv__try_write(uv_stream_t* stream,
}

static void uv__write(uv_stream_t* stream) {
QUEUE* q;
struct uv__queue* q;
uv_write_t* req;
ssize_t n;
int count;

@ -851,11 +851,11 @@ static void uv__write(uv_stream_t* stream) {
count = 32;

for (;;) {
if (QUEUE_EMPTY(&stream->write_queue))
if (uv__queue_empty(&stream->write_queue))
return;

q = QUEUE_HEAD(&stream->write_queue);
req = QUEUE_DATA(q, uv_write_t, queue);
q = uv__queue_head(&stream->write_queue);
req = uv__queue_data(q, uv_write_t, queue);
assert(req->handle == stream);

n = uv__try_write(stream,

@ -899,19 +899,19 @@ error:

static void uv__write_callbacks(uv_stream_t* stream) {
uv_write_t* req;
QUEUE* q;
QUEUE pq;
struct uv__queue* q;
struct uv__queue pq;

if (QUEUE_EMPTY(&stream->write_completed_queue))
if (uv__queue_empty(&stream->write_completed_queue))
return;

QUEUE_MOVE(&stream->write_completed_queue, &pq);
uv__queue_move(&stream->write_completed_queue, &pq);

while (!QUEUE_EMPTY(&pq)) {
while (!uv__queue_empty(&pq)) {
/* Pop a req off write_completed_queue. */
q = QUEUE_HEAD(&pq);
req = QUEUE_DATA(q, uv_write_t, queue);
QUEUE_REMOVE(q);
q = uv__queue_head(&pq);
req = uv__queue_data(q, uv_write_t, queue);
uv__queue_remove(q);
uv__req_unregister(stream->loop, req);

if (req->bufs != NULL) {
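
`uv__write_callbacks()` leans on `uv__queue_move`: the whole completed-write list is spliced onto the stack-local head `pq` in O(1) before any user callback runs, so a callback that completes more writes appends to a fresh `write_completed_queue` rather than the list being drained. A sketch of what such a splice has to do — again illustrative, the patch defines the real one:

/* O(1) splice: move every node from `h` onto the local head `n`,
 * leaving `h` empty. Matches the QUEUE_MOVE -> uv__queue_move idiom. */
static void uv__queue_move_sketch(struct uv__queue* h, struct uv__queue* n) {
  if (uv__queue_empty(h)) {
    uv__queue_init(n);          /* nothing to take; leave `n` empty */
    return;
  }
  n->next = h->next;            /* adopt h's first node */
  n->prev = h->prev;            /* ...and its last node */
  n->next->prev = n;            /* retarget the boundary links at `n` */
  n->prev->next = n;
  uv__queue_init(h);            /* `h` is empty again */
}

Draining a private copy is what makes the function re-entrancy-safe with respect to its own callbacks.
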
@ -1174,7 +1174,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
stream->shutdown_req = req;
stream->flags &= ~UV_HANDLE_WRITABLE;

if (QUEUE_EMPTY(&stream->write_queue))
if (uv__queue_empty(&stream->write_queue))
uv__io_feed(stream->loop, &stream->io_watcher);

return 0;

@ -1227,7 +1227,7 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv__write_callbacks(stream);

/* Write queue drained. */
if (QUEUE_EMPTY(&stream->write_queue))
if (uv__queue_empty(&stream->write_queue))
uv__drain(stream);
}
}

@ -1270,7 +1270,7 @@ static void uv__stream_connect(uv_stream_t* stream) {
stream->connect_req = NULL;
uv__req_unregister(stream->loop, req);

if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
if (error < 0 || uv__queue_empty(&stream->write_queue)) {
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
}

@ -1352,7 +1352,7 @@ int uv_write2(uv_write_t* req,
req->handle = stream;
req->error = 0;
req->send_handle = send_handle;
QUEUE_INIT(&req->queue);
uv__queue_init(&req->queue);

req->bufs = req->bufsml;
if (nbufs > ARRAY_SIZE(req->bufsml))

@ -1367,7 +1367,7 @@ int uv_write2(uv_write_t* req,
stream->write_queue_size += uv__count_bufs(bufs, nbufs);

/* Append the request to write_queue. */
QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
uv__queue_insert_tail(&stream->write_queue, &req->queue);

/* If the queue was empty when this function began, we should attempt to
* do the write immediately. Otherwise start the write_watcher and wait
@ -148,7 +148,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
struct port_event events[1024];
struct port_event* pe;
struct timespec spec;
QUEUE* q;
struct uv__queue* q;
uv__io_t* w;
sigset_t* pset;
sigset_t set;

@ -166,16 +166,16 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
int reset_timeout;

if (loop->nfds == 0) {
assert(QUEUE_EMPTY(&loop->watcher_queue));
assert(uv__queue_empty(&loop->watcher_queue));
return;
}

while (!QUEUE_EMPTY(&loop->watcher_queue)) {
q = QUEUE_HEAD(&loop->watcher_queue);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
while (!uv__queue_empty(&loop->watcher_queue)) {
q = uv__queue_head(&loop->watcher_queue);
uv__queue_remove(q);
uv__queue_init(q);

w = QUEUE_DATA(q, uv__io_t, watcher_queue);
w = uv__queue_data(q, uv__io_t, watcher_queue);
assert(w->pevents != 0);

if (port_associate(loop->backend_fd,

@ -316,8 +316,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
continue; /* Disabled by callback. */

/* Events Ports operates in oneshot mode, rearm timer on next run. */
if (w->pevents != 0 && QUEUE_EMPTY(&w->watcher_queue))
QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
if (w->pevents != 0 && uv__queue_empty(&w->watcher_queue))
uv__queue_insert_tail(&loop->watcher_queue, &w->watcher_queue);
}

uv__metrics_inc_events(loop, nevents);
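
The event-ports rearm check works because a detached, `uv__queue_init`ed node is self-linked: applying `uv__queue_empty(&w->watcher_queue)` to a single node answers "is this watcher currently on some queue?" with no extra flag. A tiny sketch of that membership test, with a trimmed illustrative struct:

/* A self-linked node doubles as an "am I enqueued?" membership flag. */
struct watcher_sketch {
  unsigned int pevents;
  struct uv__queue watcher_queue;
};

static void rearm_if_idle(struct uv__queue* loop_watchers,
                          struct watcher_sketch* w) {
  /* Only re-queue watchers that still want events and are not
   * already sitting on the loop's watcher queue. */
  if (w->pevents != 0 && uv__queue_empty(&w->watcher_queue))
    uv__queue_insert_tail(loop_watchers, &w->watcher_queue);
}
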
@ -124,7 +124,7 @@ int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) {
if (domain != AF_UNSPEC) {
err = new_socket(tcp, domain, 0);
if (err) {
QUEUE_REMOVE(&tcp->handle_queue);
uv__queue_remove(&tcp->handle_queue);
if (tcp->io_watcher.fd != -1)
uv__close(tcp->io_watcher.fd);
tcp->io_watcher.fd = -1;

@ -252,7 +252,7 @@ out:
uv__req_init(handle->loop, req, UV_CONNECT);
req->cb = cb;
req->handle = (uv_stream_t*) handle;
QUEUE_INIT(&req->queue);
uv__queue_init(&req->queue);
handle->connect_req = req;

uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);

@ -222,7 +222,7 @@ skip:
int rc = r;
if (newfd != -1)
uv__close(newfd);
QUEUE_REMOVE(&tty->handle_queue);
uv__queue_remove(&tty->handle_queue);
do
r = fcntl(fd, F_SETFL, saved_flags);
while (r == -1 && errno == EINTR);
@ -62,18 +62,18 @@ void uv__udp_close(uv_udp_t* handle) {

void uv__udp_finish_close(uv_udp_t* handle) {
uv_udp_send_t* req;
QUEUE* q;
struct uv__queue* q;

assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT));
assert(handle->io_watcher.fd == -1);

while (!QUEUE_EMPTY(&handle->write_queue)) {
q = QUEUE_HEAD(&handle->write_queue);
QUEUE_REMOVE(q);
while (!uv__queue_empty(&handle->write_queue)) {
q = uv__queue_head(&handle->write_queue);
uv__queue_remove(q);

req = QUEUE_DATA(q, uv_udp_send_t, queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
req->status = UV_ECANCELED;
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
}

uv__udp_run_completed(handle);

@ -90,16 +90,16 @@ void uv__udp_finish_close(uv_udp_t* handle) {

static void uv__udp_run_completed(uv_udp_t* handle) {
uv_udp_send_t* req;
QUEUE* q;
struct uv__queue* q;

assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING));
handle->flags |= UV_HANDLE_UDP_PROCESSING;

while (!QUEUE_EMPTY(&handle->write_completed_queue)) {
q = QUEUE_HEAD(&handle->write_completed_queue);
QUEUE_REMOVE(q);
while (!uv__queue_empty(&handle->write_completed_queue)) {
q = uv__queue_head(&handle->write_completed_queue);
uv__queue_remove(q);

req = QUEUE_DATA(q, uv_udp_send_t, queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
uv__req_unregister(handle->loop, req);

handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);

@ -121,7 +121,7 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
req->send_cb(req, req->status);
}

if (QUEUE_EMPTY(&handle->write_queue)) {
if (uv__queue_empty(&handle->write_queue)) {
/* Pending queue and completion queue empty, stop watcher. */
uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT);
if (!uv__io_active(&handle->io_watcher, POLLIN))
@ -280,20 +280,20 @@ static void uv__udp_sendmsg(uv_udp_t* handle) {
uv_udp_send_t* req;
struct mmsghdr h[20];
struct mmsghdr* p;
QUEUE* q;
struct uv__queue* q;
ssize_t npkts;
size_t pkts;
size_t i;

if (QUEUE_EMPTY(&handle->write_queue))
if (uv__queue_empty(&handle->write_queue))
return;

write_queue_drain:
for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue);
for (pkts = 0, q = uv__queue_head(&handle->write_queue);
pkts < ARRAY_SIZE(h) && q != &handle->write_queue;
++pkts, q = QUEUE_HEAD(q)) {
++pkts, q = uv__queue_head(q)) {
assert(q != NULL);
req = QUEUE_DATA(q, uv_udp_send_t, queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
assert(req != NULL);

p = &h[pkts];

@ -325,16 +325,16 @@ write_queue_drain:
if (npkts < 1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
return;
for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
for (i = 0, q = uv__queue_head(&handle->write_queue);
i < pkts && q != &handle->write_queue;
++i, q = QUEUE_HEAD(&handle->write_queue)) {
++i, q = uv__queue_head(&handle->write_queue)) {
assert(q != NULL);
req = QUEUE_DATA(q, uv_udp_send_t, queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
assert(req != NULL);

req->status = UV__ERR(errno);
QUEUE_REMOVE(&req->queue);
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
}
uv__io_feed(handle->loop, &handle->io_watcher);
return;

@ -343,11 +343,11 @@ write_queue_drain:
/* Safety: npkts known to be >0 below. Hence cast from ssize_t
* to size_t safe.
*/
for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
for (i = 0, q = uv__queue_head(&handle->write_queue);
i < (size_t)npkts && q != &handle->write_queue;
++i, q = QUEUE_HEAD(&handle->write_queue)) {
++i, q = uv__queue_head(&handle->write_queue)) {
assert(q != NULL);
req = QUEUE_DATA(q, uv_udp_send_t, queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
assert(req != NULL);

req->status = req->bufs[0].len;

@ -357,25 +357,25 @@ write_queue_drain:
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
QUEUE_REMOVE(&req->queue);
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
}

/* couldn't batch everything, continue sending (jump to avoid stack growth) */
if (!QUEUE_EMPTY(&handle->write_queue))
if (!uv__queue_empty(&handle->write_queue))
goto write_queue_drain;
uv__io_feed(handle->loop, &handle->io_watcher);
#else /* __linux__ || ____FreeBSD__ */
uv_udp_send_t* req;
struct msghdr h;
QUEUE* q;
struct uv__queue* q;
ssize_t size;

while (!QUEUE_EMPTY(&handle->write_queue)) {
q = QUEUE_HEAD(&handle->write_queue);
while (!uv__queue_empty(&handle->write_queue)) {
q = uv__queue_head(&handle->write_queue);
assert(q != NULL);

req = QUEUE_DATA(q, uv_udp_send_t, queue);
req = uv__queue_data(q, uv_udp_send_t, queue);
assert(req != NULL);

memset(&h, 0, sizeof h);

@ -414,8 +414,8 @@ write_queue_drain:
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
QUEUE_REMOVE(&req->queue);
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
uv__queue_remove(&req->queue);
uv__queue_insert_tail(&handle->write_completed_queue, &req->queue);
uv__io_feed(handle->loop, &handle->io_watcher);
}
#endif /* __linux__ || ____FreeBSD__ */
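
One subtlety in the `sendmmsg` batcher above: the first pass walks the write queue without unlinking anything, stepping with `q = uv__queue_head(q)` — on a circular intrusive list that is simply "follow `next`", the same pointer `uv__queue_next` reads. Requests only move to the completed queue once the kernel has accepted them. A reduced sketch of that scan-without-removal, with a hypothetical `send_sketch` struct standing in for `uv_udp_send_t`:

/* Walk up to `max` nodes of a circular list without detaching them;
 * `head` is the sentinel, so reaching it again means we wrapped. */
struct send_sketch {
  int status;
  struct uv__queue queue;
};

static size_t scan_batch(struct uv__queue* head, size_t max) {
  struct uv__queue* q;
  struct send_sketch* req;
  size_t pkts;

  for (pkts = 0, q = uv__queue_head(head);
       pkts < max && q != head;
       ++pkts, q = uv__queue_next(q)) {
    req = uv__queue_data(q, struct send_sketch, queue);
    /* ... fill one mmsghdr slot from req here ... */
    (void) req;
  }
  return pkts;  /* number of requests staged for one sendmmsg() call */
}
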
@ -729,7 +729,7 @@ int uv__udp_send(uv_udp_send_t* req,
memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
handle->send_queue_count++;
QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
uv__queue_insert_tail(&handle->write_queue, &req->queue);
uv__handle_start(handle);

if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) {

@ -739,7 +739,7 @@ int uv__udp_send(uv_udp_send_t* req,
* away. In such cases the `io_watcher` has to be queued for asynchronous
* write.
*/
if (!QUEUE_EMPTY(&handle->write_queue))
if (!uv__queue_empty(&handle->write_queue))
uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);
} else {
uv__io_start(handle->loop, &handle->io_watcher, POLLOUT);

@ -1007,8 +1007,8 @@ int uv__udp_init_ex(uv_loop_t* loop,
handle->send_queue_size = 0;
handle->send_queue_count = 0;
uv__io_init(&handle->io_watcher, uv__udp_io, fd);
QUEUE_INIT(&handle->write_queue);
QUEUE_INIT(&handle->write_completed_queue);
uv__queue_init(&handle->write_queue);
uv__queue_init(&handle->write_completed_queue);

return 0;
}
@ -533,17 +533,17 @@ int uv_udp_recv_stop(uv_udp_t* handle) {


void uv_walk(uv_loop_t* loop, uv_walk_cb walk_cb, void* arg) {
QUEUE queue;
QUEUE* q;
struct uv__queue queue;
struct uv__queue* q;
uv_handle_t* h;

QUEUE_MOVE(&loop->handle_queue, &queue);
while (!QUEUE_EMPTY(&queue)) {
q = QUEUE_HEAD(&queue);
h = QUEUE_DATA(q, uv_handle_t, handle_queue);
uv__queue_move(&loop->handle_queue, &queue);
while (!uv__queue_empty(&queue)) {
q = uv__queue_head(&queue);
h = uv__queue_data(q, uv_handle_t, handle_queue);

QUEUE_REMOVE(q);
QUEUE_INSERT_TAIL(&loop->handle_queue, q);
uv__queue_remove(q);
uv__queue_insert_tail(&loop->handle_queue, q);

if (h->flags & UV_HANDLE_INTERNAL) continue;
walk_cb(h, arg);
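
`uv_walk()` combines both earlier tricks: it claims the whole handle list with a splice, then returns each node to `loop->handle_queue` *before* invoking the callback. If the callback closes the handle, the close path's `uv__queue_remove` unlinks it from the loop's queue, not from the local list being walked, so iteration stays sound. In sketch form — `visit` and the struct are hypothetical, and the splice helper is the one sketched earlier:

/* Move-then-reinsert traversal: robust against callbacks that unlink
 * the current node from the destination list. */
struct handle_sketch {
  int internal;
  struct uv__queue handle_queue;
};

static void walk_handles(struct uv__queue* all,
                         void (*visit)(struct handle_sketch*)) {
  struct uv__queue queue;  /* private working copy of the list head */
  struct uv__queue* q;
  struct handle_sketch* h;

  uv__queue_move_sketch(all, &queue);   /* from the earlier splice sketch */
  while (!uv__queue_empty(&queue)) {
    q = uv__queue_head(&queue);
    h = uv__queue_data(q, struct handle_sketch, handle_queue);

    uv__queue_remove(q);                /* take it off the working copy */
    uv__queue_insert_tail(all, q);      /* put it back on the real list */

    if (h->internal)
      continue;
    visit(h);                           /* may remove q from `all`; fine */
  }
}
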
@ -553,14 +553,14 @@ void uv_walk(uv_loop_t* loop, uv_walk_cb walk_cb, void* arg) {

static void uv__print_handles(uv_loop_t* loop, int only_active, FILE* stream) {
const char* type;
QUEUE* q;
struct uv__queue* q;
uv_handle_t* h;

if (loop == NULL)
loop = uv_default_loop();

QUEUE_FOREACH(q, &loop->handle_queue) {
h = QUEUE_DATA(q, uv_handle_t, handle_queue);
uv__queue_foreach(q, &loop->handle_queue) {
h = uv__queue_data(q, uv_handle_t, handle_queue);

if (only_active && !uv__is_active(h))
continue;

@ -846,7 +846,7 @@ uv_loop_t* uv_loop_new(void) {


int uv_loop_close(uv_loop_t* loop) {
QUEUE* q;
struct uv__queue* q;
uv_handle_t* h;
#ifndef NDEBUG
void* saved_data;

@ -855,8 +855,8 @@ int uv_loop_close(uv_loop_t* loop) {
if (uv__has_active_reqs(loop))
return UV_EBUSY;

QUEUE_FOREACH(q, &loop->handle_queue) {
h = QUEUE_DATA(q, uv_handle_t, handle_queue);
uv__queue_foreach(q, &loop->handle_queue) {
h = uv__queue_data(q, uv_handle_t, handle_queue);
if (!(h->flags & UV_HANDLE_INTERNAL))
return UV_EBUSY;
}
@ -323,7 +323,7 @@ void uv__threadpool_cleanup(void);
(h)->loop = (loop_); \
(h)->type = (type_); \
(h)->flags = UV_HANDLE_REF; /* Ref the loop when active. */ \
QUEUE_INSERT_TAIL(&(loop_)->handle_queue, &(h)->handle_queue); \
uv__queue_insert_tail(&(loop_)->handle_queue, &(h)->handle_queue); \
uv__handle_platform_init(h); \
} \
while (0)

@ -255,8 +255,8 @@ int uv_loop_init(uv_loop_t* loop) {
loop->time = 0;
uv_update_time(loop);

QUEUE_INIT(&loop->wq);
QUEUE_INIT(&loop->handle_queue);
uv__queue_init(&loop->wq);
uv__queue_init(&loop->handle_queue);
loop->active_reqs.count = 0;
loop->active_handles = 0;

@ -358,7 +358,7 @@ void uv__loop_close(uv_loop_t* loop) {
}

uv_mutex_lock(&loop->wq_mutex);
assert(QUEUE_EMPTY(&loop->wq) && "thread pool work queue not empty!");
assert(uv__queue_empty(&loop->wq) && "thread pool work queue not empty!");
assert(!uv__has_active_reqs(loop));
uv_mutex_unlock(&loop->wq_mutex);
uv_mutex_destroy(&loop->wq_mutex);
@ -75,7 +75,7 @@

#define uv__handle_close(handle) \
do { \
QUEUE_REMOVE(&(handle)->handle_queue); \
uv__queue_remove(&(handle)->handle_queue); \
uv__active_handle_rm((uv_handle_t*) (handle)); \
\
(handle)->flags |= UV_HANDLE_CLOSED; \
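
Between `uv__handle_init` (hunk @323 above) and `uv__handle_close`, every handle spends its life linked into `loop->handle_queue` through its embedded node — the list that `uv_walk`, `uv__print_handles`, and `uv_loop_close` iterate. A compressed lifecycle sketch; the function and struct names here are hypothetical:

/* Lifecycle of the intrusive handle_queue link, in miniature. */
struct mini_handle {
  struct uv__queue handle_queue;  /* embedded link, nothing to allocate */
};

static void mini_handle_init(struct uv__queue* loop_handles,
                             struct mini_handle* h) {
  /* Registration is just a tail insert; no allocation can fail here. */
  uv__queue_insert_tail(loop_handles, &h->handle_queue);
}

static void mini_handle_close(struct mini_handle* h) {
  /* Unlinking needs only the node itself; the list head is implicit
   * in the node's prev/next pointers. */
  uv__queue_remove(&h->handle_queue);
}
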
@ -55,7 +55,7 @@ static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
typedef struct {
uv__ipc_socket_xfer_type_t xfer_type;
uv__ipc_socket_xfer_info_t xfer_info;
QUEUE member;
struct uv__queue member;
} uv__ipc_xfer_queue_item_t;

/* IPC frame header flags. */

@ -111,7 +111,7 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
handle->name = NULL;
handle->pipe.conn.ipc_remote_pid = 0;
handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);
uv__queue_init(&handle->pipe.conn.ipc_xfer_queue);
handle->pipe.conn.ipc_xfer_queue_length = 0;
handle->ipc = ipc;
handle->pipe.conn.non_overlapped_writes_tail = NULL;

@ -637,13 +637,13 @@ void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {

if (handle->flags & UV_HANDLE_CONNECTION) {
/* Free pending sockets */
while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) {
QUEUE* q;
while (!uv__queue_empty(&handle->pipe.conn.ipc_xfer_queue)) {
struct uv__queue* q;
SOCKET socket;

q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue);
QUEUE_REMOVE(q);
xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
q = uv__queue_head(&handle->pipe.conn.ipc_xfer_queue);
uv__queue_remove(q);
xfer_queue_item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member);

/* Materialize socket and close it */
socket = WSASocketW(FROM_PROTOCOL_INFO,

@ -1062,20 +1062,20 @@ int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
uv_loop_t* loop = server->loop;
uv_pipe_t* pipe_client;
uv_pipe_accept_t* req;
QUEUE* q;
struct uv__queue* q;
uv__ipc_xfer_queue_item_t* item;
int err;

if (server->ipc) {
if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) {
if (uv__queue_empty(&server->pipe.conn.ipc_xfer_queue)) {
/* No valid pending sockets. */
return WSAEWOULDBLOCK;
}

q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue);
QUEUE_REMOVE(q);
q = uv__queue_head(&server->pipe.conn.ipc_xfer_queue);
uv__queue_remove(q);
server->pipe.conn.ipc_xfer_queue_length--;
item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member);

err = uv__tcp_xfer_import(
(uv_tcp_t*) client, item->xfer_type, &item->xfer_info);

@ -1829,7 +1829,7 @@ static void uv__pipe_queue_ipc_xfer_info(
item->xfer_type = xfer_type;
item->xfer_info = *xfer_info;

QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member);
uv__queue_insert_tail(&handle->pipe.conn.ipc_xfer_queue, &item->member);
handle->pipe.conn.ipc_xfer_queue_length++;
}
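
`uv__ipc_xfer_queue_item_t` is a good place to see what the change buys. With a `void*`-based link, the data-recovery macro cast through `void*`, so handing it the wrong container type compiled silently. With an embedded `struct uv__queue member`, every pointer that flows through the helpers is typed, and mismatches are diagnosed at compile time. A sketch of the contrast — the container struct here is hypothetical:

/* Why a named node type is safer than an untyped pointer pair. */
struct xfer_item_sketch {
  int payload;
  struct uv__queue member;  /* typed link: only uv__queue* flows around */
};

static struct xfer_item_sketch* pop_item(struct uv__queue* xfer_queue) {
  struct uv__queue* q;

  if (uv__queue_empty(xfer_queue))
    return NULL;

  q = uv__queue_head(xfer_queue);   /* struct uv__queue*, not void* */
  uv__queue_remove(q);
  /* The offsetof-based recovery is still a cast, but every step up to
   * it is type-checked; passing, say, an int* here cannot compile. */
  return uv__queue_data(q, struct xfer_item_sketch, member);
}
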
@ -175,14 +175,14 @@ int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* handle, unsigned int flags) {
sock = socket(domain, SOCK_STREAM, 0);
if (sock == INVALID_SOCKET) {
err = WSAGetLastError();
QUEUE_REMOVE(&handle->handle_queue);
uv__queue_remove(&handle->handle_queue);
return uv_translate_sys_error(err);
}

err = uv__tcp_set_socket(handle->loop, handle, sock, domain, 0);
if (err) {
closesocket(sock);
QUEUE_REMOVE(&handle->handle_queue);
uv__queue_remove(&handle->handle_queue);
return uv_translate_sys_error(err);
}

@ -146,14 +146,14 @@ int uv__udp_init_ex(uv_loop_t* loop,
sock = socket(domain, SOCK_DGRAM, 0);
if (sock == INVALID_SOCKET) {
err = WSAGetLastError();
QUEUE_REMOVE(&handle->handle_queue);
uv__queue_remove(&handle->handle_queue);
return uv_translate_sys_error(err);
}

err = uv__udp_set_socket(handle->loop, handle, sock, domain);
if (err) {
closesocket(sock);
QUEUE_REMOVE(&handle->handle_queue);
uv__queue_remove(&handle->handle_queue);
return uv_translate_sys_error(err);
}
}
@ -29,7 +29,7 @@
* The idea behind the test is as follows.
* Certain handle types are stored in a queue internally.
* Extra care should be taken for removal of a handle from the queue while iterating over the queue.
* (i.e., QUEUE_REMOVE() called within QUEUE_FOREACH())
* (i.e., uv__queue_remove() called within uv__queue_foreach())
* This usually happens when someone closes or stops a handle from within its callback.
* So we need to check that we haven't screwed the queue on close/stop.
* To do so we do the following (for each handle type):

@ -54,7 +54,8 @@
* wrong foreach "next" |
*
* 4. The callback for handle #1 shouldn't be called because the handle #1 is stopped in the previous step.
* However, if QUEUE_REMOVE() is not handled properly within QUEUE_FOREACH(), the callback _will_ be called.
* However, if uv__queue_remove() is not handled properly within uv__queue_foreach(), the callback _will_
* be called.
*/

static const unsigned first_handle_number_idle = 2;
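
This regression test guards exactly the hazard the earlier sketches tiptoe around: unlinking a node from inside an iteration over the same list. A minimal reproduction of the bug class — not the test itself, which drives real libuv handles:

/* The foreach-deletion hazard in miniature: if the iterator reads
 * `next` from a node after the callback re-pointed it, it can visit
 * entries that were meant to be skipped. Caching `next` before the
 * body runs is the usual fix. */
struct node_sketch {
  int stopped;
  struct uv__queue queue;
};

static void visit_all_safely(struct uv__queue* head,
                             void (*cb)(struct node_sketch*)) {
  struct uv__queue* q;
  struct uv__queue* next;
  struct node_sketch* n;

  for (q = uv__queue_head(head); q != head; q = next) {
    next = uv__queue_next(q);      /* grab next before cb can unlink q */
    n = uv__queue_data(q, struct node_sketch, queue);
    if (!n->stopped)
      cb(n);                       /* cb may remove q; `next` stays valid */
  }
}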