From 45f3f141850a0ac07c31906748752571652683df Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 16 Apr 2018 11:40:28 -0700 Subject: fixes #344 nn_poll() legacy API missing This includes the test from legacy libnanomsg and a man page. We have refactored the message queue notification system so that it uses nni_pollable, leading we hope to a more consistent system, and reducing the code size and complexity. We also fixed the size of the NN_RCVFD and NN_SNDFD so that they are a SOCKET on Windows systems, rather than an integer. This addresses 64-bit compilation problems. --- src/compat/nanomsg/nn.c | 152 +++++++++++++++++++++++++++++++++++++++++++++++- src/core/msgqueue.c | 111 +++++++++++++++++++++-------------- src/core/msgqueue.h | 26 ++------- src/core/pollable.c | 24 ++++---- src/core/socket.c | 62 +++----------------- 5 files changed, 242 insertions(+), 133 deletions(-) (limited to 'src') diff --git a/src/compat/nanomsg/nn.c b/src/compat/nanomsg/nn.c index cbb88a27..40671cfa 100644 --- a/src/compat/nanomsg/nn.c +++ b/src/compat/nanomsg/nn.c @@ -664,6 +664,39 @@ nn_getdomain(nng_socket s, void *valp, size_t *szp) return (0); } +#ifndef NNG_PLATFORM_WINDOWS +#define SOCKET int +#endif + +static int +nn_getfd(nng_socket s, void *valp, size_t *szp, const char *opt) +{ + int ifd; + int rv; + SOCKET sfd; + + if ((rv = nng_getopt_int(s, opt, &ifd)) != 0) { + nn_seterror(rv); + return (-1); + } + sfd = (SOCKET) ifd; + memcpy(valp, &sfd, *szp < sizeof(sfd) ? *szp : sizeof(sfd)); + *szp = sizeof(sfd); + return (0); +} + +static int +nn_getrecvfd(nng_socket s, void *valp, size_t *szp) +{ + return (nn_getfd(s, valp, szp, NNG_OPT_RECVFD)); +} + +static int +nn_getsendfd(nng_socket s, void *valp, size_t *szp) +{ + return (nn_getfd(s, valp, szp, NNG_OPT_SENDFD)); +} + static int nn_getzero(nng_socket s, void *valp, size_t *szp) { @@ -727,10 +760,16 @@ static const struct { .opt = NNG_OPT_RECONNMAXT, }, { - .nnlevel = NN_SOL_SOCKET, .nnopt = NN_SNDFD, .opt = NNG_OPT_SENDFD, + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_SNDFD, + .opt = NNG_OPT_SENDFD, + .get = nn_getsendfd, }, { - .nnlevel = NN_SOL_SOCKET, .nnopt = NN_RCVFD, .opt = NNG_OPT_RECVFD, + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_RCVFD, + .opt = NNG_OPT_RECVFD, + .get = nn_getrecvfd, }, { .nnlevel = NN_SOL_SOCKET, @@ -888,6 +927,115 @@ nn_device(int s1, int s2) return (-1); } +// Windows stuff. +#ifdef NNG_PLATFORM_WINDOWS +#define poll WSAPoll +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif +#include +#include +#elif defined NNG_PLATFORM_POSIX +#include +#endif + +int +nn_poll(struct nn_pollfd *fds, int nfds, int timeout) +{ +// This function is rather unfortunate. poll() is available +// on POSIX, and on Windows as WSAPoll. On other systems it might +// not exist at all. We could also benefit from using a notification +// that didn't have to access file descriptors... sort of access to +// the pollable element on the socket. We don't have that, so we +// just use poll. This function is definitely suboptimal compared to +// using callbacks. +#if defined(NNG_PLATFORM_WINDOWS) || defined(NNG_PLATFORM_POSIX) + struct pollfd *pfd; + int npfd; + int rv; + + if ((pfd = NNI_ALLOC_STRUCTS(pfd, nfds * 2)) == NULL) { + errno = ENOMEM; + return (-1); + } + + // First prepare the master polling structure. + npfd = 0; + for (int i = 0; i < nfds; i++) { + int fd; + if (fds[i].events & NN_POLLIN) { + if ((rv = nng_getopt_int((nng_socket) fds[i].fd, + NNG_OPT_RECVFD, &fd)) != 0) { + nn_seterror(rv); + NNI_FREE_STRUCTS(pfd, nfds * 2); + return (-1); + } +#ifdef NNG_PLATFORM_WINDOWS + pfd[npfd].fd = (SOCKET) fd; +#else + pfd[npfd].fd = fd; +#endif + pfd[npfd].events = POLLIN; + npfd++; + } + if (fds[i].events & NN_POLLOUT) { + if ((rv = nng_getopt_int((nng_socket) fds[i].fd, + NNG_OPT_SENDFD, &fd)) != 0) { + nn_seterror(rv); + NNI_FREE_STRUCTS(pfd, nfds * 2); + return (-1); + } +#ifdef NNG_PLATFORM_WINDOWS + pfd[npfd].fd = (SOCKET) fd; +#else + pfd[npfd].fd = fd; +#endif + pfd[npfd].events = POLLIN; + npfd++; + } + } + + rv = poll(pfd, npfd, timeout); + if (rv < 0) { + int e = errno; + NNI_FREE_STRUCTS(pfd, nfds * 2); + errno = e; + return (-1); + } + + // Now update the nn_poll from the system poll. + npfd = 0; + rv = 0; + for (int i = 0; i < nfds; i++) { + fds[i].revents = 0; + if (fds[i].events & NN_POLLIN) { + if (pfd[npfd].revents & POLLIN) { + fds[i].revents |= NN_POLLIN; + } + npfd++; + } + if (fds[i].events & NN_POLLOUT) { + if (pfd[npfd].revents & POLLIN) { + fds[i].revents |= NN_POLLOUT; + } + npfd++; + } + if (fds[i].revents) { + rv++; + } + } + NNI_FREE_STRUCTS(pfd, nfds * 2); + return (rv); + +#else // NNG_PLATFORM_WINDOWS or NNG_PLATFORM_POSIX + NNI_ARG_UNUSED(pfds); + NNI_ARG_UNUSED(npfd); + NNI_ARG_UNUSED(timeout); + errno = ENOTSUP; + return (-1); +#endif +} + // nn_term is suitable only for shutting down the entire library, // and is not thread-safe with other functions. void diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 529e7f4d..212b52c2 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -31,11 +31,9 @@ struct nni_msgq { nni_list mq_aio_putq; nni_list mq_aio_getq; - // Callback - this function is executed with the lock held, and - // provides information about the current queue state anytime - // a message enters or leaves the queue, or a waiter is blocked. - nni_msgq_cb mq_cb_fn; - void * mq_cb_arg; + // Pollable status. + nni_pollable *mq_sendable; + nni_pollable *mq_recvable; // Filters. nni_msgq_filter mq_filter_fn; @@ -62,20 +60,20 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) NNI_FREE_STRUCT(mq); return (NNG_ENOMEM); } - nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); nni_mtx_init(&mq->mq_lock); - - mq->mq_cap = cap; - mq->mq_alloc = alloc; - mq->mq_len = 0; - mq->mq_get = 0; - mq->mq_put = 0; - mq->mq_closed = 0; - mq->mq_puterr = 0; - mq->mq_geterr = 0; - *mqp = mq; + mq->mq_cap = cap; + mq->mq_alloc = alloc; + mq->mq_recvable = NULL; + mq->mq_sendable = NULL; + mq->mq_len = 0; + mq->mq_get = 0; + mq->mq_put = 0; + mq->mq_closed = 0; + mq->mq_puterr = 0; + mq->mq_geterr = 0; + *mqp = mq; return (0); } @@ -101,6 +99,13 @@ nni_msgq_fini(nni_msgq *mq) nni_msg_free(msg); } + if (mq->mq_sendable) { + nni_pollable_free(mq->mq_sendable); + } + if (mq->mq_recvable) { + nni_pollable_free(mq->mq_recvable); + } + nni_free(mq->mq_msgs, mq->mq_alloc * sizeof(nng_msg *)); NNI_FREE_STRUCT(mq); } @@ -292,36 +297,16 @@ nni_msgq_run_getq(nni_msgq *mq) static void nni_msgq_run_notify(nni_msgq *mq) { - if (mq->mq_cb_fn != NULL) { - int flags = 0; - - if (mq->mq_closed) { - flags |= nni_msgq_f_closed; - } - if (mq->mq_len == 0) { - flags |= nni_msgq_f_empty; - } else if (mq->mq_len == mq->mq_cap) { - flags |= nni_msgq_f_full; - } - if (mq->mq_len < mq->mq_cap || - !nni_list_empty(&mq->mq_aio_getq)) { - flags |= nni_msgq_f_can_put; - } - if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { - flags |= nni_msgq_f_can_get; - } - mq->mq_cb_fn(mq->mq_cb_arg, flags); + if (mq->mq_len < mq->mq_cap || !nni_list_empty(&mq->mq_aio_getq)) { + nni_pollable_raise(mq->mq_sendable); + } else { + nni_pollable_clear(mq->mq_sendable); + } + if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { + nni_pollable_raise(mq->mq_recvable); + } else { + nni_pollable_clear(mq->mq_recvable); } -} - -void -nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg) -{ - nni_mtx_lock(&mq->mq_lock); - mq->mq_cb_fn = fn; - mq->mq_cb_arg = arg; - nni_msgq_run_notify(mq); - nni_mtx_unlock(&mq->mq_lock); } static void @@ -543,3 +528,39 @@ out: nni_mtx_unlock(&mq->mq_lock); return (0); } + +int +nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **sp) +{ + nni_mtx_lock(&mq->mq_lock); + if (mq->mq_recvable == NULL) { + int rv; + if ((rv = nni_pollable_alloc(&mq->mq_recvable)) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } + nni_msgq_run_notify(mq); + } + nni_mtx_unlock(&mq->mq_lock); + + *sp = mq->mq_recvable; + return (0); +} + +int +nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp) +{ + nni_mtx_lock(&mq->mq_lock); + if (mq->mq_sendable == NULL) { + int rv; + if ((rv = nni_pollable_alloc(&mq->mq_sendable)) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } + nni_msgq_run_notify(mq); + } + nni_mtx_unlock(&mq->mq_lock); + + *sp = mq->mq_sendable; + return (0); +} diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 93a26eb6..2d23f448 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -12,6 +12,7 @@ #define CORE_MSGQUEUE_H #include "nng_impl.h" +#include "pollable.h" // Message queues. Message queues work in some ways like Go channels; // they are a thread-safe way to pass messages between subsystems. They @@ -77,28 +78,6 @@ typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *); // discarded instead, and any get waiters remain waiting. extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *); -// nni_msgq_cb_flags is an enumeration of flag bits used with nni_msgq_cb. -enum nni_msgq_cb_flags { - nni_msgq_f_full = 1, - nni_msgq_f_empty = 2, - nni_msgq_f_can_get = 4, - nni_msgq_f_can_put = 8, - nni_msgq_f_closed = 16, -}; - -// nni_msgq_cb is a callback function used by sockets to monitor -// the status of the queue. It is called with the lock held for -// performance reasons so consumers must not re-enter the queue. -// The purpose is to enable file descriptor notifications on the socket, -// which don't need to reenter the msgq. The integer is a mask of -// flags that are true for the given message queue. -typedef void (*nni_msgq_cb)(void *, int); - -// nni_msgq_set_cb sets the callback and argument for the callback -// which will be called on state changes in the message queue. Only -// one callback can be registered on a message queue at a time. -extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *); - // nni_msgq_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue // are freed. Unlike closing a go channel, this operation is idempotent. @@ -119,4 +98,7 @@ extern int nni_msgq_cap(nni_msgq *mq); // nni_msgq_len returns the number of messages currently in the queue. extern int nni_msgq_len(nni_msgq *mq); +extern int nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **); +extern int nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **); + #endif // CORE_MSQUEUE_H diff --git a/src/core/pollable.c b/src/core/pollable.c index b5cecf37..a121ba3f 100644 --- a/src/core/pollable.c +++ b/src/core/pollable.c @@ -52,11 +52,13 @@ nni_pollable_raise(nni_pollable *p) return; } nni_mtx_lock(&p->p_lock); - p->p_raised = true; - if (p->p_open) { - nni_mtx_unlock(&p->p_lock); - nni_plat_pipe_raise(p->p_wfd); - return; + if (!p->p_raised) { + p->p_raised = true; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_raise(p->p_wfd); + return; + } } nni_mtx_unlock(&p->p_lock); } @@ -68,11 +70,13 @@ nni_pollable_clear(nni_pollable *p) return; } nni_mtx_lock(&p->p_lock); - p->p_raised = false; - if (p->p_open) { - nni_mtx_unlock(&p->p_lock); - nni_plat_pipe_clear(p->p_rfd); - return; + if (p->p_raised) { + p->p_raised = false; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_clear(p->p_rfd); + return; + } } nni_mtx_unlock(&p->p_lock); } diff --git a/src/core/socket.c b/src/core/socket.c index 4021c0c6..5c28dbbb 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -92,37 +92,11 @@ struct nni_socket { static void nni_ctx_destroy(nni_ctx *); -static void -nni_sock_can_send_cb(void *arg, int flags) -{ - nni_notifyfd *fd = arg; - - if ((flags & nni_msgq_f_can_put) == 0) { - nni_plat_pipe_clear(fd->sn_rfd); - } else { - nni_plat_pipe_raise(fd->sn_wfd); - } -} - -static void -nni_sock_can_recv_cb(void *arg, int flags) -{ - nni_notifyfd *fd = arg; - - if ((flags & nni_msgq_f_can_get) == 0) { - nni_plat_pipe_clear(fd->sn_rfd); - } else { - nni_plat_pipe_raise(fd->sn_wfd); - } -} - static int nni_sock_get_fd(nni_sock *s, int flag, int *fdp) { int rv; - nni_notifyfd *fd; - nni_msgq * mq; - nni_msgq_cb cb; + nni_pollable *p; if ((flag & nni_sock_flags(s)) == 0) { return (NNG_ENOTSUP); @@ -130,41 +104,21 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp) switch (flag) { case NNI_PROTO_FLAG_SND: - fd = &s->s_send_fd; - mq = s->s_uwq; - cb = nni_sock_can_send_cb; + rv = nni_msgq_get_sendable(s->s_uwq, &p); break; case NNI_PROTO_FLAG_RCV: - fd = &s->s_recv_fd; - mq = s->s_urq; - cb = nni_sock_can_recv_cb; + rv = nni_msgq_get_recvable(s->s_urq, &p); break; default: - // This should never occur. - return (NNG_EINVAL); + rv = NNG_EINVAL; + break; } - // Open if not already done. - if (!fd->sn_init) { - if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) { - return (rv); - } - - // Only set the callback on the message queue if we are - // using it. The message queue automatically updates - // the pipe when the callback is first established. - // If we are not using the message queue, then we have - // to update the initial state explicitly ourselves. - - if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) { - nni_msgq_set_cb(mq, cb, fd); - } - - fd->sn_init = 1; + if (rv == 0) { + rv = nni_pollable_getfd(p, fdp); } - *fdp = fd->sn_rfd; - return (0); + return (rv); } static int -- cgit v1.2.3-70-g09d2