diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/aio.h | 3 | ||||
| -rw-r--r-- | src/core/socket.c | 67 | ||||
| -rw-r--r-- | src/core/socket.h | 12 | ||||
| -rw-r--r-- | src/core/taskq.c | 3 | ||||
| -rw-r--r-- | src/nng.c | 168 | ||||
| -rw-r--r-- | src/nng.h | 76 | ||||
| -rw-r--r-- | src/protocol/bus/bus.c | 14 | ||||
| -rw-r--r-- | src/protocol/pair/pair_v0.c | 4 | ||||
| -rw-r--r-- | src/protocol/pair/pair_v1.c | 16 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 7 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 9 | ||||
| -rw-r--r-- | src/protocol/pubsub/pub.c | 5 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 5 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 5 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 9 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 13 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 5 |
17 files changed, 276 insertions, 145 deletions
diff --git a/src/core/aio.h b/src/core/aio.h index b12fcc55..3bdcf433 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -35,7 +35,8 @@ struct nni_aio { unsigned a_expiring : 1; // expiration callback in progress unsigned a_waiting : 1; // a thread is waiting for this to finish unsigned a_synch : 1; // run completion synchronously - unsigned a_pad : 26; // ensure 32-bit alignment + unsigned a_reltime : 1; // expiration time is relative + unsigned a_pad : 25; // ensure 32-bit alignment nni_task a_task; // Read/write operations. diff --git a/src/core/socket.c b/src/core/socket.c index dfa49317..8fb8e67b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -53,7 +53,7 @@ struct nni_socket { nni_proto_pipe_ops s_pipe_ops; nni_proto_sock_ops s_sock_ops; - // XXX: options + // options nni_duration s_linger; // linger time nni_duration s_sndtimeo; // send timeout nni_duration s_rcvtimeo; // receive timeout @@ -143,13 +143,6 @@ nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp) nni_msgq_set_cb(mq, cb, fd); -#if 0 - if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) { - nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd); - return (NNG_ENOMEM); - } -#endif - fd->sn_init = 1; *szp = sizeof(int); memcpy(val, &fd->sn_rfd, sizeof(int)); @@ -449,31 +442,11 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) nni_mtx_unlock(&sock->s_mx); } -void -nni_sock_send_pending(nni_sock *sock) -{ - if (sock->s_send_fd.sn_init) { - nni_plat_pipe_clear(sock->s_send_fd.sn_rfd); - } -} - -void -nni_sock_recv_pending(nni_sock *sock) -{ - if (sock->s_recv_fd.sn_init) { - nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd); - } -} - static void nni_sock_destroy(nni_sock *s) { nni_sockopt *sopt; - if (s == NULL) { - return; - } - // Close any open notification pipes. if (s->s_recv_fd.sn_init) { nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd); @@ -796,9 +769,32 @@ nni_sock_closeall(void) } } +static void +nni_sock_normalize_expiration(nni_aio *aio, nni_duration def) +{ + if (aio->a_reltime) { + if (aio->a_expire == (nni_time) -2) { + aio->a_expire = def; + } + switch (aio->a_expire) { + case (nni_time) 0: + aio->a_expire = NNI_TIME_ZERO; + break; + case (nni_time) -1: + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + aio->a_expire; + break; + } + aio->a_reltime = 0; + } +} + void nni_sock_send(nni_sock *sock, nni_aio *aio) { + nni_sock_normalize_expiration(aio, sock->s_sndtimeo); sock->s_sock_ops.sock_send(sock->s_data, aio); } @@ -837,6 +833,7 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags) void nni_sock_recv(nni_sock *sock, nni_aio *aio) { + nni_sock_normalize_expiration(aio, sock->s_rcvtimeo); sock->s_sock_ops.sock_recv(sock->s_data, aio); } @@ -862,21 +859,17 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) expire += timeo; } - for (;;) { - nni_aio_set_timeout(aio, expire); - nni_sock_recv(sock, aio); - nni_aio_wait(aio); + nni_aio_set_timeout(aio, expire); + nni_sock_recv(sock, aio); + nni_aio_wait(aio); - rv = nni_aio_result(aio); - if (rv != 0) { - break; - } + if ((rv = nni_aio_result(aio)) == 0) { msg = nni_aio_get_msg(aio); nni_aio_set_msg(aio, NULL); *msgp = msg; - break; } + nni_aio_fini(aio); return (rv); } diff --git a/src/core/socket.h b/src/core/socket.h index 6d9622e3..52310ef3 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -46,18 +46,8 @@ extern void nni_sock_ep_remove(nni_sock *, nni_ep *); // Note that each of these should be called without any locks held, since // the socket can reenter the protocol. -// nni_sock_send_pending is called by the protocol when it enqueues -// a send operation. The main purpose of this is to clear the raised -// signal raised on the event descriptor. -extern void nni_sock_send_pending(nni_sock *); - -// nni_sock_recv_pending is called by the protocl when it enqueues -// a receive operation. The main purpose of this is to clear the raised -// signal raised on the event descriptor. -extern void nni_sock_recv_pending(nni_sock *); - // nni_socket_sendq obtains the upper writeq. The protocol should -// recieve messages from this, and place them on the appropriate pipe. +// receive messages from this, and place them on the appropriate pipe. extern nni_msgq *nni_sock_sendq(nni_sock *); // nni_socket_recvq obtains the upper readq. The protocol should diff --git a/src/core/taskq.c b/src/core/taskq.c index d0f04e8a..90cf7e22 100644 --- a/src/core/taskq.c +++ b/src/core/taskq.c @@ -192,6 +192,9 @@ nni_task_wait(nni_task *task) nni_taskq *tq = task->task_tq; int running; + if (task->task_cb == NULL) { + return; + } nni_mtx_lock(&tq->tq_mtx); for (;;) { running = 0; @@ -95,6 +95,18 @@ nng_peer(nng_socket sid) return (pnum); } +void * +nng_alloc(size_t sz) +{ + return (nni_alloc(sz)); +} + +void +nng_free(void *buf, size_t sz) +{ + nni_free(buf, sz); +} + int nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) { @@ -134,20 +146,28 @@ nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) int nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags) { - int rv; - nni_sock *sock; + int rv; + nng_aio *ap; - if ((rv = nni_sock_find(&sock, sid)) != 0) { + if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) { return (rv); } - rv = nni_sock_recvmsg(sock, msgp, flags); - nni_sock_rele(sock); + if (flags & NNG_FLAG_NONBLOCK) { + nng_aio_set_timeout(ap, NNG_DURATION_ZERO); + } else { + nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT); + } - // Possibly massage nonblocking attempt. Note that nonblocking is - // still done asynchronously, and the calling thread loses context. - if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { + nng_recv_aio(sid, ap); + nng_aio_wait(ap); + + if ((rv = nng_aio_result(ap)) == 0) { + *msgp = nng_aio_get_msg(ap); + + } else if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { rv = NNG_EAGAIN; } + nng_aio_free(ap); return (rv); } @@ -171,29 +191,27 @@ nng_send(nng_socket sid, void *buf, size_t len, int flags) return (rv); } -void * -nng_alloc(size_t sz) -{ - return (nni_alloc(sz)); -} - -void -nng_free(void *buf, size_t sz) -{ - nni_free(buf, sz); -} - int nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) { - int rv; - nni_sock *sock; + int rv; + nng_aio *ap; - if ((rv = nni_sock_find(&sock, sid)) != 0) { + if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) { return (rv); } - rv = nni_sock_sendmsg(sock, msg, flags); - nni_sock_rele(sock); + if (flags & NNG_FLAG_NONBLOCK) { + nng_aio_set_timeout(ap, NNG_DURATION_ZERO); + } else { + nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT); + } + + nng_aio_set_msg(ap, msg); + nng_send_aio(sid, ap); + nng_aio_wait(ap); + + rv = nng_aio_result(ap); + nng_aio_free(ap); // Possibly massage nonblocking attempt. Note that nonblocking is // still done asynchronously, and the calling thread loses context. @@ -204,6 +222,36 @@ nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) return (rv); } +void +nng_recv_aio(nng_socket sid, nng_aio *ap) +{ + nni_aio * aio = (nni_aio *) ap; + nni_sock *sock; + int rv; + + if ((rv = nni_sock_find(&sock, sid)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_sock_recv(sock, aio); + nni_sock_rele(sock); +} + +void +nng_send_aio(nng_socket sid, nng_aio *ap) +{ + nni_aio * aio = (nni_aio *) ap; + nni_sock *sock; + int rv; + + if ((rv = nni_sock_find(&sock, sid)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_sock_send(sock, aio); + nni_sock_rele(sock); +} + int nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags) { @@ -947,6 +995,76 @@ nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp) return (nni_msg_getopt(msg, opt, ptr, szp)); } +int +nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg) +{ + nni_aio *aio; + int rv; + + if ((rv = nni_init()) != 0) { + return (rv); + } + if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) { + *app = (nng_aio *) aio; + } + aio->a_expire = (nni_time) NNG_DURATION_DEFAULT; + aio->a_reltime = 1; + return (rv); +} + +void +nng_aio_free(nng_aio *ap) +{ + nni_aio_fini((nni_aio *) ap); +} + +int +nng_aio_result(nng_aio *ap) +{ + return (nni_aio_result((nni_aio *) ap)); +} + +void +nng_aio_stop(nng_aio *ap) +{ + nni_aio_stop((nni_aio *) ap); +} + +void +nng_aio_wait(nng_aio *ap) +{ + nni_aio_wait((nni_aio *) ap); +} + +void +nng_aio_cancel(nng_aio *ap) +{ + nni_aio_cancel((nni_aio *) ap, NNG_ECANCELED); +} + +void +nng_aio_set_msg(nng_aio *ap, nng_msg *msg) +{ + nni_aio_set_msg((nni_aio *) ap, msg); +} + +nng_msg * +nng_aio_get_msg(nng_aio *ap) +{ + return ((nng_msg *) (nni_aio_get_msg((nni_aio *) ap))); +} + +void +nng_aio_set_timeout(nng_aio *ap, nng_duration dur) +{ + // Durations here are relative, since we have no notion of a + // common clock. But underlying aio uses absolute times normally. + // Fortunately the absolute times are big enough; we just have to + // make sure that we "convert" the timeout from relative time to + // absolute time when submitting operations. + nni_aio_set_timeout((nni_aio *) ap, (nni_time) dur); +} + #if 0 int nng_snapshot_create(nng_socket sock, nng_snapshot **snapp) @@ -50,6 +50,12 @@ typedef int32_t nng_duration; // in milliseconds typedef struct nng_msg nng_msg; typedef struct nng_snapshot nng_snapshot; typedef struct nng_stat nng_stat; +typedef struct nng_aio nng_aio; + +// Some definitions for durations used with timeouts. +#define NNG_DURATION_INFINITE (-1) +#define NNG_DURATION_DEFAULT (-2) +#define NNG_DURATION_ZERO (0) // nng_fini is used to terminate the library, freeing certain global resources. // For most cases, this call is optional, but failure to do so may cause @@ -212,6 +218,20 @@ NNG_DECL int nng_sendmsg(nng_socket, nng_msg *, int); // can be passed off directly to nng_sendmsg. NNG_DECL int nng_recvmsg(nng_socket, nng_msg **, int); +// nng_send_aio sends data on the socket asynchronously. As with nng_send, +// the completion may be executed before the data has actually been delivered, +// but only when it is accepted for delivery. The supplied AIO must have +// been initialized, and have an associated message. The message will be +// "owned" by the socket if the operation completes successfully. Otherwise +// the caller is responsible for freeing it. +NNG_DECL void nng_send_aio(nng_socket, nng_aio *); + +// nng_recv_aio receives data on the socket asynchronously. On a successful +// result, the AIO will have an associated message, that can be obtained +// with nng_aio_get_msg(). The caller takes ownership of the message at +// this point. +NNG_DECL void nng_recv_aio(nng_socket, nng_aio *); + // nng_alloc is used to allocate memory. It's intended purpose is for // allocating memory suitable for message buffers with nng_send(). // Applications that need memory for other purposes should use their platform @@ -225,6 +245,62 @@ NNG_DECL void *nng_alloc(size_t); // calloc. NNG_DECL void nng_free(void *, size_t); +// Async IO API. AIO structures can be thought of as "handles" to +// support asynchronous operations. They contain the completion callback, and +// a pointer to consumer data. This is similar to how overlapped I/O +// works in Windows, when used with a completion callback. + +// nng_aio_alloc allocates a new AIO, and associated the completion +// callback and its opaque argument. If NULL is supplied for the +// callback, then the caller must use nng_aio_wait() to wait for the +// operation to complete. If the completion callback is not NULL, then +// when a submitted operation completes (or is canceled or fails) the +// callback will be executed, generally in a different thread, with no +// locks held. +NNG_DECL int nng_aio_alloc(nng_aio **, void (*)(void *), void *); + +// nng_aio_free frees the AIO and any associated resources. +// It *must not* be in use at the time it is freed. +NNG_DECL void nng_aio_free(nng_aio *); + +// nng_aio_stop stops any outstanding operation, and waits for the +// AIO to be free, including for the callback to have completed +// execution. Therefore the caller must NOT hold any locks that +// are acquired in the callback, or deadlock will occur. +NNG_DECL void nng_aio_stop(nng_aio *); + +// nng_aio_result returns the status/result of the operation. This +// will be zero on successful completion, or an nng error code on +// failure. +NNG_DECL int nng_aio_result(nng_aio *); + +// nng_aio_cancel attempts to cancel any in-progress I/O operation. +// The AIO callback will still be executed, but if the cancellation is +// successful then the status will be NNG_ECANCELED. +NNG_DECL void nng_aio_cancel(nng_aio *); + +// nng_aio_wait waits synchronously for any pending operation to complete. +// It also waits for the callback to have completed execution. Therefore, +// the caller of this function must not hold any locks acquired by the +// callback or deadlock may occur. +NNG_DECL void nng_aio_wait(nng_aio *); + +// nng_aio_set_msg sets the message structure to use for asynchronous +// message send operations. +NNG_DECL void nng_aio_set_msg(nng_aio *, nng_msg *); + +// nng_aio_get_msg returns the message structure associated with a completed +// receive operation. +NNG_DECL nng_msg *nng_aio_get_msg(nng_aio *); + +// nng_aio_set_timeout sets a timeout on the AIO. This should be called for +// operations that should time out after a period. The timeout should be +// either a positive number of milliseconds, or NNG_DURATION_INFINITE to +// indicate that the operation has no timeout. A poll may be done by +// specifying NNG_DURATION_ZERO. The value NNG_DURATION_DEFAULT indicates +// that any socket specific timeout should be used. +NNG_DECL void nng_aio_set_timeout(nng_aio *, nng_duration); + // Message API. NNG_DECL int nng_msg_alloc(nng_msg **, size_t); NNG_DECL void nng_msg_free(nng_msg *); diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 046aa19a..07f91602 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -37,7 +37,6 @@ static void bus_pipe_putq_cb(void *); // A bus_sock is our per-socket protocol private structure. struct bus_sock { - nni_sock *nsock; int raw; nni_aio * aio_getq; nni_list pipes; @@ -85,10 +84,9 @@ bus_sock_init(void **sp, nni_sock *nsock) bus_sock_fini(s); return (rv); } - s->nsock = nsock; - s->raw = 0; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); + s->raw = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); *sp = s; return (0); @@ -241,7 +239,7 @@ bus_pipe_recv_cb(void *arg) nni_aio_set_msg(p->aio_putq, msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_msgq_aio_put(nni_sock_recvq(s->nsock), p->aio_putq); + nni_msgq_aio_put(s->urq, p->aio_putq); } static void @@ -315,7 +313,7 @@ bus_sock_getq_cb(void *arg) static void bus_sock_getq(bus_sock *s) { - nni_msgq_aio_get(nni_sock_sendq(s->nsock), s->aio_getq); + nni_msgq_aio_get(s->uwq, s->aio_getq); } static void @@ -349,7 +347,6 @@ bus_sock_send(void *arg, nni_aio *aio) { bus_sock *s = arg; - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -358,7 +355,6 @@ bus_sock_recv(void *arg, nni_aio *aio) { bus_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c index cca03cc8..29f3b59c 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair/pair_v0.c @@ -28,7 +28,6 @@ static void pair0_pipe_fini(void *); // pair0_sock is our per-socket protocol private structure. struct pair0_sock { - nni_sock * nsock; pair0_pipe *ppipe; nni_msgq * uwq; nni_msgq * urq; @@ -58,7 +57,6 @@ pair0_sock_init(void **sp, nni_sock *nsock) return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); - s->nsock = nsock; s->ppipe = NULL; s->raw = 0; s->uwq = nni_sock_sendq(nsock); @@ -248,7 +246,6 @@ pair0_sock_send(void *arg, nni_aio *aio) { pair0_sock *s = arg; - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -257,7 +254,6 @@ pair0_sock_recv(void *arg, nni_aio *aio) { pair0_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c index f43a4785..86ee97eb 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair/pair_v1.c @@ -29,7 +29,6 @@ static void pair1_pipe_fini(void *); // pair1_sock is our per-socket protocol private structure. struct pair1_sock { - nni_sock * nsock; nni_msgq * uwq; nni_msgq * urq; int raw; @@ -89,13 +88,12 @@ pair1_sock_init(void **sp, nni_sock *nsock) return (rv); } - s->nsock = nsock; - s->raw = 0; - s->poly = 0; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - s->ttl = 8; - *sp = s; + s->raw = 0; + s->poly = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + s->ttl = 8; + *sp = s; return (0); } @@ -451,7 +449,6 @@ pair1_sock_send(void *arg, nni_aio *aio) { pair1_sock *s = arg; - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -460,7 +457,6 @@ pair1_sock_recv(void *arg, nni_aio *aio) { pair1_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 7dd0c8ed..267352c5 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -26,7 +26,6 @@ static void pull_putq(pull_pipe *, nni_msg *); struct pull_sock { nni_msgq *urq; int raw; - nni_sock *sock; }; // A pull_pipe is our per-pipe protocol private structure. @@ -45,9 +44,8 @@ pull_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = 0; - s->urq = nni_sock_recvq(sock); - s->sock = sock; + s->raw = 0; + s->urq = nni_sock_recvq(sock); *sp = s; return (0); @@ -198,7 +196,6 @@ pull_sock_recv(void *arg, nni_aio *aio) { pull_sock *s = arg; - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index af7b80ca..995ac56d 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -28,7 +28,6 @@ static void push_getq_cb(void *); struct push_sock { nni_msgq *uwq; int raw; - nni_sock *sock; }; // An nni_push_pipe is our per-pipe protocol private structure. @@ -50,10 +49,9 @@ push_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = 0; - s->sock = sock; - s->uwq = nni_sock_sendq(sock); - *sp = s; + s->raw = 0; + s->uwq = nni_sock_sendq(sock); + *sp = s; return (0); } @@ -209,7 +207,6 @@ push_sock_send(void *arg, nni_aio *aio) { push_sock *s = arg; - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); } diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 4604d0ff..29863322 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -30,7 +30,6 @@ static void pub_pipe_fini(void *); // A pub_sock is our per-socket protocol private structure. struct pub_sock { - nni_sock *sock; nni_msgq *uwq; int raw; nni_aio * aio_getq; @@ -75,8 +74,7 @@ pub_sock_init(void **sp, nni_sock *sock) return (rv); } - s->sock = sock; - s->raw = 0; + s->raw = 0; NNI_LIST_INIT(&s->pipes, pub_pipe, node); s->uwq = nni_sock_sendq(sock); @@ -291,7 +289,6 @@ pub_sock_send(void *arg, nni_aio *aio) { pub_sock *s = arg; - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); } diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 323bbb2f..d87b42ec 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -36,7 +36,6 @@ struct sub_topic { // An nni_rep_sock is our per-socket protocol private structure. struct sub_sock { - nni_sock *sock; nni_list topics; nni_msgq *urq; int raw; @@ -61,8 +60,7 @@ sub_sock_init(void **sp, nni_sock *sock) } nni_mtx_init(&s->lk); NNI_LIST_INIT(&s->topics, sub_topic, node); - s->sock = sock; - s->raw = 0; + s->raw = 0; s->urq = nni_sock_recvq(sock); *sp = s; @@ -296,7 +294,6 @@ sub_sock_recv(void *arg, nni_aio *aio) { sub_sock *s = arg; - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 510beab3..5d924e32 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -29,7 +29,6 @@ static void rep_pipe_fini(void *); // A rep_sock is our per-socket protocol private structure. struct rep_sock { - nni_sock * sock; nni_msgq * uwq; nni_msgq * urq; nni_mtx lk; @@ -84,7 +83,6 @@ rep_sock_init(void **sp, nni_sock *sock) } s->ttl = 8; // Per RFC - s->sock = sock; s->raw = 0; s->btrace = NULL; s->btrace_len = 0; @@ -420,7 +418,6 @@ rep_sock_send(void *arg, nni_aio *aio) if (s->raw) { // Pass thru nni_mtx_unlock(&s->lk); - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -447,7 +444,6 @@ rep_sock_send(void *arg, nni_aio *aio) s->btrace_len = 0; nni_mtx_unlock(&s->lk); - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); } @@ -456,7 +452,6 @@ rep_sock_recv(void *arg, nni_aio *aio) { rep_sock *s = arg; - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 81abd306..1ab49f3e 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -28,7 +28,6 @@ static void req_pipe_fini(void *); // A req_sock is our per-socket protocol private structure. struct req_sock { - nni_sock * sock; nni_msgq * uwq; nni_msgq * urq; nni_duration retry; @@ -90,7 +89,6 @@ req_sock_init(void **sp, nni_sock *sock) // this is "semi random" start for request IDs. s->nextid = nni_random(); s->retry = NNI_SECOND * 60; - s->sock = sock; s->reqmsg = NULL; s->raw = 0; s->wantw = 0; @@ -512,7 +510,6 @@ req_sock_send(void *arg, nni_aio *aio) nni_mtx_lock(&s->mtx); if (s->raw) { nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -536,11 +533,6 @@ req_sock_send(void *arg, nni_aio *aio) return; } - // XXX: I think we should just not do this... and leave the - // socket "permanently writeable". This does screw up all the - // backpressure. - // nni_sock_send_pending(s->sock); - // If another message is there, this cancels it. if (s->reqmsg != NULL) { nni_msg_free(s->reqmsg); @@ -619,7 +611,6 @@ req_sock_recv(void *arg, nni_aio *aio) } } nni_mtx_unlock(&s->mtx); - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 4c3ea8e3..dbce0751 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -29,7 +29,6 @@ static void resp_pipe_fini(void *); // A resp_sock is our per-socket protocol private structure. struct resp_sock { - nni_sock * nsock; nni_msgq * urq; nni_msgq * uwq; int raw; @@ -85,7 +84,6 @@ resp_sock_init(void **sp, nni_sock *nsock) } s->ttl = 8; // Per RFC - s->nsock = nsock; s->raw = 0; s->btrace = NULL; s->btrace_len = 0; @@ -268,9 +266,9 @@ resp_send_cb(void *arg) static void resp_recv_cb(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; - nni_msgq * urq; + resp_pipe *p = arg; + resp_sock *s = p->psock; + nni_msgq * urq = s->urq; nni_msg * msg; int hops; int rv; @@ -279,8 +277,6 @@ resp_recv_cb(void *arg) goto error; } - urq = nni_sock_recvq(s->nsock); - msg = nni_aio_get_msg(p->aio_recv); nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, p->id); @@ -384,7 +380,6 @@ resp_sock_send(void *arg, nni_aio *aio) nni_mtx_lock(&s->mtx); if (s->raw) { nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -413,7 +408,6 @@ resp_sock_send(void *arg, nni_aio *aio) s->btrace_len = 0; nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -454,7 +448,6 @@ resp_sock_recv(void *arg, nni_aio *aio) { resp_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 1c15054f..f44cd63a 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -28,7 +28,6 @@ static void surv_timeout(void *); // A surv_sock is our per-socket protocol private structure. struct surv_sock { - nni_sock * nsock; nni_duration survtime; nni_time expire; int raw; @@ -84,7 +83,6 @@ surv_sock_init(void **sp, nni_sock *nsock) nni_timer_init(&s->timer, surv_timeout, s); s->nextid = nni_random(); - s->nsock = nsock; s->raw = 0; s->survtime = NNI_SECOND * 60; s->expire = NNI_TIME_ZERO; @@ -362,7 +360,6 @@ surv_sock_recv(void *arg, nni_aio *aio) return; } nni_mtx_unlock(&s->mtx); - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } @@ -378,7 +375,6 @@ surv_sock_send(void *arg, nni_aio *aio) // No automatic retry, and the request ID must // be in the header coming down. nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -404,7 +400,6 @@ surv_sock_send(void *arg, nni_aio *aio) nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } |
