From 4250fc119057eb6a6b534e9c0758488cc5fb034e Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Tue, 24 Oct 2017 15:26:14 -0700 Subject: fixes #132 Implement saner notification for file descriptors This eliminates the "quasi-functional" notify API altogether. The aio framework will be coming soon to replace it. As a bonus, apps (legacy apps) that use the notification FDs will see improved performance, since we don't have to context switch to give them a notification. --- src/CMakeLists.txt | 2 - src/core/event.c | 27 --------- src/core/event.h | 35 ------------ src/core/msgqueue.c | 74 +++++++++++-------------- src/core/msgqueue.h | 24 +++++++- src/core/nng_impl.h | 1 - src/core/options.c | 61 +------------------- src/core/options.h | 3 - src/core/socket.c | 157 ++++++++++++++++++++++++++++------------------------ src/core/socket.h | 3 - src/nng.c | 42 -------------- src/nng.h | 62 --------------------- 12 files changed, 139 insertions(+), 352 deletions(-) delete mode 100644 src/core/event.c delete mode 100644 src/core/event.h (limited to 'src') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c0080fe6..9c67a22b 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -40,8 +40,6 @@ set (NNG_SOURCES core/device.h core/endpt.c core/endpt.h - core/event.c - core/event.h core/idhash.c core/idhash.h core/init.c diff --git a/src/core/event.c b/src/core/event.c deleted file mode 100644 index ab157b02..00000000 --- a/src/core/event.c +++ /dev/null @@ -1,27 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#include "core/nng_impl.h" - -#include -#include - -void -nni_ev_init(nni_event *event, int type, nni_sock *sock) -{ - memset(event, 0, sizeof(*event)); - event->e_type = type; - event->e_sock = sock; -} - -void -nni_ev_fini(nni_event *event) -{ - NNI_ARG_UNUSED(event); -} diff --git a/src/core/event.h b/src/core/event.h deleted file mode 100644 index 9536d206..00000000 --- a/src/core/event.h +++ /dev/null @@ -1,35 +0,0 @@ -// -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV -// -// This software is supplied under the terms of the MIT License, a -// copy of which should be located in the distribution where this -// file was obtained (LICENSE.txt). A copy of the license may also be -// found online at https://opensource.org/licenses/MIT. -// - -#ifndef CORE_EVENT_H -#define CORE_EVENT_H - -#include "core/defs.h" -#include "core/list.h" - -struct nng_event { - int e_type; - nni_sock *e_sock; - nni_ep * e_ep; - nni_pipe *e_pipe; -}; - -struct nng_notify { - nng_notify_func n_func; - void * n_arg; - int n_type; - nni_sock * n_sock; - nni_aio * n_aio; -}; - -extern void nni_ev_init(nni_event *, int, nni_sock *); -extern void nni_ev_fini(nni_event *); - -#endif // CORE_EVENT_H diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index c1f3701c..3f1ffcb5 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -32,8 +32,12 @@ struct nni_msgq { nni_list mq_aio_putq; nni_list mq_aio_getq; - nni_list mq_aio_notify_get; - nni_list mq_aio_notify_put; + + // Callback - this function is executed with the lock held, and + // provides information about the current queue state anytime + // a message enters or leaves the queue, or a waiter is blocked. + nni_msgq_cb mq_cb_fn; + void * mq_cb_arg; // Filters. nni_msgq_filter mq_filter_fn; @@ -63,9 +67,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); - nni_aio_list_init(&mq->mq_aio_notify_get); - nni_aio_list_init(&mq->mq_aio_notify_put); - nni_mtx_init(&mq->mq_lock); nni_cv_init(&mq->mq_drained, &mq->mq_lock); @@ -297,22 +298,25 @@ static void nni_msgq_run_notify(nni_msgq *mq) { nni_aio *aio; + if (mq->mq_cb_fn != NULL) { + int flags = 0; - if (mq->mq_closed) { - return; - } - if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) { - NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) { - // This stays on the list. - nni_aio_finish(aio, 0, 0); + if (mq->mq_closed) { + flags |= nni_msgq_f_closed; } - } - - if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { - NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) { - // This stays on the list. - nni_aio_finish(aio, 0, 0); + if (mq->mq_len == 0) { + flags |= nni_msgq_f_empty; + } else if (mq->mq_len == mq->mq_cap) { + flags |= nni_msgq_f_full; + } + if (mq->mq_len < mq->mq_cap || + !nni_list_empty(&mq->mq_aio_getq)) { + flags |= nni_msgq_f_can_put; + } + if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { + flags |= nni_msgq_f_can_get; } + mq->mq_cb_fn(mq->mq_cb_arg, flags); } if (mq->mq_draining) { @@ -322,6 +326,16 @@ nni_msgq_run_notify(nni_msgq *mq) } } +void +nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_cb_fn = fn; + mq->mq_cb_arg = arg; + nni_msgq_run_notify(mq); + nni_mtx_unlock(&mq->mq_lock); +} + static void nni_msgq_cancel(nni_aio *aio, int rv) { @@ -335,30 +349,6 @@ nni_msgq_cancel(nni_aio *aio, int rv) nni_mtx_unlock(&mq->mq_lock); } -void -nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio) -{ - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); - return; - } - nni_aio_list_append(&mq->mq_aio_notify_put, aio); - nni_mtx_unlock(&mq->mq_lock); -} - -void -nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio) -{ - nni_mtx_lock(&mq->mq_lock); - if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) { - nni_mtx_unlock(&mq->mq_lock); - return; - } - nni_aio_list_append(&mq->mq_aio_notify_get, aio); - nni_mtx_unlock(&mq->mq_lock); -} - void nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) { diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index bbac505d..ececf372 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -38,8 +38,6 @@ extern void nni_msgq_fini(nni_msgq *); extern void nni_msgq_aio_put(nni_msgq *, nni_aio *); extern void nni_msgq_aio_get(nni_msgq *, nni_aio *); -extern void nni_msgq_aio_notify_get(nni_msgq *, nni_aio *); -extern void nni_msgq_aio_notify_put(nni_msgq *, nni_aio *); // nni_msgq_tryput performs a non-blocking attempt to put a message on // the message queue. It is the same as calling nng_msgq_put_until with @@ -83,6 +81,28 @@ typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *); // discarded instead, and any get waiters remain waiting. extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *); +// nni_msgq_cb_flags is an enumeration of flag bits used with nni_msgq_cb. +enum nni_msgq_cb_flags { + nni_msgq_f_full = 1, + nni_msgq_f_empty = 2, + nni_msgq_f_can_get = 4, + nni_msgq_f_can_put = 8, + nni_msgq_f_closed = 16, +}; + +// nni_msgq_cb is a callback function used by sockets to monitor +// the status of the queue. It is called with the lock held for +// performance reasons so consumers must not re-enter the queue. +// The purpose is to enable file descriptor notifications on the socket, +// which don't need to reenter the msgq. The integer is a mask of +// flags that are true for the given message queue. +typedef void (*nni_msgq_cb)(void *, int); + +// nni_msgq_set_cb sets the callback and argument for the callback +// which will be called on state changes in the message queue. Only +// one callback can be registered on a message queue at a time. +extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *); + // nni_msgq_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue // are freed. Unlike closing a go channel, this operation is idempotent. diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 905411d7..87dd2de1 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -46,7 +46,6 @@ // These have to come after the others - particularly transport.h #include "core/endpt.h" -#include "core/event.h" #include "core/pipe.h" #include "core/socket.h" diff --git a/src/core/options.c b/src/core/options.c index 3b787b82..1417d0b3 100644 --- a/src/core/options.c +++ b/src/core/options.c @@ -220,63 +220,4 @@ nni_getopt_buf(nni_msgq *mq, void *val, size_t *sizep) memcpy(val, &len, sz); *sizep = sizeof(len); return (0); -} - -static void -nni_notifyfd_push(struct nng_event *ev, void *arg) -{ - nni_notifyfd *fd = arg; - - NNI_ARG_UNUSED(ev); - - nni_plat_pipe_raise(fd->sn_wfd); -} - -int -nni_getopt_fd(nni_sock *s, nni_notifyfd *fd, int mask, void *val, size_t *szp) -{ - int rv; - uint32_t flags; - - if ((*szp < sizeof(int))) { - return (NNG_EINVAL); - } - - flags = nni_sock_flags(s); - - switch (mask) { - case NNG_EV_CAN_SND: - if ((flags & NNI_PROTO_FLAG_SND) == 0) { - return (NNG_ENOTSUP); - } - break; - case NNG_EV_CAN_RCV: - if ((flags & NNI_PROTO_FLAG_RCV) == 0) { - return (NNG_ENOTSUP); - } - break; - default: - return (NNG_ENOTSUP); - } - - // If we already inited this, just give back the same file descriptor. - if (fd->sn_init) { - memcpy(val, &fd->sn_rfd, sizeof(int)); - *szp = sizeof(int); - return (0); - } - - if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) { - return (rv); - } - - if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) { - nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd); - return (NNG_ENOMEM); - } - - fd->sn_init = 1; - *szp = sizeof(int); - memcpy(val, &fd->sn_rfd, sizeof(int)); - return (0); -} +} \ No newline at end of file diff --git a/src/core/options.h b/src/core/options.h index beeca951..d373851f 100644 --- a/src/core/options.h +++ b/src/core/options.h @@ -58,9 +58,6 @@ extern int nni_setopt_size(size_t *, const void *, size_t, size_t, size_t); // nni_getopt_size obtains a size_t option. extern int nni_getopt_size(size_t, void *, size_t *); -// nni_getopt_fd obtains a notification file descriptor. -extern int nni_getopt_fd(nni_sock *, nni_notifyfd *, int, void *, size_t *); - extern int nni_chkopt_ms(const void *, size_t); extern int nni_chkopt_int(const void *, size_t, int, int); extern int nni_chkopt_size(const void *, size_t, size_t, size_t); diff --git a/src/core/socket.c b/src/core/socket.c index bc1f446d..dfa49317 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -70,23 +70,102 @@ struct nni_socket { int s_closing; // Socket is closing int s_closed; // Socket closed, protected by global lock - nni_event s_recv_ev; // Event for readability - nni_event s_send_ev; // Event for sendability - nni_notifyfd s_send_fd; nni_notifyfd s_recv_fd; }; +static void +nni_sock_can_send_cb(void *arg, int flags) +{ + nni_notifyfd *fd = arg; + + if ((flags & nni_msgq_f_can_put) == 0) { + nni_plat_pipe_clear(fd->sn_rfd); + } else { + nni_plat_pipe_raise(fd->sn_wfd); + } +} + +static void +nni_sock_can_recv_cb(void *arg, int flags) +{ + nni_notifyfd *fd = arg; + + if ((flags & nni_msgq_f_can_get) == 0) { + nni_plat_pipe_clear(fd->sn_rfd); + } else { + nni_plat_pipe_raise(fd->sn_wfd); + } +} + +static int +nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp) +{ + int rv; + uint32_t flags; + nni_notifyfd *fd; + nni_msgq * mq; + nni_msgq_cb cb; + + if ((*szp < sizeof(int))) { + return (NNG_EINVAL); + } + + if ((flag & nni_sock_flags(s)) == 0) { + return (NNG_ENOTSUP); + } + + switch (flag) { + case NNI_PROTO_FLAG_SND: + fd = &s->s_send_fd; + mq = s->s_uwq; + cb = nni_sock_can_send_cb; + break; + case NNI_PROTO_FLAG_RCV: + fd = &s->s_recv_fd; + mq = s->s_urq; + cb = nni_sock_can_recv_cb; + break; + default: + nni_panic("default case!"); + } + + // If we already inited this, just give back the same file descriptor. + if (fd->sn_init) { + memcpy(val, &fd->sn_rfd, sizeof(int)); + *szp = sizeof(int); + return (0); + } + + if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) { + return (rv); + } + + nni_msgq_set_cb(mq, cb, fd); + +#if 0 + if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) { + nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd); + return (NNG_ENOMEM); + } +#endif + + fd->sn_init = 1; + *szp = sizeof(int); + memcpy(val, &fd->sn_rfd, sizeof(int)); + return (0); +} + static int nni_sock_getopt_sendfd(nni_sock *s, void *buf, size_t *szp) { - return (nni_getopt_fd(s, &s->s_send_fd, NNG_EV_CAN_SND, buf, szp)); + return (nni_sock_getopt_fd(s, NNI_PROTO_FLAG_SND, buf, szp)); } static int nni_sock_getopt_recvfd(nni_sock *s, void *buf, size_t *szp) { - return (nni_getopt_fd(s, &s->s_recv_fd, NNG_EV_CAN_RCV, buf, szp)); + return (nni_sock_getopt_fd(s, NNI_PROTO_FLAG_RCV, buf, szp)); } static int @@ -386,70 +465,6 @@ nni_sock_recv_pending(nni_sock *sock) } } -static void -nni_sock_cansend_cb(void *arg) -{ - nni_notify *notify = arg; - nni_sock * sock = notify->n_sock; - - if (nni_aio_result(notify->n_aio) != 0) { - return; - } - - notify->n_func(&sock->s_send_ev, notify->n_arg); -} - -static void -nni_sock_canrecv_cb(void *arg) -{ - nni_notify *notify = arg; - nni_sock * sock = notify->n_sock; - - if (nni_aio_result(notify->n_aio) != 0) { - return; - } - - notify->n_func(&sock->s_recv_ev, notify->n_arg); -} - -nni_notify * -nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg) -{ - nni_notify *notify; - - if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) { - return (NULL); - } - - notify->n_func = fn; - notify->n_arg = arg; - notify->n_type = type; - notify->n_sock = sock; - - switch (type) { - case NNG_EV_CAN_RCV: - nni_aio_init(¬ify->n_aio, nni_sock_canrecv_cb, notify); - nni_msgq_aio_notify_get(sock->s_urq, notify->n_aio); - break; - case NNG_EV_CAN_SND: - nni_aio_init(¬ify->n_aio, nni_sock_cansend_cb, notify); - nni_msgq_aio_notify_put(sock->s_uwq, notify->n_aio); - break; - default: - NNI_FREE_STRUCT(notify); - return (NULL); - } - - return (notify); -} - -void -nni_sock_unnotify(nni_sock *sock, nni_notify *notify) -{ - nni_aio_fini(notify->n_aio); - NNI_FREE_STRUCT(notify); -} - static void nni_sock_destroy(nni_sock *s) { @@ -481,8 +496,6 @@ nni_sock_destroy(nni_sock *s) nni_mtx_lock(&s->s_mx); nni_mtx_unlock(&s->s_mx); - nni_ev_fini(&s->s_send_ev); - nni_ev_fini(&s->s_recv_ev); nni_msgq_fini(s->s_urq); nni_msgq_fini(s->s_uwq); nni_cv_fini(&s->s_close_cv); @@ -530,8 +543,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) nni_mtx_init(&s->s_mx); nni_cv_init(&s->s_cv, &s->s_mx); nni_cv_init(&s->s_close_cv, &nni_sock_lk); - nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s); - nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s); if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) || diff --git a/src/core/socket.h b/src/core/socket.h index edfe6447..6d9622e3 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -30,9 +30,6 @@ extern void nni_sock_send(nni_sock *, nni_aio *); extern void nni_sock_recv(nni_sock *, nni_aio *); extern uint32_t nni_sock_id(nni_sock *); -extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *); -extern void nni_sock_unnotify(nni_sock *, nni_notify *); - // nni_sock_pipe_add adds the pipe to the socket. It is called by // the generic pipe creation code. It also adds the socket to the // ep list, and starts the pipe. It does all these to ensure that diff --git a/src/nng.c b/src/nng.c index d66c0dcb..9fc99dc3 100644 --- a/src/nng.c +++ b/src/nng.c @@ -611,48 +611,6 @@ nng_getopt_ms(nng_socket sid, const char *name, nng_duration *valp) return (nng_getopt(sid, name, valp, &sz)); } -nng_notify * -nng_setnotify(nng_socket sid, int mask, nng_notify_func fn, void *arg) -{ - nni_sock * sock; - nng_notify *notify; - int rv; - - if ((rv = nni_sock_find(&sock, sid)) != 0) { - return (NULL); - } - notify = nni_sock_notify(sock, mask, fn, arg); - nni_sock_rele(sock); - return (notify); -} - -void -nng_unsetnotify(nng_socket sid, nng_notify *notify) -{ - nni_sock *sock; - int rv; - - if ((rv = nni_sock_find(&sock, sid)) != 0) { - return; - } - nni_sock_unnotify(sock, notify); - nni_sock_rele(sock); -} - -nng_socket -nng_event_socket(nng_event *ev) -{ - // XXX: FOR NOW.... maybe evnet should contain socket Id - // instead? - return (nni_sock_id(ev->e_sock)); -} - -int -nng_event_type(nng_event *ev) -{ - return (ev->e_type); -} - int nng_device(nng_socket s1, nng_socket s2) { diff --git a/src/nng.h b/src/nng.h index d740dfc0..c228a6d7 100644 --- a/src/nng.h +++ b/src/nng.h @@ -48,8 +48,6 @@ typedef uint32_t nng_listener; typedef uint32_t nng_pipe; typedef int32_t nng_duration; // in milliseconds typedef struct nng_msg nng_msg; -typedef struct nng_event nng_event; -typedef struct nng_notify nng_notify; typedef struct nng_snapshot nng_snapshot; typedef struct nng_stat nng_stat; @@ -100,66 +98,6 @@ NNG_DECL int nng_getopt_ms(nng_socket, const char *, nng_duration *); NNG_DECL int nng_getopt_size(nng_socket, const char *, size_t *); NNG_DECL int nng_getopt_uint64(nng_socket, const char *, uint64_t *); -// nng_notify_func is a user function that is executed upon certain -// events. See below. -// -// NOTE WELL: This API is to be replaced in the future with an -// alternate API based on our AIO async I/O handles. We recommend -// against building this API too firmly into application code at -// this juncture. -typedef void (*nng_notify_func)(nng_event *, void *); - -// nng_setnotify sets a notification callback. The callback will be -// called for any of the requested events, and will be executed on a -// separate thread. Event delivery is not guaranteed, and can fail -// if events occur more quickly than the callback can handle, or -// if memory or other resources are scarce. -NNG_DECL nng_notify *nng_setnotify(nng_socket, int, nng_notify_func, void *); - -// nng_unsetnotify unregisters a previously registered notification callback. -// Once this returns, the associated callback will not be executed any longer. -// If the callback is running when this called, then it will wait until that -// callback completes. (The caller of this function should not hold any -// locks acqured by the callback, in order to avoid a deadlock.) -NNG_DECL void nng_unsetnotify(nng_socket, nng_notify *); - -// Event types. Sockets can have multiple different kind of events. -// Note that these are edge triggered -- therefore the status indicated -// may have changed since the notification occurred. -// -// NNG_EV_CAN_RCV - A message is ready for receive. -// NNG_EV_CAN_SND - A message can be sent. -// NNG_EV_ERROR - An error condition on the socket occurred. -// NNG_EV_PIPE_ADD - A new pipe (connection) is added to the socket. -// NNG_EV_PIPE_REM - A pipe (connection) is removed from the socket. -// NNG_EV_ENDPT_ADD - An endpoint is added to the socket. -// NNG_EV_ENDPT_REM - An endpoint is removed from the socket. -#define NNG_EV_BIT(x) (1U << (x)) -enum nng_ev_bit_enum { - NNG_EV_CAN_RCV = NNG_EV_BIT(0), - NNG_EV_CAN_SND = NNG_EV_BIT(1), - NNG_EV_ERROR = NNG_EV_BIT(2), - NNG_EV_PIPE_ADD = NNG_EV_BIT(3), - NNG_EV_PIPE_REM = NNG_EV_BIT(4), - NNG_EV_DIALER_ADD = NNG_EV_BIT(5), - NNG_EV_DIALER_REM = NNG_EV_BIT(6), - NNG_EV_LISTENER_ADD = NNG_EV_BIT(7), - NNG_EV_LISTENER_REM = NNG_EV_BIT(8), - - // XXX: Remove these. - NNG_EV_ENDPT_ADD = NNG_EV_DIALER_ADD, - NNG_EV_ENDPT_REM = NNG_EV_DIALER_REM, -}; - -// The following functions return more detailed information about the event. -// Some of the values will not make sense for some event types, in which case -// the value returned will be NULL. -NNG_DECL int nng_event_type(nng_event *); - -NNG_DECL nng_socket nng_event_socket(nng_event *); - -NNG_DECL const char *nng_event_reason(nng_event *); - // nng_listen creates a listening endpoint with no special options, // and starts it listening. It is functionally equivalent to the legacy // nn_bind(). The underlying endpoint is returned back to the caller in the -- cgit v1.2.3-70-g09d2