From 45f3f141850a0ac07c31906748752571652683df Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 16 Apr 2018 11:40:28 -0700 Subject: fixes #344 nn_poll() legacy API missing This includes the test from legacy libnanomsg and a man page. We have refactored the message queue notification system so that it uses nni_pollable, leading we hope to a more consistent system, and reducing the code size and complexity. We also fixed the size of the NN_RCVFD and NN_SNDFD so that they are a SOCKET on Windows systems, rather than an integer. This addresses 64-bit compilation problems. --- src/core/msgqueue.c | 111 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 66 insertions(+), 45 deletions(-) (limited to 'src/core/msgqueue.c') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 529e7f4d..212b52c2 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -31,11 +31,9 @@ struct nni_msgq { nni_list mq_aio_putq; nni_list mq_aio_getq; - // Callback - this function is executed with the lock held, and - // provides information about the current queue state anytime - // a message enters or leaves the queue, or a waiter is blocked. - nni_msgq_cb mq_cb_fn; - void * mq_cb_arg; + // Pollable status. + nni_pollable *mq_sendable; + nni_pollable *mq_recvable; // Filters. nni_msgq_filter mq_filter_fn; @@ -62,20 +60,20 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) NNI_FREE_STRUCT(mq); return (NNG_ENOMEM); } - nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); nni_mtx_init(&mq->mq_lock); - - mq->mq_cap = cap; - mq->mq_alloc = alloc; - mq->mq_len = 0; - mq->mq_get = 0; - mq->mq_put = 0; - mq->mq_closed = 0; - mq->mq_puterr = 0; - mq->mq_geterr = 0; - *mqp = mq; + mq->mq_cap = cap; + mq->mq_alloc = alloc; + mq->mq_recvable = NULL; + mq->mq_sendable = NULL; + mq->mq_len = 0; + mq->mq_get = 0; + mq->mq_put = 0; + mq->mq_closed = 0; + mq->mq_puterr = 0; + mq->mq_geterr = 0; + *mqp = mq; return (0); } @@ -101,6 +99,13 @@ nni_msgq_fini(nni_msgq *mq) nni_msg_free(msg); } + if (mq->mq_sendable) { + nni_pollable_free(mq->mq_sendable); + } + if (mq->mq_recvable) { + nni_pollable_free(mq->mq_recvable); + } + nni_free(mq->mq_msgs, mq->mq_alloc * sizeof(nng_msg *)); NNI_FREE_STRUCT(mq); } @@ -292,36 +297,16 @@ nni_msgq_run_getq(nni_msgq *mq) static void nni_msgq_run_notify(nni_msgq *mq) { - if (mq->mq_cb_fn != NULL) { - int flags = 0; - - if (mq->mq_closed) { - flags |= nni_msgq_f_closed; - } - if (mq->mq_len == 0) { - flags |= nni_msgq_f_empty; - } else if (mq->mq_len == mq->mq_cap) { - flags |= nni_msgq_f_full; - } - if (mq->mq_len < mq->mq_cap || - !nni_list_empty(&mq->mq_aio_getq)) { - flags |= nni_msgq_f_can_put; - } - if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { - flags |= nni_msgq_f_can_get; - } - mq->mq_cb_fn(mq->mq_cb_arg, flags); + if (mq->mq_len < mq->mq_cap || !nni_list_empty(&mq->mq_aio_getq)) { + nni_pollable_raise(mq->mq_sendable); + } else { + nni_pollable_clear(mq->mq_sendable); + } + if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { + nni_pollable_raise(mq->mq_recvable); + } else { + nni_pollable_clear(mq->mq_recvable); } -} - -void -nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg) -{ - nni_mtx_lock(&mq->mq_lock); - mq->mq_cb_fn = fn; - mq->mq_cb_arg = arg; - nni_msgq_run_notify(mq); - nni_mtx_unlock(&mq->mq_lock); } static void @@ -543,3 +528,39 @@ out: nni_mtx_unlock(&mq->mq_lock); return (0); } + +int +nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **sp) +{ + nni_mtx_lock(&mq->mq_lock); + if (mq->mq_recvable == NULL) { + int rv; + if ((rv = nni_pollable_alloc(&mq->mq_recvable)) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } + nni_msgq_run_notify(mq); + } + nni_mtx_unlock(&mq->mq_lock); + + *sp = mq->mq_recvable; + return (0); +} + +int +nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp) +{ + nni_mtx_lock(&mq->mq_lock); + if (mq->mq_sendable == NULL) { + int rv; + if ((rv = nni_pollable_alloc(&mq->mq_sendable)) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } + nni_msgq_run_notify(mq); + } + nni_mtx_unlock(&mq->mq_lock); + + *sp = mq->mq_sendable; + return (0); +} -- cgit v1.2.3-70-g09d2