summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-22 02:32:32 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-22 02:32:32 -0800
commitb93d5759c9b39ff153a14d474d800cd981f7dc97 (patch)
tree1a98b7ac74cd91003c38f53ae3eb01fb8027deef
parent769f9a2b66aca629eb4dd240a072849a48aa300f (diff)
downloadnng-b93d5759c9b39ff153a14d474d800cd981f7dc97.tar.gz
nng-b93d5759c9b39ff153a14d474d800cd981f7dc97.tar.bz2
nng-b93d5759c9b39ff153a14d474d800cd981f7dc97.zip
Event notification via pollable FDs verified working.
-rw-r--r--perf/perf.c2
-rw-r--r--src/core/msgqueue.c139
-rw-r--r--src/core/msgqueue.h2
-rw-r--r--src/core/options.c8
-rw-r--r--src/core/protocol.h6
-rw-r--r--src/core/socket.c26
-rw-r--r--src/nng.h12
-rw-r--r--src/platform/posix/posix_pipe.c7
-rw-r--r--src/platform/windows/win_pipe.c8
-rw-r--r--src/protocol/bus/bus.c2
-rw-r--r--src/protocol/pair/pair.c2
-rw-r--r--src/protocol/pipeline/pull.c2
-rw-r--r--src/protocol/pipeline/push.c2
-rw-r--r--src/protocol/pubsub/pub.c2
-rw-r--r--src/protocol/pubsub/sub.c2
-rw-r--r--src/protocol/reqrep/rep.c2
-rw-r--r--src/protocol/reqrep/req.c2
-rw-r--r--src/protocol/survey/respond.c2
-rw-r--r--src/protocol/survey/survey.c2
-rw-r--r--tests/event.c21
-rw-r--r--tests/pollfd.c73
21 files changed, 223 insertions, 101 deletions
diff --git a/perf/perf.c b/perf/perf.c
index 419d729f..e0526282 100644
--- a/perf/perf.c
+++ b/perf/perf.c
@@ -225,6 +225,8 @@ do_inproc_lat(int argc, char **argv)
ia.count = parse_int(argv[1], "count");
ia.func = latency_server;
+ // Sleep a bit.
+ nng_usleep(100000);
if ((rv = nni_thr_init(&thr, do_inproc, &ia)) != 0) {
die("Cannot create thread: %s", nng_strerror(rv));
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 3ec194fc..09f58a33 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -31,10 +31,61 @@ struct nni_msgq {
int mq_wwait;
nni_msg ** mq_msgs;
+ int mq_notify_sig;
+ nni_cv mq_notify_cv;
+ nni_thr mq_notify_thr;
nni_msgq_notify_fn mq_notify_fn;
void * mq_notify_arg;
};
+
+// nni_msgq_notifier thread runs if events callbacks are registered on the
+// message queue, and calls the notification callbacks outside of the
+// lock. It looks at the actual msgq state to trigger the right events.
+static void
+nni_msgq_notifier(void *arg)
+{
+ nni_msgq *mq = arg;
+ int sig;
+ nni_msgq_notify_fn fn;
+ void *fnarg;
+
+ for (;;) {
+ nni_mtx_lock(&mq->mq_lock);
+ while ((mq->mq_notify_sig == 0) && (!mq->mq_closed)) {
+ nni_cv_wait(&mq->mq_notify_cv);
+ }
+ if (mq->mq_closed) {
+ nni_mtx_unlock(&mq->mq_lock);
+ break;
+ }
+
+ sig = mq->mq_notify_sig;
+ mq->mq_notify_sig = 0;
+
+ fn = mq->mq_notify_fn;
+ fnarg = mq->mq_notify_arg;
+ nni_mtx_unlock(&mq->mq_lock);
+
+ if (fn != NULL) {
+ fn(mq, sig, fnarg);
+ }
+ }
+}
+
+
+// nni_msgq_kick kicks the msgq notification thread. It should be called
+// with the lock held.
+static void
+nni_msgq_kick(nni_msgq *mq, int sig)
+{
+ if (mq->mq_notify_fn != NULL) {
+ mq->mq_notify_sig |= sig;
+ nni_cv_wake(&mq->mq_notify_cv);
+ }
+}
+
+
int
nni_msgq_init(nni_msgq **mqp, int cap)
{
@@ -59,13 +110,10 @@ nni_msgq_init(nni_msgq **mqp, int cap)
if ((rv = nni_mtx_init(&mq->mq_lock)) != 0) {
goto fail;
}
- if ((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) {
- goto fail;
- }
- if ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) {
- goto fail;
- }
- if ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
+ if (((rv = nni_cv_init(&mq->mq_readable, &mq->mq_lock)) != 0) ||
+ ((rv = nni_cv_init(&mq->mq_writeable, &mq->mq_lock)) != 0) ||
+ ((rv = nni_cv_init(&mq->mq_drained, &mq->mq_lock)) != 0) ||
+ ((rv = nni_cv_init(&mq->mq_notify_cv, &mq->mq_lock)) != 0)) {
goto fail;
}
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * alloc)) == NULL) {
@@ -110,6 +158,7 @@ nni_msgq_fini(nni_msgq *mq)
if (mq == NULL) {
return;
}
+ nni_thr_fini(&mq->mq_notify_thr);
nni_cv_fini(&mq->mq_drained);
nni_cv_fini(&mq->mq_writeable);
nni_cv_fini(&mq->mq_readable);
@@ -131,13 +180,24 @@ nni_msgq_fini(nni_msgq *mq)
}
-void
+int
nni_msgq_notify(nni_msgq *mq, nni_msgq_notify_fn fn, void *arg)
{
+ int rv;
+
+ nni_thr_fini(&mq->mq_notify_thr);
+
nni_mtx_lock(&mq->mq_lock);
+ rv = nni_thr_init(&mq->mq_notify_thr, nni_msgq_notifier, mq);
+ if (rv != 0) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (rv);
+ }
mq->mq_notify_fn = fn;
mq->mq_notify_arg = arg;
+ nni_thr_run(&mq->mq_notify_thr);
nni_mtx_unlock(&mq->mq_lock);
+ return (0);
}
@@ -197,6 +257,7 @@ nni_msgq_signal(nni_msgq *mq, int *signal)
mq->mq_wwait = 0;
nni_cv_wake(&mq->mq_readable);
nni_cv_wake(&mq->mq_writeable);
+ nni_cv_wake(&mq->mq_notify_cv);
nni_mtx_unlock(&mq->mq_lock);
}
@@ -205,9 +266,6 @@ int
nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
{
int rv;
- int notify = 0;
- nni_msgq_notify_fn fn;
- void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -247,8 +305,16 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
return (NNG_EAGAIN);
}
- // not writeable, so wait until something changes
+ // waiting....
mq->mq_wwait = 1;
+
+ // if we are unbuffered, kick the notifier, because we're
+ // writable.
+ if (mq->mq_cap == 0) {
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
+ }
+
+ // not writeable, so wait until something changes
rv = nni_cv_until(&mq->mq_writeable, expire);
if (rv == NNG_ETIMEDOUT) {
nni_mtx_unlock(&mq->mq_lock);
@@ -267,18 +333,12 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_readable);
}
- notify = NNI_MSGQ_NOTIFY_CANGET;
if (mq->mq_len < mq->mq_cap) {
- notify |= NNI_MSGQ_NOTIFY_CANPUT;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
}
- fn = mq->mq_notify_fn;
- arg = mq->mq_notify_arg;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
nni_mtx_unlock(&mq->mq_lock);
- // The notify callback is executed outside of the lock.
- if (fn != NULL) {
- fn(mq, notify, arg);
- }
return (0);
}
@@ -289,10 +349,6 @@ nni_msgq_put_(nni_msgq *mq, nni_msg *msg, nni_time expire, nni_signal *sig)
int
nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
{
- int notify = 0;
- nni_msgq_notify_fn fn;
- void *arg;
-
nni_mtx_lock(&mq->mq_lock);
// if closed, we don't put more... this check is first!
@@ -319,18 +375,9 @@ nni_msgq_putback(nni_msgq *mq, nni_msg *msg)
nni_cv_wake(&mq->mq_readable);
}
- notify = NNI_MSGQ_NOTIFY_CANGET;
- if (mq->mq_len < mq->mq_cap) {
- notify |= NNI_MSGQ_NOTIFY_CANPUT;
- }
- fn = mq->mq_notify_fn;
- arg = mq->mq_notify_arg;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
nni_mtx_unlock(&mq->mq_lock);
- if (fn != NULL) {
- fn(mq, notify, arg);
- }
-
return (0);
}
@@ -339,9 +386,6 @@ static int
nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
{
int rv;
- int notify = 0;
- nni_msgq_notify_fn fn;
- void *arg;
nni_mtx_lock(&mq->mq_lock);
@@ -372,6 +416,13 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
nni_cv_wake(&mq->mq_writeable);
}
mq->mq_rwait = 1;
+
+ if (mq->mq_cap == 0) {
+ // If unbuffered, kick it since a writer would not
+ // block.
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
+ }
+
rv = nni_cv_until(&mq->mq_readable, expire);
if (rv == NNG_ETIMEDOUT) {
nni_mtx_unlock(&mq->mq_lock);
@@ -391,18 +442,12 @@ nni_msgq_get_(nni_msgq *mq, nni_msg **msgp, nni_time expire, nni_signal *sig)
mq->mq_wwait = 0;
nni_cv_wake(&mq->mq_writeable);
}
- notify = NNI_MSGQ_NOTIFY_CANPUT;
- if (mq->mq_len > 0) {
- notify |= NNI_MSGQ_NOTIFY_CANGET;
+ if (mq->mq_len) {
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANGET);
}
- fn = mq->mq_notify_fn;
- arg = mq->mq_notify_arg;
+ nni_msgq_kick(mq, NNI_MSGQ_NOTIFY_CANPUT);
nni_mtx_unlock(&mq->mq_lock);
- if (fn != NULL) {
- fn(mq, notify, arg);
- }
-
return (0);
}
@@ -475,6 +520,7 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_notify_cv);
while (mq->mq_len > 0) {
if (nni_cv_until(&mq->mq_drained, expire) != 0) {
break;
@@ -502,6 +548,7 @@ nni_msgq_close(nni_msgq *mq)
mq->mq_rwait = 0;
nni_cv_wake(&mq->mq_writeable);
nni_cv_wake(&mq->mq_readable);
+ nni_cv_wake(&mq->mq_notify_cv);
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index 07f6aebb..41cda53e 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -140,6 +140,6 @@ typedef void (*nni_msgq_notify_fn)(nni_msgq *, int, void *);
// queue state changes. It notifies that the queue is readable, or writeable.
// Only one function can be registered (for simplicity), and it is called
// outside of the queue's lock.
-extern void nni_msgq_notify(nni_msgq *, nni_msgq_notify_fn, void *);
+extern int nni_msgq_notify(nni_msgq *, nni_msgq_notify_fn, void *);
#endif // CORE_MSQUEUE_H
diff --git a/src/core/options.c b/src/core/options.c
index 35411562..a0b74014 100644
--- a/src/core/options.c
+++ b/src/core/options.c
@@ -135,13 +135,13 @@ nni_getopt_fd(nni_sock *s, nni_notifyfd *fd, int mask, void *val, size_t *szp)
}
switch (mask) {
- case NNG_EV_CAN_SEND:
- if ((s->s_flags & NNI_PROTO_FLAG_SEND) == 0) {
+ case NNG_EV_CAN_SND:
+ if ((s->s_flags & NNI_PROTO_FLAG_SND) == 0) {
return (NNG_ENOTSUP);
}
break;
- case NNG_EV_CAN_RECV:
- if ((s->s_flags & NNI_PROTO_FLAG_RECV) == 0) {
+ case NNG_EV_CAN_RCV:
+ if ((s->s_flags & NNI_PROTO_FLAG_RCV) == 0) {
return (NNG_ENOTSUP);
}
break;
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 52774080..9869afc9 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -100,9 +100,9 @@ struct nni_proto {
// These flags determine which operations make sense. We use them so that
// we can reject attempts to create notification fds for operations that make
// no sense.
-#define NNI_PROTO_FLAG_RECV 1 // Protocol can receive
-#define NNI_PROTO_FLAG_SEND 2 // Protocol can send
-#define NNI_PROTO_FLAG_SENDRECV 3 // Protocol can both send & recv
+#define NNI_PROTO_FLAG_RCV 1 // Protocol can receive
+#define NNI_PROTO_FLAG_SND 2 // Protocol can send
+#define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv
// These functions are not used by protocols, but rather by the socket
// core implementation. The lookups can be used by transports as well.
diff --git a/src/core/socket.c b/src/core/socket.c
index c0cf20b7..f451ba45 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -321,11 +321,11 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
- rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RECV, sock);
+ rv = nni_ev_init(&sock->s_recv_ev, NNG_EV_CAN_RCV, sock);
if (rv != 0) {
goto fail;
}
- rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SEND, sock);
+ rv = nni_ev_init(&sock->s_send_ev, NNG_EV_CAN_SND, sock);
if (rv != 0) {
goto fail;
}
@@ -364,12 +364,22 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
}
+ // XXX: This kills performance. Look at moving this to
+ // be conditional - if nobody has callbacks because their code is
+ // also threaded, then we don't need to jump through these hoops.
+ rv = nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock);
+ if (rv != 0) {
+ goto fail;
+ }
+ rv = nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock);
+ if (rv != 0) {
+ goto fail;
+ }
+
for (i = 0; i < NNI_MAXWORKERS; i++) {
nni_thr_run(&sock->s_worker_thr[i]);
}
- nni_msgq_notify(sock->s_urq, nni_sock_urq_notify, sock);
- nni_msgq_notify(sock->s_uwq, nni_sock_uwq_notify, sock);
nni_thr_run(&sock->s_reaper);
nni_thr_run(&sock->s_notifier);
@@ -808,12 +818,12 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
case NNG_OPT_RCVBUF:
rv = nni_getopt_buf(sock->s_urq, val, sizep);
break;
- case NNG_OPT_SENDFD:
- rv = nni_getopt_fd(sock, &sock->s_send_fd, NNG_EV_CAN_SEND,
+ case NNG_OPT_SNDFD:
+ rv = nni_getopt_fd(sock, &sock->s_send_fd, NNG_EV_CAN_SND,
val, sizep);
break;
- case NNG_OPT_RECVFD:
- rv = nni_getopt_fd(sock, &sock->s_recv_fd, NNG_EV_CAN_RECV,
+ case NNG_OPT_RCVFD:
+ rv = nni_getopt_fd(sock, &sock->s_recv_fd, NNG_EV_CAN_RCV,
val, sizep);
break;
}
diff --git a/src/nng.h b/src/nng.h
index 8dc3fedd..eb9a2d87 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -105,16 +105,16 @@ NNG_DECL void nng_unsetnotify(nng_socket, nng_notify *);
// Note that these are edge triggered -- therefore the status indicated
// may have changed since the notification occurred.
//
-// NNG_EV_CAN_RECV - A message is ready for receive.
-// NNG_EV_CAN_SEND - A message can be sent.
+// NNG_EV_CAN_RCV - A message is ready for receive.
+// NNG_EV_CAN_SND - A message can be sent.
// NNG_EV_ERROR - An error condition on the socket occurred.
// NNG_EV_PIPE_ADD - A new pipe (connection) is added to the socket.
// NNG_EV_PIPE_REM - A pipe (connection) is removed from the socket.
// NNG_EV_ENDPT_ADD - An endpoint is added to the socket.
// NNG_EV_ENDPT_REM - An endpoint is removed from the socket.
#define NNG_EV_BIT(x) (1U << (x))
-#define NNG_EV_CAN_RECV NNG_EV_BIT(0)
-#define NNG_EV_CAN_SEND NNG_EV_BIT(1)
+#define NNG_EV_CAN_RCV NNG_EV_BIT(0)
+#define NNG_EV_CAN_SND NNG_EV_BIT(1)
#define NNG_EV_ERROR NNG_EV_BIT(2)
#define NNG_EV_PIPE_ADD NNG_EV_BIT(3)
#define NNG_EV_PIPE_REM NNG_EV_BIT(4)
@@ -298,8 +298,8 @@ NNG_DECL int nng_pipe_close(nng_pipe);
#define NNG_OPT_TRANSPORT NNG_OPT_SOCKET(15)
#define NNG_OPT_LOCALADDR NNG_OPT_SOCKET(16)
#define NNG_OPT_REMOTEADDR NNG_OPT_SOCKET(17)
-#define NNG_OPT_RECVFD NNG_OPT_SOCKET(18)
-#define NNG_OPT_SENDFD NNG_OPT_SOCKET(19)
+#define NNG_OPT_RCVFD NNG_OPT_SOCKET(18)
+#define NNG_OPT_SNDFD NNG_OPT_SOCKET(19)
// XXX: TBD: priorities, socket names, ipv4only
diff --git a/src/platform/posix/posix_pipe.c b/src/platform/posix/posix_pipe.c
index 7f6a50cc..28cd909c 100644
--- a/src/platform/posix/posix_pipe.c
+++ b/src/platform/posix/posix_pipe.c
@@ -84,8 +84,8 @@ nni_plat_pipe_open(int *wfd, int *rfd)
if (pipe(fds) < 0) {
return (nni_plat_errno(errno));
}
- *wfd = fds[0];
- *rfd = fds[1];
+ *wfd = fds[1];
+ *rfd = fds[0];
(void) fcntl(fds[0], F_SETFD, FD_CLOEXEC);
(void) fcntl(fds[1], F_SETFD, FD_CLOEXEC);
@@ -101,7 +101,7 @@ nni_plat_pipe_raise(int wfd)
{
char c = 1;
- write(wfd, &c, 1);
+ (void) write(wfd, &c, 1);
}
@@ -109,6 +109,7 @@ void
nni_plat_pipe_clear(int rfd)
{
char buf[32];
+ int rv;
for (;;) {
// Completely drain the pipe, but don't wait. This coalesces
diff --git a/src/platform/windows/win_pipe.c b/src/platform/windows/win_pipe.c
index f254d9bb..22886faa 100644
--- a/src/platform/windows/win_pipe.c
+++ b/src/platform/windows/win_pipe.c
@@ -9,6 +9,7 @@
#include "core/nng_impl.h"
+#include <stdio.h>
// Windows named pipes won't work for us; we *MUST* use sockets. This is
// a real sadness, but what can you do. We use an anonymous socket bound
// to localhost and a connected peer.
@@ -34,19 +35,21 @@ nni_plat_pipe_open(int *wfdp, int *rfdp)
// ephemeral port.
addr.sin_family = AF_INET;
addr.sin_port = 0;
- addr.sin_addr.s_addr = INADDR_LOOPBACK;
+ addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
afd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (afd == INVALID_SOCKET) {
goto fail;
}
+
// Make sure we have exclusive address use...
one = 1;
if (setsockopt(afd, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
(char *) (&one), sizeof (one)) != 0) {
goto fail;
}
+
alen = sizeof (addr);
if (bind(afd, (struct sockaddr *) &addr, alen) != 0) {
goto fail;
@@ -65,7 +68,6 @@ nni_plat_pipe_open(int *wfdp, int *rfdp)
if (afd == INVALID_SOCKET) {
goto fail;
}
-
if (connect(rfd, (struct sockaddr *) &addr, alen) != 0) {
goto fail;
}
@@ -105,7 +107,7 @@ fail:
closesocket(wfd);
}
- return (0);
+ return (rv);
}
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index c1334fa5..c9a1b42b 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -273,7 +273,7 @@ nni_proto nni_bus_proto = {
.proto_self = NNG_PROTO_BUS,
.proto_peer = NNG_PROTO_BUS,
.proto_name = "bus",
- .proto_flags = NNI_PROTO_FLAG_SENDRECV,
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_bus_sock_ops,
.proto_pipe_ops = &nni_bus_pipe_ops,
};
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index fe0874ca..b97b4b46 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -233,7 +233,7 @@ nni_proto nni_pair_proto = {
.proto_self = NNG_PROTO_PAIR,
.proto_peer = NNG_PROTO_PAIR,
.proto_name = "pair",
- .proto_flags = NNI_PROTO_FLAG_SENDRECV,
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_pair_sock_ops,
.proto_pipe_ops = &nni_pair_pipe_ops,
};
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 5b3efd4d..6f2d716b 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -155,7 +155,7 @@ nni_proto nni_pull_proto = {
.proto_self = NNG_PROTO_PULL,
.proto_peer = NNG_PROTO_PUSH,
.proto_name = "pull",
- .proto_flags = NNI_PROTO_FLAG_RECV,
+ .proto_flags = NNI_PROTO_FLAG_RCV,
.proto_pipe_ops = &nni_pull_pipe_ops,
.proto_sock_ops = &nni_pull_sock_ops,
};
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 8794d84f..3c3164d5 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -313,7 +313,7 @@ nni_proto nni_push_proto = {
.proto_self = NNG_PROTO_PUSH,
.proto_peer = NNG_PROTO_PULL,
.proto_name = "push",
- .proto_flags = NNI_PROTO_FLAG_SEND,
+ .proto_flags = NNI_PROTO_FLAG_SND,
.proto_pipe_ops = &nni_push_pipe_ops,
.proto_sock_ops = &nni_push_sock_ops,
};
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 24dddf4e..d148999b 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -274,7 +274,7 @@ nni_proto nni_pub_proto = {
.proto_self = NNG_PROTO_PUB,
.proto_peer = NNG_PROTO_SUB,
.proto_name = "pub",
- .proto_flags = NNI_PROTO_FLAG_SEND,
+ .proto_flags = NNI_PROTO_FLAG_SND,
.proto_sock_ops = &nni_pub_sock_ops,
.proto_pipe_ops = &nni_pub_pipe_ops,
};
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 512765eb..46dcfd5c 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -314,7 +314,7 @@ nni_proto nni_sub_proto = {
.proto_self = NNG_PROTO_SUB,
.proto_peer = NNG_PROTO_PUB,
.proto_name = "sub",
- .proto_flags = NNI_PROTO_FLAG_RECV,
+ .proto_flags = NNI_PROTO_FLAG_RCV,
.proto_sock_ops = &nni_sub_sock_ops,
.proto_pipe_ops = &nni_sub_pipe_ops,
};
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 36535fa4..40ee52fc 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -414,7 +414,7 @@ nni_proto nni_rep_proto = {
.proto_self = NNG_PROTO_REP,
.proto_peer = NNG_PROTO_REQ,
.proto_name = "rep",
- .proto_flags = NNI_PROTO_FLAG_SENDRECV,
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_rep_sock_ops,
.proto_pipe_ops = &nni_rep_pipe_ops,
};
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 5d87e30d..c200fdc9 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -406,7 +406,7 @@ nni_proto nni_req_proto = {
.proto_self = NNG_PROTO_REQ,
.proto_peer = NNG_PROTO_REP,
.proto_name = "req",
- .proto_flags = NNI_PROTO_FLAG_SENDRECV,
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_req_sock_ops,
.proto_pipe_ops = &nni_req_pipe_ops,
};
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index a915eace..8a7239b3 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -416,7 +416,7 @@ nni_proto nni_respondent_proto = {
.proto_self = NNG_PROTO_RESPONDENT,
.proto_peer = NNG_PROTO_SURVEYOR,
.proto_name = "respondent",
- .proto_flags = NNI_PROTO_FLAG_SENDRECV,
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_resp_sock_ops,
.proto_pipe_ops = &nni_resp_pipe_ops,
};
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index b887a6e9..c16bad3b 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -427,7 +427,7 @@ nni_proto nni_surveyor_proto = {
.proto_self = NNG_PROTO_SURVEYOR,
.proto_peer = NNG_PROTO_RESPONDENT,
.proto_name = "surveyor",
- .proto_flags = NNI_PROTO_FLAG_SENDRECV,
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV,
.proto_sock_ops = &nni_surv_sock_ops,
.proto_pipe_ops = &nni_surv_pipe_ops,
};
diff --git a/tests/event.c b/tests/event.c
index 554f990f..4e6eae8c 100644
--- a/tests/event.c
+++ b/tests/event.c
@@ -33,28 +33,28 @@ bump(nng_event *ev, void *arg)
assert(nng_event_socket(ev) == cnt->sock);
switch (nng_event_type(ev)) {
- case NNG_EV_CAN_SEND:
- cnt->writeable++;
+ case NNG_EV_CAN_SND:
+ cnt->writeable = 1;
break;
- case NNG_EV_CAN_RECV:
- cnt->readable++;
+ case NNG_EV_CAN_RCV:
+ cnt->readable = 1;
break;
case NNG_EV_PIPE_ADD:
- cnt->pipeadd++;
+ cnt->pipeadd = 1;
break;
case NNG_EV_PIPE_REM:
- cnt->piperem++;
+ cnt->piperem = 1;
break;
case NNG_EV_ENDPT_ADD:
- cnt->epadd++;
+ cnt->epadd = 1;
break;
case NNG_EV_ENDPT_REM:
- cnt->eprem++;
+ cnt->eprem = 1;
break;
default:
@@ -95,8 +95,8 @@ Main({
nng_usleep(100000);
Convey("We can register callbacks", {
- So((notify1 = nng_setnotify(sock1, NNG_EV_CAN_SEND, bump, &evcnt1)) != NULL);
- So((notify2 = nng_setnotify(sock2, NNG_EV_CAN_RECV, bump, &evcnt2)) != NULL);
+ So((notify1 = nng_setnotify(sock1, NNG_EV_CAN_SND, bump, &evcnt1)) != NULL);
+ So((notify2 = nng_setnotify(sock2, NNG_EV_CAN_RCV, bump, &evcnt2)) != NULL);
Convey("They are called", {
nng_msg *msg;
@@ -105,6 +105,7 @@ Main({
APPENDSTR(msg, "abc");
So(nng_sendmsg(sock1, msg, 0) == 0);
+ //nng_usleep(20000);
So(nng_recvmsg(sock2, &msg, 0) == 0);
CHECKSTR(msg, "abc");
diff --git a/tests/pollfd.c b/tests/pollfd.c
index 899547c6..f6b9cf30 100644
--- a/tests/pollfd.c
+++ b/tests/pollfd.c
@@ -10,12 +10,22 @@
#include "convey.h"
#include "nng.h"
-#ifdef _WIN32
-#include <windows.h>
-#include <winsock2.h>
-#else
+#ifndef _WIN32
#include <unistd.h>
+#include <poll.h>
#define INVALID_SOCKET -1
+#else
+
+#define poll WSAPoll
+#ifndef WIN32_LEAN_AND_MEAN
+#define WIN32_LEAN_AND_MEAN
+#endif
+
+#include <windows.h>
+#include <winsock2.h>
+#include <mswsock.h>
+#include <ws2tcpip.h>
+
#endif
// Inproc tests.
@@ -33,15 +43,39 @@ TestMain("Poll FDs", {
nng_close(s2);
})
So(nng_listen(s1, "inproc://yeahbaby", NULL, 0) == 0);
- So(nng_dial(s2, "inproc://yeahbaby", NULL, NNG_FLAG_SYNCH) == 0);
+ nng_usleep(50000);
+
+ So(nng_dial(s2, "inproc://yeahbaby", NULL, 0) == 0);
+ nng_usleep(50000);
Convey("We can get a recv FD", {
int fd;
size_t sz;
sz = sizeof (fd);
- So(nng_getopt(s1, NNG_OPT_RECVFD, &fd, &sz) == 0);
+ So(nng_getopt(s1, NNG_OPT_RCVFD, &fd, &sz) == 0);
So(fd != INVALID_SOCKET);
+
+ Convey("And they start non pollable", {
+ struct pollfd pfd;
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+
+ So(poll(&pfd, 1, 0) == 0);
+ So(pfd.revents == 0);
+ })
+
+ Convey("But if we write they are pollable", {
+ struct pollfd pfd;
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+
+ So(nng_send(s2, "kick", 5, 0) == 0);
+ So(poll(&pfd, 1, 1000) == 1);
+ So((pfd.revents & POLLIN) != 0);
+ })
})
Convey("We can get a send FD", {
@@ -49,8 +83,33 @@ TestMain("Poll FDs", {
size_t sz;
sz = sizeof (fd);
- So(nng_getopt(s1, NNG_OPT_SENDFD, &fd, &sz) == 0);
+ So(nng_getopt(s1, NNG_OPT_SNDFD, &fd, &sz) == 0);
So(fd != INVALID_SOCKET);
+ So(nng_send(s1, "oops", 4, 0) == 0);
+ })
+
+ Convey("We cannot get a send FD for PULL", {
+ nng_socket s3;
+ int fd;
+ size_t sz;
+ So(nng_open(&s3, NNG_PROTO_PULL) == 0);
+ Reset({
+ nng_close(s3);
+ })
+ sz = sizeof (fd);
+ So(nng_getopt(s3, NNG_OPT_SNDFD, &fd, &sz) == NNG_ENOTSUP);
+ })
+
+ Convey("We cannot get a recv FD for PUSH", {
+ nng_socket s3;
+ int fd;
+ size_t sz;
+ So(nng_open(&s3, NNG_PROTO_PUSH) == 0);
+ Reset({
+ nng_close(s3);
+ })
+ sz = sizeof (fd);
+ So(nng_getopt(s3, NNG_OPT_RCVFD, &fd, &sz) == NNG_ENOTSUP);
})
})
})