From 45f3f141850a0ac07c31906748752571652683df Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 16 Apr 2018 11:40:28 -0700 Subject: fixes #344 nn_poll() legacy API missing This includes the test from legacy libnanomsg and a man page. We have refactored the message queue notification system so that it uses nni_pollable, leading we hope to a more consistent system, and reducing the code size and complexity. We also fixed the size of the NN_RCVFD and NN_SNDFD so that they are a SOCKET on Windows systems, rather than an integer. This addresses 64-bit compilation problems. --- src/core/msgqueue.c | 111 +++++++++++++++++++++++++++++++--------------------- src/core/msgqueue.h | 26 ++---------- src/core/pollable.c | 24 +++++++----- src/core/socket.c | 62 ++++------------------------- 4 files changed, 92 insertions(+), 131 deletions(-) (limited to 'src/core') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 529e7f4d..212b52c2 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -31,11 +31,9 @@ struct nni_msgq { nni_list mq_aio_putq; nni_list mq_aio_getq; - // Callback - this function is executed with the lock held, and - // provides information about the current queue state anytime - // a message enters or leaves the queue, or a waiter is blocked. - nni_msgq_cb mq_cb_fn; - void * mq_cb_arg; + // Pollable status. + nni_pollable *mq_sendable; + nni_pollable *mq_recvable; // Filters. nni_msgq_filter mq_filter_fn; @@ -62,20 +60,20 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) NNI_FREE_STRUCT(mq); return (NNG_ENOMEM); } - nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); nni_mtx_init(&mq->mq_lock); - - mq->mq_cap = cap; - mq->mq_alloc = alloc; - mq->mq_len = 0; - mq->mq_get = 0; - mq->mq_put = 0; - mq->mq_closed = 0; - mq->mq_puterr = 0; - mq->mq_geterr = 0; - *mqp = mq; + mq->mq_cap = cap; + mq->mq_alloc = alloc; + mq->mq_recvable = NULL; + mq->mq_sendable = NULL; + mq->mq_len = 0; + mq->mq_get = 0; + mq->mq_put = 0; + mq->mq_closed = 0; + mq->mq_puterr = 0; + mq->mq_geterr = 0; + *mqp = mq; return (0); } @@ -101,6 +99,13 @@ nni_msgq_fini(nni_msgq *mq) nni_msg_free(msg); } + if (mq->mq_sendable) { + nni_pollable_free(mq->mq_sendable); + } + if (mq->mq_recvable) { + nni_pollable_free(mq->mq_recvable); + } + nni_free(mq->mq_msgs, mq->mq_alloc * sizeof(nng_msg *)); NNI_FREE_STRUCT(mq); } @@ -292,36 +297,16 @@ nni_msgq_run_getq(nni_msgq *mq) static void nni_msgq_run_notify(nni_msgq *mq) { - if (mq->mq_cb_fn != NULL) { - int flags = 0; - - if (mq->mq_closed) { - flags |= nni_msgq_f_closed; - } - if (mq->mq_len == 0) { - flags |= nni_msgq_f_empty; - } else if (mq->mq_len == mq->mq_cap) { - flags |= nni_msgq_f_full; - } - if (mq->mq_len < mq->mq_cap || - !nni_list_empty(&mq->mq_aio_getq)) { - flags |= nni_msgq_f_can_put; - } - if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { - flags |= nni_msgq_f_can_get; - } - mq->mq_cb_fn(mq->mq_cb_arg, flags); + if (mq->mq_len < mq->mq_cap || !nni_list_empty(&mq->mq_aio_getq)) { + nni_pollable_raise(mq->mq_sendable); + } else { + nni_pollable_clear(mq->mq_sendable); + } + if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { + nni_pollable_raise(mq->mq_recvable); + } else { + nni_pollable_clear(mq->mq_recvable); } -} - -void -nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg) -{ - nni_mtx_lock(&mq->mq_lock); - mq->mq_cb_fn = fn; - mq->mq_cb_arg = arg; - nni_msgq_run_notify(mq); - nni_mtx_unlock(&mq->mq_lock); } static void @@ -543,3 +528,39 @@ out: nni_mtx_unlock(&mq->mq_lock); return (0); } + +int +nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **sp) +{ + nni_mtx_lock(&mq->mq_lock); + if (mq->mq_recvable == NULL) { + int rv; + if ((rv = nni_pollable_alloc(&mq->mq_recvable)) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } + nni_msgq_run_notify(mq); + } + nni_mtx_unlock(&mq->mq_lock); + + *sp = mq->mq_recvable; + return (0); +} + +int +nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp) +{ + nni_mtx_lock(&mq->mq_lock); + if (mq->mq_sendable == NULL) { + int rv; + if ((rv = nni_pollable_alloc(&mq->mq_sendable)) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } + nni_msgq_run_notify(mq); + } + nni_mtx_unlock(&mq->mq_lock); + + *sp = mq->mq_sendable; + return (0); +} diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 93a26eb6..2d23f448 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -12,6 +12,7 @@ #define CORE_MSGQUEUE_H #include "nng_impl.h" +#include "pollable.h" // Message queues. Message queues work in some ways like Go channels; // they are a thread-safe way to pass messages between subsystems. They @@ -77,28 +78,6 @@ typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *); // discarded instead, and any get waiters remain waiting. extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *); -// nni_msgq_cb_flags is an enumeration of flag bits used with nni_msgq_cb. -enum nni_msgq_cb_flags { - nni_msgq_f_full = 1, - nni_msgq_f_empty = 2, - nni_msgq_f_can_get = 4, - nni_msgq_f_can_put = 8, - nni_msgq_f_closed = 16, -}; - -// nni_msgq_cb is a callback function used by sockets to monitor -// the status of the queue. It is called with the lock held for -// performance reasons so consumers must not re-enter the queue. -// The purpose is to enable file descriptor notifications on the socket, -// which don't need to reenter the msgq. The integer is a mask of -// flags that are true for the given message queue. -typedef void (*nni_msgq_cb)(void *, int); - -// nni_msgq_set_cb sets the callback and argument for the callback -// which will be called on state changes in the message queue. Only -// one callback can be registered on a message queue at a time. -extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *); - // nni_msgq_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue // are freed. Unlike closing a go channel, this operation is idempotent. @@ -119,4 +98,7 @@ extern int nni_msgq_cap(nni_msgq *mq); // nni_msgq_len returns the number of messages currently in the queue. extern int nni_msgq_len(nni_msgq *mq); +extern int nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **); +extern int nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **); + #endif // CORE_MSQUEUE_H diff --git a/src/core/pollable.c b/src/core/pollable.c index b5cecf37..a121ba3f 100644 --- a/src/core/pollable.c +++ b/src/core/pollable.c @@ -52,11 +52,13 @@ nni_pollable_raise(nni_pollable *p) return; } nni_mtx_lock(&p->p_lock); - p->p_raised = true; - if (p->p_open) { - nni_mtx_unlock(&p->p_lock); - nni_plat_pipe_raise(p->p_wfd); - return; + if (!p->p_raised) { + p->p_raised = true; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_raise(p->p_wfd); + return; + } } nni_mtx_unlock(&p->p_lock); } @@ -68,11 +70,13 @@ nni_pollable_clear(nni_pollable *p) return; } nni_mtx_lock(&p->p_lock); - p->p_raised = false; - if (p->p_open) { - nni_mtx_unlock(&p->p_lock); - nni_plat_pipe_clear(p->p_rfd); - return; + if (p->p_raised) { + p->p_raised = false; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_clear(p->p_rfd); + return; + } } nni_mtx_unlock(&p->p_lock); } diff --git a/src/core/socket.c b/src/core/socket.c index 4021c0c6..5c28dbbb 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -92,37 +92,11 @@ struct nni_socket { static void nni_ctx_destroy(nni_ctx *); -static void -nni_sock_can_send_cb(void *arg, int flags) -{ - nni_notifyfd *fd = arg; - - if ((flags & nni_msgq_f_can_put) == 0) { - nni_plat_pipe_clear(fd->sn_rfd); - } else { - nni_plat_pipe_raise(fd->sn_wfd); - } -} - -static void -nni_sock_can_recv_cb(void *arg, int flags) -{ - nni_notifyfd *fd = arg; - - if ((flags & nni_msgq_f_can_get) == 0) { - nni_plat_pipe_clear(fd->sn_rfd); - } else { - nni_plat_pipe_raise(fd->sn_wfd); - } -} - static int nni_sock_get_fd(nni_sock *s, int flag, int *fdp) { int rv; - nni_notifyfd *fd; - nni_msgq * mq; - nni_msgq_cb cb; + nni_pollable *p; if ((flag & nni_sock_flags(s)) == 0) { return (NNG_ENOTSUP); @@ -130,41 +104,21 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp) switch (flag) { case NNI_PROTO_FLAG_SND: - fd = &s->s_send_fd; - mq = s->s_uwq; - cb = nni_sock_can_send_cb; + rv = nni_msgq_get_sendable(s->s_uwq, &p); break; case NNI_PROTO_FLAG_RCV: - fd = &s->s_recv_fd; - mq = s->s_urq; - cb = nni_sock_can_recv_cb; + rv = nni_msgq_get_recvable(s->s_urq, &p); break; default: - // This should never occur. - return (NNG_EINVAL); + rv = NNG_EINVAL; + break; } - // Open if not already done. - if (!fd->sn_init) { - if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) { - return (rv); - } - - // Only set the callback on the message queue if we are - // using it. The message queue automatically updates - // the pipe when the callback is first established. - // If we are not using the message queue, then we have - // to update the initial state explicitly ourselves. - - if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) { - nni_msgq_set_cb(mq, cb, fd); - } - - fd->sn_init = 1; + if (rv == 0) { + rv = nni_pollable_getfd(p, fdp); } - *fdp = fd->sn_rfd; - return (0); + return (rv); } static int -- cgit v1.2.3-70-g09d2