summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-16 11:40:28 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-16 20:56:32 -0700
commit45f3f141850a0ac07c31906748752571652683df (patch)
tree0e14b8e5a72972e370f60ea5fd230a790195cd28
parente3b8f31b044e4fe7d47439467fc1622266b5335c (diff)
downloadnng-45f3f141850a0ac07c31906748752571652683df.tar.gz
nng-45f3f141850a0ac07c31906748752571652683df.tar.bz2
nng-45f3f141850a0ac07c31906748752571652683df.zip
fixes #344 nn_poll() legacy API missing
This includes the test from legacy libnanomsg and a man page. We have refactored the message queue notification system so that it uses nni_pollable, leading we hope to a more consistent system, and reducing the code size and complexity. We also fixed the size of the NN_RCVFD and NN_SNDFD so that they are a SOCKET on Windows systems, rather than an integer. This addresses 64-bit compilation problems.
-rw-r--r--docs/man/nn_getsockopt.3compat.adoc6
-rw-r--r--docs/man/nn_poll.3compat.adoc101
-rw-r--r--docs/man/nng_compat.3compat.adoc4
-rw-r--r--src/compat/nanomsg/nn.c152
-rw-r--r--src/core/msgqueue.c111
-rw-r--r--src/core/msgqueue.h26
-rw-r--r--src/core/pollable.c24
-rw-r--r--src/core/socket.c62
-rw-r--r--tests/CMakeLists.txt1
-rw-r--r--tests/compat_poll.c201
-rw-r--r--tests/compat_testutil.h1
11 files changed, 553 insertions, 136 deletions
diff --git a/docs/man/nn_getsockopt.3compat.adoc b/docs/man/nn_getsockopt.3compat.adoc
index 04c024bc..f4988093 100644
--- a/docs/man/nn_getsockopt.3compat.adoc
+++ b/docs/man/nn_getsockopt.3compat.adoc
@@ -148,15 +148,19 @@ This option returns a file descriptor suitable for use in with `poll()` or
`select()` (or other system-specific polling functions).
This descriptor will be readable when a message is available for receiving
at the socket.
+This option is of type `int` on all systems except Windows, where it is of
+type `SOCKET`.
NOTE: The file descriptor should not be read or written by the application,
and is not the same as any underlying descriptor used for network sockets.
-`NN_RCVFD`::
+`NN_SNDFD`::
This option returns a file descriptor suitable for use in with `poll()` or
`select()` (or other system-specific polling functions).
This descriptor will be readable when the socket is able to accept a message
for sending.
+This option is of type `int` on all systems except Windows, where it is of
+type `SOCKET`.
NOTE: The file descriptor should not be read or written by the application,
and is not the same as any underlying descriptor used for network sockets.
diff --git a/docs/man/nn_poll.3compat.adoc b/docs/man/nn_poll.3compat.adoc
new file mode 100644
index 00000000..0a1f02e6
--- /dev/null
+++ b/docs/man/nn_poll.3compat.adoc
@@ -0,0 +1,101 @@
+= nn_poll(3compat)
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This document is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+== NAME
+
+nn_poll - poll sockets (compatible API)
+
+== SYNOPSIS
+
+[source, c]
+----
+#include <nanomsg/nn.h>
+
+#define NN_POLLIN 1
+#define NN_POLLOUT 2
+
+struct nn_pollfd {
+ int fd;
+ uint16_t events;
+ uint16_t revents;
+};
+
+int nn_poll(struct nn_pollfd *pfds, int npfd, int timeout);
+----
+
+== DESCRIPTION
+
+The `nn_poll()` function polls a group of sockets for readiness to send or receive.
+
+NOTE: This function is provided for API
+<<nng_compat.3compat#,compatibility>> with legacy _libnanomsg_.
+Consider using the relevant <<libnng.3#,modern API>> instead.
+
+The array of _nfds_ sockets to poll for are passed into _pfds_.
+Each member of this array is initialized with the `fd` field set to
+the socket, and the `events` field set to a mask that can contain either or both
+of the flags `NN_POLLIN` and `NN_POLLOUT`.
+
+The flag `NN_POLLIN` indicates that a socket is ready for receiving without
+blocking (a message is avaiable on the socket), and the flag `NN_POLLOUT`
+indicates that a socket is ready for sending without blocking.
+
+Upon success, the function returns the number of updates the `revents`
+field of each member of the _pfds_ array, setting it to indicate
+whether the requested status is true or not.
+
+NOTE: The `revents` field will only have a flag set if the corresponding
+flag was also set in the `events` field.
+
+If the _timeout_ field is positive, then this function will wait for
+up the that many milliseconds.
+If none of the requested events occurs before that timeout occurs, then
+the function will return -1 and set the error to `ETIMEDOUT`.
+
+If the _timeout_ is zero, then this function will return immediately,
+after updating the current status of the sockets.
+
+If the _timeout_ is -1, then the function waits forever, or until one of the
+requested events occurs.
+
+WARNING: This function is only suitable for use with sockets obtained with the
+`<<nn_socket.3compat#,nn_socket()>>`` function, and is not compatible
+with file descriptors obtained via any other means.
+This includes file descriptors obtained using the `NN_SNDFD` or `NN_RCVFD`
+options with `<<nn_getsockopt.3compat#,nn_getsockopt()>>`
+
+NOTE: This function is significantly less efficient than other polling
+or asynchronous I/O mechanisms, and is provided for API compatibility only.
+It's use is discouraged.
+
+NOTE: This function is *not* supported on systems other than POSIX derived
+platforms and Windows.
+
+== RETURN VALUES
+
+This function returns the number of sockets with events on success, or -1 on error.
+
+== ERRORS
+
+[horizontal]
+`ENOMEM`:: Insufficient memory available.
+`EBADF`:: One of the sockets is not open.
+`ETIMEDOUT`:: Operation timed out.
+`ENOTSUP`:: This function is not supported on this platform.
+
+== SEE ALSO
+
+<<nn_errno.3compat#,nn_errno(3compat)>>,
+<<nn_recv.3compat#,nn_recv(3compat)>>,
+<<nn_send.3compat#,nn_send(3compat)>>,
+<<nn_socket.3compat#,nn_socket(3compat)>>,
+<<nng_compat.3compat#,nn_compat(3compat)>>,
+<<nng.7#,nng(7)>>
diff --git a/docs/man/nng_compat.3compat.adoc b/docs/man/nng_compat.3compat.adoc
index 5493e41f..9029fb05 100644
--- a/docs/man/nng_compat.3compat.adoc
+++ b/docs/man/nng_compat.3compat.adoc
@@ -63,7 +63,7 @@ ifndef::backend-pdf[]
`<<nn_recv.3compat#,nn_recv()>>`:: receive data
`<<nn_shutdown.3compat#,nn_shutdown()>>`:: shut down endpoint
`<<nn_close.3compat#,nn_close()>>`:: close socket
-//`nn_poll()`:: poll sockets
+`<<nn_poll.3compat#,nn_poll()>>`:: poll sockets
`<<nn_device.3compat#,nn_device()>>`:: create forwarding device
`<<nn_recvmsg.3compat#,nn_recvmsg()>>`:: receive message
`<<nn_sendmsg.3compat#,nn_sendmsg()>>`:: send message
@@ -89,7 +89,7 @@ ifdef::backend-pdf[]
|`<<nn_recv.3compat#,nn_recv()>>`|receive data
|`<<nn_shutdown.3compat#,nn_shutdown()>>`|shut down endpoint
|`<<nn_close.3compat#,nn_close()>>`|close socket
-//|`nn_poll()`|poll sockets
+|`<<nn_poll.3compat#,nn_poll()>>`|poll sockets
|`<<nn_device.3compat#,nn_device()>>`|create forwarding device
|`<<nn_recvmsg.3compat#,nn_recvmsg()>>`|receive message
|`<<nn_sendmsg.3compat#,nn_sendmsg()>>`|send message
diff --git a/src/compat/nanomsg/nn.c b/src/compat/nanomsg/nn.c
index cbb88a27..40671cfa 100644
--- a/src/compat/nanomsg/nn.c
+++ b/src/compat/nanomsg/nn.c
@@ -664,6 +664,39 @@ nn_getdomain(nng_socket s, void *valp, size_t *szp)
return (0);
}
+#ifndef NNG_PLATFORM_WINDOWS
+#define SOCKET int
+#endif
+
+static int
+nn_getfd(nng_socket s, void *valp, size_t *szp, const char *opt)
+{
+ int ifd;
+ int rv;
+ SOCKET sfd;
+
+ if ((rv = nng_getopt_int(s, opt, &ifd)) != 0) {
+ nn_seterror(rv);
+ return (-1);
+ }
+ sfd = (SOCKET) ifd;
+ memcpy(valp, &sfd, *szp < sizeof(sfd) ? *szp : sizeof(sfd));
+ *szp = sizeof(sfd);
+ return (0);
+}
+
+static int
+nn_getrecvfd(nng_socket s, void *valp, size_t *szp)
+{
+ return (nn_getfd(s, valp, szp, NNG_OPT_RECVFD));
+}
+
+static int
+nn_getsendfd(nng_socket s, void *valp, size_t *szp)
+{
+ return (nn_getfd(s, valp, szp, NNG_OPT_SENDFD));
+}
+
static int
nn_getzero(nng_socket s, void *valp, size_t *szp)
{
@@ -727,10 +760,16 @@ static const struct {
.opt = NNG_OPT_RECONNMAXT,
},
{
- .nnlevel = NN_SOL_SOCKET, .nnopt = NN_SNDFD, .opt = NNG_OPT_SENDFD,
+ .nnlevel = NN_SOL_SOCKET,
+ .nnopt = NN_SNDFD,
+ .opt = NNG_OPT_SENDFD,
+ .get = nn_getsendfd,
},
{
- .nnlevel = NN_SOL_SOCKET, .nnopt = NN_RCVFD, .opt = NNG_OPT_RECVFD,
+ .nnlevel = NN_SOL_SOCKET,
+ .nnopt = NN_RCVFD,
+ .opt = NNG_OPT_RECVFD,
+ .get = nn_getrecvfd,
},
{
.nnlevel = NN_SOL_SOCKET,
@@ -888,6 +927,115 @@ nn_device(int s1, int s2)
return (-1);
}
+// Windows stuff.
+#ifdef NNG_PLATFORM_WINDOWS
+#define poll WSAPoll
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#include <windows.h>
+#include <mswsock.h>
+#elif defined NNG_PLATFORM_POSIX
+#include <poll.h>
+#endif
+
+int
+nn_poll(struct nn_pollfd *fds, int nfds, int timeout)
+{
+// This function is rather unfortunate. poll() is available
+// on POSIX, and on Windows as WSAPoll. On other systems it might
+// not exist at all. We could also benefit from using a notification
+// that didn't have to access file descriptors... sort of access to
+// the pollable element on the socket. We don't have that, so we
+// just use poll. This function is definitely suboptimal compared to
+// using callbacks.
+#if defined(NNG_PLATFORM_WINDOWS) || defined(NNG_PLATFORM_POSIX)
+ struct pollfd *pfd;
+ int npfd;
+ int rv;
+
+ if ((pfd = NNI_ALLOC_STRUCTS(pfd, nfds * 2)) == NULL) {
+ errno = ENOMEM;
+ return (-1);
+ }
+
+ // First prepare the master polling structure.
+ npfd = 0;
+ for (int i = 0; i < nfds; i++) {
+ int fd;
+ if (fds[i].events & NN_POLLIN) {
+ if ((rv = nng_getopt_int((nng_socket) fds[i].fd,
+ NNG_OPT_RECVFD, &fd)) != 0) {
+ nn_seterror(rv);
+ NNI_FREE_STRUCTS(pfd, nfds * 2);
+ return (-1);
+ }
+#ifdef NNG_PLATFORM_WINDOWS
+ pfd[npfd].fd = (SOCKET) fd;
+#else
+ pfd[npfd].fd = fd;
+#endif
+ pfd[npfd].events = POLLIN;
+ npfd++;
+ }
+ if (fds[i].events & NN_POLLOUT) {
+ if ((rv = nng_getopt_int((nng_socket) fds[i].fd,
+ NNG_OPT_SENDFD, &fd)) != 0) {
+ nn_seterror(rv);
+ NNI_FREE_STRUCTS(pfd, nfds * 2);
+ return (-1);
+ }
+#ifdef NNG_PLATFORM_WINDOWS
+ pfd[npfd].fd = (SOCKET) fd;
+#else
+ pfd[npfd].fd = fd;
+#endif
+ pfd[npfd].events = POLLIN;
+ npfd++;
+ }
+ }
+
+ rv = poll(pfd, npfd, timeout);
+ if (rv < 0) {
+ int e = errno;
+ NNI_FREE_STRUCTS(pfd, nfds * 2);
+ errno = e;
+ return (-1);
+ }
+
+ // Now update the nn_poll from the system poll.
+ npfd = 0;
+ rv = 0;
+ for (int i = 0; i < nfds; i++) {
+ fds[i].revents = 0;
+ if (fds[i].events & NN_POLLIN) {
+ if (pfd[npfd].revents & POLLIN) {
+ fds[i].revents |= NN_POLLIN;
+ }
+ npfd++;
+ }
+ if (fds[i].events & NN_POLLOUT) {
+ if (pfd[npfd].revents & POLLIN) {
+ fds[i].revents |= NN_POLLOUT;
+ }
+ npfd++;
+ }
+ if (fds[i].revents) {
+ rv++;
+ }
+ }
+ NNI_FREE_STRUCTS(pfd, nfds * 2);
+ return (rv);
+
+#else // NNG_PLATFORM_WINDOWS or NNG_PLATFORM_POSIX
+ NNI_ARG_UNUSED(pfds);
+ NNI_ARG_UNUSED(npfd);
+ NNI_ARG_UNUSED(timeout);
+ errno = ENOTSUP;
+ return (-1);
+#endif
+}
+
// nn_term is suitable only for shutting down the entire library,
// and is not thread-safe with other functions.
void
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 529e7f4d..212b52c2 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -31,11 +31,9 @@ struct nni_msgq {
nni_list mq_aio_putq;
nni_list mq_aio_getq;
- // Callback - this function is executed with the lock held, and
- // provides information about the current queue state anytime
- // a message enters or leaves the queue, or a waiter is blocked.
- nni_msgq_cb mq_cb_fn;
- void * mq_cb_arg;
+ // Pollable status.
+ nni_pollable *mq_sendable;
+ nni_pollable *mq_recvable;
// Filters.
nni_msgq_filter mq_filter_fn;
@@ -62,20 +60,20 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
NNI_FREE_STRUCT(mq);
return (NNG_ENOMEM);
}
-
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_mtx_init(&mq->mq_lock);
-
- mq->mq_cap = cap;
- mq->mq_alloc = alloc;
- mq->mq_len = 0;
- mq->mq_get = 0;
- mq->mq_put = 0;
- mq->mq_closed = 0;
- mq->mq_puterr = 0;
- mq->mq_geterr = 0;
- *mqp = mq;
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+ mq->mq_recvable = NULL;
+ mq->mq_sendable = NULL;
+ mq->mq_len = 0;
+ mq->mq_get = 0;
+ mq->mq_put = 0;
+ mq->mq_closed = 0;
+ mq->mq_puterr = 0;
+ mq->mq_geterr = 0;
+ *mqp = mq;
return (0);
}
@@ -101,6 +99,13 @@ nni_msgq_fini(nni_msgq *mq)
nni_msg_free(msg);
}
+ if (mq->mq_sendable) {
+ nni_pollable_free(mq->mq_sendable);
+ }
+ if (mq->mq_recvable) {
+ nni_pollable_free(mq->mq_recvable);
+ }
+
nni_free(mq->mq_msgs, mq->mq_alloc * sizeof(nng_msg *));
NNI_FREE_STRUCT(mq);
}
@@ -292,36 +297,16 @@ nni_msgq_run_getq(nni_msgq *mq)
static void
nni_msgq_run_notify(nni_msgq *mq)
{
- if (mq->mq_cb_fn != NULL) {
- int flags = 0;
-
- if (mq->mq_closed) {
- flags |= nni_msgq_f_closed;
- }
- if (mq->mq_len == 0) {
- flags |= nni_msgq_f_empty;
- } else if (mq->mq_len == mq->mq_cap) {
- flags |= nni_msgq_f_full;
- }
- if (mq->mq_len < mq->mq_cap ||
- !nni_list_empty(&mq->mq_aio_getq)) {
- flags |= nni_msgq_f_can_put;
- }
- if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
- flags |= nni_msgq_f_can_get;
- }
- mq->mq_cb_fn(mq->mq_cb_arg, flags);
+ if (mq->mq_len < mq->mq_cap || !nni_list_empty(&mq->mq_aio_getq)) {
+ nni_pollable_raise(mq->mq_sendable);
+ } else {
+ nni_pollable_clear(mq->mq_sendable);
+ }
+ if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
+ nni_pollable_raise(mq->mq_recvable);
+ } else {
+ nni_pollable_clear(mq->mq_recvable);
}
-}
-
-void
-nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg)
-{
- nni_mtx_lock(&mq->mq_lock);
- mq->mq_cb_fn = fn;
- mq->mq_cb_arg = arg;
- nni_msgq_run_notify(mq);
- nni_mtx_unlock(&mq->mq_lock);
}
static void
@@ -543,3 +528,39 @@ out:
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
+
+int
+nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **sp)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_recvable == NULL) {
+ int rv;
+ if ((rv = nni_pollable_alloc(&mq->mq_recvable)) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
+ nni_msgq_run_notify(mq);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+
+ *sp = mq->mq_recvable;
+ return (0);
+}
+
+int
+nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_sendable == NULL) {
+ int rv;
+ if ((rv = nni_pollable_alloc(&mq->mq_sendable)) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
+ nni_msgq_run_notify(mq);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+
+ *sp = mq->mq_sendable;
+ return (0);
+}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 93a26eb6..2d23f448 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -12,6 +12,7 @@
#define CORE_MSGQUEUE_H
#include "nng_impl.h"
+#include "pollable.h"
// Message queues. Message queues work in some ways like Go channels;
// they are a thread-safe way to pass messages between subsystems. They
@@ -77,28 +78,6 @@ typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *);
// discarded instead, and any get waiters remain waiting.
extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *);
-// nni_msgq_cb_flags is an enumeration of flag bits used with nni_msgq_cb.
-enum nni_msgq_cb_flags {
- nni_msgq_f_full = 1,
- nni_msgq_f_empty = 2,
- nni_msgq_f_can_get = 4,
- nni_msgq_f_can_put = 8,
- nni_msgq_f_closed = 16,
-};
-
-// nni_msgq_cb is a callback function used by sockets to monitor
-// the status of the queue. It is called with the lock held for
-// performance reasons so consumers must not re-enter the queue.
-// The purpose is to enable file descriptor notifications on the socket,
-// which don't need to reenter the msgq. The integer is a mask of
-// flags that are true for the given message queue.
-typedef void (*nni_msgq_cb)(void *, int);
-
-// nni_msgq_set_cb sets the callback and argument for the callback
-// which will be called on state changes in the message queue. Only
-// one callback can be registered on a message queue at a time.
-extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *);
-
// nni_msgq_close closes the queue. After this all operates on the
// message queue will return NNG_ECLOSED. Messages inside the queue
// are freed. Unlike closing a go channel, this operation is idempotent.
@@ -119,4 +98,7 @@ extern int nni_msgq_cap(nni_msgq *mq);
// nni_msgq_len returns the number of messages currently in the queue.
extern int nni_msgq_len(nni_msgq *mq);
+extern int nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **);
+extern int nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **);
+
#endif // CORE_MSQUEUE_H
diff --git a/src/core/pollable.c b/src/core/pollable.c
index b5cecf37..a121ba3f 100644
--- a/src/core/pollable.c
+++ b/src/core/pollable.c
@@ -52,11 +52,13 @@ nni_pollable_raise(nni_pollable *p)
return;
}
nni_mtx_lock(&p->p_lock);
- p->p_raised = true;
- if (p->p_open) {
- nni_mtx_unlock(&p->p_lock);
- nni_plat_pipe_raise(p->p_wfd);
- return;
+ if (!p->p_raised) {
+ p->p_raised = true;
+ if (p->p_open) {
+ nni_mtx_unlock(&p->p_lock);
+ nni_plat_pipe_raise(p->p_wfd);
+ return;
+ }
}
nni_mtx_unlock(&p->p_lock);
}
@@ -68,11 +70,13 @@ nni_pollable_clear(nni_pollable *p)
return;
}
nni_mtx_lock(&p->p_lock);
- p->p_raised = false;
- if (p->p_open) {
- nni_mtx_unlock(&p->p_lock);
- nni_plat_pipe_clear(p->p_rfd);
- return;
+ if (p->p_raised) {
+ p->p_raised = false;
+ if (p->p_open) {
+ nni_mtx_unlock(&p->p_lock);
+ nni_plat_pipe_clear(p->p_rfd);
+ return;
+ }
}
nni_mtx_unlock(&p->p_lock);
}
diff --git a/src/core/socket.c b/src/core/socket.c
index 4021c0c6..5c28dbbb 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -92,37 +92,11 @@ struct nni_socket {
static void nni_ctx_destroy(nni_ctx *);
-static void
-nni_sock_can_send_cb(void *arg, int flags)
-{
- nni_notifyfd *fd = arg;
-
- if ((flags & nni_msgq_f_can_put) == 0) {
- nni_plat_pipe_clear(fd->sn_rfd);
- } else {
- nni_plat_pipe_raise(fd->sn_wfd);
- }
-}
-
-static void
-nni_sock_can_recv_cb(void *arg, int flags)
-{
- nni_notifyfd *fd = arg;
-
- if ((flags & nni_msgq_f_can_get) == 0) {
- nni_plat_pipe_clear(fd->sn_rfd);
- } else {
- nni_plat_pipe_raise(fd->sn_wfd);
- }
-}
-
static int
nni_sock_get_fd(nni_sock *s, int flag, int *fdp)
{
int rv;
- nni_notifyfd *fd;
- nni_msgq * mq;
- nni_msgq_cb cb;
+ nni_pollable *p;
if ((flag & nni_sock_flags(s)) == 0) {
return (NNG_ENOTSUP);
@@ -130,41 +104,21 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp)
switch (flag) {
case NNI_PROTO_FLAG_SND:
- fd = &s->s_send_fd;
- mq = s->s_uwq;
- cb = nni_sock_can_send_cb;
+ rv = nni_msgq_get_sendable(s->s_uwq, &p);
break;
case NNI_PROTO_FLAG_RCV:
- fd = &s->s_recv_fd;
- mq = s->s_urq;
- cb = nni_sock_can_recv_cb;
+ rv = nni_msgq_get_recvable(s->s_urq, &p);
break;
default:
- // This should never occur.
- return (NNG_EINVAL);
+ rv = NNG_EINVAL;
+ break;
}
- // Open if not already done.
- if (!fd->sn_init) {
- if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) {
- return (rv);
- }
-
- // Only set the callback on the message queue if we are
- // using it. The message queue automatically updates
- // the pipe when the callback is first established.
- // If we are not using the message queue, then we have
- // to update the initial state explicitly ourselves.
-
- if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) {
- nni_msgq_set_cb(mq, cb, fd);
- }
-
- fd->sn_init = 1;
+ if (rv == 0) {
+ rv = nni_pollable_getfd(p, fdp);
}
- *fdp = fd->sn_rfd;
- return (0);
+ return (rv);
}
static int
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index d6283dbc..08806679 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -186,6 +186,7 @@ add_nng_compat_test(compat_reqrep 5)
add_nng_compat_test(compat_survey 5)
add_nng_compat_test(compat_reqttl 5)
add_nng_compat_test(compat_shutdown 5)
+add_nng_compat_test(compat_poll 5)
# These are special tests for compat mode, not inherited from the
# legacy libnanomsg suite.
diff --git a/tests/compat_poll.c b/tests/compat_poll.c
new file mode 100644
index 00000000..1101eb0a
--- /dev/null
+++ b/tests/compat_poll.c
@@ -0,0 +1,201 @@
+/*
+ Copyright (c) 2013 Martin Sustrik All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a
+ copy of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ DEALINGS IN THE SOFTWARE.
+*/
+
+
+#if defined _WIN32
+#define poll WSAPoll
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+#include <windows.h>
+#include <winsock2.h>
+#include <mswsock.h>
+#else
+#include <sys/select.h>
+#endif
+#include "compat_testutil.h"
+#include <nanomsg/nn.h>
+
+/* Test of polling via NN_SNDFD/NN_RCVFD mechanism. */
+
+#define SOCKET_ADDRESS "inproc://a"
+
+int sc;
+
+void
+routine1(NN_UNUSED void *arg)
+{
+ nn_sleep(10);
+ test_send(sc, "ABC");
+}
+
+void
+routine2(NN_UNUSED void *arg)
+{
+ nn_sleep(10);
+ nn_term();
+}
+
+#define NN_IN 1
+#define NN_OUT 2
+
+int
+getevents(int s, int events, int timeout)
+{
+ int rc;
+ fd_set pollset;
+#if defined _WIN32
+ SOCKET rcvfd;
+ SOCKET sndfd;
+#else
+ int rcvfd;
+ int sndfd;
+ int maxfd;
+#endif
+ size_t fdsz;
+ struct timeval tv;
+ int revents;
+
+#if !defined _WIN32
+ maxfd = 0;
+#endif
+ FD_ZERO(&pollset);
+
+ if (events & NN_IN) {
+ fdsz = sizeof(rcvfd);
+ rc = nn_getsockopt(
+ s, NN_SOL_SOCKET, NN_RCVFD, (char *) &rcvfd, &fdsz);
+ errno_assert(rc == 0);
+ nn_assert(fdsz == sizeof(rcvfd));
+ FD_SET(rcvfd, &pollset);
+#if !defined _WIN32
+ if (rcvfd + 1 > maxfd)
+ maxfd = rcvfd + 1;
+#endif
+ }
+
+ if (events & NN_OUT) {
+ fdsz = sizeof(sndfd);
+ rc = nn_getsockopt(
+ s, NN_SOL_SOCKET, NN_SNDFD, (char *) &sndfd, &fdsz);
+ errno_assert(rc == 0);
+ nn_assert(fdsz == sizeof(sndfd));
+ FD_SET(sndfd, &pollset);
+#if !defined _WIN32
+ if (sndfd + 1 > maxfd)
+ maxfd = sndfd + 1;
+#endif
+ }
+
+ if (timeout >= 0) {
+ tv.tv_sec = timeout / 1000;
+ tv.tv_usec = (timeout % 1000) * 1000;
+ }
+#if defined _WIN32
+ rc = select(0, &pollset, NULL, NULL, timeout < 0 ? NULL : &tv);
+ wsa_assert(rc != SOCKET_ERROR);
+#else
+ rc = select(maxfd, &pollset, NULL, NULL, timeout < 0 ? NULL : &tv);
+ errno_assert(rc >= 0);
+#endif
+ revents = 0;
+ if ((events & NN_IN) && FD_ISSET(rcvfd, &pollset))
+ revents |= NN_IN;
+ if ((events & NN_OUT) && FD_ISSET(sndfd, &pollset))
+ revents |= NN_OUT;
+ return revents;
+}
+
+int
+main()
+{
+ int rc;
+ int sb;
+ char buf[3];
+ struct nn_thread thread;
+ struct nn_pollfd pfd[2];
+
+ /* Test nn_poll() function. */
+ sb = test_socket(AF_SP, NN_PAIR);
+ test_bind(sb, SOCKET_ADDRESS);
+ sc = test_socket(AF_SP, NN_PAIR);
+ test_connect(sc, SOCKET_ADDRESS);
+ nn_sleep(200);
+ test_send(sc, "ABC");
+ nn_sleep(100);
+ pfd[0].fd = sb;
+ pfd[0].events = NN_POLLIN | NN_POLLOUT;
+ pfd[1].fd = sc;
+ pfd[1].events = NN_POLLIN | NN_POLLOUT;
+ rc = nn_poll(pfd, 2, -1);
+ errno_assert(rc >= 0);
+ nn_assert(rc == 2);
+ nn_assert(pfd[0].revents == (NN_POLLIN | NN_POLLOUT));
+ nn_assert(pfd[1].revents == NN_POLLOUT);
+ test_close(sc);
+ test_close(sb);
+
+ /* Create a simple topology. */
+ sb = test_socket(AF_SP, NN_PAIR);
+ test_bind(sb, SOCKET_ADDRESS);
+ sc = test_socket(AF_SP, NN_PAIR);
+ test_connect(sc, SOCKET_ADDRESS);
+
+ /* Check the initial state of the socket. */
+ rc = getevents(sb, NN_IN | NN_OUT, 1000);
+ nn_assert(rc == NN_OUT);
+
+ /* Poll for IN when there's no message available. The call should
+ time out. */
+ rc = getevents(sb, NN_IN, 10);
+ nn_assert(rc == 0);
+
+ /* Send a message and start polling. This time IN event should be
+ signaled. */
+ test_send(sc, "ABC");
+ rc = getevents(sb, NN_IN, 1000);
+ nn_assert(rc == NN_IN);
+
+ /* Receive the message and make sure that IN is no longer signaled. */
+ test_recv(sb, "ABC");
+ rc = getevents(sb, NN_IN, 10);
+ nn_assert(rc == 0);
+
+ /* Check signalling from a different thread. */
+ nn_thread_init(&thread, routine1, NULL);
+ rc = getevents(sb, NN_IN, 1000);
+ nn_assert(rc == NN_IN);
+ test_recv(sb, "ABC");
+ nn_thread_term(&thread);
+
+ /* Check terminating the library from a different thread. */
+ nn_thread_init(&thread, routine2, NULL);
+ rc = nn_recv(sb, buf, sizeof(buf), 0);
+ nn_assert(rc < 0 && nn_errno() == EBADF);
+ nn_thread_term(&thread);
+
+ /* Clean up. */
+ test_close(sc);
+ test_close(sb);
+
+ return 0;
+}
diff --git a/tests/compat_testutil.h b/tests/compat_testutil.h
index 745f5621..ce5968a2 100644
--- a/tests/compat_testutil.h
+++ b/tests/compat_testutil.h
@@ -40,6 +40,7 @@
#define nn_err_abort abort
#define nn_assert assert
#define errno_assert assert
+#define wsa_assert assert
#define alloc_assert(x) assert(x != NULL)
#if defined __GNUC__ || defined __llvm__ || defined __clang__