diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.h | 3 | ||||
| -rw-r--r-- | src/core/socket.c | 67 | ||||
| -rw-r--r-- | src/core/socket.h | 12 | ||||
| -rw-r--r-- | src/core/taskq.c | 3 |
4 files changed, 36 insertions, 49 deletions
diff --git a/src/core/aio.h b/src/core/aio.h index b12fcc55..3bdcf433 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -35,7 +35,8 @@ struct nni_aio { unsigned a_expiring : 1; // expiration callback in progress unsigned a_waiting : 1; // a thread is waiting for this to finish unsigned a_synch : 1; // run completion synchronously - unsigned a_pad : 26; // ensure 32-bit alignment + unsigned a_reltime : 1; // expiration time is relative + unsigned a_pad : 25; // ensure 32-bit alignment nni_task a_task; // Read/write operations. diff --git a/src/core/socket.c b/src/core/socket.c index dfa49317..8fb8e67b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -53,7 +53,7 @@ struct nni_socket { nni_proto_pipe_ops s_pipe_ops; nni_proto_sock_ops s_sock_ops; - // XXX: options + // options nni_duration s_linger; // linger time nni_duration s_sndtimeo; // send timeout nni_duration s_rcvtimeo; // receive timeout @@ -143,13 +143,6 @@ nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp) nni_msgq_set_cb(mq, cb, fd); -#if 0 - if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) { - nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd); - return (NNG_ENOMEM); - } -#endif - fd->sn_init = 1; *szp = sizeof(int); memcpy(val, &fd->sn_rfd, sizeof(int)); @@ -449,31 +442,11 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) nni_mtx_unlock(&sock->s_mx); } -void -nni_sock_send_pending(nni_sock *sock) -{ - if (sock->s_send_fd.sn_init) { - nni_plat_pipe_clear(sock->s_send_fd.sn_rfd); - } -} - -void -nni_sock_recv_pending(nni_sock *sock) -{ - if (sock->s_recv_fd.sn_init) { - nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd); - } -} - static void nni_sock_destroy(nni_sock *s) { nni_sockopt *sopt; - if (s == NULL) { - return; - } - // Close any open notification pipes. if (s->s_recv_fd.sn_init) { nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd); @@ -796,9 +769,32 @@ nni_sock_closeall(void) } } +static void +nni_sock_normalize_expiration(nni_aio *aio, nni_duration def) +{ + if (aio->a_reltime) { + if (aio->a_expire == (nni_time) -2) { + aio->a_expire = def; + } + switch (aio->a_expire) { + case (nni_time) 0: + aio->a_expire = NNI_TIME_ZERO; + break; + case (nni_time) -1: + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + aio->a_expire; + break; + } + aio->a_reltime = 0; + } +} + void nni_sock_send(nni_sock *sock, nni_aio *aio) { + nni_sock_normalize_expiration(aio, sock->s_sndtimeo); sock->s_sock_ops.sock_send(sock->s_data, aio); } @@ -837,6 +833,7 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags) void nni_sock_recv(nni_sock *sock, nni_aio *aio) { + nni_sock_normalize_expiration(aio, sock->s_rcvtimeo); sock->s_sock_ops.sock_recv(sock->s_data, aio); } @@ -862,21 +859,17 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) expire += timeo; } - for (;;) { - nni_aio_set_timeout(aio, expire); - nni_sock_recv(sock, aio); - nni_aio_wait(aio); + nni_aio_set_timeout(aio, expire); + nni_sock_recv(sock, aio); + nni_aio_wait(aio); - rv = nni_aio_result(aio); - if (rv != 0) { - break; - } + if ((rv = nni_aio_result(aio)) == 0) { msg = nni_aio_get_msg(aio); nni_aio_set_msg(aio, NULL); *msgp = msg; - break; } + nni_aio_fini(aio); return (rv); } diff --git a/src/core/socket.h b/src/core/socket.h index 6d9622e3..52310ef3 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -46,18 +46,8 @@ extern void nni_sock_ep_remove(nni_sock *, nni_ep *); // Note that each of these should be called without any locks held, since // the socket can reenter the protocol. -// nni_sock_send_pending is called by the protocol when it enqueues -// a send operation. The main purpose of this is to clear the raised -// signal raised on the event descriptor. -extern void nni_sock_send_pending(nni_sock *); - -// nni_sock_recv_pending is called by the protocl when it enqueues -// a receive operation. The main purpose of this is to clear the raised -// signal raised on the event descriptor. -extern void nni_sock_recv_pending(nni_sock *); - // nni_socket_sendq obtains the upper writeq. The protocol should -// recieve messages from this, and place them on the appropriate pipe. +// receive messages from this, and place them on the appropriate pipe. extern nni_msgq *nni_sock_sendq(nni_sock *); // nni_socket_recvq obtains the upper readq. The protocol should diff --git a/src/core/taskq.c b/src/core/taskq.c index d0f04e8a..90cf7e22 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -192,6 +192,9 @@ nni_task_wait(nni_task *task) nni_taskq *tq = task->task_tq; int running; + if (task->task_cb == NULL) { + return; + } nni_mtx_lock(&tq->tq_mtx); for (;;) { running = 0; |
