aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-07 21:49:48 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-07 21:52:30 -0800
commitbc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1 (patch)
tree55ca7c800e9dfa54bb58b3f2323b1cb5996fab09 /src
parentffdceebc19214f384f1b1b6b358f1b2301384135 (diff)
downloadnng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.gz
nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.bz2
nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.zip
Simplify locking for protocols.
In an attempt to simplify the protocol implementation, and hopefully track down a close related race, we've made it so that most protocols need not worry about locks, and can access the socket lock if they do need a lock. They also let the socket manage their workers, for the most part. (The req protocol is special, since it needs a top level work distributor, *and* a resender.)
Diffstat (limited to 'src')
-rw-r--r--src/core/defs.h27
-rw-r--r--src/core/pipe.c32
-rw-r--r--src/core/pipe.h1
-rw-r--r--src/core/protocol.h52
-rw-r--r--src/core/socket.c120
-rw-r--r--src/core/socket.h63
-rw-r--r--src/core/thread.c8
-rw-r--r--src/protocol/pair/pair.c40
-rw-r--r--src/protocol/pipeline/pull.c79
-rw-r--r--src/protocol/pipeline/push.c93
-rw-r--r--src/protocol/pubsub/pub.c70
-rw-r--r--src/protocol/pubsub/sub.c87
-rw-r--r--src/protocol/reqrep/rep.c99
-rw-r--r--src/protocol/reqrep/req.c93
14 files changed, 421 insertions, 443 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index 9ae734f7..98f4e661 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -18,24 +18,25 @@
#define NNI_ARG_UNUSED(x) ((void) x);
// These types are common but have names shared with user space.
-typedef struct nng_socket nni_sock;
-typedef struct nng_endpoint nni_ep;
-typedef struct nng_pipe nni_pipe;
-typedef struct nng_msg nni_msg;
-typedef struct nng_sockaddr nni_sockaddr;
+typedef struct nng_socket nni_sock;
+typedef struct nng_endpoint nni_ep;
+typedef struct nng_pipe nni_pipe;
+typedef struct nng_msg nni_msg;
+typedef struct nng_sockaddr nni_sockaddr;
// These are our own names.
-typedef struct nni_tran nni_tran;
-typedef struct nni_tran_ep nni_tran_ep;
-typedef struct nni_tran_pipe nni_tran_pipe;
+typedef struct nni_tran nni_tran;
+typedef struct nni_tran_ep nni_tran_ep;
+typedef struct nni_tran_pipe nni_tran_pipe;
-typedef struct nni_proto_pipe nni_proto_pipe;
-typedef struct nni_proto nni_proto;
+typedef struct nni_proto_sock_ops nni_proto_sock_ops;
+typedef struct nni_proto_pipe_ops nni_proto_pipe_ops;
+typedef struct nni_proto nni_proto;
-typedef int nni_signal; // Turnstile/wakeup channel.
-typedef uint64_t nni_time; // Absolute time (usec).
-typedef int64_t nni_duration; // Relative time (usec).
+typedef int nni_signal; // Wakeup channel.
+typedef uint64_t nni_time; // Abs. time (usec).
+typedef int64_t nni_duration; // Rel. time (usec).
// Used by transports for scatter gather I/O.
typedef struct {
diff --git a/src/core/pipe.c b/src/core/pipe.c
index e49ad05a..b3a107ff 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -77,7 +77,7 @@ nni_pipe_destroy(nni_pipe *p)
p->p_tran_ops.pipe_destroy(p->p_tran_data);
}
if (p->p_proto_data != NULL) {
- p->p_proto_ops.pipe_fini(p->p_proto_data);
+ p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data);
}
NNI_FREE_STRUCT(p);
}
@@ -88,8 +88,8 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
{
nni_pipe *p;
nni_sock *sock = ep->ep_sock;
- nni_proto *proto = &sock->s_proto;
- const nni_proto_pipe *pops = proto->proto_pipe;
+ const nni_proto_pipe_ops *ops = &sock->s_pipe_ops;
+ void *pdata;
int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
@@ -105,23 +105,19 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
// and we avoid an extra dereference on hot code paths.
p->p_tran_ops = *ep->ep_tran->tran_pipe;
- // Make a copy of the protocol ops. Same rationale.
- p->p_proto_ops = *pops;
-
- if ((rv = pops->pipe_init(&p->p_proto_data, p, sock->s_data)) != 0) {
+ if ((rv = ops->pipe_init(&pdata, p, sock->s_data)) != 0) {
NNI_FREE_STRUCT(p);
return (rv);
}
- rv = nni_thr_init(&p->p_recv_thr, pops->pipe_recv, p->p_proto_data);
- if (rv != 0) {
- pops->pipe_fini(&p->p_proto_data);
+ p->p_proto_data = pdata;
+ if ((rv = nni_thr_init(&p->p_recv_thr, ops->pipe_recv, pdata)) != 0) {
+ ops->pipe_fini(&p->p_proto_data);
NNI_FREE_STRUCT(p);
return (rv);
}
- rv = nni_thr_init(&p->p_send_thr, pops->pipe_send, p->p_proto_data);
- if (rv != 0) {
+ if ((rv = nni_thr_init(&p->p_send_thr, ops->pipe_send, pdata)) != 0) {
nni_thr_fini(&p->p_recv_thr);
- pops->pipe_fini(&p->p_proto_data);
+ ops->pipe_fini(&p->p_proto_data);
NNI_FREE_STRUCT(p);
return (rv);
}
@@ -155,9 +151,16 @@ nni_pipe_start(nni_pipe *pipe)
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
nni_mtx_unlock(&sock->s_mx);
+ nni_pipe_close(pipe);
return (NNG_ECLOSED);
}
+ if (nni_pipe_peer(pipe) != sock->s_peer) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_pipe_close(pipe);
+ return (NNG_EPROTO);
+ }
+
do {
// We generate a new pipe ID, but we make sure it does not
// collide with any we already have. This can only normally
@@ -174,8 +177,7 @@ nni_pipe_start(nni_pipe *pipe)
}
} while (collide);
- rv = pipe->p_proto_ops.pipe_add(pipe->p_proto_data);
- if (rv != 0) {
+ if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) {
nni_mtx_unlock(&sock->s_mx);
nni_pipe_close(pipe);
return (rv);
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 242e1d8c..cc14de86 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -21,7 +21,6 @@ struct nng_pipe {
uint32_t p_id;
nni_tran_pipe p_tran_ops;
void * p_tran_data;
- nni_proto_pipe p_proto_ops;
void * p_proto_data;
nni_list_node p_node;
nni_sock * p_sock;
diff --git a/src/core/protocol.h b/src/core/protocol.h
index c858ad39..000e9c64 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -1,5 +1,5 @@
//
-// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -21,7 +21,7 @@
// implementations.
// nni_proto_pipe contains protocol-specific per-pipe operations.
-struct nni_proto_pipe {
+struct nni_proto_pipe_ops {
// pipe_init creates the protocol-specific per pipe data structure.
// The last argument is the per-socket protocol private data.
int (*pipe_init)(void **, nni_pipe *, void *);
@@ -55,32 +55,52 @@ struct nni_proto_pipe {
void (*pipe_recv)(void *);
};
-struct nni_proto {
- uint16_t proto_self; // our 16-bit protocol ID
- uint16_t proto_peer; // who we peer with (ID)
- const char * proto_name; // string version of our name
- const nni_proto_pipe * proto_pipe; // Per-pipe operations.
+struct nni_proto_sock_ops {
+ // sock_initf creates the protocol instance, which will be stored on
+ // the socket. This is run without the sock lock held, and allocates
+ // storage or other resources for the socket.
+ int (*sock_init)(void **, nni_sock *);
- // Create protocol instance, which will be stored on the socket.
- int (*proto_init)(void **, nni_sock *);
+ // sock_fini destroys the protocol instance. This is run without the
+ // socket lock held, and is intended to release resources. It may
+ // block as needed.
+ void (*sock_fini)(void *);
- // Destroy the protocol instance.
- void (*proto_fini)(void *);
+ // Close the protocol instance. This is run with the lock held,
+ // and intended to initiate closure of the socket. For example,
+ // it can signal the socket worker threads to exit.
+ void (*sock_close)(void *);
// Option manipulation. These may be NULL.
- int (*proto_setopt)(void *, int, const void *,
- size_t);
- int (*proto_getopt)(void *, int, void *, size_t *);
+ int (*sock_setopt)(void *, int, const void *, size_t);
+ int (*sock_getopt)(void *, int, void *, size_t *);
+
+ // sock_send is a send worker. It can really be anything, but it
+ // is run in a separate thread (if it is non-NULL).
+ void (*sock_send)(void *);
+
+ // sock_recv is a receive worker. As with send it can really be
+ // anything, its just a thread that runs for the duration of the
+ // socket.
+ void (*sock_recv)(void *);
// Receive filter. This may be NULL, but if it isn't, then
// messages coming into the system are routed here just before being
// delivered to the application. To drop the message, the prtocol
// should return NULL, otherwise the message (possibly modified).
- nni_msg * (*proto_recv_filter)(void *, nni_msg *);
+ nni_msg * (*sock_rfilter)(void *, nni_msg *);
// Send filter. This may be NULL, but if it isn't, then messages
// here are filtered just after they come from the application.
- nni_msg * (*proto_send_filter)(void *, nni_msg *);
+ nni_msg * (*sock_sfilter)(void *, nni_msg *);
+};
+
+struct nni_proto {
+ uint16_t proto_self; // our 16-bit D
+ uint16_t proto_peer; // who we peer with (ID)
+ const char * proto_name; // Our name
+ const nni_proto_sock_ops * proto_sock_ops; // Per-socket opeations
+ const nni_proto_pipe_ops * proto_pipe_ops; // Per-pipe operations.
};
// These functions are not used by protocols, but rather by the socket
diff --git a/src/core/socket.c b/src/core/socket.c
index 4e8eccbf..2ef35d7a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -56,7 +56,7 @@ nni_reaper(void *arg)
// Note that if a protocol has rejected the pipe, it
// won't have any data.
if (pipe->p_active) {
- pipe->p_proto_ops.pipe_rem(pipe->p_proto_data);
+ sock->s_pipe_ops.pipe_rem(pipe->p_proto_data);
}
nni_mtx_unlock(&sock->s_mx);
@@ -78,6 +78,13 @@ nni_reaper(void *arg)
}
+nni_mtx *
+nni_sock_mtx(nni_sock *sock)
+{
+ return (&sock->s_mx);
+}
+
+
static nni_msg *
nni_sock_nullfilter(void *arg, nni_msg *mp)
{
@@ -108,6 +115,22 @@ nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz)
}
+static void
+nni_sock_nullop(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+
+static int
+nni_sock_nulladdpipe(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+
+ return (0);
+}
+
+
// nn_sock_open creates the underlying socket.
int
nni_sock_open(nni_sock **sockp, uint16_t pnum)
@@ -124,7 +147,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
// We make a copy of the protocol operations.
- sock->s_proto = *proto;
+ sock->s_protocol = proto->proto_self;
+ sock->s_peer = proto->proto_peer;
sock->s_linger = 0;
sock->s_sndtimeo = -1;
sock->s_rcvtimeo = -1;
@@ -135,6 +159,30 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
+ sock->s_sock_ops = *proto->proto_sock_ops;
+ if (sock->s_sock_ops.sock_sfilter == NULL) {
+ sock->s_sock_ops.sock_sfilter = nni_sock_nullfilter;
+ }
+ if (sock->s_sock_ops.sock_rfilter == NULL) {
+ sock->s_sock_ops.sock_rfilter = nni_sock_nullfilter;
+ }
+ if (sock->s_sock_ops.sock_getopt == NULL) {
+ sock->s_sock_ops.sock_getopt = nni_sock_nullgetopt;
+ }
+ if (sock->s_sock_ops.sock_setopt == NULL) {
+ sock->s_sock_ops.sock_setopt = nni_sock_nullsetopt;
+ }
+ if (sock->s_sock_ops.sock_close == NULL) {
+ sock->s_sock_ops.sock_close = nni_sock_nullop;
+ }
+ sock->s_pipe_ops = *proto->proto_pipe_ops;
+ if (sock->s_pipe_ops.pipe_add == NULL) {
+ sock->s_pipe_ops.pipe_add = nni_sock_nulladdpipe;
+ }
+ if (sock->s_pipe_ops.pipe_rem == NULL) {
+ sock->s_pipe_ops.pipe_rem = nni_sock_nullop;
+ }
+
if ((rv = nni_mtx_init(&sock->s_mx)) != 0) {
NNI_FREE_STRUCT(sock);
return (rv);
@@ -145,6 +193,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
return (rv);
}
+
if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) {
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
@@ -161,14 +210,14 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) {
nni_msgq_fini(sock->s_uwq);
- nni_thr_fini(&sock->s_reaper);
+ nni_thr_fini(&sock->s_recver);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
return (rv);
}
- if ((rv = sock->s_proto.proto_init(&sock->s_data, sock)) != 0) {
+ if ((rv = sock->s_sock_ops.sock_init(&sock->s_data, sock)) != 0) {
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_thr_fini(&sock->s_reaper);
@@ -177,19 +226,39 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
NNI_FREE_STRUCT(sock);
return (rv);
}
- if (sock->s_proto.proto_send_filter == NULL) {
- sock->s_proto.proto_send_filter = nni_sock_nullfilter;
- }
- if (sock->s_proto.proto_recv_filter == NULL) {
- sock->s_proto.proto_recv_filter = nni_sock_nullfilter;
- }
- if (sock->s_proto.proto_getopt == NULL) {
- sock->s_proto.proto_getopt = nni_sock_nullgetopt;
+
+ // NB: If these functions are NULL, the thread initialization is
+ // largely a NO-OP. The system won't actually create the threads.
+ rv = nni_thr_init(&sock->s_sender, sock->s_sock_ops.sock_send,
+ sock->s_data);
+ if (rv != 0) {
+ nni_thr_wait(&sock->s_reaper);
+ sock->s_sock_ops.sock_fini(&sock->s_data);
+ nni_msgq_fini(sock->s_urq);
+ nni_msgq_fini(sock->s_uwq);
+ nni_thr_fini(&sock->s_reaper);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
+ NNI_FREE_STRUCT(sock);
}
- if (sock->s_proto.proto_setopt == NULL) {
- sock->s_proto.proto_setopt = nni_sock_nullsetopt;
+ rv = nni_thr_init(&sock->s_recver, sock->s_sock_ops.sock_recv,
+ sock->s_data);
+ if (rv != 0) {
+ nni_thr_wait(&sock->s_sender);
+ sock->s_sock_ops.sock_fini(&sock->s_data);
+ nni_msgq_fini(sock->s_urq);
+ nni_msgq_fini(sock->s_uwq);
+ nni_thr_fini(&sock->s_sender);
+ nni_thr_fini(&sock->s_reaper);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
+ NNI_FREE_STRUCT(sock);
}
+
+
nni_thr_run(&sock->s_reaper);
+ nni_thr_run(&sock->s_recver);
+ nni_thr_run(&sock->s_sender);
*sockp = sock;
return (0);
}
@@ -265,10 +334,14 @@ nni_sock_shutdown(nni_sock *sock)
nni_mtx_lock(&sock->s_mx);
}
+ sock->s_sock_ops.sock_close(sock->s_data);
+
nni_cv_wake(&sock->s_cv);
nni_mtx_unlock(&sock->s_mx);
- // Wait for the reaper to exit.
+ // Wait for the threads to exit.
+ nni_thr_wait(&sock->s_sender);
+ nni_thr_wait(&sock->s_recver);
nni_thr_wait(&sock->s_reaper);
// At this point, there are no threads blocked inside of us
@@ -297,7 +370,7 @@ nni_sock_close(nni_sock *sock)
// the results may be tragic.
// The protocol needs to clean up its state.
- sock->s_proto.proto_fini(sock->s_data);
+ sock->s_sock_ops.sock_fini(sock->s_data);
// And we need to clean up *our* state.
nni_thr_fini(&sock->s_reaper);
@@ -328,9 +401,10 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire)
return (rv);
}
besteffort = sock->s_besteffort;
+
+ msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg);
nni_mtx_unlock(&sock->s_mx);
- msg = sock->s_proto.proto_send_filter(sock->s_data, msg);
if (msg == NULL) {
return (0);
}
@@ -372,7 +446,9 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
if (rv != 0) {
return (rv);
}
- msg = sock->s_proto.proto_recv_filter(sock->s_data, msg);
+ nni_mtx_lock(&sock->s_mx);
+ msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg);
+ nni_mtx_unlock(&sock->s_mx);
if (msg != NULL) {
break;
}
@@ -388,14 +464,14 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
uint16_t
nni_sock_proto(nni_sock *sock)
{
- return (sock->s_proto.proto_self);
+ return (sock->s_protocol);
}
uint16_t
nni_sock_peer(nni_sock *sock)
{
- return (sock->s_proto.proto_peer);
+ return (sock->s_peer);
}
@@ -488,7 +564,7 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size)
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
- rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size);
+ rv = sock->s_sock_ops.sock_setopt(sock->s_data, opt, val, size);
if (rv != NNG_ENOTSUP) {
nni_mtx_unlock(&sock->s_mx);
return (rv);
@@ -533,7 +609,7 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
- rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep);
+ rv = sock->s_sock_ops.sock_getopt(sock->s_data, opt, val, sizep);
if (rv != NNG_ENOTSUP) {
nni_mtx_unlock(&sock->s_mx);
return (rv);
diff --git a/src/core/socket.h b/src/core/socket.h
index 62347f61..197b5d0d 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -14,36 +14,42 @@
// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
// TRANSPORTS.
struct nng_socket {
- nni_mtx s_mx;
- nni_cv s_cv;
+ nni_mtx s_mx;
+ nni_cv s_cv;
- nni_msgq * s_uwq; // Upper write queue
- nni_msgq * s_urq; // Upper read queue
+ nni_msgq * s_uwq; // Upper write queue
+ nni_msgq * s_urq; // Upper read queue
- nni_proto s_proto;
+ uint16_t s_protocol;
+ uint16_t s_peer;
- void * s_data; // Protocol private
+ nni_proto_pipe_ops s_pipe_ops;
+ nni_proto_sock_ops s_sock_ops;
- // XXX: options
- nni_duration s_linger; // linger time
- nni_duration s_sndtimeo; // send timeout
- nni_duration s_rcvtimeo; // receive timeout
- nni_duration s_reconn; // reconnect time
- nni_duration s_reconnmax; // max reconnect time
-
- nni_list s_eps; // active endpoints
- nni_list s_pipes; // pipes for this socket
-
- nni_list s_reaps; // pipes to reap
- nni_thr s_reaper;
+ void * s_data; // Protocol private
- int s_ep_pend; // EP dial/listen in progress
- int s_closing; // Socket is closing
- int s_besteffort; // Best effort mode delivery
- int s_senderr; // Protocol state machine use
- int s_recverr; // Protocol state machine use
-
- uint32_t s_nextid; // Next Pipe ID.
+ // XXX: options
+ nni_duration s_linger; // linger time
+ nni_duration s_sndtimeo; // send timeout
+ nni_duration s_rcvtimeo; // receive timeout
+ nni_duration s_reconn; // reconnect time
+ nni_duration s_reconnmax; // max reconnect time
+
+ nni_list s_eps; // active endpoints
+ nni_list s_pipes; // pipes for this socket
+
+ nni_list s_reaps; // pipes to reap
+ nni_thr s_reaper;
+ nni_thr s_sender;
+ nni_thr s_recver;
+
+ int s_ep_pend; // EP dial/listen in progress
+ int s_closing; // Socket is closing
+ int s_besteffort; // Best effort mode delivery
+ int s_senderr; // Protocol state machine use
+ int s_recverr; // Protocol state machine use
+
+ uint32_t s_nextid; // Next Pipe ID.
};
extern int nni_sock_open(nni_sock **, uint16_t);
@@ -76,5 +82,12 @@ extern nni_msgq *nni_sock_sendq(nni_sock *);
// inject incoming messages from pipes to it.
extern nni_msgq *nni_sock_recvq(nni_sock *);
+// nni_sock_mtx obtains the socket mutex. This is for protocols to use
+// from separate threads; they must not hold the lock for extended periods.
+// Additionally, this can only be acquired from separate threads. The
+// synchronous entry points (excluding the send/recv thread workers) will
+// be called with this lock already held. We expose the mutex directly
+// here so that protocols can use it to initialize condvars.
+extern nni_mtx *nni_sock_mtx(nni_sock *);
#endif // CORE_SOCKET_H
diff --git a/src/core/thread.c b/src/core/thread.c
index 7678ad0a..4a871ede 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -128,6 +128,10 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg)
nni_plat_mtx_fini(&thr->mtx);
return (rv);
}
+ if (fn == NULL) {
+ thr->done = 1;
+ return (0);
+ }
if ((rv = nni_plat_thr_init(&thr->thr, nni_thr_wrap, thr)) != 0) {
nni_plat_cv_fini(&thr->cv);
nni_plat_mtx_fini(&thr->mtx);
@@ -170,7 +174,9 @@ nni_thr_fini(nni_thr *thr)
nni_plat_cv_wait(&thr->cv);
}
nni_plat_mtx_unlock(&thr->mtx);
- nni_plat_thr_fini(&thr->thr);
+ if (thr->fn != NULL) {
+ nni_plat_thr_fini(&thr->thr);
+ }
nni_plat_cv_fini(&thr->cv);
nni_plat_mtx_fini(&thr->mtx);
}
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 830a02e6..51608e07 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -41,7 +41,7 @@ static void nni_pair_receiver(void *);
static void nni_pair_sender(void *);
static int
-nni_pair_init(void **pairp, nni_sock *sock)
+nni_pair_sock_init(void **pairp, nni_sock *sock)
{
nni_pair_sock *pair;
int rv;
@@ -59,7 +59,7 @@ nni_pair_init(void **pairp, nni_sock *sock)
static void
-nni_pair_fini(void *arg)
+nni_pair_sock_fini(void *arg)
{
nni_pair_sock *pair = arg;
@@ -102,9 +102,6 @@ nni_pair_pipe_add(void *arg)
nni_pair_pipe *pp = arg;
nni_pair_sock *pair = pp->pair;
- if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PAIR) {
- return (NNG_EPROTO);
- }
if (pair->pipe != NULL) {
return (NNG_EBUSY); // Already have a peer, denied.
}
@@ -182,14 +179,14 @@ nni_pair_pipe_recv(void *arg)
// TODO: probably we could replace these with NULL, since we have no
// protocol specific options?
static int
-nni_pair_setopt(void *arg, int opt, const void *buf, size_t sz)
+nni_pair_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
return (NNG_ENOTSUP);
}
static int
-nni_pair_getopt(void *arg, int opt, void *buf, size_t *szp)
+nni_pair_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
return (NNG_ENOTSUP);
}
@@ -198,7 +195,7 @@ nni_pair_getopt(void *arg, int opt, void *buf, size_t *szp)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe nni_pair_proto_pipe = {
+static nni_proto_pipe_ops nni_pair_pipe_ops = {
.pipe_init = nni_pair_pipe_init,
.pipe_fini = nni_pair_pipe_fini,
.pipe_add = nni_pair_pipe_add,
@@ -207,15 +204,22 @@ static nni_proto_pipe nni_pair_proto_pipe = {
.pipe_recv = nni_pair_pipe_recv,
};
+static nni_proto_sock_ops nni_pair_sock_ops = {
+ .sock_init = nni_pair_sock_init,
+ .sock_fini = nni_pair_sock_fini,
+ .sock_close = NULL,
+ .sock_setopt = nni_pair_sock_setopt,
+ .sock_getopt = nni_pair_sock_getopt,
+ .sock_rfilter = NULL,
+ .sock_sfilter = NULL,
+ .sock_send = NULL,
+ .sock_recv = NULL,
+};
+
nni_proto nni_pair_proto = {
- .proto_self = NNG_PROTO_PAIR,
- .proto_peer = NNG_PROTO_PAIR,
- .proto_name = "pair",
- .proto_pipe = &nni_pair_proto_pipe,
- .proto_init = nni_pair_init,
- .proto_fini = nni_pair_fini,
- .proto_setopt = nni_pair_setopt,
- .proto_getopt = nni_pair_getopt,
- .proto_recv_filter = NULL,
- .proto_send_filter = NULL,
+ .proto_self = NNG_PROTO_PAIR,
+ .proto_peer = NNG_PROTO_PAIR,
+ .proto_name = "pair",
+ .proto_sock_ops = &nni_pair_sock_ops,
+ .proto_pipe_ops = &nni_pair_pipe_ops,
};
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 695b730e..9612177c 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -19,7 +19,6 @@ typedef struct nni_pull_sock nni_pull_sock;
// An nni_pull_sock is our per-socket protocol private structure.
struct nni_pull_sock {
- nni_mtx mx;
nni_msgq * urq;
int raw;
};
@@ -31,7 +30,7 @@ struct nni_pull_pipe {
};
static int
-nni_pull_init(void **pullp, nni_sock *sock)
+nni_pull_sock_init(void **pullp, nni_sock *sock)
{
nni_pull_sock *pull;
int rv;
@@ -39,10 +38,6 @@ nni_pull_init(void **pullp, nni_sock *sock)
if ((pull = NNI_ALLOC_STRUCT(pull)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&pull->mx)) != 0) {
- NNI_FREE_STRUCT(pull);
- return (rv);
- }
pull->raw = 0;
pull->urq = nni_sock_recvq(sock);
*pullp = pull;
@@ -52,11 +47,10 @@ nni_pull_init(void **pullp, nni_sock *sock)
static void
-nni_pull_fini(void *arg)
+nni_pull_sock_fini(void *arg)
{
nni_pull_sock *pull = arg;
- nni_mtx_fini(&pull->mx);
NNI_FREE_STRUCT(pull);
}
@@ -86,32 +80,6 @@ nni_pull_pipe_fini(void *arg)
}
-static int
-nni_pull_pipe_add(void *arg)
-{
- nni_pull_pipe *pp = arg;
-
- if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PUSH) {
- return (NNG_EPROTO);
- }
- return (0);
-}
-
-
-static void
-nni_pull_pipe_rem(void *arg)
-{
- NNI_ARG_UNUSED(arg);
-}
-
-
-static void
-nni_pull_pipe_send(void *arg)
-{
- NNI_ARG_UNUSED(arg);
-}
-
-
static void
nni_pull_pipe_recv(void *arg)
{
@@ -133,16 +101,14 @@ nni_pull_pipe_recv(void *arg)
static int
-nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz)
+nni_pull_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_pull_sock *pull = arg;
int rv;
switch (opt) {
case NNG_OPT_RAW:
- nni_mtx_lock(&pull->mx);
rv = nni_setopt_int(&pull->raw, buf, sz, 0, 1);
- nni_mtx_unlock(&pull->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -152,16 +118,14 @@ nni_pull_setopt(void *arg, int opt, const void *buf, size_t sz)
static int
-nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp)
+nni_pull_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_pull_sock *pull = arg;
int rv;
switch (opt) {
case NNG_OPT_RAW:
- nni_mtx_lock(&pull->mx);
rv = nni_getopt_int(&pull->raw, buf, szp);
- nni_mtx_unlock(&pull->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -172,24 +136,31 @@ nni_pull_getopt(void *arg, int opt, void *buf, size_t *szp)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe nni_pull_proto_pipe = {
+static nni_proto_pipe_ops nni_pull_pipe_ops = {
.pipe_init = nni_pull_pipe_init,
.pipe_fini = nni_pull_pipe_fini,
- .pipe_add = nni_pull_pipe_add,
- .pipe_rem = nni_pull_pipe_rem,
- .pipe_send = nni_pull_pipe_send,
+ .pipe_add = NULL,
+ .pipe_rem = NULL,
+ .pipe_send = NULL,
.pipe_recv = nni_pull_pipe_recv,
};
+static nni_proto_sock_ops nni_pull_sock_ops = {
+ .sock_init = nni_pull_sock_init,
+ .sock_fini = nni_pull_sock_fini,
+ .sock_close = NULL,
+ .sock_setopt = nni_pull_sock_setopt,
+ .sock_getopt = nni_pull_sock_getopt,
+ .sock_send = NULL,
+ .sock_recv = NULL,
+ .sock_rfilter = NULL,
+ .sock_sfilter = NULL,
+};
+
nni_proto nni_pull_proto = {
- .proto_self = NNG_PROTO_PULL,
- .proto_peer = NNG_PROTO_PUSH,
- .proto_name = "pull",
- .proto_pipe = &nni_pull_proto_pipe,
- .proto_init = nni_pull_init,
- .proto_fini = nni_pull_fini,
- .proto_setopt = nni_pull_setopt,
- .proto_getopt = nni_pull_getopt,
- .proto_recv_filter = NULL,
- .proto_send_filter = NULL,
+ .proto_self = NNG_PROTO_PULL,
+ .proto_peer = NNG_PROTO_PUSH,
+ .proto_name = "pull",
+ .proto_pipe_ops = &nni_pull_pipe_ops,
+ .proto_sock_ops = &nni_pull_sock_ops,
};
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index de774125..6cdc9cc5 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -21,16 +21,15 @@ typedef struct nni_push_sock nni_push_sock;
// An nni_push_sock is our per-socket protocol private structure.
struct nni_push_sock {
- nni_mtx mx;
nni_cv cv;
nni_msgq * uwq;
- nni_thr sender;
int raw;
int closing;
int wantw;
nni_list pipes;
nni_push_pipe * nextpipe;
int npipes;
+ nni_sock * sock;
};
// An nni_push_pipe is our per-pipe protocol private structure.
@@ -42,10 +41,8 @@ struct nni_push_pipe {
nni_list_node node;
};
-static void nni_push_rrdist(void *);
-
static int
-nni_push_init(void **pushp, nni_sock *sock)
+nni_push_sock_init(void **pushp, nni_sock *sock)
{
nni_push_sock *push;
int rv;
@@ -53,12 +50,7 @@ nni_push_init(void **pushp, nni_sock *sock)
if ((push = NNI_ALLOC_STRUCT(push)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&push->mx)) != 0) {
- NNI_FREE_STRUCT(push);
- return (rv);
- }
- if ((rv = nni_cv_init(&push->cv, &push->mx)) != 0) {
- nni_mtx_fini(&push->mx);
+ if ((rv = nni_cv_init(&push->cv, nni_sock_mtx(sock))) != 0) {
NNI_FREE_STRUCT(push);
return (rv);
}
@@ -67,36 +59,32 @@ nni_push_init(void **pushp, nni_sock *sock)
push->npipes = 0;
push->wantw = 0;
push->nextpipe = NULL;
+ push->sock = sock;
push->uwq = nni_sock_sendq(sock);
*pushp = push;
nni_sock_recverr(sock, NNG_ENOTSUP);
- rv = nni_thr_init(&push->sender, nni_push_rrdist, push);
- if (rv != 0) {
- nni_cv_fini(&push->cv);
- nni_mtx_fini(&push->mx);
- NNI_FREE_STRUCT(push);
- return (rv);
- }
- nni_thr_run(&push->sender);
return (0);
}
static void
-nni_push_fini(void *arg)
+nni_push_sock_close(void *arg)
{
nni_push_sock *push = arg;
// Shut down the resender. We request it to exit by clearing
// its old value, then kick it.
- nni_mtx_lock(&push->mx);
push->closing = 1;
nni_cv_wake(&push->cv);
- nni_mtx_unlock(&push->mx);
+}
+
+
+static void
+nni_push_sock_fini(void *arg)
+{
+ nni_push_sock *push = arg;
- nni_thr_fini(&push->sender);
nni_cv_fini(&push->cv);
- nni_mtx_fini(&push->mx);
NNI_FREE_STRUCT(push);
}
@@ -142,9 +130,6 @@ nni_push_pipe_add(void *arg)
if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) {
return (NNG_EPROTO);
}
- // Wake the sender since we have a new pipe.
- nni_mtx_lock(&push->mx);
-
// Turns out it should not really matter where we stick this.
// The end makes our test cases easier.
nni_list_append(&push->pipes, pp);
@@ -152,7 +137,6 @@ nni_push_pipe_add(void *arg)
// Wake the top sender, as we can accept a job.
push->npipes++;
nni_cv_wake(&push->cv);
- nni_mtx_unlock(&push->mx);
return (0);
}
@@ -163,13 +147,11 @@ nni_push_pipe_rem(void *arg)
nni_push_pipe *pp = arg;
nni_push_sock *push = pp->push;
- nni_mtx_lock(&push->mx);
if (pp == push->nextpipe) {
push->nextpipe = nni_list_next(&push->pipes, pp);
}
push->npipes--;
nni_list_remove(&push->pipes, pp);
- nni_mtx_unlock(&push->mx);
}
@@ -178,17 +160,18 @@ nni_push_pipe_send(void *arg)
{
nni_push_pipe *pp = arg;
nni_push_sock *push = pp->push;
+ nni_mtx *mx = nni_sock_mtx(push->sock);
nni_msg *msg;
for (;;) {
if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) {
break;
}
- nni_mtx_lock(&push->mx);
+ nni_mtx_lock(mx);
if (push->wantw) {
nni_cv_wake(&push->cv);
}
- nni_mtx_unlock(&push->mx);
+ nni_mtx_unlock(mx);
if (nni_pipe_send(pp->pipe, msg) != 0) {
nni_msg_free(msg);
break;
@@ -216,16 +199,14 @@ nni_push_pipe_recv(void *arg)
static int
-nni_push_setopt(void *arg, int opt, const void *buf, size_t sz)
+nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_push_sock *push = arg;
int rv;
switch (opt) {
case NNG_OPT_RAW:
- nni_mtx_lock(&push->mx);
rv = nni_setopt_int(&push->raw, buf, sz, 0, 1);
- nni_mtx_unlock(&push->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -235,16 +216,14 @@ nni_push_setopt(void *arg, int opt, const void *buf, size_t sz)
static int
-nni_push_getopt(void *arg, int opt, void *buf, size_t *szp)
+nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_push_sock *push = arg;
int rv;
switch (opt) {
case NNG_OPT_RAW:
- nni_mtx_lock(&push->mx);
rv = nni_getopt_int(&push->raw, buf, szp);
- nni_mtx_unlock(&push->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -254,12 +233,13 @@ nni_push_getopt(void *arg, int opt, void *buf, size_t *szp)
static void
-nni_push_rrdist(void *arg)
+nni_push_sock_send(void *arg)
{
nni_push_sock *push = arg;
nni_push_pipe *pp;
nni_msgq *uwq = push->uwq;
nni_msg *msg = NULL;
+ nni_mtx *mx = nni_sock_mtx(push->sock);
int rv;
int i;
@@ -269,10 +249,10 @@ nni_push_rrdist(void *arg)
return;
}
- nni_mtx_lock(&push->mx);
+ nni_mtx_lock(mx);
if (push->closing) {
if (msg != NULL) {
- nni_mtx_unlock(&push->mx);
+ nni_mtx_unlock(mx);
nni_msg_free(msg);
return;
}
@@ -296,14 +276,14 @@ nni_push_rrdist(void *arg)
} else {
push->wantw = 0;
}
- nni_mtx_unlock(&push->mx);
+ nni_mtx_unlock(mx);
}
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe nni_push_proto_pipe = {
+static nni_proto_pipe_ops nni_push_pipe_ops = {
.pipe_init = nni_push_pipe_init,
.pipe_fini = nni_push_pipe_fini,
.pipe_add = nni_push_pipe_add,
@@ -312,15 +292,22 @@ static nni_proto_pipe nni_push_proto_pipe = {
.pipe_recv = nni_push_pipe_recv,
};
+static nni_proto_sock_ops nni_push_sock_ops = {
+ .sock_init = nni_push_sock_init,
+ .sock_fini = nni_push_sock_fini,
+ .sock_close = nni_push_sock_close,
+ .sock_setopt = nni_push_sock_setopt,
+ .sock_getopt = nni_push_sock_getopt,
+ .sock_send = nni_push_sock_send,
+ .sock_recv = NULL,
+ .sock_rfilter = NULL,
+ .sock_sfilter = NULL,
+};
+
nni_proto nni_push_proto = {
- .proto_self = NNG_PROTO_PUSH,
- .proto_peer = NNG_PROTO_PULL,
- .proto_name = "push",
- .proto_pipe = &nni_push_proto_pipe,
- .proto_init = nni_push_init,
- .proto_fini = nni_push_fini,
- .proto_setopt = nni_push_setopt,
- .proto_getopt = nni_push_getopt,
- .proto_recv_filter = NULL,
- .proto_send_filter = NULL,
+ .proto_self = NNG_PROTO_PUSH,
+ .proto_peer = NNG_PROTO_PULL,
+ .proto_name = "push",
+ .proto_pipe_ops = &nni_push_pipe_ops,
+ .proto_sock_ops = &nni_push_sock_ops,
};
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 860c7c7d..684b916d 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -23,10 +23,8 @@ typedef struct nni_pub_sock nni_pub_sock;
// An nni_pub_sock is our per-socket protocol private structure.
struct nni_pub_sock {
nni_sock * sock;
- nni_mtx mx;
nni_msgq * uwq;
int raw;
- nni_thr sender;
nni_list pipes;
};
@@ -39,10 +37,10 @@ struct nni_pub_pipe {
int sigclose;
};
-static void nni_pub_broadcast(void *);
+static void nni_pub_sock_send(void *);
static int
-nni_pub_init(void **pubp, nni_sock *sock)
+nni_pub_sock_init(void **pubp, nni_sock *sock)
{
nni_pub_sock *pub;
int rv;
@@ -50,36 +48,23 @@ nni_pub_init(void **pubp, nni_sock *sock)
if ((pub = NNI_ALLOC_STRUCT(pub)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&pub->mx)) != 0) {
- NNI_FREE_STRUCT(pub);
- return (rv);
- }
pub->sock = sock;
pub->raw = 0;
NNI_LIST_INIT(&pub->pipes, nni_pub_pipe, node);
pub->uwq = nni_sock_sendq(sock);
- rv = nni_thr_init(&pub->sender, nni_pub_broadcast, pub);
- if (rv != 0) {
- nni_mtx_fini(&pub->mx);
- NNI_FREE_STRUCT(pub);
- return (rv);
- }
*pubp = pub;
nni_sock_recverr(sock, NNG_ENOTSUP);
- nni_thr_run(&pub->sender);
return (0);
}
static void
-nni_pub_fini(void *arg)
+nni_pub_sock_fini(void *arg)
{
nni_pub_sock *pub = arg;
- nni_thr_fini(&pub->sender);
- nni_mtx_fini(&pub->mx);
NNI_FREE_STRUCT(pub);
}
@@ -125,10 +110,7 @@ nni_pub_pipe_add(void *arg)
if (nni_pipe_peer(pp->pipe) != NNG_PROTO_SUB) {
return (NNG_EPROTO);
}
- nni_mtx_lock(&pub->mx);
nni_list_append(&pub->pipes, pp);
- nni_mtx_unlock(&pub->mx);
-
return (0);
}
@@ -139,18 +121,17 @@ nni_pub_pipe_rem(void *arg)
nni_pub_pipe *pp = arg;
nni_pub_sock *pub = pp->pub;
- nni_mtx_lock(&pub->mx);
nni_list_remove(&pub->pipes, pp);
- nni_mtx_unlock(&pub->mx);
}
static void
-nni_pub_broadcast(void *arg)
+nni_pub_sock_send(void *arg)
{
nni_pub_sock *pub = arg;
nni_msgq *uwq = pub->uwq;
nni_msg *msg, *dup;
+ nni_mtx *mx = nni_sock_mtx(pub->sock);
for (;;) {
nni_pub_pipe *pp;
@@ -161,7 +142,7 @@ nni_pub_broadcast(void *arg)
break;
}
- nni_mtx_lock(&pub->mx);
+ nni_mtx_lock(mx);
last = nni_list_last(&pub->pipes);
NNI_LIST_FOREACH (&pub->pipes, pp) {
if (pp != last) {
@@ -176,7 +157,7 @@ nni_pub_broadcast(void *arg)
nni_msg_free(dup);
}
}
- nni_mtx_unlock(&pub->mx);
+ nni_mtx_unlock(mx);
if (last == NULL) {
nni_msg_free(msg);
@@ -236,16 +217,14 @@ nni_pub_pipe_recv(void *arg)
static int
-nni_pub_setopt(void *arg, int opt, const void *buf, size_t sz)
+nni_pub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_pub_sock *pub = arg;
int rv;
switch (opt) {
case NNG_OPT_RAW:
- nni_mtx_lock(&pub->mx);
rv = nni_setopt_int(&pub->raw, buf, sz, 0, 1);
- nni_mtx_unlock(&pub->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -255,16 +234,14 @@ nni_pub_setopt(void *arg, int opt, const void *buf, size_t sz)
static int
-nni_pub_getopt(void *arg, int opt, void *buf, size_t *szp)
+nni_pub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_pub_sock *pub = arg;
int rv;
switch (opt) {
case NNG_OPT_RAW:
- nni_mtx_lock(&pub->mx);
rv = nni_getopt_int(&pub->raw, buf, szp);
- nni_mtx_unlock(&pub->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -275,7 +252,7 @@ nni_pub_getopt(void *arg, int opt, void *buf, size_t *szp)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe nni_pub_proto_pipe = {
+static nni_proto_pipe_ops nni_pub_pipe_ops = {
.pipe_init = nni_pub_pipe_init,
.pipe_fini = nni_pub_pipe_fini,
.pipe_add = nni_pub_pipe_add,
@@ -284,15 +261,22 @@ static nni_proto_pipe nni_pub_proto_pipe = {
.pipe_recv = nni_pub_pipe_recv,
};
+nni_proto_sock_ops nni_pub_sock_ops = {
+ .sock_init = nni_pub_sock_init,
+ .sock_fini = nni_pub_sock_fini,
+ .sock_close = NULL,
+ .sock_setopt = nni_pub_sock_setopt,
+ .sock_getopt = nni_pub_sock_getopt,
+ .sock_send = nni_pub_sock_send,
+ .sock_recv = NULL,
+ .sock_rfilter = NULL,
+ .sock_sfilter = NULL,
+};
+
nni_proto nni_pub_proto = {
- .proto_self = NNG_PROTO_PUB,
- .proto_peer = NNG_PROTO_SUB,
- .proto_name = "pub",
- .proto_pipe = &nni_pub_proto_pipe,
- .proto_init = nni_pub_init,
- .proto_fini = nni_pub_fini,
- .proto_setopt = nni_pub_setopt,
- .proto_getopt = nni_pub_getopt,
- .proto_recv_filter = NULL,
- .proto_send_filter = NULL,
+ .proto_self = NNG_PROTO_PUB,
+ .proto_peer = NNG_PROTO_SUB,
+ .proto_name = "pub",
+ .proto_sock_ops = &nni_pub_sock_ops,
+ .proto_pipe_ops = &nni_pub_pipe_ops,
};
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 19c06aa0..dd288d5e 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -29,7 +29,6 @@ struct nni_sub_topic {
// An nni_rep_sock is our per-socket protocol private structure.
struct nni_sub_sock {
nni_sock * sock;
- nni_mtx mx;
nni_list topics;
nni_msgq * urq;
int raw;
@@ -42,7 +41,7 @@ struct nni_sub_pipe {
};
static int
-nni_sub_init(void **subp, nni_sock *sock)
+nni_sub_sock_init(void **subp, nni_sock *sock)
{
nni_sub_sock *sub;
int rv;
@@ -50,10 +49,6 @@ nni_sub_init(void **subp, nni_sock *sock)
if ((sub = NNI_ALLOC_STRUCT(sub)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&sub->mx)) != 0) {
- NNI_FREE_STRUCT(sub);
- return (rv);
- }
NNI_LIST_INIT(&sub->topics, nni_sub_topic, node);
sub->sock = sock;
sub->raw = 0;
@@ -66,7 +61,7 @@ nni_sub_init(void **subp, nni_sock *sock)
static void
-nni_sub_fini(void *arg)
+nni_sub_sock_fini(void *arg)
{
nni_sub_sock *sub = arg;
nni_sub_topic *topic;
@@ -76,7 +71,6 @@ nni_sub_fini(void *arg)
nni_free(topic->buf, topic->len);
NNI_FREE_STRUCT(topic);
}
- nni_mtx_fini(&sub->mx);
NNI_FREE_STRUCT(sub);
}
@@ -106,32 +100,6 @@ nni_sub_pipe_fini(void *arg)
}
-static int
-nni_sub_pipe_add(void *arg)
-{
- nni_sub_pipe *sp = arg;
-
- if (nni_pipe_peer(sp->pipe) != NNG_PROTO_PUB) {
- return (NNG_EPROTO);
- }
- return (0);
-}
-
-
-static void
-nni_sub_pipe_rem(void *arg)
-{
- NNI_ARG_UNUSED(arg);
-}
-
-
-static void
-nni_sub_pipe_send(void *arg)
-{
- NNI_ARG_UNUSED(arg);
-}
-
-
static void
nni_sub_pipe_recv(void *arg)
{
@@ -242,26 +210,20 @@ nni_sub_unsubscribe(nni_sub_sock *sub, const void *buf, size_t sz)
static int
-nni_sub_setopt(void *arg, int opt, const void *buf, size_t sz)
+nni_sub_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_sub_sock *sub = arg;
int rv;
switch (opt) {
case NNG_OPT_RAW:
- nni_mtx_lock(&sub->mx);
rv = nni_setopt_int(&sub->raw, buf, sz, 0, 1);
- nni_mtx_unlock(&sub->mx);
break;
case NNG_OPT_SUBSCRIBE:
- nni_mtx_lock(&sub->mx);
rv = nni_sub_subscribe(sub, buf, sz);
- nni_mtx_unlock(&sub->mx);
break;
case NNG_OPT_UNSUBSCRIBE:
- nni_mtx_lock(&sub->mx);
rv = nni_sub_unsubscribe(sub, buf, sz);
- nni_mtx_unlock(&sub->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -271,16 +233,14 @@ nni_sub_setopt(void *arg, int opt, const void *buf, size_t sz)
static int
-nni_sub_getopt(void *arg, int opt, void *buf, size_t *szp)
+nni_sub_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_sub_sock *sub = arg;
int rv;
switch (opt) {
case NNG_OPT_RAW:
- nni_mtx_lock(&sub->mx);
rv = nni_getopt_int(&sub->raw, buf, szp);
- nni_mtx_unlock(&sub->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -290,7 +250,7 @@ nni_sub_getopt(void *arg, int opt, void *buf, size_t *szp)
static nni_msg *
-nni_sub_recvfilter(void *arg, nni_msg *msg)
+nni_sub_sock_rfilter(void *arg, nni_msg *msg)
{
nni_sub_sock *sub = arg;
nni_sub_topic *topic;
@@ -298,9 +258,7 @@ nni_sub_recvfilter(void *arg, nni_msg *msg)
size_t len;
int match;
- nni_mtx_lock(&sub->mx);
if (sub->raw) {
- nni_mtx_unlock(&sub->mx);
return (msg);
}
@@ -326,7 +284,6 @@ nni_sub_recvfilter(void *arg, nni_msg *msg)
break;
}
}
- nni_mtx_unlock(&sub->mx);
if (!match) {
nni_msg_free(msg);
return (NULL);
@@ -337,23 +294,31 @@ nni_sub_recvfilter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe nni_sub_proto_pipe = {
+static nni_proto_pipe_ops nni_sub_pipe_ops = {
.pipe_init = nni_sub_pipe_init,
.pipe_fini = nni_sub_pipe_fini,
- .pipe_add = nni_sub_pipe_add,
- .pipe_rem = nni_sub_pipe_rem,
- .pipe_send = nni_sub_pipe_send,
+ .pipe_add = NULL,
+ .pipe_rem = NULL,
+ .pipe_send = NULL,
.pipe_recv = nni_sub_pipe_recv,
};
+static nni_proto_sock_ops nni_sub_sock_ops = {
+ .sock_init = nni_sub_sock_init,
+ .sock_fini = nni_sub_sock_fini,
+ .sock_close = NULL,
+ .sock_setopt = nni_sub_sock_setopt,
+ .sock_getopt = nni_sub_sock_getopt,
+ .sock_rfilter = nni_sub_sock_rfilter,
+ .sock_sfilter = NULL,
+ .sock_send = NULL,
+ .sock_recv = NULL,
+};
+
nni_proto nni_sub_proto = {
- .proto_self = NNG_PROTO_SUB,
- .proto_peer = NNG_PROTO_PUB,
- .proto_name = "sub",
- .proto_pipe = &nni_sub_proto_pipe,
- .proto_init = nni_sub_init,
- .proto_fini = nni_sub_fini,
- .proto_setopt = nni_sub_setopt,
- .proto_getopt = nni_sub_getopt,
- .proto_recv_filter = nni_sub_recvfilter,
+ .proto_self = NNG_PROTO_SUB,
+ .proto_peer = NNG_PROTO_PUB,
+ .proto_name = "sub",
+ .proto_sock_ops = &nni_sub_sock_ops,
+ .proto_pipe_ops = &nni_sub_pipe_ops,
};
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 56ee2367..8de196c4 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -22,7 +22,6 @@ typedef struct nni_rep_sock nni_rep_sock;
// An nni_rep_sock is our per-socket protocol private structure.
struct nni_rep_sock {
nni_sock * sock;
- nni_mtx mx;
nni_msgq * uwq;
nni_msgq * urq;
int raw;
@@ -44,7 +43,7 @@ struct nni_rep_pipe {
static void nni_rep_topsender(void *);
static int
-nni_rep_init(void **repp, nni_sock *sock)
+nni_rep_sock_init(void **repp, nni_sock *sock)
{
nni_rep_sock *rep;
int rv;
@@ -52,17 +51,12 @@ nni_rep_init(void **repp, nni_sock *sock)
if ((rep = NNI_ALLOC_STRUCT(rep)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&rep->mx)) != 0) {
- NNI_FREE_STRUCT(rep);
- return (rv);
- }
rep->ttl = 8; // Per RFC
rep->sock = sock;
rep->raw = 0;
rep->btrace = NULL;
rep->btrace_len = 0;
if ((rv = nni_idhash_create(&rep->pipes)) != 0) {
- nni_mtx_fini(&rep->mx);
NNI_FREE_STRUCT(rep);
return (rv);
}
@@ -70,28 +64,18 @@ nni_rep_init(void **repp, nni_sock *sock)
rep->uwq = nni_sock_sendq(sock);
rep->urq = nni_sock_recvq(sock);
- rv = nni_thr_init(&rep->sender, nni_rep_topsender, rep);
- if (rv != 0) {
- nni_idhash_destroy(rep->pipes);
- nni_mtx_fini(&rep->mx);
- NNI_FREE_STRUCT(rep);
- return (rv);
- }
*repp = rep;
nni_sock_senderr(sock, NNG_ESTATE);
- nni_thr_run(&rep->sender);
return (0);
}
static void
-nni_rep_fini(void *arg)
+nni_rep_sock_fini(void *arg)
{
nni_rep_sock *rep = arg;
- nni_thr_fini(&rep->sender);
nni_idhash_destroy(rep->pipes);
- nni_mtx_fini(&rep->mx);
if (rep->btrace != NULL) {
nni_free(rep->btrace, rep->btrace_len);
}
@@ -135,16 +119,8 @@ nni_rep_pipe_add(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- int rv;
-
- if (nni_pipe_peer(rp->pipe) != NNG_PROTO_REQ) {
- return (NNG_EPROTO);
- }
- nni_mtx_lock(&rep->mx);
- rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp);
- nni_mtx_unlock(&rep->mx);
- return (rv);
+ return (nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp));
}
@@ -154,22 +130,21 @@ nni_rep_pipe_rem(void *arg)
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- nni_mtx_lock(&rep->mx);
nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
- nni_mtx_unlock(&rep->mx);
}
-// nni_rep_topsender watches for messages from the upper write queue,
+// nni_rep_sock_send watches for messages from the upper write queue,
// extracts the destination pipe, and forwards it to the appropriate
// destination pipe via a separate queue. This prevents a single bad
// or slow pipe from gumming up the works for the entire socket.
static void
-nni_rep_topsender(void *arg)
+nni_rep_sock_send(void *arg)
{
nni_rep_sock *rep = arg;
nni_msgq *uwq = rep->uwq;
nni_msgq *urq = rep->urq;
+ nni_mtx *mx = nni_sock_mtx(rep->sock);
nni_msg *msg;
for (;;) {
@@ -190,9 +165,9 @@ nni_rep_topsender(void *arg)
NNI_GET32(header, id);
nni_msg_trim_header(msg, 4);
- nni_mtx_lock(&rep->mx);
+ nni_mtx_lock(mx);
if (nni_idhash_find(rep->pipes, id, (void **) &rp) != 0) {
- nni_mtx_unlock(&rep->mx);
+ nni_mtx_unlock(mx);
nni_msg_free(msg);
continue;
}
@@ -204,7 +179,7 @@ nni_rep_topsender(void *arg)
// circumstances.
nni_msg_free(msg);
}
- nni_mtx_unlock(&rep->mx);
+ nni_mtx_unlock(mx);
}
}
@@ -218,8 +193,6 @@ nni_rep_pipe_send(void *arg)
nni_msgq *wq = rp->sendq;
nni_pipe *pipe = rp->pipe;
nni_msg *msg;
- uint8_t *body;
- size_t size;
int rv;
for (;;) {
@@ -311,21 +284,17 @@ again:
static int
-nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz)
+nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_rep_sock *rep = arg;
int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
- nni_mtx_lock(&rep->mx);
rv = nni_setopt_int(&rep->ttl, buf, sz, 1, 255);
- nni_mtx_unlock(&rep->mx);
break;
case NNG_OPT_RAW:
- nni_mtx_lock(&rep->mx);
rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1);
- nni_mtx_unlock(&rep->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -335,21 +304,17 @@ nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz)
static int
-nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp)
+nni_rep_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_rep_sock *rep = arg;
int rv;
switch (opt) {
case NNG_OPT_MAXTTL:
- nni_mtx_lock(&rep->mx);
rv = nni_getopt_int(&rep->ttl, buf, szp);
- nni_mtx_unlock(&rep->mx);
break;
case NNG_OPT_RAW:
- nni_mtx_lock(&rep->mx);
rv = nni_getopt_int(&rep->raw, buf, szp);
- nni_mtx_unlock(&rep->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -359,14 +324,12 @@ nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp)
static nni_msg *
-nni_rep_sendfilter(void *arg, nni_msg *msg)
+nni_rep_sock_sfilter(void *arg, nni_msg *msg)
{
nni_rep_sock *rep = arg;
size_t len;
- nni_mtx_lock(&rep->mx);
if (rep->raw) {
- nni_mtx_unlock(&rep->mx);
return (msg);
}
@@ -376,7 +339,6 @@ nni_rep_sendfilter(void *arg, nni_msg *msg)
// If we have a stored backtrace, append it to the header...
// if we don't have a backtrace, discard the message.
if (rep->btrace == NULL) {
- nni_mtx_unlock(&rep->mx);
nni_msg_free(msg);
return (NULL);
}
@@ -388,7 +350,6 @@ nni_rep_sendfilter(void *arg, nni_msg *msg)
nni_free(rep->btrace, rep->btrace_len);
rep->btrace = NULL;
rep->btrace_len = 0;
- nni_mtx_unlock(&rep->mx);
nni_msg_free(msg);
return (NULL);
}
@@ -396,21 +357,18 @@ nni_rep_sendfilter(void *arg, nni_msg *msg)
nni_free(rep->btrace, rep->btrace_len);
rep->btrace = NULL;
rep->btrace_len = 0;
- nni_mtx_unlock(&rep->mx);
return (msg);
}
static nni_msg *
-nni_rep_recvfilter(void *arg, nni_msg *msg)
+nni_rep_sock_rfilter(void *arg, nni_msg *msg)
{
nni_rep_sock *rep = arg;
char *header;
size_t len;
- nni_mtx_lock(&rep->mx);
if (rep->raw) {
- nni_mtx_unlock(&rep->mx);
return (msg);
}
@@ -423,21 +381,19 @@ nni_rep_recvfilter(void *arg, nni_msg *msg)
rep->btrace_len = 0;
}
if ((rep->btrace = nni_alloc(len)) == NULL) {
- nni_mtx_unlock(&rep->mx);
nni_msg_free(msg);
return (NULL);
}
rep->btrace_len = len;
memcpy(rep->btrace, header, len);
nni_msg_trunc_header(msg, len);
- nni_mtx_unlock(&rep->mx);
return (msg);
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe nni_rep_proto_pipe = {
+static nni_proto_pipe_ops nni_rep_pipe_ops = {
.pipe_init = nni_rep_pipe_init,
.pipe_fini = nni_rep_pipe_fini,
.pipe_add = nni_rep_pipe_add,
@@ -446,15 +402,22 @@ static nni_proto_pipe nni_rep_proto_pipe = {
.pipe_recv = nni_rep_pipe_recv,
};
+static nni_proto_sock_ops nni_rep_sock_ops = {
+ .sock_init = nni_rep_sock_init,
+ .sock_fini = nni_rep_sock_fini,
+ .sock_close = NULL,
+ .sock_setopt = nni_rep_sock_setopt,
+ .sock_getopt = nni_rep_sock_getopt,
+ .sock_rfilter = nni_rep_sock_rfilter,
+ .sock_sfilter = nni_rep_sock_sfilter,
+ .sock_send = nni_rep_sock_send,
+ .sock_recv = NULL,
+};
+
nni_proto nni_rep_proto = {
- .proto_self = NNG_PROTO_REP,
- .proto_peer = NNG_PROTO_REQ,
- .proto_name = "rep",
- .proto_pipe = &nni_rep_proto_pipe,
- .proto_init = nni_rep_init,
- .proto_fini = nni_rep_fini,
- .proto_setopt = nni_rep_setopt,
- .proto_getopt = nni_rep_getopt,
- .proto_recv_filter = nni_rep_recvfilter,
- .proto_send_filter = nni_rep_sendfilter,
+ .proto_self = NNG_PROTO_REP,
+ .proto_peer = NNG_PROTO_REQ,
+ .proto_name = "rep",
+ .proto_sock_ops = &nni_rep_sock_ops,
+ .proto_pipe_ops = &nni_rep_pipe_ops,
};
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 1c58d7a1..d8104342 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -22,7 +22,6 @@ typedef struct nni_req_sock nni_req_sock;
// An nni_req_sock is our per-socket protocol private structure.
struct nni_req_sock {
nni_sock * sock;
- nni_mtx mx;
nni_cv cv;
nni_msgq * uwq;
nni_msgq * urq;
@@ -48,7 +47,7 @@ struct nni_req_pipe {
static void nni_req_resender(void *);
static int
-nni_req_init(void **reqp, nni_sock *sock)
+nni_req_sock_init(void **reqp, nni_sock *sock)
{
nni_req_sock *req;
int rv;
@@ -56,12 +55,7 @@ nni_req_init(void **reqp, nni_sock *sock)
if ((req = NNI_ALLOC_STRUCT(req)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&req->mx)) != 0) {
- NNI_FREE_STRUCT(req);
- return (rv);
- }
- if ((rv = nni_cv_init(&req->cv, &req->mx)) != 0) {
- nni_mtx_fini(&req->mx);
+ if ((rv = nni_cv_init(&req->cv, nni_sock_mtx(sock))) != 0) {
NNI_FREE_STRUCT(req);
return (rv);
}
@@ -81,7 +75,6 @@ nni_req_init(void **reqp, nni_sock *sock)
rv = nni_thr_init(&req->resender, nni_req_resender, req);
if (rv != 0) {
nni_cv_fini(&req->cv);
- nni_mtx_fini(&req->mx);
NNI_FREE_STRUCT(req);
return (rv);
}
@@ -91,20 +84,24 @@ nni_req_init(void **reqp, nni_sock *sock)
static void
-nni_req_fini(void *arg)
+nni_req_sock_close(void *arg)
{
nni_req_sock *req = arg;
// Shut down the resender. We request it to exit by clearing
// its old value, then kick it.
- nni_mtx_lock(&req->mx);
req->closing = 1;
nni_cv_wake(&req->cv);
- nni_mtx_unlock(&req->mx);
+}
+
+
+static void
+nni_req_sock_fini(void *arg)
+{
+ nni_req_sock *req = arg;
nni_thr_fini(&req->resender);
nni_cv_fini(&req->cv);
- nni_mtx_fini(&req->mx);
if (req->reqmsg != NULL) {
nni_msg_free(req->reqmsg);
}
@@ -168,15 +165,16 @@ nni_req_pipe_send(void *arg)
nni_msgq *uwq = req->uwq;
nni_msgq *urq = req->urq;
nni_pipe *pipe = rp->pipe;
+ nni_mtx *mx = nni_sock_mtx(req->sock);
nni_msg *msg;
int rv;
for (;;) {
- nni_mtx_lock(&req->mx);
+ nni_mtx_lock(mx);
if ((msg = req->retrymsg) != NULL) {
req->retrymsg = NULL;
}
- nni_mtx_unlock(&req->mx);
+ nni_mtx_unlock(mx);
if (msg == NULL) {
rv = nni_msgq_get_sig(uwq, &msg, &rp->sigclose);
if (rv != 0) {
@@ -237,21 +235,17 @@ nni_req_pipe_recv(void *arg)
static int
-nni_req_setopt(void *arg, int opt, const void *buf, size_t sz)
+nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_req_sock *req = arg;
int rv;
switch (opt) {
case NNG_OPT_RESENDTIME:
- nni_mtx_lock(&req->mx);
rv = nni_setopt_duration(&req->retry, buf, sz);
- nni_mtx_unlock(&req->mx);
break;
case NNG_OPT_RAW:
- nni_mtx_lock(&req->mx);
rv = nni_setopt_int(&req->raw, buf, sz, 0, 1);
- nni_mtx_unlock(&req->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -261,21 +255,17 @@ nni_req_setopt(void *arg, int opt, const void *buf, size_t sz)
static int
-nni_req_getopt(void *arg, int opt, void *buf, size_t *szp)
+nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_req_sock *req = arg;
int rv;
switch (opt) {
case NNG_OPT_RESENDTIME:
- nni_mtx_lock(&req->mx);
rv = nni_getopt_duration(&req->retry, buf, szp);
- nni_mtx_unlock(&req->mx);
break;
case NNG_OPT_RAW:
- nni_mtx_lock(&req->mx);
rv = nni_getopt_int(&req->raw, buf, szp);
- nni_mtx_unlock(&req->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -288,17 +278,18 @@ static void
nni_req_resender(void *arg)
{
nni_req_sock *req = arg;
+ nni_mtx *mx = nni_sock_mtx(req->sock);
int rv;
for (;;) {
- nni_mtx_lock(&req->mx);
+ nni_mtx_lock(mx);
if (req->closing) {
- nni_mtx_unlock(&req->mx);
+ nni_mtx_unlock(mx);
return;
}
if (req->reqmsg == NULL) {
nni_cv_wait(&req->cv);
- nni_mtx_unlock(&req->mx);
+ nni_mtx_unlock(mx);
continue;
}
rv = nni_cv_until(&req->cv, req->resend);
@@ -309,22 +300,20 @@ nni_req_resender(void *arg)
}
req->resend = nni_clock() + req->retry;
}
- nni_mtx_unlock(&req->mx);
+ nni_mtx_unlock(mx);
}
}
static nni_msg *
-nni_req_sendfilter(void *arg, nni_msg *msg)
+nni_req_sock_sfilter(void *arg, nni_msg *msg)
{
nni_req_sock *req = arg;
uint32_t id;
- nni_mtx_lock(&req->mx);
if (req->raw) {
// No automatic retry, and the request ID must
// be in the header coming down.
- nni_mtx_unlock(&req->mx);
return (msg);
}
@@ -338,7 +327,6 @@ nni_req_sendfilter(void *arg, nni_msg *msg)
if (nni_msg_append_header(msg, req->reqid, 4) != 0) {
// Should be ENOMEM.
- nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
return (NULL);
}
@@ -351,7 +339,6 @@ nni_req_sendfilter(void *arg, nni_msg *msg)
// Make a duplicate message... for retries.
if (nni_msg_dup(&req->reqmsg, msg) != 0) {
- nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
return (NULL);
}
@@ -362,39 +349,33 @@ nni_req_sendfilter(void *arg, nni_msg *msg)
// Clear the error condition.
nni_sock_recverr(req->sock, 0);
- nni_mtx_unlock(&req->mx);
return (msg);
}
static nni_msg *
-nni_req_recvfilter(void *arg, nni_msg *msg)
+nni_req_sock_rfilter(void *arg, nni_msg *msg)
{
nni_req_sock *req = arg;
- nni_mtx_lock(&req->mx);
if (req->raw) {
// Pass it unmolested
- nni_mtx_unlock(&req->mx);
return (msg);
}
if (nni_msg_header_len(msg) < 4) {
- nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
return (NULL);
}
if (req->reqmsg == NULL) {
// We had no outstanding request.
- nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
return (NULL);
}
if (memcmp(nni_msg_header(msg), req->reqid, 4) != 0) {
// Wrong request id
- nni_mtx_unlock(&req->mx);
nni_msg_free(msg);
return (NULL);
}
@@ -403,14 +384,13 @@ nni_req_recvfilter(void *arg, nni_msg *msg)
nni_msg_free(req->reqmsg);
req->reqmsg = NULL;
nni_cv_wake(&req->cv);
- nni_mtx_unlock(&req->mx);
return (msg);
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe nni_req_proto_pipe = {
+static nni_proto_pipe_ops nni_req_pipe_ops = {
.pipe_init = nni_req_pipe_init,
.pipe_fini = nni_req_pipe_fini,
.pipe_add = nni_req_pipe_add,
@@ -419,15 +399,22 @@ static nni_proto_pipe nni_req_proto_pipe = {
.pipe_recv = nni_req_pipe_recv,
};
+static nni_proto_sock_ops nni_req_sock_ops = {
+ .sock_init = nni_req_sock_init,
+ .sock_fini = nni_req_sock_fini,
+ .sock_close = nni_req_sock_close,
+ .sock_setopt = nni_req_sock_setopt,
+ .sock_getopt = nni_req_sock_getopt,
+ .sock_rfilter = nni_req_sock_rfilter,
+ .sock_sfilter = nni_req_sock_sfilter,
+ .sock_send = NULL,
+ .sock_recv = NULL,
+};
+
nni_proto nni_req_proto = {
- .proto_self = NNG_PROTO_REQ,
- .proto_peer = NNG_PROTO_REP,
- .proto_name = "req",
- .proto_pipe = &nni_req_proto_pipe,
- .proto_init = nni_req_init,
- .proto_fini = nni_req_fini,
- .proto_setopt = nni_req_setopt,
- .proto_getopt = nni_req_getopt,
- .proto_recv_filter = nni_req_recvfilter,
- .proto_send_filter = nni_req_sendfilter,
+ .proto_self = NNG_PROTO_REQ,
+ .proto_peer = NNG_PROTO_REP,
+ .proto_name = "req",
+ .proto_sock_ops = &nni_req_sock_ops,
+ .proto_pipe_ops = &nni_req_pipe_ops,
};