diff --git a/Makefile.am b/Makefile.am index c9db48a1..5630e5f7 100644 --- a/Makefile.am +++ b/Makefile.am @@ -196,6 +196,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-tcp-write-to-half-open-connection.c \ test/test-tcp-writealot.c \ test/test-tcp-try-write.c \ + test/test-tcp-write-queue-order.c \ test/test-thread.c \ test/test-threadpool-cancel.c \ test/test-threadpool.c \ diff --git a/src/unix/stream.c b/src/unix/stream.c index db25187f..ae7880c3 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -60,6 +60,7 @@ static void uv__stream_connect(uv_stream_t*); static void uv__write(uv_stream_t* stream); static void uv__read(uv_stream_t* stream); static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events); +static void uv__write_callbacks(uv_stream_t* stream); static size_t uv__write_req_size(uv_write_t* req); @@ -378,33 +379,12 @@ void uv__stream_destroy(uv_stream_t* stream) { QUEUE_REMOVE(q); req = QUEUE_DATA(q, uv_write_t, queue); - uv__req_unregister(stream->loop, req); + req->error = -ECANCELED; - if (req->bufs != req->bufsml) - free(req->bufs); - req->bufs = NULL; - - if (req->cb != NULL) - req->cb(req, -ECANCELED); + QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); } - while (!QUEUE_EMPTY(&stream->write_completed_queue)) { - q = QUEUE_HEAD(&stream->write_completed_queue); - QUEUE_REMOVE(q); - - req = QUEUE_DATA(q, uv_write_t, queue); - uv__req_unregister(stream->loop, req); - - if (req->bufs != NULL) { - stream->write_queue_size -= uv__write_req_size(req); - if (req->bufs != req->bufsml) - free(req->bufs); - req->bufs = NULL; - } - - if (req->cb) - req->cb(req, req->error); - } + uv__write_callbacks(stream); if (stream->shutdown_req) { /* The ECANCELED error code is a lie, the shutdown(2) syscall is a @@ -416,6 +396,8 @@ void uv__stream_destroy(uv_stream_t* stream) { stream->shutdown_req->cb(stream->shutdown_req, -ECANCELED); stream->shutdown_req = NULL; } + + assert(stream->write_queue_size == 0); } @@ -891,10 +873,6 @@ static void uv__write_callbacks(uv_stream_t* stream) { } assert(QUEUE_EMPTY(&stream->write_completed_queue)); - - /* Write queue drained. */ - if (QUEUE_EMPTY(&stream->write_queue)) - uv__drain(stream); } @@ -1211,6 +1189,10 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { if (events & (UV__POLLOUT | UV__POLLERR | UV__POLLHUP)) { uv__write(stream); uv__write_callbacks(stream); + + /* Write queue drained. */ + if (QUEUE_EMPTY(&stream->write_queue)) + uv__drain(stream); } } diff --git a/test/test-list.h b/test/test-list.h index 70c388c2..a62c11e4 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -56,6 +56,7 @@ TEST_DECLARE (delayed_accept) TEST_DECLARE (multiple_listen) TEST_DECLARE (tcp_writealot) TEST_DECLARE (tcp_try_write) +TEST_DECLARE (tcp_write_queue_order) TEST_DECLARE (tcp_open) TEST_DECLARE (tcp_connect_error_after_write) TEST_DECLARE (tcp_shutdown_after_write) @@ -332,6 +333,8 @@ TASK_LIST_START TEST_ENTRY (tcp_try_write) + TEST_ENTRY (tcp_write_queue_order) + TEST_ENTRY (tcp_open) TEST_HELPER (tcp_open, tcp4_echo_server) diff --git a/test/test-tcp-write-queue-order.c b/test/test-tcp-write-queue-order.c new file mode 100644 index 00000000..f7981137 --- /dev/null +++ b/test/test-tcp-write-queue-order.c @@ -0,0 +1,140 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include +#include +#include + +#include "uv.h" +#include "task.h" + +#define REQ_COUNT 100000 + +static uv_timer_t timer; +static uv_tcp_t server; +static uv_tcp_t client; +static uv_tcp_t incoming; +static int connect_cb_called; +static int close_cb_called; +static int connection_cb_called; +static int write_callbacks; +static int write_cancelled_callbacks; +static int write_error_callbacks; + +static uv_write_t write_requests[REQ_COUNT]; + + +static void close_cb(uv_handle_t* handle) { + close_cb_called++; +} + +void timer_cb(uv_timer_t* handle) { + uv_close((uv_handle_t*) &client, close_cb); + uv_close((uv_handle_t*) &server, close_cb); + uv_close((uv_handle_t*) &incoming, close_cb); +} + +void write_cb(uv_write_t* req, int status) { + if (status == 0) + write_callbacks++; + else if (status == UV_ECANCELED) + write_cancelled_callbacks++; + else + write_error_callbacks++; +} + +static void connect_cb(uv_connect_t* req, int status) { + static char base[1024]; + int r; + int i; + uv_buf_t buf; + + ASSERT(status == 0); + connect_cb_called++; + + buf = uv_buf_init(base, sizeof(base)); + + for (i = 0; i < REQ_COUNT; i++) { + r = uv_write(&write_requests[i], + req->handle, + &buf, + 1, + write_cb); + ASSERT(r == 0); + } +} + + +static void connection_cb(uv_stream_t* tcp, int status) { + ASSERT(status == 0); + + ASSERT(0 == uv_tcp_init(tcp->loop, &incoming)); + ASSERT(0 == uv_accept(tcp, (uv_stream_t*) &incoming)); + + connection_cb_called++; +} + + +static void start_server(void) { + struct sockaddr_in addr; + + ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); + + ASSERT(0 == uv_tcp_init(uv_default_loop(), &server)); + ASSERT(0 == uv_tcp_bind(&server, (struct sockaddr*) &addr, 0)); + ASSERT(0 == uv_listen((uv_stream_t*) &server, 128, connection_cb)); +} + + +TEST_IMPL(tcp_write_queue_order) { + uv_connect_t connect_req; + struct sockaddr_in addr; + + start_server(); + + ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + + ASSERT(0 == uv_tcp_init(uv_default_loop(), &client)); + ASSERT(0 == uv_tcp_connect(&connect_req, + &client, + (struct sockaddr*) &addr, + connect_cb)); + + ASSERT(0 == uv_timer_init(uv_default_loop(), &timer)); + ASSERT(0 == uv_timer_start(&timer, timer_cb, 100, 0)); + + ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT)); + + ASSERT(connect_cb_called == 1); + ASSERT(connection_cb_called == 1); + ASSERT(write_callbacks > 0); +#ifndef _WIN32 + /* on Windows we never get ECANCELED */ + ASSERT(write_cancelled_callbacks > 0); +#endif + ASSERT(write_callbacks + + write_error_callbacks + + write_cancelled_callbacks == REQ_COUNT); + ASSERT(close_cb_called == 3); + + MAKE_VALGRIND_HAPPY(); + return 0; +} diff --git a/uv.gyp b/uv.gyp index 31236d8e..780ab090 100644 --- a/uv.gyp +++ b/uv.gyp @@ -378,6 +378,7 @@ 'test/test-tcp-try-write.c', 'test/test-tcp-unexpected-read.c', 'test/test-tcp-read-stop.c', + 'test/test-tcp-write-queue-order.c', 'test/test-threadpool.c', 'test/test-threadpool-cancel.c', 'test/test-mutexes.c',