aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.h3
-rw-r--r--src/core/socket.c67
-rw-r--r--src/core/socket.h12
-rw-r--r--src/core/taskq.c3
-rw-r--r--src/nng.c168
-rw-r--r--src/nng.h76
-rw-r--r--src/protocol/bus/bus.c14
-rw-r--r--src/protocol/pair/pair_v0.c4
-rw-r--r--src/protocol/pair/pair_v1.c16
-rw-r--r--src/protocol/pipeline/pull.c7
-rw-r--r--src/protocol/pipeline/push.c9
-rw-r--r--src/protocol/pubsub/pub.c5
-rw-r--r--src/protocol/pubsub/sub.c5
-rw-r--r--src/protocol/reqrep/rep.c5
-rw-r--r--src/protocol/reqrep/req.c9
-rw-r--r--src/protocol/survey/respond.c13
-rw-r--r--src/protocol/survey/survey.c5
17 files changed, 276 insertions, 145 deletions
diff --git a/src/core/aio.h b/src/core/aio.h
index b12fcc55..3bdcf433 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -35,7 +35,8 @@ struct nni_aio {
unsigned a_expiring : 1; // expiration callback in progress
unsigned a_waiting : 1; // a thread is waiting for this to finish
unsigned a_synch : 1; // run completion synchronously
- unsigned a_pad : 26; // ensure 32-bit alignment
+ unsigned a_reltime : 1; // expiration time is relative
+ unsigned a_pad : 25; // ensure 32-bit alignment
nni_task a_task;
// Read/write operations.
diff --git a/src/core/socket.c b/src/core/socket.c
index dfa49317..8fb8e67b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -53,7 +53,7 @@ struct nni_socket {
nni_proto_pipe_ops s_pipe_ops;
nni_proto_sock_ops s_sock_ops;
- // XXX: options
+ // options
nni_duration s_linger; // linger time
nni_duration s_sndtimeo; // send timeout
nni_duration s_rcvtimeo; // receive timeout
@@ -143,13 +143,6 @@ nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp)
nni_msgq_set_cb(mq, cb, fd);
-#if 0
- if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) {
- nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd);
- return (NNG_ENOMEM);
- }
-#endif
-
fd->sn_init = 1;
*szp = sizeof(int);
memcpy(val, &fd->sn_rfd, sizeof(int));
@@ -449,31 +442,11 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
nni_mtx_unlock(&sock->s_mx);
}
-void
-nni_sock_send_pending(nni_sock *sock)
-{
- if (sock->s_send_fd.sn_init) {
- nni_plat_pipe_clear(sock->s_send_fd.sn_rfd);
- }
-}
-
-void
-nni_sock_recv_pending(nni_sock *sock)
-{
- if (sock->s_recv_fd.sn_init) {
- nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd);
- }
-}
-
static void
nni_sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
- if (s == NULL) {
- return;
- }
-
// Close any open notification pipes.
if (s->s_recv_fd.sn_init) {
nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd);
@@ -796,9 +769,32 @@ nni_sock_closeall(void)
}
}
+static void
+nni_sock_normalize_expiration(nni_aio *aio, nni_duration def)
+{
+ if (aio->a_reltime) {
+ if (aio->a_expire == (nni_time) -2) {
+ aio->a_expire = def;
+ }
+ switch (aio->a_expire) {
+ case (nni_time) 0:
+ aio->a_expire = NNI_TIME_ZERO;
+ break;
+ case (nni_time) -1:
+ aio->a_expire = NNI_TIME_NEVER;
+ break;
+ default:
+ aio->a_expire = nni_clock() + aio->a_expire;
+ break;
+ }
+ aio->a_reltime = 0;
+ }
+}
+
void
nni_sock_send(nni_sock *sock, nni_aio *aio)
{
+ nni_sock_normalize_expiration(aio, sock->s_sndtimeo);
sock->s_sock_ops.sock_send(sock->s_data, aio);
}
@@ -837,6 +833,7 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags)
void
nni_sock_recv(nni_sock *sock, nni_aio *aio)
{
+ nni_sock_normalize_expiration(aio, sock->s_rcvtimeo);
sock->s_sock_ops.sock_recv(sock->s_data, aio);
}
@@ -862,21 +859,17 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
expire += timeo;
}
- for (;;) {
- nni_aio_set_timeout(aio, expire);
- nni_sock_recv(sock, aio);
- nni_aio_wait(aio);
+ nni_aio_set_timeout(aio, expire);
+ nni_sock_recv(sock, aio);
+ nni_aio_wait(aio);
- rv = nni_aio_result(aio);
- if (rv != 0) {
- break;
- }
+ if ((rv = nni_aio_result(aio)) == 0) {
msg = nni_aio_get_msg(aio);
nni_aio_set_msg(aio, NULL);
*msgp = msg;
- break;
}
+
nni_aio_fini(aio);
return (rv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 6d9622e3..52310ef3 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -46,18 +46,8 @@ extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
// Note that each of these should be called without any locks held, since
// the socket can reenter the protocol.
-// nni_sock_send_pending is called by the protocol when it enqueues
-// a send operation. The main purpose of this is to clear the raised
-// signal raised on the event descriptor.
-extern void nni_sock_send_pending(nni_sock *);
-
-// nni_sock_recv_pending is called by the protocl when it enqueues
-// a receive operation. The main purpose of this is to clear the raised
-// signal raised on the event descriptor.
-extern void nni_sock_recv_pending(nni_sock *);
-
// nni_socket_sendq obtains the upper writeq. The protocol should
-// recieve messages from this, and place them on the appropriate pipe.
+// receive messages from this, and place them on the appropriate pipe.
extern nni_msgq *nni_sock_sendq(nni_sock *);
// nni_socket_recvq obtains the upper readq. The protocol should
diff --git a/src/core/taskq.c b/src/core/taskq.c
index d0f04e8a..90cf7e22 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -192,6 +192,9 @@ nni_task_wait(nni_task *task)
nni_taskq *tq = task->task_tq;
int running;
+ if (task->task_cb == NULL) {
+ return;
+ }
nni_mtx_lock(&tq->tq_mtx);
for (;;) {
running = 0;
diff --git a/src/nng.c b/src/nng.c
index 9fc99dc3..39206941 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -95,6 +95,18 @@ nng_peer(nng_socket sid)
return (pnum);
}
+void *
+nng_alloc(size_t sz)
+{
+ return (nni_alloc(sz));
+}
+
+void
+nng_free(void *buf, size_t sz)
+{
+ nni_free(buf, sz);
+}
+
int
nng_recv(nng_socket sid, void *buf, size_t *szp, int flags)
{
@@ -134,20 +146,28 @@ nng_recv(nng_socket sid, void *buf, size_t *szp, int flags)
int
nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags)
{
- int rv;
- nni_sock *sock;
+ int rv;
+ nng_aio *ap;
- if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) {
return (rv);
}
- rv = nni_sock_recvmsg(sock, msgp, flags);
- nni_sock_rele(sock);
+ if (flags & NNG_FLAG_NONBLOCK) {
+ nng_aio_set_timeout(ap, NNG_DURATION_ZERO);
+ } else {
+ nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT);
+ }
- // Possibly massage nonblocking attempt. Note that nonblocking is
- // still done asynchronously, and the calling thread loses context.
- if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) {
+ nng_recv_aio(sid, ap);
+ nng_aio_wait(ap);
+
+ if ((rv = nng_aio_result(ap)) == 0) {
+ *msgp = nng_aio_get_msg(ap);
+
+ } else if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) {
rv = NNG_EAGAIN;
}
+ nng_aio_free(ap);
return (rv);
}
@@ -171,29 +191,27 @@ nng_send(nng_socket sid, void *buf, size_t len, int flags)
return (rv);
}
-void *
-nng_alloc(size_t sz)
-{
- return (nni_alloc(sz));
-}
-
-void
-nng_free(void *buf, size_t sz)
-{
- nni_free(buf, sz);
-}
-
int
nng_sendmsg(nng_socket sid, nng_msg *msg, int flags)
{
- int rv;
- nni_sock *sock;
+ int rv;
+ nng_aio *ap;
- if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) {
return (rv);
}
- rv = nni_sock_sendmsg(sock, msg, flags);
- nni_sock_rele(sock);
+ if (flags & NNG_FLAG_NONBLOCK) {
+ nng_aio_set_timeout(ap, NNG_DURATION_ZERO);
+ } else {
+ nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT);
+ }
+
+ nng_aio_set_msg(ap, msg);
+ nng_send_aio(sid, ap);
+ nng_aio_wait(ap);
+
+ rv = nng_aio_result(ap);
+ nng_aio_free(ap);
// Possibly massage nonblocking attempt. Note that nonblocking is
// still done asynchronously, and the calling thread loses context.
@@ -204,6 +222,36 @@ nng_sendmsg(nng_socket sid, nng_msg *msg, int flags)
return (rv);
}
+void
+nng_recv_aio(nng_socket sid, nng_aio *ap)
+{
+ nni_aio * aio = (nni_aio *) ap;
+ nni_sock *sock;
+ int rv;
+
+ if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_sock_recv(sock, aio);
+ nni_sock_rele(sock);
+}
+
+void
+nng_send_aio(nng_socket sid, nng_aio *ap)
+{
+ nni_aio * aio = (nni_aio *) ap;
+ nni_sock *sock;
+ int rv;
+
+ if ((rv = nni_sock_find(&sock, sid)) != 0) {
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ nni_sock_send(sock, aio);
+ nni_sock_rele(sock);
+}
+
int
nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags)
{
@@ -947,6 +995,76 @@ nng_msg_getopt(nng_msg *msg, int opt, void *ptr, size_t *szp)
return (nni_msg_getopt(msg, opt, ptr, szp));
}
+int
+nng_aio_alloc(nng_aio **app, void (*cb)(void *), void *arg)
+{
+ nni_aio *aio;
+ int rv;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_aio_init(&aio, (nni_cb) cb, arg)) == 0) {
+ *app = (nng_aio *) aio;
+ }
+ aio->a_expire = (nni_time) NNG_DURATION_DEFAULT;
+ aio->a_reltime = 1;
+ return (rv);
+}
+
+void
+nng_aio_free(nng_aio *ap)
+{
+ nni_aio_fini((nni_aio *) ap);
+}
+
+int
+nng_aio_result(nng_aio *ap)
+{
+ return (nni_aio_result((nni_aio *) ap));
+}
+
+void
+nng_aio_stop(nng_aio *ap)
+{
+ nni_aio_stop((nni_aio *) ap);
+}
+
+void
+nng_aio_wait(nng_aio *ap)
+{
+ nni_aio_wait((nni_aio *) ap);
+}
+
+void
+nng_aio_cancel(nng_aio *ap)
+{
+ nni_aio_cancel((nni_aio *) ap, NNG_ECANCELED);
+}
+
+void
+nng_aio_set_msg(nng_aio *ap, nng_msg *msg)
+{
+ nni_aio_set_msg((nni_aio *) ap, msg);
+}
+
+nng_msg *
+nng_aio_get_msg(nng_aio *ap)
+{
+ return ((nng_msg *) (nni_aio_get_msg((nni_aio *) ap)));
+}
+
+void
+nng_aio_set_timeout(nng_aio *ap, nng_duration dur)
+{
+ // Durations here are relative, since we have no notion of a
+ // common clock. But underlying aio uses absolute times normally.
+ // Fortunately the absolute times are big enough; we just have to
+ // make sure that we "convert" the timeout from relative time to
+ // absolute time when submitting operations.
+ nni_aio_set_timeout((nni_aio *) ap, (nni_time) dur);
+}
+
#if 0
int
nng_snapshot_create(nng_socket sock, nng_snapshot **snapp)
diff --git a/src/nng.h b/src/nng.h
index c228a6d7..e013b9df 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -50,6 +50,12 @@ typedef int32_t nng_duration; // in milliseconds
typedef struct nng_msg nng_msg;
typedef struct nng_snapshot nng_snapshot;
typedef struct nng_stat nng_stat;
+typedef struct nng_aio nng_aio;
+
+// Some definitions for durations used with timeouts.
+#define NNG_DURATION_INFINITE (-1)
+#define NNG_DURATION_DEFAULT (-2)
+#define NNG_DURATION_ZERO (0)
// nng_fini is used to terminate the library, freeing certain global resources.
// For most cases, this call is optional, but failure to do so may cause
@@ -212,6 +218,20 @@ NNG_DECL int nng_sendmsg(nng_socket, nng_msg *, int);
// can be passed off directly to nng_sendmsg.
NNG_DECL int nng_recvmsg(nng_socket, nng_msg **, int);
+// nng_send_aio sends data on the socket asynchronously. As with nng_send,
+// the completion may be executed before the data has actually been delivered,
+// but only when it is accepted for delivery. The supplied AIO must have
+// been initialized, and have an associated message. The message will be
+// "owned" by the socket if the operation completes successfully. Otherwise
+// the caller is responsible for freeing it.
+NNG_DECL void nng_send_aio(nng_socket, nng_aio *);
+
+// nng_recv_aio receives data on the socket asynchronously. On a successful
+// result, the AIO will have an associated message, that can be obtained
+// with nng_aio_get_msg(). The caller takes ownership of the message at
+// this point.
+NNG_DECL void nng_recv_aio(nng_socket, nng_aio *);
+
// nng_alloc is used to allocate memory. It's intended purpose is for
// allocating memory suitable for message buffers with nng_send().
// Applications that need memory for other purposes should use their platform
@@ -225,6 +245,62 @@ NNG_DECL void *nng_alloc(size_t);
// calloc.
NNG_DECL void nng_free(void *, size_t);
+// Async IO API. AIO structures can be thought of as "handles" to
+// support asynchronous operations. They contain the completion callback, and
+// a pointer to consumer data. This is similar to how overlapped I/O
+// works in Windows, when used with a completion callback.
+
+// nng_aio_alloc allocates a new AIO, and associated the completion
+// callback and its opaque argument. If NULL is supplied for the
+// callback, then the caller must use nng_aio_wait() to wait for the
+// operation to complete. If the completion callback is not NULL, then
+// when a submitted operation completes (or is canceled or fails) the
+// callback will be executed, generally in a different thread, with no
+// locks held.
+NNG_DECL int nng_aio_alloc(nng_aio **, void (*)(void *), void *);
+
+// nng_aio_free frees the AIO and any associated resources.
+// It *must not* be in use at the time it is freed.
+NNG_DECL void nng_aio_free(nng_aio *);
+
+// nng_aio_stop stops any outstanding operation, and waits for the
+// AIO to be free, including for the callback to have completed
+// execution. Therefore the caller must NOT hold any locks that
+// are acquired in the callback, or deadlock will occur.
+NNG_DECL void nng_aio_stop(nng_aio *);
+
+// nng_aio_result returns the status/result of the operation. This
+// will be zero on successful completion, or an nng error code on
+// failure.
+NNG_DECL int nng_aio_result(nng_aio *);
+
+// nng_aio_cancel attempts to cancel any in-progress I/O operation.
+// The AIO callback will still be executed, but if the cancellation is
+// successful then the status will be NNG_ECANCELED.
+NNG_DECL void nng_aio_cancel(nng_aio *);
+
+// nng_aio_wait waits synchronously for any pending operation to complete.
+// It also waits for the callback to have completed execution. Therefore,
+// the caller of this function must not hold any locks acquired by the
+// callback or deadlock may occur.
+NNG_DECL void nng_aio_wait(nng_aio *);
+
+// nng_aio_set_msg sets the message structure to use for asynchronous
+// message send operations.
+NNG_DECL void nng_aio_set_msg(nng_aio *, nng_msg *);
+
+// nng_aio_get_msg returns the message structure associated with a completed
+// receive operation.
+NNG_DECL nng_msg *nng_aio_get_msg(nng_aio *);
+
+// nng_aio_set_timeout sets a timeout on the AIO. This should be called for
+// operations that should time out after a period. The timeout should be
+// either a positive number of milliseconds, or NNG_DURATION_INFINITE to
+// indicate that the operation has no timeout. A poll may be done by
+// specifying NNG_DURATION_ZERO. The value NNG_DURATION_DEFAULT indicates
+// that any socket specific timeout should be used.
+NNG_DECL void nng_aio_set_timeout(nng_aio *, nng_duration);
+
// Message API.
NNG_DECL int nng_msg_alloc(nng_msg **, size_t);
NNG_DECL void nng_msg_free(nng_msg *);
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 046aa19a..07f91602 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -37,7 +37,6 @@ static void bus_pipe_putq_cb(void *);
// A bus_sock is our per-socket protocol private structure.
struct bus_sock {
- nni_sock *nsock;
int raw;
nni_aio * aio_getq;
nni_list pipes;
@@ -85,10 +84,9 @@ bus_sock_init(void **sp, nni_sock *nsock)
bus_sock_fini(s);
return (rv);
}
- s->nsock = nsock;
- s->raw = 0;
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
+ s->raw = 0;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
*sp = s;
return (0);
@@ -241,7 +239,7 @@ bus_pipe_recv_cb(void *arg)
nni_aio_set_msg(p->aio_putq, msg);
nni_aio_set_msg(p->aio_recv, NULL);
- nni_msgq_aio_put(nni_sock_recvq(s->nsock), p->aio_putq);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
}
static void
@@ -315,7 +313,7 @@ bus_sock_getq_cb(void *arg)
static void
bus_sock_getq(bus_sock *s)
{
- nni_msgq_aio_get(nni_sock_sendq(s->nsock), s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
@@ -349,7 +347,6 @@ bus_sock_send(void *arg, nni_aio *aio)
{
bus_sock *s = arg;
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
}
@@ -358,7 +355,6 @@ bus_sock_recv(void *arg, nni_aio *aio)
{
bus_sock *s = arg;
- nni_sock_recv_pending(s->nsock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c
index cca03cc8..29f3b59c 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair/pair_v0.c
@@ -28,7 +28,6 @@ static void pair0_pipe_fini(void *);
// pair0_sock is our per-socket protocol private structure.
struct pair0_sock {
- nni_sock * nsock;
pair0_pipe *ppipe;
nni_msgq * uwq;
nni_msgq * urq;
@@ -58,7 +57,6 @@ pair0_sock_init(void **sp, nni_sock *nsock)
return (NNG_ENOMEM);
}
nni_mtx_init(&s->mtx);
- s->nsock = nsock;
s->ppipe = NULL;
s->raw = 0;
s->uwq = nni_sock_sendq(nsock);
@@ -248,7 +246,6 @@ pair0_sock_send(void *arg, nni_aio *aio)
{
pair0_sock *s = arg;
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
}
@@ -257,7 +254,6 @@ pair0_sock_recv(void *arg, nni_aio *aio)
{
pair0_sock *s = arg;
- nni_sock_recv_pending(s->nsock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
index f43a4785..86ee97eb 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair/pair_v1.c
@@ -29,7 +29,6 @@ static void pair1_pipe_fini(void *);
// pair1_sock is our per-socket protocol private structure.
struct pair1_sock {
- nni_sock * nsock;
nni_msgq * uwq;
nni_msgq * urq;
int raw;
@@ -89,13 +88,12 @@ pair1_sock_init(void **sp, nni_sock *nsock)
return (rv);
}
- s->nsock = nsock;
- s->raw = 0;
- s->poly = 0;
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
- s->ttl = 8;
- *sp = s;
+ s->raw = 0;
+ s->poly = 0;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+ s->ttl = 8;
+ *sp = s;
return (0);
}
@@ -451,7 +449,6 @@ pair1_sock_send(void *arg, nni_aio *aio)
{
pair1_sock *s = arg;
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
}
@@ -460,7 +457,6 @@ pair1_sock_recv(void *arg, nni_aio *aio)
{
pair1_sock *s = arg;
- nni_sock_recv_pending(s->nsock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 7dd0c8ed..267352c5 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -26,7 +26,6 @@ static void pull_putq(pull_pipe *, nni_msg *);
struct pull_sock {
nni_msgq *urq;
int raw;
- nni_sock *sock;
};
// A pull_pipe is our per-pipe protocol private structure.
@@ -45,9 +44,8 @@ pull_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->raw = 0;
- s->urq = nni_sock_recvq(sock);
- s->sock = sock;
+ s->raw = 0;
+ s->urq = nni_sock_recvq(sock);
*sp = s;
return (0);
@@ -198,7 +196,6 @@ pull_sock_recv(void *arg, nni_aio *aio)
{
pull_sock *s = arg;
- nni_sock_recv_pending(s->sock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index af7b80ca..995ac56d 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -28,7 +28,6 @@ static void push_getq_cb(void *);
struct push_sock {
nni_msgq *uwq;
int raw;
- nni_sock *sock;
};
// An nni_push_pipe is our per-pipe protocol private structure.
@@ -50,10 +49,9 @@ push_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->raw = 0;
- s->sock = sock;
- s->uwq = nni_sock_sendq(sock);
- *sp = s;
+ s->raw = 0;
+ s->uwq = nni_sock_sendq(sock);
+ *sp = s;
return (0);
}
@@ -209,7 +207,6 @@ push_sock_send(void *arg, nni_aio *aio)
{
push_sock *s = arg;
- nni_sock_send_pending(s->sock);
nni_msgq_aio_put(s->uwq, aio);
}
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 4604d0ff..29863322 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -30,7 +30,6 @@ static void pub_pipe_fini(void *);
// A pub_sock is our per-socket protocol private structure.
struct pub_sock {
- nni_sock *sock;
nni_msgq *uwq;
int raw;
nni_aio * aio_getq;
@@ -75,8 +74,7 @@ pub_sock_init(void **sp, nni_sock *sock)
return (rv);
}
- s->sock = sock;
- s->raw = 0;
+ s->raw = 0;
NNI_LIST_INIT(&s->pipes, pub_pipe, node);
s->uwq = nni_sock_sendq(sock);
@@ -291,7 +289,6 @@ pub_sock_send(void *arg, nni_aio *aio)
{
pub_sock *s = arg;
- nni_sock_send_pending(s->sock);
nni_msgq_aio_put(s->uwq, aio);
}
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 323bbb2f..d87b42ec 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -36,7 +36,6 @@ struct sub_topic {
// An nni_rep_sock is our per-socket protocol private structure.
struct sub_sock {
- nni_sock *sock;
nni_list topics;
nni_msgq *urq;
int raw;
@@ -61,8 +60,7 @@ sub_sock_init(void **sp, nni_sock *sock)
}
nni_mtx_init(&s->lk);
NNI_LIST_INIT(&s->topics, sub_topic, node);
- s->sock = sock;
- s->raw = 0;
+ s->raw = 0;
s->urq = nni_sock_recvq(sock);
*sp = s;
@@ -296,7 +294,6 @@ sub_sock_recv(void *arg, nni_aio *aio)
{
sub_sock *s = arg;
- nni_sock_recv_pending(s->sock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 510beab3..5d924e32 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -29,7 +29,6 @@ static void rep_pipe_fini(void *);
// A rep_sock is our per-socket protocol private structure.
struct rep_sock {
- nni_sock * sock;
nni_msgq * uwq;
nni_msgq * urq;
nni_mtx lk;
@@ -84,7 +83,6 @@ rep_sock_init(void **sp, nni_sock *sock)
}
s->ttl = 8; // Per RFC
- s->sock = sock;
s->raw = 0;
s->btrace = NULL;
s->btrace_len = 0;
@@ -420,7 +418,6 @@ rep_sock_send(void *arg, nni_aio *aio)
if (s->raw) {
// Pass thru
nni_mtx_unlock(&s->lk);
- nni_sock_send_pending(s->sock);
nni_msgq_aio_put(s->uwq, aio);
return;
}
@@ -447,7 +444,6 @@ rep_sock_send(void *arg, nni_aio *aio)
s->btrace_len = 0;
nni_mtx_unlock(&s->lk);
- nni_sock_send_pending(s->sock);
nni_msgq_aio_put(s->uwq, aio);
}
@@ -456,7 +452,6 @@ rep_sock_recv(void *arg, nni_aio *aio)
{
rep_sock *s = arg;
- nni_sock_recv_pending(s->sock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 81abd306..1ab49f3e 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -28,7 +28,6 @@ static void req_pipe_fini(void *);
// A req_sock is our per-socket protocol private structure.
struct req_sock {
- nni_sock * sock;
nni_msgq * uwq;
nni_msgq * urq;
nni_duration retry;
@@ -90,7 +89,6 @@ req_sock_init(void **sp, nni_sock *sock)
// this is "semi random" start for request IDs.
s->nextid = nni_random();
s->retry = NNI_SECOND * 60;
- s->sock = sock;
s->reqmsg = NULL;
s->raw = 0;
s->wantw = 0;
@@ -512,7 +510,6 @@ req_sock_send(void *arg, nni_aio *aio)
nni_mtx_lock(&s->mtx);
if (s->raw) {
nni_mtx_unlock(&s->mtx);
- nni_sock_send_pending(s->sock);
nni_msgq_aio_put(s->uwq, aio);
return;
}
@@ -536,11 +533,6 @@ req_sock_send(void *arg, nni_aio *aio)
return;
}
- // XXX: I think we should just not do this... and leave the
- // socket "permanently writeable". This does screw up all the
- // backpressure.
- // nni_sock_send_pending(s->sock);
-
// If another message is there, this cancels it.
if (s->reqmsg != NULL) {
nni_msg_free(s->reqmsg);
@@ -619,7 +611,6 @@ req_sock_recv(void *arg, nni_aio *aio)
}
}
nni_mtx_unlock(&s->mtx);
- nni_sock_recv_pending(s->sock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 4c3ea8e3..dbce0751 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -29,7 +29,6 @@ static void resp_pipe_fini(void *);
// A resp_sock is our per-socket protocol private structure.
struct resp_sock {
- nni_sock * nsock;
nni_msgq * urq;
nni_msgq * uwq;
int raw;
@@ -85,7 +84,6 @@ resp_sock_init(void **sp, nni_sock *nsock)
}
s->ttl = 8; // Per RFC
- s->nsock = nsock;
s->raw = 0;
s->btrace = NULL;
s->btrace_len = 0;
@@ -268,9 +266,9 @@ resp_send_cb(void *arg)
static void
resp_recv_cb(void *arg)
{
- resp_pipe *p = arg;
- resp_sock *s = p->psock;
- nni_msgq * urq;
+ resp_pipe *p = arg;
+ resp_sock *s = p->psock;
+ nni_msgq * urq = s->urq;
nni_msg * msg;
int hops;
int rv;
@@ -279,8 +277,6 @@ resp_recv_cb(void *arg)
goto error;
}
- urq = nni_sock_recvq(s->nsock);
-
msg = nni_aio_get_msg(p->aio_recv);
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, p->id);
@@ -384,7 +380,6 @@ resp_sock_send(void *arg, nni_aio *aio)
nni_mtx_lock(&s->mtx);
if (s->raw) {
nni_mtx_unlock(&s->mtx);
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
return;
}
@@ -413,7 +408,6 @@ resp_sock_send(void *arg, nni_aio *aio)
s->btrace_len = 0;
nni_mtx_unlock(&s->mtx);
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
}
@@ -454,7 +448,6 @@ resp_sock_recv(void *arg, nni_aio *aio)
{
resp_sock *s = arg;
- nni_sock_recv_pending(s->nsock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 1c15054f..f44cd63a 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -28,7 +28,6 @@ static void surv_timeout(void *);
// A surv_sock is our per-socket protocol private structure.
struct surv_sock {
- nni_sock * nsock;
nni_duration survtime;
nni_time expire;
int raw;
@@ -84,7 +83,6 @@ surv_sock_init(void **sp, nni_sock *nsock)
nni_timer_init(&s->timer, surv_timeout, s);
s->nextid = nni_random();
- s->nsock = nsock;
s->raw = 0;
s->survtime = NNI_SECOND * 60;
s->expire = NNI_TIME_ZERO;
@@ -362,7 +360,6 @@ surv_sock_recv(void *arg, nni_aio *aio)
return;
}
nni_mtx_unlock(&s->mtx);
- nni_sock_recv_pending(s->nsock);
nni_msgq_aio_get(s->urq, aio);
}
@@ -378,7 +375,6 @@ surv_sock_send(void *arg, nni_aio *aio)
// No automatic retry, and the request ID must
// be in the header coming down.
nni_mtx_unlock(&s->mtx);
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
return;
}
@@ -404,7 +400,6 @@ surv_sock_send(void *arg, nni_aio *aio)
nni_mtx_unlock(&s->mtx);
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
}