diff options
| -rw-r--r-- | src/core/event.c | 32 | ||||
| -rw-r--r-- | src/core/event.h | 2 | ||||
| -rw-r--r-- | src/core/socket.c | 8 | ||||
| -rw-r--r-- | src/nng.c | 30 | ||||
| -rw-r--r-- | src/nng.h | 32 | ||||
| -rw-r--r-- | tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | tests/convey.c | 2 | ||||
| -rw-r--r-- | tests/event.c | 132 |
8 files changed, 219 insertions, 20 deletions
diff --git a/src/core/event.c b/src/core/event.c index 1b264632..73bc1c00 100644 --- a/src/core/event.c +++ b/src/core/event.c @@ -95,7 +95,7 @@ nni_notifier(void *arg) // No interest. continue; } - notify->n_func(event, ¬ify->n_arg); + notify->n_func(event, notify->n_arg); } nni_mtx_unlock(&sock->s_notify_mx); @@ -112,3 +112,33 @@ nni_notifier(void *arg) } nni_mtx_unlock(&sock->s_mx); } + + +nni_notify * +nni_add_notify(nni_sock *sock, int mask, nng_notify_func fn, void *arg) +{ + nni_notify *notify; + + if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) { + return (NULL); + } + notify->n_func = fn; + notify->n_arg = arg; + notify->n_mask = mask; + NNI_LIST_NODE_INIT(¬ify->n_node); + + nni_mtx_lock(&sock->s_notify_mx); + nni_list_append(&sock->s_notify, notify); + nni_mtx_unlock(&sock->s_notify_mx); + return (notify); +} + + +void +nni_rem_notify(nni_sock *sock, nni_notify *notify) +{ + nni_mtx_lock(&sock->s_notify_mx); + nni_list_remove(&sock->s_notify, notify); + nni_mtx_unlock(&sock->s_notify_mx); + NNI_FREE_STRUCT(notify); +} diff --git a/src/core/event.h b/src/core/event.h index 8b160a89..74d6fddb 100644 --- a/src/core/event.h +++ b/src/core/event.h @@ -37,5 +37,7 @@ extern int nni_ev_init(nni_event *, int, nni_sock *); extern void nni_ev_fini(nni_event *); extern void nni_ev_submit(nni_event *); // call holding sock lock extern void nni_ev_wait(nni_event *); // call holding sock lock +extern nni_notify *nni_add_notify(nni_sock *, int, nng_notify_func, void *); +extern void nni_rem_notify(nni_sock *, nni_notify *); #endif // CORE_EVENT_H diff --git a/src/core/socket.c b/src/core/socket.c index 734e4da8..1e9fe11f 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -232,8 +232,12 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } - if (((rv = nni_ev_init(&sock->s_recv_ev, NNG_EVENT_RECV, sock)) != 0) || - ((rv = nni_ev_init(&sock->s_send_ev, NNG_EVENT_SEND, sock)) != 0)) { + rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RECV, sock); + if (rv != 0) { + goto fail; + } + rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SEND, sock); + if (rv != 0) { goto fail; } @@ -144,6 +144,36 @@ nng_getopt(nng_socket *s, int opt, void *val, size_t *szp) } +nng_notify * +nng_setnotify(nng_socket *sock, int mask, nng_notify_func fn, void *arg) +{ + NNI_INIT_VOID(); + return (nni_add_notify(sock, mask, fn, arg)); +} + + +void +nng_unsetnotify(nng_socket *sock, nng_notify *notify) +{ + NNI_INIT_VOID(); + nni_rem_notify(sock, notify); +} + + +nng_socket * +nng_event_socket(nng_event *ev) +{ + return (ev->e_sock); +} + + +int +nng_event_type(nng_event *ev) +{ + return (ev->e_type); +} + + // Misc. const char * nng_strerror(int num) @@ -99,27 +99,27 @@ NNG_DECL nng_notify *nng_setnotify(nng_socket *, int, nng_notify_func, void *); // If the callback is running when this called, then it will wait until that // callback completes. (The caller of this function should not hold any // locks acqured by the callback, in order to avoid a deadlock.) -NNG_DECL int nng_unsetnotify(nng_socket *, nng_notify *); +NNG_DECL void nng_unsetnotify(nng_socket *, nng_notify *); // Event types. Sockets can have multiple different kind of events. // Note that these are edge triggered -- therefore the status indicated // may have changed since the notification occurred. // -// NNG_EVENT_RECV - A message is ready for receive. -// NNG_EVENT_SEND - A message can be sent. -// NNG_EVENT_ERROR - An error condition on the socket occurred. -// NNG_EVENT_PIPE_ADD - A new pipe (connection) is added to the socket. -// NNG_EVENT_PIPE_REM - A pipe (connection) is removed from the socket. -// NNG_EVENT_ENDPT_ADD - An endpoint is added to the socket. -// NNG_EVENT_ENDPT_REM - An endpoint is removed from the socket. -#define NNG_EVENT_BIT(x) (1U << (x)) -#define NNG_EVENT_RECV NNG_EVENT_BIT(0) -#define NNG_EVENT_SEND NNG_EVENT_BIT(1) -#define NNG_EVENT_ERROR NNG_EVENT_BIT(2) -#define NNG_EVENT_PIPE_ADD NNG_EVENT_BIT(3) -#define NNG_EVENT_PIPE_REM NNG_EVENT_BIT(4) -#define NNG_EVENT_ENDPOINT_ADD NNG_EVENT_BIT(5) -#define NNG_EVENT_ENDPOINT_REM NNG_EVENT_BIT(6) +// NNG_EV_CAN_RECV - A message is ready for receive. +// NNG_EV_CAN_SEND - A message can be sent. +// NNG_EV_ERROR - An error condition on the socket occurred. +// NNG_EV_PIPE_ADD - A new pipe (connection) is added to the socket. +// NNG_EV_PIPE_REM - A pipe (connection) is removed from the socket. +// NNG_EV_ENDPT_ADD - An endpoint is added to the socket. +// NNG_EV_ENDPT_REM - An endpoint is removed from the socket. +#define NNG_EV_BIT(x) (1U << (x)) +#define NNG_EV_CAN_RECV NNG_EV_BIT(0) +#define NNG_EV_CAN_SEND NNG_EV_BIT(1) +#define NNG_EV_ERROR NNG_EV_BIT(2) +#define NNG_EV_PIPE_ADD NNG_EV_BIT(3) +#define NNG_EV_PIPE_REM NNG_EV_BIT(4) +#define NNG_EV_ENDPT_ADD NNG_EV_BIT(5) +#define NNG_EV_ENDPT_REM NNG_EV_BIT(6) // The following functions return more detailed information about the event. // Some of the values will not make sense for some event types, in which case diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9c711408..f6751566 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -56,6 +56,7 @@ else () endif () add_nng_test(bus 5) +add_nng_test(event 5) add_nng_test(idhash 5) add_nng_test(inproc 5) add_nng_test(ipc 5) diff --git a/tests/convey.c b/tests/convey.c index 62471a2c..d1691665 100644 --- a/tests/convey.c +++ b/tests/convey.c @@ -705,7 +705,7 @@ convey_vlogf(struct convey_log *log, const char *fmt, va_list va, int addnl) { /* Grow the log buffer if we need to */ while ((log->log_size - log->log_length) < 256) { - int newsz = log->log_size + 2000; + size_t newsz = log->log_size + 2000; char *ptr = malloc(newsz); if (ptr == NULL) { return; diff --git a/tests/event.c b/tests/event.c new file mode 100644 index 00000000..766ef3ad --- /dev/null +++ b/tests/event.c @@ -0,0 +1,132 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" +#include "core/nng_impl.h" +#include <string.h> + +#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s)) +#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\ + So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) +struct evcnt { + nng_socket *sock; + int readable; + int writeable; + int pipeadd; + int piperem; + int epadd; + int eprem; + int err; +}; + +void +bump(nng_event *ev, void *arg) +{ + struct evcnt *cnt = arg; + + if (nng_event_socket(ev) != cnt->sock) { + nni_panic("Incorrect socket! %p != %p", + nng_event_socket(ev), cnt->sock); + } + switch (nng_event_type(ev)) { + case NNG_EV_CAN_SEND: + cnt->writeable++; + break; + + case NNG_EV_CAN_RECV: + cnt->readable++; + break; + + case NNG_EV_PIPE_ADD: + cnt->pipeadd++; + break; + + case NNG_EV_PIPE_REM: + cnt->piperem++; + break; + + case NNG_EV_ENDPT_ADD: + cnt->epadd++; + break; + + case NNG_EV_ENDPT_REM: + cnt->eprem++; + break; + + default: + nni_panic("Invalid event type %d", nng_event_type(ev)); + break; + } +} + +Main({ + const char *addr = "inproc://test"; + + Test("Event Handling", { + Convey("Given a connected pair of pair sockets", { + nng_socket *sock1; + nng_socket *sock2; + struct evcnt evcnt1; + struct evcnt evcnt2; + nng_notify *notify1; + nng_notify *notify2; + + So(nng_open(&sock1, NNG_PROTO_PAIR) == 0); + So(nng_open(&sock2, NNG_PROTO_PAIR) == 0); + + memset(&evcnt1, 0, sizeof (evcnt1)); + memset(&evcnt2, 0, sizeof (evcnt2)); + evcnt1.sock = sock1; + evcnt2.sock = sock2; + + Reset({ + nng_close(sock1); + nng_close(sock2); + }) + + So(nng_listen(sock1, addr, NULL, NNG_FLAG_SYNCH) == 0); + So(nng_dial(sock2, addr, NULL, NNG_FLAG_SYNCH) == 0); + + // Let everything connect. + nni_usleep(100000); + + Convey("We can register callbacks", { + So((notify1 = nng_setnotify(sock1, NNG_EV_CAN_SEND, bump, &evcnt1)) != NULL); + So((notify2 = nng_setnotify(sock2, NNG_EV_CAN_RECV, bump, &evcnt2)) != NULL); + + Convey("They are called", { + nni_msg *msg; + + So(nni_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "abc"); + + So(nng_sendmsg(sock1, msg, 0) == 0); + So(nng_recvmsg(sock2, &msg, 0) == 0); + + CHECKSTR(msg, "abc"); + nni_msg_free(msg); + + // The notify runs async... + nni_usleep(100000); + + So(evcnt1.writeable == 1); + So(evcnt2.readable == 1); + }) + + Convey("We can unregister them", { + nng_unsetnotify(sock1, notify1); + So(1); + nng_unsetnotify(sock2, notify2); + So(1); + }) + }) + }) + }) +}) |
