aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/defs.h7
-rw-r--r--src/core/options.c57
-rw-r--r--src/core/options.h9
-rw-r--r--src/core/platform.h12
-rw-r--r--src/core/protocol.h2
-rw-r--r--src/core/socket.c28
-rw-r--r--src/core/socket.h5
-rw-r--r--tests/pubsub.c1
-rw-r--r--tests/tcp.c1
9 files changed, 113 insertions, 9 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index ff36cb3f..abb340bc 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -56,6 +56,13 @@ typedef struct {
size_t iov_len;
} nni_iov;
+// Notify descriptor.
+typedef struct {
+ int sn_wfd; // written to in order to flag an event
+ int sn_rfd; // read from in order to clear an event
+ int sn_init;
+} nni_notifyfd;
+
// Some default timing things.
#define NNI_TIME_NEVER ((nni_time) -1)
#define NNI_TIME_ZERO ((nni_time) 0)
diff --git a/src/core/options.c b/src/core/options.c
index 32b65ebf..35411562 100644
--- a/src/core/options.c
+++ b/src/core/options.c
@@ -112,3 +112,60 @@ nni_getopt_buf(nni_msgq *mq, void *val, size_t *sizep)
*sizep = sizeof (len);
return (0);
}
+
+
+static void
+nni_notifyfd_push(struct nng_event *ev, void *arg)
+{
+ nni_notifyfd *fd = arg;
+
+ NNI_ARG_UNUSED(ev);
+
+ nni_plat_pipe_raise(fd->sn_wfd);
+}
+
+
+int
+nni_getopt_fd(nni_sock *s, nni_notifyfd *fd, int mask, void *val, size_t *szp)
+{
+ int rv;
+
+ if ((*szp < sizeof (int))) {
+ return (NNG_EINVAL);
+ }
+
+ switch (mask) {
+ case NNG_EV_CAN_SEND:
+ if ((s->s_flags & NNI_PROTO_FLAG_SEND) == 0) {
+ return (NNG_ENOTSUP);
+ }
+ break;
+ case NNG_EV_CAN_RECV:
+ if ((s->s_flags & NNI_PROTO_FLAG_RECV) == 0) {
+ return (NNG_ENOTSUP);
+ }
+ break;
+ default:
+ return (NNG_ENOTSUP);
+ }
+
+ // If we already inited this, just give back the same file descriptor.
+ if (fd->sn_init) {
+ memcpy(val, &fd->sn_rfd, sizeof (int));
+ *szp = sizeof (int);
+ return (0);
+ }
+
+ if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) {
+ return (rv);
+ }
+
+ if (nni_add_notify(s, mask, nni_notifyfd_push, fd) == NULL) {
+ nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd);
+ return (NNG_ENOMEM);
+ }
+
+ *szp = sizeof (int);
+ memcpy(val, &fd->sn_rfd, sizeof (int));
+ return (0);
+}
diff --git a/src/core/options.h b/src/core/options.h
index 84d958c1..6c46173f 100644
--- a/src/core/options.h
+++ b/src/core/options.h
@@ -10,6 +10,12 @@
#ifndef CORE_OPTIONS_H
#define CORE_OPTIONS_H
+struct nni_notifyfd {
+ int sn_wfd; // written to in order to flag an event
+ int sn_rfd; // read from in order to clear an event
+ int sn_init;
+};
+
// Option helpers. These can be called from protocols or transports
// in their own option handling, centralizing the logic for dealing with
// variable sized options.
@@ -37,4 +43,7 @@ extern int nni_setopt_int(int *, const void *, size_t, int, int);
// nni_getopt_int gets an integer.
extern int nni_getopt_int(int *, void *, size_t *);
+// nni_getopt_fd obtains a notification file descriptor.
+extern int nni_getopt_fd(nni_sock *, nni_notifyfd *, int, void *, size_t *);
+
#endif // CORE_OPTIONS_H
diff --git a/src/core/platform.h b/src/core/platform.h
index 14b3aaed..b1e824e2 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -266,15 +266,15 @@ extern void nni_plat_seed_prng(void *, size_t);
// just provide the same value twice.
extern int nni_plat_pipe_open(int *, int *);
-// nni_plat_pipe_push pushses a notification to the pipe. Usually this
+// nni_plat_pipe_raise pushes a notification to the pipe. Usually this
// will just be a non-blocking attempt to write a single byte. It may
// however use any other underlying system call that is appropriate.
-extern void nni_plat_pipe_push(int);
+extern void nni_plat_pipe_raise(int);
-// nni_plat_pipe_pull pulls a notification from the pipe. Usually this
-// will just be a non-blocking read. (The pull should attempt to read
-// all data on the pipe.)
-extern void nni_plat_pipe_pull(int);
+// nni_plat_pipe_clear clears all notifications from the pipe. Usually this
+// will just be a non-blocking read. (The call should attempt to read
+// all data on a pipe, for example.)
+extern void nni_plat_pipe_clear(int);
// nni_plat_pipe_close closes both pipes that were provided by the open
// routine.
diff --git a/src/core/protocol.h b/src/core/protocol.h
index e7ea8caf..52774080 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -92,7 +92,7 @@ struct nni_proto {
uint16_t proto_self; // our 16-bit D
uint16_t proto_peer; // who we peer with (ID)
const char * proto_name; // Our name
- uint16_t proto_flags; // Protocol flags
+ uint32_t proto_flags; // Protocol flags
const nni_proto_sock_ops * proto_sock_ops; // Per-socket opeations
const nni_proto_pipe_ops * proto_pipe_ops; // Per-pipe operations.
};
diff --git a/src/core/socket.c b/src/core/socket.c
index 52df70a2..c0cf20b7 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -268,6 +268,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
// We make a copy of the protocol operations.
sock->s_protocol = proto->proto_self;
sock->s_peer = proto->proto_peer;
+ sock->s_flags = proto->proto_flags;
sock->s_linger = 0;
sock->s_sndtimeo = -1;
sock->s_rcvtimeo = -1;
@@ -280,6 +281,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node);
NNI_LIST_INIT(&sock->s_events, nni_event, e_node);
+ sock->s_send_fd.sn_init = 0;
+ sock->s_recv_fd.sn_init = 0;
sock->s_sock_ops = *proto->proto_sock_ops;
sops = &sock->s_sock_ops;
@@ -535,6 +538,16 @@ nni_sock_close(nni_sock *sock)
nni_mtx_unlock(nni_idlock);
+ // Close any open notification pipes.
+ if (sock->s_recv_fd.sn_init) {
+ nni_plat_pipe_close(sock->s_recv_fd.sn_wfd,
+ sock->s_recv_fd.sn_rfd);
+ }
+ if (sock->s_send_fd.sn_init) {
+ nni_plat_pipe_close(sock->s_send_fd.sn_wfd,
+ sock->s_send_fd.sn_rfd);
+ }
+
// The protocol needs to clean up its state.
sock->s_sock_ops.sock_fini(sock->s_data);
@@ -593,6 +606,9 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire)
// backpressure, we just throw it away, and don't complain.
expire = NNI_TIME_ZERO;
}
+ if (sock->s_send_fd.sn_init) {
+ nni_plat_pipe_clear(sock->s_send_fd.sn_rfd);
+ }
rv = nni_msgq_put_until(sock->s_uwq, msg, expire);
if (besteffort && (rv == NNG_EAGAIN)) {
// Pretend this worked... it didn't, but pretend.
@@ -620,6 +636,10 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
}
nni_mtx_unlock(&sock->s_mx);
+ if (sock->s_recv_fd.sn_init) {
+ nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd);
+ }
+
for (;;) {
rv = nni_msgq_get_until(sock->s_urq, &msg, expire);
if (rv != 0) {
@@ -788,6 +808,14 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
case NNG_OPT_RCVBUF:
rv = nni_getopt_buf(sock->s_urq, val, sizep);
break;
+ case NNG_OPT_SENDFD:
+ rv = nni_getopt_fd(sock, &sock->s_send_fd, NNG_EV_CAN_SEND,
+ val, sizep);
+ break;
+ case NNG_OPT_RECVFD:
+ rv = nni_getopt_fd(sock, &sock->s_recv_fd, NNG_EV_CAN_RECV,
+ val, sizep);
+ break;
}
nni_mtx_unlock(&sock->s_mx);
return (rv);
diff --git a/src/core/socket.h b/src/core/socket.h
index 2f34a038..42f42371 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -10,6 +10,7 @@
#ifndef CORE_SOCKET_H
#define CORE_SOCKET_H
+
// NB: This structure is supplied here for use by the CORE. Use of this library
// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
// TRANSPORTS.
@@ -26,6 +27,7 @@ struct nni_socket {
uint16_t s_protocol;
uint16_t s_peer;
+ uint32_t s_flags;
nni_proto_pipe_ops s_pipe_ops;
nni_proto_sock_ops s_sock_ops;
@@ -62,6 +64,9 @@ struct nni_socket {
nni_event s_recv_ev; // Event for readability
nni_event s_send_ev; // Event for sendability
+ nni_notifyfd s_send_fd;
+ nni_notifyfd s_recv_fd;
+
uint32_t s_nextid; // Next Pipe ID.
};
diff --git a/tests/pubsub.c b/tests/pubsub.c
index 431d6980..94558f39 100644
--- a/tests/pubsub.c
+++ b/tests/pubsub.c
@@ -18,7 +18,6 @@
So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
Main({
- int rv;
const char *addr = "inproc://test";
nni_init();
diff --git a/tests/tcp.c b/tests/tcp.c
index f77ec3a0..9f248fe8 100644
--- a/tests/tcp.c
+++ b/tests/tcp.c
@@ -14,7 +14,6 @@
// Inproc tests.
TestMain("TCP Transport", {
- int rv;
trantest_test_all("tcp://127.0.0.1:4450");