aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.h3
-rw-r--r--src/core/socket.c67
-rw-r--r--src/core/socket.h12
-rw-r--r--src/core/taskq.c3
4 files changed, 36 insertions, 49 deletions
diff --git a/src/core/aio.h b/src/core/aio.h
index b12fcc55..3bdcf433 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -35,7 +35,8 @@ struct nni_aio {
unsigned a_expiring : 1; // expiration callback in progress
unsigned a_waiting : 1; // a thread is waiting for this to finish
unsigned a_synch : 1; // run completion synchronously
- unsigned a_pad : 26; // ensure 32-bit alignment
+ unsigned a_reltime : 1; // expiration time is relative
+ unsigned a_pad : 25; // ensure 32-bit alignment
nni_task a_task;
// Read/write operations.
diff --git a/src/core/socket.c b/src/core/socket.c
index dfa49317..8fb8e67b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -53,7 +53,7 @@ struct nni_socket {
nni_proto_pipe_ops s_pipe_ops;
nni_proto_sock_ops s_sock_ops;
- // XXX: options
+ // options
nni_duration s_linger; // linger time
nni_duration s_sndtimeo; // send timeout
nni_duration s_rcvtimeo; // receive timeout
@@ -143,13 +143,6 @@ nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp)
nni_msgq_set_cb(mq, cb, fd);
-#if 0
- if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) {
- nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd);
- return (NNG_ENOMEM);
- }
-#endif
-
fd->sn_init = 1;
*szp = sizeof(int);
memcpy(val, &fd->sn_rfd, sizeof(int));
@@ -449,31 +442,11 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
nni_mtx_unlock(&sock->s_mx);
}
-void
-nni_sock_send_pending(nni_sock *sock)
-{
- if (sock->s_send_fd.sn_init) {
- nni_plat_pipe_clear(sock->s_send_fd.sn_rfd);
- }
-}
-
-void
-nni_sock_recv_pending(nni_sock *sock)
-{
- if (sock->s_recv_fd.sn_init) {
- nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd);
- }
-}
-
static void
nni_sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
- if (s == NULL) {
- return;
- }
-
// Close any open notification pipes.
if (s->s_recv_fd.sn_init) {
nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd);
@@ -796,9 +769,32 @@ nni_sock_closeall(void)
}
}
+static void
+nni_sock_normalize_expiration(nni_aio *aio, nni_duration def)
+{
+ if (aio->a_reltime) {
+ if (aio->a_expire == (nni_time) -2) {
+ aio->a_expire = def;
+ }
+ switch (aio->a_expire) {
+ case (nni_time) 0:
+ aio->a_expire = NNI_TIME_ZERO;
+ break;
+ case (nni_time) -1:
+ aio->a_expire = NNI_TIME_NEVER;
+ break;
+ default:
+ aio->a_expire = nni_clock() + aio->a_expire;
+ break;
+ }
+ aio->a_reltime = 0;
+ }
+}
+
void
nni_sock_send(nni_sock *sock, nni_aio *aio)
{
+ nni_sock_normalize_expiration(aio, sock->s_sndtimeo);
sock->s_sock_ops.sock_send(sock->s_data, aio);
}
@@ -837,6 +833,7 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags)
void
nni_sock_recv(nni_sock *sock, nni_aio *aio)
{
+ nni_sock_normalize_expiration(aio, sock->s_rcvtimeo);
sock->s_sock_ops.sock_recv(sock->s_data, aio);
}
@@ -862,21 +859,17 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
expire += timeo;
}
- for (;;) {
- nni_aio_set_timeout(aio, expire);
- nni_sock_recv(sock, aio);
- nni_aio_wait(aio);
+ nni_aio_set_timeout(aio, expire);
+ nni_sock_recv(sock, aio);
+ nni_aio_wait(aio);
- rv = nni_aio_result(aio);
- if (rv != 0) {
- break;
- }
+ if ((rv = nni_aio_result(aio)) == 0) {
msg = nni_aio_get_msg(aio);
nni_aio_set_msg(aio, NULL);
*msgp = msg;
- break;
}
+
nni_aio_fini(aio);
return (rv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 6d9622e3..52310ef3 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -46,18 +46,8 @@ extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
// Note that each of these should be called without any locks held, since
// the socket can reenter the protocol.
-// nni_sock_send_pending is called by the protocol when it enqueues
-// a send operation. The main purpose of this is to clear the raised
-// signal raised on the event descriptor.
-extern void nni_sock_send_pending(nni_sock *);
-
-// nni_sock_recv_pending is called by the protocl when it enqueues
-// a receive operation. The main purpose of this is to clear the raised
-// signal raised on the event descriptor.
-extern void nni_sock_recv_pending(nni_sock *);
-
// nni_socket_sendq obtains the upper writeq. The protocol should
-// recieve messages from this, and place them on the appropriate pipe.
+// receive messages from this, and place them on the appropriate pipe.
extern nni_msgq *nni_sock_sendq(nni_sock *);
// nni_socket_recvq obtains the upper readq. The protocol should
diff --git a/src/core/taskq.c b/src/core/taskq.c
index d0f04e8a..90cf7e22 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -192,6 +192,9 @@ nni_task_wait(nni_task *task)
nni_taskq *tq = task->task_tq;
int running;
+ if (task->task_cb == NULL) {
+ return;
+ }
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
running = 0;