aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-03-11 22:38:21 -0800
committerGarrett D'Amore <garrett@damore.org>2017-03-11 22:38:21 -0800
commit3d4be5126f91978b7d7349de79334ecfc8fc2afe (patch)
treec8cfadbb1096e99cad21bddbb9fe9ff7b5dd175a /src/core
parent3d90bae8eda62fecdf367932fca591b965838e20 (diff)
downloadnng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.tar.gz
nng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.tar.bz2
nng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.zip
Notification working - separate thread now.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/event.c111
-rw-r--r--src/core/event.h14
-rw-r--r--src/core/msgqueue.c303
-rw-r--r--src/core/msgqueue.h4
-rw-r--r--src/core/options.c2
-rw-r--r--src/core/socket.c129
-rw-r--r--src/core/socket.h9
7 files changed, 151 insertions, 421 deletions
diff --git a/src/core/event.c b/src/core/event.c
index 98156a92..b0e02b16 100644
--- a/src/core/event.c
+++ b/src/core/event.c
@@ -18,10 +18,6 @@ nni_ev_init(nni_event *event, int type, nni_sock *sock)
int rv;
memset(event, 0, sizeof (*event));
- if ((rv = nni_cv_init(&event->e_cv, &sock->s_mx)) != 0) {
- return (rv);
- }
- NNI_LIST_NODE_INIT(&event->e_node);
event->e_type = type;
event->e_sock = sock;
return (0);
@@ -31,110 +27,5 @@ nni_ev_init(nni_event *event, int type, nni_sock *sock)
void
nni_ev_fini(nni_event *event)
{
- nni_cv_fini(&event->e_cv);
-}
-
-
-void
-nni_ev_submit(nni_event *event)
-{
- nni_sock *sock = event->e_sock;
-
- // If nobody is listening, don't bother submitting anything.
- // This reduces pressure on the socket locks & condvars, in the
- // typical case.
- if (nni_list_first(&sock->s_notify) == NULL) {
- event->e_pending = 0;
- event->e_done = 1;
- return;
- }
-
- // XXX: taskq_dispatch the event processing.
- // This probably should bump a reference count on the socket
- // first.
- // XXX: One question of note... the aio structures we use elsewhere
- // would be better than this. So instead of the handler doing two
- // context switches we can just do one.
-
- // Call with socket mutex owned!
- if (event->e_pending == 0) {
- event->e_pending = 1;
- event->e_done = 0;
- nni_list_append(&sock->s_events, event);
- nni_cv_wake(&sock->s_notify_cv);
- }
-}
-
-
-void
-nni_notifier(void *arg)
-{
- nni_sock *sock = arg;
- nni_event *event;
- nni_notify *notify;
-
- nni_mtx_lock(&sock->s_mx);
- for (;;) {
- if (sock->s_closing) {
- break;
- }
-
- if ((event = nni_list_first(&sock->s_events)) != NULL) {
- event->e_pending = 0;
- nni_list_remove(&sock->s_events, event);
- nni_mtx_unlock(&sock->s_mx);
-
- // Lock the notify list, it must not change.
- nni_mtx_lock(&sock->s_notify_mx);
- NNI_LIST_FOREACH (&sock->s_notify, notify) {
- if ((notify->n_mask & event->e_type) == 0) {
- // No interest.
- continue;
- }
- notify->n_func(event, notify->n_arg);
- }
- nni_mtx_unlock(&sock->s_notify_mx);
-
- nni_mtx_lock(&sock->s_mx);
- // Let the event submitter know we are done, unless
- // they have resubmitted. Submitters can wait on this
- // lock.
- event->e_done = 1;
- nni_cv_wake(&event->e_cv);
- continue;
- }
-
- nni_cv_wait(&sock->s_notify_cv);
- }
- nni_mtx_unlock(&sock->s_mx);
-}
-
-
-nni_notify *
-nni_add_notify(nni_sock *sock, int mask, nng_notify_func fn, void *arg)
-{
- nni_notify *notify;
-
- if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) {
- return (NULL);
- }
- notify->n_func = fn;
- notify->n_arg = arg;
- notify->n_mask = mask;
- NNI_LIST_NODE_INIT(&notify->n_node);
-
- nni_mtx_lock(&sock->s_notify_mx);
- nni_list_append(&sock->s_notify, notify);
- nni_mtx_unlock(&sock->s_notify_mx);
- return (notify);
-}
-
-
-void
-nni_rem_notify(nni_sock *sock, nni_notify *notify)
-{
- nni_mtx_lock(&sock->s_notify_mx);
- nni_list_remove(&sock->s_notify, notify);
- nni_mtx_unlock(&sock->s_notify_mx);
- NNI_FREE_STRUCT(notify);
+ NNI_ARG_UNUSED(event);
}
diff --git a/src/core/event.h b/src/core/event.h
index d07aa9dd..e43306c9 100644
--- a/src/core/event.h
+++ b/src/core/event.h
@@ -18,25 +18,17 @@ struct nng_event {
nni_sock * e_sock;
nni_ep * e_ep;
nni_pipe * e_pipe;
-
- int e_done; // true when notify thr is finished
- int e_pending; // true if event is queued
- nni_cv e_cv; // signaled when e_done is noted
- nni_list_node e_node; // location on the socket list
};
struct nng_notify {
- nni_list_node n_node;
nng_notify_func n_func;
void * n_arg;
- int n_mask;
+ int n_type;
+ nni_sock * n_sock;
+ nni_aio n_aio;
};
-extern void nni_notifier(void *);
extern int nni_ev_init(nni_event *, int, nni_sock *);
extern void nni_ev_fini(nni_event *);
-extern void nni_ev_submit(nni_event *); // call holding sock lock
-extern nni_notify *nni_add_notify(nni_sock *, int, nng_notify_func, void *);
-extern void nni_rem_notify(nni_sock *, nni_notify *);
#endif // CORE_EVENT_H
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 153d7c15..37d6d5ff 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -15,83 +15,29 @@
// side can close, and they may be closed more than once.
struct nni_msgq {
- nni_mtx mq_lock;
- nni_cv mq_readable;
- nni_cv mq_writeable;
- nni_cv mq_drained;
- int mq_cap;
- int mq_alloc; // alloc is cap + 2...
- int mq_len;
- int mq_get;
- int mq_put;
- int mq_closed;
- int mq_puterr;
- int mq_geterr;
- nni_msg ** mq_msgs;
-
- int mq_notify_sig;
- nni_cv mq_notify_cv;
- nni_thr mq_notify_thr;
- nni_msgq_notify_fn mq_notify_fn;
- void * mq_notify_arg;
-
- nni_list mq_aio_putq;
- nni_list mq_aio_getq;
- nni_list mq_aio_notify_get;
- nni_list mq_aio_notify_put;
-
- nni_timer_node mq_timer;
- nni_time mq_expire;
+ nni_mtx mq_lock;
+ nni_cv mq_drained;
+ int mq_cap;
+ int mq_alloc; // alloc is cap + 2...
+ int mq_len;
+ int mq_get;
+ int mq_put;
+ int mq_closed;
+ int mq_puterr;
+ int mq_geterr;
+ int mq_draining;
+ nni_msg ** mq_msgs;
+
+ nni_list mq_aio_putq;
+ nni_list mq_aio_getq;
+ nni_list mq_aio_notify_get;
+ nni_list mq_aio_notify_put;
+
+ nni_timer_node mq_timer;
+ nni_time mq_expire;
};
-// nni_msgq_notifier thread runs if events callbacks are registered on the
-// message queue, and calls the notification callbacks outside of the
-// lock. It looks at the actual msgq state to trigger the right events.
-static void
-nni_msgq_notifier(void *arg)
-{
- nni_msgq *mq = arg;
- int sig;
- nni_msgq_notify_fn fn;
- void *fnarg;
-
- for (;;) {
- nni_mtx_lock(&mq->mq_lock);
- while ((mq->mq_notify_sig == 0) && (!mq->mq_closed)) {
- nni_cv_wait(&mq->mq_notify_cv);
- }
- if (mq->mq_closed) {
- nni_mtx_unlock(&mq->mq_lock);
- break;
- }
-
- sig = mq->mq_notify_sig;
- mq->mq_notify_sig = 0;
-
- fn = mq->mq_notify_fn;
- fnarg = mq->mq_notify_arg;
- nni_mtx_unlock(&mq->mq_lock);
-
- if (fn != NULL) {
- fn(mq, sig, fnarg);
- }
- }
-}
-
-
-// nni_msgq_kick kicks the msgq notification thread. It should be called
-// with the lock held.
-static void
-nni_msgq_kick(nni_msgq *mq, int sig)
-{
- if (mq->mq_notify_fn != NULL) {
- mq->mq_notify_sig |= sig;
- nni_cv_wake(&mq->mq_notify_cv);
- }
-}
-
-
static void nni_msgq_run_timeout(void *);
int
@@ -123,10 +69,7 @@ nni_msgq_init(nni_msgq **mqp, int cap)
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
goto fail;
}
- if (((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) ||
- ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) ||
- ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) ||
- ((rv = nni_cv_init(&mq->mq_notify_cv, &mq->mq_lock)) != 0)) {
+ if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
goto fail;
}
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) {
@@ -144,17 +87,14 @@ nni_msgq_init(nni_msgq **mqp, int cap)
mq->mq_closed = 0;
mq->mq_puterr = 0;
mq->mq_geterr = 0;
- mq->mq_notify_fn = NULL;
- mq->mq_notify_arg = NULL;
mq->mq_expire = NNI_TIME_NEVER;
+ mq->mq_draining = 0;
*mqp = mq;
return (0);
fail:
nni_cv_fini(&mq->mq_drained);
- nni_cv_fini(&mq->mq_writeable);
- nni_cv_fini(&mq->mq_readable);
nni_mtx_fini(&mq->mq_lock);
if (mq->mq_msgs != NULL) {
nni_free(mq->mq_msgs, sizeof (nng_msg *) * alloc);
@@ -173,10 +113,7 @@ nni_msgq_fini(nni_msgq *mq)
return;
}
nni_timer_cancel(&mq->mq_timer);
- nni_thr_fini(&mq->mq_notify_thr);
nni_cv_fini(&mq->mq_drained);
- nni_cv_fini(&mq->mq_writeable);
- nni_cv_fini(&mq->mq_readable);
nni_mtx_fini(&mq->mq_lock);
/* Free any orphaned messages. */
@@ -195,27 +132,6 @@ nni_msgq_fini(nni_msgq *mq)
}
-int
-nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg)
-{
- int rv;
-
- nni_thr_fini(&mq->mq_notify_thr);
-
- nni_mtx_lock(&mq->mq_lock);
- rv = nni_thr_init(&mq->mq_notify_thr, nni_msgq_notifier, mq);
- if (rv != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return (rv);
- }
- mq->mq_notify_fn = fn;
- mq->mq_notify_arg = arg;
- nni_thr_run(&mq->mq_notify_thr);
- nni_mtx_unlock(&mq->mq_lock);
- return (0);
-}
-
-
void
nni_msgq_set_get_error(nni_msgq *mq, int error)
{
@@ -329,14 +245,6 @@ nni_msgq_run_putq(nni_msgq *mq)
// Unable to make progress, leave the aio where it is.
break;
}
-
- // XXX: REMOVE ME WHEN WE GO COMPLETELY ASYNC.
- if (mq->mq_len != 0) {
- nni_cv_wake(&mq->mq_readable);
- }
- if (mq->mq_len < mq->mq_cap) {
- nni_cv_wake(&mq->mq_writeable);
- }
}
@@ -394,7 +302,6 @@ nni_msgq_run_notify(nni_msgq *mq)
}
if ((mq->mq_len < mq->mq_cap) ||
(nni_list_first(&mq->mq_aio_getq) != NULL)) {
-
NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) {
// This stays on the list.
nni_aio_finish(aio, 0, 0);
@@ -406,6 +313,37 @@ nni_msgq_run_notify(nni_msgq *mq)
nni_aio_finish(aio, 0, 0);
}
}
+
+ if (mq->mq_draining) {
+ if ((mq->mq_len == 0) &&
+ (nni_list_first(&mq->mq_aio_putq) == NULL)) {
+ nni_cv_wake(&mq->mq_drained);
+ }
+ }
+}
+
+
+void
+nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if (nni_list_active(&mq->mq_aio_notify_put, aio)) {
+ nni_list_remove(&mq->mq_aio_notify_put, aio);
+ }
+ nni_list_append(&mq->mq_aio_notify_put, aio);
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
+
+void
+nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if (nni_list_active(&mq->mq_aio_notify_get, aio)) {
+ nni_list_remove(&mq->mq_aio_notify_get, aio);
+ }
+ nni_list_append(&mq->mq_aio_notify_get, aio);
+ nni_mtx_unlock(&mq->mq_lock);
}
@@ -608,122 +546,6 @@ nni_msgq_run_timeout(void *arg)
}
-#if 0
-int
-nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
-{
- int rv;
-
- nni_mtx_lock(&mq->mq_lock);
-
- for (;;) {
- // if closed, we don't put more... this check is first!
- if (mq->mq_closed) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_ECLOSED);
- }
-
- if ((rv = mq->mq_puterr) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return (rv);
- }
-
- // room in the queue?
- if (mq->mq_len < mq->mq_cap) {
- break;
- }
-
- // unbuffered, room for one, and a reader waiting?
- if (mq->mq_rwait &&
- (mq->mq_cap == 0) &&
- (mq->mq_len == mq->mq_cap)) {
- break;
- }
-
- // interrupted?
- if (*sig) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_EINTR);
- }
-
- // single poll?
- if (expire == NNI_TIME_ZERO) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_EAGAIN);
- }
-
- // waiting....
- mq->mq_wwait = 1;
-
- // if we are unbuffered, kick the notifier, because we're
- // writable.
- if (mq->mq_cap == 0) {
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
- }
-
- // not writeable, so wait until something changes
- rv = nni_cv_until(&mq->mq_writeable, expire);
- if (rv == NNG_ETIMEDOUT) {
- nni_mtx_unlock(&mq->mq_lock);
- return (NNG_ETIMEDOUT);
- }
- }
-
- // Writeable! Yay!!
- mq->mq_msgs[mq->mq_put] = msg;
- mq->mq_put++;
- if (mq->mq_put == mq->mq_alloc) {
- mq->mq_put = 0;
- }
- mq->mq_len++;
- if (mq->mq_rwait) {
- mq->mq_rwait = 0;
- nni_cv_wake(&mq->mq_readable);
- }
- if (mq->mq_len < mq->mq_cap) {
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
- }
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
- nni_msgq_run_getq(mq);
- nni_mtx_unlock(&mq->mq_lock);
-
- return (0);
-}
-
-
-static int
-nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
-{
- int rv;
-
- nni_mtx_lock(&mq->mq_lock);
-
- // Readable! Yay!!
-
- *msgp = mq->mq_msgs[mq->mq_get];
- mq->mq_len--;
- mq->mq_get++;
- if (mq->mq_get == mq->mq_alloc) {
- mq->mq_get = 0;
- }
- if (mq->mq_wwait) {
- mq->mq_wwait = 0;
- nni_cv_wake(&mq->mq_writeable);
- }
- if (mq->mq_len) {
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
- }
- nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
- nni_msgq_run_putq(mq);
- nni_mtx_unlock(&mq->mq_lock);
-
- return (0);
-}
-
-
-#endif
-
-
int
nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
{
@@ -781,17 +603,23 @@ nni_msgq_put(nni_msgq *mq, nni_msg *msg)
void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
+ nni_aio *aio;
+
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
- nni_cv_wake(&mq->mq_writeable);
- nni_cv_wake(&mq->mq_readable);
- nni_cv_wake(&mq->mq_notify_cv);
- while (mq->mq_len > 0) {
+ mq->mq_draining = 1;
+ while ((mq->mq_len > 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) {
if (nni_cv_until(&mq->mq_drained, expire) != 0) {
break;
}
}
// If we timedout, free any remaining messages in the queue.
+ // Also complete the putq as NNG_ECLOSED.
+
+ while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
+ nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ }
while (mq->mq_len > 0) {
nni_msg *msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get > mq->mq_alloc) {
@@ -812,9 +640,6 @@ nni_msgq_close(nni_msgq *mq)
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
- nni_cv_wake(&mq->mq_writeable);
- nni_cv_wake(&mq->mq_readable);
- nni_cv_wake(&mq->mq_notify_cv);
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
@@ -937,8 +762,6 @@ nni_msgq_resize(nni_msgq *mq, int cap)
out:
// Wake everyone up -- we changed everything.
- nni_cv_wake(&mq->mq_readable);
- nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_drained);
nni_mtx_unlock(&mq->mq_lock);
return (0);
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 51143706..682c9821 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -39,8 +39,8 @@ extern int nni_msgq_canget(nni_msgq *);
extern int nni_msgq_canput(nni_msgq *);
extern void nni_msgq_aio_put(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_get(nni_msgq *, nni_aio *);
-extern int nni_msgq_aio_notify_get(nni_msgq *, nni_aio *);
-extern int nni_msgq_aio_notify_put(nni_msgq *, nni_aio *);
+extern void nni_msgq_aio_notify_get(nni_msgq *, nni_aio *);
+extern void nni_msgq_aio_notify_put(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_cancel(nni_msgq *, nni_aio *);
// nni_msgq_put puts the message to the queue. It blocks until it
diff --git a/src/core/options.c b/src/core/options.c
index cd67e0d0..245172bf 100644
--- a/src/core/options.c
+++ b/src/core/options.c
@@ -195,7 +195,7 @@ nni_getopt_fd(nni_sock *s, nni_notifyfd *fd, int mask, void *val, size_t *szp)
return (rv);
}
- if (nni_add_notify(s, mask, nni_notifyfd_push, fd) == NULL) {
+ if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) {
nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd);
return (NNG_ENOMEM);
}
diff --git a/src/core/socket.c b/src/core/socket.c
index 5beffaf0..abbd2c2c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -229,30 +229,93 @@ nni_sock_unlock(nni_sock *sock)
static void
-nni_sock_urq_notify(nni_msgq *mq, int flags, void *arg)
+nni_sock_cansend_cb(void *arg)
{
- nni_sock *sock = arg;
+ nni_notify *notify = arg;
+ nni_sock *sock = notify->n_sock;
- if ((flags & NNI_MSGQ_NOTIFY_CANGET) == 0) {
- return; // No interest in writability of read queue.
+ if (nni_aio_result(&notify->n_aio) != 0) {
+ return;
}
- nni_mtx_lock(&sock->s_mx);
- nni_ev_submit(&sock->s_recv_ev);
- nni_mtx_unlock(&sock->s_mx);
+
+ notify->n_func(&sock->s_send_ev, notify->n_arg);
}
static void
-nni_sock_uwq_notify(nni_msgq *mq, int flags, void *arg)
+nni_sock_canrecv_cb(void *arg)
{
- nni_sock *sock = arg;
+ nni_notify *notify = arg;
+ nni_sock *sock = notify->n_sock;
- if ((flags & NNI_MSGQ_NOTIFY_CANPUT) == 0) {
- return; // No interest in readability of write queue.
+ if (nni_aio_result(&notify->n_aio) != 0) {
+ return;
}
- nni_mtx_lock(&sock->s_mx);
- nni_ev_submit(&sock->s_send_ev);
- nni_mtx_unlock(&sock->s_mx);
+
+ notify->n_func(&sock->s_recv_ev, notify->n_arg);
+}
+
+
+nni_notify *
+nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
+{
+ nni_notify *notify;
+ int rv;
+
+ if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) {
+ return (NULL);
+ }
+
+ notify->n_func = fn;
+ notify->n_arg = arg;
+ notify->n_type = type;
+ notify->n_sock = sock;
+
+ switch (type) {
+ case NNG_EV_CAN_RCV:
+ rv = nni_aio_init(&notify->n_aio, nni_sock_canrecv_cb, notify);
+ if (rv != 0) {
+ goto fail;
+ }
+ nni_msgq_aio_notify_get(sock->s_urq, &notify->n_aio);
+ break;
+ case NNG_EV_CAN_SND:
+ rv = nni_aio_init(&notify->n_aio, nni_sock_cansend_cb, notify);
+ if (rv != 0) {
+ goto fail;
+ }
+ nni_msgq_aio_notify_put(sock->s_uwq, &notify->n_aio);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ goto fail;
+ break;
+ }
+
+ return (notify);
+
+fail:
+ nni_aio_fini(&notify->n_aio);
+ NNI_FREE_STRUCT(notify);
+ return (NULL);
+}
+
+
+void
+nni_sock_unnotify(nni_sock *sock, nni_notify *notify)
+{
+ switch (notify->n_type) {
+ case NNG_EV_CAN_RCV:
+ nni_msgq_aio_cancel(sock->s_urq, &notify->n_aio);
+ break;
+ case NNG_EV_CAN_SND:
+ nni_msgq_aio_cancel(sock->s_uwq, &notify->n_aio);
+ break;
+ default:
+ return;
+ }
+ nni_aio_fini(&notify->n_aio);
+ NNI_FREE_STRUCT(notify);
}
@@ -344,8 +407,6 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_idles, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
- NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node);
- NNI_LIST_INIT(&sock->s_events, nni_event, e_node);
sock->s_send_fd.sn_init = 0;
sock->s_recv_fd.sn_init = 0;
@@ -379,9 +440,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
if (((rv = nni_mtx_init(&sock->s_mx)) != 0) ||
- ((rv = nni_mtx_init(&sock->s_notify_mx)) != 0) ||
- ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) ||
- ((rv = nni_cv_init(&sock->s_notify_cv, &sock->s_mx)) != 0)) {
+ ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0)) {
goto fail;
}
@@ -398,10 +457,6 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
- if ((rv = nni_thr_init(&sock->s_notifier, nni_notifier, sock)) != 0) {
- goto fail;
- }
-
if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) {
goto fail;
@@ -421,21 +476,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
- // XXX: This kills performance. Look at moving this to
- // be conditional - if nobody has callbacks because their code is
- // also threaded, then we don't need to jump through these hoops.
- rv = nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock);
- if (rv != 0) {
- goto fail;
- }
-
sops->sock_open(sock->s_data);
- nni_thr_run(&sock->s_notifier);
*sockp = sock;
return (0);
@@ -453,15 +495,12 @@ fail:
}
nni_mtx_unlock(nni_idlock);
}
- nni_thr_fini(&sock->s_notifier);
nni_ev_fini(&sock->s_send_ev);
nni_ev_fini(&sock->s_recv_ev);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_cv_fini(&sock->s_refcv);
- nni_cv_fini(&sock->s_notify_cv);
nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_notify_mx);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
return (rv);
@@ -545,7 +584,6 @@ nni_sock_shutdown(nni_sock *sock)
sock->s_sock_ops.sock_close(sock->s_data);
- nni_cv_wake(&sock->s_notify_cv);
nni_cv_wake(&sock->s_cv);
while ((nni_list_first(&sock->s_idles) != NULL) ||
@@ -554,9 +592,6 @@ nni_sock_shutdown(nni_sock *sock)
}
nni_mtx_unlock(&sock->s_mx);
- // Wait for the threads to exit.
- nni_thr_wait(&sock->s_notifier);
-
// At this point, there are no threads blocked inside of us
// that are referencing socket state. User code should call
// nng_close to release the last resources.
@@ -611,20 +646,12 @@ nni_sock_close(nni_sock *sock)
// The protocol needs to clean up its state.
sock->s_sock_ops.sock_fini(sock->s_data);
- // And we need to clean up *our* state.
- while ((notify = nni_list_first(&sock->s_notify)) != NULL) {
- nni_list_remove(&sock->s_notify, notify);
- NNI_FREE_STRUCT(notify);
- }
- nni_thr_fini(&sock->s_notifier);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_ev_fini(&sock->s_send_ev);
nni_ev_fini(&sock->s_recv_ev);
nni_cv_fini(&sock->s_refcv);
- nni_cv_fini(&sock->s_notify_cv);
nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_notify_mx);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 6a608d0a..a746a170 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -44,15 +44,9 @@ struct nni_socket {
nni_list s_eps; // active endpoints
nni_list s_pipes; // ready pipes (started)
nni_list s_idles; // idle pipes (not ready)
- nni_list s_events; // pending events
- nni_list s_notify; // event watchers
- nni_cv s_notify_cv; // wakes notify thread
- nni_mtx s_notify_mx; // protects s_notify list
size_t s_rcvmaxsz; // maximum receive size
- nni_thr s_notifier;
-
int s_ep_pend; // EP dial/listen in progress
int s_closing; // Socket is closing
int s_closed; // Socket closed
@@ -89,6 +83,9 @@ extern uint32_t nni_sock_id(nni_sock *);
extern void nni_sock_lock(nni_sock *);
extern void nni_sock_unlock(nni_sock *);
+extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *);
+extern void nni_sock_unnotify(nni_sock *, nni_notify *);
+
// nni_sock_pipe_add is called by the pipe to register the pipe with
// with the socket. The pipe is added to the idle list.
extern void nni_sock_pipe_add(nni_sock *, nni_pipe *);