aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-31 17:59:01 -0700
committerGarrett D'Amore <garrett@damore.org>2017-09-22 11:47:07 -0700
commitd72076207a2fad96ff014a81366868fb47a0ed1b (patch)
tree5a4f67ab607ef6690e983c2d1ab2c64062027e52
parent366f3e5d14c5f891655ad1fa2b3cfa9a56b8830d (diff)
downloadnng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.gz
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.tar.bz2
nng-d72076207a2fad96ff014a81366868fb47a0ed1b.zip
Allocate AIOs dynamically.
We allocate AIO structures dynamically, so that we can use them abstractly in more places without inlining them. This will be used for the ZeroTier transport to allow us to create operations consisting of just the AIO. Furthermore, we provide accessors for some of the aio members, in the hopes that we will be able to wrap these for "safe" version of the AIO capability to export to applications, and to protocol and transport implementors. While here we cleaned up the protocol details to use consistently shorter names (no nni_ prefix for static symbols needed), and we also fixed a bug in the surveyor code.
-rw-r--r--src/core/aio.c75
-rw-r--r--src/core/aio.h26
-rw-r--r--src/core/endpt.c85
-rw-r--r--src/core/event.h3
-rw-r--r--src/core/msgqueue.c59
-rw-r--r--src/core/pipe.c14
-rw-r--r--src/core/socket.c10
-rw-r--r--src/protocol/bus/bus.c354
-rw-r--r--src/protocol/pair/pair_v0.c90
-rw-r--r--src/protocol/pair/pair_v1.c130
-rw-r--r--src/protocol/pipeline/pull.c188
-rw-r--r--src/protocol/pipeline/push.c203
-rw-r--r--src/protocol/pubsub/pub.c305
-rw-r--r--src/protocol/pubsub/sub.c236
-rw-r--r--src/protocol/reqrep/rep.c404
-rw-r--r--src/protocol/reqrep/req.c522
-rw-r--r--src/protocol/survey/respond.c445
-rw-r--r--src/protocol/survey/survey.c393
-rw-r--r--src/transport/ipc/ipc.c133
-rw-r--r--src/transport/tcp/tcp.c163
-rw-r--r--tests/resolv.c85
21 files changed, 2032 insertions, 1891 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 5b8c7970..9b308cd1 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -56,23 +56,34 @@ static nni_list nni_aio_expire_aios;
static void nni_aio_expire_add(nni_aio *);
-void
-nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
+int
+nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
{
+ nni_aio *aio;
+
+ if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
+ return (NNG_ENOMEM);
+ }
memset(aio, 0, sizeof(*aio));
nni_cv_init(&aio->a_cv, &nni_aio_lk);
aio->a_expire = NNI_TIME_NEVER;
aio->a_init = 1;
nni_task_init(NULL, &aio->a_task, cb, arg);
+ *aiop = aio;
+ return (0);
}
void
nni_aio_fini(nni_aio *aio)
{
- nni_aio_stop(aio);
+ if (aio != NULL) {
+ nni_aio_stop(aio);
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
+ // At this point the AIO is done.
+ nni_cv_fini(&aio->a_cv);
+
+ NNI_FREE_STRUCT(aio);
+ }
}
// nni_aio_stop cancels any oustanding operation, and waits for the
@@ -97,6 +108,60 @@ nni_aio_stop(nni_aio *aio)
nni_aio_wait(aio);
}
+void
+nni_aio_set_timeout(nni_aio *aio, nni_time when)
+{
+ aio->a_expire = when;
+}
+
+void
+nni_aio_set_msg(nni_aio *aio, nni_msg *msg)
+{
+ aio->a_msg = msg;
+}
+
+nni_msg *
+nni_aio_get_msg(nni_aio *aio)
+{
+ return (aio->a_msg);
+}
+
+void
+nni_aio_set_pipe(nni_aio *aio, void *p)
+{
+ aio->a_pipe = p;
+}
+
+void *
+nni_aio_get_pipe(nni_aio *aio)
+{
+ return (aio->a_pipe);
+}
+
+void
+nni_aio_set_ep(nni_aio *aio, void *ep)
+{
+ aio->a_endpt = ep;
+}
+
+void *
+nni_aio_get_ep(nni_aio *aio)
+{
+ return (aio->a_endpt);
+}
+
+void
+nni_aio_set_data(nni_aio *aio, void *data)
+{
+ aio->a_data = data;
+}
+
+void *
+nni_aio_get_data(nni_aio *aio)
+{
+ return (aio->a_data);
+}
+
int
nni_aio_result(nni_aio *aio)
{
diff --git a/src/core/aio.h b/src/core/aio.h
index 9114c9fe..eae17446 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -52,6 +52,9 @@ struct nni_aio {
// Resolver operations.
nni_sockaddr *a_addr;
+ // Extra user data.
+ void *a_data;
+
// Provider-use fields.
nni_aio_cancelfn a_prov_cancel;
void * a_prov_data;
@@ -65,7 +68,7 @@ struct nni_aio {
// the supplied argument when the operation is complete. If NULL is
// supplied for the callback, then nni_aio_wake is used in its place,
// and the aio is used for the argument.
-extern void nni_aio_init(nni_aio *, nni_cb, void *);
+extern int nni_aio_init(nni_aio **, nni_cb, void *);
// nni_aio_fini finalizes the aio, releasing resources (locks)
// associated with it. The caller is responsible for ensuring that any
@@ -83,6 +86,27 @@ extern void nni_aio_fini(nni_aio *);
// use nni_aio_cancel instead.)
extern void nni_aio_stop(nni_aio *);
+// nni_aio_set_data sets user data. This should only be done by the
+// consumer, initiating the I/O. The intention is to be able to store
+// additional data for use when the operation callback is executed.
+extern void nni_aio_set_data(nni_aio *, void *);
+
+// nni_aio_get_data returns the user data that was previously stored
+// with nni_aio_set_data.
+extern void *nni_aio_get_data(nni_aio *);
+
+extern void nni_aio_set_msg(nni_aio *, nni_msg *);
+extern nni_msg *nni_aio_get_msg(nni_aio *);
+extern void nni_aio_set_pipe(nni_aio *, void *);
+extern void * nni_aio_get_pipe(nni_aio *);
+extern void nni_aio_set_ep(nni_aio *, void *);
+extern void * nni_aio_get_ep(nni_aio *);
+
+// nni_aio_set_timeout sets the timeout (absolute) when the AIO will
+// be canceled. The cancelation does not happen until after nni_aio_start
+// is called.
+extern void nni_aio_set_timeout(nni_aio *, nni_time);
+
// nni_aio_result returns the result code (0 on success, or an NNG errno)
// for the operation. It is only valid to call this when the operation is
// complete (such as when the callback is executed or after nni_aio_wait
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 9dc33867..9122bb58 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -31,10 +31,10 @@ struct nni_ep {
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_list ep_pipes;
- nni_aio ep_acc_aio;
- nni_aio ep_con_aio;
- nni_aio ep_con_syn; // used for sync connect
- nni_aio ep_tmo_aio; // backoff timer
+ nni_aio * ep_acc_aio;
+ nni_aio * ep_con_aio;
+ nni_aio * ep_con_syn; // used for sync connect
+ nni_aio * ep_tmo_aio; // backoff timer
nni_duration ep_maxrtime; // maximum time for reconnect
nni_duration ep_currtime; // current time for reconnect
nni_duration ep_inirtime; // initial time for reconnect
@@ -96,15 +96,15 @@ nni_ep_destroy(nni_ep *ep)
nni_sock_ep_remove(ep->ep_sock, ep);
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_tmo_aio);
+ nni_aio_stop(ep->ep_acc_aio);
+ nni_aio_stop(ep->ep_con_aio);
+ nni_aio_stop(ep->ep_con_syn);
+ nni_aio_stop(ep->ep_tmo_aio);
- nni_aio_fini(&ep->ep_acc_aio);
- nni_aio_fini(&ep->ep_con_aio);
- nni_aio_fini(&ep->ep_con_syn);
- nni_aio_fini(&ep->ep_tmo_aio);
+ nni_aio_fini(ep->ep_acc_aio);
+ nni_aio_fini(ep->ep_con_aio);
+ nni_aio_fini(ep->ep_con_syn);
+ nni_aio_fini(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_data != NULL) {
@@ -154,12 +154,12 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
nni_mtx_init(&ep->ep_mtx);
nni_cv_init(&ep->ep_cv, &ep->ep_mtx);
- nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep);
- nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep);
- nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep);
- nni_aio_init(&ep->ep_con_syn, NULL, NULL);
- if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
+ if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
+ ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
((rv = nni_sock_ep_add(s, ep)) != 0)) {
nni_ep_destroy(ep);
@@ -246,10 +246,10 @@ nni_ep_shutdown(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
// Abort any remaining in-flight operations.
- nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_tmo_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_acc_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_con_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_con_syn, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_tmo_aio, NNG_ECLOSED);
// Stop the underlying transport.
ep->ep_ops.ep_close(ep->ep_data);
@@ -273,10 +273,10 @@ nni_ep_close(nni_ep *ep)
nni_ep_shutdown(ep);
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_tmo_aio);
+ nni_aio_stop(ep->ep_acc_aio);
+ nni_aio_stop(ep->ep_con_aio);
+ nni_aio_stop(ep->ep_con_syn);
+ nni_aio_stop(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
NNI_LIST_FOREACH (&ep->ep_pipes, p) {
@@ -327,10 +327,11 @@ nni_ep_tmo_start(nni_ep *ep)
// have a statistically perfect distribution with the modulo of
// the random number, but this really doesn't matter.
- ep->ep_tmo_aio.a_expire =
- nni_clock() + (backoff ? nni_random() % backoff : 0);
+ nni_aio_set_timeout(ep->ep_tmo_aio,
+ nni_clock() + (backoff ? nni_random() % backoff : 0));
+
ep->ep_tmo_run = 1;
- if (nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
+ if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
ep->ep_tmo_run = 0;
}
}
@@ -339,7 +340,7 @@ static void
nni_ep_tmo_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_tmo_aio;
+ nni_aio *aio = ep->ep_tmo_aio;
nni_mtx_lock(&ep->ep_mtx);
if (nni_aio_result(aio) == NNG_ETIMEDOUT) {
@@ -356,11 +357,11 @@ static void
nni_ep_con_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_con_aio;
+ nni_aio *aio = ep->ep_con_aio;
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- rv = nni_pipe_create(ep, aio->a_pipe);
+ rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
}
nni_mtx_lock(&ep->ep_mtx);
switch (rv) {
@@ -392,14 +393,14 @@ nni_ep_con_cb(void *arg)
static void
nni_ep_con_start(nni_ep *ep)
{
- nni_aio *aio = &ep->ep_con_aio;
+ nni_aio *aio = ep->ep_con_aio;
// Call with the Endpoint lock held.
if (ep->ep_closing) {
return;
}
- aio->a_endpt = ep->ep_data;
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_connect(ep->ep_data, aio);
}
@@ -436,8 +437,8 @@ nni_ep_dial(nni_ep *ep, int flags)
}
// Synchronous mode: so we have to wait for it to complete.
- aio = &ep->ep_con_syn;
- aio->a_endpt = ep->ep_data;
+ aio = ep->ep_con_syn;
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_connect(ep->ep_data, aio);
ep->ep_started = 1;
nni_mtx_unlock(&ep->ep_mtx);
@@ -446,7 +447,7 @@ nni_ep_dial(nni_ep *ep, int flags)
// As we're synchronous, we also have to handle the completion.
if (((rv = nni_aio_result(aio)) != 0) ||
- ((rv = nni_pipe_create(ep, aio->a_pipe)) != 0)) {
+ ((rv = nni_pipe_create(ep, nni_aio_get_pipe(aio))) != 0)) {
nni_mtx_lock(&ep->ep_mtx);
ep->ep_started = 0;
nni_mtx_unlock(&ep->ep_mtx);
@@ -458,12 +459,12 @@ static void
nni_ep_acc_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_acc_aio;
+ nni_aio *aio = ep->ep_acc_aio;
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- NNI_ASSERT(aio->a_pipe != NULL);
- rv = nni_pipe_create(ep, aio->a_pipe);
+ NNI_ASSERT(nni_aio_get_pipe(aio) != NULL);
+ rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
}
nni_mtx_lock(&ep->ep_mtx);
@@ -495,14 +496,14 @@ nni_ep_acc_cb(void *arg)
static void
nni_ep_acc_start(nni_ep *ep)
{
- nni_aio *aio = &ep->ep_acc_aio;
+ nni_aio *aio = ep->ep_acc_aio;
// Call with the Endpoint lock held.
if (ep->ep_closing) {
return;
}
- aio->a_pipe = NULL;
- aio->a_endpt = ep->ep_data;
+ nni_aio_set_pipe(aio, NULL);
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_accept(ep->ep_data, aio);
}
diff --git a/src/core/event.h b/src/core/event.h
index 22af096e..9536d206 100644
--- a/src/core/event.h
+++ b/src/core/event.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -25,7 +26,7 @@ struct nng_notify {
void * n_arg;
int n_type;
nni_sock * n_sock;
- nni_aio n_aio;
+ nni_aio * n_aio;
};
extern void nni_ev_init(nni_event *, int, nni_sock *);
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index ef7dd5a2..6fd95f98 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -166,14 +166,14 @@ nni_msgq_run_putq(nni_msgq *mq)
size_t len;
while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- msg = waio->a_msg;
+ msg = nni_aio_get_msg(waio);
len = nni_msg_len(msg);
// The presence of any blocked reader indicates that
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- waio->a_msg = NULL;
+ nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(raio);
nni_aio_list_remove(waio);
@@ -190,7 +190,7 @@ nni_msgq_run_putq(nni_msgq *mq)
mq->mq_put = 0;
}
mq->mq_len++;
- waio->a_msg = NULL;
+ nni_aio_set_msg(waio, NULL);
nni_aio_finish(waio, 0, len);
continue;
}
@@ -215,7 +215,6 @@ nni_msgq_run_getq(nni_msgq *mq)
mq->mq_get = 0;
}
mq->mq_len--;
- raio->a_msg = msg;
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
continue;
@@ -223,8 +222,8 @@ nni_msgq_run_getq(nni_msgq *mq)
// Nothing queued (unbuffered?), maybe a writer is waiting.
if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- msg = waio->a_msg;
- waio->a_msg = NULL;
+ msg = nni_aio_get_msg(waio);
+ nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
nni_aio_list_remove(raio);
@@ -397,34 +396,38 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
- nni_aio aio;
- int rv;
-
- nni_aio_init(&aio, NULL, NULL);
- aio.a_expire = expire;
- nni_msgq_aio_get(mq, &aio);
- nni_aio_wait(&aio);
- if ((rv = nni_aio_result(&aio)) == 0) {
- *msgp = aio.a_msg;
- aio.a_msg = NULL;
- }
- nni_aio_fini(&aio);
+ nni_aio *aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ nni_aio_set_timeout(aio, expire);
+ nni_msgq_aio_get(mq, aio);
+ nni_aio_wait(aio);
+ if ((rv = nni_aio_result(aio)) == 0) {
+ *msgp = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
+ }
+ nni_aio_fini(aio);
return (rv);
}
int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
- nni_aio aio;
- int rv;
-
- nni_aio_init(&aio, NULL, NULL);
- aio.a_expire = expire;
- aio.a_msg = msg;
- nni_msgq_aio_put(mq, &aio);
- nni_aio_wait(&aio);
- rv = nni_aio_result(&aio);
- nni_aio_fini(&aio);
+ nni_aio *aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ nni_aio_set_timeout(aio, expire);
+ nni_aio_set_msg(aio, msg);
+ nni_msgq_aio_put(mq, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
return (rv);
}
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 0c024c66..0670ed01 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -30,7 +30,7 @@ struct nni_pipe {
nni_mtx p_mtx;
nni_cv p_cv;
nni_list_node p_reap_node;
- nni_aio p_start_aio;
+ nni_aio * p_start_aio;
};
static nni_idhash *nni_pipes;
@@ -99,7 +99,7 @@ nni_pipe_destroy(nni_pipe *p)
}
// Stop any pending negotiation.
- nni_aio_stop(&p->p_start_aio);
+ nni_aio_stop(p->p_start_aio);
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
@@ -112,7 +112,7 @@ nni_pipe_destroy(nni_pipe *p)
// We have exclusive access at this point, so we can check if
// we are still on any lists.
- nni_aio_fini(&p->p_start_aio);
+ nni_aio_fini(p->p_start_aio);
if (nni_list_node_active(&p->p_ep_node)) {
nni_ep_pipe_remove(p->p_ep, p);
@@ -172,7 +172,7 @@ nni_pipe_close(nni_pipe *p)
nni_mtx_unlock(&p->p_mtx);
// abort any pending negotiation/start process.
- nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
+ nni_aio_cancel(p->p_start_aio, NNG_ECLOSED);
}
void
@@ -205,7 +205,7 @@ static void
nni_pipe_start_cb(void *arg)
{
nni_pipe *p = arg;
- nni_aio * aio = &p->p_start_aio;
+ nni_aio * aio = p->p_start_aio;
int rv;
if ((rv = nni_aio_result(aio)) != 0) {
@@ -270,9 +270,9 @@ void
nni_pipe_start(nni_pipe *p)
{
if (p->p_tran_ops.p_start == NULL) {
- nni_aio_finish(&p->p_start_aio, 0, 0);
+ nni_aio_finish(p->p_start_aio, 0, 0);
} else {
- p->p_tran_ops.p_start(p->p_tran_data, &p->p_start_aio);
+ p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio);
}
}
diff --git a/src/core/socket.c b/src/core/socket.c
index 6a242558..01fd9a0c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -222,7 +222,7 @@ nni_sock_cansend_cb(void *arg)
nni_notify *notify = arg;
nni_sock * sock = notify->n_sock;
- if (nni_aio_result(&notify->n_aio) != 0) {
+ if (nni_aio_result(notify->n_aio) != 0) {
return;
}
@@ -235,7 +235,7 @@ nni_sock_canrecv_cb(void *arg)
nni_notify *notify = arg;
nni_sock * sock = notify->n_sock;
- if (nni_aio_result(&notify->n_aio) != 0) {
+ if (nni_aio_result(notify->n_aio) != 0) {
return;
}
@@ -259,11 +259,11 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
switch (type) {
case NNG_EV_CAN_RCV:
nni_aio_init(&notify->n_aio, nni_sock_canrecv_cb, notify);
- nni_msgq_aio_notify_get(sock->s_urq, &notify->n_aio);
+ nni_msgq_aio_notify_get(sock->s_urq, notify->n_aio);
break;
case NNG_EV_CAN_SND:
nni_aio_init(&notify->n_aio, nni_sock_cansend_cb, notify);
- nni_msgq_aio_notify_put(sock->s_uwq, &notify->n_aio);
+ nni_msgq_aio_notify_put(sock->s_uwq, notify->n_aio);
break;
default:
NNI_FREE_STRUCT(notify);
@@ -276,7 +276,7 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
void
nni_sock_unnotify(nni_sock *sock, nni_notify *notify)
{
- nni_aio_fini(&notify->n_aio);
+ nni_aio_fini(notify->n_aio);
NNI_FREE_STRUCT(notify);
}
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 6f6def0a..c8d759f2 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -18,251 +18,257 @@
// for each participant to receive the message, each sender must be connected
// to every other node in the network (full mesh).
-typedef struct nni_bus_pipe nni_bus_pipe;
-typedef struct nni_bus_sock nni_bus_sock;
-
-static void nni_bus_sock_getq(nni_bus_sock *);
-static void nni_bus_pipe_getq(nni_bus_pipe *);
-static void nni_bus_pipe_send(nni_bus_pipe *);
-static void nni_bus_pipe_recv(nni_bus_pipe *);
-
-static void nni_bus_sock_getq_cb(void *);
-static void nni_bus_pipe_getq_cb(void *);
-static void nni_bus_pipe_send_cb(void *);
-static void nni_bus_pipe_recv_cb(void *);
-static void nni_bus_pipe_putq_cb(void *);
-
-// An nni_bus_sock is our per-socket protocol private structure.
-struct nni_bus_sock {
+typedef struct bus_pipe bus_pipe;
+typedef struct bus_sock bus_sock;
+
+static void bus_sock_getq(bus_sock *);
+static void bus_pipe_getq(bus_pipe *);
+static void bus_pipe_send(bus_pipe *);
+static void bus_pipe_recv(bus_pipe *);
+
+static void bus_sock_getq_cb(void *);
+static void bus_pipe_getq_cb(void *);
+static void bus_pipe_send_cb(void *);
+static void bus_pipe_recv_cb(void *);
+static void bus_pipe_putq_cb(void *);
+
+// A bus_sock is our per-socket protocol private structure.
+struct bus_sock {
nni_sock *nsock;
int raw;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
};
-// An nni_bus_pipe is our per-pipe protocol private structure.
-struct nni_bus_pipe {
+// A bus_pipe is our per-pipe protocol private structure.
+struct bus_pipe {
nni_pipe * npipe;
- nni_bus_sock *psock;
+ bus_sock * psock;
nni_msgq * sendq;
nni_list_node node;
- nni_aio aio_getq;
- nni_aio aio_recv;
- nni_aio aio_send;
- nni_aio aio_putq;
+ nni_aio * aio_getq;
+ nni_aio * aio_recv;
+ nni_aio * aio_send;
+ nni_aio * aio_putq;
nni_mtx mtx;
};
static void
-nni_bus_sock_fini(void *arg)
+bus_sock_fini(void *arg)
{
- nni_bus_sock *psock = arg;
+ bus_sock *s = arg;
- nni_aio_stop(&psock->aio_getq);
- nni_aio_fini(&psock->aio_getq);
- nni_mtx_fini(&psock->mtx);
- NNI_FREE_STRUCT(psock);
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
}
static int
-nni_bus_sock_init(void **sp, nni_sock *nsock)
+bus_sock_init(void **sp, nni_sock *nsock)
{
- nni_bus_sock *psock;
+ bus_sock *s;
+ int rv;
- if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node);
- nni_mtx_init(&psock->mtx);
- nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock);
- psock->nsock = nsock;
- psock->raw = 0;
+ NNI_LIST_INIT(&s->pipes, bus_pipe, node);
+ nni_mtx_init(&s->mtx);
+ if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) {
+ bus_sock_fini(s);
+ return (rv);
+ }
+ s->nsock = nsock;
+ s->raw = 0;
- *sp = psock;
+ *sp = s;
return (0);
}
static void
-nni_bus_sock_open(void *arg)
+bus_sock_open(void *arg)
{
- nni_bus_sock *psock = arg;
+ bus_sock *s = arg;
- nni_bus_sock_getq(psock);
+ bus_sock_getq(s);
}
static void
-nni_bus_sock_close(void *arg)
+bus_sock_close(void *arg)
{
- nni_bus_sock *psock = arg;
+ bus_sock *s = arg;
- nni_aio_cancel(&psock->aio_getq, NNG_ECLOSED);
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-nni_bus_pipe_fini(void *arg)
+bus_pipe_fini(void *arg)
{
- nni_bus_pipe *ppipe = arg;
-
- nni_aio_fini(&ppipe->aio_getq);
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_aio_fini(&ppipe->aio_putq);
- nni_msgq_fini(ppipe->sendq);
- nni_mtx_fini(&ppipe->mtx);
- NNI_FREE_STRUCT(ppipe);
+ bus_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_msgq_fini(p->sendq);
+ nni_mtx_fini(&p->mtx);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- nni_bus_pipe *ppipe;
- int rv;
+ bus_pipe *p;
+ int rv;
- if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
- NNI_FREE_STRUCT(ppipe);
+ NNI_LIST_NODE_INIT(&p->node);
+ nni_mtx_init(&p->mtx);
+ if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) {
+ bus_pipe_fini(p);
return (rv);
}
- NNI_LIST_NODE_INIT(&ppipe->node);
- nni_mtx_init(&ppipe->mtx);
- nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe);
- nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe);
- nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe);
- nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe);
-
- ppipe->npipe = npipe;
- ppipe->psock = psock;
- *pp = ppipe;
+
+ p->npipe = npipe;
+ p->psock = s;
+ *pp = p;
return (0);
}
static int
-nni_bus_pipe_start(void *arg)
+bus_pipe_start(void *arg)
{
- nni_bus_pipe *ppipe = arg;
- nni_bus_sock *psock = ppipe->psock;
+ bus_pipe *p = arg;
+ bus_sock *s = p->psock;
- nni_mtx_lock(&psock->mtx);
- nni_list_append(&psock->pipes, ppipe);
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_lock(&s->mtx);
+ nni_list_append(&s->pipes, p);
+ nni_mtx_unlock(&s->mtx);
- nni_bus_pipe_recv(ppipe);
- nni_bus_pipe_getq(ppipe);
+ bus_pipe_recv(p);
+ bus_pipe_getq(p);
return (0);
}
static void
-nni_bus_pipe_stop(void *arg)
+bus_pipe_stop(void *arg)
{
- nni_bus_pipe *ppipe = arg;
- nni_bus_sock *psock = ppipe->psock;
+ bus_pipe *p = arg;
+ bus_sock *s = p->psock;
- nni_msgq_close(ppipe->sendq);
+ nni_msgq_close(p->sendq);
- nni_aio_stop(&ppipe->aio_getq);
- nni_aio_stop(&ppipe->aio_send);
- nni_aio_stop(&ppipe->aio_recv);
- nni_aio_stop(&ppipe->aio_putq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
- nni_mtx_lock(&ppipe->psock->mtx);
- if (nni_list_active(&psock->pipes, ppipe)) {
- nni_list_remove(&psock->pipes, ppipe);
+ nni_mtx_lock(&s->mtx);
+ if (nni_list_active(&s->pipes, p)) {
+ nni_list_remove(&s->pipes, p);
}
- nni_mtx_unlock(&ppipe->psock->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_bus_pipe_getq_cb(void *arg)
+bus_pipe_getq_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
+ bus_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_getq) != 0) {
+ if (nni_aio_result(p->aio_getq) != 0) {
// closed?
- nni_pipe_stop(ppipe->npipe);
+ nni_pipe_stop(p->npipe);
return;
}
- ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg;
- ppipe->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
+ nni_pipe_send(p->npipe, p->aio_send);
}
static void
-nni_bus_pipe_send_cb(void *arg)
+bus_pipe_send_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
+ bus_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_send) != 0) {
+ if (nni_aio_result(p->aio_send) != 0) {
// closed?
- nni_msg_free(ppipe->aio_send.a_msg);
- ppipe->aio_send.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- nni_bus_pipe_getq(ppipe);
+ bus_pipe_getq(p);
}
static void
-nni_bus_pipe_recv_cb(void *arg)
+bus_pipe_recv_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
- nni_bus_sock *psock = ppipe->psock;
- nni_msg * msg;
+ bus_pipe *p = arg;
+ bus_sock *s = p->psock;
+ nni_msg * msg;
- if (nni_aio_result(&ppipe->aio_recv) != 0) {
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->npipe);
return;
}
- msg = ppipe->aio_recv.a_msg;
+ msg = nni_aio_get_msg(p->aio_recv);
- if (nni_msg_header_insert_u32(msg, nni_pipe_id(ppipe->npipe)) != 0) {
+ if (nni_msg_header_insert_u32(msg, nni_pipe_id(p->npipe)) != 0) {
// XXX: bump a nomemory stat
nni_msg_free(msg);
- nni_pipe_stop(ppipe->npipe);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- ppipe->aio_putq.a_msg = msg;
- nni_msgq_aio_put(nni_sock_recvq(psock->nsock), &ppipe->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msgq_aio_put(nni_sock_recvq(s->nsock), p->aio_putq);
}
static void
-nni_bus_pipe_putq_cb(void *arg)
+bus_pipe_putq_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
+ bus_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_putq) != 0) {
- nni_msg_free(ppipe->aio_putq.a_msg);
- ppipe->aio_putq.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
// Wait for another recv.
- nni_bus_pipe_recv(ppipe);
+ bus_pipe_recv(p);
}
static void
-nni_bus_sock_getq_cb(void *arg)
+bus_sock_getq_cb(void *arg)
{
- nni_bus_sock *psock = arg;
- nni_bus_pipe *ppipe;
- nni_bus_pipe *lpipe;
- nni_msgq * uwq = nni_sock_sendq(psock->nsock);
- nni_msg * msg;
- nni_msg * dup;
- uint32_t sender;
-
- if (nni_aio_result(&psock->aio_getq) != 0) {
+ bus_sock *s = arg;
+ bus_pipe *p;
+ bus_pipe *lastp;
+ nni_msgq *uwq = nni_sock_sendq(s->nsock);
+ nni_msg * msg;
+ nni_msg * dup;
+ uint32_t sender;
+
+ if (nni_aio_result(s->aio_getq) != 0) {
return;
}
- msg = psock->aio_getq.a_msg;
+ msg = nni_aio_get_msg(s->aio_getq);
// The header being present indicates that the message
// was received locally and we are rebroadcasting. (Device
@@ -274,103 +280,101 @@ nni_bus_sock_getq_cb(void *arg)
sender = 0;
}
- nni_mtx_lock(&psock->mtx);
- lpipe = nni_list_last(&psock->pipes);
- NNI_LIST_FOREACH (&psock->pipes, ppipe) {
- if (nni_pipe_id(ppipe->npipe) == sender) {
+ nni_mtx_lock(&s->mtx);
+ lastp = nni_list_last(&s->pipes);
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ if (nni_pipe_id(p->npipe) == sender) {
continue;
}
- if (ppipe != lpipe) {
+ if (p != lastp) {
if (nni_msg_dup(&dup, msg) != 0) {
continue;
}
} else {
dup = msg;
}
- if (nni_msgq_tryput(ppipe->sendq, dup) != 0) {
+ if (nni_msgq_tryput(p->sendq, dup) != 0) {
nni_msg_free(dup);
}
}
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_unlock(&s->mtx);
- if (lpipe == NULL) {
+ if (lastp == NULL) {
nni_msg_free(msg);
}
- nni_bus_sock_getq(psock);
+ bus_sock_getq(s);
}
static void
-nni_bus_sock_getq(nni_bus_sock *psock)
+bus_sock_getq(bus_sock *s)
{
- nni_msgq_aio_get(nni_sock_sendq(psock->nsock), &psock->aio_getq);
+ nni_msgq_aio_get(nni_sock_sendq(s->nsock), s->aio_getq);
}
static void
-nni_bus_pipe_getq(nni_bus_pipe *ppipe)
+bus_pipe_getq(bus_pipe *p)
{
- nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static void
-nni_bus_pipe_recv(nni_bus_pipe *ppipe)
+bus_pipe_recv(bus_pipe *p)
{
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static int
-nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_bus_sock *psock = arg;
- int rv = NNG_ENOTSUP;
+ bus_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_bus_sock *psock = arg;
- int rv = NNG_ENOTSUP;
+ bus_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&psock->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-static nni_proto_pipe_ops nni_bus_pipe_ops = {
- .pipe_init = nni_bus_pipe_init,
- .pipe_fini = nni_bus_pipe_fini,
- .pipe_start = nni_bus_pipe_start,
- .pipe_stop = nni_bus_pipe_stop,
+static nni_proto_pipe_ops bus_pipe_ops = {
+ .pipe_init = bus_pipe_init,
+ .pipe_fini = bus_pipe_fini,
+ .pipe_start = bus_pipe_start,
+ .pipe_stop = bus_pipe_stop,
};
-static nni_proto_sock_ops nni_bus_sock_ops = {
- .sock_init = nni_bus_sock_init,
- .sock_fini = nni_bus_sock_fini,
- .sock_open = nni_bus_sock_open,
- .sock_close = nni_bus_sock_close,
- .sock_setopt = nni_bus_sock_setopt,
- .sock_getopt = nni_bus_sock_getopt,
+static nni_proto_sock_ops bus_sock_ops = {
+ .sock_init = bus_sock_init,
+ .sock_fini = bus_sock_fini,
+ .sock_open = bus_sock_open,
+ .sock_close = bus_sock_close,
+ .sock_setopt = bus_sock_setopt,
+ .sock_getopt = bus_sock_getopt,
};
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
-nni_proto nni_bus_proto = {
+static nni_proto bus_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_BUS_V0, "bus" },
.proto_peer = { NNG_PROTO_BUS_V0, "bus" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_bus_sock_ops,
- .proto_pipe_ops = &nni_bus_pipe_ops,
+ .proto_sock_ops = &bus_sock_ops,
+ .proto_pipe_ops = &bus_pipe_ops,
};
int
nng_bus0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_bus_proto));
+ return (nni_proto_open(sidp, &bus_proto));
}
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c
index ef420051..486ce43b 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair/pair_v0.c
@@ -43,10 +43,10 @@ struct pair0_sock {
struct pair0_pipe {
nni_pipe * npipe;
pair0_sock *psock;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
};
static int
@@ -76,18 +76,34 @@ pair0_sock_fini(void *arg)
NNI_FREE_STRUCT(s);
}
+static void
+pair0_pipe_fini(void *arg)
+{
+ pair0_pipe *p = arg;
+
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_getq);
+ NNI_FREE_STRUCT(p);
+}
+
static int
pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
pair0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&p->aio_send, pair0_send_cb, p);
- nni_aio_init(&p->aio_recv, pair0_recv_cb, p);
- nni_aio_init(&p->aio_getq, pair0_getq_cb, p);
- nni_aio_init(&p->aio_putq, pair0_putq_cb, p);
+ if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) {
+ pair0_pipe_fini(p);
+ return (rv);
+ }
p->npipe = npipe;
p->psock = psock;
@@ -95,18 +111,6 @@ pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (0);
}
-static void
-pair0_pipe_fini(void *arg)
-{
- pair0_pipe *p = arg;
-
- nni_aio_fini(&p->aio_send);
- nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_putq);
- nni_aio_fini(&p->aio_getq);
- NNI_FREE_STRUCT(p);
-}
-
static int
pair0_pipe_start(void *arg)
{
@@ -123,8 +127,8 @@ pair0_pipe_start(void *arg)
// Schedule a getq on the upper, and a read from the pipe.
// Each of these also sets up another hold on the pipe itself.
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
+ nni_pipe_recv(p->npipe, p->aio_recv);
return (0);
}
@@ -135,10 +139,10 @@ pair0_pipe_stop(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- nni_aio_cancel(&p->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_getq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_send, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_recv, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_putq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_getq, NNG_ECANCELED);
nni_mtx_lock(&s->mtx);
if (s->ppipe == p) {
@@ -154,17 +158,17 @@ pair0_recv_cb(void *arg)
pair0_sock *s = p->psock;
nni_msg * msg;
- if (nni_aio_result(&p->aio_recv) != 0) {
+ if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_recv.a_msg;
- p->aio_putq.a_msg = msg;
- p->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
- nni_msgq_aio_put(s->urq, &p->aio_putq);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
}
static void
@@ -172,13 +176,13 @@ pair0_putq_cb(void *arg)
{
pair0_pipe *p = arg;
- if (nni_aio_result(&p->aio_putq) != 0) {
- nni_msg_free(p->aio_putq.a_msg);
- p->aio_putq.a_msg = NULL;
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static void
@@ -187,14 +191,14 @@ pair0_getq_cb(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- if (nni_aio_result(&p->aio_getq) != 0) {
+ if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- p->aio_send.a_msg = p->aio_getq.a_msg;
- p->aio_getq.a_msg = NULL;
- nni_pipe_send(p->npipe, &p->aio_send);
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
+ nni_pipe_send(p->npipe, p->aio_send);
}
static void
@@ -203,14 +207,14 @@ pair0_send_cb(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- if (nni_aio_result(&p->aio_send) != 0) {
- nni_msg_free(p->aio_send.a_msg);
- p->aio_send.a_msg = NULL;
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
}
static void
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
index a737402f..d6c8ea75 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair/pair_v1.c
@@ -41,7 +41,7 @@ struct pair1_sock {
nni_list plist;
int started;
int poly;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
};
// pair1_pipe is our per-pipe protocol private structure.
@@ -49,10 +49,10 @@ struct pair1_pipe {
nni_pipe * npipe;
pair1_sock * psock;
nni_msgq * sendq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
nni_list_node node;
};
@@ -61,7 +61,7 @@ pair1_sock_fini(void *arg)
{
pair1_sock *s = arg;
- nni_aio_fini(&s->aio_getq);
+ nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
@@ -85,10 +85,10 @@ pair1_sock_init(void **sp, nni_sock *nsock)
NNI_LIST_INIT(&s->plist, pair1_pipe, node);
// Raw mode uses this.
- nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s);
nni_mtx_init(&s->mtx);
- if ((rv = nni_option_register("polyamorous", &poly)) != 0) {
+ if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) ||
+ ((rv = nni_option_register("polyamorous", &poly)) != 0)) {
pair1_sock_fini(s);
return (rv);
}
@@ -104,6 +104,18 @@ pair1_sock_init(void **sp, nni_sock *nsock)
return (0);
}
+static void
+pair1_pipe_fini(void *arg)
+{
+ pair1_pipe *p = arg;
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_getq);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
+}
+
static int
pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
@@ -113,14 +125,14 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) {
- NNI_FREE_STRUCT(p);
+ if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) {
+ pair1_pipe_fini(p);
return (NNG_ENOMEM);
}
- nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p);
- nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p);
- nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p);
- nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p);
p->npipe = npipe;
p->psock = psock;
@@ -129,18 +141,6 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (rv);
}
-static void
-pair1_pipe_fini(void *arg)
-{
- pair1_pipe *p = arg;
- nni_aio_fini(&p->aio_send);
- nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_putq);
- nni_aio_fini(&p->aio_getq);
- nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
-}
-
static int
pair1_pipe_start(void *arg)
{
@@ -163,7 +163,7 @@ pair1_pipe_start(void *arg)
}
} else {
if (!s->started) {
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
}
nni_list_append(&s->plist, p);
@@ -171,16 +171,16 @@ pair1_pipe_start(void *arg)
nni_mtx_unlock(&s->mtx);
// Schedule a getq. In polyamorous mode we get on the per pipe
- // sendq, as the socket distributes to us. In monogamous mode we
- // bypass and get from the upper writeq directly (saving a set of
- // context switches).
+ // sendq, as the socket distributes to us. In monogamous mode
+ // we bypass and get from the upper writeq directly (saving a
+ // set of context switches).
if (s->poly) {
- nni_msgq_aio_get(p->sendq, &p->aio_getq);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
} else {
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
}
// And the pipe read of course.
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
return (0);
}
@@ -197,10 +197,10 @@ pair1_pipe_stop(void *arg)
nni_mtx_unlock(&s->mtx);
nni_msgq_close(p->sendq);
- nni_aio_cancel(&p->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_getq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_send, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_recv, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_putq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_getq, NNG_ECANCELED);
}
static void
@@ -213,13 +213,13 @@ pair1_pipe_recv_cb(void *arg)
nni_pipe * npipe = p->npipe;
int rv;
- if (nni_aio_result(&p->aio_recv) != 0) {
+ if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_recv.a_msg;
- p->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// Store the pipe ID.
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
@@ -241,20 +241,20 @@ pair1_pipe_recv_cb(void *arg)
// keep getting more.
if (hdr > (unsigned) s->ttl) {
nni_msg_free(msg);
- nni_pipe_recv(npipe, &p->aio_recv);
+ nni_pipe_recv(npipe, p->aio_recv);
return;
}
// Store the pipe id followed by the hop count.
if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) {
nni_msg_free(msg);
- nni_pipe_recv(npipe, &p->aio_recv);
+ nni_pipe_recv(npipe, p->aio_recv);
return;
}
// Send the message up.
- p->aio_putq.a_msg = msg;
- nni_msgq_aio_put(s->urq, &p->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
}
static void
@@ -265,13 +265,13 @@ pair1_sock_getq_cb(void *arg)
nni_msg * msg;
uint32_t id;
- if (nni_aio_result(&s->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
// Socket closing...
return;
}
- msg = s->aio_getq.a_msg;
- s->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
// By definition we are in polyamorous mode.
NNI_ASSERT(s->poly);
@@ -289,7 +289,7 @@ pair1_sock_getq_cb(void *arg)
// Pipe not present!
nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
return;
}
@@ -302,7 +302,7 @@ pair1_sock_getq_cb(void *arg)
}
nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
@@ -310,13 +310,13 @@ pair1_pipe_putq_cb(void *arg)
{
pair1_pipe *p = arg;
- if (nni_aio_result(&p->aio_putq) != 0) {
- nni_msg_free(p->aio_putq.a_msg);
- p->aio_putq.a_msg = NULL;
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static void
@@ -327,13 +327,13 @@ pair1_pipe_getq_cb(void *arg)
nni_msg * msg;
uint32_t hops;
- if (nni_aio_result(&p->aio_getq) != 0) {
+ if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_getq.a_msg;
- p->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_getq);
+ nni_aio_set_msg(p->aio_getq, NULL);
// Raw mode messages have the header already formed, with
// a hop count. Cooked mode messages have no
@@ -354,13 +354,13 @@ pair1_pipe_getq_cb(void *arg)
goto badmsg;
}
- p->aio_send.a_msg = msg;
- nni_pipe_send(p->npipe, &p->aio_send);
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->npipe, p->aio_send);
return;
badmsg:
nni_msg_free(msg);
- nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq);
}
static void
@@ -369,20 +369,16 @@ pair1_pipe_send_cb(void *arg)
pair1_pipe *p = arg;
pair1_sock *s = p->psock;
- if (nni_aio_result(&p->aio_send) != 0) {
- nni_msg_free(p->aio_send.a_msg);
- p->aio_send.a_msg = NULL;
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->npipe);
return;
}
// In polyamorous mode, we want to get from the sendq; in
// monogamous we get from upper writeq.
- if (s->poly) {
- nni_msgq_aio_get(p->sendq, &p->aio_getq);
- } else {
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
- }
+ nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq);
}
static void
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 1d738ec2..0d66aab8 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -15,208 +15,212 @@
// Pull protocol. The PULL protocol is the "read" side of a pipeline.
-typedef struct nni_pull_pipe nni_pull_pipe;
-typedef struct nni_pull_sock nni_pull_sock;
+typedef struct pull_pipe pull_pipe;
+typedef struct pull_sock pull_sock;
-static void nni_pull_putq_cb(void *);
-static void nni_pull_recv_cb(void *);
-static void nni_pull_putq(nni_pull_pipe *, nni_msg *);
+static void pull_putq_cb(void *);
+static void pull_recv_cb(void *);
+static void pull_putq(pull_pipe *, nni_msg *);
-// An nni_pull_sock is our per-socket protocol private structure.
-struct nni_pull_sock {
+// A pull_sock is our per-socket protocol private structure.
+struct pull_sock {
nni_msgq *urq;
int raw;
};
-// An nni_pull_pipe is our per-pipe protocol private structure.
-struct nni_pull_pipe {
- nni_pipe * pipe;
- nni_pull_sock *pull;
- nni_aio putq_aio;
- nni_aio recv_aio;
+// A pull_pipe is our per-pipe protocol private structure.
+struct pull_pipe {
+ nni_pipe * pipe;
+ pull_sock *pull;
+ nni_aio * putq_aio;
+ nni_aio * recv_aio;
};
static int
-nni_pull_sock_init(void **pullp, nni_sock *sock)
+pull_sock_init(void **sp, nni_sock *sock)
{
- nni_pull_sock *pull;
+ pull_sock *s;
- if ((pull = NNI_ALLOC_STRUCT(pull)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- pull->raw = 0;
- pull->urq = nni_sock_recvq(sock);
- *pullp = pull;
+ s->raw = 0;
+ s->urq = nni_sock_recvq(sock);
+ *sp = s;
nni_sock_senderr(sock, NNG_ENOTSUP);
return (0);
}
static void
-nni_pull_sock_fini(void *arg)
+pull_sock_fini(void *arg)
{
- nni_pull_sock *pull = arg;
+ pull_sock *s = arg;
- NNI_FREE_STRUCT(pull);
+ NNI_FREE_STRUCT(s);
+}
+
+static void
+pull_pipe_fini(void *arg)
+{
+ pull_pipe *p = arg;
+
+ nni_aio_fini(p->putq_aio);
+ nni_aio_fini(p->recv_aio);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+pull_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_pull_pipe *pp;
+ pull_pipe *p;
+ int rv;
- if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp);
- nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp);
+ if (((rv = nni_aio_init(&p->putq_aio, pull_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->recv_aio, pull_recv_cb, p)) != 0)) {
+ pull_pipe_fini(p);
+ return (rv);
+ }
- pp->pipe = pipe;
- pp->pull = psock;
- *ppp = pp;
+ p->pipe = pipe;
+ p->pull = s;
+ *pp = p;
return (0);
}
-static void
-nni_pull_pipe_fini(void *arg)
-{
- nni_pull_pipe *pp = arg;
-
- nni_aio_fini(&pp->putq_aio);
- nni_aio_fini(&pp->recv_aio);
- NNI_FREE_STRUCT(pp);
-}
-
static int
-nni_pull_pipe_start(void *arg)
+pull_pipe_start(void *arg)
{
- nni_pull_pipe *pp = arg;
+ pull_pipe *p = arg;
// Start the pending pull...
- nni_pipe_recv(pp->pipe, &pp->recv_aio);
+ nni_pipe_recv(p->pipe, p->recv_aio);
return (0);
}
static void
-nni_pull_pipe_stop(void *arg)
+pull_pipe_stop(void *arg)
{
- nni_pull_pipe *pp = arg;
+ pull_pipe *p = arg;
- nni_aio_stop(&pp->putq_aio);
- nni_aio_stop(&pp->recv_aio);
+ nni_aio_stop(p->putq_aio);
+ nni_aio_stop(p->recv_aio);
}
static void
-nni_pull_recv_cb(void *arg)
+pull_recv_cb(void *arg)
{
- nni_pull_pipe *pp = arg;
- nni_aio * aio = &pp->recv_aio;
- nni_msg * msg;
+ pull_pipe *p = arg;
+ nni_aio * aio = p->recv_aio;
+ nni_msg * msg;
if (nni_aio_result(aio) != 0) {
// Failed to get a message, probably the pipe is closed.
- nni_pipe_stop(pp->pipe);
+ nni_pipe_stop(p->pipe);
return;
}
// Got a message... start the put to send it up to the application.
- msg = aio->a_msg;
- aio->a_msg = NULL;
- nni_pull_putq(pp, msg);
+ msg = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
+ pull_putq(p, msg);
}
static void
-nni_pull_putq_cb(void *arg)
+pull_putq_cb(void *arg)
{
- nni_pull_pipe *pp = arg;
- nni_aio * aio = &pp->putq_aio;
+ pull_pipe *p = arg;
+ nni_aio * aio = p->putq_aio;
if (nni_aio_result(aio) != 0) {
// If we failed to put, probably NNG_ECLOSED, nothing else
// we can do. Just close the pipe.
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
- nni_pipe_stop(pp->pipe);
+ nni_msg_free(nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_pipe_recv(pp->pipe, &pp->recv_aio);
+ nni_pipe_recv(p->pipe, p->recv_aio);
}
// nni_pull_putq schedules a put operation to the user socket (sendup).
static void
-nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg)
+pull_putq(pull_pipe *p, nni_msg *msg)
{
- nni_pull_sock *pull = pp->pull;
+ pull_sock *s = p->pull;
- pp->putq_aio.a_msg = msg;
+ nni_aio_set_msg(p->putq_aio, msg);
- nni_msgq_aio_put(pull->urq, &pp->putq_aio);
+ nni_msgq_aio_put(s->urq, p->putq_aio);
}
static void
-nni_pull_sock_open(void *arg)
+pull_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_pull_sock_close(void *arg)
+pull_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static int
-nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_pull_sock *pull = arg;
- int rv = NNG_ENOTSUP;
+ pull_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_pull_sock *pull = arg;
- int rv = NNG_ENOTSUP;
+ pull_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&pull->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-static nni_proto_pipe_ops nni_pull_pipe_ops = {
- .pipe_init = nni_pull_pipe_init,
- .pipe_fini = nni_pull_pipe_fini,
- .pipe_start = nni_pull_pipe_start,
- .pipe_stop = nni_pull_pipe_stop,
+static nni_proto_pipe_ops pull_pipe_ops = {
+ .pipe_init = pull_pipe_init,
+ .pipe_fini = pull_pipe_fini,
+ .pipe_start = pull_pipe_start,
+ .pipe_stop = pull_pipe_stop,
};
-static nni_proto_sock_ops nni_pull_sock_ops = {
- .sock_init = nni_pull_sock_init,
- .sock_fini = nni_pull_sock_fini,
- .sock_open = nni_pull_sock_open,
- .sock_close = nni_pull_sock_close,
- .sock_setopt = nni_pull_sock_setopt,
- .sock_getopt = nni_pull_sock_getopt,
+static nni_proto_sock_ops pull_sock_ops = {
+ .sock_init = pull_sock_init,
+ .sock_fini = pull_sock_fini,
+ .sock_open = pull_sock_open,
+ .sock_close = pull_sock_close,
+ .sock_setopt = pull_sock_setopt,
+ .sock_getopt = pull_sock_getopt,
};
-nni_proto nni_pull_proto = {
+static nni_proto pull_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_PULL_V0, "pull" },
.proto_peer = { NNG_PROTO_PUSH_V0, "push" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_pipe_ops = &nni_pull_pipe_ops,
- .proto_sock_ops = &nni_pull_sock_ops,
+ .proto_pipe_ops = &pull_pipe_ops,
+ .proto_sock_ops = &pull_sock_ops,
};
int
nng_pull0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_pull_proto));
+ return (nni_proto_open(sidp, &pull_proto));
}
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 10d04091..5e32efee 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -17,228 +17,231 @@
// Push distributes fairly, or tries to, by giving messages in round-robin
// order.
-typedef struct nni_push_pipe nni_push_pipe;
-typedef struct nni_push_sock nni_push_sock;
+typedef struct push_pipe push_pipe;
+typedef struct push_sock push_sock;
-static void nni_push_send_cb(void *);
-static void nni_push_recv_cb(void *);
-static void nni_push_getq_cb(void *);
+static void push_send_cb(void *);
+static void push_recv_cb(void *);
+static void push_getq_cb(void *);
// An nni_push_sock is our per-socket protocol private structure.
-struct nni_push_sock {
+struct push_sock {
nni_msgq *uwq;
int raw;
nni_sock *sock;
};
// An nni_push_pipe is our per-pipe protocol private structure.
-struct nni_push_pipe {
- nni_pipe * pipe;
- nni_push_sock *push;
- nni_list_node node;
-
- nni_aio aio_recv;
- nni_aio aio_send;
- nni_aio aio_getq;
+struct push_pipe {
+ nni_pipe * pipe;
+ push_sock * push;
+ nni_list_node node;
+
+ nni_aio *aio_recv;
+ nni_aio *aio_send;
+ nni_aio *aio_getq;
};
static int
-nni_push_sock_init(void **pushp, nni_sock *sock)
+push_sock_init(void **sp, nni_sock *sock)
{
- nni_push_sock *push;
+ push_sock *s;
- if ((push = NNI_ALLOC_STRUCT(push)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- push->raw = 0;
- push->sock = sock;
- push->uwq = nni_sock_sendq(sock);
- *pushp = push;
+ s->raw = 0;
+ s->sock = sock;
+ s->uwq = nni_sock_sendq(sock);
+ *sp = s;
nni_sock_recverr(sock, NNG_ENOTSUP);
return (0);
}
static void
-nni_push_sock_fini(void *arg)
+push_sock_fini(void *arg)
{
- nni_push_sock *push = arg;
+ push_sock *s = arg;
- NNI_FREE_STRUCT(push);
+ NNI_FREE_STRUCT(s);
}
static void
-nni_push_sock_open(void *arg)
+push_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_push_sock_close(void *arg)
+push_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_push_pipe_fini(void *arg)
+push_pipe_fini(void *arg)
{
- nni_push_pipe *pp = arg;
+ push_pipe *p = arg;
- nni_aio_fini(&pp->aio_recv);
- nni_aio_fini(&pp->aio_send);
- nni_aio_fini(&pp->aio_getq);
- NNI_FREE_STRUCT(pp);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_getq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+push_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_push_pipe *pp;
+ push_pipe *p;
+ int rv;
- if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp);
- nni_aio_init(&pp->aio_send, nni_push_send_cb, pp);
- nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp);
-
- NNI_LIST_NODE_INIT(&pp->node);
- pp->pipe = pipe;
- pp->push = psock;
- *ppp = pp;
+ if (((rv = nni_aio_init(&p->aio_recv, push_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, push_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, push_getq_cb, p)) != 0)) {
+ push_pipe_fini(p);
+ return (rv);
+ }
+ NNI_LIST_NODE_INIT(&p->node);
+ p->pipe = pipe;
+ p->push = s;
+ *pp = p;
return (0);
}
static int
-nni_push_pipe_start(void *arg)
+push_pipe_start(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_push_sock *push = pp->push;
+ push_pipe *p = arg;
+ push_sock *s = p->push;
- if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) {
+ if (nni_pipe_peer(p->pipe) != NNG_PROTO_PULL) {
return (NNG_EPROTO);
}
// Schedule a receiver. This is mostly so that we can detect
// a closed transport pipe.
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
// Schedule a sender.
- nni_msgq_aio_get(push->uwq, &pp->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
return (0);
}
static void
-nni_push_pipe_stop(void *arg)
+push_pipe_stop(void *arg)
{
- nni_push_pipe *pp = arg;
+ push_pipe *p = arg;
- nni_aio_stop(&pp->aio_recv);
- nni_aio_stop(&pp->aio_send);
- nni_aio_stop(&pp->aio_getq);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_getq);
}
static void
-nni_push_recv_cb(void *arg)
+push_recv_cb(void *arg)
{
- nni_push_pipe *pp = arg;
+ push_pipe *p = arg;
// We normally expect to receive an error. If a pipe actually
// sends us data, we just discard it.
- if (nni_aio_result(&pp->aio_recv) != 0) {
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- nni_msg_free(pp->aio_recv.a_msg);
- pp->aio_recv.a_msg = NULL;
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
+ nni_msg_free(nni_aio_get_msg(p->aio_recv));
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-nni_push_send_cb(void *arg)
+push_send_cb(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_push_sock *push = pp->push;
+ push_pipe *p = arg;
+ push_sock *s = p->push;
- if (nni_aio_result(&pp->aio_send) != 0) {
- nni_msg_free(pp->aio_send.a_msg);
- pp->aio_send.a_msg = NULL;
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_msgq_aio_get(push->uwq, &pp->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
}
static void
-nni_push_getq_cb(void *arg)
+push_getq_cb(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_aio * aio = &pp->aio_getq;
+ push_pipe *p = arg;
+ nni_aio * aio = p->aio_getq;
if (nni_aio_result(aio) != 0) {
// If the socket is closing, nothing else we can do.
- nni_pipe_stop(pp->pipe);
+ nni_pipe_stop(p->pipe);
return;
}
- pp->aio_send.a_msg = aio->a_msg;
- aio->a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
- nni_pipe_send(pp->pipe, &pp->aio_send);
+ nni_pipe_send(p->pipe, p->aio_send);
}
static int
-nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+push_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_push_sock *push = arg;
- int rv = NNG_ENOTSUP;
+ push_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&push->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_push_sock *push = arg;
- int rv = NNG_ENOTSUP;
+ push_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&push->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-static nni_proto_pipe_ops nni_push_pipe_ops = {
- .pipe_init = nni_push_pipe_init,
- .pipe_fini = nni_push_pipe_fini,
- .pipe_start = nni_push_pipe_start,
- .pipe_stop = nni_push_pipe_stop,
+static nni_proto_pipe_ops push_pipe_ops = {
+ .pipe_init = push_pipe_init,
+ .pipe_fini = push_pipe_fini,
+ .pipe_start = push_pipe_start,
+ .pipe_stop = push_pipe_stop,
};
-static nni_proto_sock_ops nni_push_sock_ops = {
- .sock_init = nni_push_sock_init,
- .sock_fini = nni_push_sock_fini,
- .sock_open = nni_push_sock_open,
- .sock_close = nni_push_sock_close,
- .sock_setopt = nni_push_sock_setopt,
- .sock_getopt = nni_push_sock_getopt,
+static nni_proto_sock_ops push_sock_ops = {
+ .sock_init = push_sock_init,
+ .sock_fini = push_sock_fini,
+ .sock_open = push_sock_open,
+ .sock_close = push_sock_close,
+ .sock_setopt = push_sock_setopt,
+ .sock_getopt = push_sock_getopt,
};
-nni_proto nni_push_proto = {
+static nni_proto push_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_PUSH_V0, "push" },
.proto_peer = { NNG_PROTO_PULL_V0, "pull" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_pipe_ops = &nni_push_pipe_ops,
- .proto_sock_ops = &nni_push_sock_ops,
+ .proto_pipe_ops = &push_pipe_ops,
+ .proto_sock_ops = &push_sock_ops,
};
int
nng_push0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_push_proto));
+ return (nni_proto_open(sidp, &push_proto));
}
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index bbca1ecd..b7ac361e 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -18,183 +18,188 @@
// perform sender-side filtering. Its best effort delivery, so anything
// that can't receive the message won't get one.
-typedef struct nni_pub_pipe nni_pub_pipe;
-typedef struct nni_pub_sock nni_pub_sock;
-
-static void nni_pub_pipe_recv_cb(void *);
-static void nni_pub_pipe_send_cb(void *);
-static void nni_pub_pipe_getq_cb(void *);
-static void nni_pub_sock_getq_cb(void *);
-static void nni_pub_sock_fini(void *);
-static void nni_pub_pipe_fini(void *);
-
-// An nni_pub_sock is our per-socket protocol private structure.
-struct nni_pub_sock {
+typedef struct pub_pipe pub_pipe;
+typedef struct pub_sock pub_sock;
+
+static void pub_pipe_recv_cb(void *);
+static void pub_pipe_send_cb(void *);
+static void pub_pipe_getq_cb(void *);
+static void pub_sock_getq_cb(void *);
+static void pub_sock_fini(void *);
+static void pub_pipe_fini(void *);
+
+// A pub_sock is our per-socket protocol private structure.
+struct pub_sock {
nni_sock *sock;
nni_msgq *uwq;
int raw;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
};
-// An nni_pub_pipe is our per-pipe protocol private structure.
-struct nni_pub_pipe {
+// A pub_pipe is our per-pipe protocol private structure.
+struct pub_pipe {
nni_pipe * pipe;
- nni_pub_sock *pub;
+ pub_sock * pub;
nni_msgq * sendq;
- nni_aio aio_getq;
- nni_aio aio_send;
- nni_aio aio_recv;
+ nni_aio * aio_getq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
nni_list_node node;
};
+static void
+pub_sock_fini(void *arg)
+{
+ pub_sock *s = arg;
+
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
+}
+
static int
-nni_pub_sock_init(void **pubp, nni_sock *sock)
+pub_sock_init(void **sp, nni_sock *sock)
{
- nni_pub_sock *pub;
+ pub_sock *s;
+ int rv;
- if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&pub->mtx);
- nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub);
+ nni_mtx_init(&s->mtx);
+ if ((rv = nni_aio_init(&s->aio_getq, pub_sock_getq_cb, s)) != 0) {
+ pub_sock_fini(s);
+ return (rv);
+ }
- pub->sock = sock;
- pub->raw = 0;
- NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node);
+ s->sock = sock;
+ s->raw = 0;
+ NNI_LIST_INIT(&s->pipes, pub_pipe, node);
- pub->uwq = nni_sock_sendq(sock);
+ s->uwq = nni_sock_sendq(sock);
- *pubp = pub;
+ *sp = s;
nni_sock_recverr(sock, NNG_ENOTSUP);
return (0);
}
static void
-nni_pub_sock_fini(void *arg)
+pub_sock_open(void *arg)
{
- nni_pub_sock *pub = arg;
+ pub_sock *s = arg;
- nni_aio_stop(&pub->aio_getq);
- nni_aio_fini(&pub->aio_getq);
- nni_mtx_fini(&pub->mtx);
- NNI_FREE_STRUCT(pub);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-nni_pub_sock_open(void *arg)
+pub_sock_close(void *arg)
{
- nni_pub_sock *pub = arg;
+ pub_sock *s = arg;
- nni_msgq_aio_get(pub->uwq, &pub->aio_getq);
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-nni_pub_sock_close(void *arg)
+pub_pipe_fini(void *arg)
{
- nni_pub_sock *pub = arg;
-
- nni_aio_cancel(&pub->aio_getq, NNG_ECLOSED);
-}
-
-static void
-nni_pub_pipe_fini(void *arg)
-{
- nni_pub_pipe *pp = arg;
- nni_aio_fini(&pp->aio_getq);
- nni_aio_fini(&pp->aio_send);
- nni_aio_fini(&pp->aio_recv);
- nni_msgq_fini(pp->sendq);
- NNI_FREE_STRUCT(pp);
+ pub_pipe *p = arg;
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_pub_pipe *pp;
- int rv;
+ pub_pipe *p;
+ int rv;
- if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
+
// XXX: consider making this depth tunable
- if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) {
- NNI_FREE_STRUCT(pp);
+ if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pub_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, pub_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pub_pipe_recv_cb, p)) != 0)) {
+
+ pub_pipe_fini(p);
return (rv);
}
- nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp);
- nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp);
- nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp);
-
- pp->pipe = pipe;
- pp->pub = psock;
- *ppp = pp;
+ p->pipe = pipe;
+ p->pub = s;
+ *pp = p;
return (0);
}
static int
-nni_pub_pipe_start(void *arg)
+pub_pipe_start(void *arg)
{
- nni_pub_pipe *pp = arg;
- nni_pub_sock *pub = pp->pub;
+ pub_pipe *p = arg;
+ pub_sock *s = p->pub;
- if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) {
+ if (nni_pipe_peer(p->pipe) != NNG_PROTO_SUB) {
return (NNG_EPROTO);
}
- nni_mtx_lock(&pub->mtx);
- nni_list_append(&pub->pipes, pp);
- nni_mtx_unlock(&pub->mtx);
+ nni_mtx_lock(&s->mtx);
+ nni_list_append(&s->pipes, p);
+ nni_mtx_unlock(&s->mtx);
// Start the receiver and the queue reader.
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
- nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
+ nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
return (0);
}
static void
-nni_pub_pipe_stop(void *arg)
+pub_pipe_stop(void *arg)
{
- nni_pub_pipe *pp = arg;
- nni_pub_sock *pub = pp->pub;
+ pub_pipe *p = arg;
+ pub_sock *s = p->pub;
- nni_aio_stop(&pp->aio_getq);
- nni_aio_stop(&pp->aio_send);
- nni_aio_stop(&pp->aio_recv);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
- nni_msgq_close(pp->sendq);
+ nni_msgq_close(p->sendq);
- nni_mtx_lock(&pub->mtx);
- if (nni_list_active(&pub->pipes, pp)) {
- nni_list_remove(&pub->pipes, pp);
+ nni_mtx_lock(&s->mtx);
+ if (nni_list_active(&s->pipes, p)) {
+ nni_list_remove(&s->pipes, p);
}
- nni_mtx_unlock(&pub->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_pub_sock_getq_cb(void *arg)
+pub_sock_getq_cb(void *arg)
{
- nni_pub_sock *pub = arg;
- nni_msgq * uwq = pub->uwq;
- nni_msg * msg, *dup;
+ pub_sock *s = arg;
+ nni_msgq *uwq = s->uwq;
+ nni_msg * msg, *dup;
- nni_pub_pipe *pp;
- nni_pub_pipe *last;
- int rv;
+ pub_pipe *p;
+ pub_pipe *last;
+ int rv;
- if (nni_aio_result(&pub->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
return;
}
- msg = pub->aio_getq.a_msg;
- pub->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
- nni_mtx_lock(&pub->mtx);
- last = nni_list_last(&pub->pipes);
- NNI_LIST_FOREACH (&pub->pipes, pp) {
- if (pp != last) {
+ nni_mtx_lock(&s->mtx);
+ last = nni_list_last(&s->pipes);
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ if (p != last) {
rv = nni_msg_dup(&dup, msg);
if (rv != 0) {
continue;
@@ -202,119 +207,117 @@ nni_pub_sock_getq_cb(void *arg)
} else {
dup = msg;
}
- if ((rv = nni_msgq_tryput(pp->sendq, dup)) != 0) {
+ if ((rv = nni_msgq_tryput(p->sendq, dup)) != 0) {
nni_msg_free(dup);
}
}
- nni_mtx_unlock(&pub->mtx);
+ nni_mtx_unlock(&s->mtx);
if (last == NULL) {
nni_msg_free(msg);
}
- nni_msgq_aio_get(uwq, &pub->aio_getq);
+ nni_msgq_aio_get(uwq, s->aio_getq);
}
static void
-nni_pub_pipe_recv_cb(void *arg)
+pub_pipe_recv_cb(void *arg)
{
- nni_pub_pipe *pp = arg;
+ pub_pipe *p = arg;
- if (nni_aio_result(&pp->aio_recv) != 0) {
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- nni_msg_free(pp->aio_recv.a_msg);
- pp->aio_recv.a_msg = NULL;
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
+ nni_msg_free(nni_aio_get_msg(p->aio_recv));
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-nni_pub_pipe_getq_cb(void *arg)
+pub_pipe_getq_cb(void *arg)
{
- nni_pub_pipe *pp = arg;
+ pub_pipe *p = arg;
- if (nni_aio_result(&pp->aio_getq) != 0) {
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- pp->aio_send.a_msg = pp->aio_getq.a_msg;
- pp->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(pp->pipe, &pp->aio_send);
+ nni_pipe_send(p->pipe, p->aio_send);
}
static void
-nni_pub_pipe_send_cb(void *arg)
+pub_pipe_send_cb(void *arg)
{
- nni_pub_pipe *pp = arg;
+ pub_pipe *p = arg;
- if (nni_aio_result(&pp->aio_send) != 0) {
- nni_msg_free(pp->aio_send.a_msg);
- pp->aio_send.a_msg = NULL;
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- pp->aio_send.a_msg = NULL;
- nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static int
-nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_pub_sock *pub = arg;
- int rv = NNG_ENOTSUP;
+ pub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&pub->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_pub_sock *pub = arg;
- int rv = NNG_ENOTSUP;
+ pub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&pub->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops nni_pub_pipe_ops = {
- .pipe_init = nni_pub_pipe_init,
- .pipe_fini = nni_pub_pipe_fini,
- .pipe_start = nni_pub_pipe_start,
- .pipe_stop = nni_pub_pipe_stop,
+static nni_proto_pipe_ops pub_pipe_ops = {
+ .pipe_init = pub_pipe_init,
+ .pipe_fini = pub_pipe_fini,
+ .pipe_start = pub_pipe_start,
+ .pipe_stop = pub_pipe_stop,
};
-nni_proto_sock_ops nni_pub_sock_ops = {
- .sock_init = nni_pub_sock_init,
- .sock_fini = nni_pub_sock_fini,
- .sock_open = nni_pub_sock_open,
- .sock_close = nni_pub_sock_close,
- .sock_setopt = nni_pub_sock_setopt,
- .sock_getopt = nni_pub_sock_getopt,
+static nni_proto_sock_ops pub_sock_ops = {
+ .sock_init = pub_sock_init,
+ .sock_fini = pub_sock_fini,
+ .sock_open = pub_sock_open,
+ .sock_close = pub_sock_close,
+ .sock_setopt = pub_sock_setopt,
+ .sock_getopt = pub_sock_getopt,
};
-nni_proto nni_pub_proto = {
+static nni_proto pub_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_PUB_V0, "pub" },
.proto_peer = { NNG_PROTO_SUB_V0, "sub" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_sock_ops = &nni_pub_sock_ops,
- .proto_pipe_ops = &nni_pub_pipe_ops,
+ .proto_sock_ops = &pub_sock_ops,
+ .proto_pipe_ops = &pub_pipe_ops,
};
int
nng_pub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_pub_proto));
+ return (nni_proto_open(sidp, &pub_proto));
}
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 0563b764..0dbad081 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -17,22 +17,22 @@
// it from publishers, and filters out those it is not interested in,
// only passing up ones that match known subscriptions.
-typedef struct nni_sub_pipe nni_sub_pipe;
-typedef struct nni_sub_sock nni_sub_sock;
-typedef struct nni_sub_topic nni_sub_topic;
+typedef struct sub_pipe sub_pipe;
+typedef struct sub_sock sub_sock;
+typedef struct sub_topic sub_topic;
-static void nni_sub_recv_cb(void *);
-static void nni_sub_putq_cb(void *);
-static void nni_sub_pipe_fini(void *);
+static void sub_recv_cb(void *);
+static void sub_putq_cb(void *);
+static void sub_pipe_fini(void *);
-struct nni_sub_topic {
+struct sub_topic {
nni_list_node node;
size_t len;
void * buf;
};
// An nni_rep_sock is our per-socket protocol private structure.
-struct nni_sub_sock {
+struct sub_sock {
nni_sock *sock;
nni_list topics;
nni_msgq *urq;
@@ -40,132 +40,136 @@ struct nni_sub_sock {
};
// An nni_rep_pipe is our per-pipe protocol private structure.
-struct nni_sub_pipe {
- nni_pipe * pipe;
- nni_sub_sock *sub;
- nni_aio aio_recv;
- nni_aio aio_putq;
+struct sub_pipe {
+ nni_pipe *pipe;
+ sub_sock *sub;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
};
static int
-nni_sub_sock_init(void **subp, nni_sock *sock)
+sub_sock_init(void **sp, nni_sock *sock)
{
- nni_sub_sock *sub;
+ sub_sock *s;
- if ((sub = NNI_ALLOC_STRUCT(sub)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&sub->topics, nni_sub_topic, node);
- sub->sock = sock;
- sub->raw = 0;
+ NNI_LIST_INIT(&s->topics, sub_topic, node);
+ s->sock = sock;
+ s->raw = 0;
- sub->urq = nni_sock_recvq(sock);
+ s->urq = nni_sock_recvq(sock);
nni_sock_senderr(sock, NNG_ENOTSUP);
- *subp = sub;
+ *sp = s;
return (0);
}
static void
-nni_sub_sock_fini(void *arg)
+sub_sock_fini(void *arg)
{
- nni_sub_sock * sub = arg;
- nni_sub_topic *topic;
+ sub_sock * s = arg;
+ sub_topic *topic;
- while ((topic = nni_list_first(&sub->topics)) != NULL) {
- nni_list_remove(&sub->topics, topic);
+ while ((topic = nni_list_first(&s->topics)) != NULL) {
+ nni_list_remove(&s->topics, topic);
nni_free(topic->buf, topic->len);
NNI_FREE_STRUCT(topic);
}
- NNI_FREE_STRUCT(sub);
+ NNI_FREE_STRUCT(s);
}
static void
-nni_sub_sock_open(void *arg)
+sub_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_sub_sock_close(void *arg)
+sub_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
+static void
+sub_pipe_fini(void *arg)
+{
+ sub_pipe *p = arg;
+
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_recv);
+ NNI_FREE_STRUCT(p);
+}
+
static int
-nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
+sub_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_sub_pipe *sp;
+ sub_pipe *p;
+ int rv;
- if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp);
- nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp);
+ if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) {
+ sub_pipe_fini(p);
+ return (rv);
+ }
- sp->pipe = pipe;
- sp->sub = ssock;
- *spp = sp;
+ p->pipe = pipe;
+ p->sub = s;
+ *pp = p;
return (0);
}
-static void
-nni_sub_pipe_fini(void *arg)
-{
- nni_sub_pipe *sp = arg;
-
- nni_aio_fini(&sp->aio_putq);
- nni_aio_fini(&sp->aio_recv);
- NNI_FREE_STRUCT(sp);
-}
-
static int
-nni_sub_pipe_start(void *arg)
+sub_pipe_start(void *arg)
{
- nni_sub_pipe *sp = arg;
+ sub_pipe *p = arg;
- nni_pipe_recv(sp->pipe, &sp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
static void
-nni_sub_pipe_stop(void *arg)
+sub_pipe_stop(void *arg)
{
- nni_sub_pipe *sp = arg;
+ sub_pipe *p = arg;
- nni_aio_stop(&sp->aio_putq);
- nni_aio_stop(&sp->aio_recv);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_recv);
}
static void
-nni_sub_recv_cb(void *arg)
+sub_recv_cb(void *arg)
{
- nni_sub_pipe *sp = arg;
- nni_sub_sock *sub = sp->sub;
- nni_msgq * urq = sub->urq;
+ sub_pipe *p = arg;
+ sub_sock *s = p->sub;
+ nni_msgq *urq = s->urq;
- if (nni_aio_result(&sp->aio_recv) != 0) {
- nni_pipe_stop(sp->pipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- sp->aio_putq.a_msg = sp->aio_recv.a_msg;
- sp->aio_recv.a_msg = NULL;
- nni_msgq_aio_put(sub->urq, &sp->aio_putq);
+ nni_aio_set_msg(p->aio_putq, nni_aio_get_msg(p->aio_recv));
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msgq_aio_put(urq, p->aio_putq);
}
static void
-nni_sub_putq_cb(void *arg)
+sub_putq_cb(void *arg)
{
- nni_sub_pipe *sp = arg;
+ sub_pipe *p = arg;
- if (nni_aio_result(&sp->aio_putq) != 0) {
- nni_msg_free(sp->aio_putq.a_msg);
- sp->aio_putq.a_msg = NULL;
- nni_pipe_stop(sp->pipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_pipe_recv(sp->pipe, &sp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
// For now we maintain subscriptions on a sorted linked list. As we do not
@@ -174,12 +178,12 @@ nni_sub_putq_cb(void *arg)
// to replace this with a patricia trie, like old nanomsg had.
static int
-nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz)
+sub_subscribe(sub_sock *s, const void *buf, size_t sz)
{
- nni_sub_topic *topic;
- nni_sub_topic *newtopic;
+ sub_topic *topic;
+ sub_topic *newtopic;
- NNI_LIST_FOREACH (&sub->topics, topic) {
+ NNI_LIST_FOREACH (&s->topics, topic) {
int rv;
if (topic->len >= sz) {
@@ -210,20 +214,20 @@ nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz)
newtopic->len = sz;
memcpy(newtopic->buf, buf, sz);
if (topic != NULL) {
- nni_list_insert_before(&sub->topics, newtopic, topic);
+ nni_list_insert_before(&s->topics, newtopic, topic);
} else {
- nni_list_append(&sub->topics, newtopic);
+ nni_list_append(&s->topics, newtopic);
}
return (0);
}
static int
-nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
+sub_unsubscribe(sub_sock *s, const void *buf, size_t sz)
{
- nni_sub_topic *topic;
- int rv;
+ sub_topic *topic;
+ int rv;
- NNI_LIST_FOREACH (&sub->topics, topic) {
+ NNI_LIST_FOREACH (&s->topics, topic) {
if (topic->len >= sz) {
rv = memcmp(topic->buf, buf, sz);
} else {
@@ -231,7 +235,7 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
}
if (rv == 0) {
if (topic->len == sz) {
- nni_list_remove(&sub->topics, topic);
+ nni_list_remove(&s->topics, topic);
nni_free(topic->buf, topic->len);
NNI_FREE_STRUCT(topic);
return (0);
@@ -248,43 +252,43 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
}
static int
-nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_sub_sock *sub = arg;
- int rv = NNG_ENOTSUP;
+ sub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&sub->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
} else if (opt == nng_optid_sub_subscribe) {
- rv = nni_sub_subscribe(sub, buf, sz);
+ rv = sub_subscribe(s, buf, sz);
} else if (opt == nng_optid_sub_unsubscribe) {
- rv = nni_sub_unsubscribe(sub, buf, sz);
+ rv = sub_unsubscribe(s, buf, sz);
}
return (rv);
}
static int
-nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_sub_sock *sub = arg;
- int rv = NNG_ENOTSUP;
+ sub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&sub->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
static nni_msg *
-nni_sub_sock_rfilter(void *arg, nni_msg *msg)
+sub_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_sub_sock * sub = arg;
- nni_sub_topic *topic;
- char * body;
- size_t len;
- int match;
+ sub_sock * s = arg;
+ sub_topic *topic;
+ char * body;
+ size_t len;
+ int match;
- if (sub->raw) {
+ if (s->raw) {
return (msg);
}
@@ -293,7 +297,7 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
match = 0;
// Check to see if the message matches one of our subscriptions.
- NNI_LIST_FOREACH (&sub->topics, topic) {
+ NNI_LIST_FOREACH (&s->topics, topic) {
if (len >= topic->len) {
int rv = memcmp(topic->buf, body, topic->len);
if (rv == 0) {
@@ -319,34 +323,34 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops nni_sub_pipe_ops = {
- .pipe_init = nni_sub_pipe_init,
- .pipe_fini = nni_sub_pipe_fini,
- .pipe_start = nni_sub_pipe_start,
- .pipe_stop = nni_sub_pipe_stop,
+static nni_proto_pipe_ops sub_pipe_ops = {
+ .pipe_init = sub_pipe_init,
+ .pipe_fini = sub_pipe_fini,
+ .pipe_start = sub_pipe_start,
+ .pipe_stop = sub_pipe_stop,
};
-static nni_proto_sock_ops nni_sub_sock_ops = {
- .sock_init = nni_sub_sock_init,
- .sock_fini = nni_sub_sock_fini,
- .sock_open = nni_sub_sock_open,
- .sock_close = nni_sub_sock_close,
- .sock_setopt = nni_sub_sock_setopt,
- .sock_getopt = nni_sub_sock_getopt,
- .sock_rfilter = nni_sub_sock_rfilter,
+static nni_proto_sock_ops sub_sock_ops = {
+ .sock_init = sub_sock_init,
+ .sock_fini = sub_sock_fini,
+ .sock_open = sub_sock_open,
+ .sock_close = sub_sock_close,
+ .sock_setopt = sub_sock_setopt,
+ .sock_getopt = sub_sock_getopt,
+ .sock_rfilter = sub_sock_rfilter,
};
-nni_proto nni_sub_proto = {
+static nni_proto sub_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_SUB_V0, "sub" },
.proto_peer = { NNG_PROTO_PUB_V0, "pub" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_sock_ops = &nni_sub_sock_ops,
- .proto_pipe_ops = &nni_sub_pipe_ops,
+ .proto_sock_ops = &sub_sock_ops,
+ .proto_pipe_ops = &sub_pipe_ops,
};
int
nng_sub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_sub_proto));
+ return (nni_proto_open(sidp, &sub_proto));
}
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index cd33d019..14a3e46b 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -17,18 +17,18 @@
// request-reply pair. This is useful for building RPC servers, for
// example.
-typedef struct nni_rep_pipe nni_rep_pipe;
-typedef struct nni_rep_sock nni_rep_sock;
-
-static void nni_rep_sock_getq_cb(void *);
-static void nni_rep_pipe_getq_cb(void *);
-static void nni_rep_pipe_putq_cb(void *);
-static void nni_rep_pipe_send_cb(void *);
-static void nni_rep_pipe_recv_cb(void *);
-static void nni_rep_pipe_fini(void *);
-
-// An nni_rep_sock is our per-socket protocol private structure.
-struct nni_rep_sock {
+typedef struct rep_pipe rep_pipe;
+typedef struct rep_sock rep_sock;
+
+static void rep_sock_getq_cb(void *);
+static void rep_pipe_getq_cb(void *);
+static void rep_pipe_putq_cb(void *);
+static void rep_pipe_send_cb(void *);
+static void rep_pipe_recv_cb(void *);
+static void rep_pipe_fini(void *);
+
+// A rep_sock is our per-socket protocol private structure.
+struct rep_sock {
nni_sock * sock;
nni_msgq * uwq;
nni_msgq * urq;
@@ -37,179 +37,176 @@ struct nni_rep_sock {
nni_idhash *pipes;
char * btrace;
size_t btrace_len;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
};
-// An nni_rep_pipe is our per-pipe protocol private structure.
-struct nni_rep_pipe {
- nni_pipe * pipe;
- nni_rep_sock *rep;
- nni_msgq * sendq;
- nni_aio aio_getq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_putq;
+// A rep_pipe is our per-pipe protocol private structure.
+struct rep_pipe {
+ nni_pipe *pipe;
+ rep_sock *rep;
+ nni_msgq *sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
};
static void
-nni_rep_sock_fini(void *arg)
+rep_sock_fini(void *arg)
{
- nni_rep_sock *rep = arg;
+ rep_sock *s = arg;
- nni_aio_stop(&rep->aio_getq);
- nni_aio_fini(&rep->aio_getq);
- nni_idhash_fini(rep->pipes);
- if (rep->btrace != NULL) {
- nni_free(rep->btrace, rep->btrace_len);
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_idhash_fini(s->pipes);
+ if (s->btrace != NULL) {
+ nni_free(s->btrace, s->btrace_len);
}
- NNI_FREE_STRUCT(rep);
+ NNI_FREE_STRUCT(s);
}
static int
-nni_rep_sock_init(void **repp, nni_sock *sock)
+rep_sock_init(void **sp, nni_sock *sock)
{
- nni_rep_sock *rep;
- int rv;
+ rep_sock *s;
+ int rv;
- if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_idhash_init(&rep->pipes)) != 0) {
- NNI_FREE_STRUCT(rep);
+ if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
+ ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) {
+ rep_sock_fini(s);
return (rv);
}
- rep->ttl = 8; // Per RFC
- rep->sock = sock;
- rep->raw = 0;
- rep->btrace = NULL;
- rep->btrace_len = 0;
+ s->ttl = 8; // Per RFC
+ s->sock = sock;
+ s->raw = 0;
+ s->btrace = NULL;
+ s->btrace_len = 0;
+ s->uwq = nni_sock_sendq(sock);
+ s->urq = nni_sock_recvq(sock);
- nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep);
-
- rep->uwq = nni_sock_sendq(sock);
- rep->urq = nni_sock_recvq(sock);
-
- *repp = rep;
+ *sp = s;
nni_sock_senderr(sock, NNG_ESTATE);
return (0);
}
static void
-nni_rep_sock_open(void *arg)
+rep_sock_open(void *arg)
{
- nni_rep_sock *rep = arg;
+ rep_sock *s = arg;
- nni_msgq_aio_get(rep->uwq, &rep->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-nni_rep_sock_close(void *arg)
+rep_sock_close(void *arg)
{
- nni_rep_sock *rep = arg;
+ rep_sock *s = arg;
+
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
+}
- nni_aio_cancel(&rep->aio_getq, NNG_ECLOSED);
+static void
+rep_pipe_fini(void *arg)
+{
+ rep_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
+rep_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_rep_pipe *rp;
- int rv;
+ rep_pipe *p;
+ int rv;
- if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) {
- NNI_FREE_STRUCT(rp);
+ if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, rep_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, rep_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, rep_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, rep_pipe_putq_cb, p)) != 0)) {
+ rep_pipe_fini(p);
return (rv);
}
- nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp);
- nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp);
- nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp);
- nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp);
-
- rp->pipe = pipe;
- rp->rep = rsock;
- *rpp = rp;
- return (0);
-}
-static void
-nni_rep_pipe_fini(void *arg)
-{
- nni_rep_pipe *rp = arg;
-
- nni_aio_fini(&rp->aio_getq);
- nni_aio_fini(&rp->aio_send);
- nni_aio_fini(&rp->aio_recv);
- nni_aio_fini(&rp->aio_putq);
- nni_msgq_fini(rp->sendq);
- NNI_FREE_STRUCT(rp);
+ p->pipe = pipe;
+ p->rep = s;
+ *pp = p;
+ return (0);
}
static int
-nni_rep_pipe_start(void *arg)
+rep_pipe_start(void *arg)
{
- nni_rep_pipe *rp = arg;
- nni_rep_sock *rep = rp->rep;
- int rv;
+ rep_pipe *p = arg;
+ rep_sock *s = p->rep;
+ int rv;
- rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp);
- if (rv != 0) {
+ if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
return (rv);
}
- nni_msgq_aio_get(rp->sendq, &rp->aio_getq);
- nni_pipe_recv(rp->pipe, &rp->aio_recv);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
static void
-nni_rep_pipe_stop(void *arg)
+rep_pipe_stop(void *arg)
{
- nni_rep_pipe *rp = arg;
- nni_rep_sock *rep = rp->rep;
+ rep_pipe *p = arg;
+ rep_sock *s = p->rep;
- nni_msgq_close(rp->sendq);
- nni_aio_stop(&rp->aio_getq);
- nni_aio_stop(&rp->aio_send);
- nni_aio_stop(&rp->aio_recv);
- nni_aio_stop(&rp->aio_putq);
+ nni_msgq_close(p->sendq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
- nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
+ nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
}
static void
-nni_rep_sock_getq_cb(void *arg)
+rep_sock_getq_cb(void *arg)
{
- nni_rep_sock *rep = arg;
- nni_msgq * uwq = rep->uwq;
- nni_msg * msg;
- uint32_t id;
- nni_rep_pipe *rp;
- int rv;
+ rep_sock *s = arg;
+ nni_msgq *uwq = s->uwq;
+ nni_msg * msg;
+ uint32_t id;
+ rep_pipe *p;
+ int rv;
// This watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
// destination pipe via a separate queue. This prevents a single bad
// or slow pipe from gumming up the works for the entire socket.
- if (nni_aio_result(&rep->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
// Closed socket?
return;
}
- msg = rep->aio_getq.a_msg;
- rep->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
// We yank the outgoing pipe id from the header
if (nni_msg_header_len(msg) < 4) {
nni_msg_free(msg);
// Look for another message on the upper write queue.
- nni_msgq_aio_get(uwq, &rep->aio_getq);
+ nni_msgq_aio_get(uwq, s->aio_getq);
return;
}
@@ -218,69 +215,68 @@ nni_rep_sock_getq_cb(void *arg)
// Look for the pipe, and attempt to put the message there
// (nonblocking) if we can. If we can't for any reason, then we
// free the message.
- rv = nni_idhash_find(rep->pipes, id, (void **) &rp);
- if (rv == 0) {
- rv = nni_msgq_tryput(rp->sendq, msg);
+ if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) {
+ rv = nni_msgq_tryput(p->sendq, msg);
}
if (rv != 0) {
nni_msg_free(msg);
}
// Now look for another message on the upper write queue.
- nni_msgq_aio_get(uwq, &rep->aio_getq);
+ nni_msgq_aio_get(uwq, s->aio_getq);
}
static void
-nni_rep_pipe_getq_cb(void *arg)
+rep_pipe_getq_cb(void *arg)
{
- nni_rep_pipe *rp = arg;
+ rep_pipe *p = arg;
- if (nni_aio_result(&rp->aio_getq) != 0) {
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- rp->aio_send.a_msg = rp->aio_getq.a_msg;
- rp->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(rp->pipe, &rp->aio_send);
+ nni_pipe_send(p->pipe, p->aio_send);
}
static void
-nni_rep_pipe_send_cb(void *arg)
+rep_pipe_send_cb(void *arg)
{
- nni_rep_pipe *rp = arg;
+ rep_pipe *p = arg;
- if (nni_aio_result(&rp->aio_send) != 0) {
- nni_msg_free(rp->aio_send.a_msg);
- rp->aio_send.a_msg = NULL;
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_msgq_aio_get(rp->sendq, &rp->aio_getq);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static void
-nni_rep_pipe_recv_cb(void *arg)
+rep_pipe_recv_cb(void *arg)
{
- nni_rep_pipe *rp = arg;
- nni_rep_sock *rep = rp->rep;
- nni_msg * msg;
- int rv;
- uint8_t * body;
- int hops;
-
- if (nni_aio_result(&rp->aio_recv) != 0) {
- nni_pipe_stop(rp->pipe);
+ rep_pipe *p = arg;
+ rep_sock *s = p->rep;
+ nni_msg * msg;
+ int rv;
+ uint8_t * body;
+ int hops;
+
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- msg = rp->aio_recv.a_msg;
- rp->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// Store the pipe id in the header, first thing.
- rv = nni_msg_header_append_u32(msg, nni_pipe_id(rp->pipe));
+ rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe));
if (rv != 0) {
// Failure here causes us to drop the message.
goto drop;
@@ -290,7 +286,7 @@ nni_rep_pipe_recv_cb(void *arg)
hops = 1;
for (;;) {
int end = 0;
- if (hops >= rep->ttl) {
+ if (hops >= s->ttl) {
// This isn't malformed, but it has gone through
// too many hops. Do not disconnect, because we
// can legitimately receive messages with too many
@@ -300,7 +296,7 @@ nni_rep_pipe_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer is speaking garbage. Kick it.
nni_msg_free(msg);
- nni_pipe_stop(rp->pipe);
+ nni_pipe_stop(p->pipe);
return;
}
body = nni_msg_body(msg);
@@ -320,74 +316,74 @@ nni_rep_pipe_recv_cb(void *arg)
}
// Go ahead and send it up.
- rp->aio_putq.a_msg = msg;
- nni_msgq_aio_put(rp->rep->urq, &rp->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
return;
drop:
nni_msg_free(msg);
- nni_pipe_recv(rp->pipe, &rp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-nni_rep_pipe_putq_cb(void *arg)
+rep_pipe_putq_cb(void *arg)
{
- nni_rep_pipe *rp = arg;
+ rep_pipe *p = arg;
- if (nni_aio_result(&rp->aio_putq) != 0) {
- nni_msg_free(rp->aio_putq.a_msg);
- rp->aio_putq.a_msg = NULL;
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_pipe_recv(rp->pipe, &rp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static int
-nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_rep_sock *rep = arg;
- int rv = NNG_ENOTSUP;
+ rep_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_maxttl) {
- rv = nni_setopt_int(&rep->ttl, buf, sz, 1, 255);
+ rv = nni_setopt_int(&s->ttl, buf, sz, 1, 255);
} else if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1);
- nni_sock_senderr(rep->sock, rep->raw ? 0 : NNG_ESTATE);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
+ nni_sock_senderr(s->sock, s->raw ? 0 : NNG_ESTATE);
}
return (rv);
}
static int
-nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_rep_sock *rep = arg;
- int rv = NNG_ENOTSUP;
+ rep_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_maxttl) {
- rv = nni_getopt_int(&rep->ttl, buf, szp);
+ rv = nni_getopt_int(&s->ttl, buf, szp);
} else if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&rep->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
static nni_msg *
-nni_rep_sock_sfilter(void *arg, nni_msg *msg)
+rep_sock_sfilter(void *arg, nni_msg *msg)
{
- nni_rep_sock *rep = arg;
+ rep_sock *s = arg;
- if (rep->raw) {
+ if (s->raw) {
return (msg);
}
// Cannot send again until a receive is done...
- nni_sock_senderr(rep->sock, NNG_ESTATE);
+ nni_sock_senderr(s->sock, NNG_ESTATE);
// If we have a stored backtrace, append it to the header...
// if we don't have a backtrace, discard the message.
- if (rep->btrace == NULL) {
+ if (s->btrace == NULL) {
nni_msg_free(msg);
return (NULL);
}
@@ -395,76 +391,76 @@ nni_rep_sock_sfilter(void *arg, nni_msg *msg)
// drop anything else in the header...
nni_msg_header_clear(msg);
- if (nni_msg_header_append(msg, rep->btrace, rep->btrace_len) != 0) {
- nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
- rep->btrace_len = 0;
+ if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) {
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
nni_msg_free(msg);
return (NULL);
}
- nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
- rep->btrace_len = 0;
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
return (msg);
}
static nni_msg *
-nni_rep_sock_rfilter(void *arg, nni_msg *msg)
+rep_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_rep_sock *rep = arg;
- char * header;
- size_t len;
+ rep_sock *s = arg;
+ char * header;
+ size_t len;
- if (rep->raw) {
+ if (s->raw) {
return (msg);
}
- nni_sock_senderr(rep->sock, 0);
+ nni_sock_senderr(s->sock, 0);
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
- if (rep->btrace != NULL) {
- nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
- rep->btrace_len = 0;
+ if (s->btrace != NULL) {
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
}
- if ((rep->btrace = nni_alloc(len)) == NULL) {
+ if ((s->btrace = nni_alloc(len)) == NULL) {
nni_msg_free(msg);
return (NULL);
}
- rep->btrace_len = len;
- memcpy(rep->btrace, header, len);
+ s->btrace_len = len;
+ memcpy(s->btrace, header, len);
nni_msg_header_clear(msg);
return (msg);
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops nni_rep_pipe_ops = {
- .pipe_init = nni_rep_pipe_init,
- .pipe_fini = nni_rep_pipe_fini,
- .pipe_start = nni_rep_pipe_start,
- .pipe_stop = nni_rep_pipe_stop,
+static nni_proto_pipe_ops rep_pipe_ops = {
+ .pipe_init = rep_pipe_init,
+ .pipe_fini = rep_pipe_fini,
+ .pipe_start = rep_pipe_start,
+ .pipe_stop = rep_pipe_stop,
};
-static nni_proto_sock_ops nni_rep_sock_ops = {
- .sock_init = nni_rep_sock_init,
- .sock_fini = nni_rep_sock_fini,
- .sock_open = nni_rep_sock_open,
- .sock_close = nni_rep_sock_close,
- .sock_setopt = nni_rep_sock_setopt,
- .sock_getopt = nni_rep_sock_getopt,
- .sock_rfilter = nni_rep_sock_rfilter,
- .sock_sfilter = nni_rep_sock_sfilter,
+static nni_proto_sock_ops rep_sock_ops = {
+ .sock_init = rep_sock_init,
+ .sock_fini = rep_sock_fini,
+ .sock_open = rep_sock_open,
+ .sock_close = rep_sock_close,
+ .sock_setopt = rep_sock_setopt,
+ .sock_getopt = rep_sock_getopt,
+ .sock_rfilter = rep_sock_rfilter,
+ .sock_sfilter = rep_sock_sfilter,
};
-nni_proto nni_rep_proto = {
+static nni_proto nni_rep_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_REP_V0, "rep" },
.proto_peer = { NNG_PROTO_REQ_V0, "req" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_rep_sock_ops,
- .proto_pipe_ops = &nni_rep_pipe_ops,
+ .proto_sock_ops = &rep_sock_ops,
+ .proto_pipe_ops = &rep_pipe_ops,
};
int
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 2579417a..5da5003f 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -18,15 +18,15 @@
// request-reply pair. This is useful for building RPC clients, for
// example.
-typedef struct nni_req_pipe nni_req_pipe;
-typedef struct nni_req_sock nni_req_sock;
+typedef struct req_pipe req_pipe;
+typedef struct req_sock req_sock;
-static void nni_req_resend(nni_req_sock *);
-static void nni_req_timeout(void *);
-static void nni_req_pipe_fini(void *);
+static void req_resend(req_sock *);
+static void req_timeout(void *);
+static void req_pipe_fini(void *);
-// An nni_req_sock is our per-socket protocol private structure.
-struct nni_req_sock {
+// A req_sock is our per-socket protocol private structure.
+struct req_sock {
nni_sock * sock;
nni_msgq * uwq;
nni_msgq * urq;
@@ -38,7 +38,7 @@ struct nni_req_sock {
int ttl;
nni_msg * reqmsg;
- nni_req_pipe *pendpipe;
+ req_pipe *pendpipe;
nni_list readypipes;
nni_list busypipes;
@@ -51,230 +51,235 @@ struct nni_req_sock {
nni_cv cv;
};
-// An nni_req_pipe is our per-pipe protocol private structure.
-struct nni_req_pipe {
+// A req_pipe is our per-pipe protocol private structure.
+struct req_pipe {
nni_pipe * pipe;
- nni_req_sock *req;
+ req_sock * req;
nni_list_node node;
- nni_aio aio_getq; // raw mode only
- nni_aio aio_sendraw; // raw mode only
- nni_aio aio_sendcooked; // cooked mode only
- nni_aio aio_recv;
- nni_aio aio_putq;
+ nni_aio * aio_getq; // raw mode only
+ nni_aio * aio_sendraw; // raw mode only
+ nni_aio * aio_sendcooked; // cooked mode only
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
nni_mtx mtx;
};
-static void nni_req_resender(void *);
-static void nni_req_getq_cb(void *);
-static void nni_req_sendraw_cb(void *);
-static void nni_req_sendcooked_cb(void *);
-static void nni_req_recv_cb(void *);
-static void nni_req_putq_cb(void *);
+static void req_resender(void *);
+static void req_getq_cb(void *);
+static void req_sendraw_cb(void *);
+static void req_sendcooked_cb(void *);
+static void req_recv_cb(void *);
+static void req_putq_cb(void *);
static int
-nni_req_sock_init(void **reqp, nni_sock *sock)
+req_sock_init(void **sp, nni_sock *sock)
{
- nni_req_sock *req;
+ req_sock *s;
- if ((req = NNI_ALLOC_STRUCT(req)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&req->mtx);
- nni_cv_init(&req->cv, &req->mtx);
+ nni_mtx_init(&s->mtx);
+ nni_cv_init(&s->cv, &s->mtx);
- NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node);
- NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node);
- nni_timer_init(&req->timer, nni_req_timeout, req);
+ NNI_LIST_INIT(&s->readypipes, req_pipe, node);
+ NNI_LIST_INIT(&s->busypipes, req_pipe, node);
+ nni_timer_init(&s->timer, req_timeout, s);
// this is "semi random" start for request IDs.
- req->nextid = nni_random();
-
- req->retry = NNI_SECOND * 60;
- req->sock = sock;
- req->reqmsg = NULL;
- req->raw = 0;
- req->wantw = 0;
- req->resend = NNI_TIME_ZERO;
- req->ttl = 8;
-
- req->uwq = nni_sock_sendq(sock);
- req->urq = nni_sock_recvq(sock);
- *reqp = req;
+ s->nextid = nni_random();
+ s->retry = NNI_SECOND * 60;
+ s->sock = sock;
+ s->reqmsg = NULL;
+ s->raw = 0;
+ s->wantw = 0;
+ s->resend = NNI_TIME_ZERO;
+ s->ttl = 8;
+ s->uwq = nni_sock_sendq(sock);
+ s->urq = nni_sock_recvq(sock);
+ *sp = s;
nni_sock_recverr(sock, NNG_ESTATE);
return (0);
}
static void
-nni_req_sock_open(void *arg)
+req_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_req_sock_close(void *arg)
+req_sock_close(void *arg)
{
- nni_req_sock *req = arg;
+ req_sock *s = arg;
- nni_mtx_lock(&req->mtx);
- req->closed = 1;
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_lock(&s->mtx);
+ s->closed = 1;
+ nni_mtx_unlock(&s->mtx);
- nni_timer_cancel(&req->timer);
+ nni_timer_cancel(&s->timer);
}
static void
-nni_req_sock_fini(void *arg)
+req_sock_fini(void *arg)
{
- nni_req_sock *req = arg;
+ req_sock *s = arg;
- nni_mtx_lock(&req->mtx);
- while ((!nni_list_empty(&req->readypipes)) ||
- (!nni_list_empty(&req->busypipes))) {
- nni_cv_wait(&req->cv);
+ nni_mtx_lock(&s->mtx);
+ while ((!nni_list_empty(&s->readypipes)) ||
+ (!nni_list_empty(&s->busypipes))) {
+ nni_cv_wait(&s->cv);
}
- if (req->reqmsg != NULL) {
- nni_msg_free(req->reqmsg);
+ if (s->reqmsg != NULL) {
+ nni_msg_free(s->reqmsg);
}
- nni_mtx_unlock(&req->mtx);
- nni_cv_fini(&req->cv);
- nni_mtx_fini(&req->mtx);
- NNI_FREE_STRUCT(req);
+ nni_mtx_unlock(&s->mtx);
+ nni_cv_fini(&s->cv);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
+}
+
+static void
+req_pipe_fini(void *arg)
+{
+ req_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_sendcooked);
+ nni_aio_fini(p->aio_sendraw);
+ nni_mtx_fini(&p->mtx);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
+req_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_req_pipe *rp;
+ req_pipe *p;
+ int rv;
- if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&rp->mtx);
- nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp);
- nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp);
- nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp);
- nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp);
- nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp);
-
- NNI_LIST_NODE_INIT(&rp->node);
- rp->pipe = pipe;
- rp->req = rsock;
- *rpp = rp;
- return (0);
-}
+ nni_mtx_init(&p->mtx);
+ if (((rv = nni_aio_init(&p->aio_getq, req_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, req_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, req_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_sendraw, req_sendraw_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_sendcooked, req_sendcooked_cb, p)) !=
+ 0)) {
+ req_pipe_fini(p);
+ return (rv);
+ }
-static void
-nni_req_pipe_fini(void *arg)
-{
- nni_req_pipe *rp = arg;
-
- nni_aio_fini(&rp->aio_getq);
- nni_aio_fini(&rp->aio_putq);
- nni_aio_fini(&rp->aio_recv);
- nni_aio_fini(&rp->aio_sendcooked);
- nni_aio_fini(&rp->aio_sendraw);
- nni_mtx_fini(&rp->mtx);
- NNI_FREE_STRUCT(rp);
+ NNI_LIST_NODE_INIT(&p->node);
+ p->pipe = pipe;
+ p->req = s;
+ *pp = p;
+ return (0);
}
static int
-nni_req_pipe_start(void *arg)
+req_pipe_start(void *arg)
{
- nni_req_pipe *rp = arg;
- nni_req_sock *req = rp->req;
+ req_pipe *p = arg;
+ req_sock *s = p->req;
- if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) {
+ if (nni_pipe_peer(p->pipe) != NNG_PROTO_REP) {
return (NNG_EPROTO);
}
- nni_mtx_lock(&req->mtx);
- if (req->closed) {
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_lock(&s->mtx);
+ if (s->closed) {
+ nni_mtx_unlock(&s->mtx);
return (NNG_ECLOSED);
}
- nni_list_append(&req->readypipes, rp);
- if (req->wantw) {
- nni_req_resend(req);
+ nni_list_append(&s->readypipes, p);
+ // If sock was waiting for somewhere to send data, go ahead and
+ // send it to this pipe.
+ if (s->wantw) {
+ req_resend(s);
}
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(req->uwq, &rp->aio_getq);
- nni_pipe_recv(rp->pipe, &rp->aio_recv);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
+ nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
static void
-nni_req_pipe_stop(void *arg)
+req_pipe_stop(void *arg)
{
- nni_req_pipe *rp = arg;
- nni_req_sock *req = rp->req;
+ req_pipe *p = arg;
+ req_sock *s = p->req;
- nni_aio_stop(&rp->aio_getq);
- nni_aio_stop(&rp->aio_putq);
- nni_aio_stop(&rp->aio_recv);
- nni_aio_stop(&rp->aio_sendcooked);
- nni_aio_stop(&rp->aio_sendraw);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_sendcooked);
+ nni_aio_stop(p->aio_sendraw);
// At this point there should not be any further AIOs running.
// Further, any completion tasks have completed.
- nni_mtx_lock(&req->mtx);
+ nni_mtx_lock(&s->mtx);
// This removes the node from either busypipes or readypipes.
// It doesn't much matter which.
- if (nni_list_node_active(&rp->node)) {
- nni_list_node_remove(&rp->node);
- if (req->closed) {
- nni_cv_wake(&req->cv);
+ if (nni_list_node_active(&p->node)) {
+ nni_list_node_remove(&p->node);
+ if (s->closed) {
+ nni_cv_wake(&s->cv);
}
}
- if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
+ if ((p == s->pendpipe) && (s->reqmsg != NULL)) {
// removing the pipe we sent the last request on...
// schedule immediate resend.
- req->pendpipe = NULL;
- req->resend = NNI_TIME_ZERO;
- req->wantw = 1;
- nni_req_resend(req);
+ s->pendpipe = NULL;
+ s->resend = NNI_TIME_ZERO;
+ s->wantw = 1;
+ req_resend(s);
}
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static int
-nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_req_sock *req = arg;
- int rv = NNG_ENOTSUP;
+ req_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_req_resendtime) {
- rv = nni_setopt_usec(&req->retry, buf, sz);
+ rv = nni_setopt_usec(&s->retry, buf, sz);
} else if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&req->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
if (rv == 0) {
- nni_sock_recverr(req->sock, req->raw ? 0 : NNG_ESTATE);
+ nni_sock_recverr(s->sock, s->raw ? 0 : NNG_ESTATE);
}
} else if (opt == nng_optid_maxttl) {
- rv = nni_setopt_int(&req->ttl, buf, sz, 1, 255);
+ rv = nni_setopt_int(&s->ttl, buf, sz, 1, 255);
}
return (rv);
}
static int
-nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_req_sock *req = arg;
- int rv = NNG_ENOTSUP;
+ req_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_req_resendtime) {
- rv = nni_getopt_usec(&req->retry, buf, szp);
+ rv = nni_getopt_usec(&s->retry, buf, szp);
} else if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&req->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
} else if (opt == nng_optid_maxttl) {
- rv = nni_getopt_int(&req->ttl, buf, szp);
+ rv = nni_getopt_int(&s->ttl, buf, szp);
}
return (rv);
@@ -297,10 +302,10 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
// kind of priority.)
static void
-nni_req_getq_cb(void *arg)
+req_getq_cb(void *arg)
{
- nni_req_pipe *rp = arg;
- nni_req_sock *req = rp->req;
+ req_pipe *p = arg;
+ req_sock *s = p->req;
// We should be in RAW mode. Cooked mode traffic bypasses
// the upper write queue entirely, and should never end up here.
@@ -308,47 +313,47 @@ nni_req_getq_cb(void *arg)
// that's ok (there's an inherent race anyway). (One minor
// exception: we wind up here in error state when the uwq is closed.)
- if (nni_aio_result(&rp->aio_getq) != 0) {
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- rp->aio_sendraw.a_msg = rp->aio_getq.a_msg;
- rp->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_sendraw, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
// Send the message, but use the raw mode aio.
- nni_pipe_send(rp->pipe, &rp->aio_sendraw);
+ nni_pipe_send(p->pipe, p->aio_sendraw);
}
static void
-nni_req_sendraw_cb(void *arg)
+req_sendraw_cb(void *arg)
{
- nni_req_pipe *rp = arg;
+ req_pipe *p = arg;
- if (nni_aio_result(&rp->aio_sendraw) != 0) {
- nni_msg_free(rp->aio_sendraw.a_msg);
- rp->aio_sendraw.a_msg = NULL;
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_sendraw) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_sendraw));
+ nni_aio_set_msg(p->aio_sendraw, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
// Sent a message so we just need to look for another one.
- nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq);
+ nni_msgq_aio_get(p->req->uwq, p->aio_getq);
}
static void
-nni_req_sendcooked_cb(void *arg)
+req_sendcooked_cb(void *arg)
{
- nni_req_pipe *rp = arg;
- nni_req_sock *req = rp->req;
+ req_pipe *p = arg;
+ req_sock *s = p->req;
- if (nni_aio_result(&rp->aio_sendcooked) != 0) {
+ if (nni_aio_result(p->aio_sendcooked) != 0) {
// We failed to send... clean up and deal with it.
// We leave ourselves on the busy list for now, which
// means no new asynchronous traffic can occur here.
- nni_msg_free(rp->aio_sendcooked.a_msg);
- rp->aio_sendcooked.a_msg = NULL;
- nni_pipe_stop(rp->pipe);
+ nni_msg_free(nni_aio_get_msg(p->aio_sendcooked));
+ nni_aio_set_msg(p->aio_sendcooked, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
@@ -356,49 +361,50 @@ nni_req_sendcooked_cb(void *arg)
// reinsert ourselves in the ready list, and possibly schedule
// a resend.
- nni_mtx_lock(&req->mtx);
- if (nni_list_active(&req->busypipes, rp)) {
- nni_list_remove(&req->busypipes, rp);
- nni_list_append(&req->readypipes, rp);
- nni_req_resend(req);
+ nni_mtx_lock(&s->mtx);
+ if (nni_list_active(&s->busypipes, p)) {
+ nni_list_remove(&s->busypipes, p);
+ nni_list_append(&s->readypipes, p);
+ req_resend(s);
} else {
// We wind up here if stop was called from the reader
// side while we were waiting to be scheduled to run for the
// writer side. In this case we can't complete the operation,
// and we have to abort.
- nni_pipe_stop(rp->pipe);
+ nni_pipe_stop(p->pipe);
}
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_req_putq_cb(void *arg)
+req_putq_cb(void *arg)
{
- nni_req_pipe *rp = arg;
+ req_pipe *p = arg;
- if (nni_aio_result(&rp->aio_putq) != 0) {
- nni_msg_free(rp->aio_putq.a_msg);
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- rp->aio_putq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_recv(rp->pipe, &rp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-nni_req_recv_cb(void *arg)
+req_recv_cb(void *arg)
{
- nni_req_pipe *rp = arg;
- nni_msg * msg;
+ req_pipe *p = arg;
+ nni_msg * msg;
- if (nni_aio_result(&rp->aio_recv) != 0) {
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- msg = rp->aio_recv.a_msg;
- rp->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// We yank 4 bytes of body, and move them to the header.
if (nni_msg_len(msg) < 4) {
@@ -414,92 +420,90 @@ nni_req_recv_cb(void *arg)
}
(void) nni_msg_trim(msg, 4); // Cannot fail
- rp->aio_putq.a_msg = msg;
- nni_msgq_aio_put(rp->req->urq, &rp->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(p->req->urq, p->aio_putq);
return;
malformed:
nni_msg_free(msg);
- nni_pipe_stop(rp->pipe);
+ nni_pipe_stop(p->pipe);
}
static void
-nni_req_timeout(void *arg)
+req_timeout(void *arg)
{
- nni_req_sock *req = arg;
+ req_sock *s = arg;
- nni_mtx_lock(&req->mtx);
- if (req->reqmsg != NULL) {
- req->wantw = 1;
- nni_req_resend(req);
+ nni_mtx_lock(&s->mtx);
+ if (s->reqmsg != NULL) {
+ s->wantw = 1;
+ req_resend(s);
}
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_req_resend(nni_req_sock *req)
+req_resend(req_sock *s)
{
- nni_req_pipe *rp;
- nni_msg * msg;
+ req_pipe *p;
+ nni_msg * msg;
// Note: This routine should be called with the socket lock held.
// Also, this should only be called while handling cooked mode
// requests.
- if ((msg = req->reqmsg) == NULL) {
+ if ((msg = s->reqmsg) == NULL) {
return;
}
- if (req->closed) {
- req->reqmsg = NULL;
+ if (s->closed) {
+ s->reqmsg = NULL;
nni_msg_free(msg);
}
- if (req->wantw) {
- req->wantw = 0;
+ if (s->wantw) {
+ s->wantw = 0;
- if (nni_msg_dup(&msg, req->reqmsg) != 0) {
+ if (nni_msg_dup(&msg, s->reqmsg) != 0) {
// Failed to alloc message, reschedule it. Also,
// mark that we have a message we want to resend,
// in case something comes available.
- req->wantw = 1;
- nni_timer_schedule(
- &req->timer, nni_clock() + req->retry);
+ s->wantw = 1;
+ nni_timer_schedule(&s->timer, nni_clock() + s->retry);
return;
}
// Now we iterate across all possible outpipes, until
// one accepts it.
- rp = nni_list_first(&req->readypipes);
- if (rp == NULL) {
+ if ((p = nni_list_first(&s->readypipes)) == NULL) {
// No pipes ready to process us. Note that we have
// something to send, and schedule it.
nni_msg_free(msg);
- req->wantw = 1;
+ s->wantw = 1;
return;
}
- nni_list_remove(&req->readypipes, rp);
- nni_list_append(&req->busypipes, rp);
+ nni_list_remove(&s->readypipes, p);
+ nni_list_append(&s->busypipes, p);
- req->pendpipe = rp;
- req->resend = nni_clock() + req->retry;
- rp->aio_sendcooked.a_msg = msg;
+ s->pendpipe = p;
+ s->resend = nni_clock() + s->retry;
+ nni_aio_set_msg(p->aio_sendcooked, msg);
// Note that because we were ready rather than busy, we
// should not have any I/O oustanding and hence the aio
// object will be available for our use.
- nni_pipe_send(rp->pipe, &rp->aio_sendcooked);
- nni_timer_schedule(&req->timer, req->resend);
+ nni_pipe_send(p->pipe, p->aio_sendcooked);
+ nni_timer_schedule(&s->timer, s->resend);
}
}
static nni_msg *
-nni_req_sock_sfilter(void *arg, nni_msg *msg)
+req_sock_sfilter(void *arg, nni_msg *msg)
{
- nni_req_sock *req = arg;
- uint32_t id;
+ req_sock *s = arg;
+ uint32_t id;
- if (req->raw) {
+ if (s->raw) {
// No automatic retry, and the request ID must
// be in the header coming down.
return (msg);
@@ -508,12 +512,12 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
// Generate a new request ID. We always set the high
// order bit so that the peer can locate the end of the
// backtrace. (Pipe IDs have the high order bit clear.)
- id = (req->nextid++) | 0x80000000u;
+ id = (s->nextid++) | 0x80000000u;
// Request ID is in big endian format.
- NNI_PUT32(req->reqid, id);
+ NNI_PUT32(s->reqid, id);
- if (nni_msg_header_append(msg, req->reqid, 4) != 0) {
+ if (nni_msg_header_append(msg, s->reqid, 4) != 0) {
// Should be ENOMEM.
nni_msg_free(msg);
return (NULL);
@@ -521,36 +525,36 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
// NB: The socket lock is also held, so this is always self-serialized.
// But we have to serialize against other async callbacks.
- nni_mtx_lock(&req->mtx);
+ nni_mtx_lock(&s->mtx);
// If another message is there, this cancels it.
- if (req->reqmsg != NULL) {
- nni_msg_free(req->reqmsg);
- req->reqmsg = NULL;
+ if (s->reqmsg != NULL) {
+ nni_msg_free(s->reqmsg);
+ s->reqmsg = NULL;
}
// Make a duplicate message... for retries.
- req->reqmsg = msg;
+ s->reqmsg = msg;
// Schedule for immediate send
- req->resend = NNI_TIME_ZERO;
- req->wantw = 1;
+ s->resend = NNI_TIME_ZERO;
+ s->wantw = 1;
- nni_req_resend(req);
- nni_mtx_unlock(&req->mtx);
+ req_resend(s);
+ nni_mtx_unlock(&s->mtx);
// Clear the error condition.
- nni_sock_recverr(req->sock, 0);
+ nni_sock_recverr(s->sock, 0);
return (NULL);
}
static nni_msg *
-nni_req_sock_rfilter(void *arg, nni_msg *msg)
+req_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_req_sock *req = arg;
- nni_msg * rmsg;
+ req_sock *s = arg;
+ nni_msg * rmsg;
- if (req->raw) {
+ if (s->raw) {
// Pass it unmolested
return (msg);
}
@@ -560,62 +564,60 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg)
return (NULL);
}
- nni_mtx_lock(&req->mtx);
+ nni_mtx_lock(&s->mtx);
- if ((rmsg = req->reqmsg) == NULL) {
+ if ((rmsg = s->reqmsg) == NULL) {
// We had no outstanding request.
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
}
- if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) {
+ if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) {
// Wrong request id
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
}
- req->reqmsg = NULL;
- req->pendpipe = NULL;
- nni_mtx_unlock(&req->mtx);
+ s->reqmsg = NULL;
+ s->pendpipe = NULL;
+ nni_mtx_unlock(&s->mtx);
- nni_sock_recverr(req->sock, NNG_ESTATE);
+ nni_sock_recverr(s->sock, NNG_ESTATE);
nni_msg_free(rmsg);
return (msg);
}
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops nni_req_pipe_ops = {
- .pipe_init = nni_req_pipe_init,
- .pipe_fini = nni_req_pipe_fini,
- .pipe_start = nni_req_pipe_start,
- .pipe_stop = nni_req_pipe_stop,
+static nni_proto_pipe_ops req_pipe_ops = {
+ .pipe_init = req_pipe_init,
+ .pipe_fini = req_pipe_fini,
+ .pipe_start = req_pipe_start,
+ .pipe_stop = req_pipe_stop,
};
-static nni_proto_sock_ops nni_req_sock_ops = {
- .sock_init = nni_req_sock_init,
- .sock_fini = nni_req_sock_fini,
- .sock_open = nni_req_sock_open,
- .sock_close = nni_req_sock_close,
- .sock_setopt = nni_req_sock_setopt,
- .sock_getopt = nni_req_sock_getopt,
- .sock_rfilter = nni_req_sock_rfilter,
- .sock_sfilter = nni_req_sock_sfilter,
+static nni_proto_sock_ops req_sock_ops = {
+ .sock_init = req_sock_init,
+ .sock_fini = req_sock_fini,
+ .sock_open = req_sock_open,
+ .sock_close = req_sock_close,
+ .sock_setopt = req_sock_setopt,
+ .sock_getopt = req_sock_getopt,
+ .sock_rfilter = req_sock_rfilter,
+ .sock_sfilter = req_sock_sfilter,
};
-nni_proto nni_req_proto = {
+static nni_proto req_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_REQ_V0, "req" },
.proto_peer = { NNG_PROTO_REP_V0, "rep" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_req_sock_ops,
- .proto_pipe_ops = &nni_req_pipe_ops,
+ .proto_sock_ops = &req_sock_ops,
+ .proto_pipe_ops = &req_pipe_ops,
};
int
nng_req0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_req_proto));
+ return (nni_proto_open(sidp, &req_proto));
}
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index e695a987..c3a9dd81 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -17,18 +17,18 @@
// the surveyor pattern. This is useful for building service discovery, or
// voting algorithsm, for example.
-typedef struct nni_resp_pipe nni_resp_pipe;
-typedef struct nni_resp_sock nni_resp_sock;
-
-static void nni_resp_recv_cb(void *);
-static void nni_resp_putq_cb(void *);
-static void nni_resp_getq_cb(void *);
-static void nni_resp_send_cb(void *);
-static void nni_resp_sock_getq_cb(void *);
-static void nni_resp_pipe_fini(void *);
-
-// An nni_resp_sock is our per-socket protocol private structure.
-struct nni_resp_sock {
+typedef struct resp_pipe resp_pipe;
+typedef struct resp_sock resp_sock;
+
+static void resp_recv_cb(void *);
+static void resp_putq_cb(void *);
+static void resp_getq_cb(void *);
+static void resp_send_cb(void *);
+static void resp_sock_getq_cb(void *);
+static void resp_pipe_fini(void *);
+
+// A resp_sock is our per-socket protocol private structure.
+struct resp_sock {
nni_sock * nsock;
nni_msgq * urq;
nni_msgq * uwq;
@@ -37,259 +37,257 @@ struct nni_resp_sock {
nni_idhash *pipes;
char * btrace;
size_t btrace_len;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
nni_mtx mtx;
};
-// An nni_resp_pipe is our per-pipe protocol private structure.
-struct nni_resp_pipe {
- nni_pipe * npipe;
- nni_resp_sock *psock;
- uint32_t id;
- nni_msgq * sendq;
- nni_aio aio_getq;
- nni_aio aio_putq;
- nni_aio aio_send;
- nni_aio aio_recv;
+// A resp_pipe is our per-pipe protocol private structure.
+struct resp_pipe {
+ nni_pipe * npipe;
+ resp_sock *psock;
+ uint32_t id;
+ nni_msgq * sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
};
static void
-nni_resp_sock_fini(void *arg)
+resp_sock_fini(void *arg)
{
- nni_resp_sock *psock = arg;
+ resp_sock *s = arg;
- nni_aio_stop(&psock->aio_getq);
- nni_aio_fini(&psock->aio_getq);
- nni_idhash_fini(psock->pipes);
- if (psock->btrace != NULL) {
- nni_free(psock->btrace, psock->btrace_len);
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_idhash_fini(s->pipes);
+ if (s->btrace != NULL) {
+ nni_free(s->btrace, s->btrace_len);
}
- nni_mtx_fini(&psock->mtx);
- NNI_FREE_STRUCT(psock);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
}
static int
-nni_resp_sock_init(void **pp, nni_sock *nsock)
+resp_sock_init(void **sp, nni_sock *nsock)
{
- nni_resp_sock *psock;
- int rv;
+ resp_sock *s;
+ int rv;
- if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_idhash_init(&psock->pipes)) != 0) {
- NNI_FREE_STRUCT(psock);
+ if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
+ ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) {
+ resp_sock_fini(s);
return (rv);
}
- psock->ttl = 8; // Per RFC
- psock->nsock = nsock;
- psock->raw = 0;
- psock->btrace = NULL;
- psock->btrace_len = 0;
- psock->urq = nni_sock_recvq(nsock);
- psock->uwq = nni_sock_sendq(nsock);
+ s->ttl = 8; // Per RFC
+ s->nsock = nsock;
+ s->raw = 0;
+ s->btrace = NULL;
+ s->btrace_len = 0;
+ s->urq = nni_sock_recvq(nsock);
+ s->uwq = nni_sock_sendq(nsock);
- nni_mtx_init(&psock->mtx);
- nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock);
+ nni_mtx_init(&s->mtx);
- *pp = psock;
+ *sp = s;
nni_sock_senderr(nsock, NNG_ESTATE);
return (0);
}
static void
-nni_resp_sock_open(void *arg)
+resp_sock_open(void *arg)
{
- nni_resp_sock *psock = arg;
+ resp_sock *s = arg;
- nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-nni_resp_sock_close(void *arg)
+resp_sock_close(void *arg)
{
- nni_resp_sock *psock = arg;
+ resp_sock *s = arg;
- nni_aio_cancel(&psock->aio_getq, NNG_ECLOSED);
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
+}
+
+static void
+resp_pipe_fini(void *arg)
+{
+ resp_pipe *p = arg;
+
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+resp_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- nni_resp_pipe *ppipe;
- int rv;
+ resp_pipe *p;
+ int rv;
- if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) {
- NNI_FREE_STRUCT(ppipe);
+ if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, resp_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, resp_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, resp_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, resp_send_cb, p)) != 0)) {
+ resp_pipe_fini(p);
return (rv);
}
- nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe);
- nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe);
- nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe);
- nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe);
-
- ppipe->npipe = npipe;
- ppipe->psock = psock;
- *pp = ppipe;
- return (0);
-}
-static void
-nni_resp_pipe_fini(void *arg)
-{
- nni_resp_pipe *ppipe = arg;
-
- nni_aio_fini(&ppipe->aio_putq);
- nni_aio_fini(&ppipe->aio_getq);
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_msgq_fini(ppipe->sendq);
- NNI_FREE_STRUCT(ppipe);
+ p->npipe = npipe;
+ p->psock = s;
+ *pp = p;
+ return (0);
}
static int
-nni_resp_pipe_start(void *arg)
+resp_pipe_start(void *arg)
{
- nni_resp_pipe *ppipe = arg;
- nni_resp_sock *psock = ppipe->psock;
- int rv;
+ resp_pipe *p = arg;
+ resp_sock *s = p->psock;
+ int rv;
- ppipe->id = nni_pipe_id(ppipe->npipe);
+ p->id = nni_pipe_id(p->npipe);
- nni_mtx_lock(&psock->mtx);
- rv = nni_idhash_insert(psock->pipes, ppipe->id, ppipe);
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_lock(&s->mtx);
+ rv = nni_idhash_insert(s->pipes, p->id, p);
+ nni_mtx_unlock(&s->mtx);
if (rv != 0) {
return (rv);
}
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
- nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
+ nni_pipe_recv(p->npipe, p->aio_recv);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
return (rv);
}
static void
-nni_resp_pipe_stop(void *arg)
+resp_pipe_stop(void *arg)
{
- nni_resp_pipe *ppipe = arg;
- nni_resp_sock *psock = ppipe->psock;
-
- nni_msgq_close(ppipe->sendq);
- nni_aio_stop(&ppipe->aio_putq);
- nni_aio_stop(&ppipe->aio_getq);
- nni_aio_stop(&ppipe->aio_send);
- nni_aio_stop(&ppipe->aio_recv);
-
- if (ppipe->id != 0) {
- nni_mtx_lock(&psock->mtx);
- nni_idhash_remove(psock->pipes, ppipe->id);
- nni_mtx_unlock(&psock->mtx);
- ppipe->id = 0;
+ resp_pipe *p = arg;
+ resp_sock *s = p->psock;
+
+ nni_msgq_close(p->sendq);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+
+ if (p->id != 0) {
+ nni_mtx_lock(&s->mtx);
+ nni_idhash_remove(s->pipes, p->id);
+ nni_mtx_unlock(&s->mtx);
+ p->id = 0;
}
}
-// nni_resp_sock_send watches for messages from the upper write queue,
+// resp_sock_send watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
// destination pipe via a separate queue. This prevents a single bad
// or slow pipe from gumming up the works for the entire socket.s
void
-nni_resp_sock_getq_cb(void *arg)
+resp_sock_getq_cb(void *arg)
{
- nni_resp_sock *psock = arg;
- nni_msg * msg;
- uint32_t id;
- nni_resp_pipe *ppipe;
- int rv;
+ resp_sock *s = arg;
+ nni_msg * msg;
+ uint32_t id;
+ resp_pipe *p;
+ int rv;
- if (nni_aio_result(&psock->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
return;
}
- msg = psock->aio_getq.a_msg;
- psock->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
// We yank the outgoing pipe id from the header
if (nni_msg_header_len(msg) < 4) {
nni_msg_free(msg);
- // We can't really close down the socket, so just
- // keep going.
- nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
+ // We can't really close down the socket, so just keep going.
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
return;
}
id = nni_msg_header_trim_u32(msg);
- nni_mtx_lock(&psock->mtx);
- rv = nni_idhash_find(psock->pipes, id, (void **) &ppipe);
-
- if (rv != 0) {
+ nni_mtx_lock(&s->mtx);
+ if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) != 0) {
+ // Destination pipe not present.
nni_msg_free(msg);
- nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
} else {
// Non-blocking put.
- if (nni_msgq_tryput(ppipe->sendq, msg) != 0) {
+ if (nni_msgq_tryput(p->sendq, msg) != 0) {
nni_msg_free(msg);
}
}
- nni_mtx_unlock(&psock->mtx);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
+ nni_mtx_unlock(&s->mtx);
}
void
-nni_resp_getq_cb(void *arg)
+resp_getq_cb(void *arg)
{
- nni_resp_pipe *ppipe = arg;
+ resp_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_getq) != 0) {
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->npipe);
return;
}
- ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg;
- ppipe->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
+ nni_pipe_send(p->npipe, p->aio_send);
}
void
-nni_resp_send_cb(void *arg)
+resp_send_cb(void *arg)
{
- nni_resp_pipe *ppipe = arg;
+ resp_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_send) != 0) {
- nni_msg_free(ppipe->aio_send.a_msg);
- ppipe->aio_send.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static void
-nni_resp_recv_cb(void *arg)
+resp_recv_cb(void *arg)
{
- nni_resp_pipe *ppipe = arg;
- nni_resp_sock *psock = ppipe->psock;
- nni_msgq * urq;
- nni_msg * msg;
- int hops;
- int rv;
-
- if (nni_aio_result(&ppipe->aio_recv) != 0) {
+ resp_pipe *p = arg;
+ resp_sock *s = p->psock;
+ nni_msgq * urq;
+ nni_msg * msg;
+ int hops;
+ int rv;
+
+ if (nni_aio_result(p->aio_recv) != 0) {
goto error;
}
- urq = nni_sock_recvq(psock->nsock);
+ urq = nni_sock_recvq(s->nsock);
- msg = ppipe->aio_recv.a_msg;
- ppipe->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// Store the pipe id in the header, first thing.
- if (nni_msg_header_append_u32(msg, ppipe->id) != 0) {
+ if (nni_msg_header_append_u32(msg, p->id) != 0) {
nni_msg_free(msg);
goto error;
}
@@ -300,7 +298,7 @@ nni_resp_recv_cb(void *arg)
int end = 0;
uint8_t *body;
- if (hops >= psock->ttl) {
+ if (hops >= s->ttl) {
nni_msg_free(msg);
goto error;
}
@@ -322,46 +320,46 @@ nni_resp_recv_cb(void *arg)
}
// Now send it up.
- ppipe->aio_putq.a_msg = msg;
- nni_msgq_aio_put(urq, &ppipe->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(urq, p->aio_putq);
return;
error:
- nni_pipe_stop(ppipe->npipe);
+ nni_pipe_stop(p->npipe);
}
static void
-nni_resp_putq_cb(void *arg)
+resp_putq_cb(void *arg)
{
- nni_resp_pipe *ppipe = arg;
+ resp_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_putq) != 0) {
- nni_msg_free(ppipe->aio_putq.a_msg);
- ppipe->aio_putq.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->npipe);
}
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static int
-nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_resp_sock *psock = arg;
- int rv = NNG_ENOTSUP;
- int oldraw;
+ resp_sock *s = arg;
+ int rv = NNG_ENOTSUP;
+ int oldraw;
if (opt == nng_optid_maxttl) {
- rv = nni_setopt_int(&psock->ttl, buf, sz, 1, 255);
+ rv = nni_setopt_int(&s->ttl, buf, sz, 1, 255);
} else if (opt == nng_optid_raw) {
- oldraw = psock->raw;
- rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
- if (oldraw != psock->raw) {
- if (!psock->raw) {
- nni_sock_senderr(psock->nsock, 0);
+ oldraw = s->raw;
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
+ if (oldraw != s->raw) {
+ if (!s->raw) {
+ nni_sock_senderr(s->nsock, 0);
} else {
- nni_sock_senderr(psock->nsock, NNG_ESTATE);
+ nni_sock_senderr(s->nsock, NNG_ESTATE);
}
}
}
@@ -370,34 +368,34 @@ nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
}
static int
-nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_resp_sock *psock = arg;
- int rv = NNG_ENOTSUP;
+ resp_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_maxttl) {
- rv = nni_getopt_int(&psock->ttl, buf, szp);
+ rv = nni_getopt_int(&s->ttl, buf, szp);
} else if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&psock->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
static nni_msg *
-nni_resp_sock_sfilter(void *arg, nni_msg *msg)
+resp_sock_sfilter(void *arg, nni_msg *msg)
{
- nni_resp_sock *psock = arg;
+ resp_sock *s = arg;
- if (psock->raw) {
+ if (s->raw) {
return (msg);
}
// Cannot send again until a receive is done...
- nni_sock_senderr(psock->nsock, NNG_ESTATE);
+ nni_sock_senderr(s->nsock, NNG_ESTATE);
// If we have a stored backtrace, append it to the header...
// if we don't have a backtrace, discard the message.
- if (psock->btrace == NULL) {
+ if (s->btrace == NULL) {
nni_msg_free(msg);
return (NULL);
}
@@ -405,79 +403,78 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg)
// drop anything else in the header...
nni_msg_header_clear(msg);
- if (nni_msg_header_append(msg, psock->btrace, psock->btrace_len) !=
- 0) {
- nni_free(psock->btrace, psock->btrace_len);
- psock->btrace = NULL;
- psock->btrace_len = 0;
+ if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) {
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
nni_msg_free(msg);
return (NULL);
}
- nni_free(psock->btrace, psock->btrace_len);
- psock->btrace = NULL;
- psock->btrace_len = 0;
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
return (msg);
}
static nni_msg *
-nni_resp_sock_rfilter(void *arg, nni_msg *msg)
+resp_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_resp_sock *psock = arg;
- char * header;
- size_t len;
+ resp_sock *s = arg;
+ char * header;
+ size_t len;
- if (psock->raw) {
+ if (s->raw) {
return (msg);
}
- nni_sock_senderr(psock->nsock, 0);
+ nni_sock_senderr(s->nsock, 0);
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
- if (psock->btrace != NULL) {
- nni_free(psock->btrace, psock->btrace_len);
- psock->btrace = NULL;
- psock->btrace_len = 0;
+ if (s->btrace != NULL) {
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
}
- if ((psock->btrace = nni_alloc(len)) == NULL) {
+ if ((s->btrace = nni_alloc(len)) == NULL) {
nni_msg_free(msg);
return (NULL);
}
- psock->btrace_len = len;
- memcpy(psock->btrace, header, len);
+ s->btrace_len = len;
+ memcpy(s->btrace, header, len);
nni_msg_header_clear(msg);
return (msg);
}
-static nni_proto_pipe_ops nni_resp_pipe_ops = {
- .pipe_init = nni_resp_pipe_init,
- .pipe_fini = nni_resp_pipe_fini,
- .pipe_start = nni_resp_pipe_start,
- .pipe_stop = nni_resp_pipe_stop,
+static nni_proto_pipe_ops resp_pipe_ops = {
+ .pipe_init = resp_pipe_init,
+ .pipe_fini = resp_pipe_fini,
+ .pipe_start = resp_pipe_start,
+ .pipe_stop = resp_pipe_stop,
};
-static nni_proto_sock_ops nni_resp_sock_ops = {
- .sock_init = nni_resp_sock_init,
- .sock_fini = nni_resp_sock_fini,
- .sock_open = nni_resp_sock_open,
- .sock_close = nni_resp_sock_close,
- .sock_setopt = nni_resp_sock_setopt,
- .sock_getopt = nni_resp_sock_getopt,
- .sock_rfilter = nni_resp_sock_rfilter,
- .sock_sfilter = nni_resp_sock_sfilter,
+static nni_proto_sock_ops resp_sock_ops = {
+ .sock_init = resp_sock_init,
+ .sock_fini = resp_sock_fini,
+ .sock_open = resp_sock_open,
+ .sock_close = resp_sock_close,
+ .sock_setopt = resp_sock_setopt,
+ .sock_getopt = resp_sock_getopt,
+ .sock_rfilter = resp_sock_rfilter,
+ .sock_sfilter = resp_sock_sfilter,
};
-nni_proto nni_respondent_proto = {
+static nni_proto resp_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_RESPONDENT_V0, "respondent" },
.proto_peer = { NNG_PROTO_SURVEYOR_V0, "surveyor" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_resp_sock_ops,
- .proto_pipe_ops = &nni_resp_pipe_ops,
+ .proto_sock_ops = &resp_sock_ops,
+ .proto_pipe_ops = &resp_pipe_ops,
};
int
nng_respondent0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_respondent_proto));
+ return (nni_proto_open(sidp, &resp_proto));
}
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index d7341025..09fe0768 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -16,18 +16,18 @@
// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
// survey pattern. This is useful for building service discovery, voting, etc.
-typedef struct nni_surv_pipe nni_surv_pipe;
-typedef struct nni_surv_sock nni_surv_sock;
-
-static void nni_surv_sock_getq_cb(void *);
-static void nni_surv_getq_cb(void *);
-static void nni_surv_putq_cb(void *);
-static void nni_surv_send_cb(void *);
-static void nni_surv_recv_cb(void *);
-static void nni_surv_timeout(void *);
-
-// An nni_surv_sock is our per-socket protocol private structure.
-struct nni_surv_sock {
+typedef struct surv_pipe surv_pipe;
+typedef struct surv_sock surv_sock;
+
+static void surv_sock_getq_cb(void *);
+static void surv_getq_cb(void *);
+static void surv_putq_cb(void *);
+static void surv_send_cb(void *);
+static void surv_recv_cb(void *);
+static void surv_timeout(void *);
+
+// A surv_sock is our per-socket protocol private structure.
+struct surv_sock {
nni_sock * nsock;
nni_duration survtime;
nni_time expire;
@@ -36,211 +36,214 @@ struct nni_surv_sock {
uint32_t nextid; // next id
uint32_t survid; // outstanding request ID (big endian)
nni_list pipes;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
nni_timer_node timer;
nni_msgq * uwq;
nni_msgq * urq;
nni_mtx mtx;
};
-// An nni_surv_pipe is our per-pipe protocol private structure.
-struct nni_surv_pipe {
- nni_pipe * npipe;
- nni_surv_sock *psock;
- nni_msgq * sendq;
- nni_list_node node;
- nni_aio aio_getq;
- nni_aio aio_putq;
- nni_aio aio_send;
- nni_aio aio_recv;
+// A surv_pipe is our per-pipe protocol private structure.
+struct surv_pipe {
+ nni_pipe * npipe;
+ surv_sock * psock;
+ nni_msgq * sendq;
+ nni_list_node node;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
};
static void
-nni_surv_sock_fini(void *arg)
+surv_sock_fini(void *arg)
{
- nni_surv_sock *psock = arg;
+ surv_sock *s = arg;
- nni_aio_stop(&psock->aio_getq);
- nni_aio_fini(&psock->aio_getq);
- nni_mtx_fini(&psock->mtx);
- NNI_FREE_STRUCT(psock);
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
}
static int
-nni_surv_sock_init(void **sp, nni_sock *nsock)
+surv_sock_init(void **sp, nni_sock *nsock)
{
- nni_surv_sock *psock;
+ surv_sock *s;
+ int rv;
- if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node);
- nni_mtx_init(&psock->mtx);
- nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock);
- nni_timer_init(&psock->timer, nni_surv_timeout, psock);
-
- psock->nextid = nni_random();
- psock->nsock = nsock;
- psock->raw = 0;
- psock->survtime = NNI_SECOND * 60;
- psock->expire = NNI_TIME_ZERO;
- psock->uwq = nni_sock_sendq(nsock);
- psock->urq = nni_sock_recvq(nsock);
-
- *sp = psock;
+ if ((rv = nni_aio_init(&s->aio_getq, surv_sock_getq_cb, s)) != 0) {
+ surv_sock_fini(s);
+ return (rv);
+ }
+ NNI_LIST_INIT(&s->pipes, surv_pipe, node);
+ nni_mtx_init(&s->mtx);
+ nni_timer_init(&s->timer, surv_timeout, s);
+
+ s->nextid = nni_random();
+ s->nsock = nsock;
+ s->raw = 0;
+ s->survtime = NNI_SECOND * 60;
+ s->expire = NNI_TIME_ZERO;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+
+ *sp = s;
nni_sock_recverr(nsock, NNG_ESTATE);
return (0);
}
static void
-nni_surv_sock_open(void *arg)
+surv_sock_open(void *arg)
{
- nni_surv_sock *psock = arg;
+ surv_sock *s = arg;
- nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-nni_surv_sock_close(void *arg)
+surv_sock_close(void *arg)
{
- nni_surv_sock *psock = arg;
+ surv_sock *s = arg;
- nni_timer_cancel(&psock->timer);
- nni_aio_cancel(&psock->aio_getq, NNG_ECLOSED);
+ nni_timer_cancel(&s->timer);
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-nni_surv_pipe_fini(void *arg)
+surv_pipe_fini(void *arg)
{
- nni_surv_pipe *ppipe = arg;
-
- nni_aio_fini(&ppipe->aio_getq);
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_aio_fini(&ppipe->aio_putq);
- nni_msgq_fini(ppipe->sendq);
- NNI_FREE_STRUCT(ppipe);
+ surv_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+surv_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- nni_surv_pipe *ppipe;
- int rv;
+ surv_pipe *p;
+ int rv;
- if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
// This depth could be tunable.
- if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
- NNI_FREE_STRUCT(ppipe);
+ if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, surv_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, surv_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, surv_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, surv_recv_cb, p)) != 0)) {
+ surv_pipe_fini(p);
return (rv);
}
- nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe);
- nni_aio_init(&ppipe->aio_putq, nni_surv_putq_cb, ppipe);
- nni_aio_init(&ppipe->aio_send, nni_surv_send_cb, ppipe);
- nni_aio_init(&ppipe->aio_recv, nni_surv_recv_cb, ppipe);
-
- ppipe->npipe = npipe;
- ppipe->psock = psock;
- *pp = ppipe;
+ p->npipe = npipe;
+ p->psock = s;
+ *pp = p;
return (0);
}
static int
-nni_surv_pipe_start(void *arg)
+surv_pipe_start(void *arg)
{
- nni_surv_pipe *ppipe = arg;
- nni_surv_sock *psock = ppipe->psock;
+ surv_pipe *p = arg;
+ surv_sock *s = p->psock;
- nni_mtx_lock(&psock->mtx);
- nni_list_append(&psock->pipes, ppipe);
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_lock(&s->mtx);
+ nni_list_append(&s->pipes, p);
+ nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_pipe_recv(p->npipe, p->aio_recv);
return (0);
}
static void
-nni_surv_pipe_stop(void *arg)
+surv_pipe_stop(void *arg)
{
- nni_surv_pipe *ppipe = arg;
- nni_surv_sock *psock = ppipe->psock;
+ surv_pipe *p = arg;
+ surv_sock *s = p->psock;
- nni_aio_stop(&ppipe->aio_getq);
- nni_aio_stop(&ppipe->aio_send);
- nni_aio_stop(&ppipe->aio_recv);
- nni_aio_stop(&ppipe->aio_putq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
- nni_msgq_close(ppipe->sendq);
+ nni_msgq_close(p->sendq);
- nni_mtx_lock(&psock->mtx);
- if (nni_list_active(&psock->pipes, ppipe)) {
- nni_list_remove(&psock->pipes, ppipe);
+ nni_mtx_lock(&s->mtx);
+ if (nni_list_active(&s->pipes, p)) {
+ nni_list_remove(&s->pipes, p);
}
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_surv_getq_cb(void *arg)
+surv_getq_cb(void *arg)
{
- nni_surv_pipe *ppipe = arg;
+ surv_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_getq) != 0) {
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->npipe);
return;
}
- ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg;
- ppipe->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
+ nni_pipe_send(p->npipe, p->aio_send);
}
static void
-nni_surv_send_cb(void *arg)
+surv_send_cb(void *arg)
{
- nni_surv_pipe *ppipe = arg;
+ surv_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_send) != 0) {
- nni_msg_free(ppipe->aio_send.a_msg);
- ppipe->aio_send.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- nni_msgq_aio_get(ppipe->psock->uwq, &ppipe->aio_getq);
+ nni_msgq_aio_get(p->psock->uwq, p->aio_getq);
}
static void
-nni_surv_putq_cb(void *arg)
+surv_putq_cb(void *arg)
{
- nni_surv_pipe *ppipe = arg;
+ surv_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_putq) != 0) {
- nni_msg_free(ppipe->aio_putq.a_msg);
- ppipe->aio_putq.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static void
-nni_surv_recv_cb(void *arg)
+surv_recv_cb(void *arg)
{
- nni_surv_pipe *ppipe = arg;
- nni_msg * msg;
+ surv_pipe *p = arg;
+ nni_msg * msg;
- if (nni_aio_result(&ppipe->aio_recv) != 0) {
+ if (nni_aio_result(p->aio_recv) != 0) {
goto failed;
}
- msg = ppipe->aio_recv.a_msg;
- ppipe->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// We yank 4 bytes of body, and move them to the header.
if (nni_msg_len(msg) < 4) {
@@ -255,35 +258,35 @@ nni_surv_recv_cb(void *arg)
}
(void) nni_msg_trim(msg, 4);
- ppipe->aio_putq.a_msg = msg;
- nni_msgq_aio_put(ppipe->psock->urq, &ppipe->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(p->psock->urq, p->aio_putq);
return;
failed:
- nni_pipe_stop(ppipe->npipe);
+ nni_pipe_stop(p->npipe);
}
static int
-nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_surv_sock *psock = arg;
- int rv = NNG_ENOTSUP;
- int oldraw;
+ surv_sock *s = arg;
+ int rv = NNG_ENOTSUP;
+ int oldraw;
if (opt == nng_optid_surveyor_surveytime) {
- rv = nni_setopt_usec(&psock->survtime, buf, sz);
+ rv = nni_setopt_usec(&s->survtime, buf, sz);
} else if (opt == nng_optid_raw) {
- oldraw = psock->raw;
- rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
- if (oldraw != psock->raw) {
- if (psock->raw) {
- nni_sock_recverr(psock->nsock, 0);
+ oldraw = s->raw;
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
+ if (oldraw != s->raw) {
+ if (s->raw) {
+ nni_sock_recverr(s->nsock, 0);
} else {
- nni_sock_recverr(psock->nsock, NNG_ESTATE);
+ nni_sock_recverr(s->nsock, NNG_ESTATE);
}
- psock->survid = 0;
- nni_timer_cancel(&psock->timer);
+ s->survid = 0;
+ nni_timer_cancel(&s->timer);
}
}
@@ -291,49 +294,49 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
}
static int
-nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_surv_sock *psock = arg;
- int rv = NNG_ENOTSUP;
+ surv_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_surveyor_surveytime) {
- rv = nni_getopt_usec(&psock->survtime, buf, szp);
+ rv = nni_getopt_usec(&s->survtime, buf, szp);
} else if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&psock->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
static void
-nni_surv_sock_getq_cb(void *arg)
+surv_sock_getq_cb(void *arg)
{
- nni_surv_sock *psock = arg;
- nni_surv_pipe *ppipe;
- nni_surv_pipe *last;
- nni_msg * msg, *dup;
+ surv_sock *s = arg;
+ surv_pipe *p;
+ surv_pipe *last;
+ nni_msg * msg, *dup;
- if (nni_aio_result(&psock->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
// Should be NNG_ECLOSED.
return;
}
- msg = psock->aio_getq.a_msg;
- psock->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
- nni_mtx_lock(&psock->mtx);
- last = nni_list_last(&psock->pipes);
- NNI_LIST_FOREACH (&psock->pipes, ppipe) {
- if (ppipe != last) {
+ nni_mtx_lock(&s->mtx);
+ last = nni_list_last(&s->pipes);
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ if (p != last) {
if (nni_msg_dup(&dup, msg) != 0) {
continue;
}
} else {
dup = msg;
}
- if (nni_msgq_tryput(ppipe->sendq, dup) != 0) {
+ if (nni_msgq_tryput(p->sendq, dup) != 0) {
nni_msg_free(dup);
}
}
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_unlock(&s->mtx);
if (last == NULL) {
// If there were no pipes to send on, just toss the message.
@@ -342,23 +345,23 @@ nni_surv_sock_getq_cb(void *arg)
}
static void
-nni_surv_timeout(void *arg)
+surv_timeout(void *arg)
{
- nni_surv_sock *psock = arg;
+ surv_sock *s = arg;
- nni_sock_lock(psock->nsock);
- psock->survid = 0;
- nni_sock_recverr(psock->nsock, NNG_ESTATE);
- nni_msgq_set_get_error(psock->urq, NNG_ETIMEDOUT);
- nni_sock_unlock(psock->nsock);
+ nni_sock_lock(s->nsock);
+ s->survid = 0;
+ nni_sock_recverr(s->nsock, NNG_ESTATE);
+ nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT);
+ nni_sock_unlock(s->nsock);
}
static nni_msg *
-nni_surv_sock_sfilter(void *arg, nni_msg *msg)
+surv_sock_sfilter(void *arg, nni_msg *msg)
{
- nni_surv_sock *psock = arg;
+ surv_sock *s = arg;
- if (psock->raw) {
+ if (s->raw) {
// No automatic retry, and the request ID must
// be in the header coming down.
return (msg);
@@ -367,9 +370,9 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg)
// Generate a new request ID. We always set the high
// order bit so that the peer can locate the end of the
// backtrace. (Pipe IDs have the high order bit clear.)
- psock->survid = (psock->nextid++) | 0x80000000u;
+ s->survid = (s->nextid++) | 0x80000000u;
- if (nni_msg_header_append_u32(msg, psock->survid) != 0) {
+ if (nni_msg_header_append_u32(msg, s->survid) != 0) {
// Should be ENOMEM.
nni_msg_free(msg);
return (NULL);
@@ -378,28 +381,28 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg)
// If another message is there, this cancels it. We move the
// survey expiration out. The timeout thread will wake up in
// the wake below, and reschedule itself appropriately.
- psock->expire = nni_clock() + psock->survtime;
- nni_timer_schedule(&psock->timer, psock->expire);
+ s->expire = nni_clock() + s->survtime;
+ nni_timer_schedule(&s->timer, s->expire);
// Clear the error condition.
- nni_sock_recverr(psock->nsock, 0);
+ nni_sock_recverr(s->nsock, 0);
// nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0);
return (msg);
}
static nni_msg *
-nni_surv_sock_rfilter(void *arg, nni_msg *msg)
+surv_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_surv_sock *ssock = arg;
+ surv_sock *s = arg;
- if (ssock->raw) {
+ if (s->raw) {
// Pass it unmolested
return (msg);
}
if ((nni_msg_header_len(msg) < sizeof(uint32_t)) ||
- (nni_msg_header_trim_u32(msg) != ssock->survid)) {
+ (nni_msg_header_trim_u32(msg) != s->survid)) {
// Wrong request id
nni_msg_free(msg);
return (NULL);
@@ -408,35 +411,35 @@ nni_surv_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-static nni_proto_pipe_ops nni_surv_pipe_ops = {
- .pipe_init = nni_surv_pipe_init,
- .pipe_fini = nni_surv_pipe_fini,
- .pipe_start = nni_surv_pipe_start,
- .pipe_stop = nni_surv_pipe_stop,
+static nni_proto_pipe_ops surv_pipe_ops = {
+ .pipe_init = surv_pipe_init,
+ .pipe_fini = surv_pipe_fini,
+ .pipe_start = surv_pipe_start,
+ .pipe_stop = surv_pipe_stop,
};
-static nni_proto_sock_ops nni_surv_sock_ops = {
- .sock_init = nni_surv_sock_init,
- .sock_fini = nni_surv_sock_fini,
- .sock_open = nni_surv_sock_open,
- .sock_close = nni_surv_sock_close,
- .sock_setopt = nni_surv_sock_setopt,
- .sock_getopt = nni_surv_sock_getopt,
- .sock_rfilter = nni_surv_sock_rfilter,
- .sock_sfilter = nni_surv_sock_sfilter,
+static nni_proto_sock_ops surv_sock_ops = {
+ .sock_init = surv_sock_init,
+ .sock_fini = surv_sock_fini,
+ .sock_open = surv_sock_open,
+ .sock_close = surv_sock_close,
+ .sock_setopt = surv_sock_setopt,
+ .sock_getopt = surv_sock_getopt,
+ .sock_rfilter = surv_sock_rfilter,
+ .sock_sfilter = surv_sock_sfilter,
};
-nni_proto nni_surveyor_proto = {
+static nni_proto surv_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_SURVEYOR_V0, "surveyor" },
.proto_peer = { NNG_PROTO_RESPONDENT_V0, "respondent" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_surv_sock_ops,
- .proto_pipe_ops = &nni_surv_pipe_ops,
+ .proto_sock_ops = &surv_sock_ops,
+ .proto_pipe_ops = &surv_pipe_ops,
};
int
nng_surveyor0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_surveyor_proto));
+ return (nni_proto_open(sidp, &surv_proto));
}
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index b2c382fb..f10f80b4 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -39,9 +39,9 @@ struct nni_ipc_pipe {
nni_aio *user_txaio;
nni_aio *user_rxaio;
nni_aio *user_negaio;
- nni_aio txaio;
- nni_aio rxaio;
- nni_aio negaio;
+ nni_aio *txaio;
+ nni_aio *rxaio;
+ nni_aio *negaio;
nni_msg *rxmsg;
nni_mtx mtx;
};
@@ -51,7 +51,7 @@ struct nni_ipc_ep {
nni_plat_ipc_ep *iep;
uint16_t proto;
size_t rcvmax;
- nni_aio aio;
+ nni_aio * aio;
nni_aio * user_aio;
nni_mtx mtx;
};
@@ -94,13 +94,13 @@ nni_ipc_pipe_fini(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_aio_stop(&pipe->rxaio);
- nni_aio_stop(&pipe->txaio);
- nni_aio_stop(&pipe->negaio);
+ nni_aio_stop(pipe->rxaio);
+ nni_aio_stop(pipe->txaio);
+ nni_aio_stop(pipe->negaio);
- nni_aio_fini(&pipe->rxaio);
- nni_aio_fini(&pipe->txaio);
- nni_aio_fini(&pipe->negaio);
+ nni_aio_fini(pipe->rxaio);
+ nni_aio_fini(pipe->txaio);
+ nni_aio_fini(pipe->negaio);
if (pipe->ipp != NULL) {
nni_plat_ipc_pipe_fini(pipe->ipp);
}
@@ -115,14 +115,18 @@ static int
nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
{
nni_ipc_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p);
- nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p);
- nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p);
+ if (((rv = nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p)) != 0)) {
+ nni_ipc_pipe_fini(p);
+ return (rv);
+ }
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -146,7 +150,7 @@ nni_ipc_cancel_start(nni_aio *aio, int rv)
pipe->user_negaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->negaio, rv);
+ nni_aio_cancel(pipe->negaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -154,7 +158,7 @@ static void
nni_ipc_pipe_nego_cb(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_aio * aio = &pipe->negaio;
+ nni_aio * aio = pipe->negaio;
int rv;
nni_mtx_lock(&pipe->mtx);
@@ -219,12 +223,13 @@ nni_ipc_pipe_send_cb(void *arg)
return;
}
pipe->user_txaio = NULL;
- if ((rv = nni_aio_result(&pipe->txaio)) != 0) {
+ if ((rv = nni_aio_result(pipe->txaio)) != 0) {
len = 0;
} else {
- len = nni_msg_len(aio->a_msg);
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
+ nni_msg *msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+ nni_msg_free(msg);
+ nni_aio_set_msg(aio, NULL);
}
nni_aio_finish(aio, rv, len);
nni_mtx_unlock(&pipe->mtx);
@@ -245,7 +250,7 @@ nni_ipc_pipe_recv_cb(void *arg)
return;
}
- if ((rv = nni_aio_result(&pipe->rxaio)) != 0) {
+ if ((rv = nni_aio_result(pipe->rxaio)) != 0) {
// Error on receive. This has to cause an error back
// to the user. Also, if we had allocated an rxmsg, lets
// toss it.
@@ -264,6 +269,7 @@ nni_ipc_pipe_recv_cb(void *arg)
// message to allocate and how much more to expect.
if (pipe->rxmsg == NULL) {
uint64_t len;
+ nni_aio *rxaio;
// Check to make sure we got msg type 1.
if (pipe->rxhead[0] != 1) {
@@ -298,11 +304,12 @@ nni_ipc_pipe_recv_cb(void *arg)
// Submit the rest of the data for a read -- we want to
// read the entire message now.
- pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
- pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
- pipe->rxaio.a_niov = 1;
+ rxaio = pipe->rxaio;
+ rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
+ rxaio->a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
+ rxaio->a_niov = 1;
- nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -329,7 +336,7 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv)
pipe->user_txaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->txaio, rv);
+ nni_aio_cancel(pipe->txaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -337,8 +344,9 @@ static void
nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- nni_msg * msg = aio->a_msg;
+ nni_msg * msg = nni_aio_get_msg(aio);
uint64_t len;
+ nni_aio * txaio;
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -353,15 +361,16 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
pipe->txhead[0] = 1; // message type, 1.
NNI_PUT64(pipe->txhead + 1, len);
- pipe->txaio.a_iov[0].iov_buf = pipe->txhead;
- pipe->txaio.a_iov[0].iov_len = sizeof(pipe->txhead);
- pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
- pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
- pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
- pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
- pipe->txaio.a_niov = 3;
+ txaio = pipe->txaio;
+ txaio->a_iov[0].iov_buf = pipe->txhead;
+ txaio->a_iov[0].iov_len = sizeof(pipe->txhead);
+ txaio->a_iov[1].iov_buf = nni_msg_header(msg);
+ txaio->a_iov[1].iov_len = nni_msg_header_len(msg);
+ txaio->a_iov[2].iov_buf = nni_msg_body(msg);
+ txaio->a_iov[2].iov_len = nni_msg_len(msg);
+ txaio->a_niov = 3;
- nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, txaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -378,7 +387,7 @@ nni_ipc_cancel_rx(nni_aio *aio, int rv)
pipe->user_rxaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->rxaio, rv);
+ nni_aio_cancel(pipe->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -386,6 +395,7 @@ static void
nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
+ nni_aio * rxaio;
nni_mtx_lock(&pipe->mtx);
@@ -398,11 +408,12 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
NNI_ASSERT(pipe->rxmsg == NULL);
// Schedule a read of the IPC header.
- pipe->rxaio.a_iov[0].iov_buf = pipe->rxhead;
- pipe->rxaio.a_iov[0].iov_len = sizeof(pipe->rxhead);
- pipe->rxaio.a_niov = 1;
+ rxaio = pipe->rxaio;
+ rxaio->a_iov[0].iov_buf = pipe->rxhead;
+ rxaio->a_iov[0].iov_len = sizeof(pipe->rxhead);
+ rxaio->a_niov = 1;
- nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -411,6 +422,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
int rv;
+ nni_aio * negaio;
nni_mtx_lock(&pipe->mtx);
pipe->txhead[0] = 0;
@@ -420,20 +432,21 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&pipe->txhead[4], pipe->proto);
NNI_PUT16(&pipe->txhead[6], 0);
- pipe->user_negaio = aio;
- pipe->gotrxhead = 0;
- pipe->gottxhead = 0;
- pipe->wantrxhead = 8;
- pipe->wanttxhead = 8;
- pipe->negaio.a_niov = 1;
- pipe->negaio.a_iov[0].iov_len = 8;
- pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0];
+ pipe->user_negaio = aio;
+ pipe->gotrxhead = 0;
+ pipe->gottxhead = 0;
+ pipe->wantrxhead = 8;
+ pipe->wanttxhead = 8;
+ negaio = pipe->negaio;
+ negaio->a_niov = 1;
+ negaio->a_iov[0].iov_len = 8;
+ negaio->a_iov[0].iov_buf = &pipe->txhead[0];
rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe);
if (rv != 0) {
nni_mtx_unlock(&pipe->mtx);
return;
}
- nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -473,9 +486,9 @@ nni_ipc_ep_fini(void *arg)
{
nni_ipc_ep *ep = arg;
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
nni_plat_ipc_ep_fini(ep->iep);
- nni_aio_fini(&ep->aio);
+ nni_aio_fini(ep->aio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
}
@@ -532,7 +545,7 @@ nni_ipc_ep_close(void *arg)
nni_plat_ipc_ep_close(ep->iep);
nni_mtx_unlock(&ep->mtx);
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
}
static int
@@ -554,19 +567,19 @@ nni_ipc_ep_finish(nni_ipc_ep *ep)
int rv;
nni_ipc_pipe *pipe = NULL;
- if ((rv = nni_aio_result(&ep->aio)) != 0) {
+ if ((rv = nni_aio_result(ep->aio)) != 0) {
goto done;
}
- NNI_ASSERT(ep->aio.a_pipe != NULL);
+ NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL);
// Attempt to allocate the parent pipe. If this fails we'll
// drop the connection (ENOMEM probably).
- rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe);
+ rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio));
done:
- ep->aio.a_pipe = NULL;
- aio = ep->user_aio;
- ep->user_aio = NULL;
+ nni_aio_set_pipe(ep->aio, NULL);
+ aio = ep->user_aio;
+ ep->user_aio = NULL;
if ((aio != NULL) && (rv == 0)) {
NNI_ASSERT(pipe != NULL);
@@ -607,7 +620,7 @@ nni_ipc_cancel_ep(nni_aio *aio, int rv)
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(&ep->aio, rv);
+ nni_aio_cancel(ep->aio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -627,7 +640,7 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_ipc_ep_accept(ep->iep, &ep->aio);
+ nni_plat_ipc_ep_accept(ep->iep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
@@ -648,7 +661,7 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_ipc_ep_connect(ep->iep, &ep->aio);
+ nni_plat_ipc_ep_connect(ep->iep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index f0f07592..759e20f5 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -38,9 +38,9 @@ struct nni_tcp_pipe {
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
- nni_aio txaio;
- nni_aio rxaio;
- nni_aio negaio;
+ nni_aio *txaio;
+ nni_aio *rxaio;
+ nni_aio *negaio;
nni_msg *rxmsg;
nni_mtx mtx;
};
@@ -52,7 +52,7 @@ struct nni_tcp_ep {
size_t rcvmax;
nni_duration linger;
int ipv4only;
- nni_aio aio;
+ nni_aio * aio;
nni_aio * user_aio;
nni_mtx mtx;
};
@@ -98,13 +98,13 @@ nni_tcp_pipe_fini(void *arg)
{
nni_tcp_pipe *p = arg;
- nni_aio_stop(&p->rxaio);
- nni_aio_stop(&p->txaio);
- nni_aio_stop(&p->negaio);
+ nni_aio_stop(p->rxaio);
+ nni_aio_stop(p->txaio);
+ nni_aio_stop(p->negaio);
- nni_aio_fini(&p->rxaio);
- nni_aio_fini(&p->txaio);
- nni_aio_fini(&p->negaio);
+ nni_aio_fini(p->rxaio);
+ nni_aio_fini(p->txaio);
+ nni_aio_fini(p->negaio);
if (p->tpp != NULL) {
nni_plat_tcp_pipe_fini(p->tpp);
}
@@ -119,14 +119,18 @@ static int
nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
{
nni_tcp_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p);
- nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p);
- nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p);
+ if (((rv = nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p)) != 0)) {
+ nni_tcp_pipe_fini(p);
+ return (rv);
+ }
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -150,7 +154,7 @@ nni_tcp_cancel_nego(nni_aio *aio, int rv)
p->user_negaio = NULL;
nni_mtx_unlock(&p->mtx);
- nni_aio_cancel(&p->negaio, rv);
+ nni_aio_cancel(p->negaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -158,7 +162,7 @@ static void
nni_tcp_pipe_nego_cb(void *arg)
{
nni_tcp_pipe *p = arg;
- nni_aio * aio = &p->negaio;
+ nni_aio * aio = p->negaio;
int rv;
nni_mtx_lock(&p->mtx);
@@ -224,12 +228,12 @@ nni_tcp_pipe_send_cb(void *arg)
}
p->user_txaio = NULL;
- if ((rv = nni_aio_result(&p->txaio)) != 0) {
+ if ((rv = nni_aio_result(p->txaio)) != 0) {
len = 0;
} else {
len = nni_msg_len(aio->a_msg);
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
+ nni_msg_free(nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
}
nni_aio_finish(aio, 0, len);
nni_mtx_unlock(&p->mtx);
@@ -251,7 +255,7 @@ nni_tcp_pipe_recv_cb(void *arg)
return;
}
- if ((rv = nni_aio_result(&p->rxaio)) != 0) {
+ if ((rv = nni_aio_result(p->rxaio)) != 0) {
// Error on receive. This has to cause an error back
// to the user. Also, if we had allocated an rxmsg, lets
// toss it.
@@ -269,6 +273,7 @@ nni_tcp_pipe_recv_cb(void *arg)
// header, which is just the length. This tells us the size of the
// message to allocate and how much more to expect.
if (p->rxmsg == NULL) {
+ nni_aio *rxaio;
uint64_t len;
// We should have gotten a message header.
NNI_GET64(p->rxlen, len);
@@ -291,11 +296,12 @@ nni_tcp_pipe_recv_cb(void *arg)
// Submit the rest of the data for a read -- we want to
// read the entire message now.
- p->rxaio.a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
- p->rxaio.a_iov[0].iov_len = nni_msg_len(p->rxmsg);
- p->rxaio.a_niov = 1;
+ rxaio = p->rxaio;
+ rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
+ rxaio->a_iov[0].iov_len = nni_msg_len(p->rxmsg);
+ rxaio->a_niov = 1;
- nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio);
+ nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
return;
}
@@ -322,7 +328,7 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(&p->txaio, rv);
+ nni_aio_cancel(p->txaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -330,8 +336,9 @@ static void
nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
- nni_msg * msg = aio->a_msg;
+ nni_msg * msg = nni_aio_get_msg(aio);
uint64_t len;
+ nni_aio * txaio;
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -346,15 +353,16 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
NNI_PUT64(p->txlen, len);
- p->txaio.a_iov[0].iov_buf = p->txlen;
- p->txaio.a_iov[0].iov_len = sizeof(p->txlen);
- p->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
- p->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
- p->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
- p->txaio.a_iov[2].iov_len = nni_msg_len(msg);
- p->txaio.a_niov = 3;
+ txaio = p->txaio;
+ txaio->a_iov[0].iov_buf = p->txlen;
+ txaio->a_iov[0].iov_len = sizeof(p->txlen);
+ txaio->a_iov[1].iov_buf = nni_msg_header(msg);
+ txaio->a_iov[1].iov_len = nni_msg_header_len(msg);
+ txaio->a_iov[2].iov_buf = nni_msg_body(msg);
+ txaio->a_iov[2].iov_len = nni_msg_len(msg);
+ txaio->a_niov = 3;
- nni_plat_tcp_pipe_send(p->tpp, &p->txaio);
+ nni_plat_tcp_pipe_send(p->tpp, txaio);
nni_mtx_unlock(&p->mtx);
}
@@ -372,7 +380,7 @@ nni_tcp_cancel_rx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(&p->rxaio, rv);
+ nni_aio_cancel(p->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -380,6 +388,7 @@ static void
nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ nni_aio * rxaio;
nni_mtx_lock(&p->mtx);
@@ -392,11 +401,12 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
NNI_ASSERT(p->rxmsg == NULL);
// Schedule a read of the TCP header.
- p->rxaio.a_iov[0].iov_buf = p->rxlen;
- p->rxaio.a_iov[0].iov_len = sizeof(p->rxlen);
- p->rxaio.a_niov = 1;
+ rxaio = p->rxaio;
+ rxaio->a_iov[0].iov_buf = p->rxlen;
+ rxaio->a_iov[0].iov_len = sizeof(p->rxlen);
+ rxaio->a_niov = 1;
- nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio);
+ nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
}
@@ -525,6 +535,7 @@ static void
nni_tcp_pipe_start(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ nni_aio * negaio;
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
@@ -534,19 +545,20 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&p->txlen[4], p->proto);
NNI_PUT16(&p->txlen[6], 0);
- p->user_negaio = aio;
- p->gotrxhead = 0;
- p->gottxhead = 0;
- p->wantrxhead = 8;
- p->wanttxhead = 8;
- p->negaio.a_niov = 1;
- p->negaio.a_iov[0].iov_len = 8;
- p->negaio.a_iov[0].iov_buf = &p->txlen[0];
+ p->user_negaio = aio;
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ negaio = p->negaio;
+ negaio->a_niov = 1;
+ negaio->a_iov[0].iov_len = 8;
+ negaio->a_iov[0].iov_buf = &p->txlen[0];
if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) {
nni_mtx_unlock(&p->mtx);
return;
}
- nni_plat_tcp_pipe_send(p->tpp, &p->negaio);
+ nni_plat_tcp_pipe_send(p->tpp, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -555,11 +567,11 @@ nni_tcp_ep_fini(void *arg)
{
nni_tcp_ep *ep = arg;
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
if (ep->tep != NULL) {
nni_plat_tcp_ep_fini(ep->tep);
}
- nni_aio_fini(&ep->aio);
+ nni_aio_fini(ep->aio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
}
@@ -575,7 +587,7 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
char * lhost;
char * lserv;
nni_sockaddr rsa, lsa;
- nni_aio aio;
+ nni_aio * aio;
int passive;
// Make a copy of the url (to allow for destructive operations)
@@ -590,17 +602,19 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
}
passive = (mode == NNI_EP_MODE_DIAL ? 0 : 1);
- nni_aio_init(&aio, NULL, NULL);
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
// XXX: arguably we could defer this part to the point we do a bind
// or connect!
if ((rhost != NULL) || (rserv != NULL)) {
- aio.a_addr = &rsa;
- nni_plat_tcp_resolv(
- rhost, rserv, NNG_AF_UNSPEC, passive, &aio);
- nni_aio_wait(&aio);
- if ((rv = nni_aio_result(&aio)) != 0) {
+ aio->a_addr = &rsa;
+ nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, passive, aio);
+ nni_aio_wait(aio);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ nni_aio_fini(aio);
return (rv);
}
} else {
@@ -608,16 +622,17 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
}
if ((lhost != NULL) || (lserv != NULL)) {
- aio.a_addr = &lsa;
- nni_plat_tcp_resolv(
- lhost, lserv, NNG_AF_UNSPEC, passive, &aio);
- nni_aio_wait(&aio);
- if ((rv = nni_aio_result(&aio)) != 0) {
+ aio->a_addr = &lsa;
+ nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, passive, aio);
+ nni_aio_wait(aio);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ nni_aio_fini(aio);
return (rv);
}
} else {
lsa.s_un.s_family = NNG_AF_UNSPEC;
}
+ nni_aio_fini(aio);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
@@ -633,8 +648,10 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
}
nni_mtx_init(&ep->mtx);
- nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep);
-
+ if ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) {
+ nni_tcp_ep_fini(ep);
+ return (rv);
+ }
ep->proto = nni_sock_proto(sock);
*epp = ep;
@@ -650,7 +667,7 @@ nni_tcp_ep_close(void *arg)
nni_plat_tcp_ep_close(ep->tep);
nni_mtx_unlock(&ep->mtx);
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
}
static int
@@ -673,19 +690,19 @@ nni_tcp_ep_finish(nni_tcp_ep *ep)
int rv;
nni_tcp_pipe *pipe = NULL;
- if ((rv = nni_aio_result(&ep->aio)) != 0) {
+ if ((rv = nni_aio_result(ep->aio)) != 0) {
goto done;
}
- NNI_ASSERT(ep->aio.a_pipe != NULL);
+ NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL);
// Attempt to allocate the parent pipe. If this fails we'll
// drop the connection (ENOMEM probably).
- rv = nni_tcp_pipe_init(&pipe, ep, ep->aio.a_pipe);
+ rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio));
done:
- ep->aio.a_pipe = NULL;
- aio = ep->user_aio;
- ep->user_aio = NULL;
+ nni_aio_set_pipe(ep->aio, NULL);
+ aio = ep->user_aio;
+ ep->user_aio = NULL;
if ((aio != NULL) && (rv == 0)) {
nni_aio_finish_pipe(aio, pipe);
@@ -723,7 +740,7 @@ nni_tcp_cancel_ep(nni_aio *aio, int rv)
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(&ep->aio, rv);
+ nni_aio_cancel(ep->aio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -743,7 +760,7 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_tcp_ep_accept(ep->tep, &ep->aio);
+ nni_plat_tcp_ep_accept(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
@@ -764,7 +781,7 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_tcp_ep_connect(ep->tep, &ep->aio);
+ nni_plat_tcp_ep_connect(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
diff --git a/tests/resolv.c b/tests/resolv.c
index 849b41cd..49d258c3 100644
--- a/tests/resolv.c
+++ b/tests/resolv.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -130,103 +131,103 @@ TestMain("Resolver", {
nni_init();
Convey("Google DNS IPv4 resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
+ aio->a_addr = &sa;
nni_plat_tcp_resolv("google-public-dns-a.google.com", "80",
- NNG_AF_INET, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ NNG_AF_INET, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in.sa_family == NNG_AF_INET);
So(sa.s_un.s_in.sa_port == ntohs(80));
str = ip4tostr(&sa.s_un.s_in.sa_addr);
So(strcmp(str, "8.8.8.8") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
Convey("Numeric UDP resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
- nni_plat_udp_resolv("8.8.4.4", "69", NNG_AF_INET, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ aio->a_addr = &sa;
+ nni_plat_udp_resolv("8.8.4.4", "69", NNG_AF_INET, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in.sa_family == NNG_AF_INET);
So(sa.s_un.s_in.sa_port == ntohs(69));
str = ip4tostr(&sa.s_un.s_in.sa_addr);
So(strcmp(str, "8.8.4.4") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
Convey("Numeric v4 resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
- nni_plat_tcp_resolv("8.8.4.4", "80", NNG_AF_INET, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv("8.8.4.4", "80", NNG_AF_INET, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in.sa_family == NNG_AF_INET);
So(sa.s_un.s_in.sa_port == ntohs(80));
str = ip4tostr(&sa.s_un.s_in.sa_addr);
So(strcmp(str, "8.8.4.4") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
Convey("Numeric v6 resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
- nni_plat_tcp_resolv("::1", "80", NNG_AF_INET6, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv("::1", "80", NNG_AF_INET6, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in6.sa_family == NNG_AF_INET6);
So(sa.s_un.s_in6.sa_port == ntohs(80));
str = ip6tostr(&sa.s_un.s_in6.sa_addr);
So(strcmp(str, "::1") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
Convey("TCP Name service resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
- nni_plat_tcp_resolv("8.8.4.4", "http", NNG_AF_INET, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv("8.8.4.4", "http", NNG_AF_INET, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in.sa_family == NNG_AF_INET);
So(sa.s_un.s_in.sa_port == ntohs(80));
str = ip4tostr(&sa.s_un.s_in.sa_addr);
So(strcmp(str, "8.8.4.4") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
Convey("UDP Name service resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
- nni_plat_udp_resolv("8.8.4.4", "tftp", NNG_AF_INET, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ aio->a_addr = &sa;
+ nni_plat_udp_resolv("8.8.4.4", "tftp", NNG_AF_INET, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in.sa_family == NNG_AF_INET);
So(sa.s_un.s_in.sa_port == ntohs(69));
str = ip4tostr(&sa.s_un.s_in.sa_addr);
So(strcmp(str, "8.8.4.4") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
nni_fini();