diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 67 |
1 files changed, 30 insertions, 37 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index dfa49317..8fb8e67b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -53,7 +53,7 @@ struct nni_socket { nni_proto_pipe_ops s_pipe_ops; nni_proto_sock_ops s_sock_ops; - // XXX: options + // options nni_duration s_linger; // linger time nni_duration s_sndtimeo; // send timeout nni_duration s_rcvtimeo; // receive timeout @@ -143,13 +143,6 @@ nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp) nni_msgq_set_cb(mq, cb, fd); -#if 0 - if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) { - nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd); - return (NNG_ENOMEM); - } -#endif - fd->sn_init = 1; *szp = sizeof(int); memcpy(val, &fd->sn_rfd, sizeof(int)); @@ -449,31 +442,11 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) nni_mtx_unlock(&sock->s_mx); } -void -nni_sock_send_pending(nni_sock *sock) -{ - if (sock->s_send_fd.sn_init) { - nni_plat_pipe_clear(sock->s_send_fd.sn_rfd); - } -} - -void -nni_sock_recv_pending(nni_sock *sock) -{ - if (sock->s_recv_fd.sn_init) { - nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd); - } -} - static void nni_sock_destroy(nni_sock *s) { nni_sockopt *sopt; - if (s == NULL) { - return; - } - // Close any open notification pipes. if (s->s_recv_fd.sn_init) { nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd); @@ -796,9 +769,32 @@ nni_sock_closeall(void) } } +static void +nni_sock_normalize_expiration(nni_aio *aio, nni_duration def) +{ + if (aio->a_reltime) { + if (aio->a_expire == (nni_time) -2) { + aio->a_expire = def; + } + switch (aio->a_expire) { + case (nni_time) 0: + aio->a_expire = NNI_TIME_ZERO; + break; + case (nni_time) -1: + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + aio->a_expire; + break; + } + aio->a_reltime = 0; + } +} + void nni_sock_send(nni_sock *sock, nni_aio *aio) { + nni_sock_normalize_expiration(aio, sock->s_sndtimeo); sock->s_sock_ops.sock_send(sock->s_data, aio); } @@ -837,6 +833,7 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags) void nni_sock_recv(nni_sock *sock, nni_aio *aio) { + nni_sock_normalize_expiration(aio, sock->s_rcvtimeo); sock->s_sock_ops.sock_recv(sock->s_data, aio); } @@ -862,21 +859,17 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) expire += timeo; } - for (;;) { - nni_aio_set_timeout(aio, expire); - nni_sock_recv(sock, aio); - nni_aio_wait(aio); + nni_aio_set_timeout(aio, expire); + nni_sock_recv(sock, aio); + nni_aio_wait(aio); - rv = nni_aio_result(aio); - if (rv != 0) { - break; - } + if ((rv = nni_aio_result(aio)) == 0) { msg = nni_aio_get_msg(aio); nni_aio_set_msg(aio, NULL); *msgp = msg; - break; } + nni_aio_fini(aio); return (rv); } |
