mirror of
https://github.com/libuv/libuv
synced 2025-03-28 21:13:16 +00:00
unix: handle passing kind of working
This commit is contained in:
parent
bb6b629e6a
commit
c920db9fd1
@ -29,6 +29,8 @@
|
||||
#include <string.h>
|
||||
#include <sys/uio.h>
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
|
||||
static void uv__stream_connect(uv_stream_t*);
|
||||
static void uv__write(uv_stream_t* stream);
|
||||
@ -476,12 +478,17 @@ static void uv__write_callbacks(uv_stream_t* stream) {
|
||||
static void uv__read(uv_stream_t* stream) {
|
||||
uv_buf_t buf;
|
||||
ssize_t nread;
|
||||
struct msghdr msg;
|
||||
struct cmsghdr* cmsg;
|
||||
char cmsg_space[64];
|
||||
int received_fd = -1;
|
||||
struct ev_loop* ev = stream->loop->ev;
|
||||
|
||||
/* XXX: Maybe instead of having UV_READING we just test if
|
||||
* tcp->read_cb is NULL or not?
|
||||
*/
|
||||
while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) {
|
||||
while ((stream->read_cb || stream->read2_cb) &&
|
||||
stream->flags & UV_READING) {
|
||||
assert(stream->alloc_cb);
|
||||
buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024);
|
||||
|
||||
@ -489,10 +496,29 @@ static void uv__read(uv_stream_t* stream) {
|
||||
assert(buf.base);
|
||||
assert(stream->fd >= 0);
|
||||
|
||||
do {
|
||||
nread = read(stream->fd, buf.base, buf.len);
|
||||
if (stream->read_cb) {
|
||||
do {
|
||||
nread = read(stream->fd, buf.base, buf.len);
|
||||
}
|
||||
while (nread < 0 && errno == EINTR);
|
||||
} else {
|
||||
assert(stream->read2_cb);
|
||||
/* read2_cb uses recvmsg */
|
||||
msg.msg_flags = 0;
|
||||
msg.msg_iov = (struct iovec*) &buf;
|
||||
msg.msg_iovlen = 1;
|
||||
msg.msg_name = NULL;
|
||||
msg.msg_namelen = 0;
|
||||
/* Set up to receive a descriptor even if one isn't in the message */
|
||||
msg.msg_controllen = 64;
|
||||
msg.msg_control = (void *) cmsg_space;
|
||||
|
||||
do {
|
||||
nread = recvmsg(stream->fd, &msg, 0);
|
||||
}
|
||||
while (nread < 0 && errno == EINTR);
|
||||
}
|
||||
while (nread < 0 && errno == EINTR);
|
||||
|
||||
|
||||
if (nread < 0) {
|
||||
/* Error */
|
||||
@ -502,24 +528,73 @@ static void uv__read(uv_stream_t* stream) {
|
||||
ev_io_start(ev, &stream->read_watcher);
|
||||
}
|
||||
uv__set_sys_error(stream->loop, EAGAIN);
|
||||
stream->read_cb(stream, 0, buf);
|
||||
|
||||
if (stream->read_cb) {
|
||||
stream->read_cb(stream, 0, buf);
|
||||
} else {
|
||||
stream->read2_cb((uv_pipe_t*)stream, 0, buf, UV_UNKNOWN_HANDLE);
|
||||
}
|
||||
|
||||
return;
|
||||
} else {
|
||||
/* Error. User should call uv_close(). */
|
||||
uv__set_sys_error(stream->loop, errno);
|
||||
stream->read_cb(stream, -1, buf);
|
||||
|
||||
if (stream->read_cb) {
|
||||
stream->read_cb(stream, -1, buf);
|
||||
} else {
|
||||
stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE);
|
||||
}
|
||||
|
||||
assert(!ev_is_active(&stream->read_watcher));
|
||||
return;
|
||||
}
|
||||
|
||||
} else if (nread == 0) {
|
||||
/* EOF */
|
||||
uv__set_artificial_error(stream->loop, UV_EOF);
|
||||
ev_io_stop(ev, &stream->read_watcher);
|
||||
stream->read_cb(stream, -1, buf);
|
||||
|
||||
if (stream->read_cb) {
|
||||
stream->read_cb(stream, -1, buf);
|
||||
} else {
|
||||
stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE);
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
/* Successful read */
|
||||
stream->read_cb(stream, nread, buf);
|
||||
|
||||
if (stream->read_cb) {
|
||||
stream->read_cb(stream, nread, buf);
|
||||
} else {
|
||||
assert(stream->read2_cb);
|
||||
|
||||
/*
|
||||
* XXX: Some implementations can send multiple file descriptors in a
|
||||
* single message. We should be using CMSG_NXTHDR() to walk the
|
||||
* chain to get at them all. This would require changing the API to
|
||||
* hand these back up the caller, is a pain.
|
||||
*/
|
||||
|
||||
for (cmsg = CMSG_FIRSTHDR(&msg);
|
||||
msg.msg_controllen > 0 && cmsg != NULL;
|
||||
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
|
||||
|
||||
if (cmsg->cmsg_type == SCM_RIGHTS) {
|
||||
if (stream->accepted_fd != -1) {
|
||||
fprintf(stderr, "(libuv) ignoring extra FD received\n");
|
||||
}
|
||||
|
||||
stream->accepted_fd = *(int *) CMSG_DATA(cmsg);
|
||||
|
||||
} else {
|
||||
fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
|
||||
cmsg->cmsg_type);
|
||||
}
|
||||
}
|
||||
|
||||
stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_TCP);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -780,8 +855,8 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
|
||||
}
|
||||
|
||||
|
||||
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
|
||||
uv_read_cb read_cb) {
|
||||
int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb,
|
||||
uv_read_cb read_cb, uv_read2_cb read2_cb) {
|
||||
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
|
||||
stream->type == UV_TTY);
|
||||
|
||||
@ -803,6 +878,7 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
|
||||
assert(alloc_cb);
|
||||
|
||||
stream->read_cb = read_cb;
|
||||
stream->read2_cb = read2_cb;
|
||||
stream->alloc_cb = alloc_cb;
|
||||
|
||||
/* These should have been set by uv_tcp_init. */
|
||||
@ -813,13 +889,15 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
|
||||
}
|
||||
|
||||
|
||||
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
|
||||
uv_read_cb read_cb) {
|
||||
return uv__read_start_common(stream, alloc_cb, read_cb, NULL);
|
||||
}
|
||||
|
||||
|
||||
int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
|
||||
uv_read2_cb read_cb) {
|
||||
int r;
|
||||
r = uv_read_start(stream, alloc_cb, NULL);
|
||||
assert(stream->read_cb == NULL);
|
||||
stream->read2_cb = read_cb;
|
||||
return r;
|
||||
return uv__read_start_common(stream, alloc_cb, NULL, read_cb);
|
||||
}
|
||||
|
||||
|
||||
|
@ -45,8 +45,7 @@ static void ipc_on_connection(uv_stream_t* server, int status) {
|
||||
static void exit_cb(uv_process_t* process, int exit_status, int term_signal) {
|
||||
printf("exit_cb\n");
|
||||
exit_cb_called++;
|
||||
ASSERT(exit_status == 1);
|
||||
ASSERT(term_signal == 0);
|
||||
ASSERT(exit_status == 0);
|
||||
uv_close((uv_handle_t*)process, NULL);
|
||||
}
|
||||
|
||||
@ -60,6 +59,7 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf,
|
||||
uv_handle_type pending) {
|
||||
int r;
|
||||
uv_buf_t outbuf;
|
||||
uv_err_t err;
|
||||
|
||||
if (nread == 0) {
|
||||
/* Everything OK, but nothing read. */
|
||||
@ -67,6 +67,17 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf,
|
||||
return;
|
||||
}
|
||||
|
||||
if (nread < 0) {
|
||||
err = uv_last_error(pipe->loop);
|
||||
if (err.code == UV_EOF) {
|
||||
free(buf.base);
|
||||
return;
|
||||
}
|
||||
|
||||
printf("error recving on channel: %s\n", uv_strerror(err));
|
||||
abort();
|
||||
}
|
||||
|
||||
ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE);
|
||||
read2_cb_called++;
|
||||
|
||||
@ -82,7 +93,7 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf,
|
||||
ASSERT(r == 0);
|
||||
|
||||
/* Make sure that the expected data is correctly multiplexed. */
|
||||
ASSERT(memcmp("hello\n", buf.base, buf.len) == 0);
|
||||
ASSERT(memcmp("hello\n", buf.base, nread) == 0);
|
||||
fprintf(stderr, "got %d bytes\n", (int)nread);
|
||||
|
||||
outbuf = uv_buf_init("world\n", 6);
|
||||
|
Loading…
x
Reference in New Issue
Block a user