diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-24 15:26:14 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-10-24 16:30:09 -0700 |
| commit | 4250fc119057eb6a6b534e9c0758488cc5fb034e (patch) | |
| tree | c64efe1ae1115cf3aacdedf365c11398c8d99ba8 /src/core | |
| parent | a7b20c3babd965b12dec8cb5ff0883a4d8d1116d (diff) | |
| download | nng-4250fc119057eb6a6b534e9c0758488cc5fb034e.tar.gz nng-4250fc119057eb6a6b534e9c0758488cc5fb034e.tar.bz2 nng-4250fc119057eb6a6b534e9c0758488cc5fb034e.zip | |
fixes #132 Implement saner notification for file descriptors
This eliminates the "quasi-functional" notify API altogether.
The aio framework will be coming soon to replace it.
As a bonus, apps (legacy apps) that use the notification FDs
will see improved performance, since we don't have to context
switch to give them a notification.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/event.c | 27 | ||||
| -rw-r--r-- | src/core/event.h | 35 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 74 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 24 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/options.c | 61 | ||||
| -rw-r--r-- | src/core/options.h | 3 | ||||
| -rw-r--r-- | src/core/socket.c | 157 | ||||
| -rw-r--r-- | src/core/socket.h | 3 |
9 files changed, 139 insertions, 246 deletions
diff --git a/src/core/event.c b/src/core/event.c deleted file mode 100644 index ab157b02..00000000 --- a/src/core/event.c +++ /dev/null @@ -1,27 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#include <stdlib.h> -#include <string.h> - -void -nni_ev_init(nni_event *event, int type, nni_sock *sock) -{ - memset(event, 0, sizeof(*event)); - event->e_type = type; - event->e_sock = sock; -} - -void -nni_ev_fini(nni_event *event) -{ - NNI_ARG_UNUSED(event); -} diff --git a/src/core/event.h b/src/core/event.h deleted file mode 100644 index 9536d206..00000000 --- a/src/core/event.h +++ /dev/null @@ -1,35 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#ifndef CORE_EVENT_H -#define CORE_EVENT_H - -#include "core/defs.h" -#include "core/list.h" - -struct nng_event { - int e_type; - nni_sock *e_sock; - nni_ep * e_ep; - nni_pipe *e_pipe; -}; - -struct nng_notify { - nng_notify_func n_func; - void * n_arg; - int n_type; - nni_sock * n_sock; - nni_aio * n_aio; -}; - -extern void nni_ev_init(nni_event *, int, nni_sock *); -extern void nni_ev_fini(nni_event *); - -#endif // CORE_EVENT_H diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index c1f3701c..3f1ffcb5 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -32,8 +32,12 @@ struct nni_msgq { nni_list mq_aio_putq; nni_list mq_aio_getq; - nni_list mq_aio_notify_get; - nni_list mq_aio_notify_put; + + // Callback - this function is executed with the lock held, and + // provides information about the current queue state anytime + // a message enters or leaves the queue, or a waiter is blocked. + nni_msgq_cb mq_cb_fn; + void * mq_cb_arg; // Filters. nni_msgq_filter mq_filter_fn; @@ -63,9 +67,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); - nni_aio_list_init(&mq->mq_aio_notify_get); - nni_aio_list_init(&mq->mq_aio_notify_put); - nni_mtx_init(&mq->mq_lock); nni_cv_init(&mq->mq_drained, &mq->mq_lock); @@ -297,22 +298,25 @@ static void nni_msgq_run_notify(nni_msgq *mq) { nni_aio *aio; + if (mq->mq_cb_fn != NULL) { + int flags = 0; - if (mq->mq_closed) { - return; - } - if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) { - NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) { - // This stays on the list. - nni_aio_finish(aio, 0, 0); + if (mq->mq_closed) { + flags |= nni_msgq_f_closed; } - } - - if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { - NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) { - // This stays on the list. - nni_aio_finish(aio, 0, 0); + if (mq->mq_len == 0) { + flags |= nni_msgq_f_empty; + } else if (mq->mq_len == mq->mq_cap) { + flags |= nni_msgq_f_full; + } + if (mq->mq_len < mq->mq_cap || + !nni_list_empty(&mq->mq_aio_getq)) { + flags |= nni_msgq_f_can_put; + } + if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { + flags |= nni_msgq_f_can_get; } + mq->mq_cb_fn(mq->mq_cb_arg, flags); } if (mq->mq_draining) { @@ -322,6 +326,16 @@ nni_msgq_run_notify(nni_msgq *mq) } } +void +nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_cb_fn = fn; + mq->mq_cb_arg = arg; + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); +} + static void nni_msgq_cancel(nni_aio *aio, int rv) { @@ -336,30 +350,6 @@ nni_msgq_cancel(nni_aio *aio, int rv) } void -nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) -{ - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); - return; - } - nni_aio_list_append(&mq->mq_aio_notify_put, aio); - nni_mtx_unlock(&mq->mq_lock); -} - -void -nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) -{ - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); - return; - } - nni_aio_list_append(&mq->mq_aio_notify_get, aio); - nni_mtx_unlock(&mq->mq_lock); -} - -void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { nni_mtx_lock(&mq->mq_lock); diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index bbac505d..ececf372 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -38,8 +38,6 @@ extern void nni_msgq_fini(nni_msgq *); extern void nni_msgq_aio_put(nni_msgq *, nni_aio *); extern void nni_msgq_aio_get(nni_msgq *, nni_aio *); -extern void nni_msgq_aio_notify_get(nni_msgq *, nni_aio *); -extern void nni_msgq_aio_notify_put(nni_msgq *, nni_aio *); // nni_msgq_tryput performs a non-blocking attempt to put a message on // the message queue. It is the same as calling nng_msgq_put_until with @@ -83,6 +81,28 @@ typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *); // discarded instead, and any get waiters remain waiting. extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *); +// nni_msgq_cb_flags is an enumeration of flag bits used with nni_msgq_cb. +enum nni_msgq_cb_flags { + nni_msgq_f_full = 1, + nni_msgq_f_empty = 2, + nni_msgq_f_can_get = 4, + nni_msgq_f_can_put = 8, + nni_msgq_f_closed = 16, +}; + +// nni_msgq_cb is a callback function used by sockets to monitor +// the status of the queue. It is called with the lock held for +// performance reasons so consumers must not re-enter the queue. +// The purpose is to enable file descriptor notifications on the socket, +// which don't need to reenter the msgq. The integer is a mask of +// flags that are true for the given message queue. +typedef void (*nni_msgq_cb)(void *, int); + +// nni_msgq_set_cb sets the callback and argument for the callback +// which will be called on state changes in the message queue. Only +// one callback can be registered on a message queue at a time. +extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *); + // nni_msgq_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue // are freed. Unlike closing a go channel, this operation is idempotent. diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 905411d7..87dd2de1 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -46,7 +46,6 @@ // These have to come after the others - particularly transport.h #include "core/endpt.h" -#include "core/event.h" #include "core/pipe.h" #include "core/socket.h" diff --git a/src/core/options.c b/src/core/options.c index 3b787b82..1417d0b3 100644 --- a/src/core/options.c +++ b/src/core/options.c @@ -220,63 +220,4 @@ nni_getopt_buf(nni_msgq *mq, void *val, size_t *sizep) memcpy(val, &len, sz); *sizep = sizeof(len); return (0); -} - -static void -nni_notifyfd_push(struct nng_event *ev, void *arg) -{ - nni_notifyfd *fd = arg; - - NNI_ARG_UNUSED(ev); - - nni_plat_pipe_raise(fd->sn_wfd); -} - -int -nni_getopt_fd(nni_sock *s, nni_notifyfd *fd, int mask, void *val, size_t *szp) -{ - int rv; - uint32_t flags; - - if ((*szp < sizeof(int))) { - return (NNG_EINVAL); - } - - flags = nni_sock_flags(s); - - switch (mask) { - case NNG_EV_CAN_SND: - if ((flags & NNI_PROTO_FLAG_SND) == 0) { - return (NNG_ENOTSUP); - } - break; - case NNG_EV_CAN_RCV: - if ((flags & NNI_PROTO_FLAG_RCV) == 0) { - return (NNG_ENOTSUP); - } - break; - default: - return (NNG_ENOTSUP); - } - - // If we already inited this, just give back the same file descriptor. - if (fd->sn_init) { - memcpy(val, &fd->sn_rfd, sizeof(int)); - *szp = sizeof(int); - return (0); - } - - if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) { - return (rv); - } - - if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) { - nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd); - return (NNG_ENOMEM); - } - - fd->sn_init = 1; - *szp = sizeof(int); - memcpy(val, &fd->sn_rfd, sizeof(int)); - return (0); -} +}
\ No newline at end of file diff --git a/src/core/options.h b/src/core/options.h index beeca951..d373851f 100644 --- a/src/core/options.h +++ b/src/core/options.h @@ -58,9 +58,6 @@ extern int nni_setopt_size(size_t *, const void *, size_t, size_t, size_t); // nni_getopt_size obtains a size_t option. extern int nni_getopt_size(size_t, void *, size_t *); -// nni_getopt_fd obtains a notification file descriptor. -extern int nni_getopt_fd(nni_sock *, nni_notifyfd *, int, void *, size_t *); - extern int nni_chkopt_ms(const void *, size_t); extern int nni_chkopt_int(const void *, size_t, int, int); extern int nni_chkopt_size(const void *, size_t, size_t, size_t); diff --git a/src/core/socket.c b/src/core/socket.c index bc1f446d..dfa49317 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -70,23 +70,102 @@ struct nni_socket { int s_closing; // Socket is closing int s_closed; // Socket closed, protected by global lock - nni_event s_recv_ev; // Event for readability - nni_event s_send_ev; // Event for sendability - nni_notifyfd s_send_fd; nni_notifyfd s_recv_fd; }; +static void +nni_sock_can_send_cb(void *arg, int flags) +{ + nni_notifyfd *fd = arg; + + if ((flags & nni_msgq_f_can_put) == 0) { + nni_plat_pipe_clear(fd->sn_rfd); + } else { + nni_plat_pipe_raise(fd->sn_wfd); + } +} + +static void +nni_sock_can_recv_cb(void *arg, int flags) +{ + nni_notifyfd *fd = arg; + + if ((flags & nni_msgq_f_can_get) == 0) { + nni_plat_pipe_clear(fd->sn_rfd); + } else { + nni_plat_pipe_raise(fd->sn_wfd); + } +} + +static int +nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp) +{ + int rv; + uint32_t flags; + nni_notifyfd *fd; + nni_msgq * mq; + nni_msgq_cb cb; + + if ((*szp < sizeof(int))) { + return (NNG_EINVAL); + } + + if ((flag & nni_sock_flags(s)) == 0) { + return (NNG_ENOTSUP); + } + + switch (flag) { + case NNI_PROTO_FLAG_SND: + fd = &s->s_send_fd; + mq = s->s_uwq; + cb = nni_sock_can_send_cb; + break; + case NNI_PROTO_FLAG_RCV: + fd = &s->s_recv_fd; + mq = s->s_urq; + cb = nni_sock_can_recv_cb; + break; + default: + nni_panic("default case!"); + } + + // If we already inited this, just give back the same file descriptor. + if (fd->sn_init) { + memcpy(val, &fd->sn_rfd, sizeof(int)); + *szp = sizeof(int); + return (0); + } + + if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) { + return (rv); + } + + nni_msgq_set_cb(mq, cb, fd); + +#if 0 + if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) { + nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd); + return (NNG_ENOMEM); + } +#endif + + fd->sn_init = 1; + *szp = sizeof(int); + memcpy(val, &fd->sn_rfd, sizeof(int)); + return (0); +} + static int nni_sock_getopt_sendfd(nni_sock *s, void *buf, size_t *szp) { - return (nni_getopt_fd(s, &s->s_send_fd, NNG_EV_CAN_SND, buf, szp)); + return (nni_sock_getopt_fd(s, NNI_PROTO_FLAG_SND, buf, szp)); } static int nni_sock_getopt_recvfd(nni_sock *s, void *buf, size_t *szp) { - return (nni_getopt_fd(s, &s->s_recv_fd, NNG_EV_CAN_RCV, buf, szp)); + return (nni_sock_getopt_fd(s, NNI_PROTO_FLAG_RCV, buf, szp)); } static int @@ -387,70 +466,6 @@ nni_sock_recv_pending(nni_sock *sock) } static void -nni_sock_cansend_cb(void *arg) -{ - nni_notify *notify = arg; - nni_sock * sock = notify->n_sock; - - if (nni_aio_result(notify->n_aio) != 0) { - return; - } - - notify->n_func(&sock->s_send_ev, notify->n_arg); -} - -static void -nni_sock_canrecv_cb(void *arg) -{ - nni_notify *notify = arg; - nni_sock * sock = notify->n_sock; - - if (nni_aio_result(notify->n_aio) != 0) { - return; - } - - notify->n_func(&sock->s_recv_ev, notify->n_arg); -} - -nni_notify * -nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) -{ - nni_notify *notify; - - if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) { - return (NULL); - } - - notify->n_func = fn; - notify->n_arg = arg; - notify->n_type = type; - notify->n_sock = sock; - - switch (type) { - case NNG_EV_CAN_RCV: - nni_aio_init(¬ify->n_aio, nni_sock_canrecv_cb, notify); - nni_msgq_aio_notify_get(sock->s_urq, notify->n_aio); - break; - case NNG_EV_CAN_SND: - nni_aio_init(¬ify->n_aio, nni_sock_cansend_cb, notify); - nni_msgq_aio_notify_put(sock->s_uwq, notify->n_aio); - break; - default: - NNI_FREE_STRUCT(notify); - return (NULL); - } - - return (notify); -} - -void -nni_sock_unnotify(nni_sock *sock, nni_notify *notify) -{ - nni_aio_fini(notify->n_aio); - NNI_FREE_STRUCT(notify); -} - -static void nni_sock_destroy(nni_sock *s) { nni_sockopt *sopt; @@ -481,8 +496,6 @@ nni_sock_destroy(nni_sock *s) nni_mtx_lock(&s->s_mx); nni_mtx_unlock(&s->s_mx); - nni_ev_fini(&s->s_send_ev); - nni_ev_fini(&s->s_recv_ev); nni_msgq_fini(s->s_urq); nni_msgq_fini(s->s_uwq); nni_cv_fini(&s->s_close_cv); @@ -530,8 +543,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) nni_mtx_init(&s->s_mx); nni_cv_init(&s->s_cv, &s->s_mx); nni_cv_init(&s->s_close_cv, &nni_sock_lk); - nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s); - nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s); if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) || diff --git a/src/core/socket.h b/src/core/socket.h index edfe6447..6d9622e3 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -30,9 +30,6 @@ extern void nni_sock_send(nni_sock *, nni_aio *); extern void nni_sock_recv(nni_sock *, nni_aio *); extern uint32_t nni_sock_id(nni_sock *); -extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *); -extern void nni_sock_unnotify(nni_sock *, nni_notify *); - // nni_sock_pipe_add adds the pipe to the socket. It is called by // the generic pipe creation code. It also adds the socket to the // ep list, and starts the pipe. It does all these to ensure that |
