aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c67
1 files changed, 30 insertions, 37 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index dfa49317..8fb8e67b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -53,7 +53,7 @@ struct nni_socket {
nni_proto_pipe_ops s_pipe_ops;
nni_proto_sock_ops s_sock_ops;
- // XXX: options
+ // options
nni_duration s_linger; // linger time
nni_duration s_sndtimeo; // send timeout
nni_duration s_rcvtimeo; // receive timeout
@@ -143,13 +143,6 @@ nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp)
nni_msgq_set_cb(mq, cb, fd);
-#if 0
- if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) {
- nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd);
- return (NNG_ENOMEM);
- }
-#endif
-
fd->sn_init = 1;
*szp = sizeof(int);
memcpy(val, &fd->sn_rfd, sizeof(int));
@@ -449,31 +442,11 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
nni_mtx_unlock(&sock->s_mx);
}
-void
-nni_sock_send_pending(nni_sock *sock)
-{
- if (sock->s_send_fd.sn_init) {
- nni_plat_pipe_clear(sock->s_send_fd.sn_rfd);
- }
-}
-
-void
-nni_sock_recv_pending(nni_sock *sock)
-{
- if (sock->s_recv_fd.sn_init) {
- nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd);
- }
-}
-
static void
nni_sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
- if (s == NULL) {
- return;
- }
-
// Close any open notification pipes.
if (s->s_recv_fd.sn_init) {
nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd);
@@ -796,9 +769,32 @@ nni_sock_closeall(void)
}
}
+static void
+nni_sock_normalize_expiration(nni_aio *aio, nni_duration def)
+{
+ if (aio->a_reltime) {
+ if (aio->a_expire == (nni_time) -2) {
+ aio->a_expire = def;
+ }
+ switch (aio->a_expire) {
+ case (nni_time) 0:
+ aio->a_expire = NNI_TIME_ZERO;
+ break;
+ case (nni_time) -1:
+ aio->a_expire = NNI_TIME_NEVER;
+ break;
+ default:
+ aio->a_expire = nni_clock() + aio->a_expire;
+ break;
+ }
+ aio->a_reltime = 0;
+ }
+}
+
void
nni_sock_send(nni_sock *sock, nni_aio *aio)
{
+ nni_sock_normalize_expiration(aio, sock->s_sndtimeo);
sock->s_sock_ops.sock_send(sock->s_data, aio);
}
@@ -837,6 +833,7 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags)
void
nni_sock_recv(nni_sock *sock, nni_aio *aio)
{
+ nni_sock_normalize_expiration(aio, sock->s_rcvtimeo);
sock->s_sock_ops.sock_recv(sock->s_data, aio);
}
@@ -862,21 +859,17 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
expire += timeo;
}
- for (;;) {
- nni_aio_set_timeout(aio, expire);
- nni_sock_recv(sock, aio);
- nni_aio_wait(aio);
+ nni_aio_set_timeout(aio, expire);
+ nni_sock_recv(sock, aio);
+ nni_aio_wait(aio);
- rv = nni_aio_result(aio);
- if (rv != 0) {
- break;
- }
+ if ((rv = nni_aio_result(aio)) == 0) {
msg = nni_aio_get_msg(aio);
nni_aio_set_msg(aio, NULL);
*msgp = msg;
- break;
}
+
nni_aio_fini(aio);
return (rv);
}