aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/aio.c75
-rw-r--r--src/core/aio.h26
-rw-r--r--src/core/endpt.c85
-rw-r--r--src/core/event.h3
-rw-r--r--src/core/msgqueue.c59
-rw-r--r--src/core/pipe.c14
-rw-r--r--src/core/socket.c10
-rw-r--r--src/protocol/bus/bus.c354
-rw-r--r--src/protocol/pair/pair_v0.c90
-rw-r--r--src/protocol/pair/pair_v1.c130
-rw-r--r--src/protocol/pipeline/pull.c188
-rw-r--r--src/protocol/pipeline/push.c203
-rw-r--r--src/protocol/pubsub/pub.c305
-rw-r--r--src/protocol/pubsub/sub.c236
-rw-r--r--src/protocol/reqrep/rep.c404
-rw-r--r--src/protocol/reqrep/req.c522
-rw-r--r--src/protocol/survey/respond.c445
-rw-r--r--src/protocol/survey/survey.c393
-rw-r--r--src/transport/ipc/ipc.c133
-rw-r--r--src/transport/tcp/tcp.c163
-rw-r--r--tests/resolv.c85
21 files changed, 2032 insertions, 1891 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 5b8c7970..9b308cd1 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -56,23 +56,34 @@ static nni_list nni_aio_expire_aios;
static void nni_aio_expire_add(nni_aio *);
-void
-nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
+int
+nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
{
+ nni_aio *aio;
+
+ if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
+ return (NNG_ENOMEM);
+ }
memset(aio, 0, sizeof(*aio));
nni_cv_init(&aio->a_cv, &nni_aio_lk);
aio->a_expire = NNI_TIME_NEVER;
aio->a_init = 1;
nni_task_init(NULL, &aio->a_task, cb, arg);
+ *aiop = aio;
+ return (0);
}
void
nni_aio_fini(nni_aio *aio)
{
- nni_aio_stop(aio);
+ if (aio != NULL) {
+ nni_aio_stop(aio);
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
+ // At this point the AIO is done.
+ nni_cv_fini(&aio->a_cv);
+
+ NNI_FREE_STRUCT(aio);
+ }
}
// nni_aio_stop cancels any oustanding operation, and waits for the
@@ -97,6 +108,60 @@ nni_aio_stop(nni_aio *aio)
nni_aio_wait(aio);
}
+void
+nni_aio_set_timeout(nni_aio *aio, nni_time when)
+{
+ aio->a_expire = when;
+}
+
+void
+nni_aio_set_msg(nni_aio *aio, nni_msg *msg)
+{
+ aio->a_msg = msg;
+}
+
+nni_msg *
+nni_aio_get_msg(nni_aio *aio)
+{
+ return (aio->a_msg);
+}
+
+void
+nni_aio_set_pipe(nni_aio *aio, void *p)
+{
+ aio->a_pipe = p;
+}
+
+void *
+nni_aio_get_pipe(nni_aio *aio)
+{
+ return (aio->a_pipe);
+}
+
+void
+nni_aio_set_ep(nni_aio *aio, void *ep)
+{
+ aio->a_endpt = ep;
+}
+
+void *
+nni_aio_get_ep(nni_aio *aio)
+{
+ return (aio->a_endpt);
+}
+
+void
+nni_aio_set_data(nni_aio *aio, void *data)
+{
+ aio->a_data = data;
+}
+
+void *
+nni_aio_get_data(nni_aio *aio)
+{
+ return (aio->a_data);
+}
+
int
nni_aio_result(nni_aio *aio)
{
diff --git a/src/core/aio.h b/src/core/aio.h
index 9114c9fe..eae17446 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -52,6 +52,9 @@ struct nni_aio {
// Resolver operations.
nni_sockaddr *a_addr;
+ // Extra user data.
+ void *a_data;
+
// Provider-use fields.
nni_aio_cancelfn a_prov_cancel;
void * a_prov_data;
@@ -65,7 +68,7 @@ struct nni_aio {
// the supplied argument when the operation is complete. If NULL is
// supplied for the callback, then nni_aio_wake is used in its place,
// and the aio is used for the argument.
-extern void nni_aio_init(nni_aio *, nni_cb, void *);
+extern int nni_aio_init(nni_aio **, nni_cb, void *);
// nni_aio_fini finalizes the aio, releasing resources (locks)
// associated with it. The caller is responsible for ensuring that any
@@ -83,6 +86,27 @@ extern void nni_aio_fini(nni_aio *);
// use nni_aio_cancel instead.)
extern void nni_aio_stop(nni_aio *);
+// nni_aio_set_data sets user data. This should only be done by the
+// consumer, initiating the I/O. The intention is to be able to store
+// additional data for use when the operation callback is executed.
+extern void nni_aio_set_data(nni_aio *, void *);
+
+// nni_aio_get_data returns the user data that was previously stored
+// with nni_aio_set_data.
+extern void *nni_aio_get_data(nni_aio *);
+
+extern void nni_aio_set_msg(nni_aio *, nni_msg *);
+extern nni_msg *nni_aio_get_msg(nni_aio *);
+extern void nni_aio_set_pipe(nni_aio *, void *);
+extern void * nni_aio_get_pipe(nni_aio *);
+extern void nni_aio_set_ep(nni_aio *, void *);
+extern void * nni_aio_get_ep(nni_aio *);
+
+// nni_aio_set_timeout sets the timeout (absolute) when the AIO will
+// be canceled. The cancelation does not happen until after nni_aio_start
+// is called.
+extern void nni_aio_set_timeout(nni_aio *, nni_time);
+
// nni_aio_result returns the result code (0 on success, or an NNG errno)
// for the operation. It is only valid to call this when the operation is
// complete (such as when the callback is executed or after nni_aio_wait
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 9dc33867..9122bb58 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -31,10 +31,10 @@ struct nni_ep {
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_list ep_pipes;
- nni_aio ep_acc_aio;
- nni_aio ep_con_aio;
- nni_aio ep_con_syn; // used for sync connect
- nni_aio ep_tmo_aio; // backoff timer
+ nni_aio * ep_acc_aio;
+ nni_aio * ep_con_aio;
+ nni_aio * ep_con_syn; // used for sync connect
+ nni_aio * ep_tmo_aio; // backoff timer
nni_duration ep_maxrtime; // maximum time for reconnect
nni_duration ep_currtime; // current time for reconnect
nni_duration ep_inirtime; // initial time for reconnect
@@ -96,15 +96,15 @@ nni_ep_destroy(nni_ep *ep)
nni_sock_ep_remove(ep->ep_sock, ep);
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_tmo_aio);
+ nni_aio_stop(ep->ep_acc_aio);
+ nni_aio_stop(ep->ep_con_aio);
+ nni_aio_stop(ep->ep_con_syn);
+ nni_aio_stop(ep->ep_tmo_aio);
- nni_aio_fini(&ep->ep_acc_aio);
- nni_aio_fini(&ep->ep_con_aio);
- nni_aio_fini(&ep->ep_con_syn);
- nni_aio_fini(&ep->ep_tmo_aio);
+ nni_aio_fini(ep->ep_acc_aio);
+ nni_aio_fini(ep->ep_con_aio);
+ nni_aio_fini(ep->ep_con_syn);
+ nni_aio_fini(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_data != NULL) {
@@ -154,12 +154,12 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
nni_mtx_init(&ep->ep_mtx);
nni_cv_init(&ep->ep_cv, &ep->ep_mtx);
- nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep);
- nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep);
- nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep);
- nni_aio_init(&ep->ep_con_syn, NULL, NULL);
- if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
+ if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
+ ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
((rv = nni_sock_ep_add(s, ep)) != 0)) {
nni_ep_destroy(ep);
@@ -246,10 +246,10 @@ nni_ep_shutdown(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
// Abort any remaining in-flight operations.
- nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_tmo_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_acc_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_con_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_con_syn, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_tmo_aio, NNG_ECLOSED);
// Stop the underlying transport.
ep->ep_ops.ep_close(ep->ep_data);
@@ -273,10 +273,10 @@ nni_ep_close(nni_ep *ep)
nni_ep_shutdown(ep);
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_tmo_aio);
+ nni_aio_stop(ep->ep_acc_aio);
+ nni_aio_stop(ep->ep_con_aio);
+ nni_aio_stop(ep->ep_con_syn);
+ nni_aio_stop(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
NNI_LIST_FOREACH (&ep->ep_pipes, p) {
@@ -327,10 +327,11 @@ nni_ep_tmo_start(nni_ep *ep)
// have a statistically perfect distribution with the modulo of
// the random number, but this really doesn't matter.
- ep->ep_tmo_aio.a_expire =
- nni_clock() + (backoff ? nni_random() % backoff : 0);
+ nni_aio_set_timeout(ep->ep_tmo_aio,
+ nni_clock() + (backoff ? nni_random() % backoff : 0));
+
ep->ep_tmo_run = 1;
- if (nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
+ if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
ep->ep_tmo_run = 0;
}
}
@@ -339,7 +340,7 @@ static void
nni_ep_tmo_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_tmo_aio;
+ nni_aio *aio = ep->ep_tmo_aio;
nni_mtx_lock(&ep->ep_mtx);
if (nni_aio_result(aio) == NNG_ETIMEDOUT) {
@@ -356,11 +357,11 @@ static void
nni_ep_con_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_con_aio;
+ nni_aio *aio = ep->ep_con_aio;
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- rv = nni_pipe_create(ep, aio->a_pipe);
+ rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
}
nni_mtx_lock(&ep->ep_mtx);
switch (rv) {
@@ -392,14 +393,14 @@ nni_ep_con_cb(void *arg)
static void
nni_ep_con_start(nni_ep *ep)
{
- nni_aio *aio = &ep->ep_con_aio;
+ nni_aio *aio = ep->ep_con_aio;
// Call with the Endpoint lock held.
if (ep->ep_closing) {
return;
}
- aio->a_endpt = ep->ep_data;
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_connect(ep->ep_data, aio);
}
@@ -436,8 +437,8 @@ nni_ep_dial(nni_ep *ep, int flags)
}
// Synchronous mode: so we have to wait for it to complete.
- aio = &ep->ep_con_syn;
- aio->a_endpt = ep->ep_data;
+ aio = ep->ep_con_syn;
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_connect(ep->ep_data, aio);
ep->ep_started = 1;
nni_mtx_unlock(&ep->ep_mtx);
@@ -446,7 +447,7 @@ nni_ep_dial(nni_ep *ep, int flags)
// As we're synchronous, we also have to handle the completion.
if (((rv = nni_aio_result(aio)) != 0) ||
- ((rv = nni_pipe_create(ep, aio->a_pipe)) != 0)) {
+ ((rv = nni_pipe_create(ep, nni_aio_get_pipe(aio))) != 0)) {
nni_mtx_lock(&ep->ep_mtx);
ep->ep_started = 0;
nni_mtx_unlock(&ep->ep_mtx);
@@ -458,12 +459,12 @@ static void
nni_ep_acc_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_acc_aio;
+ nni_aio *aio = ep->ep_acc_aio;
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- NNI_ASSERT(aio->a_pipe != NULL);
- rv = nni_pipe_create(ep, aio->a_pipe);
+ NNI_ASSERT(nni_aio_get_pipe(aio) != NULL);
+ rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
}
nni_mtx_lock(&ep->ep_mtx);
@@ -495,14 +496,14 @@ nni_ep_acc_cb(void *arg)
static void
nni_ep_acc_start(nni_ep *ep)
{
- nni_aio *aio = &ep->ep_acc_aio;
+ nni_aio *aio = ep->ep_acc_aio;
// Call with the Endpoint lock held.
if (ep->ep_closing) {
return;
}
- aio->a_pipe = NULL;
- aio->a_endpt = ep->ep_data;
+ nni_aio_set_pipe(aio, NULL);
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_accept(ep->ep_data, aio);
}
diff --git a/src/core/event.h b/src/core/event.h
index 22af096e..9536d206 100644
--- a/src/core/event.h
+++ b/src/core/event.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -25,7 +26,7 @@ struct nng_notify {
void * n_arg;
int n_type;
nni_sock * n_sock;
- nni_aio n_aio;
+ nni_aio * n_aio;
};
extern void nni_ev_init(nni_event *, int, nni_sock *);
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index ef7dd5a2..6fd95f98 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -166,14 +166,14 @@ nni_msgq_run_putq(nni_msgq *mq)
size_t len;
while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- msg = waio->a_msg;
+ msg = nni_aio_get_msg(waio);
len = nni_msg_len(msg);
// The presence of any blocked reader indicates that
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- waio->a_msg = NULL;
+ nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(raio);
nni_aio_list_remove(waio);
@@ -190,7 +190,7 @@ nni_msgq_run_putq(nni_msgq *mq)
mq->mq_put = 0;
}
mq->mq_len++;
- waio->a_msg = NULL;
+ nni_aio_set_msg(waio, NULL);
nni_aio_finish(waio, 0, len);
continue;
}
@@ -215,7 +215,6 @@ nni_msgq_run_getq(nni_msgq *mq)
mq->mq_get = 0;
}
mq->mq_len--;
- raio->a_msg = msg;
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
continue;
@@ -223,8 +222,8 @@ nni_msgq_run_getq(nni_msgq *mq)
// Nothing queued (unbuffered?), maybe a writer is waiting.
if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- msg = waio->a_msg;
- waio->a_msg = NULL;
+ msg = nni_aio_get_msg(waio);
+ nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
nni_aio_list_remove(raio);
@@ -397,34 +396,38 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
- nni_aio aio;
- int rv;
-
- nni_aio_init(&aio, NULL, NULL);
- aio.a_expire = expire;
- nni_msgq_aio_get(mq, &aio);
- nni_aio_wait(&aio);
- if ((rv = nni_aio_result(&aio)) == 0) {
- *msgp = aio.a_msg;
- aio.a_msg = NULL;
- }
- nni_aio_fini(&aio);
+ nni_aio *aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ nni_aio_set_timeout(aio, expire);
+ nni_msgq_aio_get(mq, aio);
+ nni_aio_wait(aio);
+ if ((rv = nni_aio_result(aio)) == 0) {
+ *msgp = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
+ }
+ nni_aio_fini(aio);
return (rv);
}
int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
- nni_aio aio;
- int rv;
-
- nni_aio_init(&aio, NULL, NULL);
- aio.a_expire = expire;
- aio.a_msg = msg;
- nni_msgq_aio_put(mq, &aio);
- nni_aio_wait(&aio);
- rv = nni_aio_result(&aio);
- nni_aio_fini(&aio);
+ nni_aio *aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ nni_aio_set_timeout(aio, expire);
+ nni_aio_set_msg(aio, msg);
+ nni_msgq_aio_put(mq, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
return (rv);
}
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 0c024c66..0670ed01 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -30,7 +30,7 @@ struct nni_pipe {
nni_mtx p_mtx;
nni_cv p_cv;
nni_list_node p_reap_node;
- nni_aio p_start_aio;
+ nni_aio * p_start_aio;
};
static nni_idhash *nni_pipes;
@@ -99,7 +99,7 @@ nni_pipe_destroy(nni_pipe *p)
}
// Stop any pending negotiation.
- nni_aio_stop(&p->p_start_aio);
+ nni_aio_stop(p->p_start_aio);
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
@@ -112,7 +112,7 @@ nni_pipe_destroy(nni_pipe *p)
// We have exclusive access at this point, so we can check if
// we are still on any lists.
- nni_aio_fini(&p->p_start_aio);
+ nni_aio_fini(p->p_start_aio);
if (nni_list_node_active(&p->p_ep_node)) {
nni_ep_pipe_remove(p->p_ep, p);
@@ -172,7 +172,7 @@ nni_pipe_close(nni_pipe *p)
nni_mtx_unlock(&p->p_mtx);
// abort any pending negotiation/start process.
- nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
+ nni_aio_cancel(p->p_start_aio, NNG_ECLOSED);
}
void
@@ -205,7 +205,7 @@ static void
nni_pipe_start_cb(void *arg)
{
nni_pipe *p = arg;
- nni_aio * aio = &p->p_start_aio;
+ nni_aio * aio = p->p_start_aio;
int rv;
if ((rv = nni_aio_result(aio)) != 0) {
@@ -270,9 +270,9 @@ void
nni_pipe_start(nni_pipe *p)
{
if (p->p_tran_ops.p_start == NULL) {
- nni_aio_finish(&p->p_start_aio, 0, 0);
+ nni_aio_finish(p->p_start_aio, 0, 0);
} else {
- p->p_tran_ops.p_start(p->p_tran_data, &p->p_start_aio);
+ p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio);
}
}
diff --git a/src/core/socket.c b/src/core/socket.c
index 6a242558..01fd9a0c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -222,7 +222,7 @@ nni_sock_cansend_cb(void *arg)
nni_notify *notify = arg;
nni_sock * sock = notify->n_sock;
- if (nni_aio_result(&notify->n_aio) != 0) {
+ if (nni_aio_result(notify->n_aio) != 0) {
return;
}
@@ -235,7 +235,7 @@ nni_sock_canrecv_cb(void *arg)
nni_notify *notify = arg;
nni_sock * sock = notify->n_sock;
- if (nni_aio_result(&notify->n_aio) != 0) {
+ if (nni_aio_result(notify->n_aio) != 0) {
return;
}
@@ -259,11 +259,11 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
switch (type) {
case NNG_EV_CAN_RCV:
nni_aio_init(&notify->n_aio, nni_sock_canrecv_cb, notify);
- nni_msgq_aio_notify_get(sock->s_urq, &notify->n_aio);
+ nni_msgq_aio_notify_get(sock->s_urq, notify->n_aio);
break;
case NNG_EV_CAN_SND:
nni_aio_init(&notify->n_aio, nni_sock_cansend_cb, notify);
- nni_msgq_aio_notify_put(sock->s_uwq, &notify->n_aio);
+ nni_msgq_aio_notify_put(sock->s_uwq, notify->n_aio);
break;
default:
NNI_FREE_STRUCT(notify);
@@ -276,7 +276,7 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
void
nni_sock_unnotify(nni_sock *sock, nni_notify *notify)
{
- nni_aio_fini(&notify->n_aio);
+ nni_aio_fini(notify->n_aio);
NNI_FREE_STRUCT(notify);
}
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 6f6def0a..c8d759f2 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -18,251 +18,257 @@
// for each participant to receive the message, each sender must be connected
// to every other node in the network (full mesh).
-typedef struct nni_bus_pipe nni_bus_pipe;
-typedef struct nni_bus_sock nni_bus_sock;
-
-static void nni_bus_sock_getq(nni_bus_sock *);
-static void nni_bus_pipe_getq(nni_bus_pipe *);
-static void nni_bus_pipe_send(nni_bus_pipe *);
-static void nni_bus_pipe_recv(nni_bus_pipe *);
-
-static void nni_bus_sock_getq_cb(void *);
-static void nni_bus_pipe_getq_cb(void *);
-static void nni_bus_pipe_send_cb(void *);
-static void nni_bus_pipe_recv_cb(void *);
-static void nni_bus_pipe_putq_cb(void *);
-
-// An nni_bus_sock is our per-socket protocol private structure.
-struct nni_bus_sock {
+typedef struct bus_pipe bus_pipe;
+typedef struct bus_sock bus_sock;
+
+static void bus_sock_getq(bus_sock *);
+static void bus_pipe_getq(bus_pipe *);
+static void bus_pipe_send(bus_pipe *);
+static void bus_pipe_recv(bus_pipe *);
+
+static void bus_sock_getq_cb(void *);
+static void bus_pipe_getq_cb(void *);
+static void bus_pipe_send_cb(void *);
+static void bus_pipe_recv_cb(void *);
+static void bus_pipe_putq_cb(void *);
+
+// A bus_sock is our per-socket protocol private structure.
+struct bus_sock {
nni_sock *nsock;
int raw;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
};
-// An nni_bus_pipe is our per-pipe protocol private structure.
-struct nni_bus_pipe {
+// A bus_pipe is our per-pipe protocol private structure.
+struct bus_pipe {
nni_pipe * npipe;
- nni_bus_sock *psock;
+ bus_sock * psock;
nni_msgq * sendq;
nni_list_node node;
- nni_aio aio_getq;
- nni_aio aio_recv;
- nni_aio aio_send;
- nni_aio aio_putq;
+ nni_aio * aio_getq;
+ nni_aio * aio_recv;
+ nni_aio * aio_send;
+ nni_aio * aio_putq;
nni_mtx mtx;
};
static void
-nni_bus_sock_fini(void *arg)
+bus_sock_fini(void *arg)
{
- nni_bus_sock *psock = arg;
+ bus_sock *s = arg;
- nni_aio_stop(&psock->aio_getq);
- nni_aio_fini(&psock->aio_getq);
- nni_mtx_fini(&psock->mtx);
- NNI_FREE_STRUCT(psock);
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
}
static int
-nni_bus_sock_init(void **sp, nni_sock *nsock)
+bus_sock_init(void **sp, nni_sock *nsock)
{
- nni_bus_sock *psock;
+ bus_sock *s;
+ int rv;
- if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&psock->pipes, nni_bus_pipe, node);
- nni_mtx_init(&psock->mtx);
- nni_aio_init(&psock->aio_getq, nni_bus_sock_getq_cb, psock);
- psock->nsock = nsock;
- psock->raw = 0;
+ NNI_LIST_INIT(&s->pipes, bus_pipe, node);
+ nni_mtx_init(&s->mtx);
+ if ((rv = nni_aio_init(&s->aio_getq, bus_sock_getq_cb, s)) != 0) {
+ bus_sock_fini(s);
+ return (rv);
+ }
+ s->nsock = nsock;
+ s->raw = 0;
- *sp = psock;
+ *sp = s;
return (0);
}
static void
-nni_bus_sock_open(void *arg)
+bus_sock_open(void *arg)
{
- nni_bus_sock *psock = arg;
+ bus_sock *s = arg;
- nni_bus_sock_getq(psock);
+ bus_sock_getq(s);
}
static void
-nni_bus_sock_close(void *arg)
+bus_sock_close(void *arg)
{
- nni_bus_sock *psock = arg;
+ bus_sock *s = arg;
- nni_aio_cancel(&psock->aio_getq, NNG_ECLOSED);
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-nni_bus_pipe_fini(void *arg)
+bus_pipe_fini(void *arg)
{
- nni_bus_pipe *ppipe = arg;
-
- nni_aio_fini(&ppipe->aio_getq);
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_aio_fini(&ppipe->aio_putq);
- nni_msgq_fini(ppipe->sendq);
- nni_mtx_fini(&ppipe->mtx);
- NNI_FREE_STRUCT(ppipe);
+ bus_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_msgq_fini(p->sendq);
+ nni_mtx_fini(&p->mtx);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_bus_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+bus_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- nni_bus_pipe *ppipe;
- int rv;
+ bus_pipe *p;
+ int rv;
- if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
- NNI_FREE_STRUCT(ppipe);
+ NNI_LIST_NODE_INIT(&p->node);
+ nni_mtx_init(&p->mtx);
+ if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, bus_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, bus_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, bus_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, bus_pipe_putq_cb, p)) != 0)) {
+ bus_pipe_fini(p);
return (rv);
}
- NNI_LIST_NODE_INIT(&ppipe->node);
- nni_mtx_init(&ppipe->mtx);
- nni_aio_init(&ppipe->aio_getq, nni_bus_pipe_getq_cb, ppipe);
- nni_aio_init(&ppipe->aio_send, nni_bus_pipe_send_cb, ppipe);
- nni_aio_init(&ppipe->aio_recv, nni_bus_pipe_recv_cb, ppipe);
- nni_aio_init(&ppipe->aio_putq, nni_bus_pipe_putq_cb, ppipe);
-
- ppipe->npipe = npipe;
- ppipe->psock = psock;
- *pp = ppipe;
+
+ p->npipe = npipe;
+ p->psock = s;
+ *pp = p;
return (0);
}
static int
-nni_bus_pipe_start(void *arg)
+bus_pipe_start(void *arg)
{
- nni_bus_pipe *ppipe = arg;
- nni_bus_sock *psock = ppipe->psock;
+ bus_pipe *p = arg;
+ bus_sock *s = p->psock;
- nni_mtx_lock(&psock->mtx);
- nni_list_append(&psock->pipes, ppipe);
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_lock(&s->mtx);
+ nni_list_append(&s->pipes, p);
+ nni_mtx_unlock(&s->mtx);
- nni_bus_pipe_recv(ppipe);
- nni_bus_pipe_getq(ppipe);
+ bus_pipe_recv(p);
+ bus_pipe_getq(p);
return (0);
}
static void
-nni_bus_pipe_stop(void *arg)
+bus_pipe_stop(void *arg)
{
- nni_bus_pipe *ppipe = arg;
- nni_bus_sock *psock = ppipe->psock;
+ bus_pipe *p = arg;
+ bus_sock *s = p->psock;
- nni_msgq_close(ppipe->sendq);
+ nni_msgq_close(p->sendq);
- nni_aio_stop(&ppipe->aio_getq);
- nni_aio_stop(&ppipe->aio_send);
- nni_aio_stop(&ppipe->aio_recv);
- nni_aio_stop(&ppipe->aio_putq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
- nni_mtx_lock(&ppipe->psock->mtx);
- if (nni_list_active(&psock->pipes, ppipe)) {
- nni_list_remove(&psock->pipes, ppipe);
+ nni_mtx_lock(&s->mtx);
+ if (nni_list_active(&s->pipes, p)) {
+ nni_list_remove(&s->pipes, p);
}
- nni_mtx_unlock(&ppipe->psock->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_bus_pipe_getq_cb(void *arg)
+bus_pipe_getq_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
+ bus_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_getq) != 0) {
+ if (nni_aio_result(p->aio_getq) != 0) {
// closed?
- nni_pipe_stop(ppipe->npipe);
+ nni_pipe_stop(p->npipe);
return;
}
- ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg;
- ppipe->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
+ nni_pipe_send(p->npipe, p->aio_send);
}
static void
-nni_bus_pipe_send_cb(void *arg)
+bus_pipe_send_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
+ bus_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_send) != 0) {
+ if (nni_aio_result(p->aio_send) != 0) {
// closed?
- nni_msg_free(ppipe->aio_send.a_msg);
- ppipe->aio_send.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- nni_bus_pipe_getq(ppipe);
+ bus_pipe_getq(p);
}
static void
-nni_bus_pipe_recv_cb(void *arg)
+bus_pipe_recv_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
- nni_bus_sock *psock = ppipe->psock;
- nni_msg * msg;
+ bus_pipe *p = arg;
+ bus_sock *s = p->psock;
+ nni_msg * msg;
- if (nni_aio_result(&ppipe->aio_recv) != 0) {
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->npipe);
return;
}
- msg = ppipe->aio_recv.a_msg;
+ msg = nni_aio_get_msg(p->aio_recv);
- if (nni_msg_header_insert_u32(msg, nni_pipe_id(ppipe->npipe)) != 0) {
+ if (nni_msg_header_insert_u32(msg, nni_pipe_id(p->npipe)) != 0) {
// XXX: bump a nomemory stat
nni_msg_free(msg);
- nni_pipe_stop(ppipe->npipe);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- ppipe->aio_putq.a_msg = msg;
- nni_msgq_aio_put(nni_sock_recvq(psock->nsock), &ppipe->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msgq_aio_put(nni_sock_recvq(s->nsock), p->aio_putq);
}
static void
-nni_bus_pipe_putq_cb(void *arg)
+bus_pipe_putq_cb(void *arg)
{
- nni_bus_pipe *ppipe = arg;
+ bus_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_putq) != 0) {
- nni_msg_free(ppipe->aio_putq.a_msg);
- ppipe->aio_putq.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
// Wait for another recv.
- nni_bus_pipe_recv(ppipe);
+ bus_pipe_recv(p);
}
static void
-nni_bus_sock_getq_cb(void *arg)
+bus_sock_getq_cb(void *arg)
{
- nni_bus_sock *psock = arg;
- nni_bus_pipe *ppipe;
- nni_bus_pipe *lpipe;
- nni_msgq * uwq = nni_sock_sendq(psock->nsock);
- nni_msg * msg;
- nni_msg * dup;
- uint32_t sender;
-
- if (nni_aio_result(&psock->aio_getq) != 0) {
+ bus_sock *s = arg;
+ bus_pipe *p;
+ bus_pipe *lastp;
+ nni_msgq *uwq = nni_sock_sendq(s->nsock);
+ nni_msg * msg;
+ nni_msg * dup;
+ uint32_t sender;
+
+ if (nni_aio_result(s->aio_getq) != 0) {
return;
}
- msg = psock->aio_getq.a_msg;
+ msg = nni_aio_get_msg(s->aio_getq);
// The header being present indicates that the message
// was received locally and we are rebroadcasting. (Device
@@ -274,103 +280,101 @@ nni_bus_sock_getq_cb(void *arg)
sender = 0;
}
- nni_mtx_lock(&psock->mtx);
- lpipe = nni_list_last(&psock->pipes);
- NNI_LIST_FOREACH (&psock->pipes, ppipe) {
- if (nni_pipe_id(ppipe->npipe) == sender) {
+ nni_mtx_lock(&s->mtx);
+ lastp = nni_list_last(&s->pipes);
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ if (nni_pipe_id(p->npipe) == sender) {
continue;
}
- if (ppipe != lpipe) {
+ if (p != lastp) {
if (nni_msg_dup(&dup, msg) != 0) {
continue;
}
} else {
dup = msg;
}
- if (nni_msgq_tryput(ppipe->sendq, dup) != 0) {
+ if (nni_msgq_tryput(p->sendq, dup) != 0) {
nni_msg_free(dup);
}
}
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_unlock(&s->mtx);
- if (lpipe == NULL) {
+ if (lastp == NULL) {
nni_msg_free(msg);
}
- nni_bus_sock_getq(psock);
+ bus_sock_getq(s);
}
static void
-nni_bus_sock_getq(nni_bus_sock *psock)
+bus_sock_getq(bus_sock *s)
{
- nni_msgq_aio_get(nni_sock_sendq(psock->nsock), &psock->aio_getq);
+ nni_msgq_aio_get(nni_sock_sendq(s->nsock), s->aio_getq);
}
static void
-nni_bus_pipe_getq(nni_bus_pipe *ppipe)
+bus_pipe_getq(bus_pipe *p)
{
- nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static void
-nni_bus_pipe_recv(nni_bus_pipe *ppipe)
+bus_pipe_recv(bus_pipe *p)
{
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static int
-nni_bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+bus_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_bus_sock *psock = arg;
- int rv = NNG_ENOTSUP;
+ bus_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+bus_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_bus_sock *psock = arg;
- int rv = NNG_ENOTSUP;
+ bus_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&psock->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-static nni_proto_pipe_ops nni_bus_pipe_ops = {
- .pipe_init = nni_bus_pipe_init,
- .pipe_fini = nni_bus_pipe_fini,
- .pipe_start = nni_bus_pipe_start,
- .pipe_stop = nni_bus_pipe_stop,
+static nni_proto_pipe_ops bus_pipe_ops = {
+ .pipe_init = bus_pipe_init,
+ .pipe_fini = bus_pipe_fini,
+ .pipe_start = bus_pipe_start,
+ .pipe_stop = bus_pipe_stop,
};
-static nni_proto_sock_ops nni_bus_sock_ops = {
- .sock_init = nni_bus_sock_init,
- .sock_fini = nni_bus_sock_fini,
- .sock_open = nni_bus_sock_open,
- .sock_close = nni_bus_sock_close,
- .sock_setopt = nni_bus_sock_setopt,
- .sock_getopt = nni_bus_sock_getopt,
+static nni_proto_sock_ops bus_sock_ops = {
+ .sock_init = bus_sock_init,
+ .sock_fini = bus_sock_fini,
+ .sock_open = bus_sock_open,
+ .sock_close = bus_sock_close,
+ .sock_setopt = bus_sock_setopt,
+ .sock_getopt = bus_sock_getopt,
};
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
-nni_proto nni_bus_proto = {
+static nni_proto bus_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_BUS_V0, "bus" },
.proto_peer = { NNG_PROTO_BUS_V0, "bus" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_bus_sock_ops,
- .proto_pipe_ops = &nni_bus_pipe_ops,
+ .proto_sock_ops = &bus_sock_ops,
+ .proto_pipe_ops = &bus_pipe_ops,
};
int
nng_bus0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_bus_proto));
+ return (nni_proto_open(sidp, &bus_proto));
}
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c
index ef420051..486ce43b 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair/pair_v0.c
@@ -43,10 +43,10 @@ struct pair0_sock {
struct pair0_pipe {
nni_pipe * npipe;
pair0_sock *psock;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
};
static int
@@ -76,18 +76,34 @@ pair0_sock_fini(void *arg)
NNI_FREE_STRUCT(s);
}
+static void
+pair0_pipe_fini(void *arg)
+{
+ pair0_pipe *p = arg;
+
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_getq);
+ NNI_FREE_STRUCT(p);
+}
+
static int
pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
pair0_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&p->aio_send, pair0_send_cb, p);
- nni_aio_init(&p->aio_recv, pair0_recv_cb, p);
- nni_aio_init(&p->aio_getq, pair0_getq_cb, p);
- nni_aio_init(&p->aio_putq, pair0_putq_cb, p);
+ if (((rv = nni_aio_init(&p->aio_send, pair0_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pair0_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pair0_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, pair0_putq_cb, p)) != 0)) {
+ pair0_pipe_fini(p);
+ return (rv);
+ }
p->npipe = npipe;
p->psock = psock;
@@ -95,18 +111,6 @@ pair0_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (0);
}
-static void
-pair0_pipe_fini(void *arg)
-{
- pair0_pipe *p = arg;
-
- nni_aio_fini(&p->aio_send);
- nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_putq);
- nni_aio_fini(&p->aio_getq);
- NNI_FREE_STRUCT(p);
-}
-
static int
pair0_pipe_start(void *arg)
{
@@ -123,8 +127,8 @@ pair0_pipe_start(void *arg)
// Schedule a getq on the upper, and a read from the pipe.
// Each of these also sets up another hold on the pipe itself.
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
+ nni_pipe_recv(p->npipe, p->aio_recv);
return (0);
}
@@ -135,10 +139,10 @@ pair0_pipe_stop(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- nni_aio_cancel(&p->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_getq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_send, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_recv, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_putq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_getq, NNG_ECANCELED);
nni_mtx_lock(&s->mtx);
if (s->ppipe == p) {
@@ -154,17 +158,17 @@ pair0_recv_cb(void *arg)
pair0_sock *s = p->psock;
nni_msg * msg;
- if (nni_aio_result(&p->aio_recv) != 0) {
+ if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_recv.a_msg;
- p->aio_putq.a_msg = msg;
- p->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
- nni_msgq_aio_put(s->urq, &p->aio_putq);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
}
static void
@@ -172,13 +176,13 @@ pair0_putq_cb(void *arg)
{
pair0_pipe *p = arg;
- if (nni_aio_result(&p->aio_putq) != 0) {
- nni_msg_free(p->aio_putq.a_msg);
- p->aio_putq.a_msg = NULL;
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static void
@@ -187,14 +191,14 @@ pair0_getq_cb(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- if (nni_aio_result(&p->aio_getq) != 0) {
+ if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- p->aio_send.a_msg = p->aio_getq.a_msg;
- p->aio_getq.a_msg = NULL;
- nni_pipe_send(p->npipe, &p->aio_send);
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
+ nni_pipe_send(p->npipe, p->aio_send);
}
static void
@@ -203,14 +207,14 @@ pair0_send_cb(void *arg)
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- if (nni_aio_result(&p->aio_send) != 0) {
- nni_msg_free(p->aio_send.a_msg);
- p->aio_send.a_msg = NULL;
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
}
static void
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
index a737402f..d6c8ea75 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair/pair_v1.c
@@ -41,7 +41,7 @@ struct pair1_sock {
nni_list plist;
int started;
int poly;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
};
// pair1_pipe is our per-pipe protocol private structure.
@@ -49,10 +49,10 @@ struct pair1_pipe {
nni_pipe * npipe;
pair1_sock * psock;
nni_msgq * sendq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
nni_list_node node;
};
@@ -61,7 +61,7 @@ pair1_sock_fini(void *arg)
{
pair1_sock *s = arg;
- nni_aio_fini(&s->aio_getq);
+ nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
@@ -85,10 +85,10 @@ pair1_sock_init(void **sp, nni_sock *nsock)
NNI_LIST_INIT(&s->plist, pair1_pipe, node);
// Raw mode uses this.
- nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s);
nni_mtx_init(&s->mtx);
- if ((rv = nni_option_register("polyamorous", &poly)) != 0) {
+ if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) ||
+ ((rv = nni_option_register("polyamorous", &poly)) != 0)) {
pair1_sock_fini(s);
return (rv);
}
@@ -104,6 +104,18 @@ pair1_sock_init(void **sp, nni_sock *nsock)
return (0);
}
+static void
+pair1_pipe_fini(void *arg)
+{
+ pair1_pipe *p = arg;
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_getq);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
+}
+
static int
pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
{
@@ -113,14 +125,14 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&p->sendq, 2)) != 0) {
- NNI_FREE_STRUCT(p);
+ if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p)) != 0)) {
+ pair1_pipe_fini(p);
return (NNG_ENOMEM);
}
- nni_aio_init(&p->aio_send, pair1_pipe_send_cb, p);
- nni_aio_init(&p->aio_recv, pair1_pipe_recv_cb, p);
- nni_aio_init(&p->aio_getq, pair1_pipe_getq_cb, p);
- nni_aio_init(&p->aio_putq, pair1_pipe_putq_cb, p);
p->npipe = npipe;
p->psock = psock;
@@ -129,18 +141,6 @@ pair1_pipe_init(void **pp, nni_pipe *npipe, void *psock)
return (rv);
}
-static void
-pair1_pipe_fini(void *arg)
-{
- pair1_pipe *p = arg;
- nni_aio_fini(&p->aio_send);
- nni_aio_fini(&p->aio_recv);
- nni_aio_fini(&p->aio_putq);
- nni_aio_fini(&p->aio_getq);
- nni_msgq_fini(p->sendq);
- NNI_FREE_STRUCT(p);
-}
-
static int
pair1_pipe_start(void *arg)
{
@@ -163,7 +163,7 @@ pair1_pipe_start(void *arg)
}
} else {
if (!s->started) {
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
}
nni_list_append(&s->plist, p);
@@ -171,16 +171,16 @@ pair1_pipe_start(void *arg)
nni_mtx_unlock(&s->mtx);
// Schedule a getq. In polyamorous mode we get on the per pipe
- // sendq, as the socket distributes to us. In monogamous mode we
- // bypass and get from the upper writeq directly (saving a set of
- // context switches).
+ // sendq, as the socket distributes to us. In monogamous mode
+ // we bypass and get from the upper writeq directly (saving a
+ // set of context switches).
if (s->poly) {
- nni_msgq_aio_get(p->sendq, &p->aio_getq);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
} else {
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
}
// And the pipe read of course.
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
return (0);
}
@@ -197,10 +197,10 @@ pair1_pipe_stop(void *arg)
nni_mtx_unlock(&s->mtx);
nni_msgq_close(p->sendq);
- nni_aio_cancel(&p->aio_send, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_recv, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_putq, NNG_ECANCELED);
- nni_aio_cancel(&p->aio_getq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_send, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_recv, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_putq, NNG_ECANCELED);
+ nni_aio_cancel(p->aio_getq, NNG_ECANCELED);
}
static void
@@ -213,13 +213,13 @@ pair1_pipe_recv_cb(void *arg)
nni_pipe * npipe = p->npipe;
int rv;
- if (nni_aio_result(&p->aio_recv) != 0) {
+ if (nni_aio_result(p->aio_recv) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_recv.a_msg;
- p->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// Store the pipe ID.
nni_msg_set_pipe(msg, nni_pipe_id(p->npipe));
@@ -241,20 +241,20 @@ pair1_pipe_recv_cb(void *arg)
// keep getting more.
if (hdr > (unsigned) s->ttl) {
nni_msg_free(msg);
- nni_pipe_recv(npipe, &p->aio_recv);
+ nni_pipe_recv(npipe, p->aio_recv);
return;
}
// Store the pipe id followed by the hop count.
if ((rv = nni_msg_header_append_u32(msg, hdr)) != 0) {
nni_msg_free(msg);
- nni_pipe_recv(npipe, &p->aio_recv);
+ nni_pipe_recv(npipe, p->aio_recv);
return;
}
// Send the message up.
- p->aio_putq.a_msg = msg;
- nni_msgq_aio_put(s->urq, &p->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
}
static void
@@ -265,13 +265,13 @@ pair1_sock_getq_cb(void *arg)
nni_msg * msg;
uint32_t id;
- if (nni_aio_result(&s->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
// Socket closing...
return;
}
- msg = s->aio_getq.a_msg;
- s->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
// By definition we are in polyamorous mode.
NNI_ASSERT(s->poly);
@@ -289,7 +289,7 @@ pair1_sock_getq_cb(void *arg)
// Pipe not present!
nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
return;
}
@@ -302,7 +302,7 @@ pair1_sock_getq_cb(void *arg)
}
nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(s->uwq, &s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
@@ -310,13 +310,13 @@ pair1_pipe_putq_cb(void *arg)
{
pair1_pipe *p = arg;
- if (nni_aio_result(&p->aio_putq) != 0) {
- nni_msg_free(p->aio_putq.a_msg);
- p->aio_putq.a_msg = NULL;
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
nni_pipe_stop(p->npipe);
return;
}
- nni_pipe_recv(p->npipe, &p->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static void
@@ -327,13 +327,13 @@ pair1_pipe_getq_cb(void *arg)
nni_msg * msg;
uint32_t hops;
- if (nni_aio_result(&p->aio_getq) != 0) {
+ if (nni_aio_result(p->aio_getq) != 0) {
nni_pipe_stop(p->npipe);
return;
}
- msg = p->aio_getq.a_msg;
- p->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_getq);
+ nni_aio_set_msg(p->aio_getq, NULL);
// Raw mode messages have the header already formed, with
// a hop count. Cooked mode messages have no
@@ -354,13 +354,13 @@ pair1_pipe_getq_cb(void *arg)
goto badmsg;
}
- p->aio_send.a_msg = msg;
- nni_pipe_send(p->npipe, &p->aio_send);
+ nni_aio_set_msg(p->aio_send, msg);
+ nni_pipe_send(p->npipe, p->aio_send);
return;
badmsg:
nni_msg_free(msg);
- nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, &p->aio_getq);
+ nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq);
}
static void
@@ -369,20 +369,16 @@ pair1_pipe_send_cb(void *arg)
pair1_pipe *p = arg;
pair1_sock *s = p->psock;
- if (nni_aio_result(&p->aio_send) != 0) {
- nni_msg_free(p->aio_send.a_msg);
- p->aio_send.a_msg = NULL;
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
nni_pipe_stop(p->npipe);
return;
}
// In polyamorous mode, we want to get from the sendq; in
// monogamous we get from upper writeq.
- if (s->poly) {
- nni_msgq_aio_get(p->sendq, &p->aio_getq);
- } else {
- nni_msgq_aio_get(s->uwq, &p->aio_getq);
- }
+ nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq);
}
static void
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 1d738ec2..0d66aab8 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -15,208 +15,212 @@
// Pull protocol. The PULL protocol is the "read" side of a pipeline.
-typedef struct nni_pull_pipe nni_pull_pipe;
-typedef struct nni_pull_sock nni_pull_sock;
+typedef struct pull_pipe pull_pipe;
+typedef struct pull_sock pull_sock;
-static void nni_pull_putq_cb(void *);
-static void nni_pull_recv_cb(void *);
-static void nni_pull_putq(nni_pull_pipe *, nni_msg *);
+static void pull_putq_cb(void *);
+static void pull_recv_cb(void *);
+static void pull_putq(pull_pipe *, nni_msg *);
-// An nni_pull_sock is our per-socket protocol private structure.
-struct nni_pull_sock {
+// A pull_sock is our per-socket protocol private structure.
+struct pull_sock {
nni_msgq *urq;
int raw;
};
-// An nni_pull_pipe is our per-pipe protocol private structure.
-struct nni_pull_pipe {
- nni_pipe * pipe;
- nni_pull_sock *pull;
- nni_aio putq_aio;
- nni_aio recv_aio;
+// A pull_pipe is our per-pipe protocol private structure.
+struct pull_pipe {
+ nni_pipe * pipe;
+ pull_sock *pull;
+ nni_aio * putq_aio;
+ nni_aio * recv_aio;
};
static int
-nni_pull_sock_init(void **pullp, nni_sock *sock)
+pull_sock_init(void **sp, nni_sock *sock)
{
- nni_pull_sock *pull;
+ pull_sock *s;
- if ((pull = NNI_ALLOC_STRUCT(pull)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- pull->raw = 0;
- pull->urq = nni_sock_recvq(sock);
- *pullp = pull;
+ s->raw = 0;
+ s->urq = nni_sock_recvq(sock);
+ *sp = s;
nni_sock_senderr(sock, NNG_ENOTSUP);
return (0);
}
static void
-nni_pull_sock_fini(void *arg)
+pull_sock_fini(void *arg)
{
- nni_pull_sock *pull = arg;
+ pull_sock *s = arg;
- NNI_FREE_STRUCT(pull);
+ NNI_FREE_STRUCT(s);
+}
+
+static void
+pull_pipe_fini(void *arg)
+{
+ pull_pipe *p = arg;
+
+ nni_aio_fini(p->putq_aio);
+ nni_aio_fini(p->recv_aio);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_pull_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+pull_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_pull_pipe *pp;
+ pull_pipe *p;
+ int rv;
- if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&pp->putq_aio, nni_pull_putq_cb, pp);
- nni_aio_init(&pp->recv_aio, nni_pull_recv_cb, pp);
+ if (((rv = nni_aio_init(&p->putq_aio, pull_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->recv_aio, pull_recv_cb, p)) != 0)) {
+ pull_pipe_fini(p);
+ return (rv);
+ }
- pp->pipe = pipe;
- pp->pull = psock;
- *ppp = pp;
+ p->pipe = pipe;
+ p->pull = s;
+ *pp = p;
return (0);
}
-static void
-nni_pull_pipe_fini(void *arg)
-{
- nni_pull_pipe *pp = arg;
-
- nni_aio_fini(&pp->putq_aio);
- nni_aio_fini(&pp->recv_aio);
- NNI_FREE_STRUCT(pp);
-}
-
static int
-nni_pull_pipe_start(void *arg)
+pull_pipe_start(void *arg)
{
- nni_pull_pipe *pp = arg;
+ pull_pipe *p = arg;
// Start the pending pull...
- nni_pipe_recv(pp->pipe, &pp->recv_aio);
+ nni_pipe_recv(p->pipe, p->recv_aio);
return (0);
}
static void
-nni_pull_pipe_stop(void *arg)
+pull_pipe_stop(void *arg)
{
- nni_pull_pipe *pp = arg;
+ pull_pipe *p = arg;
- nni_aio_stop(&pp->putq_aio);
- nni_aio_stop(&pp->recv_aio);
+ nni_aio_stop(p->putq_aio);
+ nni_aio_stop(p->recv_aio);
}
static void
-nni_pull_recv_cb(void *arg)
+pull_recv_cb(void *arg)
{
- nni_pull_pipe *pp = arg;
- nni_aio * aio = &pp->recv_aio;
- nni_msg * msg;
+ pull_pipe *p = arg;
+ nni_aio * aio = p->recv_aio;
+ nni_msg * msg;
if (nni_aio_result(aio) != 0) {
// Failed to get a message, probably the pipe is closed.
- nni_pipe_stop(pp->pipe);
+ nni_pipe_stop(p->pipe);
return;
}
// Got a message... start the put to send it up to the application.
- msg = aio->a_msg;
- aio->a_msg = NULL;
- nni_pull_putq(pp, msg);
+ msg = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
+ pull_putq(p, msg);
}
static void
-nni_pull_putq_cb(void *arg)
+pull_putq_cb(void *arg)
{
- nni_pull_pipe *pp = arg;
- nni_aio * aio = &pp->putq_aio;
+ pull_pipe *p = arg;
+ nni_aio * aio = p->putq_aio;
if (nni_aio_result(aio) != 0) {
// If we failed to put, probably NNG_ECLOSED, nothing else
// we can do. Just close the pipe.
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
- nni_pipe_stop(pp->pipe);
+ nni_msg_free(nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_pipe_recv(pp->pipe, &pp->recv_aio);
+ nni_pipe_recv(p->pipe, p->recv_aio);
}
// nni_pull_putq schedules a put operation to the user socket (sendup).
static void
-nni_pull_putq(nni_pull_pipe *pp, nni_msg *msg)
+pull_putq(pull_pipe *p, nni_msg *msg)
{
- nni_pull_sock *pull = pp->pull;
+ pull_sock *s = p->pull;
- pp->putq_aio.a_msg = msg;
+ nni_aio_set_msg(p->putq_aio, msg);
- nni_msgq_aio_put(pull->urq, &pp->putq_aio);
+ nni_msgq_aio_put(s->urq, p->putq_aio);
}
static void
-nni_pull_sock_open(void *arg)
+pull_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_pull_sock_close(void *arg)
+pull_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static int
-nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_pull_sock *pull = arg;
- int rv = NNG_ENOTSUP;
+ pull_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_pull_sock *pull = arg;
- int rv = NNG_ENOTSUP;
+ pull_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&pull->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-static nni_proto_pipe_ops nni_pull_pipe_ops = {
- .pipe_init = nni_pull_pipe_init,
- .pipe_fini = nni_pull_pipe_fini,
- .pipe_start = nni_pull_pipe_start,
- .pipe_stop = nni_pull_pipe_stop,
+static nni_proto_pipe_ops pull_pipe_ops = {
+ .pipe_init = pull_pipe_init,
+ .pipe_fini = pull_pipe_fini,
+ .pipe_start = pull_pipe_start,
+ .pipe_stop = pull_pipe_stop,
};
-static nni_proto_sock_ops nni_pull_sock_ops = {
- .sock_init = nni_pull_sock_init,
- .sock_fini = nni_pull_sock_fini,
- .sock_open = nni_pull_sock_open,
- .sock_close = nni_pull_sock_close,
- .sock_setopt = nni_pull_sock_setopt,
- .sock_getopt = nni_pull_sock_getopt,
+static nni_proto_sock_ops pull_sock_ops = {
+ .sock_init = pull_sock_init,
+ .sock_fini = pull_sock_fini,
+ .sock_open = pull_sock_open,
+ .sock_close = pull_sock_close,
+ .sock_setopt = pull_sock_setopt,
+ .sock_getopt = pull_sock_getopt,
};
-nni_proto nni_pull_proto = {
+static nni_proto pull_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_PULL_V0, "pull" },
.proto_peer = { NNG_PROTO_PUSH_V0, "push" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_pipe_ops = &nni_pull_pipe_ops,
- .proto_sock_ops = &nni_pull_sock_ops,
+ .proto_pipe_ops = &pull_pipe_ops,
+ .proto_sock_ops = &pull_sock_ops,
};
int
nng_pull0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_pull_proto));
+ return (nni_proto_open(sidp, &pull_proto));
}
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 10d04091..5e32efee 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -17,228 +17,231 @@
// Push distributes fairly, or tries to, by giving messages in round-robin
// order.
-typedef struct nni_push_pipe nni_push_pipe;
-typedef struct nni_push_sock nni_push_sock;
+typedef struct push_pipe push_pipe;
+typedef struct push_sock push_sock;
-static void nni_push_send_cb(void *);
-static void nni_push_recv_cb(void *);
-static void nni_push_getq_cb(void *);
+static void push_send_cb(void *);
+static void push_recv_cb(void *);
+static void push_getq_cb(void *);
// An nni_push_sock is our per-socket protocol private structure.
-struct nni_push_sock {
+struct push_sock {
nni_msgq *uwq;
int raw;
nni_sock *sock;
};
// An nni_push_pipe is our per-pipe protocol private structure.
-struct nni_push_pipe {
- nni_pipe * pipe;
- nni_push_sock *push;
- nni_list_node node;
-
- nni_aio aio_recv;
- nni_aio aio_send;
- nni_aio aio_getq;
+struct push_pipe {
+ nni_pipe * pipe;
+ push_sock * push;
+ nni_list_node node;
+
+ nni_aio *aio_recv;
+ nni_aio *aio_send;
+ nni_aio *aio_getq;
};
static int
-nni_push_sock_init(void **pushp, nni_sock *sock)
+push_sock_init(void **sp, nni_sock *sock)
{
- nni_push_sock *push;
+ push_sock *s;
- if ((push = NNI_ALLOC_STRUCT(push)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- push->raw = 0;
- push->sock = sock;
- push->uwq = nni_sock_sendq(sock);
- *pushp = push;
+ s->raw = 0;
+ s->sock = sock;
+ s->uwq = nni_sock_sendq(sock);
+ *sp = s;
nni_sock_recverr(sock, NNG_ENOTSUP);
return (0);
}
static void
-nni_push_sock_fini(void *arg)
+push_sock_fini(void *arg)
{
- nni_push_sock *push = arg;
+ push_sock *s = arg;
- NNI_FREE_STRUCT(push);
+ NNI_FREE_STRUCT(s);
}
static void
-nni_push_sock_open(void *arg)
+push_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_push_sock_close(void *arg)
+push_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_push_pipe_fini(void *arg)
+push_pipe_fini(void *arg)
{
- nni_push_pipe *pp = arg;
+ push_pipe *p = arg;
- nni_aio_fini(&pp->aio_recv);
- nni_aio_fini(&pp->aio_send);
- nni_aio_fini(&pp->aio_getq);
- NNI_FREE_STRUCT(pp);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_getq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_push_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+push_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_push_pipe *pp;
+ push_pipe *p;
+ int rv;
- if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&pp->aio_recv, nni_push_recv_cb, pp);
- nni_aio_init(&pp->aio_send, nni_push_send_cb, pp);
- nni_aio_init(&pp->aio_getq, nni_push_getq_cb, pp);
-
- NNI_LIST_NODE_INIT(&pp->node);
- pp->pipe = pipe;
- pp->push = psock;
- *ppp = pp;
+ if (((rv = nni_aio_init(&p->aio_recv, push_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, push_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, push_getq_cb, p)) != 0)) {
+ push_pipe_fini(p);
+ return (rv);
+ }
+ NNI_LIST_NODE_INIT(&p->node);
+ p->pipe = pipe;
+ p->push = s;
+ *pp = p;
return (0);
}
static int
-nni_push_pipe_start(void *arg)
+push_pipe_start(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_push_sock *push = pp->push;
+ push_pipe *p = arg;
+ push_sock *s = p->push;
- if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) {
+ if (nni_pipe_peer(p->pipe) != NNG_PROTO_PULL) {
return (NNG_EPROTO);
}
// Schedule a receiver. This is mostly so that we can detect
// a closed transport pipe.
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
// Schedule a sender.
- nni_msgq_aio_get(push->uwq, &pp->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
return (0);
}
static void
-nni_push_pipe_stop(void *arg)
+push_pipe_stop(void *arg)
{
- nni_push_pipe *pp = arg;
+ push_pipe *p = arg;
- nni_aio_stop(&pp->aio_recv);
- nni_aio_stop(&pp->aio_send);
- nni_aio_stop(&pp->aio_getq);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_getq);
}
static void
-nni_push_recv_cb(void *arg)
+push_recv_cb(void *arg)
{
- nni_push_pipe *pp = arg;
+ push_pipe *p = arg;
// We normally expect to receive an error. If a pipe actually
// sends us data, we just discard it.
- if (nni_aio_result(&pp->aio_recv) != 0) {
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- nni_msg_free(pp->aio_recv.a_msg);
- pp->aio_recv.a_msg = NULL;
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
+ nni_msg_free(nni_aio_get_msg(p->aio_recv));
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-nni_push_send_cb(void *arg)
+push_send_cb(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_push_sock *push = pp->push;
+ push_pipe *p = arg;
+ push_sock *s = p->push;
- if (nni_aio_result(&pp->aio_send) != 0) {
- nni_msg_free(pp->aio_send.a_msg);
- pp->aio_send.a_msg = NULL;
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_msgq_aio_get(push->uwq, &pp->aio_getq);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
}
static void
-nni_push_getq_cb(void *arg)
+push_getq_cb(void *arg)
{
- nni_push_pipe *pp = arg;
- nni_aio * aio = &pp->aio_getq;
+ push_pipe *p = arg;
+ nni_aio * aio = p->aio_getq;
if (nni_aio_result(aio) != 0) {
// If the socket is closing, nothing else we can do.
- nni_pipe_stop(pp->pipe);
+ nni_pipe_stop(p->pipe);
return;
}
- pp->aio_send.a_msg = aio->a_msg;
- aio->a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
- nni_pipe_send(pp->pipe, &pp->aio_send);
+ nni_pipe_send(p->pipe, p->aio_send);
}
static int
-nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+push_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_push_sock *push = arg;
- int rv = NNG_ENOTSUP;
+ push_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&push->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_push_sock *push = arg;
- int rv = NNG_ENOTSUP;
+ push_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&push->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-static nni_proto_pipe_ops nni_push_pipe_ops = {
- .pipe_init = nni_push_pipe_init,
- .pipe_fini = nni_push_pipe_fini,
- .pipe_start = nni_push_pipe_start,
- .pipe_stop = nni_push_pipe_stop,
+static nni_proto_pipe_ops push_pipe_ops = {
+ .pipe_init = push_pipe_init,
+ .pipe_fini = push_pipe_fini,
+ .pipe_start = push_pipe_start,
+ .pipe_stop = push_pipe_stop,
};
-static nni_proto_sock_ops nni_push_sock_ops = {
- .sock_init = nni_push_sock_init,
- .sock_fini = nni_push_sock_fini,
- .sock_open = nni_push_sock_open,
- .sock_close = nni_push_sock_close,
- .sock_setopt = nni_push_sock_setopt,
- .sock_getopt = nni_push_sock_getopt,
+static nni_proto_sock_ops push_sock_ops = {
+ .sock_init = push_sock_init,
+ .sock_fini = push_sock_fini,
+ .sock_open = push_sock_open,
+ .sock_close = push_sock_close,
+ .sock_setopt = push_sock_setopt,
+ .sock_getopt = push_sock_getopt,
};
-nni_proto nni_push_proto = {
+static nni_proto push_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_PUSH_V0, "push" },
.proto_peer = { NNG_PROTO_PULL_V0, "pull" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_pipe_ops = &nni_push_pipe_ops,
- .proto_sock_ops = &nni_push_sock_ops,
+ .proto_pipe_ops = &push_pipe_ops,
+ .proto_sock_ops = &push_sock_ops,
};
int
nng_push0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_push_proto));
+ return (nni_proto_open(sidp, &push_proto));
}
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index bbca1ecd..b7ac361e 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -18,183 +18,188 @@
// perform sender-side filtering. Its best effort delivery, so anything
// that can't receive the message won't get one.
-typedef struct nni_pub_pipe nni_pub_pipe;
-typedef struct nni_pub_sock nni_pub_sock;
-
-static void nni_pub_pipe_recv_cb(void *);
-static void nni_pub_pipe_send_cb(void *);
-static void nni_pub_pipe_getq_cb(void *);
-static void nni_pub_sock_getq_cb(void *);
-static void nni_pub_sock_fini(void *);
-static void nni_pub_pipe_fini(void *);
-
-// An nni_pub_sock is our per-socket protocol private structure.
-struct nni_pub_sock {
+typedef struct pub_pipe pub_pipe;
+typedef struct pub_sock pub_sock;
+
+static void pub_pipe_recv_cb(void *);
+static void pub_pipe_send_cb(void *);
+static void pub_pipe_getq_cb(void *);
+static void pub_sock_getq_cb(void *);
+static void pub_sock_fini(void *);
+static void pub_pipe_fini(void *);
+
+// A pub_sock is our per-socket protocol private structure.
+struct pub_sock {
nni_sock *sock;
nni_msgq *uwq;
int raw;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
};
-// An nni_pub_pipe is our per-pipe protocol private structure.
-struct nni_pub_pipe {
+// A pub_pipe is our per-pipe protocol private structure.
+struct pub_pipe {
nni_pipe * pipe;
- nni_pub_sock *pub;
+ pub_sock * pub;
nni_msgq * sendq;
- nni_aio aio_getq;
- nni_aio aio_send;
- nni_aio aio_recv;
+ nni_aio * aio_getq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
nni_list_node node;
};
+static void
+pub_sock_fini(void *arg)
+{
+ pub_sock *s = arg;
+
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
+}
+
static int
-nni_pub_sock_init(void **pubp, nni_sock *sock)
+pub_sock_init(void **sp, nni_sock *sock)
{
- nni_pub_sock *pub;
+ pub_sock *s;
+ int rv;
- if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&pub->mtx);
- nni_aio_init(&pub->aio_getq, nni_pub_sock_getq_cb, pub);
+ nni_mtx_init(&s->mtx);
+ if ((rv = nni_aio_init(&s->aio_getq, pub_sock_getq_cb, s)) != 0) {
+ pub_sock_fini(s);
+ return (rv);
+ }
- pub->sock = sock;
- pub->raw = 0;
- NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node);
+ s->sock = sock;
+ s->raw = 0;
+ NNI_LIST_INIT(&s->pipes, pub_pipe, node);
- pub->uwq = nni_sock_sendq(sock);
+ s->uwq = nni_sock_sendq(sock);
- *pubp = pub;
+ *sp = s;
nni_sock_recverr(sock, NNG_ENOTSUP);
return (0);
}
static void
-nni_pub_sock_fini(void *arg)
+pub_sock_open(void *arg)
{
- nni_pub_sock *pub = arg;
+ pub_sock *s = arg;
- nni_aio_stop(&pub->aio_getq);
- nni_aio_fini(&pub->aio_getq);
- nni_mtx_fini(&pub->mtx);
- NNI_FREE_STRUCT(pub);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-nni_pub_sock_open(void *arg)
+pub_sock_close(void *arg)
{
- nni_pub_sock *pub = arg;
+ pub_sock *s = arg;
- nni_msgq_aio_get(pub->uwq, &pub->aio_getq);
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-nni_pub_sock_close(void *arg)
+pub_pipe_fini(void *arg)
{
- nni_pub_sock *pub = arg;
-
- nni_aio_cancel(&pub->aio_getq, NNG_ECLOSED);
-}
-
-static void
-nni_pub_pipe_fini(void *arg)
-{
- nni_pub_pipe *pp = arg;
- nni_aio_fini(&pp->aio_getq);
- nni_aio_fini(&pp->aio_send);
- nni_aio_fini(&pp->aio_recv);
- nni_msgq_fini(pp->sendq);
- NNI_FREE_STRUCT(pp);
+ pub_pipe *p = arg;
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_pub_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
+pub_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_pub_pipe *pp;
- int rv;
+ pub_pipe *p;
+ int rv;
- if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
+
// XXX: consider making this depth tunable
- if ((rv = nni_msgq_init(&pp->sendq, 16)) != 0) {
- NNI_FREE_STRUCT(pp);
+ if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, pub_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, pub_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, pub_pipe_recv_cb, p)) != 0)) {
+
+ pub_pipe_fini(p);
return (rv);
}
- nni_aio_init(&pp->aio_getq, nni_pub_pipe_getq_cb, pp);
- nni_aio_init(&pp->aio_send, nni_pub_pipe_send_cb, pp);
- nni_aio_init(&pp->aio_recv, nni_pub_pipe_recv_cb, pp);
-
- pp->pipe = pipe;
- pp->pub = psock;
- *ppp = pp;
+ p->pipe = pipe;
+ p->pub = s;
+ *pp = p;
return (0);
}
static int
-nni_pub_pipe_start(void *arg)
+pub_pipe_start(void *arg)
{
- nni_pub_pipe *pp = arg;
- nni_pub_sock *pub = pp->pub;
+ pub_pipe *p = arg;
+ pub_sock *s = p->pub;
- if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) {
+ if (nni_pipe_peer(p->pipe) != NNG_PROTO_SUB) {
return (NNG_EPROTO);
}
- nni_mtx_lock(&pub->mtx);
- nni_list_append(&pub->pipes, pp);
- nni_mtx_unlock(&pub->mtx);
+ nni_mtx_lock(&s->mtx);
+ nni_list_append(&s->pipes, p);
+ nni_mtx_unlock(&s->mtx);
// Start the receiver and the queue reader.
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
- nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
+ nni_pipe_recv(p->pipe, p->aio_recv);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
return (0);
}
static void
-nni_pub_pipe_stop(void *arg)
+pub_pipe_stop(void *arg)
{
- nni_pub_pipe *pp = arg;
- nni_pub_sock *pub = pp->pub;
+ pub_pipe *p = arg;
+ pub_sock *s = p->pub;
- nni_aio_stop(&pp->aio_getq);
- nni_aio_stop(&pp->aio_send);
- nni_aio_stop(&pp->aio_recv);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
- nni_msgq_close(pp->sendq);
+ nni_msgq_close(p->sendq);
- nni_mtx_lock(&pub->mtx);
- if (nni_list_active(&pub->pipes, pp)) {
- nni_list_remove(&pub->pipes, pp);
+ nni_mtx_lock(&s->mtx);
+ if (nni_list_active(&s->pipes, p)) {
+ nni_list_remove(&s->pipes, p);
}
- nni_mtx_unlock(&pub->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_pub_sock_getq_cb(void *arg)
+pub_sock_getq_cb(void *arg)
{
- nni_pub_sock *pub = arg;
- nni_msgq * uwq = pub->uwq;
- nni_msg * msg, *dup;
+ pub_sock *s = arg;
+ nni_msgq *uwq = s->uwq;
+ nni_msg * msg, *dup;
- nni_pub_pipe *pp;
- nni_pub_pipe *last;
- int rv;
+ pub_pipe *p;
+ pub_pipe *last;
+ int rv;
- if (nni_aio_result(&pub->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
return;
}
- msg = pub->aio_getq.a_msg;
- pub->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
- nni_mtx_lock(&pub->mtx);
- last = nni_list_last(&pub->pipes);
- NNI_LIST_FOREACH (&pub->pipes, pp) {
- if (pp != last) {
+ nni_mtx_lock(&s->mtx);
+ last = nni_list_last(&s->pipes);
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ if (p != last) {
rv = nni_msg_dup(&dup, msg);
if (rv != 0) {
continue;
@@ -202,119 +207,117 @@ nni_pub_sock_getq_cb(void *arg)
} else {
dup = msg;
}
- if ((rv = nni_msgq_tryput(pp->sendq, dup)) != 0) {
+ if ((rv = nni_msgq_tryput(p->sendq, dup)) != 0) {
nni_msg_free(dup);
}
}
- nni_mtx_unlock(&pub->mtx);
+ nni_mtx_unlock(&s->mtx);
if (last == NULL) {
nni_msg_free(msg);
}
- nni_msgq_aio_get(uwq, &pub->aio_getq);
+ nni_msgq_aio_get(uwq, s->aio_getq);
}
static void
-nni_pub_pipe_recv_cb(void *arg)
+pub_pipe_recv_cb(void *arg)
{
- nni_pub_pipe *pp = arg;
+ pub_pipe *p = arg;
- if (nni_aio_result(&pp->aio_recv) != 0) {
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- nni_msg_free(pp->aio_recv.a_msg);
- pp->aio_recv.a_msg = NULL;
- nni_pipe_recv(pp->pipe, &pp->aio_recv);
+ nni_msg_free(nni_aio_get_msg(p->aio_recv));
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-nni_pub_pipe_getq_cb(void *arg)
+pub_pipe_getq_cb(void *arg)
{
- nni_pub_pipe *pp = arg;
+ pub_pipe *p = arg;
- if (nni_aio_result(&pp->aio_getq) != 0) {
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- pp->aio_send.a_msg = pp->aio_getq.a_msg;
- pp->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(pp->pipe, &pp->aio_send);
+ nni_pipe_send(p->pipe, p->aio_send);
}
static void
-nni_pub_pipe_send_cb(void *arg)
+pub_pipe_send_cb(void *arg)
{
- nni_pub_pipe *pp = arg;
+ pub_pipe *p = arg;
- if (nni_aio_result(&pp->aio_send) != 0) {
- nni_msg_free(pp->aio_send.a_msg);
- pp->aio_send.a_msg = NULL;
- nni_pipe_stop(pp->pipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- pp->aio_send.a_msg = NULL;
- nni_msgq_aio_get(pp->sendq, &pp->aio_getq);
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static int
-nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_pub_sock *pub = arg;
- int rv = NNG_ENOTSUP;
+ pub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&pub->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
}
return (rv);
}
static int
-nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_pub_sock *pub = arg;
- int rv = NNG_ENOTSUP;
+ pub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&pub->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops nni_pub_pipe_ops = {
- .pipe_init = nni_pub_pipe_init,
- .pipe_fini = nni_pub_pipe_fini,
- .pipe_start = nni_pub_pipe_start,
- .pipe_stop = nni_pub_pipe_stop,
+static nni_proto_pipe_ops pub_pipe_ops = {
+ .pipe_init = pub_pipe_init,
+ .pipe_fini = pub_pipe_fini,
+ .pipe_start = pub_pipe_start,
+ .pipe_stop = pub_pipe_stop,
};
-nni_proto_sock_ops nni_pub_sock_ops = {
- .sock_init = nni_pub_sock_init,
- .sock_fini = nni_pub_sock_fini,
- .sock_open = nni_pub_sock_open,
- .sock_close = nni_pub_sock_close,
- .sock_setopt = nni_pub_sock_setopt,
- .sock_getopt = nni_pub_sock_getopt,
+static nni_proto_sock_ops pub_sock_ops = {
+ .sock_init = pub_sock_init,
+ .sock_fini = pub_sock_fini,
+ .sock_open = pub_sock_open,
+ .sock_close = pub_sock_close,
+ .sock_setopt = pub_sock_setopt,
+ .sock_getopt = pub_sock_getopt,
};
-nni_proto nni_pub_proto = {
+static nni_proto pub_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_PUB_V0, "pub" },
.proto_peer = { NNG_PROTO_SUB_V0, "sub" },
.proto_flags = NNI_PROTO_FLAG_SND,
- .proto_sock_ops = &nni_pub_sock_ops,
- .proto_pipe_ops = &nni_pub_pipe_ops,
+ .proto_sock_ops = &pub_sock_ops,
+ .proto_pipe_ops = &pub_pipe_ops,
};
int
nng_pub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_pub_proto));
+ return (nni_proto_open(sidp, &pub_proto));
}
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 0563b764..0dbad081 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -17,22 +17,22 @@
// it from publishers, and filters out those it is not interested in,
// only passing up ones that match known subscriptions.
-typedef struct nni_sub_pipe nni_sub_pipe;
-typedef struct nni_sub_sock nni_sub_sock;
-typedef struct nni_sub_topic nni_sub_topic;
+typedef struct sub_pipe sub_pipe;
+typedef struct sub_sock sub_sock;
+typedef struct sub_topic sub_topic;
-static void nni_sub_recv_cb(void *);
-static void nni_sub_putq_cb(void *);
-static void nni_sub_pipe_fini(void *);
+static void sub_recv_cb(void *);
+static void sub_putq_cb(void *);
+static void sub_pipe_fini(void *);
-struct nni_sub_topic {
+struct sub_topic {
nni_list_node node;
size_t len;
void * buf;
};
// An nni_rep_sock is our per-socket protocol private structure.
-struct nni_sub_sock {
+struct sub_sock {
nni_sock *sock;
nni_list topics;
nni_msgq *urq;
@@ -40,132 +40,136 @@ struct nni_sub_sock {
};
// An nni_rep_pipe is our per-pipe protocol private structure.
-struct nni_sub_pipe {
- nni_pipe * pipe;
- nni_sub_sock *sub;
- nni_aio aio_recv;
- nni_aio aio_putq;
+struct sub_pipe {
+ nni_pipe *pipe;
+ sub_sock *sub;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
};
static int
-nni_sub_sock_init(void **subp, nni_sock *sock)
+sub_sock_init(void **sp, nni_sock *sock)
{
- nni_sub_sock *sub;
+ sub_sock *s;
- if ((sub = NNI_ALLOC_STRUCT(sub)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&sub->topics, nni_sub_topic, node);
- sub->sock = sock;
- sub->raw = 0;
+ NNI_LIST_INIT(&s->topics, sub_topic, node);
+ s->sock = sock;
+ s->raw = 0;
- sub->urq = nni_sock_recvq(sock);
+ s->urq = nni_sock_recvq(sock);
nni_sock_senderr(sock, NNG_ENOTSUP);
- *subp = sub;
+ *sp = s;
return (0);
}
static void
-nni_sub_sock_fini(void *arg)
+sub_sock_fini(void *arg)
{
- nni_sub_sock * sub = arg;
- nni_sub_topic *topic;
+ sub_sock * s = arg;
+ sub_topic *topic;
- while ((topic = nni_list_first(&sub->topics)) != NULL) {
- nni_list_remove(&sub->topics, topic);
+ while ((topic = nni_list_first(&s->topics)) != NULL) {
+ nni_list_remove(&s->topics, topic);
nni_free(topic->buf, topic->len);
NNI_FREE_STRUCT(topic);
}
- NNI_FREE_STRUCT(sub);
+ NNI_FREE_STRUCT(s);
}
static void
-nni_sub_sock_open(void *arg)
+sub_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_sub_sock_close(void *arg)
+sub_sock_close(void *arg)
{
NNI_ARG_UNUSED(arg);
}
+static void
+sub_pipe_fini(void *arg)
+{
+ sub_pipe *p = arg;
+
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_recv);
+ NNI_FREE_STRUCT(p);
+}
+
static int
-nni_sub_pipe_init(void **spp, nni_pipe *pipe, void *ssock)
+sub_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_sub_pipe *sp;
+ sub_pipe *p;
+ int rv;
- if ((sp = NNI_ALLOC_STRUCT(sp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_aio_init(&sp->aio_putq, nni_sub_putq_cb, sp);
- nni_aio_init(&sp->aio_recv, nni_sub_recv_cb, sp);
+ if (((rv = nni_aio_init(&p->aio_putq, sub_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, sub_recv_cb, p)) != 0)) {
+ sub_pipe_fini(p);
+ return (rv);
+ }
- sp->pipe = pipe;
- sp->sub = ssock;
- *spp = sp;
+ p->pipe = pipe;
+ p->sub = s;
+ *pp = p;
return (0);
}
-static void
-nni_sub_pipe_fini(void *arg)
-{
- nni_sub_pipe *sp = arg;
-
- nni_aio_fini(&sp->aio_putq);
- nni_aio_fini(&sp->aio_recv);
- NNI_FREE_STRUCT(sp);
-}
-
static int
-nni_sub_pipe_start(void *arg)
+sub_pipe_start(void *arg)
{
- nni_sub_pipe *sp = arg;
+ sub_pipe *p = arg;
- nni_pipe_recv(sp->pipe, &sp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
static void
-nni_sub_pipe_stop(void *arg)
+sub_pipe_stop(void *arg)
{
- nni_sub_pipe *sp = arg;
+ sub_pipe *p = arg;
- nni_aio_stop(&sp->aio_putq);
- nni_aio_stop(&sp->aio_recv);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_recv);
}
static void
-nni_sub_recv_cb(void *arg)
+sub_recv_cb(void *arg)
{
- nni_sub_pipe *sp = arg;
- nni_sub_sock *sub = sp->sub;
- nni_msgq * urq = sub->urq;
+ sub_pipe *p = arg;
+ sub_sock *s = p->sub;
+ nni_msgq *urq = s->urq;
- if (nni_aio_result(&sp->aio_recv) != 0) {
- nni_pipe_stop(sp->pipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- sp->aio_putq.a_msg = sp->aio_recv.a_msg;
- sp->aio_recv.a_msg = NULL;
- nni_msgq_aio_put(sub->urq, &sp->aio_putq);
+ nni_aio_set_msg(p->aio_putq, nni_aio_get_msg(p->aio_recv));
+ nni_aio_set_msg(p->aio_recv, NULL);
+ nni_msgq_aio_put(urq, p->aio_putq);
}
static void
-nni_sub_putq_cb(void *arg)
+sub_putq_cb(void *arg)
{
- nni_sub_pipe *sp = arg;
+ sub_pipe *p = arg;
- if (nni_aio_result(&sp->aio_putq) != 0) {
- nni_msg_free(sp->aio_putq.a_msg);
- sp->aio_putq.a_msg = NULL;
- nni_pipe_stop(sp->pipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_pipe_recv(sp->pipe, &sp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
// For now we maintain subscriptions on a sorted linked list. As we do not
@@ -174,12 +178,12 @@ nni_sub_putq_cb(void *arg)
// to replace this with a patricia trie, like old nanomsg had.
static int
-nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz)
+sub_subscribe(sub_sock *s, const void *buf, size_t sz)
{
- nni_sub_topic *topic;
- nni_sub_topic *newtopic;
+ sub_topic *topic;
+ sub_topic *newtopic;
- NNI_LIST_FOREACH (&sub->topics, topic) {
+ NNI_LIST_FOREACH (&s->topics, topic) {
int rv;
if (topic->len >= sz) {
@@ -210,20 +214,20 @@ nni_sub_subscribe(nni_sub_sock *sub, const void *buf, size_t sz)
newtopic->len = sz;
memcpy(newtopic->buf, buf, sz);
if (topic != NULL) {
- nni_list_insert_before(&sub->topics, newtopic, topic);
+ nni_list_insert_before(&s->topics, newtopic, topic);
} else {
- nni_list_append(&sub->topics, newtopic);
+ nni_list_append(&s->topics, newtopic);
}
return (0);
}
static int
-nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
+sub_unsubscribe(sub_sock *s, const void *buf, size_t sz)
{
- nni_sub_topic *topic;
- int rv;
+ sub_topic *topic;
+ int rv;
- NNI_LIST_FOREACH (&sub->topics, topic) {
+ NNI_LIST_FOREACH (&s->topics, topic) {
if (topic->len >= sz) {
rv = memcmp(topic->buf, buf, sz);
} else {
@@ -231,7 +235,7 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
}
if (rv == 0) {
if (topic->len == sz) {
- nni_list_remove(&sub->topics, topic);
+ nni_list_remove(&s->topics, topic);
nni_free(topic->buf, topic->len);
NNI_FREE_STRUCT(topic);
return (0);
@@ -248,43 +252,43 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
}
static int
-nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_sub_sock *sub = arg;
- int rv = NNG_ENOTSUP;
+ sub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&sub->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
} else if (opt == nng_optid_sub_subscribe) {
- rv = nni_sub_subscribe(sub, buf, sz);
+ rv = sub_subscribe(s, buf, sz);
} else if (opt == nng_optid_sub_unsubscribe) {
- rv = nni_sub_unsubscribe(sub, buf, sz);
+ rv = sub_unsubscribe(s, buf, sz);
}
return (rv);
}
static int
-nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_sub_sock *sub = arg;
- int rv = NNG_ENOTSUP;
+ sub_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&sub->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
static nni_msg *
-nni_sub_sock_rfilter(void *arg, nni_msg *msg)
+sub_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_sub_sock * sub = arg;
- nni_sub_topic *topic;
- char * body;
- size_t len;
- int match;
+ sub_sock * s = arg;
+ sub_topic *topic;
+ char * body;
+ size_t len;
+ int match;
- if (sub->raw) {
+ if (s->raw) {
return (msg);
}
@@ -293,7 +297,7 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
match = 0;
// Check to see if the message matches one of our subscriptions.
- NNI_LIST_FOREACH (&sub->topics, topic) {
+ NNI_LIST_FOREACH (&s->topics, topic) {
if (len >= topic->len) {
int rv = memcmp(topic->buf, body, topic->len);
if (rv == 0) {
@@ -319,34 +323,34 @@ nni_sub_sock_rfilter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops nni_sub_pipe_ops = {
- .pipe_init = nni_sub_pipe_init,
- .pipe_fini = nni_sub_pipe_fini,
- .pipe_start = nni_sub_pipe_start,
- .pipe_stop = nni_sub_pipe_stop,
+static nni_proto_pipe_ops sub_pipe_ops = {
+ .pipe_init = sub_pipe_init,
+ .pipe_fini = sub_pipe_fini,
+ .pipe_start = sub_pipe_start,
+ .pipe_stop = sub_pipe_stop,
};
-static nni_proto_sock_ops nni_sub_sock_ops = {
- .sock_init = nni_sub_sock_init,
- .sock_fini = nni_sub_sock_fini,
- .sock_open = nni_sub_sock_open,
- .sock_close = nni_sub_sock_close,
- .sock_setopt = nni_sub_sock_setopt,
- .sock_getopt = nni_sub_sock_getopt,
- .sock_rfilter = nni_sub_sock_rfilter,
+static nni_proto_sock_ops sub_sock_ops = {
+ .sock_init = sub_sock_init,
+ .sock_fini = sub_sock_fini,
+ .sock_open = sub_sock_open,
+ .sock_close = sub_sock_close,
+ .sock_setopt = sub_sock_setopt,
+ .sock_getopt = sub_sock_getopt,
+ .sock_rfilter = sub_sock_rfilter,
};
-nni_proto nni_sub_proto = {
+static nni_proto sub_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_SUB_V0, "sub" },
.proto_peer = { NNG_PROTO_PUB_V0, "pub" },
.proto_flags = NNI_PROTO_FLAG_RCV,
- .proto_sock_ops = &nni_sub_sock_ops,
- .proto_pipe_ops = &nni_sub_pipe_ops,
+ .proto_sock_ops = &sub_sock_ops,
+ .proto_pipe_ops = &sub_pipe_ops,
};
int
nng_sub0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_sub_proto));
+ return (nni_proto_open(sidp, &sub_proto));
}
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index cd33d019..14a3e46b 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -17,18 +17,18 @@
// request-reply pair. This is useful for building RPC servers, for
// example.
-typedef struct nni_rep_pipe nni_rep_pipe;
-typedef struct nni_rep_sock nni_rep_sock;
-
-static void nni_rep_sock_getq_cb(void *);
-static void nni_rep_pipe_getq_cb(void *);
-static void nni_rep_pipe_putq_cb(void *);
-static void nni_rep_pipe_send_cb(void *);
-static void nni_rep_pipe_recv_cb(void *);
-static void nni_rep_pipe_fini(void *);
-
-// An nni_rep_sock is our per-socket protocol private structure.
-struct nni_rep_sock {
+typedef struct rep_pipe rep_pipe;
+typedef struct rep_sock rep_sock;
+
+static void rep_sock_getq_cb(void *);
+static void rep_pipe_getq_cb(void *);
+static void rep_pipe_putq_cb(void *);
+static void rep_pipe_send_cb(void *);
+static void rep_pipe_recv_cb(void *);
+static void rep_pipe_fini(void *);
+
+// A rep_sock is our per-socket protocol private structure.
+struct rep_sock {
nni_sock * sock;
nni_msgq * uwq;
nni_msgq * urq;
@@ -37,179 +37,176 @@ struct nni_rep_sock {
nni_idhash *pipes;
char * btrace;
size_t btrace_len;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
};
-// An nni_rep_pipe is our per-pipe protocol private structure.
-struct nni_rep_pipe {
- nni_pipe * pipe;
- nni_rep_sock *rep;
- nni_msgq * sendq;
- nni_aio aio_getq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_putq;
+// A rep_pipe is our per-pipe protocol private structure.
+struct rep_pipe {
+ nni_pipe *pipe;
+ rep_sock *rep;
+ nni_msgq *sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
};
static void
-nni_rep_sock_fini(void *arg)
+rep_sock_fini(void *arg)
{
- nni_rep_sock *rep = arg;
+ rep_sock *s = arg;
- nni_aio_stop(&rep->aio_getq);
- nni_aio_fini(&rep->aio_getq);
- nni_idhash_fini(rep->pipes);
- if (rep->btrace != NULL) {
- nni_free(rep->btrace, rep->btrace_len);
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_idhash_fini(s->pipes);
+ if (s->btrace != NULL) {
+ nni_free(s->btrace, s->btrace_len);
}
- NNI_FREE_STRUCT(rep);
+ NNI_FREE_STRUCT(s);
}
static int
-nni_rep_sock_init(void **repp, nni_sock *sock)
+rep_sock_init(void **sp, nni_sock *sock)
{
- nni_rep_sock *rep;
- int rv;
+ rep_sock *s;
+ int rv;
- if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_idhash_init(&rep->pipes)) != 0) {
- NNI_FREE_STRUCT(rep);
+ if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
+ ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) {
+ rep_sock_fini(s);
return (rv);
}
- rep->ttl = 8; // Per RFC
- rep->sock = sock;
- rep->raw = 0;
- rep->btrace = NULL;
- rep->btrace_len = 0;
+ s->ttl = 8; // Per RFC
+ s->sock = sock;
+ s->raw = 0;
+ s->btrace = NULL;
+ s->btrace_len = 0;
+ s->uwq = nni_sock_sendq(sock);
+ s->urq = nni_sock_recvq(sock);
- nni_aio_init(&rep->aio_getq, nni_rep_sock_getq_cb, rep);
-
- rep->uwq = nni_sock_sendq(sock);
- rep->urq = nni_sock_recvq(sock);
-
- *repp = rep;
+ *sp = s;
nni_sock_senderr(sock, NNG_ESTATE);
return (0);
}
static void
-nni_rep_sock_open(void *arg)
+rep_sock_open(void *arg)
{
- nni_rep_sock *rep = arg;
+ rep_sock *s = arg;
- nni_msgq_aio_get(rep->uwq, &rep->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-nni_rep_sock_close(void *arg)
+rep_sock_close(void *arg)
{
- nni_rep_sock *rep = arg;
+ rep_sock *s = arg;
+
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
+}
- nni_aio_cancel(&rep->aio_getq, NNG_ECLOSED);
+static void
+rep_pipe_fini(void *arg)
+{
+ rep_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
+rep_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_rep_pipe *rp;
- int rv;
+ rep_pipe *p;
+ int rv;
- if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) {
- NNI_FREE_STRUCT(rp);
+ if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, rep_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, rep_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, rep_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, rep_pipe_putq_cb, p)) != 0)) {
+ rep_pipe_fini(p);
return (rv);
}
- nni_aio_init(&rp->aio_getq, nni_rep_pipe_getq_cb, rp);
- nni_aio_init(&rp->aio_send, nni_rep_pipe_send_cb, rp);
- nni_aio_init(&rp->aio_recv, nni_rep_pipe_recv_cb, rp);
- nni_aio_init(&rp->aio_putq, nni_rep_pipe_putq_cb, rp);
-
- rp->pipe = pipe;
- rp->rep = rsock;
- *rpp = rp;
- return (0);
-}
-static void
-nni_rep_pipe_fini(void *arg)
-{
- nni_rep_pipe *rp = arg;
-
- nni_aio_fini(&rp->aio_getq);
- nni_aio_fini(&rp->aio_send);
- nni_aio_fini(&rp->aio_recv);
- nni_aio_fini(&rp->aio_putq);
- nni_msgq_fini(rp->sendq);
- NNI_FREE_STRUCT(rp);
+ p->pipe = pipe;
+ p->rep = s;
+ *pp = p;
+ return (0);
}
static int
-nni_rep_pipe_start(void *arg)
+rep_pipe_start(void *arg)
{
- nni_rep_pipe *rp = arg;
- nni_rep_sock *rep = rp->rep;
- int rv;
+ rep_pipe *p = arg;
+ rep_sock *s = p->rep;
+ int rv;
- rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp);
- if (rv != 0) {
+ if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) {
return (rv);
}
- nni_msgq_aio_get(rp->sendq, &rp->aio_getq);
- nni_pipe_recv(rp->pipe, &rp->aio_recv);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
static void
-nni_rep_pipe_stop(void *arg)
+rep_pipe_stop(void *arg)
{
- nni_rep_pipe *rp = arg;
- nni_rep_sock *rep = rp->rep;
+ rep_pipe *p = arg;
+ rep_sock *s = p->rep;
- nni_msgq_close(rp->sendq);
- nni_aio_stop(&rp->aio_getq);
- nni_aio_stop(&rp->aio_send);
- nni_aio_stop(&rp->aio_recv);
- nni_aio_stop(&rp->aio_putq);
+ nni_msgq_close(p->sendq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
- nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
+ nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
}
static void
-nni_rep_sock_getq_cb(void *arg)
+rep_sock_getq_cb(void *arg)
{
- nni_rep_sock *rep = arg;
- nni_msgq * uwq = rep->uwq;
- nni_msg * msg;
- uint32_t id;
- nni_rep_pipe *rp;
- int rv;
+ rep_sock *s = arg;
+ nni_msgq *uwq = s->uwq;
+ nni_msg * msg;
+ uint32_t id;
+ rep_pipe *p;
+ int rv;
// This watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
// destination pipe via a separate queue. This prevents a single bad
// or slow pipe from gumming up the works for the entire socket.
- if (nni_aio_result(&rep->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
// Closed socket?
return;
}
- msg = rep->aio_getq.a_msg;
- rep->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
// We yank the outgoing pipe id from the header
if (nni_msg_header_len(msg) < 4) {
nni_msg_free(msg);
// Look for another message on the upper write queue.
- nni_msgq_aio_get(uwq, &rep->aio_getq);
+ nni_msgq_aio_get(uwq, s->aio_getq);
return;
}
@@ -218,69 +215,68 @@ nni_rep_sock_getq_cb(void *arg)
// Look for the pipe, and attempt to put the message there
// (nonblocking) if we can. If we can't for any reason, then we
// free the message.
- rv = nni_idhash_find(rep->pipes, id, (void **) &rp);
- if (rv == 0) {
- rv = nni_msgq_tryput(rp->sendq, msg);
+ if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) {
+ rv = nni_msgq_tryput(p->sendq, msg);
}
if (rv != 0) {
nni_msg_free(msg);
}
// Now look for another message on the upper write queue.
- nni_msgq_aio_get(uwq, &rep->aio_getq);
+ nni_msgq_aio_get(uwq, s->aio_getq);
}
static void
-nni_rep_pipe_getq_cb(void *arg)
+rep_pipe_getq_cb(void *arg)
{
- nni_rep_pipe *rp = arg;
+ rep_pipe *p = arg;
- if (nni_aio_result(&rp->aio_getq) != 0) {
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- rp->aio_send.a_msg = rp->aio_getq.a_msg;
- rp->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(rp->pipe, &rp->aio_send);
+ nni_pipe_send(p->pipe, p->aio_send);
}
static void
-nni_rep_pipe_send_cb(void *arg)
+rep_pipe_send_cb(void *arg)
{
- nni_rep_pipe *rp = arg;
+ rep_pipe *p = arg;
- if (nni_aio_result(&rp->aio_send) != 0) {
- nni_msg_free(rp->aio_send.a_msg);
- rp->aio_send.a_msg = NULL;
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_msgq_aio_get(rp->sendq, &rp->aio_getq);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static void
-nni_rep_pipe_recv_cb(void *arg)
+rep_pipe_recv_cb(void *arg)
{
- nni_rep_pipe *rp = arg;
- nni_rep_sock *rep = rp->rep;
- nni_msg * msg;
- int rv;
- uint8_t * body;
- int hops;
-
- if (nni_aio_result(&rp->aio_recv) != 0) {
- nni_pipe_stop(rp->pipe);
+ rep_pipe *p = arg;
+ rep_sock *s = p->rep;
+ nni_msg * msg;
+ int rv;
+ uint8_t * body;
+ int hops;
+
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- msg = rp->aio_recv.a_msg;
- rp->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// Store the pipe id in the header, first thing.
- rv = nni_msg_header_append_u32(msg, nni_pipe_id(rp->pipe));
+ rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe));
if (rv != 0) {
// Failure here causes us to drop the message.
goto drop;
@@ -290,7 +286,7 @@ nni_rep_pipe_recv_cb(void *arg)
hops = 1;
for (;;) {
int end = 0;
- if (hops >= rep->ttl) {
+ if (hops >= s->ttl) {
// This isn't malformed, but it has gone through
// too many hops. Do not disconnect, because we
// can legitimately receive messages with too many
@@ -300,7 +296,7 @@ nni_rep_pipe_recv_cb(void *arg)
if (nni_msg_len(msg) < 4) {
// Peer is speaking garbage. Kick it.
nni_msg_free(msg);
- nni_pipe_stop(rp->pipe);
+ nni_pipe_stop(p->pipe);
return;
}
body = nni_msg_body(msg);
@@ -320,74 +316,74 @@ nni_rep_pipe_recv_cb(void *arg)
}
// Go ahead and send it up.
- rp->aio_putq.a_msg = msg;
- nni_msgq_aio_put(rp->rep->urq, &rp->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
return;
drop:
nni_msg_free(msg);
- nni_pipe_recv(rp->pipe, &rp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-nni_rep_pipe_putq_cb(void *arg)
+rep_pipe_putq_cb(void *arg)
{
- nni_rep_pipe *rp = arg;
+ rep_pipe *p = arg;
- if (nni_aio_result(&rp->aio_putq) != 0) {
- nni_msg_free(rp->aio_putq.a_msg);
- rp->aio_putq.a_msg = NULL;
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- nni_pipe_recv(rp->pipe, &rp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static int
-nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_rep_sock *rep = arg;
- int rv = NNG_ENOTSUP;
+ rep_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_maxttl) {
- rv = nni_setopt_int(&rep->ttl, buf, sz, 1, 255);
+ rv = nni_setopt_int(&s->ttl, buf, sz, 1, 255);
} else if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1);
- nni_sock_senderr(rep->sock, rep->raw ? 0 : NNG_ESTATE);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
+ nni_sock_senderr(s->sock, s->raw ? 0 : NNG_ESTATE);
}
return (rv);
}
static int
-nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_rep_sock *rep = arg;
- int rv = NNG_ENOTSUP;
+ rep_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_maxttl) {
- rv = nni_getopt_int(&rep->ttl, buf, szp);
+ rv = nni_getopt_int(&s->ttl, buf, szp);
} else if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&rep->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
static nni_msg *
-nni_rep_sock_sfilter(void *arg, nni_msg *msg)
+rep_sock_sfilter(void *arg, nni_msg *msg)
{
- nni_rep_sock *rep = arg;
+ rep_sock *s = arg;
- if (rep->raw) {
+ if (s->raw) {
return (msg);
}
// Cannot send again until a receive is done...
- nni_sock_senderr(rep->sock, NNG_ESTATE);
+ nni_sock_senderr(s->sock, NNG_ESTATE);
// If we have a stored backtrace, append it to the header...
// if we don't have a backtrace, discard the message.
- if (rep->btrace == NULL) {
+ if (s->btrace == NULL) {
nni_msg_free(msg);
return (NULL);
}
@@ -395,76 +391,76 @@ nni_rep_sock_sfilter(void *arg, nni_msg *msg)
// drop anything else in the header...
nni_msg_header_clear(msg);
- if (nni_msg_header_append(msg, rep->btrace, rep->btrace_len) != 0) {
- nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
- rep->btrace_len = 0;
+ if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) {
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
nni_msg_free(msg);
return (NULL);
}
- nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
- rep->btrace_len = 0;
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
return (msg);
}
static nni_msg *
-nni_rep_sock_rfilter(void *arg, nni_msg *msg)
+rep_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_rep_sock *rep = arg;
- char * header;
- size_t len;
+ rep_sock *s = arg;
+ char * header;
+ size_t len;
- if (rep->raw) {
+ if (s->raw) {
return (msg);
}
- nni_sock_senderr(rep->sock, 0);
+ nni_sock_senderr(s->sock, 0);
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
- if (rep->btrace != NULL) {
- nni_free(rep->btrace, rep->btrace_len);
- rep->btrace = NULL;
- rep->btrace_len = 0;
+ if (s->btrace != NULL) {
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
}
- if ((rep->btrace = nni_alloc(len)) == NULL) {
+ if ((s->btrace = nni_alloc(len)) == NULL) {
nni_msg_free(msg);
return (NULL);
}
- rep->btrace_len = len;
- memcpy(rep->btrace, header, len);
+ s->btrace_len = len;
+ memcpy(s->btrace, header, len);
nni_msg_header_clear(msg);
return (msg);
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops nni_rep_pipe_ops = {
- .pipe_init = nni_rep_pipe_init,
- .pipe_fini = nni_rep_pipe_fini,
- .pipe_start = nni_rep_pipe_start,
- .pipe_stop = nni_rep_pipe_stop,
+static nni_proto_pipe_ops rep_pipe_ops = {
+ .pipe_init = rep_pipe_init,
+ .pipe_fini = rep_pipe_fini,
+ .pipe_start = rep_pipe_start,
+ .pipe_stop = rep_pipe_stop,
};
-static nni_proto_sock_ops nni_rep_sock_ops = {
- .sock_init = nni_rep_sock_init,
- .sock_fini = nni_rep_sock_fini,
- .sock_open = nni_rep_sock_open,
- .sock_close = nni_rep_sock_close,
- .sock_setopt = nni_rep_sock_setopt,
- .sock_getopt = nni_rep_sock_getopt,
- .sock_rfilter = nni_rep_sock_rfilter,
- .sock_sfilter = nni_rep_sock_sfilter,
+static nni_proto_sock_ops rep_sock_ops = {
+ .sock_init = rep_sock_init,
+ .sock_fini = rep_sock_fini,
+ .sock_open = rep_sock_open,
+ .sock_close = rep_sock_close,
+ .sock_setopt = rep_sock_setopt,
+ .sock_getopt = rep_sock_getopt,
+ .sock_rfilter = rep_sock_rfilter,
+ .sock_sfilter = rep_sock_sfilter,
};
-nni_proto nni_rep_proto = {
+static nni_proto nni_rep_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_REP_V0, "rep" },
.proto_peer = { NNG_PROTO_REQ_V0, "req" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_rep_sock_ops,
- .proto_pipe_ops = &nni_rep_pipe_ops,
+ .proto_sock_ops = &rep_sock_ops,
+ .proto_pipe_ops = &rep_pipe_ops,
};
int
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 2579417a..5da5003f 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -18,15 +18,15 @@
// request-reply pair. This is useful for building RPC clients, for
// example.
-typedef struct nni_req_pipe nni_req_pipe;
-typedef struct nni_req_sock nni_req_sock;
+typedef struct req_pipe req_pipe;
+typedef struct req_sock req_sock;
-static void nni_req_resend(nni_req_sock *);
-static void nni_req_timeout(void *);
-static void nni_req_pipe_fini(void *);
+static void req_resend(req_sock *);
+static void req_timeout(void *);
+static void req_pipe_fini(void *);
-// An nni_req_sock is our per-socket protocol private structure.
-struct nni_req_sock {
+// A req_sock is our per-socket protocol private structure.
+struct req_sock {
nni_sock * sock;
nni_msgq * uwq;
nni_msgq * urq;
@@ -38,7 +38,7 @@ struct nni_req_sock {
int ttl;
nni_msg * reqmsg;
- nni_req_pipe *pendpipe;
+ req_pipe *pendpipe;
nni_list readypipes;
nni_list busypipes;
@@ -51,230 +51,235 @@ struct nni_req_sock {
nni_cv cv;
};
-// An nni_req_pipe is our per-pipe protocol private structure.
-struct nni_req_pipe {
+// A req_pipe is our per-pipe protocol private structure.
+struct req_pipe {
nni_pipe * pipe;
- nni_req_sock *req;
+ req_sock * req;
nni_list_node node;
- nni_aio aio_getq; // raw mode only
- nni_aio aio_sendraw; // raw mode only
- nni_aio aio_sendcooked; // cooked mode only
- nni_aio aio_recv;
- nni_aio aio_putq;
+ nni_aio * aio_getq; // raw mode only
+ nni_aio * aio_sendraw; // raw mode only
+ nni_aio * aio_sendcooked; // cooked mode only
+ nni_aio * aio_recv;
+ nni_aio * aio_putq;
nni_mtx mtx;
};
-static void nni_req_resender(void *);
-static void nni_req_getq_cb(void *);
-static void nni_req_sendraw_cb(void *);
-static void nni_req_sendcooked_cb(void *);
-static void nni_req_recv_cb(void *);
-static void nni_req_putq_cb(void *);
+static void req_resender(void *);
+static void req_getq_cb(void *);
+static void req_sendraw_cb(void *);
+static void req_sendcooked_cb(void *);
+static void req_recv_cb(void *);
+static void req_putq_cb(void *);
static int
-nni_req_sock_init(void **reqp, nni_sock *sock)
+req_sock_init(void **sp, nni_sock *sock)
{
- nni_req_sock *req;
+ req_sock *s;
- if ((req = NNI_ALLOC_STRUCT(req)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&req->mtx);
- nni_cv_init(&req->cv, &req->mtx);
+ nni_mtx_init(&s->mtx);
+ nni_cv_init(&s->cv, &s->mtx);
- NNI_LIST_INIT(&req->readypipes, nni_req_pipe, node);
- NNI_LIST_INIT(&req->busypipes, nni_req_pipe, node);
- nni_timer_init(&req->timer, nni_req_timeout, req);
+ NNI_LIST_INIT(&s->readypipes, req_pipe, node);
+ NNI_LIST_INIT(&s->busypipes, req_pipe, node);
+ nni_timer_init(&s->timer, req_timeout, s);
// this is "semi random" start for request IDs.
- req->nextid = nni_random();
-
- req->retry = NNI_SECOND * 60;
- req->sock = sock;
- req->reqmsg = NULL;
- req->raw = 0;
- req->wantw = 0;
- req->resend = NNI_TIME_ZERO;
- req->ttl = 8;
-
- req->uwq = nni_sock_sendq(sock);
- req->urq = nni_sock_recvq(sock);
- *reqp = req;
+ s->nextid = nni_random();
+ s->retry = NNI_SECOND * 60;
+ s->sock = sock;
+ s->reqmsg = NULL;
+ s->raw = 0;
+ s->wantw = 0;
+ s->resend = NNI_TIME_ZERO;
+ s->ttl = 8;
+ s->uwq = nni_sock_sendq(sock);
+ s->urq = nni_sock_recvq(sock);
+ *sp = s;
nni_sock_recverr(sock, NNG_ESTATE);
return (0);
}
static void
-nni_req_sock_open(void *arg)
+req_sock_open(void *arg)
{
NNI_ARG_UNUSED(arg);
}
static void
-nni_req_sock_close(void *arg)
+req_sock_close(void *arg)
{
- nni_req_sock *req = arg;
+ req_sock *s = arg;
- nni_mtx_lock(&req->mtx);
- req->closed = 1;
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_lock(&s->mtx);
+ s->closed = 1;
+ nni_mtx_unlock(&s->mtx);
- nni_timer_cancel(&req->timer);
+ nni_timer_cancel(&s->timer);
}
static void
-nni_req_sock_fini(void *arg)
+req_sock_fini(void *arg)
{
- nni_req_sock *req = arg;
+ req_sock *s = arg;
- nni_mtx_lock(&req->mtx);
- while ((!nni_list_empty(&req->readypipes)) ||
- (!nni_list_empty(&req->busypipes))) {
- nni_cv_wait(&req->cv);
+ nni_mtx_lock(&s->mtx);
+ while ((!nni_list_empty(&s->readypipes)) ||
+ (!nni_list_empty(&s->busypipes))) {
+ nni_cv_wait(&s->cv);
}
- if (req->reqmsg != NULL) {
- nni_msg_free(req->reqmsg);
+ if (s->reqmsg != NULL) {
+ nni_msg_free(s->reqmsg);
}
- nni_mtx_unlock(&req->mtx);
- nni_cv_fini(&req->cv);
- nni_mtx_fini(&req->mtx);
- NNI_FREE_STRUCT(req);
+ nni_mtx_unlock(&s->mtx);
+ nni_cv_fini(&s->cv);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
+}
+
+static void
+req_pipe_fini(void *arg)
+{
+ req_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_sendcooked);
+ nni_aio_fini(p->aio_sendraw);
+ nni_mtx_fini(&p->mtx);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
+req_pipe_init(void **pp, nni_pipe *pipe, void *s)
{
- nni_req_pipe *rp;
+ req_pipe *p;
+ int rv;
- if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- nni_mtx_init(&rp->mtx);
- nni_aio_init(&rp->aio_getq, nni_req_getq_cb, rp);
- nni_aio_init(&rp->aio_putq, nni_req_putq_cb, rp);
- nni_aio_init(&rp->aio_recv, nni_req_recv_cb, rp);
- nni_aio_init(&rp->aio_sendraw, nni_req_sendraw_cb, rp);
- nni_aio_init(&rp->aio_sendcooked, nni_req_sendcooked_cb, rp);
-
- NNI_LIST_NODE_INIT(&rp->node);
- rp->pipe = pipe;
- rp->req = rsock;
- *rpp = rp;
- return (0);
-}
+ nni_mtx_init(&p->mtx);
+ if (((rv = nni_aio_init(&p->aio_getq, req_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, req_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, req_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_sendraw, req_sendraw_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_sendcooked, req_sendcooked_cb, p)) !=
+ 0)) {
+ req_pipe_fini(p);
+ return (rv);
+ }
-static void
-nni_req_pipe_fini(void *arg)
-{
- nni_req_pipe *rp = arg;
-
- nni_aio_fini(&rp->aio_getq);
- nni_aio_fini(&rp->aio_putq);
- nni_aio_fini(&rp->aio_recv);
- nni_aio_fini(&rp->aio_sendcooked);
- nni_aio_fini(&rp->aio_sendraw);
- nni_mtx_fini(&rp->mtx);
- NNI_FREE_STRUCT(rp);
+ NNI_LIST_NODE_INIT(&p->node);
+ p->pipe = pipe;
+ p->req = s;
+ *pp = p;
+ return (0);
}
static int
-nni_req_pipe_start(void *arg)
+req_pipe_start(void *arg)
{
- nni_req_pipe *rp = arg;
- nni_req_sock *req = rp->req;
+ req_pipe *p = arg;
+ req_sock *s = p->req;
- if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REP) {
+ if (nni_pipe_peer(p->pipe) != NNG_PROTO_REP) {
return (NNG_EPROTO);
}
- nni_mtx_lock(&req->mtx);
- if (req->closed) {
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_lock(&s->mtx);
+ if (s->closed) {
+ nni_mtx_unlock(&s->mtx);
return (NNG_ECLOSED);
}
- nni_list_append(&req->readypipes, rp);
- if (req->wantw) {
- nni_req_resend(req);
+ nni_list_append(&s->readypipes, p);
+ // If sock was waiting for somewhere to send data, go ahead and
+ // send it to this pipe.
+ if (s->wantw) {
+ req_resend(s);
}
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(req->uwq, &rp->aio_getq);
- nni_pipe_recv(rp->pipe, &rp->aio_recv);
+ nni_msgq_aio_get(s->uwq, p->aio_getq);
+ nni_pipe_recv(p->pipe, p->aio_recv);
return (0);
}
static void
-nni_req_pipe_stop(void *arg)
+req_pipe_stop(void *arg)
{
- nni_req_pipe *rp = arg;
- nni_req_sock *req = rp->req;
+ req_pipe *p = arg;
+ req_sock *s = p->req;
- nni_aio_stop(&rp->aio_getq);
- nni_aio_stop(&rp->aio_putq);
- nni_aio_stop(&rp->aio_recv);
- nni_aio_stop(&rp->aio_sendcooked);
- nni_aio_stop(&rp->aio_sendraw);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_sendcooked);
+ nni_aio_stop(p->aio_sendraw);
// At this point there should not be any further AIOs running.
// Further, any completion tasks have completed.
- nni_mtx_lock(&req->mtx);
+ nni_mtx_lock(&s->mtx);
// This removes the node from either busypipes or readypipes.
// It doesn't much matter which.
- if (nni_list_node_active(&rp->node)) {
- nni_list_node_remove(&rp->node);
- if (req->closed) {
- nni_cv_wake(&req->cv);
+ if (nni_list_node_active(&p->node)) {
+ nni_list_node_remove(&p->node);
+ if (s->closed) {
+ nni_cv_wake(&s->cv);
}
}
- if ((rp == req->pendpipe) && (req->reqmsg != NULL)) {
+ if ((p == s->pendpipe) && (s->reqmsg != NULL)) {
// removing the pipe we sent the last request on...
// schedule immediate resend.
- req->pendpipe = NULL;
- req->resend = NNI_TIME_ZERO;
- req->wantw = 1;
- nni_req_resend(req);
+ s->pendpipe = NULL;
+ s->resend = NNI_TIME_ZERO;
+ s->wantw = 1;
+ req_resend(s);
}
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static int
-nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_req_sock *req = arg;
- int rv = NNG_ENOTSUP;
+ req_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_req_resendtime) {
- rv = nni_setopt_usec(&req->retry, buf, sz);
+ rv = nni_setopt_usec(&s->retry, buf, sz);
} else if (opt == nng_optid_raw) {
- rv = nni_setopt_int(&req->raw, buf, sz, 0, 1);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
if (rv == 0) {
- nni_sock_recverr(req->sock, req->raw ? 0 : NNG_ESTATE);
+ nni_sock_recverr(s->sock, s->raw ? 0 : NNG_ESTATE);
}
} else if (opt == nng_optid_maxttl) {
- rv = nni_setopt_int(&req->ttl, buf, sz, 1, 255);
+ rv = nni_setopt_int(&s->ttl, buf, sz, 1, 255);
}
return (rv);
}
static int
-nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_req_sock *req = arg;
- int rv = NNG_ENOTSUP;
+ req_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_req_resendtime) {
- rv = nni_getopt_usec(&req->retry, buf, szp);
+ rv = nni_getopt_usec(&s->retry, buf, szp);
} else if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&req->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
} else if (opt == nng_optid_maxttl) {
- rv = nni_getopt_int(&req->ttl, buf, szp);
+ rv = nni_getopt_int(&s->ttl, buf, szp);
}
return (rv);
@@ -297,10 +302,10 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
// kind of priority.)
static void
-nni_req_getq_cb(void *arg)
+req_getq_cb(void *arg)
{
- nni_req_pipe *rp = arg;
- nni_req_sock *req = rp->req;
+ req_pipe *p = arg;
+ req_sock *s = p->req;
// We should be in RAW mode. Cooked mode traffic bypasses
// the upper write queue entirely, and should never end up here.
@@ -308,47 +313,47 @@ nni_req_getq_cb(void *arg)
// that's ok (there's an inherent race anyway). (One minor
// exception: we wind up here in error state when the uwq is closed.)
- if (nni_aio_result(&rp->aio_getq) != 0) {
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- rp->aio_sendraw.a_msg = rp->aio_getq.a_msg;
- rp->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_sendraw, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
// Send the message, but use the raw mode aio.
- nni_pipe_send(rp->pipe, &rp->aio_sendraw);
+ nni_pipe_send(p->pipe, p->aio_sendraw);
}
static void
-nni_req_sendraw_cb(void *arg)
+req_sendraw_cb(void *arg)
{
- nni_req_pipe *rp = arg;
+ req_pipe *p = arg;
- if (nni_aio_result(&rp->aio_sendraw) != 0) {
- nni_msg_free(rp->aio_sendraw.a_msg);
- rp->aio_sendraw.a_msg = NULL;
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_sendraw) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_sendraw));
+ nni_aio_set_msg(p->aio_sendraw, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
// Sent a message so we just need to look for another one.
- nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq);
+ nni_msgq_aio_get(p->req->uwq, p->aio_getq);
}
static void
-nni_req_sendcooked_cb(void *arg)
+req_sendcooked_cb(void *arg)
{
- nni_req_pipe *rp = arg;
- nni_req_sock *req = rp->req;
+ req_pipe *p = arg;
+ req_sock *s = p->req;
- if (nni_aio_result(&rp->aio_sendcooked) != 0) {
+ if (nni_aio_result(p->aio_sendcooked) != 0) {
// We failed to send... clean up and deal with it.
// We leave ourselves on the busy list for now, which
// means no new asynchronous traffic can occur here.
- nni_msg_free(rp->aio_sendcooked.a_msg);
- rp->aio_sendcooked.a_msg = NULL;
- nni_pipe_stop(rp->pipe);
+ nni_msg_free(nni_aio_get_msg(p->aio_sendcooked));
+ nni_aio_set_msg(p->aio_sendcooked, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
@@ -356,49 +361,50 @@ nni_req_sendcooked_cb(void *arg)
// reinsert ourselves in the ready list, and possibly schedule
// a resend.
- nni_mtx_lock(&req->mtx);
- if (nni_list_active(&req->busypipes, rp)) {
- nni_list_remove(&req->busypipes, rp);
- nni_list_append(&req->readypipes, rp);
- nni_req_resend(req);
+ nni_mtx_lock(&s->mtx);
+ if (nni_list_active(&s->busypipes, p)) {
+ nni_list_remove(&s->busypipes, p);
+ nni_list_append(&s->readypipes, p);
+ req_resend(s);
} else {
// We wind up here if stop was called from the reader
// side while we were waiting to be scheduled to run for the
// writer side. In this case we can't complete the operation,
// and we have to abort.
- nni_pipe_stop(rp->pipe);
+ nni_pipe_stop(p->pipe);
}
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_req_putq_cb(void *arg)
+req_putq_cb(void *arg)
{
- nni_req_pipe *rp = arg;
+ req_pipe *p = arg;
- if (nni_aio_result(&rp->aio_putq) != 0) {
- nni_msg_free(rp->aio_putq.a_msg);
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->pipe);
return;
}
- rp->aio_putq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_recv(rp->pipe, &rp->aio_recv);
+ nni_pipe_recv(p->pipe, p->aio_recv);
}
static void
-nni_req_recv_cb(void *arg)
+req_recv_cb(void *arg)
{
- nni_req_pipe *rp = arg;
- nni_msg * msg;
+ req_pipe *p = arg;
+ nni_msg * msg;
- if (nni_aio_result(&rp->aio_recv) != 0) {
- nni_pipe_stop(rp->pipe);
+ if (nni_aio_result(p->aio_recv) != 0) {
+ nni_pipe_stop(p->pipe);
return;
}
- msg = rp->aio_recv.a_msg;
- rp->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// We yank 4 bytes of body, and move them to the header.
if (nni_msg_len(msg) < 4) {
@@ -414,92 +420,90 @@ nni_req_recv_cb(void *arg)
}
(void) nni_msg_trim(msg, 4); // Cannot fail
- rp->aio_putq.a_msg = msg;
- nni_msgq_aio_put(rp->req->urq, &rp->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(p->req->urq, p->aio_putq);
return;
malformed:
nni_msg_free(msg);
- nni_pipe_stop(rp->pipe);
+ nni_pipe_stop(p->pipe);
}
static void
-nni_req_timeout(void *arg)
+req_timeout(void *arg)
{
- nni_req_sock *req = arg;
+ req_sock *s = arg;
- nni_mtx_lock(&req->mtx);
- if (req->reqmsg != NULL) {
- req->wantw = 1;
- nni_req_resend(req);
+ nni_mtx_lock(&s->mtx);
+ if (s->reqmsg != NULL) {
+ s->wantw = 1;
+ req_resend(s);
}
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_req_resend(nni_req_sock *req)
+req_resend(req_sock *s)
{
- nni_req_pipe *rp;
- nni_msg * msg;
+ req_pipe *p;
+ nni_msg * msg;
// Note: This routine should be called with the socket lock held.
// Also, this should only be called while handling cooked mode
// requests.
- if ((msg = req->reqmsg) == NULL) {
+ if ((msg = s->reqmsg) == NULL) {
return;
}
- if (req->closed) {
- req->reqmsg = NULL;
+ if (s->closed) {
+ s->reqmsg = NULL;
nni_msg_free(msg);
}
- if (req->wantw) {
- req->wantw = 0;
+ if (s->wantw) {
+ s->wantw = 0;
- if (nni_msg_dup(&msg, req->reqmsg) != 0) {
+ if (nni_msg_dup(&msg, s->reqmsg) != 0) {
// Failed to alloc message, reschedule it. Also,
// mark that we have a message we want to resend,
// in case something comes available.
- req->wantw = 1;
- nni_timer_schedule(
- &req->timer, nni_clock() + req->retry);
+ s->wantw = 1;
+ nni_timer_schedule(&s->timer, nni_clock() + s->retry);
return;
}
// Now we iterate across all possible outpipes, until
// one accepts it.
- rp = nni_list_first(&req->readypipes);
- if (rp == NULL) {
+ if ((p = nni_list_first(&s->readypipes)) == NULL) {
// No pipes ready to process us. Note that we have
// something to send, and schedule it.
nni_msg_free(msg);
- req->wantw = 1;
+ s->wantw = 1;
return;
}
- nni_list_remove(&req->readypipes, rp);
- nni_list_append(&req->busypipes, rp);
+ nni_list_remove(&s->readypipes, p);
+ nni_list_append(&s->busypipes, p);
- req->pendpipe = rp;
- req->resend = nni_clock() + req->retry;
- rp->aio_sendcooked.a_msg = msg;
+ s->pendpipe = p;
+ s->resend = nni_clock() + s->retry;
+ nni_aio_set_msg(p->aio_sendcooked, msg);
// Note that because we were ready rather than busy, we
// should not have any I/O oustanding and hence the aio
// object will be available for our use.
- nni_pipe_send(rp->pipe, &rp->aio_sendcooked);
- nni_timer_schedule(&req->timer, req->resend);
+ nni_pipe_send(p->pipe, p->aio_sendcooked);
+ nni_timer_schedule(&s->timer, s->resend);
}
}
static nni_msg *
-nni_req_sock_sfilter(void *arg, nni_msg *msg)
+req_sock_sfilter(void *arg, nni_msg *msg)
{
- nni_req_sock *req = arg;
- uint32_t id;
+ req_sock *s = arg;
+ uint32_t id;
- if (req->raw) {
+ if (s->raw) {
// No automatic retry, and the request ID must
// be in the header coming down.
return (msg);
@@ -508,12 +512,12 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
// Generate a new request ID. We always set the high
// order bit so that the peer can locate the end of the
// backtrace. (Pipe IDs have the high order bit clear.)
- id = (req->nextid++) | 0x80000000u;
+ id = (s->nextid++) | 0x80000000u;
// Request ID is in big endian format.
- NNI_PUT32(req->reqid, id);
+ NNI_PUT32(s->reqid, id);
- if (nni_msg_header_append(msg, req->reqid, 4) != 0) {
+ if (nni_msg_header_append(msg, s->reqid, 4) != 0) {
// Should be ENOMEM.
nni_msg_free(msg);
return (NULL);
@@ -521,36 +525,36 @@ nni_req_sock_sfilter(void *arg, nni_msg *msg)
// NB: The socket lock is also held, so this is always self-serialized.
// But we have to serialize against other async callbacks.
- nni_mtx_lock(&req->mtx);
+ nni_mtx_lock(&s->mtx);
// If another message is there, this cancels it.
- if (req->reqmsg != NULL) {
- nni_msg_free(req->reqmsg);
- req->reqmsg = NULL;
+ if (s->reqmsg != NULL) {
+ nni_msg_free(s->reqmsg);
+ s->reqmsg = NULL;
}
// Make a duplicate message... for retries.
- req->reqmsg = msg;
+ s->reqmsg = msg;
// Schedule for immediate send
- req->resend = NNI_TIME_ZERO;
- req->wantw = 1;
+ s->resend = NNI_TIME_ZERO;
+ s->wantw = 1;
- nni_req_resend(req);
- nni_mtx_unlock(&req->mtx);
+ req_resend(s);
+ nni_mtx_unlock(&s->mtx);
// Clear the error condition.
- nni_sock_recverr(req->sock, 0);
+ nni_sock_recverr(s->sock, 0);
return (NULL);
}
static nni_msg *
-nni_req_sock_rfilter(void *arg, nni_msg *msg)
+req_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_req_sock *req = arg;
- nni_msg * rmsg;
+ req_sock *s = arg;
+ nni_msg * rmsg;
- if (req->raw) {
+ if (s->raw) {
// Pass it unmolested
return (msg);
}
@@ -560,62 +564,60 @@ nni_req_sock_rfilter(void *arg, nni_msg *msg)
return (NULL);
}
- nni_mtx_lock(&req->mtx);
+ nni_mtx_lock(&s->mtx);
- if ((rmsg = req->reqmsg) == NULL) {
+ if ((rmsg = s->reqmsg) == NULL) {
// We had no outstanding request.
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
}
- if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) {
+ if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) {
// Wrong request id
- nni_mtx_unlock(&req->mtx);
+ nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
}
- req->reqmsg = NULL;
- req->pendpipe = NULL;
- nni_mtx_unlock(&req->mtx);
+ s->reqmsg = NULL;
+ s->pendpipe = NULL;
+ nni_mtx_unlock(&s->mtx);
- nni_sock_recverr(req->sock, NNG_ESTATE);
+ nni_sock_recverr(s->sock, NNG_ESTATE);
nni_msg_free(rmsg);
return (msg);
}
-// This is the global protocol structure -- our linkage to the core.
-// This should be the only global non-static symbol in this file.
-static nni_proto_pipe_ops nni_req_pipe_ops = {
- .pipe_init = nni_req_pipe_init,
- .pipe_fini = nni_req_pipe_fini,
- .pipe_start = nni_req_pipe_start,
- .pipe_stop = nni_req_pipe_stop,
+static nni_proto_pipe_ops req_pipe_ops = {
+ .pipe_init = req_pipe_init,
+ .pipe_fini = req_pipe_fini,
+ .pipe_start = req_pipe_start,
+ .pipe_stop = req_pipe_stop,
};
-static nni_proto_sock_ops nni_req_sock_ops = {
- .sock_init = nni_req_sock_init,
- .sock_fini = nni_req_sock_fini,
- .sock_open = nni_req_sock_open,
- .sock_close = nni_req_sock_close,
- .sock_setopt = nni_req_sock_setopt,
- .sock_getopt = nni_req_sock_getopt,
- .sock_rfilter = nni_req_sock_rfilter,
- .sock_sfilter = nni_req_sock_sfilter,
+static nni_proto_sock_ops req_sock_ops = {
+ .sock_init = req_sock_init,
+ .sock_fini = req_sock_fini,
+ .sock_open = req_sock_open,
+ .sock_close = req_sock_close,
+ .sock_setopt = req_sock_setopt,
+ .sock_getopt = req_sock_getopt,
+ .sock_rfilter = req_sock_rfilter,
+ .sock_sfilter = req_sock_sfilter,
};
-nni_proto nni_req_proto = {
+static nni_proto req_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_REQ_V0, "req" },
.proto_peer = { NNG_PROTO_REP_V0, "rep" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_req_sock_ops,
- .proto_pipe_ops = &nni_req_pipe_ops,
+ .proto_sock_ops = &req_sock_ops,
+ .proto_pipe_ops = &req_pipe_ops,
};
int
nng_req0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_req_proto));
+ return (nni_proto_open(sidp, &req_proto));
}
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index e695a987..c3a9dd81 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -17,18 +17,18 @@
// the surveyor pattern. This is useful for building service discovery, or
// voting algorithsm, for example.
-typedef struct nni_resp_pipe nni_resp_pipe;
-typedef struct nni_resp_sock nni_resp_sock;
-
-static void nni_resp_recv_cb(void *);
-static void nni_resp_putq_cb(void *);
-static void nni_resp_getq_cb(void *);
-static void nni_resp_send_cb(void *);
-static void nni_resp_sock_getq_cb(void *);
-static void nni_resp_pipe_fini(void *);
-
-// An nni_resp_sock is our per-socket protocol private structure.
-struct nni_resp_sock {
+typedef struct resp_pipe resp_pipe;
+typedef struct resp_sock resp_sock;
+
+static void resp_recv_cb(void *);
+static void resp_putq_cb(void *);
+static void resp_getq_cb(void *);
+static void resp_send_cb(void *);
+static void resp_sock_getq_cb(void *);
+static void resp_pipe_fini(void *);
+
+// A resp_sock is our per-socket protocol private structure.
+struct resp_sock {
nni_sock * nsock;
nni_msgq * urq;
nni_msgq * uwq;
@@ -37,259 +37,257 @@ struct nni_resp_sock {
nni_idhash *pipes;
char * btrace;
size_t btrace_len;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
nni_mtx mtx;
};
-// An nni_resp_pipe is our per-pipe protocol private structure.
-struct nni_resp_pipe {
- nni_pipe * npipe;
- nni_resp_sock *psock;
- uint32_t id;
- nni_msgq * sendq;
- nni_aio aio_getq;
- nni_aio aio_putq;
- nni_aio aio_send;
- nni_aio aio_recv;
+// A resp_pipe is our per-pipe protocol private structure.
+struct resp_pipe {
+ nni_pipe * npipe;
+ resp_sock *psock;
+ uint32_t id;
+ nni_msgq * sendq;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
};
static void
-nni_resp_sock_fini(void *arg)
+resp_sock_fini(void *arg)
{
- nni_resp_sock *psock = arg;
+ resp_sock *s = arg;
- nni_aio_stop(&psock->aio_getq);
- nni_aio_fini(&psock->aio_getq);
- nni_idhash_fini(psock->pipes);
- if (psock->btrace != NULL) {
- nni_free(psock->btrace, psock->btrace_len);
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_idhash_fini(s->pipes);
+ if (s->btrace != NULL) {
+ nni_free(s->btrace, s->btrace_len);
}
- nni_mtx_fini(&psock->mtx);
- NNI_FREE_STRUCT(psock);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
}
static int
-nni_resp_sock_init(void **pp, nni_sock *nsock)
+resp_sock_init(void **sp, nni_sock *nsock)
{
- nni_resp_sock *psock;
- int rv;
+ resp_sock *s;
+ int rv;
- if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_idhash_init(&psock->pipes)) != 0) {
- NNI_FREE_STRUCT(psock);
+ if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
+ ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) {
+ resp_sock_fini(s);
return (rv);
}
- psock->ttl = 8; // Per RFC
- psock->nsock = nsock;
- psock->raw = 0;
- psock->btrace = NULL;
- psock->btrace_len = 0;
- psock->urq = nni_sock_recvq(nsock);
- psock->uwq = nni_sock_sendq(nsock);
+ s->ttl = 8; // Per RFC
+ s->nsock = nsock;
+ s->raw = 0;
+ s->btrace = NULL;
+ s->btrace_len = 0;
+ s->urq = nni_sock_recvq(nsock);
+ s->uwq = nni_sock_sendq(nsock);
- nni_mtx_init(&psock->mtx);
- nni_aio_init(&psock->aio_getq, nni_resp_sock_getq_cb, psock);
+ nni_mtx_init(&s->mtx);
- *pp = psock;
+ *sp = s;
nni_sock_senderr(nsock, NNG_ESTATE);
return (0);
}
static void
-nni_resp_sock_open(void *arg)
+resp_sock_open(void *arg)
{
- nni_resp_sock *psock = arg;
+ resp_sock *s = arg;
- nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-nni_resp_sock_close(void *arg)
+resp_sock_close(void *arg)
{
- nni_resp_sock *psock = arg;
+ resp_sock *s = arg;
- nni_aio_cancel(&psock->aio_getq, NNG_ECLOSED);
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
+}
+
+static void
+resp_pipe_fini(void *arg)
+{
+ resp_pipe *p = arg;
+
+ nni_aio_fini(p->aio_putq);
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_resp_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+resp_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- nni_resp_pipe *ppipe;
- int rv;
+ resp_pipe *p;
+ int rv;
- if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_msgq_init(&ppipe->sendq, 2)) != 0) {
- NNI_FREE_STRUCT(ppipe);
+ if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, resp_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, resp_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, resp_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, resp_send_cb, p)) != 0)) {
+ resp_pipe_fini(p);
return (rv);
}
- nni_aio_init(&ppipe->aio_putq, nni_resp_putq_cb, ppipe);
- nni_aio_init(&ppipe->aio_recv, nni_resp_recv_cb, ppipe);
- nni_aio_init(&ppipe->aio_getq, nni_resp_getq_cb, ppipe);
- nni_aio_init(&ppipe->aio_send, nni_resp_send_cb, ppipe);
-
- ppipe->npipe = npipe;
- ppipe->psock = psock;
- *pp = ppipe;
- return (0);
-}
-static void
-nni_resp_pipe_fini(void *arg)
-{
- nni_resp_pipe *ppipe = arg;
-
- nni_aio_fini(&ppipe->aio_putq);
- nni_aio_fini(&ppipe->aio_getq);
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_msgq_fini(ppipe->sendq);
- NNI_FREE_STRUCT(ppipe);
+ p->npipe = npipe;
+ p->psock = s;
+ *pp = p;
+ return (0);
}
static int
-nni_resp_pipe_start(void *arg)
+resp_pipe_start(void *arg)
{
- nni_resp_pipe *ppipe = arg;
- nni_resp_sock *psock = ppipe->psock;
- int rv;
+ resp_pipe *p = arg;
+ resp_sock *s = p->psock;
+ int rv;
- ppipe->id = nni_pipe_id(ppipe->npipe);
+ p->id = nni_pipe_id(p->npipe);
- nni_mtx_lock(&psock->mtx);
- rv = nni_idhash_insert(psock->pipes, ppipe->id, ppipe);
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_lock(&s->mtx);
+ rv = nni_idhash_insert(s->pipes, p->id, p);
+ nni_mtx_unlock(&s->mtx);
if (rv != 0) {
return (rv);
}
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
- nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
+ nni_pipe_recv(p->npipe, p->aio_recv);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
return (rv);
}
static void
-nni_resp_pipe_stop(void *arg)
+resp_pipe_stop(void *arg)
{
- nni_resp_pipe *ppipe = arg;
- nni_resp_sock *psock = ppipe->psock;
-
- nni_msgq_close(ppipe->sendq);
- nni_aio_stop(&ppipe->aio_putq);
- nni_aio_stop(&ppipe->aio_getq);
- nni_aio_stop(&ppipe->aio_send);
- nni_aio_stop(&ppipe->aio_recv);
-
- if (ppipe->id != 0) {
- nni_mtx_lock(&psock->mtx);
- nni_idhash_remove(psock->pipes, ppipe->id);
- nni_mtx_unlock(&psock->mtx);
- ppipe->id = 0;
+ resp_pipe *p = arg;
+ resp_sock *s = p->psock;
+
+ nni_msgq_close(p->sendq);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+
+ if (p->id != 0) {
+ nni_mtx_lock(&s->mtx);
+ nni_idhash_remove(s->pipes, p->id);
+ nni_mtx_unlock(&s->mtx);
+ p->id = 0;
}
}
-// nni_resp_sock_send watches for messages from the upper write queue,
+// resp_sock_send watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
// destination pipe via a separate queue. This prevents a single bad
// or slow pipe from gumming up the works for the entire socket.s
void
-nni_resp_sock_getq_cb(void *arg)
+resp_sock_getq_cb(void *arg)
{
- nni_resp_sock *psock = arg;
- nni_msg * msg;
- uint32_t id;
- nni_resp_pipe *ppipe;
- int rv;
+ resp_sock *s = arg;
+ nni_msg * msg;
+ uint32_t id;
+ resp_pipe *p;
+ int rv;
- if (nni_aio_result(&psock->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
return;
}
- msg = psock->aio_getq.a_msg;
- psock->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
// We yank the outgoing pipe id from the header
if (nni_msg_header_len(msg) < 4) {
nni_msg_free(msg);
- // We can't really close down the socket, so just
- // keep going.
- nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
+ // We can't really close down the socket, so just keep going.
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
return;
}
id = nni_msg_header_trim_u32(msg);
- nni_mtx_lock(&psock->mtx);
- rv = nni_idhash_find(psock->pipes, id, (void **) &ppipe);
-
- if (rv != 0) {
+ nni_mtx_lock(&s->mtx);
+ if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) != 0) {
+ // Destination pipe not present.
nni_msg_free(msg);
- nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
} else {
// Non-blocking put.
- if (nni_msgq_tryput(ppipe->sendq, msg) != 0) {
+ if (nni_msgq_tryput(p->sendq, msg) != 0) {
nni_msg_free(msg);
}
}
- nni_mtx_unlock(&psock->mtx);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
+ nni_mtx_unlock(&s->mtx);
}
void
-nni_resp_getq_cb(void *arg)
+resp_getq_cb(void *arg)
{
- nni_resp_pipe *ppipe = arg;
+ resp_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_getq) != 0) {
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->npipe);
return;
}
- ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg;
- ppipe->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
+ nni_pipe_send(p->npipe, p->aio_send);
}
void
-nni_resp_send_cb(void *arg)
+resp_send_cb(void *arg)
{
- nni_resp_pipe *ppipe = arg;
+ resp_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_send) != 0) {
- nni_msg_free(ppipe->aio_send.a_msg);
- ppipe->aio_send.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
}
static void
-nni_resp_recv_cb(void *arg)
+resp_recv_cb(void *arg)
{
- nni_resp_pipe *ppipe = arg;
- nni_resp_sock *psock = ppipe->psock;
- nni_msgq * urq;
- nni_msg * msg;
- int hops;
- int rv;
-
- if (nni_aio_result(&ppipe->aio_recv) != 0) {
+ resp_pipe *p = arg;
+ resp_sock *s = p->psock;
+ nni_msgq * urq;
+ nni_msg * msg;
+ int hops;
+ int rv;
+
+ if (nni_aio_result(p->aio_recv) != 0) {
goto error;
}
- urq = nni_sock_recvq(psock->nsock);
+ urq = nni_sock_recvq(s->nsock);
- msg = ppipe->aio_recv.a_msg;
- ppipe->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// Store the pipe id in the header, first thing.
- if (nni_msg_header_append_u32(msg, ppipe->id) != 0) {
+ if (nni_msg_header_append_u32(msg, p->id) != 0) {
nni_msg_free(msg);
goto error;
}
@@ -300,7 +298,7 @@ nni_resp_recv_cb(void *arg)
int end = 0;
uint8_t *body;
- if (hops >= psock->ttl) {
+ if (hops >= s->ttl) {
nni_msg_free(msg);
goto error;
}
@@ -322,46 +320,46 @@ nni_resp_recv_cb(void *arg)
}
// Now send it up.
- ppipe->aio_putq.a_msg = msg;
- nni_msgq_aio_put(urq, &ppipe->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(urq, p->aio_putq);
return;
error:
- nni_pipe_stop(ppipe->npipe);
+ nni_pipe_stop(p->npipe);
}
static void
-nni_resp_putq_cb(void *arg)
+resp_putq_cb(void *arg)
{
- nni_resp_pipe *ppipe = arg;
+ resp_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_putq) != 0) {
- nni_msg_free(ppipe->aio_putq.a_msg);
- ppipe->aio_putq.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->npipe);
}
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static int
-nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_resp_sock *psock = arg;
- int rv = NNG_ENOTSUP;
- int oldraw;
+ resp_sock *s = arg;
+ int rv = NNG_ENOTSUP;
+ int oldraw;
if (opt == nng_optid_maxttl) {
- rv = nni_setopt_int(&psock->ttl, buf, sz, 1, 255);
+ rv = nni_setopt_int(&s->ttl, buf, sz, 1, 255);
} else if (opt == nng_optid_raw) {
- oldraw = psock->raw;
- rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
- if (oldraw != psock->raw) {
- if (!psock->raw) {
- nni_sock_senderr(psock->nsock, 0);
+ oldraw = s->raw;
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
+ if (oldraw != s->raw) {
+ if (!s->raw) {
+ nni_sock_senderr(s->nsock, 0);
} else {
- nni_sock_senderr(psock->nsock, NNG_ESTATE);
+ nni_sock_senderr(s->nsock, NNG_ESTATE);
}
}
}
@@ -370,34 +368,34 @@ nni_resp_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
}
static int
-nni_resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+resp_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_resp_sock *psock = arg;
- int rv = NNG_ENOTSUP;
+ resp_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_maxttl) {
- rv = nni_getopt_int(&psock->ttl, buf, szp);
+ rv = nni_getopt_int(&s->ttl, buf, szp);
} else if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&psock->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
static nni_msg *
-nni_resp_sock_sfilter(void *arg, nni_msg *msg)
+resp_sock_sfilter(void *arg, nni_msg *msg)
{
- nni_resp_sock *psock = arg;
+ resp_sock *s = arg;
- if (psock->raw) {
+ if (s->raw) {
return (msg);
}
// Cannot send again until a receive is done...
- nni_sock_senderr(psock->nsock, NNG_ESTATE);
+ nni_sock_senderr(s->nsock, NNG_ESTATE);
// If we have a stored backtrace, append it to the header...
// if we don't have a backtrace, discard the message.
- if (psock->btrace == NULL) {
+ if (s->btrace == NULL) {
nni_msg_free(msg);
return (NULL);
}
@@ -405,79 +403,78 @@ nni_resp_sock_sfilter(void *arg, nni_msg *msg)
// drop anything else in the header...
nni_msg_header_clear(msg);
- if (nni_msg_header_append(msg, psock->btrace, psock->btrace_len) !=
- 0) {
- nni_free(psock->btrace, psock->btrace_len);
- psock->btrace = NULL;
- psock->btrace_len = 0;
+ if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) {
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
nni_msg_free(msg);
return (NULL);
}
- nni_free(psock->btrace, psock->btrace_len);
- psock->btrace = NULL;
- psock->btrace_len = 0;
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
return (msg);
}
static nni_msg *
-nni_resp_sock_rfilter(void *arg, nni_msg *msg)
+resp_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_resp_sock *psock = arg;
- char * header;
- size_t len;
+ resp_sock *s = arg;
+ char * header;
+ size_t len;
- if (psock->raw) {
+ if (s->raw) {
return (msg);
}
- nni_sock_senderr(psock->nsock, 0);
+ nni_sock_senderr(s->nsock, 0);
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
- if (psock->btrace != NULL) {
- nni_free(psock->btrace, psock->btrace_len);
- psock->btrace = NULL;
- psock->btrace_len = 0;
+ if (s->btrace != NULL) {
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
}
- if ((psock->btrace = nni_alloc(len)) == NULL) {
+ if ((s->btrace = nni_alloc(len)) == NULL) {
nni_msg_free(msg);
return (NULL);
}
- psock->btrace_len = len;
- memcpy(psock->btrace, header, len);
+ s->btrace_len = len;
+ memcpy(s->btrace, header, len);
nni_msg_header_clear(msg);
return (msg);
}
-static nni_proto_pipe_ops nni_resp_pipe_ops = {
- .pipe_init = nni_resp_pipe_init,
- .pipe_fini = nni_resp_pipe_fini,
- .pipe_start = nni_resp_pipe_start,
- .pipe_stop = nni_resp_pipe_stop,
+static nni_proto_pipe_ops resp_pipe_ops = {
+ .pipe_init = resp_pipe_init,
+ .pipe_fini = resp_pipe_fini,
+ .pipe_start = resp_pipe_start,
+ .pipe_stop = resp_pipe_stop,
};
-static nni_proto_sock_ops nni_resp_sock_ops = {
- .sock_init = nni_resp_sock_init,
- .sock_fini = nni_resp_sock_fini,
- .sock_open = nni_resp_sock_open,
- .sock_close = nni_resp_sock_close,
- .sock_setopt = nni_resp_sock_setopt,
- .sock_getopt = nni_resp_sock_getopt,
- .sock_rfilter = nni_resp_sock_rfilter,
- .sock_sfilter = nni_resp_sock_sfilter,
+static nni_proto_sock_ops resp_sock_ops = {
+ .sock_init = resp_sock_init,
+ .sock_fini = resp_sock_fini,
+ .sock_open = resp_sock_open,
+ .sock_close = resp_sock_close,
+ .sock_setopt = resp_sock_setopt,
+ .sock_getopt = resp_sock_getopt,
+ .sock_rfilter = resp_sock_rfilter,
+ .sock_sfilter = resp_sock_sfilter,
};
-nni_proto nni_respondent_proto = {
+static nni_proto resp_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_RESPONDENT_V0, "respondent" },
.proto_peer = { NNG_PROTO_SURVEYOR_V0, "surveyor" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_resp_sock_ops,
- .proto_pipe_ops = &nni_resp_pipe_ops,
+ .proto_sock_ops = &resp_sock_ops,
+ .proto_pipe_ops = &resp_pipe_ops,
};
int
nng_respondent0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_respondent_proto));
+ return (nni_proto_open(sidp, &resp_proto));
}
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index d7341025..09fe0768 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -16,18 +16,18 @@
// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
// survey pattern. This is useful for building service discovery, voting, etc.
-typedef struct nni_surv_pipe nni_surv_pipe;
-typedef struct nni_surv_sock nni_surv_sock;
-
-static void nni_surv_sock_getq_cb(void *);
-static void nni_surv_getq_cb(void *);
-static void nni_surv_putq_cb(void *);
-static void nni_surv_send_cb(void *);
-static void nni_surv_recv_cb(void *);
-static void nni_surv_timeout(void *);
-
-// An nni_surv_sock is our per-socket protocol private structure.
-struct nni_surv_sock {
+typedef struct surv_pipe surv_pipe;
+typedef struct surv_sock surv_sock;
+
+static void surv_sock_getq_cb(void *);
+static void surv_getq_cb(void *);
+static void surv_putq_cb(void *);
+static void surv_send_cb(void *);
+static void surv_recv_cb(void *);
+static void surv_timeout(void *);
+
+// A surv_sock is our per-socket protocol private structure.
+struct surv_sock {
nni_sock * nsock;
nni_duration survtime;
nni_time expire;
@@ -36,211 +36,214 @@ struct nni_surv_sock {
uint32_t nextid; // next id
uint32_t survid; // outstanding request ID (big endian)
nni_list pipes;
- nni_aio aio_getq;
+ nni_aio * aio_getq;
nni_timer_node timer;
nni_msgq * uwq;
nni_msgq * urq;
nni_mtx mtx;
};
-// An nni_surv_pipe is our per-pipe protocol private structure.
-struct nni_surv_pipe {
- nni_pipe * npipe;
- nni_surv_sock *psock;
- nni_msgq * sendq;
- nni_list_node node;
- nni_aio aio_getq;
- nni_aio aio_putq;
- nni_aio aio_send;
- nni_aio aio_recv;
+// A surv_pipe is our per-pipe protocol private structure.
+struct surv_pipe {
+ nni_pipe * npipe;
+ surv_sock * psock;
+ nni_msgq * sendq;
+ nni_list_node node;
+ nni_aio * aio_getq;
+ nni_aio * aio_putq;
+ nni_aio * aio_send;
+ nni_aio * aio_recv;
};
static void
-nni_surv_sock_fini(void *arg)
+surv_sock_fini(void *arg)
{
- nni_surv_sock *psock = arg;
+ surv_sock *s = arg;
- nni_aio_stop(&psock->aio_getq);
- nni_aio_fini(&psock->aio_getq);
- nni_mtx_fini(&psock->mtx);
- NNI_FREE_STRUCT(psock);
+ nni_aio_stop(s->aio_getq);
+ nni_aio_fini(s->aio_getq);
+ nni_mtx_fini(&s->mtx);
+ NNI_FREE_STRUCT(s);
}
static int
-nni_surv_sock_init(void **sp, nni_sock *nsock)
+surv_sock_init(void **sp, nni_sock *nsock)
{
- nni_surv_sock *psock;
+ surv_sock *s;
+ int rv;
- if ((psock = NNI_ALLOC_STRUCT(psock)) == NULL) {
+ if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- NNI_LIST_INIT(&psock->pipes, nni_surv_pipe, node);
- nni_mtx_init(&psock->mtx);
- nni_aio_init(&psock->aio_getq, nni_surv_sock_getq_cb, psock);
- nni_timer_init(&psock->timer, nni_surv_timeout, psock);
-
- psock->nextid = nni_random();
- psock->nsock = nsock;
- psock->raw = 0;
- psock->survtime = NNI_SECOND * 60;
- psock->expire = NNI_TIME_ZERO;
- psock->uwq = nni_sock_sendq(nsock);
- psock->urq = nni_sock_recvq(nsock);
-
- *sp = psock;
+ if ((rv = nni_aio_init(&s->aio_getq, surv_sock_getq_cb, s)) != 0) {
+ surv_sock_fini(s);
+ return (rv);
+ }
+ NNI_LIST_INIT(&s->pipes, surv_pipe, node);
+ nni_mtx_init(&s->mtx);
+ nni_timer_init(&s->timer, surv_timeout, s);
+
+ s->nextid = nni_random();
+ s->nsock = nsock;
+ s->raw = 0;
+ s->survtime = NNI_SECOND * 60;
+ s->expire = NNI_TIME_ZERO;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+
+ *sp = s;
nni_sock_recverr(nsock, NNG_ESTATE);
return (0);
}
static void
-nni_surv_sock_open(void *arg)
+surv_sock_open(void *arg)
{
- nni_surv_sock *psock = arg;
+ surv_sock *s = arg;
- nni_msgq_aio_get(psock->uwq, &psock->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
-nni_surv_sock_close(void *arg)
+surv_sock_close(void *arg)
{
- nni_surv_sock *psock = arg;
+ surv_sock *s = arg;
- nni_timer_cancel(&psock->timer);
- nni_aio_cancel(&psock->aio_getq, NNG_ECLOSED);
+ nni_timer_cancel(&s->timer);
+ nni_aio_cancel(s->aio_getq, NNG_ECLOSED);
}
static void
-nni_surv_pipe_fini(void *arg)
+surv_pipe_fini(void *arg)
{
- nni_surv_pipe *ppipe = arg;
-
- nni_aio_fini(&ppipe->aio_getq);
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_aio_fini(&ppipe->aio_putq);
- nni_msgq_fini(ppipe->sendq);
- NNI_FREE_STRUCT(ppipe);
+ surv_pipe *p = arg;
+
+ nni_aio_fini(p->aio_getq);
+ nni_aio_fini(p->aio_send);
+ nni_aio_fini(p->aio_recv);
+ nni_aio_fini(p->aio_putq);
+ nni_msgq_fini(p->sendq);
+ NNI_FREE_STRUCT(p);
}
static int
-nni_surv_pipe_init(void **pp, nni_pipe *npipe, void *psock)
+surv_pipe_init(void **pp, nni_pipe *npipe, void *s)
{
- nni_surv_pipe *ppipe;
- int rv;
+ surv_pipe *p;
+ int rv;
- if ((ppipe = NNI_ALLOC_STRUCT(ppipe)) == NULL) {
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
// This depth could be tunable.
- if ((rv = nni_msgq_init(&ppipe->sendq, 16)) != 0) {
- NNI_FREE_STRUCT(ppipe);
+ if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_getq, surv_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_putq, surv_putq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, surv_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, surv_recv_cb, p)) != 0)) {
+ surv_pipe_fini(p);
return (rv);
}
- nni_aio_init(&ppipe->aio_getq, nni_surv_getq_cb, ppipe);
- nni_aio_init(&ppipe->aio_putq, nni_surv_putq_cb, ppipe);
- nni_aio_init(&ppipe->aio_send, nni_surv_send_cb, ppipe);
- nni_aio_init(&ppipe->aio_recv, nni_surv_recv_cb, ppipe);
-
- ppipe->npipe = npipe;
- ppipe->psock = psock;
- *pp = ppipe;
+ p->npipe = npipe;
+ p->psock = s;
+ *pp = p;
return (0);
}
static int
-nni_surv_pipe_start(void *arg)
+surv_pipe_start(void *arg)
{
- nni_surv_pipe *ppipe = arg;
- nni_surv_sock *psock = ppipe->psock;
+ surv_pipe *p = arg;
+ surv_sock *s = p->psock;
- nni_mtx_lock(&psock->mtx);
- nni_list_append(&psock->pipes, ppipe);
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_lock(&s->mtx);
+ nni_list_append(&s->pipes, p);
+ nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(ppipe->sendq, &ppipe->aio_getq);
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
+ nni_msgq_aio_get(p->sendq, p->aio_getq);
+ nni_pipe_recv(p->npipe, p->aio_recv);
return (0);
}
static void
-nni_surv_pipe_stop(void *arg)
+surv_pipe_stop(void *arg)
{
- nni_surv_pipe *ppipe = arg;
- nni_surv_sock *psock = ppipe->psock;
+ surv_pipe *p = arg;
+ surv_sock *s = p->psock;
- nni_aio_stop(&ppipe->aio_getq);
- nni_aio_stop(&ppipe->aio_send);
- nni_aio_stop(&ppipe->aio_recv);
- nni_aio_stop(&ppipe->aio_putq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
- nni_msgq_close(ppipe->sendq);
+ nni_msgq_close(p->sendq);
- nni_mtx_lock(&psock->mtx);
- if (nni_list_active(&psock->pipes, ppipe)) {
- nni_list_remove(&psock->pipes, ppipe);
+ nni_mtx_lock(&s->mtx);
+ if (nni_list_active(&s->pipes, p)) {
+ nni_list_remove(&s->pipes, p);
}
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_unlock(&s->mtx);
}
static void
-nni_surv_getq_cb(void *arg)
+surv_getq_cb(void *arg)
{
- nni_surv_pipe *ppipe = arg;
+ surv_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_getq) != 0) {
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_getq) != 0) {
+ nni_pipe_stop(p->npipe);
return;
}
- ppipe->aio_send.a_msg = ppipe->aio_getq.a_msg;
- ppipe->aio_getq.a_msg = NULL;
+ nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq));
+ nni_aio_set_msg(p->aio_getq, NULL);
- nni_pipe_send(ppipe->npipe, &ppipe->aio_send);
+ nni_pipe_send(p->npipe, p->aio_send);
}
static void
-nni_surv_send_cb(void *arg)
+surv_send_cb(void *arg)
{
- nni_surv_pipe *ppipe = arg;
+ surv_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_send) != 0) {
- nni_msg_free(ppipe->aio_send.a_msg);
- ppipe->aio_send.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_send) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_send));
+ nni_aio_set_msg(p->aio_send, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- nni_msgq_aio_get(ppipe->psock->uwq, &ppipe->aio_getq);
+ nni_msgq_aio_get(p->psock->uwq, p->aio_getq);
}
static void
-nni_surv_putq_cb(void *arg)
+surv_putq_cb(void *arg)
{
- nni_surv_pipe *ppipe = arg;
+ surv_pipe *p = arg;
- if (nni_aio_result(&ppipe->aio_putq) != 0) {
- nni_msg_free(ppipe->aio_putq.a_msg);
- ppipe->aio_putq.a_msg = NULL;
- nni_pipe_stop(ppipe->npipe);
+ if (nni_aio_result(p->aio_putq) != 0) {
+ nni_msg_free(nni_aio_get_msg(p->aio_putq));
+ nni_aio_set_msg(p->aio_putq, NULL);
+ nni_pipe_stop(p->npipe);
return;
}
- nni_pipe_recv(ppipe->npipe, &ppipe->aio_recv);
+ nni_pipe_recv(p->npipe, p->aio_recv);
}
static void
-nni_surv_recv_cb(void *arg)
+surv_recv_cb(void *arg)
{
- nni_surv_pipe *ppipe = arg;
- nni_msg * msg;
+ surv_pipe *p = arg;
+ nni_msg * msg;
- if (nni_aio_result(&ppipe->aio_recv) != 0) {
+ if (nni_aio_result(p->aio_recv) != 0) {
goto failed;
}
- msg = ppipe->aio_recv.a_msg;
- ppipe->aio_recv.a_msg = NULL;
+ msg = nni_aio_get_msg(p->aio_recv);
+ nni_aio_set_msg(p->aio_recv, NULL);
// We yank 4 bytes of body, and move them to the header.
if (nni_msg_len(msg) < 4) {
@@ -255,35 +258,35 @@ nni_surv_recv_cb(void *arg)
}
(void) nni_msg_trim(msg, 4);
- ppipe->aio_putq.a_msg = msg;
- nni_msgq_aio_put(ppipe->psock->urq, &ppipe->aio_putq);
+ nni_aio_set_msg(p->aio_putq, msg);
+ nni_msgq_aio_put(p->psock->urq, p->aio_putq);
return;
failed:
- nni_pipe_stop(ppipe->npipe);
+ nni_pipe_stop(p->npipe);
}
static int
-nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
+surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
- nni_surv_sock *psock = arg;
- int rv = NNG_ENOTSUP;
- int oldraw;
+ surv_sock *s = arg;
+ int rv = NNG_ENOTSUP;
+ int oldraw;
if (opt == nng_optid_surveyor_surveytime) {
- rv = nni_setopt_usec(&psock->survtime, buf, sz);
+ rv = nni_setopt_usec(&s->survtime, buf, sz);
} else if (opt == nng_optid_raw) {
- oldraw = psock->raw;
- rv = nni_setopt_int(&psock->raw, buf, sz, 0, 1);
- if (oldraw != psock->raw) {
- if (psock->raw) {
- nni_sock_recverr(psock->nsock, 0);
+ oldraw = s->raw;
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
+ if (oldraw != s->raw) {
+ if (s->raw) {
+ nni_sock_recverr(s->nsock, 0);
} else {
- nni_sock_recverr(psock->nsock, NNG_ESTATE);
+ nni_sock_recverr(s->nsock, NNG_ESTATE);
}
- psock->survid = 0;
- nni_timer_cancel(&psock->timer);
+ s->survid = 0;
+ nni_timer_cancel(&s->timer);
}
}
@@ -291,49 +294,49 @@ nni_surv_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
}
static int
-nni_surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
+surv_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
- nni_surv_sock *psock = arg;
- int rv = NNG_ENOTSUP;
+ surv_sock *s = arg;
+ int rv = NNG_ENOTSUP;
if (opt == nng_optid_surveyor_surveytime) {
- rv = nni_getopt_usec(&psock->survtime, buf, szp);
+ rv = nni_getopt_usec(&s->survtime, buf, szp);
} else if (opt == nng_optid_raw) {
- rv = nni_getopt_int(&psock->raw, buf, szp);
+ rv = nni_getopt_int(&s->raw, buf, szp);
}
return (rv);
}
static void
-nni_surv_sock_getq_cb(void *arg)
+surv_sock_getq_cb(void *arg)
{
- nni_surv_sock *psock = arg;
- nni_surv_pipe *ppipe;
- nni_surv_pipe *last;
- nni_msg * msg, *dup;
+ surv_sock *s = arg;
+ surv_pipe *p;
+ surv_pipe *last;
+ nni_msg * msg, *dup;
- if (nni_aio_result(&psock->aio_getq) != 0) {
+ if (nni_aio_result(s->aio_getq) != 0) {
// Should be NNG_ECLOSED.
return;
}
- msg = psock->aio_getq.a_msg;
- psock->aio_getq.a_msg = NULL;
+ msg = nni_aio_get_msg(s->aio_getq);
+ nni_aio_set_msg(s->aio_getq, NULL);
- nni_mtx_lock(&psock->mtx);
- last = nni_list_last(&psock->pipes);
- NNI_LIST_FOREACH (&psock->pipes, ppipe) {
- if (ppipe != last) {
+ nni_mtx_lock(&s->mtx);
+ last = nni_list_last(&s->pipes);
+ NNI_LIST_FOREACH (&s->pipes, p) {
+ if (p != last) {
if (nni_msg_dup(&dup, msg) != 0) {
continue;
}
} else {
dup = msg;
}
- if (nni_msgq_tryput(ppipe->sendq, dup) != 0) {
+ if (nni_msgq_tryput(p->sendq, dup) != 0) {
nni_msg_free(dup);
}
}
- nni_mtx_unlock(&psock->mtx);
+ nni_mtx_unlock(&s->mtx);
if (last == NULL) {
// If there were no pipes to send on, just toss the message.
@@ -342,23 +345,23 @@ nni_surv_sock_getq_cb(void *arg)
}
static void
-nni_surv_timeout(void *arg)
+surv_timeout(void *arg)
{
- nni_surv_sock *psock = arg;
+ surv_sock *s = arg;
- nni_sock_lock(psock->nsock);
- psock->survid = 0;
- nni_sock_recverr(psock->nsock, NNG_ESTATE);
- nni_msgq_set_get_error(psock->urq, NNG_ETIMEDOUT);
- nni_sock_unlock(psock->nsock);
+ nni_sock_lock(s->nsock);
+ s->survid = 0;
+ nni_sock_recverr(s->nsock, NNG_ESTATE);
+ nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT);
+ nni_sock_unlock(s->nsock);
}
static nni_msg *
-nni_surv_sock_sfilter(void *arg, nni_msg *msg)
+surv_sock_sfilter(void *arg, nni_msg *msg)
{
- nni_surv_sock *psock = arg;
+ surv_sock *s = arg;
- if (psock->raw) {
+ if (s->raw) {
// No automatic retry, and the request ID must
// be in the header coming down.
return (msg);
@@ -367,9 +370,9 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg)
// Generate a new request ID. We always set the high
// order bit so that the peer can locate the end of the
// backtrace. (Pipe IDs have the high order bit clear.)
- psock->survid = (psock->nextid++) | 0x80000000u;
+ s->survid = (s->nextid++) | 0x80000000u;
- if (nni_msg_header_append_u32(msg, psock->survid) != 0) {
+ if (nni_msg_header_append_u32(msg, s->survid) != 0) {
// Should be ENOMEM.
nni_msg_free(msg);
return (NULL);
@@ -378,28 +381,28 @@ nni_surv_sock_sfilter(void *arg, nni_msg *msg)
// If another message is there, this cancels it. We move the
// survey expiration out. The timeout thread will wake up in
// the wake below, and reschedule itself appropriately.
- psock->expire = nni_clock() + psock->survtime;
- nni_timer_schedule(&psock->timer, psock->expire);
+ s->expire = nni_clock() + s->survtime;
+ nni_timer_schedule(&s->timer, s->expire);
// Clear the error condition.
- nni_sock_recverr(psock->nsock, 0);
+ nni_sock_recverr(s->nsock, 0);
// nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0);
return (msg);
}
static nni_msg *
-nni_surv_sock_rfilter(void *arg, nni_msg *msg)
+surv_sock_rfilter(void *arg, nni_msg *msg)
{
- nni_surv_sock *ssock = arg;
+ surv_sock *s = arg;
- if (ssock->raw) {
+ if (s->raw) {
// Pass it unmolested
return (msg);
}
if ((nni_msg_header_len(msg) < sizeof(uint32_t)) ||
- (nni_msg_header_trim_u32(msg) != ssock->survid)) {
+ (nni_msg_header_trim_u32(msg) != s->survid)) {
// Wrong request id
nni_msg_free(msg);
return (NULL);
@@ -408,35 +411,35 @@ nni_surv_sock_rfilter(void *arg, nni_msg *msg)
return (msg);
}
-static nni_proto_pipe_ops nni_surv_pipe_ops = {
- .pipe_init = nni_surv_pipe_init,
- .pipe_fini = nni_surv_pipe_fini,
- .pipe_start = nni_surv_pipe_start,
- .pipe_stop = nni_surv_pipe_stop,
+static nni_proto_pipe_ops surv_pipe_ops = {
+ .pipe_init = surv_pipe_init,
+ .pipe_fini = surv_pipe_fini,
+ .pipe_start = surv_pipe_start,
+ .pipe_stop = surv_pipe_stop,
};
-static nni_proto_sock_ops nni_surv_sock_ops = {
- .sock_init = nni_surv_sock_init,
- .sock_fini = nni_surv_sock_fini,
- .sock_open = nni_surv_sock_open,
- .sock_close = nni_surv_sock_close,
- .sock_setopt = nni_surv_sock_setopt,
- .sock_getopt = nni_surv_sock_getopt,
- .sock_rfilter = nni_surv_sock_rfilter,
- .sock_sfilter = nni_surv_sock_sfilter,
+static nni_proto_sock_ops surv_sock_ops = {
+ .sock_init = surv_sock_init,
+ .sock_fini = surv_sock_fini,
+ .sock_open = surv_sock_open,
+ .sock_close = surv_sock_close,
+ .sock_setopt = surv_sock_setopt,
+ .sock_getopt = surv_sock_getopt,
+ .sock_rfilter = surv_sock_rfilter,
+ .sock_sfilter = surv_sock_sfilter,
};
-nni_proto nni_surveyor_proto = {
+static nni_proto surv_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNG_PROTO_SURVEYOR_V0, "surveyor" },
.proto_peer = { NNG_PROTO_RESPONDENT_V0, "respondent" },
.proto_flags = NNI_PROTO_FLAG_SNDRCV,
- .proto_sock_ops = &nni_surv_sock_ops,
- .proto_pipe_ops = &nni_surv_pipe_ops,
+ .proto_sock_ops = &surv_sock_ops,
+ .proto_pipe_ops = &surv_pipe_ops,
};
int
nng_surveyor0_open(nng_socket *sidp)
{
- return (nni_proto_open(sidp, &nni_surveyor_proto));
+ return (nni_proto_open(sidp, &surv_proto));
}
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index b2c382fb..f10f80b4 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -39,9 +39,9 @@ struct nni_ipc_pipe {
nni_aio *user_txaio;
nni_aio *user_rxaio;
nni_aio *user_negaio;
- nni_aio txaio;
- nni_aio rxaio;
- nni_aio negaio;
+ nni_aio *txaio;
+ nni_aio *rxaio;
+ nni_aio *negaio;
nni_msg *rxmsg;
nni_mtx mtx;
};
@@ -51,7 +51,7 @@ struct nni_ipc_ep {
nni_plat_ipc_ep *iep;
uint16_t proto;
size_t rcvmax;
- nni_aio aio;
+ nni_aio * aio;
nni_aio * user_aio;
nni_mtx mtx;
};
@@ -94,13 +94,13 @@ nni_ipc_pipe_fini(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_aio_stop(&pipe->rxaio);
- nni_aio_stop(&pipe->txaio);
- nni_aio_stop(&pipe->negaio);
+ nni_aio_stop(pipe->rxaio);
+ nni_aio_stop(pipe->txaio);
+ nni_aio_stop(pipe->negaio);
- nni_aio_fini(&pipe->rxaio);
- nni_aio_fini(&pipe->txaio);
- nni_aio_fini(&pipe->negaio);
+ nni_aio_fini(pipe->rxaio);
+ nni_aio_fini(pipe->txaio);
+ nni_aio_fini(pipe->negaio);
if (pipe->ipp != NULL) {
nni_plat_ipc_pipe_fini(pipe->ipp);
}
@@ -115,14 +115,18 @@ static int
nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp)
{
nni_ipc_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p);
- nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p);
- nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p);
+ if (((rv = nni_aio_init(&p->txaio, nni_ipc_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, nni_ipc_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->negaio, nni_ipc_pipe_nego_cb, p)) != 0)) {
+ nni_ipc_pipe_fini(p);
+ return (rv);
+ }
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -146,7 +150,7 @@ nni_ipc_cancel_start(nni_aio *aio, int rv)
pipe->user_negaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->negaio, rv);
+ nni_aio_cancel(pipe->negaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -154,7 +158,7 @@ static void
nni_ipc_pipe_nego_cb(void *arg)
{
nni_ipc_pipe *pipe = arg;
- nni_aio * aio = &pipe->negaio;
+ nni_aio * aio = pipe->negaio;
int rv;
nni_mtx_lock(&pipe->mtx);
@@ -219,12 +223,13 @@ nni_ipc_pipe_send_cb(void *arg)
return;
}
pipe->user_txaio = NULL;
- if ((rv = nni_aio_result(&pipe->txaio)) != 0) {
+ if ((rv = nni_aio_result(pipe->txaio)) != 0) {
len = 0;
} else {
- len = nni_msg_len(aio->a_msg);
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
+ nni_msg *msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+ nni_msg_free(msg);
+ nni_aio_set_msg(aio, NULL);
}
nni_aio_finish(aio, rv, len);
nni_mtx_unlock(&pipe->mtx);
@@ -245,7 +250,7 @@ nni_ipc_pipe_recv_cb(void *arg)
return;
}
- if ((rv = nni_aio_result(&pipe->rxaio)) != 0) {
+ if ((rv = nni_aio_result(pipe->rxaio)) != 0) {
// Error on receive. This has to cause an error back
// to the user. Also, if we had allocated an rxmsg, lets
// toss it.
@@ -264,6 +269,7 @@ nni_ipc_pipe_recv_cb(void *arg)
// message to allocate and how much more to expect.
if (pipe->rxmsg == NULL) {
uint64_t len;
+ nni_aio *rxaio;
// Check to make sure we got msg type 1.
if (pipe->rxhead[0] != 1) {
@@ -298,11 +304,12 @@ nni_ipc_pipe_recv_cb(void *arg)
// Submit the rest of the data for a read -- we want to
// read the entire message now.
- pipe->rxaio.a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
- pipe->rxaio.a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
- pipe->rxaio.a_niov = 1;
+ rxaio = pipe->rxaio;
+ rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg);
+ rxaio->a_iov[0].iov_len = nni_msg_len(pipe->rxmsg);
+ rxaio->a_niov = 1;
- nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
return;
}
@@ -329,7 +336,7 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv)
pipe->user_txaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->txaio, rv);
+ nni_aio_cancel(pipe->txaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -337,8 +344,9 @@ static void
nni_ipc_pipe_send(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
- nni_msg * msg = aio->a_msg;
+ nni_msg * msg = nni_aio_get_msg(aio);
uint64_t len;
+ nni_aio * txaio;
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -353,15 +361,16 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio)
pipe->txhead[0] = 1; // message type, 1.
NNI_PUT64(pipe->txhead + 1, len);
- pipe->txaio.a_iov[0].iov_buf = pipe->txhead;
- pipe->txaio.a_iov[0].iov_len = sizeof(pipe->txhead);
- pipe->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
- pipe->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
- pipe->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
- pipe->txaio.a_iov[2].iov_len = nni_msg_len(msg);
- pipe->txaio.a_niov = 3;
+ txaio = pipe->txaio;
+ txaio->a_iov[0].iov_buf = pipe->txhead;
+ txaio->a_iov[0].iov_len = sizeof(pipe->txhead);
+ txaio->a_iov[1].iov_buf = nni_msg_header(msg);
+ txaio->a_iov[1].iov_len = nni_msg_header_len(msg);
+ txaio->a_iov[2].iov_buf = nni_msg_body(msg);
+ txaio->a_iov[2].iov_len = nni_msg_len(msg);
+ txaio->a_niov = 3;
- nni_plat_ipc_pipe_send(pipe->ipp, &pipe->txaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, txaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -378,7 +387,7 @@ nni_ipc_cancel_rx(nni_aio *aio, int rv)
pipe->user_rxaio = NULL;
nni_mtx_unlock(&pipe->mtx);
- nni_aio_cancel(&pipe->rxaio, rv);
+ nni_aio_cancel(pipe->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -386,6 +395,7 @@ static void
nni_ipc_pipe_recv(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
+ nni_aio * rxaio;
nni_mtx_lock(&pipe->mtx);
@@ -398,11 +408,12 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio)
NNI_ASSERT(pipe->rxmsg == NULL);
// Schedule a read of the IPC header.
- pipe->rxaio.a_iov[0].iov_buf = pipe->rxhead;
- pipe->rxaio.a_iov[0].iov_len = sizeof(pipe->rxhead);
- pipe->rxaio.a_niov = 1;
+ rxaio = pipe->rxaio;
+ rxaio->a_iov[0].iov_buf = pipe->rxhead;
+ rxaio->a_iov[0].iov_len = sizeof(pipe->rxhead);
+ rxaio->a_niov = 1;
- nni_plat_ipc_pipe_recv(pipe->ipp, &pipe->rxaio);
+ nni_plat_ipc_pipe_recv(pipe->ipp, rxaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -411,6 +422,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
{
nni_ipc_pipe *pipe = arg;
int rv;
+ nni_aio * negaio;
nni_mtx_lock(&pipe->mtx);
pipe->txhead[0] = 0;
@@ -420,20 +432,21 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&pipe->txhead[4], pipe->proto);
NNI_PUT16(&pipe->txhead[6], 0);
- pipe->user_negaio = aio;
- pipe->gotrxhead = 0;
- pipe->gottxhead = 0;
- pipe->wantrxhead = 8;
- pipe->wanttxhead = 8;
- pipe->negaio.a_niov = 1;
- pipe->negaio.a_iov[0].iov_len = 8;
- pipe->negaio.a_iov[0].iov_buf = &pipe->txhead[0];
+ pipe->user_negaio = aio;
+ pipe->gotrxhead = 0;
+ pipe->gottxhead = 0;
+ pipe->wantrxhead = 8;
+ pipe->wanttxhead = 8;
+ negaio = pipe->negaio;
+ negaio->a_niov = 1;
+ negaio->a_iov[0].iov_len = 8;
+ negaio->a_iov[0].iov_buf = &pipe->txhead[0];
rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe);
if (rv != 0) {
nni_mtx_unlock(&pipe->mtx);
return;
}
- nni_plat_ipc_pipe_send(pipe->ipp, &pipe->negaio);
+ nni_plat_ipc_pipe_send(pipe->ipp, negaio);
nni_mtx_unlock(&pipe->mtx);
}
@@ -473,9 +486,9 @@ nni_ipc_ep_fini(void *arg)
{
nni_ipc_ep *ep = arg;
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
nni_plat_ipc_ep_fini(ep->iep);
- nni_aio_fini(&ep->aio);
+ nni_aio_fini(ep->aio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
}
@@ -532,7 +545,7 @@ nni_ipc_ep_close(void *arg)
nni_plat_ipc_ep_close(ep->iep);
nni_mtx_unlock(&ep->mtx);
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
}
static int
@@ -554,19 +567,19 @@ nni_ipc_ep_finish(nni_ipc_ep *ep)
int rv;
nni_ipc_pipe *pipe = NULL;
- if ((rv = nni_aio_result(&ep->aio)) != 0) {
+ if ((rv = nni_aio_result(ep->aio)) != 0) {
goto done;
}
- NNI_ASSERT(ep->aio.a_pipe != NULL);
+ NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL);
// Attempt to allocate the parent pipe. If this fails we'll
// drop the connection (ENOMEM probably).
- rv = nni_ipc_pipe_init(&pipe, ep, ep->aio.a_pipe);
+ rv = nni_ipc_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio));
done:
- ep->aio.a_pipe = NULL;
- aio = ep->user_aio;
- ep->user_aio = NULL;
+ nni_aio_set_pipe(ep->aio, NULL);
+ aio = ep->user_aio;
+ ep->user_aio = NULL;
if ((aio != NULL) && (rv == 0)) {
NNI_ASSERT(pipe != NULL);
@@ -607,7 +620,7 @@ nni_ipc_cancel_ep(nni_aio *aio, int rv)
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(&ep->aio, rv);
+ nni_aio_cancel(ep->aio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -627,7 +640,7 @@ nni_ipc_ep_accept(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_ipc_ep_accept(ep->iep, &ep->aio);
+ nni_plat_ipc_ep_accept(ep->iep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
@@ -648,7 +661,7 @@ nni_ipc_ep_connect(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_ipc_ep_connect(ep->iep, &ep->aio);
+ nni_plat_ipc_ep_connect(ep->iep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index f0f07592..759e20f5 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -38,9 +38,9 @@ struct nni_tcp_pipe {
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
- nni_aio txaio;
- nni_aio rxaio;
- nni_aio negaio;
+ nni_aio *txaio;
+ nni_aio *rxaio;
+ nni_aio *negaio;
nni_msg *rxmsg;
nni_mtx mtx;
};
@@ -52,7 +52,7 @@ struct nni_tcp_ep {
size_t rcvmax;
nni_duration linger;
int ipv4only;
- nni_aio aio;
+ nni_aio * aio;
nni_aio * user_aio;
nni_mtx mtx;
};
@@ -98,13 +98,13 @@ nni_tcp_pipe_fini(void *arg)
{
nni_tcp_pipe *p = arg;
- nni_aio_stop(&p->rxaio);
- nni_aio_stop(&p->txaio);
- nni_aio_stop(&p->negaio);
+ nni_aio_stop(p->rxaio);
+ nni_aio_stop(p->txaio);
+ nni_aio_stop(p->negaio);
- nni_aio_fini(&p->rxaio);
- nni_aio_fini(&p->txaio);
- nni_aio_fini(&p->negaio);
+ nni_aio_fini(p->rxaio);
+ nni_aio_fini(p->txaio);
+ nni_aio_fini(p->negaio);
if (p->tpp != NULL) {
nni_plat_tcp_pipe_fini(p->tpp);
}
@@ -119,14 +119,18 @@ static int
nni_tcp_pipe_init(nni_tcp_pipe **pipep, nni_tcp_ep *ep, void *tpp)
{
nni_tcp_pipe *p;
+ int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);
- nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p);
- nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p);
- nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p);
+ if (((rv = nni_aio_init(&p->txaio, nni_tcp_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->rxaio, nni_tcp_pipe_recv_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->negaio, nni_tcp_pipe_nego_cb, p)) != 0)) {
+ nni_tcp_pipe_fini(p);
+ return (rv);
+ }
p->proto = ep->proto;
p->rcvmax = ep->rcvmax;
@@ -150,7 +154,7 @@ nni_tcp_cancel_nego(nni_aio *aio, int rv)
p->user_negaio = NULL;
nni_mtx_unlock(&p->mtx);
- nni_aio_cancel(&p->negaio, rv);
+ nni_aio_cancel(p->negaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -158,7 +162,7 @@ static void
nni_tcp_pipe_nego_cb(void *arg)
{
nni_tcp_pipe *p = arg;
- nni_aio * aio = &p->negaio;
+ nni_aio * aio = p->negaio;
int rv;
nni_mtx_lock(&p->mtx);
@@ -224,12 +228,12 @@ nni_tcp_pipe_send_cb(void *arg)
}
p->user_txaio = NULL;
- if ((rv = nni_aio_result(&p->txaio)) != 0) {
+ if ((rv = nni_aio_result(p->txaio)) != 0) {
len = 0;
} else {
len = nni_msg_len(aio->a_msg);
- nni_msg_free(aio->a_msg);
- aio->a_msg = NULL;
+ nni_msg_free(nni_aio_get_msg(aio));
+ nni_aio_set_msg(aio, NULL);
}
nni_aio_finish(aio, 0, len);
nni_mtx_unlock(&p->mtx);
@@ -251,7 +255,7 @@ nni_tcp_pipe_recv_cb(void *arg)
return;
}
- if ((rv = nni_aio_result(&p->rxaio)) != 0) {
+ if ((rv = nni_aio_result(p->rxaio)) != 0) {
// Error on receive. This has to cause an error back
// to the user. Also, if we had allocated an rxmsg, lets
// toss it.
@@ -269,6 +273,7 @@ nni_tcp_pipe_recv_cb(void *arg)
// header, which is just the length. This tells us the size of the
// message to allocate and how much more to expect.
if (p->rxmsg == NULL) {
+ nni_aio *rxaio;
uint64_t len;
// We should have gotten a message header.
NNI_GET64(p->rxlen, len);
@@ -291,11 +296,12 @@ nni_tcp_pipe_recv_cb(void *arg)
// Submit the rest of the data for a read -- we want to
// read the entire message now.
- p->rxaio.a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
- p->rxaio.a_iov[0].iov_len = nni_msg_len(p->rxmsg);
- p->rxaio.a_niov = 1;
+ rxaio = p->rxaio;
+ rxaio->a_iov[0].iov_buf = nni_msg_body(p->rxmsg);
+ rxaio->a_iov[0].iov_len = nni_msg_len(p->rxmsg);
+ rxaio->a_niov = 1;
- nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio);
+ nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
return;
}
@@ -322,7 +328,7 @@ nni_tcp_cancel_tx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(&p->txaio, rv);
+ nni_aio_cancel(p->txaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -330,8 +336,9 @@ static void
nni_tcp_pipe_send(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
- nni_msg * msg = aio->a_msg;
+ nni_msg * msg = nni_aio_get_msg(aio);
uint64_t len;
+ nni_aio * txaio;
len = nni_msg_len(msg) + nni_msg_header_len(msg);
@@ -346,15 +353,16 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio)
NNI_PUT64(p->txlen, len);
- p->txaio.a_iov[0].iov_buf = p->txlen;
- p->txaio.a_iov[0].iov_len = sizeof(p->txlen);
- p->txaio.a_iov[1].iov_buf = nni_msg_header(msg);
- p->txaio.a_iov[1].iov_len = nni_msg_header_len(msg);
- p->txaio.a_iov[2].iov_buf = nni_msg_body(msg);
- p->txaio.a_iov[2].iov_len = nni_msg_len(msg);
- p->txaio.a_niov = 3;
+ txaio = p->txaio;
+ txaio->a_iov[0].iov_buf = p->txlen;
+ txaio->a_iov[0].iov_len = sizeof(p->txlen);
+ txaio->a_iov[1].iov_buf = nni_msg_header(msg);
+ txaio->a_iov[1].iov_len = nni_msg_header_len(msg);
+ txaio->a_iov[2].iov_buf = nni_msg_body(msg);
+ txaio->a_iov[2].iov_len = nni_msg_len(msg);
+ txaio->a_niov = 3;
- nni_plat_tcp_pipe_send(p->tpp, &p->txaio);
+ nni_plat_tcp_pipe_send(p->tpp, txaio);
nni_mtx_unlock(&p->mtx);
}
@@ -372,7 +380,7 @@ nni_tcp_cancel_rx(nni_aio *aio, int rv)
nni_mtx_unlock(&p->mtx);
// cancel the underlying operation.
- nni_aio_cancel(&p->rxaio, rv);
+ nni_aio_cancel(p->rxaio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -380,6 +388,7 @@ static void
nni_tcp_pipe_recv(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ nni_aio * rxaio;
nni_mtx_lock(&p->mtx);
@@ -392,11 +401,12 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio)
NNI_ASSERT(p->rxmsg == NULL);
// Schedule a read of the TCP header.
- p->rxaio.a_iov[0].iov_buf = p->rxlen;
- p->rxaio.a_iov[0].iov_len = sizeof(p->rxlen);
- p->rxaio.a_niov = 1;
+ rxaio = p->rxaio;
+ rxaio->a_iov[0].iov_buf = p->rxlen;
+ rxaio->a_iov[0].iov_len = sizeof(p->rxlen);
+ rxaio->a_niov = 1;
- nni_plat_tcp_pipe_recv(p->tpp, &p->rxaio);
+ nni_plat_tcp_pipe_recv(p->tpp, rxaio);
nni_mtx_unlock(&p->mtx);
}
@@ -525,6 +535,7 @@ static void
nni_tcp_pipe_start(void *arg, nni_aio *aio)
{
nni_tcp_pipe *p = arg;
+ nni_aio * negaio;
nni_mtx_lock(&p->mtx);
p->txlen[0] = 0;
@@ -534,19 +545,20 @@ nni_tcp_pipe_start(void *arg, nni_aio *aio)
NNI_PUT16(&p->txlen[4], p->proto);
NNI_PUT16(&p->txlen[6], 0);
- p->user_negaio = aio;
- p->gotrxhead = 0;
- p->gottxhead = 0;
- p->wantrxhead = 8;
- p->wanttxhead = 8;
- p->negaio.a_niov = 1;
- p->negaio.a_iov[0].iov_len = 8;
- p->negaio.a_iov[0].iov_buf = &p->txlen[0];
+ p->user_negaio = aio;
+ p->gotrxhead = 0;
+ p->gottxhead = 0;
+ p->wantrxhead = 8;
+ p->wanttxhead = 8;
+ negaio = p->negaio;
+ negaio->a_niov = 1;
+ negaio->a_iov[0].iov_len = 8;
+ negaio->a_iov[0].iov_buf = &p->txlen[0];
if (nni_aio_start(aio, nni_tcp_cancel_nego, p) != 0) {
nni_mtx_unlock(&p->mtx);
return;
}
- nni_plat_tcp_pipe_send(p->tpp, &p->negaio);
+ nni_plat_tcp_pipe_send(p->tpp, negaio);
nni_mtx_unlock(&p->mtx);
}
@@ -555,11 +567,11 @@ nni_tcp_ep_fini(void *arg)
{
nni_tcp_ep *ep = arg;
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
if (ep->tep != NULL) {
nni_plat_tcp_ep_fini(ep->tep);
}
- nni_aio_fini(&ep->aio);
+ nni_aio_fini(ep->aio);
nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
}
@@ -575,7 +587,7 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
char * lhost;
char * lserv;
nni_sockaddr rsa, lsa;
- nni_aio aio;
+ nni_aio * aio;
int passive;
// Make a copy of the url (to allow for destructive operations)
@@ -590,17 +602,19 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
}
passive = (mode == NNI_EP_MODE_DIAL ? 0 : 1);
- nni_aio_init(&aio, NULL, NULL);
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
// XXX: arguably we could defer this part to the point we do a bind
// or connect!
if ((rhost != NULL) || (rserv != NULL)) {
- aio.a_addr = &rsa;
- nni_plat_tcp_resolv(
- rhost, rserv, NNG_AF_UNSPEC, passive, &aio);
- nni_aio_wait(&aio);
- if ((rv = nni_aio_result(&aio)) != 0) {
+ aio->a_addr = &rsa;
+ nni_plat_tcp_resolv(rhost, rserv, NNG_AF_UNSPEC, passive, aio);
+ nni_aio_wait(aio);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ nni_aio_fini(aio);
return (rv);
}
} else {
@@ -608,16 +622,17 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
}
if ((lhost != NULL) || (lserv != NULL)) {
- aio.a_addr = &lsa;
- nni_plat_tcp_resolv(
- lhost, lserv, NNG_AF_UNSPEC, passive, &aio);
- nni_aio_wait(&aio);
- if ((rv = nni_aio_result(&aio)) != 0) {
+ aio->a_addr = &lsa;
+ nni_plat_tcp_resolv(lhost, lserv, NNG_AF_UNSPEC, passive, aio);
+ nni_aio_wait(aio);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ nni_aio_fini(aio);
return (rv);
}
} else {
lsa.s_un.s_family = NNG_AF_UNSPEC;
}
+ nni_aio_fini(aio);
if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
return (NNG_ENOMEM);
@@ -633,8 +648,10 @@ nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock, int mode)
}
nni_mtx_init(&ep->mtx);
- nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep);
-
+ if ((rv = nni_aio_init(&ep->aio, nni_tcp_ep_cb, ep)) != 0) {
+ nni_tcp_ep_fini(ep);
+ return (rv);
+ }
ep->proto = nni_sock_proto(sock);
*epp = ep;
@@ -650,7 +667,7 @@ nni_tcp_ep_close(void *arg)
nni_plat_tcp_ep_close(ep->tep);
nni_mtx_unlock(&ep->mtx);
- nni_aio_stop(&ep->aio);
+ nni_aio_stop(ep->aio);
}
static int
@@ -673,19 +690,19 @@ nni_tcp_ep_finish(nni_tcp_ep *ep)
int rv;
nni_tcp_pipe *pipe = NULL;
- if ((rv = nni_aio_result(&ep->aio)) != 0) {
+ if ((rv = nni_aio_result(ep->aio)) != 0) {
goto done;
}
- NNI_ASSERT(ep->aio.a_pipe != NULL);
+ NNI_ASSERT(nni_aio_get_pipe(ep->aio) != NULL);
// Attempt to allocate the parent pipe. If this fails we'll
// drop the connection (ENOMEM probably).
- rv = nni_tcp_pipe_init(&pipe, ep, ep->aio.a_pipe);
+ rv = nni_tcp_pipe_init(&pipe, ep, nni_aio_get_pipe(ep->aio));
done:
- ep->aio.a_pipe = NULL;
- aio = ep->user_aio;
- ep->user_aio = NULL;
+ nni_aio_set_pipe(ep->aio, NULL);
+ aio = ep->user_aio;
+ ep->user_aio = NULL;
if ((aio != NULL) && (rv == 0)) {
nni_aio_finish_pipe(aio, pipe);
@@ -723,7 +740,7 @@ nni_tcp_cancel_ep(nni_aio *aio, int rv)
ep->user_aio = NULL;
nni_mtx_unlock(&ep->mtx);
- nni_aio_cancel(&ep->aio, rv);
+ nni_aio_cancel(ep->aio, rv);
nni_aio_finish_error(aio, rv);
}
@@ -743,7 +760,7 @@ nni_tcp_ep_accept(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_tcp_ep_accept(ep->tep, &ep->aio);
+ nni_plat_tcp_ep_accept(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
@@ -764,7 +781,7 @@ nni_tcp_ep_connect(void *arg, nni_aio *aio)
ep->user_aio = aio;
- nni_plat_tcp_ep_connect(ep->tep, &ep->aio);
+ nni_plat_tcp_ep_connect(ep->tep, ep->aio);
nni_mtx_unlock(&ep->mtx);
}
diff --git a/tests/resolv.c b/tests/resolv.c
index 849b41cd..49d258c3 100644
--- a/tests/resolv.c
+++ b/tests/resolv.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -130,103 +131,103 @@ TestMain("Resolver", {
nni_init();
Convey("Google DNS IPv4 resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
+ aio->a_addr = &sa;
nni_plat_tcp_resolv("google-public-dns-a.google.com", "80",
- NNG_AF_INET, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ NNG_AF_INET, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in.sa_family == NNG_AF_INET);
So(sa.s_un.s_in.sa_port == ntohs(80));
str = ip4tostr(&sa.s_un.s_in.sa_addr);
So(strcmp(str, "8.8.8.8") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
Convey("Numeric UDP resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
- nni_plat_udp_resolv("8.8.4.4", "69", NNG_AF_INET, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ aio->a_addr = &sa;
+ nni_plat_udp_resolv("8.8.4.4", "69", NNG_AF_INET, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in.sa_family == NNG_AF_INET);
So(sa.s_un.s_in.sa_port == ntohs(69));
str = ip4tostr(&sa.s_un.s_in.sa_addr);
So(strcmp(str, "8.8.4.4") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
Convey("Numeric v4 resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
- nni_plat_tcp_resolv("8.8.4.4", "80", NNG_AF_INET, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv("8.8.4.4", "80", NNG_AF_INET, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in.sa_family == NNG_AF_INET);
So(sa.s_un.s_in.sa_port == ntohs(80));
str = ip4tostr(&sa.s_un.s_in.sa_addr);
So(strcmp(str, "8.8.4.4") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
Convey("Numeric v6 resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
- nni_plat_tcp_resolv("::1", "80", NNG_AF_INET6, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv("::1", "80", NNG_AF_INET6, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in6.sa_family == NNG_AF_INET6);
So(sa.s_un.s_in6.sa_port == ntohs(80));
str = ip6tostr(&sa.s_un.s_in6.sa_addr);
So(strcmp(str, "::1") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
Convey("TCP Name service resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
- nni_plat_tcp_resolv("8.8.4.4", "http", NNG_AF_INET, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ aio->a_addr = &sa;
+ nni_plat_tcp_resolv("8.8.4.4", "http", NNG_AF_INET, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in.sa_family == NNG_AF_INET);
So(sa.s_un.s_in.sa_port == ntohs(80));
str = ip4tostr(&sa.s_un.s_in.sa_addr);
So(strcmp(str, "8.8.4.4") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
Convey("UDP Name service resolves", {
- nni_aio aio;
+ nni_aio * aio;
const char * str;
nng_sockaddr sa;
- memset(&aio, 0, sizeof(aio));
+
nni_aio_init(&aio, NULL, NULL);
- aio.a_addr = &sa;
- nni_plat_udp_resolv("8.8.4.4", "tftp", NNG_AF_INET, 1, &aio);
- nni_aio_wait(&aio);
- So(nni_aio_result(&aio) == 0);
+ aio->a_addr = &sa;
+ nni_plat_udp_resolv("8.8.4.4", "tftp", NNG_AF_INET, 1, aio);
+ nni_aio_wait(aio);
+ So(nni_aio_result(aio) == 0);
So(sa.s_un.s_in.sa_family == NNG_AF_INET);
So(sa.s_un.s_in.sa_port == ntohs(69));
str = ip4tostr(&sa.s_un.s_in.sa_addr);
So(strcmp(str, "8.8.4.4") == 0);
- nni_aio_fini(&aio);
+ nni_aio_fini(aio);
});
nni_fini();