aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c75
-rw-r--r--src/core/aio.h26
-rw-r--r--src/core/endpt.c85
-rw-r--r--src/core/event.h3
-rw-r--r--src/core/msgqueue.c59
-rw-r--r--src/core/pipe.c14
-rw-r--r--src/core/socket.c10
7 files changed, 183 insertions, 89 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 5b8c7970..9b308cd1 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -56,23 +56,34 @@ static nni_list nni_aio_expire_aios;
static void nni_aio_expire_add(nni_aio *);
-void
-nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
+int
+nni_aio_init(nni_aio **aiop, nni_cb cb, void *arg)
{
+ nni_aio *aio;
+
+ if ((aio = NNI_ALLOC_STRUCT(aio)) == NULL) {
+ return (NNG_ENOMEM);
+ }
memset(aio, 0, sizeof(*aio));
nni_cv_init(&aio->a_cv, &nni_aio_lk);
aio->a_expire = NNI_TIME_NEVER;
aio->a_init = 1;
nni_task_init(NULL, &aio->a_task, cb, arg);
+ *aiop = aio;
+ return (0);
}
void
nni_aio_fini(nni_aio *aio)
{
- nni_aio_stop(aio);
+ if (aio != NULL) {
+ nni_aio_stop(aio);
- // At this point the AIO is done.
- nni_cv_fini(&aio->a_cv);
+ // At this point the AIO is done.
+ nni_cv_fini(&aio->a_cv);
+
+ NNI_FREE_STRUCT(aio);
+ }
}
// nni_aio_stop cancels any oustanding operation, and waits for the
@@ -97,6 +108,60 @@ nni_aio_stop(nni_aio *aio)
nni_aio_wait(aio);
}
+void
+nni_aio_set_timeout(nni_aio *aio, nni_time when)
+{
+ aio->a_expire = when;
+}
+
+void
+nni_aio_set_msg(nni_aio *aio, nni_msg *msg)
+{
+ aio->a_msg = msg;
+}
+
+nni_msg *
+nni_aio_get_msg(nni_aio *aio)
+{
+ return (aio->a_msg);
+}
+
+void
+nni_aio_set_pipe(nni_aio *aio, void *p)
+{
+ aio->a_pipe = p;
+}
+
+void *
+nni_aio_get_pipe(nni_aio *aio)
+{
+ return (aio->a_pipe);
+}
+
+void
+nni_aio_set_ep(nni_aio *aio, void *ep)
+{
+ aio->a_endpt = ep;
+}
+
+void *
+nni_aio_get_ep(nni_aio *aio)
+{
+ return (aio->a_endpt);
+}
+
+void
+nni_aio_set_data(nni_aio *aio, void *data)
+{
+ aio->a_data = data;
+}
+
+void *
+nni_aio_get_data(nni_aio *aio)
+{
+ return (aio->a_data);
+}
+
int
nni_aio_result(nni_aio *aio)
{
diff --git a/src/core/aio.h b/src/core/aio.h
index 9114c9fe..eae17446 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -52,6 +52,9 @@ struct nni_aio {
// Resolver operations.
nni_sockaddr *a_addr;
+ // Extra user data.
+ void *a_data;
+
// Provider-use fields.
nni_aio_cancelfn a_prov_cancel;
void * a_prov_data;
@@ -65,7 +68,7 @@ struct nni_aio {
// the supplied argument when the operation is complete. If NULL is
// supplied for the callback, then nni_aio_wake is used in its place,
// and the aio is used for the argument.
-extern void nni_aio_init(nni_aio *, nni_cb, void *);
+extern int nni_aio_init(nni_aio **, nni_cb, void *);
// nni_aio_fini finalizes the aio, releasing resources (locks)
// associated with it. The caller is responsible for ensuring that any
@@ -83,6 +86,27 @@ extern void nni_aio_fini(nni_aio *);
// use nni_aio_cancel instead.)
extern void nni_aio_stop(nni_aio *);
+// nni_aio_set_data sets user data. This should only be done by the
+// consumer, initiating the I/O. The intention is to be able to store
+// additional data for use when the operation callback is executed.
+extern void nni_aio_set_data(nni_aio *, void *);
+
+// nni_aio_get_data returns the user data that was previously stored
+// with nni_aio_set_data.
+extern void *nni_aio_get_data(nni_aio *);
+
+extern void nni_aio_set_msg(nni_aio *, nni_msg *);
+extern nni_msg *nni_aio_get_msg(nni_aio *);
+extern void nni_aio_set_pipe(nni_aio *, void *);
+extern void * nni_aio_get_pipe(nni_aio *);
+extern void nni_aio_set_ep(nni_aio *, void *);
+extern void * nni_aio_get_ep(nni_aio *);
+
+// nni_aio_set_timeout sets the timeout (absolute) when the AIO will
+// be canceled. The cancelation does not happen until after nni_aio_start
+// is called.
+extern void nni_aio_set_timeout(nni_aio *, nni_time);
+
// nni_aio_result returns the result code (0 on success, or an NNG errno)
// for the operation. It is only valid to call this when the operation is
// complete (such as when the callback is executed or after nni_aio_wait
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 9dc33867..9122bb58 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -31,10 +31,10 @@ struct nni_ep {
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_list ep_pipes;
- nni_aio ep_acc_aio;
- nni_aio ep_con_aio;
- nni_aio ep_con_syn; // used for sync connect
- nni_aio ep_tmo_aio; // backoff timer
+ nni_aio * ep_acc_aio;
+ nni_aio * ep_con_aio;
+ nni_aio * ep_con_syn; // used for sync connect
+ nni_aio * ep_tmo_aio; // backoff timer
nni_duration ep_maxrtime; // maximum time for reconnect
nni_duration ep_currtime; // current time for reconnect
nni_duration ep_inirtime; // initial time for reconnect
@@ -96,15 +96,15 @@ nni_ep_destroy(nni_ep *ep)
nni_sock_ep_remove(ep->ep_sock, ep);
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_tmo_aio);
+ nni_aio_stop(ep->ep_acc_aio);
+ nni_aio_stop(ep->ep_con_aio);
+ nni_aio_stop(ep->ep_con_syn);
+ nni_aio_stop(ep->ep_tmo_aio);
- nni_aio_fini(&ep->ep_acc_aio);
- nni_aio_fini(&ep->ep_con_aio);
- nni_aio_fini(&ep->ep_con_syn);
- nni_aio_fini(&ep->ep_tmo_aio);
+ nni_aio_fini(ep->ep_acc_aio);
+ nni_aio_fini(ep->ep_con_aio);
+ nni_aio_fini(ep->ep_con_syn);
+ nni_aio_fini(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_data != NULL) {
@@ -154,12 +154,12 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
nni_mtx_init(&ep->ep_mtx);
nni_cv_init(&ep->ep_cv, &ep->ep_mtx);
- nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep);
- nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep);
- nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep);
- nni_aio_init(&ep->ep_con_syn, NULL, NULL);
- if (((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
+ if (((rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_acc_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
+ ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
+ ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
((rv = nni_sock_ep_add(s, ep)) != 0)) {
nni_ep_destroy(ep);
@@ -246,10 +246,10 @@ nni_ep_shutdown(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
// Abort any remaining in-flight operations.
- nni_aio_cancel(&ep->ep_acc_aio, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_con_aio, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_con_syn, NNG_ECLOSED);
- nni_aio_cancel(&ep->ep_tmo_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_acc_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_con_aio, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_con_syn, NNG_ECLOSED);
+ nni_aio_cancel(ep->ep_tmo_aio, NNG_ECLOSED);
// Stop the underlying transport.
ep->ep_ops.ep_close(ep->ep_data);
@@ -273,10 +273,10 @@ nni_ep_close(nni_ep *ep)
nni_ep_shutdown(ep);
- nni_aio_stop(&ep->ep_acc_aio);
- nni_aio_stop(&ep->ep_con_aio);
- nni_aio_stop(&ep->ep_con_syn);
- nni_aio_stop(&ep->ep_tmo_aio);
+ nni_aio_stop(ep->ep_acc_aio);
+ nni_aio_stop(ep->ep_con_aio);
+ nni_aio_stop(ep->ep_con_syn);
+ nni_aio_stop(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
NNI_LIST_FOREACH (&ep->ep_pipes, p) {
@@ -327,10 +327,11 @@ nni_ep_tmo_start(nni_ep *ep)
// have a statistically perfect distribution with the modulo of
// the random number, but this really doesn't matter.
- ep->ep_tmo_aio.a_expire =
- nni_clock() + (backoff ? nni_random() % backoff : 0);
+ nni_aio_set_timeout(ep->ep_tmo_aio,
+ nni_clock() + (backoff ? nni_random() % backoff : 0));
+
ep->ep_tmo_run = 1;
- if (nni_aio_start(&ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
+ if (nni_aio_start(ep->ep_tmo_aio, nni_ep_tmo_cancel, ep) != 0) {
ep->ep_tmo_run = 0;
}
}
@@ -339,7 +340,7 @@ static void
nni_ep_tmo_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_tmo_aio;
+ nni_aio *aio = ep->ep_tmo_aio;
nni_mtx_lock(&ep->ep_mtx);
if (nni_aio_result(aio) == NNG_ETIMEDOUT) {
@@ -356,11 +357,11 @@ static void
nni_ep_con_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_con_aio;
+ nni_aio *aio = ep->ep_con_aio;
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- rv = nni_pipe_create(ep, aio->a_pipe);
+ rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
}
nni_mtx_lock(&ep->ep_mtx);
switch (rv) {
@@ -392,14 +393,14 @@ nni_ep_con_cb(void *arg)
static void
nni_ep_con_start(nni_ep *ep)
{
- nni_aio *aio = &ep->ep_con_aio;
+ nni_aio *aio = ep->ep_con_aio;
// Call with the Endpoint lock held.
if (ep->ep_closing) {
return;
}
- aio->a_endpt = ep->ep_data;
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_connect(ep->ep_data, aio);
}
@@ -436,8 +437,8 @@ nni_ep_dial(nni_ep *ep, int flags)
}
// Synchronous mode: so we have to wait for it to complete.
- aio = &ep->ep_con_syn;
- aio->a_endpt = ep->ep_data;
+ aio = ep->ep_con_syn;
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_connect(ep->ep_data, aio);
ep->ep_started = 1;
nni_mtx_unlock(&ep->ep_mtx);
@@ -446,7 +447,7 @@ nni_ep_dial(nni_ep *ep, int flags)
// As we're synchronous, we also have to handle the completion.
if (((rv = nni_aio_result(aio)) != 0) ||
- ((rv = nni_pipe_create(ep, aio->a_pipe)) != 0)) {
+ ((rv = nni_pipe_create(ep, nni_aio_get_pipe(aio))) != 0)) {
nni_mtx_lock(&ep->ep_mtx);
ep->ep_started = 0;
nni_mtx_unlock(&ep->ep_mtx);
@@ -458,12 +459,12 @@ static void
nni_ep_acc_cb(void *arg)
{
nni_ep * ep = arg;
- nni_aio *aio = &ep->ep_acc_aio;
+ nni_aio *aio = ep->ep_acc_aio;
int rv;
if ((rv = nni_aio_result(aio)) == 0) {
- NNI_ASSERT(aio->a_pipe != NULL);
- rv = nni_pipe_create(ep, aio->a_pipe);
+ NNI_ASSERT(nni_aio_get_pipe(aio) != NULL);
+ rv = nni_pipe_create(ep, nni_aio_get_pipe(aio));
}
nni_mtx_lock(&ep->ep_mtx);
@@ -495,14 +496,14 @@ nni_ep_acc_cb(void *arg)
static void
nni_ep_acc_start(nni_ep *ep)
{
- nni_aio *aio = &ep->ep_acc_aio;
+ nni_aio *aio = ep->ep_acc_aio;
// Call with the Endpoint lock held.
if (ep->ep_closing) {
return;
}
- aio->a_pipe = NULL;
- aio->a_endpt = ep->ep_data;
+ nni_aio_set_pipe(aio, NULL);
+ nni_aio_set_ep(aio, ep->ep_data);
ep->ep_ops.ep_accept(ep->ep_data, aio);
}
diff --git a/src/core/event.h b/src/core/event.h
index 22af096e..9536d206 100644
--- a/src/core/event.h
+++ b/src/core/event.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -25,7 +26,7 @@ struct nng_notify {
void * n_arg;
int n_type;
nni_sock * n_sock;
- nni_aio n_aio;
+ nni_aio * n_aio;
};
extern void nni_ev_init(nni_event *, int, nni_sock *);
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index ef7dd5a2..6fd95f98 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -166,14 +166,14 @@ nni_msgq_run_putq(nni_msgq *mq)
size_t len;
while ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- msg = waio->a_msg;
+ msg = nni_aio_get_msg(waio);
len = nni_msg_len(msg);
// The presence of any blocked reader indicates that
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- waio->a_msg = NULL;
+ nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(raio);
nni_aio_list_remove(waio);
@@ -190,7 +190,7 @@ nni_msgq_run_putq(nni_msgq *mq)
mq->mq_put = 0;
}
mq->mq_len++;
- waio->a_msg = NULL;
+ nni_aio_set_msg(waio, NULL);
nni_aio_finish(waio, 0, len);
continue;
}
@@ -215,7 +215,6 @@ nni_msgq_run_getq(nni_msgq *mq)
mq->mq_get = 0;
}
mq->mq_len--;
- raio->a_msg = msg;
nni_aio_list_remove(raio);
nni_aio_finish_msg(raio, msg);
continue;
@@ -223,8 +222,8 @@ nni_msgq_run_getq(nni_msgq *mq)
// Nothing queued (unbuffered?), maybe a writer is waiting.
if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
- msg = waio->a_msg;
- waio->a_msg = NULL;
+ msg = nni_aio_get_msg(waio);
+ nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
nni_aio_list_remove(raio);
@@ -397,34 +396,38 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
- nni_aio aio;
- int rv;
-
- nni_aio_init(&aio, NULL, NULL);
- aio.a_expire = expire;
- nni_msgq_aio_get(mq, &aio);
- nni_aio_wait(&aio);
- if ((rv = nni_aio_result(&aio)) == 0) {
- *msgp = aio.a_msg;
- aio.a_msg = NULL;
- }
- nni_aio_fini(&aio);
+ nni_aio *aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ nni_aio_set_timeout(aio, expire);
+ nni_msgq_aio_get(mq, aio);
+ nni_aio_wait(aio);
+ if ((rv = nni_aio_result(aio)) == 0) {
+ *msgp = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
+ }
+ nni_aio_fini(aio);
return (rv);
}
int
nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
{
- nni_aio aio;
- int rv;
-
- nni_aio_init(&aio, NULL, NULL);
- aio.a_expire = expire;
- aio.a_msg = msg;
- nni_msgq_aio_put(mq, &aio);
- nni_aio_wait(&aio);
- rv = nni_aio_result(&aio);
- nni_aio_fini(&aio);
+ nni_aio *aio;
+ int rv;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
+ nni_aio_set_timeout(aio, expire);
+ nni_aio_set_msg(aio, msg);
+ nni_msgq_aio_put(mq, aio);
+ nni_aio_wait(aio);
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
return (rv);
}
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 0c024c66..0670ed01 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -30,7 +30,7 @@ struct nni_pipe {
nni_mtx p_mtx;
nni_cv p_cv;
nni_list_node p_reap_node;
- nni_aio p_start_aio;
+ nni_aio * p_start_aio;
};
static nni_idhash *nni_pipes;
@@ -99,7 +99,7 @@ nni_pipe_destroy(nni_pipe *p)
}
// Stop any pending negotiation.
- nni_aio_stop(&p->p_start_aio);
+ nni_aio_stop(p->p_start_aio);
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
@@ -112,7 +112,7 @@ nni_pipe_destroy(nni_pipe *p)
// We have exclusive access at this point, so we can check if
// we are still on any lists.
- nni_aio_fini(&p->p_start_aio);
+ nni_aio_fini(p->p_start_aio);
if (nni_list_node_active(&p->p_ep_node)) {
nni_ep_pipe_remove(p->p_ep, p);
@@ -172,7 +172,7 @@ nni_pipe_close(nni_pipe *p)
nni_mtx_unlock(&p->p_mtx);
// abort any pending negotiation/start process.
- nni_aio_cancel(&p->p_start_aio, NNG_ECLOSED);
+ nni_aio_cancel(p->p_start_aio, NNG_ECLOSED);
}
void
@@ -205,7 +205,7 @@ static void
nni_pipe_start_cb(void *arg)
{
nni_pipe *p = arg;
- nni_aio * aio = &p->p_start_aio;
+ nni_aio * aio = p->p_start_aio;
int rv;
if ((rv = nni_aio_result(aio)) != 0) {
@@ -270,9 +270,9 @@ void
nni_pipe_start(nni_pipe *p)
{
if (p->p_tran_ops.p_start == NULL) {
- nni_aio_finish(&p->p_start_aio, 0, 0);
+ nni_aio_finish(p->p_start_aio, 0, 0);
} else {
- p->p_tran_ops.p_start(p->p_tran_data, &p->p_start_aio);
+ p->p_tran_ops.p_start(p->p_tran_data, p->p_start_aio);
}
}
diff --git a/src/core/socket.c b/src/core/socket.c
index 6a242558..01fd9a0c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -222,7 +222,7 @@ nni_sock_cansend_cb(void *arg)
nni_notify *notify = arg;
nni_sock * sock = notify->n_sock;
- if (nni_aio_result(&notify->n_aio) != 0) {
+ if (nni_aio_result(notify->n_aio) != 0) {
return;
}
@@ -235,7 +235,7 @@ nni_sock_canrecv_cb(void *arg)
nni_notify *notify = arg;
nni_sock * sock = notify->n_sock;
- if (nni_aio_result(&notify->n_aio) != 0) {
+ if (nni_aio_result(notify->n_aio) != 0) {
return;
}
@@ -259,11 +259,11 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
switch (type) {
case NNG_EV_CAN_RCV:
nni_aio_init(&notify->n_aio, nni_sock_canrecv_cb, notify);
- nni_msgq_aio_notify_get(sock->s_urq, &notify->n_aio);
+ nni_msgq_aio_notify_get(sock->s_urq, notify->n_aio);
break;
case NNG_EV_CAN_SND:
nni_aio_init(&notify->n_aio, nni_sock_cansend_cb, notify);
- nni_msgq_aio_notify_put(sock->s_uwq, &notify->n_aio);
+ nni_msgq_aio_notify_put(sock->s_uwq, notify->n_aio);
break;
default:
NNI_FREE_STRUCT(notify);
@@ -276,7 +276,7 @@ nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
void
nni_sock_unnotify(nni_sock *sock, nni_notify *notify)
{
- nni_aio_fini(&notify->n_aio);
+ nni_aio_fini(notify->n_aio);
NNI_FREE_STRUCT(notify);
}