From 45f3f141850a0ac07c31906748752571652683df Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 16 Apr 2018 11:40:28 -0700 Subject: fixes #344 nn_poll() legacy API missing This includes the test from legacy libnanomsg and a man page. We have refactored the message queue notification system so that it uses nni_pollable, leading we hope to a more consistent system, and reducing the code size and complexity. We also fixed the size of the NN_RCVFD and NN_SNDFD so that they are a SOCKET on Windows systems, rather than an integer. This addresses 64-bit compilation problems. --- docs/man/nn_getsockopt.3compat.adoc | 6 +- docs/man/nn_poll.3compat.adoc | 101 ++++++++++++++++++ docs/man/nng_compat.3compat.adoc | 4 +- src/compat/nanomsg/nn.c | 152 ++++++++++++++++++++++++++- src/core/msgqueue.c | 111 ++++++++++++-------- src/core/msgqueue.h | 26 +---- src/core/pollable.c | 24 +++-- src/core/socket.c | 62 ++--------- tests/CMakeLists.txt | 1 + tests/compat_poll.c | 201 ++++++++++++++++++++++++++++++++++++ tests/compat_testutil.h | 1 + 11 files changed, 553 insertions(+), 136 deletions(-) create mode 100644 docs/man/nn_poll.3compat.adoc create mode 100644 tests/compat_poll.c diff --git a/docs/man/nn_getsockopt.3compat.adoc b/docs/man/nn_getsockopt.3compat.adoc index 04c024bc..f4988093 100644 --- a/docs/man/nn_getsockopt.3compat.adoc +++ b/docs/man/nn_getsockopt.3compat.adoc @@ -148,15 +148,19 @@ This option returns a file descriptor suitable for use in with `poll()` or `select()` (or other system-specific polling functions). This descriptor will be readable when a message is available for receiving at the socket. +This option is of type `int` on all systems except Windows, where it is of +type `SOCKET`. NOTE: The file descriptor should not be read or written by the application, and is not the same as any underlying descriptor used for network sockets. -`NN_RCVFD`:: +`NN_SNDFD`:: This option returns a file descriptor suitable for use in with `poll()` or `select()` (or other system-specific polling functions). This descriptor will be readable when the socket is able to accept a message for sending. +This option is of type `int` on all systems except Windows, where it is of +type `SOCKET`. NOTE: The file descriptor should not be read or written by the application, and is not the same as any underlying descriptor used for network sockets. diff --git a/docs/man/nn_poll.3compat.adoc b/docs/man/nn_poll.3compat.adoc new file mode 100644 index 00000000..0a1f02e6 --- /dev/null +++ b/docs/man/nn_poll.3compat.adoc @@ -0,0 +1,101 @@ += nn_poll(3compat) +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nn_poll - poll sockets (compatible API) + +== SYNOPSIS + +[source, c] +---- +#include + +#define NN_POLLIN 1 +#define NN_POLLOUT 2 + +struct nn_pollfd { + int fd; + uint16_t events; + uint16_t revents; +}; + +int nn_poll(struct nn_pollfd *pfds, int npfd, int timeout); +---- + +== DESCRIPTION + +The `nn_poll()` function polls a group of sockets for readiness to send or receive. + +NOTE: This function is provided for API +<> with legacy _libnanomsg_. +Consider using the relevant <> instead. + +The array of _nfds_ sockets to poll for are passed into _pfds_. +Each member of this array is initialized with the `fd` field set to +the socket, and the `events` field set to a mask that can contain either or both +of the flags `NN_POLLIN` and `NN_POLLOUT`. + +The flag `NN_POLLIN` indicates that a socket is ready for receiving without +blocking (a message is avaiable on the socket), and the flag `NN_POLLOUT` +indicates that a socket is ready for sending without blocking. + +Upon success, the function returns the number of updates the `revents` +field of each member of the _pfds_ array, setting it to indicate +whether the requested status is true or not. + +NOTE: The `revents` field will only have a flag set if the corresponding +flag was also set in the `events` field. + +If the _timeout_ field is positive, then this function will wait for +up the that many milliseconds. +If none of the requested events occurs before that timeout occurs, then +the function will return -1 and set the error to `ETIMEDOUT`. + +If the _timeout_ is zero, then this function will return immediately, +after updating the current status of the sockets. + +If the _timeout_ is -1, then the function waits forever, or until one of the +requested events occurs. + +WARNING: This function is only suitable for use with sockets obtained with the +`<>`` function, and is not compatible +with file descriptors obtained via any other means. +This includes file descriptors obtained using the `NN_SNDFD` or `NN_RCVFD` +options with `<>` + +NOTE: This function is significantly less efficient than other polling +or asynchronous I/O mechanisms, and is provided for API compatibility only. +It's use is discouraged. + +NOTE: This function is *not* supported on systems other than POSIX derived +platforms and Windows. + +== RETURN VALUES + +This function returns the number of sockets with events on success, or -1 on error. + +== ERRORS + +[horizontal] +`ENOMEM`:: Insufficient memory available. +`EBADF`:: One of the sockets is not open. +`ETIMEDOUT`:: Operation timed out. +`ENOTSUP`:: This function is not supported on this platform. + +== SEE ALSO + +<>, +<>, +<>, +<>, +<>, +<> diff --git a/docs/man/nng_compat.3compat.adoc b/docs/man/nng_compat.3compat.adoc index 5493e41f..9029fb05 100644 --- a/docs/man/nng_compat.3compat.adoc +++ b/docs/man/nng_compat.3compat.adoc @@ -63,7 +63,7 @@ ifndef::backend-pdf[] `<>`:: receive data `<>`:: shut down endpoint `<>`:: close socket -//`nn_poll()`:: poll sockets +`<>`:: poll sockets `<>`:: create forwarding device `<>`:: receive message `<>`:: send message @@ -89,7 +89,7 @@ ifdef::backend-pdf[] |`<>`|receive data |`<>`|shut down endpoint |`<>`|close socket -//|`nn_poll()`|poll sockets +|`<>`|poll sockets |`<>`|create forwarding device |`<>`|receive message |`<>`|send message diff --git a/src/compat/nanomsg/nn.c b/src/compat/nanomsg/nn.c index cbb88a27..40671cfa 100644 --- a/src/compat/nanomsg/nn.c +++ b/src/compat/nanomsg/nn.c @@ -664,6 +664,39 @@ nn_getdomain(nng_socket s, void *valp, size_t *szp) return (0); } +#ifndef NNG_PLATFORM_WINDOWS +#define SOCKET int +#endif + +static int +nn_getfd(nng_socket s, void *valp, size_t *szp, const char *opt) +{ + int ifd; + int rv; + SOCKET sfd; + + if ((rv = nng_getopt_int(s, opt, &ifd)) != 0) { + nn_seterror(rv); + return (-1); + } + sfd = (SOCKET) ifd; + memcpy(valp, &sfd, *szp < sizeof(sfd) ? *szp : sizeof(sfd)); + *szp = sizeof(sfd); + return (0); +} + +static int +nn_getrecvfd(nng_socket s, void *valp, size_t *szp) +{ + return (nn_getfd(s, valp, szp, NNG_OPT_RECVFD)); +} + +static int +nn_getsendfd(nng_socket s, void *valp, size_t *szp) +{ + return (nn_getfd(s, valp, szp, NNG_OPT_SENDFD)); +} + static int nn_getzero(nng_socket s, void *valp, size_t *szp) { @@ -727,10 +760,16 @@ static const struct { .opt = NNG_OPT_RECONNMAXT, }, { - .nnlevel = NN_SOL_SOCKET, .nnopt = NN_SNDFD, .opt = NNG_OPT_SENDFD, + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_SNDFD, + .opt = NNG_OPT_SENDFD, + .get = nn_getsendfd, }, { - .nnlevel = NN_SOL_SOCKET, .nnopt = NN_RCVFD, .opt = NNG_OPT_RECVFD, + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_RCVFD, + .opt = NNG_OPT_RECVFD, + .get = nn_getrecvfd, }, { .nnlevel = NN_SOL_SOCKET, @@ -888,6 +927,115 @@ nn_device(int s1, int s2) return (-1); } +// Windows stuff. +#ifdef NNG_PLATFORM_WINDOWS +#define poll WSAPoll +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif +#include +#include +#elif defined NNG_PLATFORM_POSIX +#include +#endif + +int +nn_poll(struct nn_pollfd *fds, int nfds, int timeout) +{ +// This function is rather unfortunate. poll() is available +// on POSIX, and on Windows as WSAPoll. On other systems it might +// not exist at all. We could also benefit from using a notification +// that didn't have to access file descriptors... sort of access to +// the pollable element on the socket. We don't have that, so we +// just use poll. This function is definitely suboptimal compared to +// using callbacks. +#if defined(NNG_PLATFORM_WINDOWS) || defined(NNG_PLATFORM_POSIX) + struct pollfd *pfd; + int npfd; + int rv; + + if ((pfd = NNI_ALLOC_STRUCTS(pfd, nfds * 2)) == NULL) { + errno = ENOMEM; + return (-1); + } + + // First prepare the master polling structure. + npfd = 0; + for (int i = 0; i < nfds; i++) { + int fd; + if (fds[i].events & NN_POLLIN) { + if ((rv = nng_getopt_int((nng_socket) fds[i].fd, + NNG_OPT_RECVFD, &fd)) != 0) { + nn_seterror(rv); + NNI_FREE_STRUCTS(pfd, nfds * 2); + return (-1); + } +#ifdef NNG_PLATFORM_WINDOWS + pfd[npfd].fd = (SOCKET) fd; +#else + pfd[npfd].fd = fd; +#endif + pfd[npfd].events = POLLIN; + npfd++; + } + if (fds[i].events & NN_POLLOUT) { + if ((rv = nng_getopt_int((nng_socket) fds[i].fd, + NNG_OPT_SENDFD, &fd)) != 0) { + nn_seterror(rv); + NNI_FREE_STRUCTS(pfd, nfds * 2); + return (-1); + } +#ifdef NNG_PLATFORM_WINDOWS + pfd[npfd].fd = (SOCKET) fd; +#else + pfd[npfd].fd = fd; +#endif + pfd[npfd].events = POLLIN; + npfd++; + } + } + + rv = poll(pfd, npfd, timeout); + if (rv < 0) { + int e = errno; + NNI_FREE_STRUCTS(pfd, nfds * 2); + errno = e; + return (-1); + } + + // Now update the nn_poll from the system poll. + npfd = 0; + rv = 0; + for (int i = 0; i < nfds; i++) { + fds[i].revents = 0; + if (fds[i].events & NN_POLLIN) { + if (pfd[npfd].revents & POLLIN) { + fds[i].revents |= NN_POLLIN; + } + npfd++; + } + if (fds[i].events & NN_POLLOUT) { + if (pfd[npfd].revents & POLLIN) { + fds[i].revents |= NN_POLLOUT; + } + npfd++; + } + if (fds[i].revents) { + rv++; + } + } + NNI_FREE_STRUCTS(pfd, nfds * 2); + return (rv); + +#else // NNG_PLATFORM_WINDOWS or NNG_PLATFORM_POSIX + NNI_ARG_UNUSED(pfds); + NNI_ARG_UNUSED(npfd); + NNI_ARG_UNUSED(timeout); + errno = ENOTSUP; + return (-1); +#endif +} + // nn_term is suitable only for shutting down the entire library, // and is not thread-safe with other functions. void diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 529e7f4d..212b52c2 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -31,11 +31,9 @@ struct nni_msgq { nni_list mq_aio_putq; nni_list mq_aio_getq; - // Callback - this function is executed with the lock held, and - // provides information about the current queue state anytime - // a message enters or leaves the queue, or a waiter is blocked. - nni_msgq_cb mq_cb_fn; - void * mq_cb_arg; + // Pollable status. + nni_pollable *mq_sendable; + nni_pollable *mq_recvable; // Filters. nni_msgq_filter mq_filter_fn; @@ -62,20 +60,20 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) NNI_FREE_STRUCT(mq); return (NNG_ENOMEM); } - nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); nni_mtx_init(&mq->mq_lock); - - mq->mq_cap = cap; - mq->mq_alloc = alloc; - mq->mq_len = 0; - mq->mq_get = 0; - mq->mq_put = 0; - mq->mq_closed = 0; - mq->mq_puterr = 0; - mq->mq_geterr = 0; - *mqp = mq; + mq->mq_cap = cap; + mq->mq_alloc = alloc; + mq->mq_recvable = NULL; + mq->mq_sendable = NULL; + mq->mq_len = 0; + mq->mq_get = 0; + mq->mq_put = 0; + mq->mq_closed = 0; + mq->mq_puterr = 0; + mq->mq_geterr = 0; + *mqp = mq; return (0); } @@ -101,6 +99,13 @@ nni_msgq_fini(nni_msgq *mq) nni_msg_free(msg); } + if (mq->mq_sendable) { + nni_pollable_free(mq->mq_sendable); + } + if (mq->mq_recvable) { + nni_pollable_free(mq->mq_recvable); + } + nni_free(mq->mq_msgs, mq->mq_alloc * sizeof(nng_msg *)); NNI_FREE_STRUCT(mq); } @@ -292,36 +297,16 @@ nni_msgq_run_getq(nni_msgq *mq) static void nni_msgq_run_notify(nni_msgq *mq) { - if (mq->mq_cb_fn != NULL) { - int flags = 0; - - if (mq->mq_closed) { - flags |= nni_msgq_f_closed; - } - if (mq->mq_len == 0) { - flags |= nni_msgq_f_empty; - } else if (mq->mq_len == mq->mq_cap) { - flags |= nni_msgq_f_full; - } - if (mq->mq_len < mq->mq_cap || - !nni_list_empty(&mq->mq_aio_getq)) { - flags |= nni_msgq_f_can_put; - } - if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { - flags |= nni_msgq_f_can_get; - } - mq->mq_cb_fn(mq->mq_cb_arg, flags); + if (mq->mq_len < mq->mq_cap || !nni_list_empty(&mq->mq_aio_getq)) { + nni_pollable_raise(mq->mq_sendable); + } else { + nni_pollable_clear(mq->mq_sendable); + } + if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) { + nni_pollable_raise(mq->mq_recvable); + } else { + nni_pollable_clear(mq->mq_recvable); } -} - -void -nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg) -{ - nni_mtx_lock(&mq->mq_lock); - mq->mq_cb_fn = fn; - mq->mq_cb_arg = arg; - nni_msgq_run_notify(mq); - nni_mtx_unlock(&mq->mq_lock); } static void @@ -543,3 +528,39 @@ out: nni_mtx_unlock(&mq->mq_lock); return (0); } + +int +nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **sp) +{ + nni_mtx_lock(&mq->mq_lock); + if (mq->mq_recvable == NULL) { + int rv; + if ((rv = nni_pollable_alloc(&mq->mq_recvable)) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } + nni_msgq_run_notify(mq); + } + nni_mtx_unlock(&mq->mq_lock); + + *sp = mq->mq_recvable; + return (0); +} + +int +nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp) +{ + nni_mtx_lock(&mq->mq_lock); + if (mq->mq_sendable == NULL) { + int rv; + if ((rv = nni_pollable_alloc(&mq->mq_sendable)) != 0) { + nni_mtx_unlock(&mq->mq_lock); + return (rv); + } + nni_msgq_run_notify(mq); + } + nni_mtx_unlock(&mq->mq_lock); + + *sp = mq->mq_sendable; + return (0); +} diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 93a26eb6..2d23f448 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -12,6 +12,7 @@ #define CORE_MSGQUEUE_H #include "nng_impl.h" +#include "pollable.h" // Message queues. Message queues work in some ways like Go channels; // they are a thread-safe way to pass messages between subsystems. They @@ -77,28 +78,6 @@ typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *); // discarded instead, and any get waiters remain waiting. extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *); -// nni_msgq_cb_flags is an enumeration of flag bits used with nni_msgq_cb. -enum nni_msgq_cb_flags { - nni_msgq_f_full = 1, - nni_msgq_f_empty = 2, - nni_msgq_f_can_get = 4, - nni_msgq_f_can_put = 8, - nni_msgq_f_closed = 16, -}; - -// nni_msgq_cb is a callback function used by sockets to monitor -// the status of the queue. It is called with the lock held for -// performance reasons so consumers must not re-enter the queue. -// The purpose is to enable file descriptor notifications on the socket, -// which don't need to reenter the msgq. The integer is a mask of -// flags that are true for the given message queue. -typedef void (*nni_msgq_cb)(void *, int); - -// nni_msgq_set_cb sets the callback and argument for the callback -// which will be called on state changes in the message queue. Only -// one callback can be registered on a message queue at a time. -extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *); - // nni_msgq_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue // are freed. Unlike closing a go channel, this operation is idempotent. @@ -119,4 +98,7 @@ extern int nni_msgq_cap(nni_msgq *mq); // nni_msgq_len returns the number of messages currently in the queue. extern int nni_msgq_len(nni_msgq *mq); +extern int nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **); +extern int nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **); + #endif // CORE_MSQUEUE_H diff --git a/src/core/pollable.c b/src/core/pollable.c index b5cecf37..a121ba3f 100644 --- a/src/core/pollable.c +++ b/src/core/pollable.c @@ -52,11 +52,13 @@ nni_pollable_raise(nni_pollable *p) return; } nni_mtx_lock(&p->p_lock); - p->p_raised = true; - if (p->p_open) { - nni_mtx_unlock(&p->p_lock); - nni_plat_pipe_raise(p->p_wfd); - return; + if (!p->p_raised) { + p->p_raised = true; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_raise(p->p_wfd); + return; + } } nni_mtx_unlock(&p->p_lock); } @@ -68,11 +70,13 @@ nni_pollable_clear(nni_pollable *p) return; } nni_mtx_lock(&p->p_lock); - p->p_raised = false; - if (p->p_open) { - nni_mtx_unlock(&p->p_lock); - nni_plat_pipe_clear(p->p_rfd); - return; + if (p->p_raised) { + p->p_raised = false; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_clear(p->p_rfd); + return; + } } nni_mtx_unlock(&p->p_lock); } diff --git a/src/core/socket.c b/src/core/socket.c index 4021c0c6..5c28dbbb 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -92,37 +92,11 @@ struct nni_socket { static void nni_ctx_destroy(nni_ctx *); -static void -nni_sock_can_send_cb(void *arg, int flags) -{ - nni_notifyfd *fd = arg; - - if ((flags & nni_msgq_f_can_put) == 0) { - nni_plat_pipe_clear(fd->sn_rfd); - } else { - nni_plat_pipe_raise(fd->sn_wfd); - } -} - -static void -nni_sock_can_recv_cb(void *arg, int flags) -{ - nni_notifyfd *fd = arg; - - if ((flags & nni_msgq_f_can_get) == 0) { - nni_plat_pipe_clear(fd->sn_rfd); - } else { - nni_plat_pipe_raise(fd->sn_wfd); - } -} - static int nni_sock_get_fd(nni_sock *s, int flag, int *fdp) { int rv; - nni_notifyfd *fd; - nni_msgq * mq; - nni_msgq_cb cb; + nni_pollable *p; if ((flag & nni_sock_flags(s)) == 0) { return (NNG_ENOTSUP); @@ -130,41 +104,21 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp) switch (flag) { case NNI_PROTO_FLAG_SND: - fd = &s->s_send_fd; - mq = s->s_uwq; - cb = nni_sock_can_send_cb; + rv = nni_msgq_get_sendable(s->s_uwq, &p); break; case NNI_PROTO_FLAG_RCV: - fd = &s->s_recv_fd; - mq = s->s_urq; - cb = nni_sock_can_recv_cb; + rv = nni_msgq_get_recvable(s->s_urq, &p); break; default: - // This should never occur. - return (NNG_EINVAL); + rv = NNG_EINVAL; + break; } - // Open if not already done. - if (!fd->sn_init) { - if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) { - return (rv); - } - - // Only set the callback on the message queue if we are - // using it. The message queue automatically updates - // the pipe when the callback is first established. - // If we are not using the message queue, then we have - // to update the initial state explicitly ourselves. - - if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) { - nni_msgq_set_cb(mq, cb, fd); - } - - fd->sn_init = 1; + if (rv == 0) { + rv = nni_pollable_getfd(p, fdp); } - *fdp = fd->sn_rfd; - return (0); + return (rv); } static int diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d6283dbc..08806679 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -186,6 +186,7 @@ add_nng_compat_test(compat_reqrep 5) add_nng_compat_test(compat_survey 5) add_nng_compat_test(compat_reqttl 5) add_nng_compat_test(compat_shutdown 5) +add_nng_compat_test(compat_poll 5) # These are special tests for compat mode, not inherited from the # legacy libnanomsg suite. diff --git a/tests/compat_poll.c b/tests/compat_poll.c new file mode 100644 index 00000000..1101eb0a --- /dev/null +++ b/tests/compat_poll.c @@ -0,0 +1,201 @@ +/* + Copyright (c) 2013 Martin Sustrik All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a + copy of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + DEALINGS IN THE SOFTWARE. +*/ + + +#if defined _WIN32 +#define poll WSAPoll +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif +#include +#include +#include +#else +#include +#endif +#include "compat_testutil.h" +#include + +/* Test of polling via NN_SNDFD/NN_RCVFD mechanism. */ + +#define SOCKET_ADDRESS "inproc://a" + +int sc; + +void +routine1(NN_UNUSED void *arg) +{ + nn_sleep(10); + test_send(sc, "ABC"); +} + +void +routine2(NN_UNUSED void *arg) +{ + nn_sleep(10); + nn_term(); +} + +#define NN_IN 1 +#define NN_OUT 2 + +int +getevents(int s, int events, int timeout) +{ + int rc; + fd_set pollset; +#if defined _WIN32 + SOCKET rcvfd; + SOCKET sndfd; +#else + int rcvfd; + int sndfd; + int maxfd; +#endif + size_t fdsz; + struct timeval tv; + int revents; + +#if !defined _WIN32 + maxfd = 0; +#endif + FD_ZERO(&pollset); + + if (events & NN_IN) { + fdsz = sizeof(rcvfd); + rc = nn_getsockopt( + s, NN_SOL_SOCKET, NN_RCVFD, (char *) &rcvfd, &fdsz); + errno_assert(rc == 0); + nn_assert(fdsz == sizeof(rcvfd)); + FD_SET(rcvfd, &pollset); +#if !defined _WIN32 + if (rcvfd + 1 > maxfd) + maxfd = rcvfd + 1; +#endif + } + + if (events & NN_OUT) { + fdsz = sizeof(sndfd); + rc = nn_getsockopt( + s, NN_SOL_SOCKET, NN_SNDFD, (char *) &sndfd, &fdsz); + errno_assert(rc == 0); + nn_assert(fdsz == sizeof(sndfd)); + FD_SET(sndfd, &pollset); +#if !defined _WIN32 + if (sndfd + 1 > maxfd) + maxfd = sndfd + 1; +#endif + } + + if (timeout >= 0) { + tv.tv_sec = timeout / 1000; + tv.tv_usec = (timeout % 1000) * 1000; + } +#if defined _WIN32 + rc = select(0, &pollset, NULL, NULL, timeout < 0 ? NULL : &tv); + wsa_assert(rc != SOCKET_ERROR); +#else + rc = select(maxfd, &pollset, NULL, NULL, timeout < 0 ? NULL : &tv); + errno_assert(rc >= 0); +#endif + revents = 0; + if ((events & NN_IN) && FD_ISSET(rcvfd, &pollset)) + revents |= NN_IN; + if ((events & NN_OUT) && FD_ISSET(sndfd, &pollset)) + revents |= NN_OUT; + return revents; +} + +int +main() +{ + int rc; + int sb; + char buf[3]; + struct nn_thread thread; + struct nn_pollfd pfd[2]; + + /* Test nn_poll() function. */ + sb = test_socket(AF_SP, NN_PAIR); + test_bind(sb, SOCKET_ADDRESS); + sc = test_socket(AF_SP, NN_PAIR); + test_connect(sc, SOCKET_ADDRESS); + nn_sleep(200); + test_send(sc, "ABC"); + nn_sleep(100); + pfd[0].fd = sb; + pfd[0].events = NN_POLLIN | NN_POLLOUT; + pfd[1].fd = sc; + pfd[1].events = NN_POLLIN | NN_POLLOUT; + rc = nn_poll(pfd, 2, -1); + errno_assert(rc >= 0); + nn_assert(rc == 2); + nn_assert(pfd[0].revents == (NN_POLLIN | NN_POLLOUT)); + nn_assert(pfd[1].revents == NN_POLLOUT); + test_close(sc); + test_close(sb); + + /* Create a simple topology. */ + sb = test_socket(AF_SP, NN_PAIR); + test_bind(sb, SOCKET_ADDRESS); + sc = test_socket(AF_SP, NN_PAIR); + test_connect(sc, SOCKET_ADDRESS); + + /* Check the initial state of the socket. */ + rc = getevents(sb, NN_IN | NN_OUT, 1000); + nn_assert(rc == NN_OUT); + + /* Poll for IN when there's no message available. The call should + time out. */ + rc = getevents(sb, NN_IN, 10); + nn_assert(rc == 0); + + /* Send a message and start polling. This time IN event should be + signaled. */ + test_send(sc, "ABC"); + rc = getevents(sb, NN_IN, 1000); + nn_assert(rc == NN_IN); + + /* Receive the message and make sure that IN is no longer signaled. */ + test_recv(sb, "ABC"); + rc = getevents(sb, NN_IN, 10); + nn_assert(rc == 0); + + /* Check signalling from a different thread. */ + nn_thread_init(&thread, routine1, NULL); + rc = getevents(sb, NN_IN, 1000); + nn_assert(rc == NN_IN); + test_recv(sb, "ABC"); + nn_thread_term(&thread); + + /* Check terminating the library from a different thread. */ + nn_thread_init(&thread, routine2, NULL); + rc = nn_recv(sb, buf, sizeof(buf), 0); + nn_assert(rc < 0 && nn_errno() == EBADF); + nn_thread_term(&thread); + + /* Clean up. */ + test_close(sc); + test_close(sb); + + return 0; +} diff --git a/tests/compat_testutil.h b/tests/compat_testutil.h index 745f5621..ce5968a2 100644 --- a/tests/compat_testutil.h +++ b/tests/compat_testutil.h @@ -40,6 +40,7 @@ #define nn_err_abort abort #define nn_assert assert #define errno_assert assert +#define wsa_assert assert #define alloc_assert(x) assert(x != NULL) #if defined __GNUC__ || defined __llvm__ || defined __clang__ -- cgit v1.2.3-70-g09d2