aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/event.c27
-rw-r--r--src/core/event.h35
-rw-r--r--src/core/msgqueue.c74
-rw-r--r--src/core/msgqueue.h24
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/options.c61
-rw-r--r--src/core/options.h3
-rw-r--r--src/core/socket.c157
-rw-r--r--src/core/socket.h3
9 files changed, 139 insertions, 246 deletions
diff --git a/src/core/event.c b/src/core/event.c
deleted file mode 100644
index ab157b02..00000000
--- a/src/core/event.c
+++ /dev/null
@@ -1,27 +0,0 @@
-//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#include "core/nng_impl.h"
-
-#include <stdlib.h>
-#include <string.h>
-
-void
-nni_ev_init(nni_event *event, int type, nni_sock *sock)
-{
- memset(event, 0, sizeof(*event));
- event->e_type = type;
- event->e_sock = sock;
-}
-
-void
-nni_ev_fini(nni_event *event)
-{
- NNI_ARG_UNUSED(event);
-}
diff --git a/src/core/event.h b/src/core/event.h
deleted file mode 100644
index 9536d206..00000000
--- a/src/core/event.h
+++ /dev/null
@@ -1,35 +0,0 @@
-//
-// Copyright 2017 Garrett D'Amore <garrett@damore.org>
-// Copyright 2017 Capitar IT Group BV <info@capitar.com>
-//
-// This software is supplied under the terms of the MIT License, a
-// copy of which should be located in the distribution where this
-// file was obtained (LICENSE.txt). A copy of the license may also be
-// found online at https://opensource.org/licenses/MIT.
-//
-
-#ifndef CORE_EVENT_H
-#define CORE_EVENT_H
-
-#include "core/defs.h"
-#include "core/list.h"
-
-struct nng_event {
- int e_type;
- nni_sock *e_sock;
- nni_ep * e_ep;
- nni_pipe *e_pipe;
-};
-
-struct nng_notify {
- nng_notify_func n_func;
- void * n_arg;
- int n_type;
- nni_sock * n_sock;
- nni_aio * n_aio;
-};
-
-extern void nni_ev_init(nni_event *, int, nni_sock *);
-extern void nni_ev_fini(nni_event *);
-
-#endif // CORE_EVENT_H
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index c1f3701c..3f1ffcb5 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -32,8 +32,12 @@ struct nni_msgq {
nni_list mq_aio_putq;
nni_list mq_aio_getq;
- nni_list mq_aio_notify_get;
- nni_list mq_aio_notify_put;
+
+ // Callback - this function is executed with the lock held, and
+ // provides information about the current queue state anytime
+ // a message enters or leaves the queue, or a waiter is blocked.
+ nni_msgq_cb mq_cb_fn;
+ void * mq_cb_arg;
// Filters.
nni_msgq_filter mq_filter_fn;
@@ -63,9 +67,6 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap)
nni_aio_list_init(&mq->mq_aio_putq);
nni_aio_list_init(&mq->mq_aio_getq);
- nni_aio_list_init(&mq->mq_aio_notify_get);
- nni_aio_list_init(&mq->mq_aio_notify_put);
-
nni_mtx_init(&mq->mq_lock);
nni_cv_init(&mq->mq_drained, &mq->mq_lock);
@@ -297,22 +298,25 @@ static void
nni_msgq_run_notify(nni_msgq *mq)
{
nni_aio *aio;
+ if (mq->mq_cb_fn != NULL) {
+ int flags = 0;
- if (mq->mq_closed) {
- return;
- }
- if ((mq->mq_len < mq->mq_cap) || !nni_list_empty(&mq->mq_aio_getq)) {
- NNI_LIST_FOREACH (&mq->mq_aio_notify_put, aio) {
- // This stays on the list.
- nni_aio_finish(aio, 0, 0);
+ if (mq->mq_closed) {
+ flags |= nni_msgq_f_closed;
}
- }
-
- if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
- NNI_LIST_FOREACH (&mq->mq_aio_notify_get, aio) {
- // This stays on the list.
- nni_aio_finish(aio, 0, 0);
+ if (mq->mq_len == 0) {
+ flags |= nni_msgq_f_empty;
+ } else if (mq->mq_len == mq->mq_cap) {
+ flags |= nni_msgq_f_full;
+ }
+ if (mq->mq_len < mq->mq_cap ||
+ !nni_list_empty(&mq->mq_aio_getq)) {
+ flags |= nni_msgq_f_can_put;
+ }
+ if ((mq->mq_len != 0) || !nni_list_empty(&mq->mq_aio_putq)) {
+ flags |= nni_msgq_f_can_get;
}
+ mq->mq_cb_fn(mq->mq_cb_arg, flags);
}
if (mq->mq_draining) {
@@ -322,6 +326,16 @@ nni_msgq_run_notify(nni_msgq *mq)
}
}
+void
+nni_msgq_set_cb(nni_msgq *mq, nni_msgq_cb fn, void *arg)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_cb_fn = fn;
+ mq->mq_cb_arg = arg;
+ nni_msgq_run_notify(mq);
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
static void
nni_msgq_cancel(nni_aio *aio, int rv)
{
@@ -336,30 +350,6 @@ nni_msgq_cancel(nni_aio *aio, int rv)
}
void
-nni_msgq_aio_notify_put(nni_msgq *mq, nni_aio *aio)
-{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return;
- }
- nni_aio_list_append(&mq->mq_aio_notify_put, aio);
- nni_mtx_unlock(&mq->mq_lock);
-}
-
-void
-nni_msgq_aio_notify_get(nni_msgq *mq, nni_aio *aio)
-{
- nni_mtx_lock(&mq->mq_lock);
- if (nni_aio_start(aio, nni_msgq_cancel, mq) != 0) {
- nni_mtx_unlock(&mq->mq_lock);
- return;
- }
- nni_aio_list_append(&mq->mq_aio_notify_get, aio);
- nni_mtx_unlock(&mq->mq_lock);
-}
-
-void
nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
{
nni_mtx_lock(&mq->mq_lock);
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index bbac505d..ececf372 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -38,8 +38,6 @@ extern void nni_msgq_fini(nni_msgq *);
extern void nni_msgq_aio_put(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_get(nni_msgq *, nni_aio *);
-extern void nni_msgq_aio_notify_get(nni_msgq *, nni_aio *);
-extern void nni_msgq_aio_notify_put(nni_msgq *, nni_aio *);
// nni_msgq_tryput performs a non-blocking attempt to put a message on
// the message queue. It is the same as calling nng_msgq_put_until with
@@ -83,6 +81,28 @@ typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *);
// discarded instead, and any get waiters remain waiting.
extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *);
+// nni_msgq_cb_flags is an enumeration of flag bits used with nni_msgq_cb.
+enum nni_msgq_cb_flags {
+ nni_msgq_f_full = 1,
+ nni_msgq_f_empty = 2,
+ nni_msgq_f_can_get = 4,
+ nni_msgq_f_can_put = 8,
+ nni_msgq_f_closed = 16,
+};
+
+// nni_msgq_cb is a callback function used by sockets to monitor
+// the status of the queue. It is called with the lock held for
+// performance reasons so consumers must not re-enter the queue.
+// The purpose is to enable file descriptor notifications on the socket,
+// which don't need to reenter the msgq. The integer is a mask of
+// flags that are true for the given message queue.
+typedef void (*nni_msgq_cb)(void *, int);
+
+// nni_msgq_set_cb sets the callback and argument for the callback
+// which will be called on state changes in the message queue. Only
+// one callback can be registered on a message queue at a time.
+extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *);
+
// nni_msgq_close closes the queue. After this all operates on the
// message queue will return NNG_ECLOSED. Messages inside the queue
// are freed. Unlike closing a go channel, this operation is idempotent.
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 905411d7..87dd2de1 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -46,7 +46,6 @@
// These have to come after the others - particularly transport.h
#include "core/endpt.h"
-#include "core/event.h"
#include "core/pipe.h"
#include "core/socket.h"
diff --git a/src/core/options.c b/src/core/options.c
index 3b787b82..1417d0b3 100644
--- a/src/core/options.c
+++ b/src/core/options.c
@@ -220,63 +220,4 @@ nni_getopt_buf(nni_msgq *mq, void *val, size_t *sizep)
memcpy(val, &len, sz);
*sizep = sizeof(len);
return (0);
-}
-
-static void
-nni_notifyfd_push(struct nng_event *ev, void *arg)
-{
- nni_notifyfd *fd = arg;
-
- NNI_ARG_UNUSED(ev);
-
- nni_plat_pipe_raise(fd->sn_wfd);
-}
-
-int
-nni_getopt_fd(nni_sock *s, nni_notifyfd *fd, int mask, void *val, size_t *szp)
-{
- int rv;
- uint32_t flags;
-
- if ((*szp < sizeof(int))) {
- return (NNG_EINVAL);
- }
-
- flags = nni_sock_flags(s);
-
- switch (mask) {
- case NNG_EV_CAN_SND:
- if ((flags & NNI_PROTO_FLAG_SND) == 0) {
- return (NNG_ENOTSUP);
- }
- break;
- case NNG_EV_CAN_RCV:
- if ((flags & NNI_PROTO_FLAG_RCV) == 0) {
- return (NNG_ENOTSUP);
- }
- break;
- default:
- return (NNG_ENOTSUP);
- }
-
- // If we already inited this, just give back the same file descriptor.
- if (fd->sn_init) {
- memcpy(val, &fd->sn_rfd, sizeof(int));
- *szp = sizeof(int);
- return (0);
- }
-
- if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) {
- return (rv);
- }
-
- if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) {
- nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd);
- return (NNG_ENOMEM);
- }
-
- fd->sn_init = 1;
- *szp = sizeof(int);
- memcpy(val, &fd->sn_rfd, sizeof(int));
- return (0);
-}
+} \ No newline at end of file
diff --git a/src/core/options.h b/src/core/options.h
index beeca951..d373851f 100644
--- a/src/core/options.h
+++ b/src/core/options.h
@@ -58,9 +58,6 @@ extern int nni_setopt_size(size_t *, const void *, size_t, size_t, size_t);
// nni_getopt_size obtains a size_t option.
extern int nni_getopt_size(size_t, void *, size_t *);
-// nni_getopt_fd obtains a notification file descriptor.
-extern int nni_getopt_fd(nni_sock *, nni_notifyfd *, int, void *, size_t *);
-
extern int nni_chkopt_ms(const void *, size_t);
extern int nni_chkopt_int(const void *, size_t, int, int);
extern int nni_chkopt_size(const void *, size_t, size_t, size_t);
diff --git a/src/core/socket.c b/src/core/socket.c
index bc1f446d..dfa49317 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -70,23 +70,102 @@ struct nni_socket {
int s_closing; // Socket is closing
int s_closed; // Socket closed, protected by global lock
- nni_event s_recv_ev; // Event for readability
- nni_event s_send_ev; // Event for sendability
-
nni_notifyfd s_send_fd;
nni_notifyfd s_recv_fd;
};
+static void
+nni_sock_can_send_cb(void *arg, int flags)
+{
+ nni_notifyfd *fd = arg;
+
+ if ((flags & nni_msgq_f_can_put) == 0) {
+ nni_plat_pipe_clear(fd->sn_rfd);
+ } else {
+ nni_plat_pipe_raise(fd->sn_wfd);
+ }
+}
+
+static void
+nni_sock_can_recv_cb(void *arg, int flags)
+{
+ nni_notifyfd *fd = arg;
+
+ if ((flags & nni_msgq_f_can_get) == 0) {
+ nni_plat_pipe_clear(fd->sn_rfd);
+ } else {
+ nni_plat_pipe_raise(fd->sn_wfd);
+ }
+}
+
+static int
+nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp)
+{
+ int rv;
+ uint32_t flags;
+ nni_notifyfd *fd;
+ nni_msgq * mq;
+ nni_msgq_cb cb;
+
+ if ((*szp < sizeof(int))) {
+ return (NNG_EINVAL);
+ }
+
+ if ((flag & nni_sock_flags(s)) == 0) {
+ return (NNG_ENOTSUP);
+ }
+
+ switch (flag) {
+ case NNI_PROTO_FLAG_SND:
+ fd = &s->s_send_fd;
+ mq = s->s_uwq;
+ cb = nni_sock_can_send_cb;
+ break;
+ case NNI_PROTO_FLAG_RCV:
+ fd = &s->s_recv_fd;
+ mq = s->s_urq;
+ cb = nni_sock_can_recv_cb;
+ break;
+ default:
+ nni_panic("default case!");
+ }
+
+ // If we already inited this, just give back the same file descriptor.
+ if (fd->sn_init) {
+ memcpy(val, &fd->sn_rfd, sizeof(int));
+ *szp = sizeof(int);
+ return (0);
+ }
+
+ if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) {
+ return (rv);
+ }
+
+ nni_msgq_set_cb(mq, cb, fd);
+
+#if 0
+ if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) {
+ nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd);
+ return (NNG_ENOMEM);
+ }
+#endif
+
+ fd->sn_init = 1;
+ *szp = sizeof(int);
+ memcpy(val, &fd->sn_rfd, sizeof(int));
+ return (0);
+}
+
static int
nni_sock_getopt_sendfd(nni_sock *s, void *buf, size_t *szp)
{
- return (nni_getopt_fd(s, &s->s_send_fd, NNG_EV_CAN_SND, buf, szp));
+ return (nni_sock_getopt_fd(s, NNI_PROTO_FLAG_SND, buf, szp));
}
static int
nni_sock_getopt_recvfd(nni_sock *s, void *buf, size_t *szp)
{
- return (nni_getopt_fd(s, &s->s_recv_fd, NNG_EV_CAN_RCV, buf, szp));
+ return (nni_sock_getopt_fd(s, NNI_PROTO_FLAG_RCV, buf, szp));
}
static int
@@ -387,70 +466,6 @@ nni_sock_recv_pending(nni_sock *sock)
}
static void
-nni_sock_cansend_cb(void *arg)
-{
- nni_notify *notify = arg;
- nni_sock * sock = notify->n_sock;
-
- if (nni_aio_result(notify->n_aio) != 0) {
- return;
- }
-
- notify->n_func(&sock->s_send_ev, notify->n_arg);
-}
-
-static void
-nni_sock_canrecv_cb(void *arg)
-{
- nni_notify *notify = arg;
- nni_sock * sock = notify->n_sock;
-
- if (nni_aio_result(notify->n_aio) != 0) {
- return;
- }
-
- notify->n_func(&sock->s_recv_ev, notify->n_arg);
-}
-
-nni_notify *
-nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
-{
- nni_notify *notify;
-
- if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) {
- return (NULL);
- }
-
- notify->n_func = fn;
- notify->n_arg = arg;
- notify->n_type = type;
- notify->n_sock = sock;
-
- switch (type) {
- case NNG_EV_CAN_RCV:
- nni_aio_init(&notify->n_aio, nni_sock_canrecv_cb, notify);
- nni_msgq_aio_notify_get(sock->s_urq, notify->n_aio);
- break;
- case NNG_EV_CAN_SND:
- nni_aio_init(&notify->n_aio, nni_sock_cansend_cb, notify);
- nni_msgq_aio_notify_put(sock->s_uwq, notify->n_aio);
- break;
- default:
- NNI_FREE_STRUCT(notify);
- return (NULL);
- }
-
- return (notify);
-}
-
-void
-nni_sock_unnotify(nni_sock *sock, nni_notify *notify)
-{
- nni_aio_fini(notify->n_aio);
- NNI_FREE_STRUCT(notify);
-}
-
-static void
nni_sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
@@ -481,8 +496,6 @@ nni_sock_destroy(nni_sock *s)
nni_mtx_lock(&s->s_mx);
nni_mtx_unlock(&s->s_mx);
- nni_ev_fini(&s->s_send_ev);
- nni_ev_fini(&s->s_recv_ev);
nni_msgq_fini(s->s_urq);
nni_msgq_fini(s->s_uwq);
nni_cv_fini(&s->s_close_cv);
@@ -530,8 +543,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
nni_mtx_init(&s->s_mx);
nni_cv_init(&s->s_cv, &s->s_mx);
nni_cv_init(&s->s_close_cv, &nni_sock_lk);
- nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s);
- nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s);
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 0)) != 0) ||
diff --git a/src/core/socket.h b/src/core/socket.h
index edfe6447..6d9622e3 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -30,9 +30,6 @@ extern void nni_sock_send(nni_sock *, nni_aio *);
extern void nni_sock_recv(nni_sock *, nni_aio *);
extern uint32_t nni_sock_id(nni_sock *);
-extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *);
-extern void nni_sock_unnotify(nni_sock *, nni_notify *);
-
// nni_sock_pipe_add adds the pipe to the socket. It is called by
// the generic pipe creation code. It also adds the socket to the
// ep list, and starts the pipe. It does all these to ensure that