summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-16 22:22:07 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-16 22:22:07 -0800
commit4e46e666e47e277316cc680c833356045932bb5f (patch)
treed6888f69674a93a3c0a82baafc268773c7bb2976
parent50e1484af0d443b46aa04fd4a8096b157dc160aa (diff)
downloadnng-4e46e666e47e277316cc680c833356045932bb5f.tar.gz
nng-4e46e666e47e277316cc680c833356045932bb5f.tar.bz2
nng-4e46e666e47e277316cc680c833356045932bb5f.zip
External event API for send/recv implemented.
This was the main blocker, I think, for the nanomsg legacy compat shim. Now that we have this, it should be relatively straight-forward to implement the legacy nanomsg API, including the SENDFD, RECVFD thing.
-rw-r--r--src/core/event.c32
-rw-r--r--src/core/event.h2
-rw-r--r--src/core/socket.c8
-rw-r--r--src/nng.c30
-rw-r--r--src/nng.h32
-rw-r--r--tests/CMakeLists.txt1
-rw-r--r--tests/convey.c2
-rw-r--r--tests/event.c132
8 files changed, 219 insertions, 20 deletions
diff --git a/src/core/event.c b/src/core/event.c
index 1b264632..73bc1c00 100644
--- a/src/core/event.c
+++ b/src/core/event.c
@@ -95,7 +95,7 @@ nni_notifier(void *arg)
// No interest.
continue;
}
- notify->n_func(event, &notify->n_arg);
+ notify->n_func(event, notify->n_arg);
}
nni_mtx_unlock(&sock->s_notify_mx);
@@ -112,3 +112,33 @@ nni_notifier(void *arg)
}
nni_mtx_unlock(&sock->s_mx);
}
+
+
+nni_notify *
+nni_add_notify(nni_sock *sock, int mask, nng_notify_func fn, void *arg)
+{
+ nni_notify *notify;
+
+ if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) {
+ return (NULL);
+ }
+ notify->n_func = fn;
+ notify->n_arg = arg;
+ notify->n_mask = mask;
+ NNI_LIST_NODE_INIT(&notify->n_node);
+
+ nni_mtx_lock(&sock->s_notify_mx);
+ nni_list_append(&sock->s_notify, notify);
+ nni_mtx_unlock(&sock->s_notify_mx);
+ return (notify);
+}
+
+
+void
+nni_rem_notify(nni_sock *sock, nni_notify *notify)
+{
+ nni_mtx_lock(&sock->s_notify_mx);
+ nni_list_remove(&sock->s_notify, notify);
+ nni_mtx_unlock(&sock->s_notify_mx);
+ NNI_FREE_STRUCT(notify);
+}
diff --git a/src/core/event.h b/src/core/event.h
index 8b160a89..74d6fddb 100644
--- a/src/core/event.h
+++ b/src/core/event.h
@@ -37,5 +37,7 @@ extern int nni_ev_init(nni_event *, int, nni_sock *);
extern void nni_ev_fini(nni_event *);
extern void nni_ev_submit(nni_event *); // call holding sock lock
extern void nni_ev_wait(nni_event *); // call holding sock lock
+extern nni_notify *nni_add_notify(nni_sock *, int, nng_notify_func, void *);
+extern void nni_rem_notify(nni_sock *, nni_notify *);
#endif // CORE_EVENT_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 734e4da8..1e9fe11f 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -232,8 +232,12 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
- if (((rv = nni_ev_init(&sock->s_recv_ev, NNG_EVENT_RECV, sock)) != 0) ||
- ((rv = nni_ev_init(&sock->s_send_ev, NNG_EVENT_SEND, sock)) != 0)) {
+ rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RECV, sock);
+ if (rv != 0) {
+ goto fail;
+ }
+ rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SEND, sock);
+ if (rv != 0) {
goto fail;
}
diff --git a/src/nng.c b/src/nng.c
index ec221fb7..723b230e 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -144,6 +144,36 @@ nng_getopt(nng_socket *s, int opt, void *val, size_t *szp)
}
+nng_notify *
+nng_setnotify(nng_socket *sock, int mask, nng_notify_func fn, void *arg)
+{
+ NNI_INIT_VOID();
+ return (nni_add_notify(sock, mask, fn, arg));
+}
+
+
+void
+nng_unsetnotify(nng_socket *sock, nng_notify *notify)
+{
+ NNI_INIT_VOID();
+ nni_rem_notify(sock, notify);
+}
+
+
+nng_socket *
+nng_event_socket(nng_event *ev)
+{
+ return (ev->e_sock);
+}
+
+
+int
+nng_event_type(nng_event *ev)
+{
+ return (ev->e_type);
+}
+
+
// Misc.
const char *
nng_strerror(int num)
diff --git a/src/nng.h b/src/nng.h
index 58c2b774..8dcfb4de 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -99,27 +99,27 @@ NNG_DECL nng_notify *nng_setnotify(nng_socket *, int, nng_notify_func, void *);
// If the callback is running when this called, then it will wait until that
// callback completes. (The caller of this function should not hold any
// locks acqured by the callback, in order to avoid a deadlock.)
-NNG_DECL int nng_unsetnotify(nng_socket *, nng_notify *);
+NNG_DECL void nng_unsetnotify(nng_socket *, nng_notify *);
// Event types. Sockets can have multiple different kind of events.
// Note that these are edge triggered -- therefore the status indicated
// may have changed since the notification occurred.
//
-// NNG_EVENT_RECV - A message is ready for receive.
-// NNG_EVENT_SEND - A message can be sent.
-// NNG_EVENT_ERROR - An error condition on the socket occurred.
-// NNG_EVENT_PIPE_ADD - A new pipe (connection) is added to the socket.
-// NNG_EVENT_PIPE_REM - A pipe (connection) is removed from the socket.
-// NNG_EVENT_ENDPT_ADD - An endpoint is added to the socket.
-// NNG_EVENT_ENDPT_REM - An endpoint is removed from the socket.
-#define NNG_EVENT_BIT(x) (1U << (x))
-#define NNG_EVENT_RECV NNG_EVENT_BIT(0)
-#define NNG_EVENT_SEND NNG_EVENT_BIT(1)
-#define NNG_EVENT_ERROR NNG_EVENT_BIT(2)
-#define NNG_EVENT_PIPE_ADD NNG_EVENT_BIT(3)
-#define NNG_EVENT_PIPE_REM NNG_EVENT_BIT(4)
-#define NNG_EVENT_ENDPOINT_ADD NNG_EVENT_BIT(5)
-#define NNG_EVENT_ENDPOINT_REM NNG_EVENT_BIT(6)
+// NNG_EV_CAN_RECV - A message is ready for receive.
+// NNG_EV_CAN_SEND - A message can be sent.
+// NNG_EV_ERROR - An error condition on the socket occurred.
+// NNG_EV_PIPE_ADD - A new pipe (connection) is added to the socket.
+// NNG_EV_PIPE_REM - A pipe (connection) is removed from the socket.
+// NNG_EV_ENDPT_ADD - An endpoint is added to the socket.
+// NNG_EV_ENDPT_REM - An endpoint is removed from the socket.
+#define NNG_EV_BIT(x) (1U << (x))
+#define NNG_EV_CAN_RECV NNG_EV_BIT(0)
+#define NNG_EV_CAN_SEND NNG_EV_BIT(1)
+#define NNG_EV_ERROR NNG_EV_BIT(2)
+#define NNG_EV_PIPE_ADD NNG_EV_BIT(3)
+#define NNG_EV_PIPE_REM NNG_EV_BIT(4)
+#define NNG_EV_ENDPT_ADD NNG_EV_BIT(5)
+#define NNG_EV_ENDPT_REM NNG_EV_BIT(6)
// The following functions return more detailed information about the event.
// Some of the values will not make sense for some event types, in which case
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 9c711408..f6751566 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -56,6 +56,7 @@ else ()
endif ()
add_nng_test(bus 5)
+add_nng_test(event 5)
add_nng_test(idhash 5)
add_nng_test(inproc 5)
add_nng_test(ipc 5)
diff --git a/tests/convey.c b/tests/convey.c
index 62471a2c..d1691665 100644
--- a/tests/convey.c
+++ b/tests/convey.c
@@ -705,7 +705,7 @@ convey_vlogf(struct convey_log *log, const char *fmt, va_list va, int addnl)
{
/* Grow the log buffer if we need to */
while ((log->log_size - log->log_length) < 256) {
- int newsz = log->log_size + 2000;
+ size_t newsz = log->log_size + 2000;
char *ptr = malloc(newsz);
if (ptr == NULL) {
return;
diff --git a/tests/event.c b/tests/event.c
new file mode 100644
index 00000000..766ef3ad
--- /dev/null
+++ b/tests/event.c
@@ -0,0 +1,132 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+#include "core/nng_impl.h"
+#include <string.h>
+
+#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
+#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\
+ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
+struct evcnt {
+ nng_socket *sock;
+ int readable;
+ int writeable;
+ int pipeadd;
+ int piperem;
+ int epadd;
+ int eprem;
+ int err;
+};
+
+void
+bump(nng_event *ev, void *arg)
+{
+ struct evcnt *cnt = arg;
+
+ if (nng_event_socket(ev) != cnt->sock) {
+ nni_panic("Incorrect socket! %p != %p",
+ nng_event_socket(ev), cnt->sock);
+ }
+ switch (nng_event_type(ev)) {
+ case NNG_EV_CAN_SEND:
+ cnt->writeable++;
+ break;
+
+ case NNG_EV_CAN_RECV:
+ cnt->readable++;
+ break;
+
+ case NNG_EV_PIPE_ADD:
+ cnt->pipeadd++;
+ break;
+
+ case NNG_EV_PIPE_REM:
+ cnt->piperem++;
+ break;
+
+ case NNG_EV_ENDPT_ADD:
+ cnt->epadd++;
+ break;
+
+ case NNG_EV_ENDPT_REM:
+ cnt->eprem++;
+ break;
+
+ default:
+ nni_panic("Invalid event type %d", nng_event_type(ev));
+ break;
+ }
+}
+
+Main({
+ const char *addr = "inproc://test";
+
+ Test("Event Handling", {
+ Convey("Given a connected pair of pair sockets", {
+ nng_socket *sock1;
+ nng_socket *sock2;
+ struct evcnt evcnt1;
+ struct evcnt evcnt2;
+ nng_notify *notify1;
+ nng_notify *notify2;
+
+ So(nng_open(&sock1, NNG_PROTO_PAIR) == 0);
+ So(nng_open(&sock2, NNG_PROTO_PAIR) == 0);
+
+ memset(&evcnt1, 0, sizeof (evcnt1));
+ memset(&evcnt2, 0, sizeof (evcnt2));
+ evcnt1.sock = sock1;
+ evcnt2.sock = sock2;
+
+ Reset({
+ nng_close(sock1);
+ nng_close(sock2);
+ })
+
+ So(nng_listen(sock1, addr, NULL, NNG_FLAG_SYNCH) == 0);
+ So(nng_dial(sock2, addr, NULL, NNG_FLAG_SYNCH) == 0);
+
+ // Let everything connect.
+ nni_usleep(100000);
+
+ Convey("We can register callbacks", {
+ So((notify1 = nng_setnotify(sock1, NNG_EV_CAN_SEND, bump, &evcnt1)) != NULL);
+ So((notify2 = nng_setnotify(sock2, NNG_EV_CAN_RECV, bump, &evcnt2)) != NULL);
+
+ Convey("They are called", {
+ nni_msg *msg;
+
+ So(nni_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "abc");
+
+ So(nng_sendmsg(sock1, msg, 0) == 0);
+ So(nng_recvmsg(sock2, &msg, 0) == 0);
+
+ CHECKSTR(msg, "abc");
+ nni_msg_free(msg);
+
+ // The notify runs async...
+ nni_usleep(100000);
+
+ So(evcnt1.writeable == 1);
+ So(evcnt2.readable == 1);
+ })
+
+ Convey("We can unregister them", {
+ nng_unsetnotify(sock1, notify1);
+ So(1);
+ nng_unsetnotify(sock2, notify2);
+ So(1);
+ })
+ })
+ })
+ })
+})