From 9cbdeda1d0a9074bd65da2aaf9c87b79545a1590 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 25 Oct 2017 15:00:52 -0700 Subject: fixes #45 expose aio to applications While here we added a test for the aio stuff, and cleaned up some dead code for the old fd notifications. There were a few improvements to shorten & clean code elsewhere, such as short-circuiting task wait when the task has no callback. The legacy sendmsg() and recvmsg() APIs are still in the socket core until we convert the device code to use the aios. --- src/core/aio.h | 3 +- src/core/socket.c | 67 ++++++++--------- src/core/socket.h | 12 +-- src/core/taskq.c | 3 + src/nng.c | 168 +++++++++++++++++++++++++++++++++++------- src/nng.h | 76 +++++++++++++++++++ src/protocol/bus/bus.c | 14 ++-- src/protocol/pair/pair_v0.c | 4 - src/protocol/pair/pair_v1.c | 16 ++-- src/protocol/pipeline/pull.c | 7 +- src/protocol/pipeline/push.c | 9 +-- src/protocol/pubsub/pub.c | 5 +- src/protocol/pubsub/sub.c | 5 +- src/protocol/reqrep/rep.c | 5 -- src/protocol/reqrep/req.c | 9 --- src/protocol/survey/respond.c | 13 +--- src/protocol/survey/survey.c | 5 -- 17 files changed, 276 insertions(+), 145 deletions(-) (limited to 'src') diff --git a/src/core/aio.h b/src/core/aio.h index b12fcc55..3bdcf433 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -35,7 +35,8 @@ struct nni_aio { unsigned a_expiring : 1; // expiration callback in progress unsigned a_waiting : 1; // a thread is waiting for this to finish unsigned a_synch : 1; // run completion synchronously - unsigned a_pad : 26; // ensure 32-bit alignment + unsigned a_reltime : 1; // expiration time is relative + unsigned a_pad : 25; // ensure 32-bit alignment nni_task a_task; // Read/write operations. diff --git a/src/core/socket.c b/src/core/socket.c index dfa49317..8fb8e67b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -53,7 +53,7 @@ struct nni_socket { nni_proto_pipe_ops s_pipe_ops; nni_proto_sock_ops s_sock_ops; - // XXX: options + // options nni_duration s_linger; // linger time nni_duration s_sndtimeo; // send timeout nni_duration s_rcvtimeo; // receive timeout @@ -143,13 +143,6 @@ nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp) nni_msgq_set_cb(mq, cb, fd); -#if 0 - if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) { - nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd); - return (NNG_ENOMEM); - } -#endif - fd->sn_init = 1; *szp = sizeof(int); memcpy(val, &fd->sn_rfd, sizeof(int)); @@ -449,31 +442,11 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) nni_mtx_unlock(&sock->s_mx); } -void -nni_sock_send_pending(nni_sock *sock) -{ - if (sock->s_send_fd.sn_init) { - nni_plat_pipe_clear(sock->s_send_fd.sn_rfd); - } -} - -void -nni_sock_recv_pending(nni_sock *sock) -{ - if (sock->s_recv_fd.sn_init) { - nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd); - } -} - static void nni_sock_destroy(nni_sock *s) { nni_sockopt *sopt; - if (s == NULL) { - return; - } - // Close any open notification pipes. 
if (s->s_recv_fd.sn_init) { nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd); @@ -796,9 +769,32 @@ nni_sock_closeall(void) } } +static void +nni_sock_normalize_expiration(nni_aio *aio, nni_duration def) +{ + if (aio->a_reltime) { + if (aio->a_expire == (nni_time) -2) { + aio->a_expire = def; + } + switch (aio->a_expire) { + case (nni_time) 0: + aio->a_expire = NNI_TIME_ZERO; + break; + case (nni_time) -1: + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + aio->a_expire; + break; + } + aio->a_reltime = 0; + } +} + void nni_sock_send(nni_sock *sock, nni_aio *aio) { + nni_sock_normalize_expiration(aio, sock->s_sndtimeo); sock->s_sock_ops.sock_send(sock->s_data, aio); } @@ -837,6 +833,7 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags) void nni_sock_recv(nni_sock *sock, nni_aio *aio) { + nni_sock_normalize_expiration(aio, sock->s_rcvtimeo); sock->s_sock_ops.sock_recv(sock->s_data, aio); } @@ -862,21 +859,17 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) expire += timeo; } - for (;;) { - nni_aio_set_timeout(aio, expire); - nni_sock_recv(sock, aio); - nni_aio_wait(aio); + nni_aio_set_timeout(aio, expire); + nni_sock_recv(sock, aio); + nni_aio_wait(aio); - rv = nni_aio_result(aio); - if (rv != 0) { - break; - } + if ((rv = nni_aio_result(aio)) == 0) { msg = nni_aio_get_msg(aio); nni_aio_set_msg(aio, NULL); *msgp = msg; - break; } + nni_aio_fini(aio); return (rv); } diff --git a/src/core/socket.h b/src/core/socket.h index 6d9622e3..52310ef3 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -46,18 +46,8 @@ extern void nni_sock_ep_remove(nni_sock *, nni_ep *); // Note that each of these should be called without any locks held, since // the socket can reenter the protocol. -// nni_sock_send_pending is called by the protocol when it enqueues -// a send operation. The main purpose of this is to clear the raised -// signal raised on the event descriptor. -extern void nni_sock_send_pending(nni_sock *); - -// nni_sock_recv_pending is called by the protocl when it enqueues -// a receive operation. The main purpose of this is to clear the raised -// signal raised on the event descriptor. -extern void nni_sock_recv_pending(nni_sock *); - // nni_socket_sendq obtains the upper writeq. The protocol should -// recieve messages from this, and place them on the appropriate pipe. +// receive messages from this, and place them on the appropriate pipe. extern nni_msgq *nni_sock_sendq(nni_sock *); // nni_socket_recvq obtains the upper readq. 
The protocol should diff --git a/src/core/taskq.c b/src/core/taskq.c index d0f04e8a..90cf7e22 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -192,6 +192,9 @@ nni_task_wait(nni_task *task) nni_taskq *tq = task->task_tq; int running; + if (task->task_cb == NULL) { + return; + } nni_mtx_lock(&tq->tq_mtx); for (;;) { running = 0; diff --git a/src/nng.c b/src/nng.c index 9fc99dc3..39206941 100644 --- a/src/nng.c +++ b/src/nng.c @@ -95,6 +95,18 @@ nng_peer(nng_socket sid) return (pnum); } +void * +nng_alloc(size_t sz) +{ + return (nni_alloc(sz)); +} + +void +nng_free(void *buf, size_t sz) +{ + nni_free(buf, sz); +} + int nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) { @@ -134,20 +146,28 @@ nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) int nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags) { - int rv; - nni_sock *sock; + int rv; + nng_aio *ap; - if ((rv = nni_sock_find(&sock, sid)) != 0) { + if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) { return (rv); } - rv = nni_sock_recvmsg(sock, msgp, flags); - nni_sock_rele(sock); + if (flags & NNG_FLAG_NONBLOCK) { + nng_aio_set_timeout(ap, NNG_DURATION_ZERO); + } else { + nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT); + } - // Possibly massage nonblocking attempt. Note that nonblocking is - // still done asynchronously, and the calling thread loses context. - if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { + nng_recv_aio(sid, ap); + nng_aio_wait(ap); + + if ((rv = nng_aio_result(ap)) == 0) { + *msgp = nng_aio_get_msg(ap); + + } else if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { rv = NNG_EAGAIN; } + nng_aio_free(ap); return (rv); } @@ -171,29 +191,27 @@ nng_send(nng_socket sid, void *buf, size_t len, int flags) return (rv); } -void * -nng_alloc(size_t sz) -{ - return (nni_alloc(sz)); -} - -void -nng_free(void *buf, size_t sz) -{ - nni_free(buf, sz); -} - int nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) { - int rv; - nni_sock *sock; + int rv; + nng_aio *ap; - if ((rv = nni_sock_find(&sock, sid)) != 0) { + if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) { return (rv); } - rv = nni_sock_sendmsg(sock, msg, flags); - nni_sock_rele(sock); + if (flags & NNG_FLAG_NONBLOCK) { + nng_aio_set_timeout(ap, NNG_DURATION_ZERO); + } else { + nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT); + } + + nng_aio_set_msg(ap, msg); + nng_send_aio(sid, ap); + nng_aio_wait(ap); + + rv = nng_aio_result(ap); + nng_aio_free(ap); // Possibly massage nonblocking attempt. Note that nonblocking is // still done asynchronously, and the calling thread loses context. 
@@ -204,6 +222,36 @@ nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) return (rv); } +void +nng_recv_aio(nng_socket sid, nng_aio *ap) +{ + nni_aio * aio = (nni_aio *) ap; + nni_sock *sock; + int rv; + + if ((rv = nni_sock_find(&sock, sid)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_sock_recv(sock, aio); + nni_sock_rele(sock); +} + +void +nng_send_aio(nng_socket sid, nng_aio *ap) +{ + nni_aio * aio = (nni_aio *) ap; + nni_sock *sock; + int rv; + + if ((rv = nni_sock_find(&sock, sid)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_sock_send(sock, aio); + nni_sock_rele(sock); +} + int nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags) { @@ -947,6 +995,76 @@ nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp) return (nni_msg_getopt(msg, opt, ptr, szp)); } +int +nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg) +{ + nni_aio *aio; + int rv; + + if ((rv = nni_init()) != 0) { + return (rv); + } + if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) { + *app = (nng_aio *) aio; + } + aio->a_expire = (nni_time) NNG_DURATION_DEFAULT; + aio->a_reltime = 1; + return (rv); +} + +void +nng_aio_free(nng_aio *ap) +{ + nni_aio_fini((nni_aio *) ap); +} + +int +nng_aio_result(nng_aio *ap) +{ + return (nni_aio_result((nni_aio *) ap)); +} + +void +nng_aio_stop(nng_aio *ap) +{ + nni_aio_stop((nni_aio *) ap); +} + +void +nng_aio_wait(nng_aio *ap) +{ + nni_aio_wait((nni_aio *) ap); +} + +void +nng_aio_cancel(nng_aio *ap) +{ + nni_aio_cancel((nni_aio *) ap, NNG_ECANCELED); +} + +void +nng_aio_set_msg(nng_aio *ap, nng_msg *msg) +{ + nni_aio_set_msg((nni_aio *) ap, msg); +} + +nng_msg * +nng_aio_get_msg(nng_aio *ap) +{ + return ((nng_msg *) (nni_aio_get_msg((nni_aio *) ap))); +} + +void +nng_aio_set_timeout(nng_aio *ap, nng_duration dur) +{ + // Durations here are relative, since we have no notion of a + // common clock. But underlying aio uses absolute times normally. + // Fortunately the absolute times are big enough; we just have to + // make sure that we "convert" the timeout from relative time to + // absolute time when submitting operations. + nni_aio_set_timeout((nni_aio *) ap, (nni_time) dur); +} + #if 0 int nng_snapshot_create(nng_socket sock, nng_snapshot **snapp) diff --git a/src/nng.h b/src/nng.h index c228a6d7..e013b9df 100644 --- a/src/nng.h +++ b/src/nng.h @@ -50,6 +50,12 @@ typedef int32_t nng_duration; // in milliseconds typedef struct nng_msg nng_msg; typedef struct nng_snapshot nng_snapshot; typedef struct nng_stat nng_stat; +typedef struct nng_aio nng_aio; + +// Some definitions for durations used with timeouts. +#define NNG_DURATION_INFINITE (-1) +#define NNG_DURATION_DEFAULT (-2) +#define NNG_DURATION_ZERO (0) // nng_fini is used to terminate the library, freeing certain global resources. // For most cases, this call is optional, but failure to do so may cause @@ -212,6 +218,20 @@ NNG_DECL int nng_sendmsg(nng_socket, nng_msg *, int); // can be passed off directly to nng_sendmsg. NNG_DECL int nng_recvmsg(nng_socket, nng_msg **, int); +// nng_send_aio sends data on the socket asynchronously. As with nng_send, +// the completion may be executed before the data has actually been delivered, +// but only when it is accepted for delivery. The supplied AIO must have +// been initialized, and have an associated message. The message will be +// "owned" by the socket if the operation completes successfully. Otherwise +// the caller is responsible for freeing it. 
+NNG_DECL void nng_send_aio(nng_socket, nng_aio *); + +// nng_recv_aio receives data on the socket asynchronously. On a successful +// result, the AIO will have an associated message, that can be obtained +// with nng_aio_get_msg(). The caller takes ownership of the message at +// this point. +NNG_DECL void nng_recv_aio(nng_socket, nng_aio *); + // nng_alloc is used to allocate memory. It's intended purpose is for // allocating memory suitable for message buffers with nng_send(). // Applications that need memory for other purposes should use their platform @@ -225,6 +245,62 @@ NNG_DECL void *nng_alloc(size_t); // calloc. NNG_DECL void nng_free(void *, size_t); +// Async IO API. AIO structures can be thought of as "handles" to +// support asynchronous operations. They contain the completion callback, and +// a pointer to consumer data. This is similar to how overlapped I/O +// works in Windows, when used with a completion callback. + +// nng_aio_alloc allocates a new AIO, and associated the completion +// callback and its opaque argument. If NULL is supplied for the +// callback, then the caller must use nng_aio_wait() to wait for the +// operation to complete. If the completion callback is not NULL, then +// when a submitted operation completes (or is canceled or fails) the +// callback will be executed, generally in a different thread, with no +// locks held. +NNG_DECL int nng_aio_alloc(nng_aio **, void (*)(void *), void *); + +// nng_aio_free frees the AIO and any associated resources. +// It *must not* be in use at the time it is freed. +NNG_DECL void nng_aio_free(nng_aio *); + +// nng_aio_stop stops any outstanding operation, and waits for the +// AIO to be free, including for the callback to have completed +// execution. Therefore the caller must NOT hold any locks that +// are acquired in the callback, or deadlock will occur. +NNG_DECL void nng_aio_stop(nng_aio *); + +// nng_aio_result returns the status/result of the operation. This +// will be zero on successful completion, or an nng error code on +// failure. +NNG_DECL int nng_aio_result(nng_aio *); + +// nng_aio_cancel attempts to cancel any in-progress I/O operation. +// The AIO callback will still be executed, but if the cancellation is +// successful then the status will be NNG_ECANCELED. +NNG_DECL void nng_aio_cancel(nng_aio *); + +// nng_aio_wait waits synchronously for any pending operation to complete. +// It also waits for the callback to have completed execution. Therefore, +// the caller of this function must not hold any locks acquired by the +// callback or deadlock may occur. +NNG_DECL void nng_aio_wait(nng_aio *); + +// nng_aio_set_msg sets the message structure to use for asynchronous +// message send operations. +NNG_DECL void nng_aio_set_msg(nng_aio *, nng_msg *); + +// nng_aio_get_msg returns the message structure associated with a completed +// receive operation. +NNG_DECL nng_msg *nng_aio_get_msg(nng_aio *); + +// nng_aio_set_timeout sets a timeout on the AIO. This should be called for +// operations that should time out after a period. The timeout should be +// either a positive number of milliseconds, or NNG_DURATION_INFINITE to +// indicate that the operation has no timeout. A poll may be done by +// specifying NNG_DURATION_ZERO. The value NNG_DURATION_DEFAULT indicates +// that any socket specific timeout should be used. +NNG_DECL void nng_aio_set_timeout(nng_aio *, nng_duration); + // Message API. 
NNG_DECL int nng_msg_alloc(nng_msg **, size_t); NNG_DECL void nng_msg_free(nng_msg *); diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 046aa19a..07f91602 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -37,7 +37,6 @@ static void bus_pipe_putq_cb(void *); // A bus_sock is our per-socket protocol private structure. struct bus_sock { - nni_sock *nsock; int raw; nni_aio * aio_getq; nni_list pipes; @@ -85,10 +84,9 @@ bus_sock_init(void **sp, nni_sock *nsock) bus_sock_fini(s); return (rv); } - s->nsock = nsock; - s->raw = 0; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); + s->raw = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); *sp = s; return (0); @@ -241,7 +239,7 @@ bus_pipe_recv_cb(void *arg) nni_aio_set_msg(p->aio_putq, msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_msgq_aio_put(nni_sock_recvq(s->nsock), p->aio_putq); + nni_msgq_aio_put(s->urq, p->aio_putq); } static void @@ -315,7 +313,7 @@ bus_sock_getq_cb(void *arg) static void bus_sock_getq(bus_sock *s) { - nni_msgq_aio_get(nni_sock_sendq(s->nsock), s->aio_getq); + nni_msgq_aio_get(s->uwq, s->aio_getq); } static void @@ -349,7 +347,6 @@ bus_sock_send(void *arg, nni_aio *aio) { bus_sock *s = arg; - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -358,7 +355,6 @@ bus_sock_recv(void *arg, nni_aio *aio) { bus_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c index cca03cc8..29f3b59c 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair/pair_v0.c @@ -28,7 +28,6 @@ static void pair0_pipe_fini(void *); // pair0_sock is our per-socket protocol private structure. struct pair0_sock { - nni_sock * nsock; pair0_pipe *ppipe; nni_msgq * uwq; nni_msgq * urq; @@ -58,7 +57,6 @@ pair0_sock_init(void **sp, nni_sock *nsock) return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); - s->nsock = nsock; s->ppipe = NULL; s->raw = 0; s->uwq = nni_sock_sendq(nsock); @@ -248,7 +246,6 @@ pair0_sock_send(void *arg, nni_aio *aio) { pair0_sock *s = arg; - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -257,7 +254,6 @@ pair0_sock_recv(void *arg, nni_aio *aio) { pair0_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c index f43a4785..86ee97eb 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair/pair_v1.c @@ -29,7 +29,6 @@ static void pair1_pipe_fini(void *); // pair1_sock is our per-socket protocol private structure. 
struct pair1_sock { - nni_sock * nsock; nni_msgq * uwq; nni_msgq * urq; int raw; @@ -89,13 +88,12 @@ pair1_sock_init(void **sp, nni_sock *nsock) return (rv); } - s->nsock = nsock; - s->raw = 0; - s->poly = 0; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - s->ttl = 8; - *sp = s; + s->raw = 0; + s->poly = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + s->ttl = 8; + *sp = s; return (0); } @@ -451,7 +449,6 @@ pair1_sock_send(void *arg, nni_aio *aio) { pair1_sock *s = arg; - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -460,7 +457,6 @@ pair1_sock_recv(void *arg, nni_aio *aio) { pair1_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 7dd0c8ed..267352c5 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -26,7 +26,6 @@ static void pull_putq(pull_pipe *, nni_msg *); struct pull_sock { nni_msgq *urq; int raw; - nni_sock *sock; }; // A pull_pipe is our per-pipe protocol private structure. @@ -45,9 +44,8 @@ pull_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = 0; - s->urq = nni_sock_recvq(sock); - s->sock = sock; + s->raw = 0; + s->urq = nni_sock_recvq(sock); *sp = s; return (0); @@ -198,7 +196,6 @@ pull_sock_recv(void *arg, nni_aio *aio) { pull_sock *s = arg; - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index af7b80ca..995ac56d 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -28,7 +28,6 @@ static void push_getq_cb(void *); struct push_sock { nni_msgq *uwq; int raw; - nni_sock *sock; }; // An nni_push_pipe is our per-pipe protocol private structure. @@ -50,10 +49,9 @@ push_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = 0; - s->sock = sock; - s->uwq = nni_sock_sendq(sock); - *sp = s; + s->raw = 0; + s->uwq = nni_sock_sendq(sock); + *sp = s; return (0); } @@ -209,7 +207,6 @@ push_sock_send(void *arg, nni_aio *aio) { push_sock *s = arg; - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); } diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 4604d0ff..29863322 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -30,7 +30,6 @@ static void pub_pipe_fini(void *); // A pub_sock is our per-socket protocol private structure. struct pub_sock { - nni_sock *sock; nni_msgq *uwq; int raw; nni_aio * aio_getq; @@ -75,8 +74,7 @@ pub_sock_init(void **sp, nni_sock *sock) return (rv); } - s->sock = sock; - s->raw = 0; + s->raw = 0; NNI_LIST_INIT(&s->pipes, pub_pipe, node); s->uwq = nni_sock_sendq(sock); @@ -291,7 +289,6 @@ pub_sock_send(void *arg, nni_aio *aio) { pub_sock *s = arg; - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); } diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 323bbb2f..d87b42ec 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -36,7 +36,6 @@ struct sub_topic { // An nni_rep_sock is our per-socket protocol private structure. 
struct sub_sock { - nni_sock *sock; nni_list topics; nni_msgq *urq; int raw; @@ -61,8 +60,7 @@ sub_sock_init(void **sp, nni_sock *sock) } nni_mtx_init(&s->lk); NNI_LIST_INIT(&s->topics, sub_topic, node); - s->sock = sock; - s->raw = 0; + s->raw = 0; s->urq = nni_sock_recvq(sock); *sp = s; @@ -296,7 +294,6 @@ sub_sock_recv(void *arg, nni_aio *aio) { sub_sock *s = arg; - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 510beab3..5d924e32 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -29,7 +29,6 @@ static void rep_pipe_fini(void *); // A rep_sock is our per-socket protocol private structure. struct rep_sock { - nni_sock * sock; nni_msgq * uwq; nni_msgq * urq; nni_mtx lk; @@ -84,7 +83,6 @@ rep_sock_init(void **sp, nni_sock *sock) } s->ttl = 8; // Per RFC - s->sock = sock; s->raw = 0; s->btrace = NULL; s->btrace_len = 0; @@ -420,7 +418,6 @@ rep_sock_send(void *arg, nni_aio *aio) if (s->raw) { // Pass thru nni_mtx_unlock(&s->lk); - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -447,7 +444,6 @@ rep_sock_send(void *arg, nni_aio *aio) s->btrace_len = 0; nni_mtx_unlock(&s->lk); - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); } @@ -456,7 +452,6 @@ rep_sock_recv(void *arg, nni_aio *aio) { rep_sock *s = arg; - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 81abd306..1ab49f3e 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -28,7 +28,6 @@ static void req_pipe_fini(void *); // A req_sock is our per-socket protocol private structure. struct req_sock { - nni_sock * sock; nni_msgq * uwq; nni_msgq * urq; nni_duration retry; @@ -90,7 +89,6 @@ req_sock_init(void **sp, nni_sock *sock) // this is "semi random" start for request IDs. s->nextid = nni_random(); s->retry = NNI_SECOND * 60; - s->sock = sock; s->reqmsg = NULL; s->raw = 0; s->wantw = 0; @@ -512,7 +510,6 @@ req_sock_send(void *arg, nni_aio *aio) nni_mtx_lock(&s->mtx); if (s->raw) { nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -536,11 +533,6 @@ req_sock_send(void *arg, nni_aio *aio) return; } - // XXX: I think we should just not do this... and leave the - // socket "permanently writeable". This does screw up all the - // backpressure. - // nni_sock_send_pending(s->sock); - // If another message is there, this cancels it. if (s->reqmsg != NULL) { nni_msg_free(s->reqmsg); @@ -619,7 +611,6 @@ req_sock_recv(void *arg, nni_aio *aio) } } nni_mtx_unlock(&s->mtx); - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 4c3ea8e3..dbce0751 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -29,7 +29,6 @@ static void resp_pipe_fini(void *); // A resp_sock is our per-socket protocol private structure. 
struct resp_sock { - nni_sock * nsock; nni_msgq * urq; nni_msgq * uwq; int raw; @@ -85,7 +84,6 @@ resp_sock_init(void **sp, nni_sock *nsock) } s->ttl = 8; // Per RFC - s->nsock = nsock; s->raw = 0; s->btrace = NULL; s->btrace_len = 0; @@ -268,9 +266,9 @@ resp_send_cb(void *arg) static void resp_recv_cb(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; - nni_msgq * urq; + resp_pipe *p = arg; + resp_sock *s = p->psock; + nni_msgq * urq = s->urq; nni_msg * msg; int hops; int rv; @@ -279,8 +277,6 @@ resp_recv_cb(void *arg) goto error; } - urq = nni_sock_recvq(s->nsock); - msg = nni_aio_get_msg(p->aio_recv); nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, p->id); @@ -384,7 +380,6 @@ resp_sock_send(void *arg, nni_aio *aio) nni_mtx_lock(&s->mtx); if (s->raw) { nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -413,7 +408,6 @@ resp_sock_send(void *arg, nni_aio *aio) s->btrace_len = 0; nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -454,7 +448,6 @@ resp_sock_recv(void *arg, nni_aio *aio) { resp_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 1c15054f..f44cd63a 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -28,7 +28,6 @@ static void surv_timeout(void *); // A surv_sock is our per-socket protocol private structure. struct surv_sock { - nni_sock * nsock; nni_duration survtime; nni_time expire; int raw; @@ -84,7 +83,6 @@ surv_sock_init(void **sp, nni_sock *nsock) nni_timer_init(&s->timer, surv_timeout, s); s->nextid = nni_random(); - s->nsock = nsock; s->raw = 0; s->survtime = NNI_SECOND * 60; s->expire = NNI_TIME_ZERO; @@ -362,7 +360,6 @@ surv_sock_recv(void *arg, nni_aio *aio) return; } nni_mtx_unlock(&s->mtx); - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } @@ -378,7 +375,6 @@ surv_sock_send(void *arg, nni_aio *aio) // No automatic retry, and the request ID must // be in the header coming down. nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -404,7 +400,6 @@ surv_sock_send(void *arg, nni_aio *aio) nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } -- cgit v1.2.3-70-g09d2
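As a rough illustration of the API this patch exposes in src/nng.h, below is a minimal sketch of a blocking receive built from the asynchronous primitives, essentially what the reworked nng_recvmsg() in src/nng.c now does internally. The helper name recv_one, the include path, and the socket setup are assumptions made for illustration and are not part of this change.

// Hypothetical application helper (not part of this patch): receive one
// message on an already-opened socket "sid" with a bounded wait.  Socket
// setup is omitted; the include path may differ by install layout.
#include <stddef.h>
#include <nng.h>

static int
recv_one(nng_socket sid, nng_msg **msgp, nng_duration timeout)
{
	nng_aio *aio;
	int      rv;

	// NULL callback: the caller must rendezvous with nng_aio_wait().
	if ((rv = nng_aio_alloc(&aio, NULL, NULL)) != 0) {
		return (rv);
	}

	// NNG_DURATION_ZERO would poll, NNG_DURATION_DEFAULT would use the
	// socket's configured receive timeout; here an explicit relative
	// timeout is supplied.
	nng_aio_set_timeout(aio, timeout);

	nng_recv_aio(sid, aio); // submit; completes asynchronously
	nng_aio_wait(aio);      // block until completion or timeout

	if ((rv = nng_aio_result(aio)) == 0) {
		*msgp = nng_aio_get_msg(aio); // caller now owns the message
	}
	nng_aio_free(aio);
	return (rv);
}

Note that nng_aio_set_timeout() records a relative duration (the patch marks the aio with a_reltime); nni_sock_send() and nni_sock_recv() then convert it to an absolute expiration via nni_sock_normalize_expiration(), substituting the socket's send or receive timeout when NNG_DURATION_DEFAULT was given.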
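The callback form described in the nng_aio_alloc() comment could be used along the following lines. The names send_state, send_done, and send_async are hypothetical, and the failure path (reclaiming the message, since per the header comments the caller keeps ownership unless the send completes successfully) is a sketch rather than a prescribed pattern.

// Hypothetical callback-driven send (names are illustrative only).  The
// callback runs on a task thread once the operation completes or fails,
// with no locks held.
#include <stdio.h>
#include <nng.h>

struct send_state {
	nng_aio *aio;
};

static void
send_done(void *arg)
{
	struct send_state *st = arg;
	int                rv;

	if ((rv = nng_aio_result(st->aio)) != 0) {
		// On failure the caller still owns the message, so take it
		// back from the aio and free it.
		nng_msg_free(nng_aio_get_msg(st->aio));
		fprintf(stderr, "send failed: %d\n", rv);
	}
	// Do not free the aio from its own callback; release it elsewhere
	// (for example at teardown) with nng_aio_stop()/nng_aio_free().
}

static int
send_async(nng_socket sid, nng_msg *msg, struct send_state *st)
{
	int rv;

	if ((rv = nng_aio_alloc(&st->aio, send_done, st)) != 0) {
		return (rv);
	}
	nng_aio_set_timeout(st->aio, NNG_DURATION_DEFAULT);
	nng_aio_set_msg(st->aio, msg);
	nng_send_aio(sid, st->aio);
	return (0);
}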