aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-16 11:40:28 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-16 20:56:32 -0700
commit45f3f141850a0ac07c31906748752571652683df (patch)
tree0e14b8e5a72972e370f60ea5fd230a790195cd28 /src/core
parente3b8f31b044e4fe7d47439467fc1622266b5335c (diff)
downloadnng-45f3f141850a0ac07c31906748752571652683df.tar.gz
nng-45f3f141850a0ac07c31906748752571652683df.tar.bz2
nng-45f3f141850a0ac07c31906748752571652683df.zip
fixes #344 nn_poll() legacy API missing
This includes the test from legacy libnanomsg and a man page. We have refactored the message queue notification system so that it uses nni_pollable, leading we hope to a more consistent system, and reducing the code size and complexity. We also fixed the size of the NN_RCVFD and NN_SNDFD so that they are a SOCKET on Windows systems, rather than an integer. This addresses 64-bit compilation problems.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/msgqueue.c111
-rw-r--r--src/core/msgqueue.h26
-rw-r--r--src/core/pollable.c24
-rw-r--r--src/core/socket.c62
4 files changed, 92 insertions, 131 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 529e7f4d..212b52c2 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -31,11 +31,9 @@ struct nni_msgq {
nni_list mq_aio_putq;
nni_list mq_aio_getq;
- // Callback - this function is executed with the lock held, and
- // provides information about the current queue state anytime
- // a message enters or leaves the queue, or a waiter is blocked.
- nni_msgq_cb mq_cb_fn;
- void * mq_cb_arg;
+ // Pollable status.
+ nni_pollable *mq_sendable;
+ nni_pollable *mq_recvable;
// Filters.
nni_msgq_filter mq_filter_fn;
@@ -62,20 +60,20 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
NNI_FREE_STRUCT(mq);
return (NNG_ENOMEM);
}
-
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
nni_mtx_init(&mq->mq_lock);
-
- mq->mq_cap = cap;
- mq->mq_alloc = alloc;
- mq->mq_len = 0;
- mq->mq_get = 0;
- mq->mq_put = 0;
- mq->mq_closed = 0;
- mq->mq_puterr = 0;
- mq->mq_geterr = 0;
- *mqp = mq;
+ mq->mq_cap = cap;
+ mq->mq_alloc = alloc;
+ mq->mq_recvable = NULL;
+ mq->mq_sendable = NULL;
+ mq->mq_len = 0;
+ mq->mq_get = 0;
+ mq->mq_put = 0;
+ mq->mq_closed = 0;
+ mq->mq_puterr = 0;
+ mq->mq_geterr = 0;
+ *mqp = mq;
return (0);
}
@@ -101,6 +99,13 @@ nni_msgq_fini(nni_msgq *mq)
nni_msg_free(msg);
}
+ if (mq->mq_sendable) {
+ nni_pollable_free(mq->mq_sendable);
+ }
+ if (mq->mq_recvable) {
+ nni_pollable_free(mq->mq_recvable);
+ }
+
nni_free(mq->mq_msgs, mq->mq_alloc * sizeof(nng_msg *));
NNI_FREE_STRUCT(mq);
}
@@ -292,36 +297,16 @@ nni_msgq_run_getq(nni_msgq *mq)
static void
nni_msgq_run_notify(nni_msgq *mq)
{
- if (mq->mq_cb_fn != NULL) {
- int flags = 0;
-
- if (mq->mq_closed) {
- flags |= nni_msgq_f_closed;
- }
- if (mq->mq_len == 0) {
- flags |= nni_msgq_f_empty;
- } else if (mq->mq_len == mq->mq_cap) {
- flags |= nni_msgq_f_full;
- }
- if (mq->mq_len < mq->mq_cap ||
- !nni_list_empty(&mq->mq_aio_getq)) {
- flags |= nni_msgq_f_can_put;
- }
- if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
- flags |= nni_msgq_f_can_get;
- }
- mq->mq_cb_fn(mq->mq_cb_arg, flags);
+ if (mq->mq_len < mq->mq_cap || !nni_list_empty(&mq->mq_aio_getq)) {
+ nni_pollable_raise(mq->mq_sendable);
+ } else {
+ nni_pollable_clear(mq->mq_sendable);
+ }
+ if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
+ nni_pollable_raise(mq->mq_recvable);
+ } else {
+ nni_pollable_clear(mq->mq_recvable);
}
-}
-
-void
-nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg)
-{
- nni_mtx_lock(&mq->mq_lock);
- mq->mq_cb_fn = fn;
- mq->mq_cb_arg = arg;
- nni_msgq_run_notify(mq);
- nni_mtx_unlock(&mq->mq_lock);
}
static void
@@ -543,3 +528,39 @@ out:
nni_mtx_unlock(&mq->mq_lock);
return (0);
}
+
+int
+nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **sp)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_recvable == NULL) {
+ int rv;
+ if ((rv = nni_pollable_alloc(&mq->mq_recvable)) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
+ nni_msgq_run_notify(mq);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+
+ *sp = mq->mq_recvable;
+ return (0);
+}
+
+int
+nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **sp)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_sendable == NULL) {
+ int rv;
+ if ((rv = nni_pollable_alloc(&mq->mq_sendable)) != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
+ nni_msgq_run_notify(mq);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+
+ *sp = mq->mq_sendable;
+ return (0);
+}
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 93a26eb6..2d23f448 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -12,6 +12,7 @@
#define CORE_MSGQUEUE_H
#include "nng_impl.h"
+#include "pollable.h"
// Message queues. Message queues work in some ways like Go channels;
// they are a thread-safe way to pass messages between subsystems. They
@@ -77,28 +78,6 @@ typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *);
// discarded instead, and any get waiters remain waiting.
extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *);
-// nni_msgq_cb_flags is an enumeration of flag bits used with nni_msgq_cb.
-enum nni_msgq_cb_flags {
- nni_msgq_f_full = 1,
- nni_msgq_f_empty = 2,
- nni_msgq_f_can_get = 4,
- nni_msgq_f_can_put = 8,
- nni_msgq_f_closed = 16,
-};
-
-// nni_msgq_cb is a callback function used by sockets to monitor
-// the status of the queue. It is called with the lock held for
-// performance reasons so consumers must not re-enter the queue.
-// The purpose is to enable file descriptor notifications on the socket,
-// which don't need to reenter the msgq. The integer is a mask of
-// flags that are true for the given message queue.
-typedef void (*nni_msgq_cb)(void *, int);
-
-// nni_msgq_set_cb sets the callback and argument for the callback
-// which will be called on state changes in the message queue. Only
-// one callback can be registered on a message queue at a time.
-extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *);
-
// nni_msgq_close closes the queue. After this all operates on the
// message queue will return NNG_ECLOSED. Messages inside the queue
// are freed. Unlike closing a go channel, this operation is idempotent.
@@ -119,4 +98,7 @@ extern int nni_msgq_cap(nni_msgq *mq);
// nni_msgq_len returns the number of messages currently in the queue.
extern int nni_msgq_len(nni_msgq *mq);
+extern int nni_msgq_get_recvable(nni_msgq *mq, nni_pollable **);
+extern int nni_msgq_get_sendable(nni_msgq *mq, nni_pollable **);
+
#endif // CORE_MSQUEUE_H
diff --git a/src/core/pollable.c b/src/core/pollable.c
index b5cecf37..a121ba3f 100644
--- a/src/core/pollable.c
+++ b/src/core/pollable.c
@@ -52,11 +52,13 @@ nni_pollable_raise(nni_pollable *p)
return;
}
nni_mtx_lock(&p->p_lock);
- p->p_raised = true;
- if (p->p_open) {
- nni_mtx_unlock(&p->p_lock);
- nni_plat_pipe_raise(p->p_wfd);
- return;
+ if (!p->p_raised) {
+ p->p_raised = true;
+ if (p->p_open) {
+ nni_mtx_unlock(&p->p_lock);
+ nni_plat_pipe_raise(p->p_wfd);
+ return;
+ }
}
nni_mtx_unlock(&p->p_lock);
}
@@ -68,11 +70,13 @@ nni_pollable_clear(nni_pollable *p)
return;
}
nni_mtx_lock(&p->p_lock);
- p->p_raised = false;
- if (p->p_open) {
- nni_mtx_unlock(&p->p_lock);
- nni_plat_pipe_clear(p->p_rfd);
- return;
+ if (p->p_raised) {
+ p->p_raised = false;
+ if (p->p_open) {
+ nni_mtx_unlock(&p->p_lock);
+ nni_plat_pipe_clear(p->p_rfd);
+ return;
+ }
}
nni_mtx_unlock(&p->p_lock);
}
diff --git a/src/core/socket.c b/src/core/socket.c
index 4021c0c6..5c28dbbb 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -92,37 +92,11 @@ struct nni_socket {
static void nni_ctx_destroy(nni_ctx *);
-static void
-nni_sock_can_send_cb(void *arg, int flags)
-{
- nni_notifyfd *fd = arg;
-
- if ((flags & nni_msgq_f_can_put) == 0) {
- nni_plat_pipe_clear(fd->sn_rfd);
- } else {
- nni_plat_pipe_raise(fd->sn_wfd);
- }
-}
-
-static void
-nni_sock_can_recv_cb(void *arg, int flags)
-{
- nni_notifyfd *fd = arg;
-
- if ((flags & nni_msgq_f_can_get) == 0) {
- nni_plat_pipe_clear(fd->sn_rfd);
- } else {
- nni_plat_pipe_raise(fd->sn_wfd);
- }
-}
-
static int
nni_sock_get_fd(nni_sock *s, int flag, int *fdp)
{
int rv;
- nni_notifyfd *fd;
- nni_msgq * mq;
- nni_msgq_cb cb;
+ nni_pollable *p;
if ((flag & nni_sock_flags(s)) == 0) {
return (NNG_ENOTSUP);
@@ -130,41 +104,21 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp)
switch (flag) {
case NNI_PROTO_FLAG_SND:
- fd = &s->s_send_fd;
- mq = s->s_uwq;
- cb = nni_sock_can_send_cb;
+ rv = nni_msgq_get_sendable(s->s_uwq, &p);
break;
case NNI_PROTO_FLAG_RCV:
- fd = &s->s_recv_fd;
- mq = s->s_urq;
- cb = nni_sock_can_recv_cb;
+ rv = nni_msgq_get_recvable(s->s_urq, &p);
break;
default:
- // This should never occur.
- return (NNG_EINVAL);
+ rv = NNG_EINVAL;
+ break;
}
- // Open if not already done.
- if (!fd->sn_init) {
- if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) {
- return (rv);
- }
-
- // Only set the callback on the message queue if we are
- // using it. The message queue automatically updates
- // the pipe when the callback is first established.
- // If we are not using the message queue, then we have
- // to update the initial state explicitly ourselves.
-
- if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) {
- nni_msgq_set_cb(mq, cb, fd);
- }
-
- fd->sn_init = 1;
+ if (rv == 0) {
+ rv = nni_pollable_getfd(p, fdp);
}
- *fdp = fd->sn_rfd;
- return (0);
+ return (rv);
}
static int