diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-03-11 22:38:21 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-03-11 22:38:21 -0800 |
| commit | 3d4be5126f91978b7d7349de79334ecfc8fc2afe (patch) | |
| tree | c8cfadbb1096e99cad21bddbb9fe9ff7b5dd175a /src/core/socket.c | |
| parent | 3d90bae8eda62fecdf367932fca591b965838e20 (diff) | |
| download | nng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.tar.gz nng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.tar.bz2 nng-3d4be5126f91978b7d7349de79334ecfc8fc2afe.zip | |
Notification working - separate thread now.
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 129 |
1 files changed, 78 insertions, 51 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 5beffaf0..abbd2c2c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -229,30 +229,93 @@ nni_sock_unlock(nni_sock *sock) static void -nni_sock_urq_notify(nni_msgq *mq, int flags, void *arg) +nni_sock_cansend_cb(void *arg) { - nni_sock *sock = arg; + nni_notify *notify = arg; + nni_sock *sock = notify->n_sock; - if ((flags & NNI_MSGQ_NOTIFY_CANGET) == 0) { - return; // No interest in writability of read queue. + if (nni_aio_result(¬ify->n_aio) != 0) { + return; } - nni_mtx_lock(&sock->s_mx); - nni_ev_submit(&sock->s_recv_ev); - nni_mtx_unlock(&sock->s_mx); + + notify->n_func(&sock->s_send_ev, notify->n_arg); } static void -nni_sock_uwq_notify(nni_msgq *mq, int flags, void *arg) +nni_sock_canrecv_cb(void *arg) { - nni_sock *sock = arg; + nni_notify *notify = arg; + nni_sock *sock = notify->n_sock; - if ((flags & NNI_MSGQ_NOTIFY_CANPUT) == 0) { - return; // No interest in readability of write queue. + if (nni_aio_result(¬ify->n_aio) != 0) { + return; } - nni_mtx_lock(&sock->s_mx); - nni_ev_submit(&sock->s_send_ev); - nni_mtx_unlock(&sock->s_mx); + + notify->n_func(&sock->s_recv_ev, notify->n_arg); +} + + +nni_notify * +nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) +{ + nni_notify *notify; + int rv; + + if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) { + return (NULL); + } + + notify->n_func = fn; + notify->n_arg = arg; + notify->n_type = type; + notify->n_sock = sock; + + switch (type) { + case NNG_EV_CAN_RCV: + rv = nni_aio_init(¬ify->n_aio, nni_sock_canrecv_cb, notify); + if (rv != 0) { + goto fail; + } + nni_msgq_aio_notify_get(sock->s_urq, ¬ify->n_aio); + break; + case NNG_EV_CAN_SND: + rv = nni_aio_init(¬ify->n_aio, nni_sock_cansend_cb, notify); + if (rv != 0) { + goto fail; + } + nni_msgq_aio_notify_put(sock->s_uwq, ¬ify->n_aio); + break; + default: + rv = NNG_ENOTSUP; + goto fail; + break; + } + + return (notify); + +fail: + nni_aio_fini(¬ify->n_aio); + NNI_FREE_STRUCT(notify); + return (NULL); +} + + +void +nni_sock_unnotify(nni_sock *sock, nni_notify *notify) +{ + switch (notify->n_type) { + case NNG_EV_CAN_RCV: + nni_msgq_aio_cancel(sock->s_urq, ¬ify->n_aio); + break; + case NNG_EV_CAN_SND: + nni_msgq_aio_cancel(sock->s_uwq, ¬ify->n_aio); + break; + default: + return; + } + nni_aio_fini(¬ify->n_aio); + NNI_FREE_STRUCT(notify); } @@ -344,8 +407,6 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_idles, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node); - NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node); - NNI_LIST_INIT(&sock->s_events, nni_event, e_node); sock->s_send_fd.sn_init = 0; sock->s_recv_fd.sn_init = 0; @@ -379,9 +440,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } if (((rv = nni_mtx_init(&sock->s_mx)) != 0) || - ((rv = nni_mtx_init(&sock->s_notify_mx)) != 0) || - ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) || - ((rv = nni_cv_init(&sock->s_notify_cv, &sock->s_mx)) != 0)) { + ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0)) { goto fail; } @@ -398,10 +457,6 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } - if ((rv = nni_thr_init(&sock->s_notifier, nni_notifier, sock)) != 0) { - goto fail; - } - if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) { goto fail; @@ -421,21 +476,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } - // XXX: This kills performance. Look at moving this to - // be conditional - if nobody has callbacks because their code is - // also threaded, then we don't need to jump through these hoops. - rv = nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock); - if (rv != 0) { - goto fail; - } - rv = nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock); - if (rv != 0) { - goto fail; - } - sops->sock_open(sock->s_data); - nni_thr_run(&sock->s_notifier); *sockp = sock; return (0); @@ -453,15 +495,12 @@ fail: } nni_mtx_unlock(nni_idlock); } - nni_thr_fini(&sock->s_notifier); nni_ev_fini(&sock->s_send_ev); nni_ev_fini(&sock->s_recv_ev); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_cv_fini(&sock->s_refcv); - nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_notify_mx); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); return (rv); @@ -545,7 +584,6 @@ nni_sock_shutdown(nni_sock *sock) sock->s_sock_ops.sock_close(sock->s_data); - nni_cv_wake(&sock->s_notify_cv); nni_cv_wake(&sock->s_cv); while ((nni_list_first(&sock->s_idles) != NULL) || @@ -554,9 +592,6 @@ nni_sock_shutdown(nni_sock *sock) } nni_mtx_unlock(&sock->s_mx); - // Wait for the threads to exit. - nni_thr_wait(&sock->s_notifier); - // At this point, there are no threads blocked inside of us // that are referencing socket state. User code should call // nng_close to release the last resources. @@ -611,20 +646,12 @@ nni_sock_close(nni_sock *sock) // The protocol needs to clean up its state. sock->s_sock_ops.sock_fini(sock->s_data); - // And we need to clean up *our* state. - while ((notify = nni_list_first(&sock->s_notify)) != NULL) { - nni_list_remove(&sock->s_notify, notify); - NNI_FREE_STRUCT(notify); - } - nni_thr_fini(&sock->s_notifier); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_ev_fini(&sock->s_send_ev); nni_ev_fini(&sock->s_recv_ev); nni_cv_fini(&sock->s_refcv); - nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_notify_mx); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); } |
