summaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c129
1 files changed, 78 insertions, 51 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 5beffaf0..abbd2c2c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -229,30 +229,93 @@ nni_sock_unlock(nni_sock *sock)
static void
-nni_sock_urq_notify(nni_msgq *mq, int flags, void *arg)
+nni_sock_cansend_cb(void *arg)
{
- nni_sock *sock = arg;
+ nni_notify *notify = arg;
+ nni_sock *sock = notify->n_sock;
- if ((flags & NNI_MSGQ_NOTIFY_CANGET) == 0) {
- return; // No interest in writability of read queue.
+ if (nni_aio_result(&notify->n_aio) != 0) {
+ return;
}
- nni_mtx_lock(&sock->s_mx);
- nni_ev_submit(&sock->s_recv_ev);
- nni_mtx_unlock(&sock->s_mx);
+
+ notify->n_func(&sock->s_send_ev, notify->n_arg);
}
static void
-nni_sock_uwq_notify(nni_msgq *mq, int flags, void *arg)
+nni_sock_canrecv_cb(void *arg)
{
- nni_sock *sock = arg;
+ nni_notify *notify = arg;
+ nni_sock *sock = notify->n_sock;
- if ((flags & NNI_MSGQ_NOTIFY_CANPUT) == 0) {
- return; // No interest in readability of write queue.
+ if (nni_aio_result(&notify->n_aio) != 0) {
+ return;
}
- nni_mtx_lock(&sock->s_mx);
- nni_ev_submit(&sock->s_send_ev);
- nni_mtx_unlock(&sock->s_mx);
+
+ notify->n_func(&sock->s_recv_ev, notify->n_arg);
+}
+
+
+nni_notify *
+nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
+{
+ nni_notify *notify;
+ int rv;
+
+ if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) {
+ return (NULL);
+ }
+
+ notify->n_func = fn;
+ notify->n_arg = arg;
+ notify->n_type = type;
+ notify->n_sock = sock;
+
+ switch (type) {
+ case NNG_EV_CAN_RCV:
+ rv = nni_aio_init(&notify->n_aio, nni_sock_canrecv_cb, notify);
+ if (rv != 0) {
+ goto fail;
+ }
+ nni_msgq_aio_notify_get(sock->s_urq, &notify->n_aio);
+ break;
+ case NNG_EV_CAN_SND:
+ rv = nni_aio_init(&notify->n_aio, nni_sock_cansend_cb, notify);
+ if (rv != 0) {
+ goto fail;
+ }
+ nni_msgq_aio_notify_put(sock->s_uwq, &notify->n_aio);
+ break;
+ default:
+ rv = NNG_ENOTSUP;
+ goto fail;
+ break;
+ }
+
+ return (notify);
+
+fail:
+ nni_aio_fini(&notify->n_aio);
+ NNI_FREE_STRUCT(notify);
+ return (NULL);
+}
+
+
+void
+nni_sock_unnotify(nni_sock *sock, nni_notify *notify)
+{
+ switch (notify->n_type) {
+ case NNG_EV_CAN_RCV:
+ nni_msgq_aio_cancel(sock->s_urq, &notify->n_aio);
+ break;
+ case NNG_EV_CAN_SND:
+ nni_msgq_aio_cancel(sock->s_uwq, &notify->n_aio);
+ break;
+ default:
+ return;
+ }
+ nni_aio_fini(&notify->n_aio);
+ NNI_FREE_STRUCT(notify);
}
@@ -344,8 +407,6 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_idles, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
- NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node);
- NNI_LIST_INIT(&sock->s_events, nni_event, e_node);
sock->s_send_fd.sn_init = 0;
sock->s_recv_fd.sn_init = 0;
@@ -379,9 +440,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
if (((rv = nni_mtx_init(&sock->s_mx)) != 0) ||
- ((rv = nni_mtx_init(&sock->s_notify_mx)) != 0) ||
- ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) ||
- ((rv = nni_cv_init(&sock->s_notify_cv, &sock->s_mx)) != 0)) {
+ ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0)) {
goto fail;
}
@@ -398,10 +457,6 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
- if ((rv = nni_thr_init(&sock->s_notifier, nni_notifier, sock)) != 0) {
- goto fail;
- }
-
if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) {
goto fail;
@@ -421,21 +476,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
- // XXX: This kills performance. Look at moving this to
- // be conditional - if nobody has callbacks because their code is
- // also threaded, then we don't need to jump through these hoops.
- rv = nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock);
- if (rv != 0) {
- goto fail;
- }
- rv = nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock);
- if (rv != 0) {
- goto fail;
- }
-
sops->sock_open(sock->s_data);
- nni_thr_run(&sock->s_notifier);
*sockp = sock;
return (0);
@@ -453,15 +495,12 @@ fail:
}
nni_mtx_unlock(nni_idlock);
}
- nni_thr_fini(&sock->s_notifier);
nni_ev_fini(&sock->s_send_ev);
nni_ev_fini(&sock->s_recv_ev);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_cv_fini(&sock->s_refcv);
- nni_cv_fini(&sock->s_notify_cv);
nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_notify_mx);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
return (rv);
@@ -545,7 +584,6 @@ nni_sock_shutdown(nni_sock *sock)
sock->s_sock_ops.sock_close(sock->s_data);
- nni_cv_wake(&sock->s_notify_cv);
nni_cv_wake(&sock->s_cv);
while ((nni_list_first(&sock->s_idles) != NULL) ||
@@ -554,9 +592,6 @@ nni_sock_shutdown(nni_sock *sock)
}
nni_mtx_unlock(&sock->s_mx);
- // Wait for the threads to exit.
- nni_thr_wait(&sock->s_notifier);
-
// At this point, there are no threads blocked inside of us
// that are referencing socket state. User code should call
// nng_close to release the last resources.
@@ -611,20 +646,12 @@ nni_sock_close(nni_sock *sock)
// The protocol needs to clean up its state.
sock->s_sock_ops.sock_fini(sock->s_data);
- // And we need to clean up *our* state.
- while ((notify = nni_list_first(&sock->s_notify)) != NULL) {
- nni_list_remove(&sock->s_notify, notify);
- NNI_FREE_STRUCT(notify);
- }
- nni_thr_fini(&sock->s_notifier);
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_ev_fini(&sock->s_send_ev);
nni_ev_fini(&sock->s_recv_ev);
nni_cv_fini(&sock->s_refcv);
- nni_cv_fini(&sock->s_notify_cv);
nni_cv_fini(&sock->s_cv);
- nni_mtx_fini(&sock->s_notify_mx);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
}