diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-24 15:26:14 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-10-24 16:30:09 -0700 |
| commit | 4250fc119057eb6a6b534e9c0758488cc5fb034e (patch) | |
| tree | c64efe1ae1115cf3aacdedf365c11398c8d99ba8 /src/core/socket.c | |
| parent | a7b20c3babd965b12dec8cb5ff0883a4d8d1116d (diff) | |
| download | nng-4250fc119057eb6a6b534e9c0758488cc5fb034e.tar.gz nng-4250fc119057eb6a6b534e9c0758488cc5fb034e.tar.bz2 nng-4250fc119057eb6a6b534e9c0758488cc5fb034e.zip | |
fixes #132 Implement saner notification for file descriptors
This eliminates the "quasi-functional" notify API altogether.
The aio framework will be coming soon to replace it.
As a bonus, apps (legacy apps) that use the notification FDs
will see improved performance, since we don't have to context
switch to give them a notification.
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 157 |
1 files changed, 84 insertions, 73 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index bc1f446d..dfa49317 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -70,23 +70,102 @@ struct nni_socket { int s_closing; // Socket is closing int s_closed; // Socket closed, protected by global lock - nni_event s_recv_ev; // Event for readability - nni_event s_send_ev; // Event for sendability - nni_notifyfd s_send_fd; nni_notifyfd s_recv_fd; }; +static void +nni_sock_can_send_cb(void *arg, int flags) +{ + nni_notifyfd *fd = arg; + + if ((flags & nni_msgq_f_can_put) == 0) { + nni_plat_pipe_clear(fd->sn_rfd); + } else { + nni_plat_pipe_raise(fd->sn_wfd); + } +} + +static void +nni_sock_can_recv_cb(void *arg, int flags) +{ + nni_notifyfd *fd = arg; + + if ((flags & nni_msgq_f_can_get) == 0) { + nni_plat_pipe_clear(fd->sn_rfd); + } else { + nni_plat_pipe_raise(fd->sn_wfd); + } +} + +static int +nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp) +{ + int rv; + uint32_t flags; + nni_notifyfd *fd; + nni_msgq * mq; + nni_msgq_cb cb; + + if ((*szp < sizeof(int))) { + return (NNG_EINVAL); + } + + if ((flag & nni_sock_flags(s)) == 0) { + return (NNG_ENOTSUP); + } + + switch (flag) { + case NNI_PROTO_FLAG_SND: + fd = &s->s_send_fd; + mq = s->s_uwq; + cb = nni_sock_can_send_cb; + break; + case NNI_PROTO_FLAG_RCV: + fd = &s->s_recv_fd; + mq = s->s_urq; + cb = nni_sock_can_recv_cb; + break; + default: + nni_panic("default case!"); + } + + // If we already inited this, just give back the same file descriptor. + if (fd->sn_init) { + memcpy(val, &fd->sn_rfd, sizeof(int)); + *szp = sizeof(int); + return (0); + } + + if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) { + return (rv); + } + + nni_msgq_set_cb(mq, cb, fd); + +#if 0 + if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) { + nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd); + return (NNG_ENOMEM); + } +#endif + + fd->sn_init = 1; + *szp = sizeof(int); + memcpy(val, &fd->sn_rfd, sizeof(int)); + return (0); +} + static int nni_sock_getopt_sendfd(nni_sock *s, void *buf, size_t *szp) { - return (nni_getopt_fd(s, &s->s_send_fd, NNG_EV_CAN_SND, buf, szp)); + return (nni_sock_getopt_fd(s, NNI_PROTO_FLAG_SND, buf, szp)); } static int nni_sock_getopt_recvfd(nni_sock *s, void *buf, size_t *szp) { - return (nni_getopt_fd(s, &s->s_recv_fd, NNG_EV_CAN_RCV, buf, szp)); + return (nni_sock_getopt_fd(s, NNI_PROTO_FLAG_RCV, buf, szp)); } static int @@ -387,70 +466,6 @@ nni_sock_recv_pending(nni_sock *sock) } static void -nni_sock_cansend_cb(void *arg) -{ - nni_notify *notify = arg; - nni_sock * sock = notify->n_sock; - - if (nni_aio_result(notify->n_aio) != 0) { - return; - } - - notify->n_func(&sock->s_send_ev, notify->n_arg); -} - -static void -nni_sock_canrecv_cb(void *arg) -{ - nni_notify *notify = arg; - nni_sock * sock = notify->n_sock; - - if (nni_aio_result(notify->n_aio) != 0) { - return; - } - - notify->n_func(&sock->s_recv_ev, notify->n_arg); -} - -nni_notify * -nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) -{ - nni_notify *notify; - - if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) { - return (NULL); - } - - notify->n_func = fn; - notify->n_arg = arg; - notify->n_type = type; - notify->n_sock = sock; - - switch (type) { - case NNG_EV_CAN_RCV: - nni_aio_init(¬ify->n_aio, nni_sock_canrecv_cb, notify); - nni_msgq_aio_notify_get(sock->s_urq, notify->n_aio); - break; - case NNG_EV_CAN_SND: - nni_aio_init(¬ify->n_aio, nni_sock_cansend_cb, notify); - nni_msgq_aio_notify_put(sock->s_uwq, notify->n_aio); - break; - default: - NNI_FREE_STRUCT(notify); - return (NULL); - } - - return (notify); -} - -void -nni_sock_unnotify(nni_sock *sock, nni_notify *notify) -{ - nni_aio_fini(notify->n_aio); - NNI_FREE_STRUCT(notify); -} - -static void nni_sock_destroy(nni_sock *s) { nni_sockopt *sopt; @@ -481,8 +496,6 @@ nni_sock_destroy(nni_sock *s) nni_mtx_lock(&s->s_mx); nni_mtx_unlock(&s->s_mx); - nni_ev_fini(&s->s_send_ev); - nni_ev_fini(&s->s_recv_ev); nni_msgq_fini(s->s_urq); nni_msgq_fini(s->s_uwq); nni_cv_fini(&s->s_close_cv); @@ -530,8 +543,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) nni_mtx_init(&s->s_mx); nni_cv_init(&s->s_cv, &s->s_mx); nni_cv_init(&s->s_close_cv, &nni_sock_lk); - nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s); - nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s); if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) || |
