1
0
mirror of https://github.com/libuv/libuv synced 2025-03-28 21:13:16 +00:00

API changes - unix broken

This commit is contained in:
Bert Belder 2011-04-07 04:51:59 +02:00
parent ea149dd2d7
commit 0d85eb252b
7 changed files with 336 additions and 309 deletions

325
ol-win.c
View File

@ -82,14 +82,6 @@
/* The request is currently queued. */
#define OL_REQ_PENDING 0x01
/* When STRAY is set, that means that the handle owning the ol_req */
/* struct was destroyed while the old_req was queued to an iocp */
#define OL_REQ_STRAY 0x02
/* When INTERNAL is set that means that the ol_req struct was */
/* allocated by libol, so libol also needs to free it again */
#define OL_REQ_INTERNAL 0x04
/*
* Pointers to winsock extension functions that have to be retrieved dynamically
*/
@ -212,39 +204,41 @@ void ol_init() {
}
void ol_req_init(ol_req *req, void *cb) {
req->_.flags = 0;
void ol_req_init(ol_req* req, ol_handle* handle, void *cb) {
req->type = OL_UNKNOWN_REQ;
req->flags = 0;
req->handle = handle;
req->cb = cb;
}
ol_req* ol_overlapped_to_req(OVERLAPPED* overlapped) {
return CONTAINING_RECORD(overlapped, ol_req, _.overlapped);
return CONTAINING_RECORD(overlapped, ol_req, overlapped);
}
int ol_set_socket_options(ol_handle *handle) {
int ol_set_socket_options(SOCKET socket) {
DWORD yes = 1;
/* Set the SO_REUSEADDR option on the socket */
/* If it fails, soit. */
setsockopt(handle->_.socket,
setsockopt(socket,
SOL_SOCKET,
SO_REUSEADDR,
(char*)&yes,
sizeof(int));
/* Make the socket non-inheritable */
if (!SetHandleInformation(handle->_.handle, HANDLE_FLAG_INHERIT, 0)) {
if (!SetHandleInformation((HANDLE)socket, HANDLE_FLAG_INHERIT, 0)) {
ol_errno_ = GetLastError();
return -1;
}
/* Associate it with the I/O completion port. */
/* Use ol_handle pointer as completion key. */
if (CreateIoCompletionPort(handle->_.handle,
if (CreateIoCompletionPort((HANDLE)socket,
ol_iocp_,
(ULONG_PTR)handle,
(ULONG_PTR)socket,
0) == NULL) {
ol_errno_ = GetLastError();
return -1;
@ -254,63 +248,82 @@ int ol_set_socket_options(ol_handle *handle) {
}
ol_handle* ol_tcp_handle_new(ol_close_cb close_cb, void* data) {
ol_handle* handle;
handle = (ol_handle*)malloc(sizeof(ol_handle));
int ol_tcp_handle_init(ol_handle *handle, ol_close_cb close_cb, void* data) {
handle->close_cb = close_cb;
handle->data = data;
handle->type = OL_TCP;
handle->_.flags = 0;
handle->_.reqs_pending = 0;
handle->_.error = 0;
handle->flags = 0;
handle->reqs_pending = 0;
handle->error = 0;
handle->accept_data = NULL;
handle->_.socket = socket(AF_INET, SOCK_STREAM, 0);
if (handle->_.socket == INVALID_SOCKET) {
handle->socket = socket(AF_INET, SOCK_STREAM, 0);
if (handle->socket == INVALID_SOCKET) {
ol_errno_ = WSAGetLastError();
free(handle);
return NULL;
return -1;
}
if (ol_set_socket_options(handle) != 0) {
closesocket(handle->_.socket);
free(handle);
return NULL;
if (ol_set_socket_options(handle->socket) != 0) {
closesocket(handle->socket);
return -1;
}
ol_refs_++;
return handle;
return 0;
}
int ol_tcp_handle_accept(ol_handle* server, ol_handle* client, ol_close_cb close_cb, void* data) {
if (!server->accept_data ||
server->accept_data->socket == INVALID_SOCKET) {
ol_errno_ = WSAENOTCONN;
return -1;
}
client->close_cb = close_cb;
client->data = data;
client->type = OL_TCP;
client->socket = server->accept_data->socket;
client->flags = 0;
client->reqs_pending = 0;
client->error = 0;
client->accept_data = NULL;
server->accept_data->socket = INVALID_SOCKET;
ol_refs_++;
return 0;
}
int ol_close_error(ol_handle* handle, ol_err e) {
ol_req *req;
if (handle->_.flags & OL_HANDLE_CLOSING)
if (handle->flags & OL_HANDLE_CLOSING)
return 0;
handle->_.error = e;
handle->error = e;
switch (handle->type) {
case OL_TCP:
closesocket(handle->_.socket);
if (handle->_.reqs_pending == 0) {
closesocket(handle->socket);
if (handle->reqs_pending == 0) {
/* If there are no operations queued for this socket, queue one */
/* manually, so ol_poll will call close_cb. */
req = (ol_req*)malloc(sizeof(*req));
req->handle = handle;
req->type = OL_CLOSE;
req->_.flags = 0;
if (!PostQueuedCompletionStatus(ol_iocp_, 0, (ULONG_PTR)handle, &req->_.overlapped))
req->flags = 0;
if (!PostQueuedCompletionStatus(ol_iocp_, 0, (ULONG_PTR)handle, &req->overlapped))
ol_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
req->_.flags |= OL_REQ_PENDING;
handle->_.reqs_pending++;
req->flags |= OL_REQ_PENDING;
handle->reqs_pending++;
}
/* After all packets to come out, ol_poll will call close_cb. */
handle->_.flags |= OL_HANDLE_CLOSING;
handle->flags |= OL_HANDLE_CLOSING;
return 0;
default:
@ -326,12 +339,6 @@ int ol_close(ol_handle* handle) {
}
void ol_free(ol_handle* handle) {
free(handle);
ol_refs_--;
}
struct sockaddr_in ol_ip4_addr(char *ip, int port) {
struct sockaddr_in addr;
@ -355,7 +362,7 @@ int ol_bind(ol_handle* handle, struct sockaddr* addr) {
return -1;
}
if (bind(handle->_.socket, addr, addrsize) == SOCKET_ERROR) {
if (bind(handle->socket, addr, addrsize) == SOCKET_ERROR) {
ol_errno_ = WSAGetLastError();
return -1;
}
@ -364,77 +371,93 @@ int ol_bind(ol_handle* handle, struct sockaddr* addr) {
}
void ol_queue_accept(ol_handle *handle, ol_req *req) {
ol_handle* peer;
void *buffer;
void ol_queue_accept(ol_handle *handle) {
ol_accept_data* data;
BOOL success;
DWORD bytes;
peer = ol_tcp_handle_new(NULL, NULL);
if (peer == NULL) {
/* destroy ourselves */
data = handle->accept_data;
assert(data != NULL);
data->socket = socket(AF_INET, SOCK_STREAM, 0);
if (data->socket == INVALID_SOCKET) {
ol_close_error(handle, WSAGetLastError());
return;
}
if (ol_set_socket_options(data->socket) != 0) {
closesocket(data->socket);
ol_close_error(handle, ol_errno_);
return;
}
/* AcceptEx specifies that the buffer must be big enough to at least hold */
/* two socket addresses plus 32 bytes. */
buffer = malloc(sizeof(struct sockaddr_storage) * 2 + 32);
/* Prepare the ol_req and OVERLAPPED structures. */
assert(!(req->_.flags & OL_REQ_PENDING));
req->_.flags |= OL_REQ_PENDING;
req->data = (void*)peer;
memset(&req->_.overlapped, 0, sizeof(req->_.overlapped));
assert(!(data->req.flags & OL_REQ_PENDING));
data->req.flags |= OL_REQ_PENDING;
memset(&data->req.overlapped, 0, sizeof(data->req.overlapped));
success = pAcceptEx(handle->_.socket,
peer->_.socket,
buffer,
success = pAcceptEx(handle->socket,
data->socket,
(void*)&data->buffer,
0,
sizeof(struct sockaddr_storage),
sizeof(struct sockaddr_storage),
&bytes,
&req->_.overlapped);
&data->req.overlapped);
if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
ol_errno_ = WSAGetLastError();
/* destroy the preallocated client handle */
ol_close(peer);
ol_free(peer);
closesocket(data->socket);
/* destroy ourselves */
ol_close_error(handle, ol_errno_);
return;
}
handle->_.reqs_pending++;
req->_.flags |= OL_REQ_PENDING;
handle->reqs_pending++;
data->req.flags |= OL_REQ_PENDING;
}
int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb) {
ol_req* req;
ol_accept_data *data;
if (listen(handle->_.socket, backlog) == SOCKET_ERROR)
if (handle->accept_data != NULL) {
/* Already listening. */
ol_errno_ = WSAEALREADY;
return -1;
}
handle->accept_cb = cb;
req = (ol_req*)malloc(sizeof(*req));
req->type = OL_ACCEPT;
req->handle = handle;
req->_.flags = OL_REQ_INTERNAL;
data = (ol_accept_data*)malloc(sizeof(*data));
if (!data) {
ol_errno_ = WSAENOBUFS;
return -1;
}
data->socket = INVALID_SOCKET;
ol_req_init(&data->req, handle, (void*)cb);
data->req.type = OL_ACCEPT;
ol_queue_accept(handle, req);
if (listen(handle->socket, backlog) == SOCKET_ERROR) {
ol_errno_ = WSAGetLastError();
free(data);
return -1;
}
handle->accept_data = data;
ol_queue_accept(handle);
return 0;
}
int ol_connect(ol_handle* handle, ol_req *req, struct sockaddr* addr) {
int ol_connect(ol_req* req, struct sockaddr* addr) {
int addrsize;
BOOL success;
DWORD bytes;
ol_handle* handle = req->handle;
assert(!(req->_.flags & OL_REQ_PENDING));
assert(!(req->flags & OL_REQ_PENDING));
if (addr->sa_family == AF_INET) {
addrsize = sizeof(struct sockaddr_in);
@ -445,101 +468,96 @@ int ol_connect(ol_handle* handle, ol_req *req, struct sockaddr* addr) {
return -1;
}
memset(&req->_.overlapped, 0, sizeof(req->_.overlapped));
req->handle = handle;
memset(&req->overlapped, 0, sizeof(req->overlapped));
req->type = OL_CONNECT;
success = pConnectEx(handle->_.socket,
success = pConnectEx(handle->socket,
addr,
addrsize,
NULL,
0,
&bytes,
&req->_.overlapped);
&req->overlapped);
if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
ol_errno_ = WSAGetLastError();
return -1;
}
req->_.flags |= OL_REQ_PENDING;
handle->_.reqs_pending++;
req->flags |= OL_REQ_PENDING;
handle->reqs_pending++;
return 0;
}
int ol_write(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) {
int ol_write(ol_req *req, ol_buf* bufs, int bufcnt) {
int result;
DWORD bytes;
ol_handle* handle = req->handle;
assert(!(req->_.flags & OL_REQ_PENDING));
assert(!(req->flags & OL_REQ_PENDING));
memset(&req->_.overlapped, 0, sizeof(req->_.overlapped));
req->handle = handle;
memset(&req->overlapped, 0, sizeof(req->overlapped));
req->type = OL_WRITE;
result = WSASend(handle->_.socket,
result = WSASend(handle->socket,
(WSABUF*)bufs,
bufcnt,
&bytes,
0,
&req->_.overlapped,
&req->overlapped,
NULL);
if (result != 0 && WSAGetLastError() != ERROR_IO_PENDING) {
ol_errno_ = WSAGetLastError();
return -1;
}
req->_.flags |= OL_REQ_PENDING;
handle->_.reqs_pending++;
req->flags |= OL_REQ_PENDING;
handle->reqs_pending++;
return 0;
}
int ol_read(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) {
int ol_read(ol_req *req, ol_buf* bufs, int bufcnt) {
int result;
DWORD bytes, flags;
ol_handle* handle = req->handle;
assert(!(req->_.flags & OL_REQ_PENDING));
assert(!(req->flags & OL_REQ_PENDING));
memset(&req->_.overlapped, 0, sizeof(req->_.overlapped));
req->handle = handle;
memset(&req->overlapped, 0, sizeof(req->overlapped));
req->type = OL_READ;
flags = 0;
result = WSARecv(handle->_.socket,
result = WSARecv(handle->socket,
(WSABUF*)bufs,
bufcnt,
&bytes,
&flags,
&req->_.overlapped,
&req->overlapped,
NULL);
if (result != 0 && WSAGetLastError() != ERROR_IO_PENDING) {
ol_errno_ = WSAGetLastError();
return -1;
}
req->_.flags |= OL_REQ_PENDING;
handle->_.reqs_pending++;
req->flags |= OL_REQ_PENDING;
handle->reqs_pending++;
return 0;
}
int ol_write2(ol_handle* handle, const char* msg) {
ol_req *req;
int ol_write2(ol_req *req, const char* msg) {
ol_buf buf;
req = (ol_req*)malloc(sizeof(*req));
req->_.flags = OL_REQ_INTERNAL;
req->cb = NULL;
ol_handle* handle = req->handle;
buf.base = (char*)msg;
buf.len = strlen(msg);
return ol_write(handle, req, &buf, 1);
return ol_write(req, &buf, 1);
}
@ -555,8 +573,7 @@ void ol_poll() {
OVERLAPPED* overlapped;
ol_req* req;
ol_handle* handle;
ol_handle *peer;
int free_req;
ol_accept_data *data;
success = GetQueuedCompletionStatus(ol_iocp_,
&bytes,
@ -571,89 +588,81 @@ void ol_poll() {
handle = req->handle;
/* Mark the request non-pending */
req->_.flags &= ~OL_REQ_PENDING;
handle->_.reqs_pending--;
/* Cache this value, because when the req is not internal the callback */
/* might free the req structure, so we cannot look at the flags field */
/* after the callback has been called. */
free_req = req->_.flags & OL_REQ_INTERNAL;
req->flags &= ~OL_REQ_PENDING;
handle->reqs_pending--;
/* If the related socket got closed in the meantime, disregard this */
/* result. If it is an internal request, free it. If this is the last */
/* request pending, close the handle's close callback. */
if (handle->_.flags & OL_HANDLE_CLOSING) {
if (req->type == OL_ACCEPT) {
peer = (ol_handle*)req->data;
ol_close(peer);
ol_free(peer);
}
if (free_req) {
free(req);
}
if (handle->_.reqs_pending == 0) {
handle->_.flags |= OL_HANDLE_CLOSED;
if (handle->close_cb)
handle->close_cb(handle, handle->_.error);
/* result. If this is the last request pending, call the handle's close callback. */
if (handle->flags & OL_HANDLE_CLOSING) {
if (handle->reqs_pending == 0) {
handle->flags |= OL_HANDLE_CLOSED;
if (handle->accept_data) {
if (handle->accept_data) {
if (handle->accept_data->socket) {
closesocket(handle->accept_data->socket);
}
free(handle->accept_data);
handle->accept_data = NULL;
}
}
if (handle->close_cb) {
handle->close_cb(handle, handle->error);
}
ol_refs_--;
}
return;
}
switch (req->type) {
case OL_WRITE:
success = GetOverlappedResult(handle->_.handle, overlapped, &bytes, FALSE);
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (!success) {
ol_close_error(handle, GetLastError());
} else if (req->cb) {
((ol_write_cb)req->cb)(req);
}
if (free_req) {
free(req);
}
return;
case OL_READ:
handle = (ol_handle*)key;
success = GetOverlappedResult(handle->_.handle, overlapped, &bytes, FALSE);
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (!success) {
((ol_close_cb)req->cb)(handle, GetLastError());
ol_close_error(handle, GetLastError());
} else if (req->cb) {
((ol_read_cb)req->cb)(req, bytes);
}
if (free_req) {
free(req);
}
break;
case OL_ACCEPT:
peer = (ol_handle*)req->data;
handle = (ol_handle*)key;
success = GetOverlappedResult(handle->_.handle, overlapped, &bytes, FALSE);
if (success && handle->accept_cb) {
handle->accept_cb(handle, peer);
} else {
/* Ignore failed accept if the listen socket is still healthy */
ol_close(peer);
ol_free(peer);
data = handle->accept_data;
assert(data != NULL);
assert(data->socket != INVALID_SOCKET);
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (success && req->cb) {
((ol_accept_cb)req->cb)(handle);
}
/* accept_cb should call ol_accept_handle which sets data->socket */
/* to INVALID_SOCKET. */
/* Just ignore failed accept if the listen socket is still healthy. */
if (data->socket != INVALID_SOCKET) {
closesocket(handle->socket);
data->socket = INVALID_SOCKET;
}
/* Queue another accept */
ol_queue_accept(handle, req);
ol_queue_accept(handle);
return;
case OL_CONNECT:
if (req->cb) {
handle = (ol_handle*)key;
success = GetOverlappedResult(handle->_.handle, overlapped, &bytes, FALSE);
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (success) {
((ol_connect_cb)req->cb)(req, 0);
} else {
((ol_connect_cb)req->cb)(req, GetLastError());
}
}
if (free_req) {
free(req);
}
return;
case OL_CLOSE:

View File

@ -14,18 +14,29 @@ typedef struct _ol_buf {
char* base;
} ol_buf;
typedef struct {
struct ol_req_s {
struct ol_req_shared_s;
OVERLAPPED overlapped;
int flags;
} ol_req_private;
};
typedef struct {
ol_req req;
SOCKET socket;
/* AcceptEx specifies that the buffer must be big enough to at least hold */
/* two socket addresses plus 32 bytes. */
char buffer[sizeof(struct sockaddr_storage) * 2 + 32];
} ol_accept_data;
struct ol_handle_s {
struct ol_handle_shared_s;
union {
SOCKET socket;
HANDLE handle;
};
ol_accept_data *accept_data;
unsigned int flags;
unsigned int reqs_pending;
ol_err error;
} ol_handle_private;
};

92
ol.h
View File

@ -4,7 +4,6 @@
#include <stddef.h> /* size_t */
typedef int ol_err; /* FIXME */
typedef struct ol_req_s ol_req;
@ -12,19 +11,12 @@ typedef struct ol_handle_s ol_handle;
typedef void (*ol_read_cb)(ol_req* req, size_t nread);
typedef void (*ol_write_cb)(ol_req* req);
typedef void (*ol_accept_cb)(ol_handle* server, ol_handle* new_client);
typedef void (*ol_accept_cb)(ol_handle* handle);
typedef void (*ol_close_cb)(ol_handle* handle, ol_err e);
typedef void (*ol_connect_cb)(ol_req* req, ol_err e);
typedef void (*ol_shutdown_cb)(ol_req* req);
#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__)
# include "ol-unix.h"
#else
# include "ol-win.h"
#endif
typedef enum {
OL_UNKNOWN_HANDLE = 0,
OL_TCP,
@ -33,19 +25,6 @@ typedef enum {
OL_FILE,
} ol_handle_type;
struct ol_handle_s {
/* read-only */
ol_handle_type type;
/* private */
ol_handle_private _;
/* public */
ol_accept_cb accept_cb;
ol_close_cb close_cb;
void* data;
};
typedef enum {
OL_UNKNOWN_REQ = 0,
OL_CONNECT,
@ -57,18 +36,31 @@ typedef enum {
} ol_req_type;
struct ol_req_s {
struct ol_handle_shared_s {
/* read-only */
ol_handle_type type;
/* public */
ol_close_cb close_cb;
void* data;
};
struct ol_req_shared_s {
/* read-only */
ol_req_type type;
ol_handle* handle;
/* private */
ol_req_private _;
/* public */
ol_handle* handle;
void* cb;
void *data;
void* data;
};
#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__)
# include "ol-unix.h"
#else
# include "ol-win.h"
#endif
/**
* Most functions return boolean: 0 for success and -1 for failure.
* On error the user should then call ol_last_error() to determine
@ -81,36 +73,36 @@ const char* ol_err_str(ol_err err);
void ol_init();
int ol_run();
void ol_req_init(ol_req* req, void* cb);
void ol_req_init(ol_req* req, ol_handle* handle, void* cb);
ol_handle* ol_tcp_handle_new(ol_close_cb close_cb, void* data);
/* TODO:
* ol_named_pipe_handle_new
* ol_file_handle_new
* ol_tty_handle_new
/*
* TODO:
* - ol_(pipe|pipe_tty)_handle_init
* - ol_bind_pipe(char *name)
* - ol_continuous_read(ol_handle *handle, ol_continuous_read_cb *cb)
* - A way to list cancelled ol_reqs after before/on ol_close_cb
*/
/* TCP server methods. */
int ol_bind(ol_handle* handle, struct sockaddr* addr);
int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb);
/* TCP socket methods. */
int ol_connect(ol_handle* handle, ol_req* req, struct sockaddr* addr);
int ol_read(ol_handle* handle, ol_req* req, ol_buf* bufs, int bufcnt);
int ol_write(ol_handle* handle, ol_req* req, ol_buf* bufs, int bufcnt);
int ol_write2(ol_handle* handle, const char* msg);
int ol_shutdown(ol_handle* handle, ol_req* req);
/* Handle and callback bust be set by calling ol_req_init. */
int ol_tcp_handle_init(ol_handle *handle, ol_close_cb close_cb, void* data);
int ol_bind(ol_handle* handle, struct sockaddr* addr);
int ol_connect(ol_req* req, struct sockaddr* addr);
int ol_shutdown(ol_req* req);
/* Request handle to be closed. close_cb will be made */
/* synchronously during this call. */
/* TCP server methods. */
int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb);
int ol_tcp_handle_accept(ol_handle* server, ol_handle* client, ol_close_cb close_cb, void* data);
/* Generic handle methods */
int ol_read(ol_req* req, ol_buf* bufs, int bufcnt);
int ol_write(ol_req* req, ol_buf* bufs, int bufcnt);
int ol_write2(ol_req *req, const char* msg);
/* Request handle to be closed. close_cb will be called */
/* asynchronously after this call. */
int ol_close(ol_handle* handle);
/* Must be called for all handles after close_cb. Handles that arrive
* via the accept_cb must use ol_free().
*/
void ol_free(ol_handle* handle);
/* Utility */
struct sockaddr_in ol_ip4_addr(char* ip, int port);

View File

@ -3,50 +3,55 @@
#include <stdio.h>
#include <stdlib.h>
#define BUFSIZE 1024
typedef struct {
ol_handle* handle;
ol_handle handle;
ol_req req;
ol_buf buf;
char read_buffer[BUFSIZE];
} peer_t;
ol_handle server;
void after_write(ol_req* req);
void after_read(ol_req* req, size_t nread);
void try_read(peer_t* peer);
void on_close(ol_handle* peer, ol_err err);
void on_accept(ol_handle* server, ol_handle* new_client);
ol_handle *server = NULL;
void on_accept(ol_handle* handle);
void after_write(ol_req* req) {
peer_t *peer = (peer_t*) req->data;
peer_t* peer = (peer_t*) req->data;
try_read(peer);
}
void after_read(ol_req* req, size_t nread) {
peer_t* peer;
int r;
if (nread == 0) {
ol_close(req->handle);
} else {
peer_t *peer = (peer_t*) req->data;
peer = (peer_t*) req->data;
peer->buf.len = nread;
peer->req.cb = after_write;
ol_write(peer->handle, &peer->req, &peer->buf, 1);
ol_req_init(&peer->req, &peer->handle, after_write);
peer->req.data = peer;
r = ol_write(&peer->req, &peer->buf, 1);
assert(!r);
}
}
void try_read(peer_t* peer) {
int r;
peer->buf.len = BUFSIZE;
peer->req.cb = after_read;
ol_read(peer->handle, &peer->req, &peer->buf, 1);
ol_req_init(&peer->req, &peer->handle, after_read);
peer->req.data = peer;
r = ol_read(&peer->req, &peer->buf, 1);
assert(!r);
}
@ -54,36 +59,28 @@ void on_close(ol_handle* peer, ol_err err) {
if (err) {
fprintf(stdout, "Socket error\n");
}
ol_free(peer);
}
void on_accept(ol_handle* server, ol_handle* new_client) {
peer_t* p;
void on_accept(ol_handle* server) {
peer_t* p = (peer_t*)malloc(sizeof(peer_t));
int r;
new_client->close_cb = on_close;
r = ol_tcp_handle_accept(server, &p->handle, on_close, (void*)p);
assert(!r);
p = (peer_t*)malloc(sizeof(peer_t));
p->handle = new_client;
p->buf.base = p->read_buffer;
p->buf.len = BUFSIZE;
p->req.data = p;
ol_req_init(&p->req, NULL);
p->buf.base = (char*)&p->read_buffer;
try_read(p);
}
void on_server_close(ol_handle* handle, ol_err err) {
assert(handle == server);
assert(handle == &server);
if (err) {
fprintf(stdout, "Socket error\n");
}
ol_free(server);
server = NULL;
}
@ -91,17 +88,21 @@ int echo_start(int port) {
struct sockaddr_in addr = ol_ip4_addr("0.0.0.0", port);
int r;
assert(server == NULL);
server = ol_tcp_handle_new(&on_server_close, NULL);
r = ol_tcp_handle_init(&server, on_server_close, NULL);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Socket creation error\n");
return 1;
}
r = ol_bind(server, (struct sockaddr*) &addr);
r = ol_bind(&server, (struct sockaddr*) &addr);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Bind error\n");
return 1;
}
r = ol_listen(server, 128, on_accept);
r = ol_listen(&server, 128, on_accept);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Listen error\n");
@ -113,8 +114,7 @@ int echo_start(int port) {
int echo_stop() {
assert(server != NULL);
return ol_close(server);
return ol_close(&server);
}

View File

@ -10,18 +10,21 @@ void close_cb(ol_handle *handle, ol_err e) {
assert("ol_close error" && e == 0);
assert("ol_close_cb not called from a fresh stack" && nested == 0);
close_cb_called++;
ol_free(handle);
}
TEST_IMPL(close_cb_stack) {
ol_handle *handle;
ol_handle handle;
int r;
ol_init();
handle = ol_tcp_handle_new(&close_cb, NULL);
r = ol_tcp_handle_init(&handle, &close_cb, NULL);
assert(!r);
nested++;
ol_close(handle);
r = ol_close(&handle);
assert(!r);
nested--;
ol_run();

View File

@ -5,121 +5,132 @@
#include <stdio.h>
static int completed_pingers = 0;
static ol_req connect_req;
#define NUM_PINGS 50
#define NUM_PINGS 1000
/* 64 bytes is enough for a pinger */
#define BUFSIZE 64
#define BUFSIZE 10240
static char PING[] = "PING\n";
static char* PING = "PING\n";
typedef struct {
int pongs;
int state;
ol_handle* handle;
ol_req req;
ol_handle handle;
ol_req connect_req;
ol_req read_req;
ol_buf buf;
char read_buffer[BUFSIZE];
} pinger;
} pinger_t;
void pinger_try_read(pinger_t* pinger);
void pinger_try_read(pinger* pinger);
void pinger_on_close(ol_handle* handle, ol_err err) {
pinger* p;
pinger_t* pinger = (pinger_t*)handle->data;
assert(!err);
p = (pinger*)handle->data;
assert(NUM_PINGS == p->pongs);
free(p);
ol_free(handle);
assert(NUM_PINGS == pinger->pongs);
free(pinger);
completed_pingers++;
}
void pinger_after_write(ol_req *req) {
free(req);
}
void pinger_write_ping(pinger_t* pinger) {
ol_req *req;
int r;
req = (ol_req*)malloc(sizeof(*req));
ol_req_init(req, &pinger->handle, pinger_after_write);
r = ol_write2(req, (char*)&PING);
assert(!r);
}
void pinger_after_read(ol_req* req, size_t nread) {
unsigned int i;
int r;
pinger* p;
pinger_t* pinger;
pinger = (pinger_t*)req->handle->data;
if (nread == 0) {
ol_close(req->handle);
ol_close(&pinger->handle);
return;
}
p = (pinger*)req->data;
/* Now we count the pings */
for (i = 0; i < nread; i++) {
assert(p->buf.base[i] == PING[p->state]);
/* 5 = strlen(PING) */
p->state = (p->state + 1) % 5;
if (p->state == 0) {
p->pongs++;
if (p->pongs < NUM_PINGS) {
r = ol_write2(p->handle, PING);
assert(!r);
assert(pinger->buf.base[i] == PING[pinger->state]);
pinger->state = (pinger->state + 1) % (sizeof(PING) - 1);
if (pinger->state == 0) {
pinger->pongs++;
if (pinger->pongs < NUM_PINGS) {
pinger_write_ping(pinger);
} else {
ol_close(p->handle);
ol_close(&pinger->handle);
return;
}
}
}
pinger_try_read(p);
pinger_try_read(pinger);
}
void pinger_try_read(pinger* pinger) {
pinger->buf.len = BUFSIZE;
pinger->req.cb = pinger_after_read;
ol_read(pinger->handle, &pinger->req, &pinger->buf, 1);
void pinger_try_read(pinger_t* pinger) {
ol_read(&pinger->read_req, &pinger->buf, 1);
}
void pinger_on_connect(ol_req *req, ol_err err) {
ol_handle *handle = req->handle;
pinger *p;
int r;
pinger_t *pinger = (pinger_t*)req->handle->data;
if (err) {
/* error */
assert(0);
}
p = calloc(sizeof(pinger), 1);
p->handle = handle;
p->buf.base = p->read_buffer;
p->buf.len = BUFSIZE;
p->req.data = p;
handle->data = p;
pinger_try_read(p);
r = ol_write2(handle, PING);
if (r < 0) {
/* error */
assert(0);
}
pinger_try_read(pinger);
pinger_write_ping(pinger);
}
int pinger_connect(int port) {
/* Try to connec to the server and do NUM_PINGS ping-pongs. */
ol_handle* handle = ol_tcp_handle_new(pinger_on_close, NULL);
int pinger_new(int port) {
struct sockaddr_in client_addr = ol_ip4_addr("0.0.0.0", 0);
struct sockaddr_in server_addr = ol_ip4_addr("127.0.0.1", TEST_PORT);
struct sockaddr_in server_addr = ol_ip4_addr("145.94.50.9", TEST_PORT);
pinger_t *pinger;
ol_bind(handle, (struct sockaddr*)&client_addr);
ol_req_init(&connect_req, &pinger_on_connect);
return ol_connect(handle, &connect_req, (struct sockaddr*)&server_addr);
pinger = (pinger_t*)malloc(sizeof(*pinger));
pinger->state = 0;
pinger->pongs = 0;
pinger->buf.len = sizeof(pinger->read_buffer);
pinger->buf.base = (char*)&pinger->read_buffer;
/* Try to connec to the server and do NUM_PINGS ping-pongs. */
if (ol_tcp_handle_init(&pinger->handle, pinger_on_close, (void*)pinger)) {
return -1;
}
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
ol_req_init(&pinger->connect_req, &pinger->handle, pinger_on_connect);
ol_req_init(&pinger->read_req, &pinger->handle, pinger_after_read);
ol_bind(&pinger->handle, (struct sockaddr*)&client_addr);
return ol_connect(&pinger->connect_req, (struct sockaddr*)&server_addr);
}
TEST_IMPL(ping_pong) {
ol_init();
if (pinger_connect(8000)) {
if (pinger_new(8000)) {
return 2;
}

View File

@ -8,5 +8,6 @@
int run_##name()
#define TEST_PORT 8123
#define TEST_PORT_2 8124
#endif /* TEST_H_ */