aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-02 14:29:47 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-02 14:29:47 -0800
commitb6374f9d9b07c929522066f27ed9a7a05c6bb23b (patch)
tree9195694f13261ba5cd4d8f2446743f815a06619f
parentf729db021a4fd7c782cc08a07185c955f3567ea2 (diff)
downloadnng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.tar.gz
nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.tar.bz2
nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.zip
Protocol initialization restructuring.
-rw-r--r--src/core/defs.h3
-rw-r--r--src/core/pipe.c37
-rw-r--r--src/core/pipe.h4
-rw-r--r--src/core/protocol.c34
-rw-r--r--src/core/protocol.h77
-rw-r--r--src/core/socket.c95
-rw-r--r--src/core/socket.h2
-rw-r--r--src/protocol/pair/pair.c75
-rw-r--r--src/protocol/reqrep/rep.c82
-rw-r--r--src/protocol/reqrep/req.c75
10 files changed, 303 insertions, 181 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index b10701e0..dd225567 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -28,7 +28,8 @@ typedef struct nni_tran nni_tran;
typedef struct nni_tran_ep nni_tran_ep;
typedef struct nni_tran_pipe nni_tran_pipe;
-typedef struct nni_protocol nni_protocol;
+typedef struct nni_proto_pipe nni_proto_pipe;
+typedef struct nni_proto nni_proto;
typedef int nni_signal; // Turnstile/wakeup channel.
typedef uint64_t nni_time; // Absolute time (usec).
diff --git a/src/core/pipe.c b/src/core/pipe.c
index e8864d58..768b7240 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -76,8 +76,8 @@ nni_pipe_destroy(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.pipe_destroy(p->p_tran_data);
}
- if (p->p_pdata != NULL) {
- nni_free(p->p_pdata, p->p_psize);
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_fini(p->p_proto_data);
}
NNI_FREE_STRUCT(p);
}
@@ -88,7 +88,8 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
{
nni_pipe *p;
nni_sock *sock = ep->ep_sock;
- nni_protocol *proto = &sock->s_ops;
+ nni_proto *proto = &sock->s_proto;
+ const nni_proto_pipe *pops = proto->proto_pipe;
int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
@@ -96,32 +97,34 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
}
p->p_sock = sock;
p->p_tran_data = NULL;
+ p->p_proto_data = NULL;
p->p_active = 0;
- p->p_psize = proto->proto_pipe_size;
NNI_LIST_NODE_INIT(&p->p_node);
// Make a copy of the transport ops. We can override entry points
// and we avoid an extra dereference on hot code paths.
p->p_tran_ops = *ep->ep_tran->tran_pipe;
- if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) {
+ // Make a copy of the protocol ops. Same rationale.
+ p->p_proto_ops = *pops;
+
+ if ((rv = pops->pipe_init(&p->p_proto_data, p, sock->s_data)) != 0) {
NNI_FREE_STRUCT(p);
- return (NNG_ENOMEM);
+ return (rv);
}
- rv = nni_thr_init(&p->p_recv_thr, proto->proto_pipe_recv, p->p_pdata);
+ rv = nni_thr_init(&p->p_recv_thr, pops->pipe_recv, p->p_proto_data);
if (rv != 0) {
- nni_free(p->p_pdata, p->p_psize);
+ pops->pipe_fini(&p->p_proto_data);
NNI_FREE_STRUCT(p);
return (rv);
}
- rv = nni_thr_init(&p->p_send_thr, proto->proto_pipe_send, p->p_pdata);
+ rv = nni_thr_init(&p->p_send_thr, pops->pipe_send, p->p_proto_data);
if (rv != 0) {
nni_thr_fini(&p->p_recv_thr);
- nni_free(p->p_pdata, p->p_psize);
+ pops->pipe_fini(&p->p_proto_data);
NNI_FREE_STRUCT(p);
return (rv);
}
- p->p_psize = sock->s_ops.proto_pipe_size;
nni_mtx_lock(&sock->s_mx);
nni_list_append(&sock->s_pipes, p);
nni_mtx_unlock(&sock->s_mx);
@@ -171,9 +174,11 @@ nni_pipe_start(nni_pipe *pipe)
}
} while (collide);
- rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata);
+ rv = pipe->p_proto_ops.pipe_add(pipe->p_proto_data);
if (rv != 0) {
- goto fail;
+ nni_mtx_unlock(&sock->s_mx);
+ nni_pipe_close(pipe);
+ return (rv);
}
nni_thr_run(&pipe->p_send_thr);
nni_thr_run(&pipe->p_recv_thr);
@@ -183,10 +188,4 @@ nni_pipe_start(nni_pipe *pipe)
nni_mtx_unlock(&sock->s_mx);
return (0);
-
-fail:
- pipe->p_reap = 1;
- nni_mtx_unlock(&sock->s_mx);
- nni_pipe_close(pipe);
- return (rv);
}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index f037bb3b..d2819a31 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -21,8 +21,8 @@ struct nng_pipe {
uint32_t p_id;
nni_tran_pipe p_tran_ops;
void * p_tran_data;
- void * p_pdata; // protocol specific data
- size_t p_psize; // size of protocol data
+ nni_proto_pipe p_proto_ops;
+ void * p_proto_data;
nni_list_node p_node;
nni_sock * p_sock;
nni_ep * p_ep;
diff --git a/src/core/protocol.c b/src/core/protocol.c
index 6399cd41..8669275a 100644
--- a/src/core/protocol.c
+++ b/src/core/protocol.c
@@ -16,18 +16,18 @@
// The list of protocols is hardwired. This is reasonably unlikely to
// change, as adding new protocols is not something intended to be done
// outside of the core.
-extern nni_protocol nni_pair_protocol;
+extern nni_proto nni_pair_proto;
-static nni_protocol *protocols[] = {
- &nni_pair_protocol,
+static nni_proto *protocols[] = {
+ &nni_pair_proto,
NULL
};
-nni_protocol *
-nni_protocol_find(uint16_t num)
+nni_proto *
+nni_proto_find(uint16_t num)
{
int i;
- nni_protocol *p;
+ nni_proto *p;
for (i = 0; (p = protocols[i]) != NULL; i++) {
if (p->proto_self == num) {
@@ -39,11 +39,11 @@ nni_protocol_find(uint16_t num)
const char *
-nni_protocol_name(uint16_t num)
+nni_proto_name(uint16_t num)
{
- nni_protocol *p;
+ nni_proto *p;
- if ((p = nni_protocol_find(num)) == NULL) {
+ if ((p = nni_proto_find(num)) == NULL) {
return (NULL);
}
return (p->proto_name);
@@ -51,9 +51,9 @@ nni_protocol_name(uint16_t num)
uint16_t
-nni_protocol_number(const char *name)
+nni_proto_number(const char *name)
{
- nni_protocol *p;
+ nni_proto *p;
int i;
for (i = 0; (p = protocols[i]) != NULL; i++) {
@@ -63,3 +63,15 @@ nni_protocol_number(const char *name)
}
return (NNG_PROTO_NONE);
}
+
+uint16_t
+nni_proto_peer(uint16_t num)
+{
+ nni_proto *p;
+
+ if ((p = nni_proto_find(num)) == NULL) {
+ return (NNG_PROTO_NONE);
+ }
+ return (p->proto_peer);
+}
+
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 301782b4..c858ad39 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -19,48 +19,75 @@
// work, and the pipe functions generally assume no locking is needed.
// As a consequence, most of the concurrency in nng exists in the protocol
// implementations.
-struct nni_protocol {
- uint16_t proto_self; // our 16-bit protocol ID
- uint16_t proto_peer; // who we peer with (ID)
- const char * proto_name; // string version of our name
- size_t proto_pipe_size; // pipe private data size
- //Create protocol instance, which will be stored on the socket.
- int (*proto_create)(void **, nni_sock *);
+// nni_proto_pipe contains protocol-specific per-pipe operations.
+struct nni_proto_pipe {
+ // pipe_init creates the protocol-specific per pipe data structure.
+ // The last argument is the per-socket protocol private data.
+ int (*pipe_init)(void **, nni_pipe *, void *);
- // Destroy the protocol instance.
- void (*proto_destroy)(void *);
+ // pipe_fini releases any pipe data structures. This is called after
+ // the pipe has been removed from the protocol, and the generic
+ // pipe threads have been stopped.
+ void (*pipe_fini)(void *);
+
+ // pipe_add is called to register a pipe with the protocol. The
+ // protocol can reject this, for example if another pipe is already
+ // active on a 1:1 protocol. The protocol may not block during this,
+ // as the socket lock is held.
+ int (*pipe_add)(void *);
+
+ // pipe_rem is called to unregister a pipe from the protocol.
+ // Threads may still acccess data structures, so the protocol
+ // should not free anything yet. This is called with the socket
+ // lock held, so the protocol may not call back into the socket, and
+ // must not block.
+ void (*pipe_rem)(void *);
+
+ // pipe_send is a function run in a thread per pipe, to process
+ // send activity. This can be NULL.
+ void (*pipe_send)(void *);
- // Add and remove pipes. These are called as connections are
- // created or destroyed.
- int (*proto_add_pipe)(void *, nni_pipe *, void *);
- void (*proto_rem_pipe)(void *, void *);
+ // pipe_recv is a function run in a thread per pipe, to process
+ // receive activity. While this can be NULL, it should NOT be, as
+ // otherwise the protocol may not be able to discover the closure of
+ // the underlying transport (such as a remote disconnect).
+ void (*pipe_recv)(void *);
+};
+
+struct nni_proto {
+ uint16_t proto_self; // our 16-bit protocol ID
+ uint16_t proto_peer; // who we peer with (ID)
+ const char * proto_name; // string version of our name
+ const nni_proto_pipe * proto_pipe; // Per-pipe operations.
- // Thread functions for processing send & receive sides of
- // protocol pipes. Send may be NULL, but recv should should not.
- // (Recv needs to detect a closed pipe, if nothing else.)
- void (*proto_pipe_send)(void *);
- void (*proto_pipe_recv)(void *);
+ // Create protocol instance, which will be stored on the socket.
+ int (*proto_init)(void **, nni_sock *);
+
+ // Destroy the protocol instance.
+ void (*proto_fini)(void *);
// Option manipulation. These may be NULL.
- int (*proto_setopt)(void *, int, const void *, size_t);
- int (*proto_getopt)(void *, int, void *, size_t *);
+ int (*proto_setopt)(void *, int, const void *,
+ size_t);
+ int (*proto_getopt)(void *, int, void *, size_t *);
// Receive filter. This may be NULL, but if it isn't, then
// messages coming into the system are routed here just before being
// delivered to the application. To drop the message, the prtocol
// should return NULL, otherwise the message (possibly modified).
- nni_msg * (*proto_recv_filter)(void *, nni_msg *);
+ nni_msg * (*proto_recv_filter)(void *, nni_msg *);
// Send filter. This may be NULL, but if it isn't, then messages
// here are filtered just after they come from the application.
- nni_msg * (*proto_send_filter)(void *, nni_msg *);
+ nni_msg * (*proto_send_filter)(void *, nni_msg *);
};
// These functions are not used by protocols, but rather by the socket
// core implementation. The lookups can be used by transports as well.
-extern nni_protocol *nni_protocol_find(uint16_t);
-extern const char *nni_protocol_name(uint16_t);
-extern uint16_t nni_protocol_number(const char *);
+extern nni_proto *nni_proto_find(uint16_t);
+extern const char *nni_proto_name(uint16_t);
+extern uint16_t nni_proto_number(const char *);
+extern uint16_t nni_proto_peer(uint16_t);
#endif // CORE_PROTOCOL_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 6aa40bfc..cad4b10a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -50,16 +50,15 @@ nni_reaper(void *arg)
ep->ep_pipe = NULL;
nni_cv_wake(&ep->ep_cv);
}
- nni_mtx_unlock(&sock->s_mx);
// Remove the pipe from the protocol. Protocols may
// keep lists of pipes for managing their topologies.
// Note that if a protocol has rejected the pipe, it
// won't have any data.
if (pipe->p_active) {
- sock->s_ops.proto_rem_pipe(sock->s_data,
- pipe->p_pdata);
+ pipe->p_proto_ops.pipe_rem(pipe->p_proto_data);
}
+ nni_mtx_unlock(&sock->s_mx);
// XXX: also publish event...
nni_pipe_destroy(pipe);
@@ -79,21 +78,53 @@ nni_reaper(void *arg)
}
+static nni_msg *
+nni_sock_nullfilter(void *arg, nni_msg *mp)
+{
+ NNI_ARG_UNUSED(arg);
+ return (mp);
+}
+
+
+static int
+nni_sock_nullgetopt(void *arg, int num, void *data, size_t *szp)
+{
+ NNI_ARG_UNUSED(arg);
+ NNI_ARG_UNUSED(num);
+ NNI_ARG_UNUSED(data);
+ NNI_ARG_UNUSED(szp);
+ return (NNG_ENOTSUP);
+}
+
+
+static int
+nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz)
+{
+ NNI_ARG_UNUSED(arg);
+ NNI_ARG_UNUSED(num);
+ NNI_ARG_UNUSED(data);
+ NNI_ARG_UNUSED(sz);
+ return (NNG_ENOTSUP);
+}
+
+
// nn_sock_open creates the underlying socket.
int
-nni_sock_open(nni_sock **sockp, uint16_t proto)
+nni_sock_open(nni_sock **sockp, uint16_t pnum)
{
nni_sock *sock;
- nni_protocol *ops;
+ nni_proto *proto;
int rv;
- if ((ops = nni_protocol_find(proto)) == NULL) {
+ if ((proto = nni_proto_find(pnum)) == NULL) {
return (NNG_ENOTSUP);
}
if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
return (NNG_ENOMEM);
}
- sock->s_ops = *ops;
+
+ // We make a copy of the protocol operations.
+ sock->s_proto = *proto;
sock->s_linger = 0;
sock->s_sndtimeo = -1;
sock->s_rcvtimeo = -1;
@@ -137,7 +168,7 @@ nni_sock_open(nni_sock **sockp, uint16_t proto)
return (rv);
}
- if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) {
+ if ((rv = sock->s_proto.proto_init(&sock->s_data, sock)) != 0) {
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_thr_fini(&sock->s_reaper);
@@ -146,6 +177,18 @@ nni_sock_open(nni_sock **sockp, uint16_t proto)
NNI_FREE_STRUCT(sock);
return (rv);
}
+ if (sock->s_proto.proto_send_filter == NULL) {
+ sock->s_proto.proto_send_filter = nni_sock_nullfilter;
+ }
+ if (sock->s_proto.proto_recv_filter == NULL) {
+ sock->s_proto.proto_recv_filter = nni_sock_nullfilter;
+ }
+ if (sock->s_proto.proto_getopt == NULL) {
+ sock->s_proto.proto_getopt = nni_sock_nullgetopt;
+ }
+ if (sock->s_proto.proto_setopt == NULL) {
+ sock->s_proto.proto_setopt = nni_sock_nullsetopt;
+ }
nni_thr_run(&sock->s_reaper);
*sockp = sock;
return (0);
@@ -223,7 +266,7 @@ nni_sock_close(nni_sock *sock)
// At this point nothing else should be referencing us.
// The protocol needs to clean up its state.
- sock->s_ops.proto_destroy(sock->s_data);
+ sock->s_proto.proto_fini(sock->s_data);
// And we need to clean up *our* state.
nni_msgq_fini(sock->s_urq);
@@ -256,11 +299,9 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire)
besteffort = sock->s_besteffort;
nni_mtx_unlock(&sock->s_mx);
- if (sock->s_ops.proto_send_filter != NULL) {
- msg = sock->s_ops.proto_send_filter(sock->s_data, msg);
- if (msg == NULL) {
- return (0);
- }
+ msg = sock->s_proto.proto_send_filter(sock->s_data, msg);
+ if (msg == NULL) {
+ return (0);
}
if (besteffort) {
@@ -300,9 +341,7 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
if (rv != 0) {
return (rv);
}
- if (sock->s_ops.proto_recv_filter != NULL) {
- msg = sock->s_ops.proto_recv_filter(sock->s_data, msg);
- }
+ msg = sock->s_proto.proto_recv_filter(sock->s_data, msg);
if (msg != NULL) {
break;
}
@@ -318,7 +357,7 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
uint16_t
nni_sock_proto(nni_sock *sock)
{
- return (sock->s_ops.proto_self);
+ return (sock->s_proto.proto_self);
}
@@ -386,12 +425,10 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size)
int rv = ENOTSUP;
nni_mtx_lock(&sock->s_mx);
- if (sock->s_ops.proto_setopt != NULL) {
- rv = sock->s_ops.proto_setopt(sock->s_data, opt, val, size);
- if (rv != NNG_ENOTSUP) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
- }
+ rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size);
+ if (rv != NNG_ENOTSUP) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
}
switch (opt) {
case NNG_OPT_LINGER:
@@ -429,12 +466,10 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
int rv = ENOTSUP;
nni_mtx_lock(&sock->s_mx);
- if (sock->s_ops.proto_getopt != NULL) {
- rv = sock->s_ops.proto_getopt(sock->s_data, opt, val, sizep);
- if (rv != NNG_ENOTSUP) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
- }
+ rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep);
+ if (rv != NNG_ENOTSUP) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
}
switch (opt) {
case NNG_OPT_LINGER:
diff --git a/src/core/socket.h b/src/core/socket.h
index 548b4697..8b3c3ff9 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -20,7 +20,7 @@ struct nng_socket {
nni_msgq * s_uwq; // Upper write queue
nni_msgq * s_urq; // Upper read queue
- nni_protocol s_ops;
+ nni_proto s_proto;
void * s_data; // Protocol private
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 037d5f1e..1d4e58d0 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -23,7 +23,6 @@ typedef struct nni_pair_sock nni_pair_sock;
struct nni_pair_sock {
nni_sock * sock;
nni_pair_pipe * pipe;
- nni_mtx mx;
nni_msgq * uwq;
nni_msgq * urq;
};
@@ -42,7 +41,7 @@ static void nni_pair_receiver(void *);
static void nni_pair_sender(void *);
static int
-nni_pair_create(void **pairp, nni_sock *sock)
+nni_pair_init(void **pairp, nni_sock *sock)
{
nni_pair_sock *pair;
int rv;
@@ -50,10 +49,6 @@ nni_pair_create(void **pairp, nni_sock *sock)
if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&pair->mx)) != 0) {
- NNI_FREE_STRUCT(pair);
- return (rv);
- }
pair->sock = sock;
pair->pipe = NULL;
pair->uwq = nni_sock_sendq(sock);
@@ -64,7 +59,7 @@ nni_pair_create(void **pairp, nni_sock *sock)
static void
-nni_pair_destroy(void *arg)
+nni_pair_fini(void *arg)
{
nni_pair_sock *pair = arg;
@@ -72,49 +67,61 @@ nni_pair_destroy(void *arg)
// this wold be the time to shut them all down. We don't, because
// the socket already shut us down, and we don't have any other
// threads that run.
- nni_mtx_fini(&pair->mx);
NNI_FREE_STRUCT(pair);
}
static int
-nni_pair_add_pipe(void *arg, nni_pipe *pipe, void *data)
+nni_pair_pipe_init(void **ppp, nni_pipe *pipe, void *psock)
{
- nni_pair_sock *pair = arg;
- nni_pair_pipe *pp = data;
- int rv;
+ nni_pair_pipe *pp;
+ if ((pp = NNI_ALLOC_STRUCT(pp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
pp->pipe = pipe;
pp->sigclose = 0;
- pp->pair = pair;
+ pp->pair = psock;
+ *ppp = pp;
+ return (0);
+}
+
+
+static void
+nni_pair_pipe_fini(void *arg)
+{
+ nni_pair_pipe *pp = arg;
+ NNI_FREE_STRUCT(pp);
+}
+
+static int
+nni_pair_pipe_add(void *arg)
+{
+ nni_pair_pipe *pp = arg;
+ nni_pair_sock *pair = pp->pair;
- nni_mtx_lock(&pair->mx);
if (pair->pipe != NULL) {
- nni_mtx_unlock(&pair->mx);
return (NNG_EBUSY); // Already have a peer, denied.
}
pair->pipe = pp;
- nni_mtx_unlock(&pair->mx);
return (0);
}
static void
-nni_pair_rem_pipe(void *arg, void *data)
+nni_pair_pipe_rem(void *arg)
{
- nni_pair_sock *pair = arg;
- nni_pair_pipe *pp = data;
+ nni_pair_pipe *pp = arg;
+ nni_pair_sock *pair = pp->pair;
- nni_mtx_lock(&pair->mx);
if (pair->pipe == pp) {
pair->pipe = NULL;
}
- nni_mtx_unlock(&pair->mx);
}
static void
-nni_pair_sender(void *arg)
+nni_pair_pipe_send(void *arg)
{
nni_pair_pipe *pp = arg;
nni_pair_sock *pair = pp->pair;
@@ -141,7 +148,7 @@ nni_pair_sender(void *arg)
static void
-nni_pair_receiver(void *arg)
+nni_pair_pipe_recv(void *arg)
{
nni_pair_pipe *pp = arg;
nni_pair_sock *pair = pp->pair;
@@ -185,17 +192,23 @@ nni_pair_getopt(void *arg, int opt, void *buf, size_t *szp)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-struct nni_protocol nni_pair_protocol = {
+
+static nni_proto_pipe nni_pair_proto_pipe = {
+ .pipe_init = nni_pair_pipe_init,
+ .pipe_fini = nni_pair_pipe_fini,
+ .pipe_add = nni_pair_pipe_add,
+ .pipe_rem = nni_pair_pipe_rem,
+ .pipe_send = nni_pair_pipe_send,
+ .pipe_recv = nni_pair_pipe_recv,
+};
+
+nni_proto nni_pair_proto = {
.proto_self = NNG_PROTO_PAIR,
.proto_peer = NNG_PROTO_PAIR,
.proto_name = "pair",
- .proto_create = nni_pair_create,
- .proto_destroy = nni_pair_destroy,
- .proto_add_pipe = nni_pair_add_pipe,
- .proto_rem_pipe = nni_pair_rem_pipe,
- .proto_pipe_size = sizeof (nni_pair_pipe),
- .proto_pipe_send = nni_pair_sender,
- .proto_pipe_recv = nni_pair_receiver,
+ .proto_pipe = &nni_pair_proto_pipe,
+ .proto_init = nni_pair_init,
+ .proto_fini = nni_pair_fini,
.proto_setopt = nni_pair_setopt,
.proto_getopt = nni_pair_getopt,
.proto_recv_filter = NULL,
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 75f2af71..8d51ee0a 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -41,12 +41,10 @@ struct nni_rep_pipe {
int sigclose;
};
-static void nni_rep_receiver(void *);
-static void nni_rep_sender(void *);
static void nni_rep_topsender(void *);
static int
-nni_rep_create(void **repp, nni_sock *sock)
+nni_rep_init(void **repp, nni_sock *sock)
{
nni_rep_sock *rep;
int rv;
@@ -87,7 +85,7 @@ nni_rep_create(void **repp, nni_sock *sock)
static void
-nni_rep_destroy(void *arg)
+nni_rep_fini(void *arg)
{
nni_rep_sock *rep = arg;
@@ -102,41 +100,60 @@ nni_rep_destroy(void *arg)
static int
-nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap)
+nni_rep_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
- nni_rep_sock *rep = arg;
nni_rep_pipe *rp;
int rv;
+ if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((rv = nni_msgq_init(&rp->sendq, 2)) != 0) {
+ NNI_FREE_STRUCT(rp);
+ return (rv);
+ }
rp->pipe = pipe;
+ rp->rep = rsock;
rp->sigclose = 0;
+ *rpp = rp;
+ return (0);
+}
+
+
+static void
+nni_rep_pipe_fini(void *arg)
+{
+ nni_rep_pipe *rp = arg;
+
+ nni_msgq_fini(rp->sendq);
+ NNI_FREE_STRUCT(rp);
+}
- rv = nni_msgq_init(&rp->sendq, 2);
- if (rv != 0) {
- return (rv);
- }
+
+static int
+nni_rep_pipe_add(void *arg)
+{
+ nni_rep_pipe *rp = arg;
+ nni_rep_sock *rep = rp->rep;
+ int rv;
nni_mtx_lock(&rep->mx);
- if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) {
- nni_msgq_fini(rp->sendq);
- nni_mtx_unlock(&rep->mx);
- return (rv);
- }
+ rv = nni_idhash_insert(rep->pipes, nni_pipe_id(rp->pipe), rp);
nni_mtx_unlock(&rep->mx);
- return (0);
+
+ return (rv);
}
static void
-nni_rep_rem_pipe(void *arg, void *data)
+nni_rep_pipe_rem(void *arg)
{
- nni_rep_sock *rep = arg;
- nni_rep_pipe *rp = data;
+ nni_rep_pipe *rp = arg;
+ nni_rep_sock *rep = rp->rep;
nni_mtx_lock(&rep->mx);
nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe));
nni_mtx_unlock(&rep->mx);
- nni_msgq_fini(rp->sendq);
}
@@ -197,7 +214,7 @@ nni_rep_topsender(void *arg)
static void
-nni_rep_sender(void *arg)
+nni_rep_pipe_send(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
@@ -227,7 +244,7 @@ nni_rep_sender(void *arg)
static void
-nni_rep_receiver(void *arg)
+nni_rep_pipe_recv(void *arg)
{
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
@@ -426,17 +443,22 @@ nni_rep_recvfilter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-struct nni_protocol nni_rep_protocol = {
+static nni_proto_pipe nni_rep_proto_pipe = {
+ .pipe_init = nni_rep_pipe_init,
+ .pipe_fini = nni_rep_pipe_fini,
+ .pipe_add = nni_rep_pipe_add,
+ .pipe_rem = nni_rep_pipe_rem,
+ .pipe_send = nni_rep_pipe_send,
+ .pipe_recv = nni_rep_pipe_recv,
+};
+
+nni_proto nni_rep_protocol = {
.proto_self = NNG_PROTO_REP,
.proto_peer = NNG_PROTO_REQ,
.proto_name = "rep",
- .proto_create = nni_rep_create,
- .proto_destroy = nni_rep_destroy,
- .proto_add_pipe = nni_rep_add_pipe,
- .proto_rem_pipe = nni_rep_rem_pipe,
- .proto_pipe_size = sizeof (nni_rep_pipe),
- .proto_pipe_send = nni_rep_sender,
- .proto_pipe_recv = nni_rep_receiver,
+ .proto_pipe = &nni_rep_proto_pipe,
+ .proto_init = nni_rep_init,
+ .proto_fini = nni_rep_fini,
.proto_setopt = nni_rep_setopt,
.proto_getopt = nni_rep_getopt,
.proto_recv_filter = nni_rep_recvfilter,
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index ba7d4f6b..c26d66e8 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -31,7 +31,6 @@ struct nni_req_sock {
nni_thr resender;
int raw;
int closing;
- nni_list pipes;
nni_msg * reqmsg;
uint32_t nextid; // next id
uint8_t reqid[4]; // outstanding request ID (big endian)
@@ -45,12 +44,10 @@ struct nni_req_pipe {
nni_list_node node;
};
-static void nni_req_receiver(void *);
-static void nni_req_sender(void *);
static void nni_req_resender(void *);
static int
-nni_req_create(void **reqp, nni_sock *sock)
+nni_req_init(void **reqp, nni_sock *sock)
{
nni_req_sock *req;
int rv;
@@ -74,7 +71,6 @@ nni_req_create(void **reqp, nni_sock *sock)
req->reqmsg = NULL;
req->raw = 0;
req->resend = NNI_TIME_ZERO;
- NNI_LIST_INIT(&req->pipes, nni_req_pipe, node);
req->uwq = nni_sock_sendq(sock);
req->urq = nni_sock_recvq(sock);
@@ -93,7 +89,7 @@ nni_req_create(void **reqp, nni_sock *sock)
static void
-nni_req_destroy(void *arg)
+nni_req_fini(void *arg)
{
nni_req_sock *req = arg;
@@ -112,37 +108,49 @@ nni_req_destroy(void *arg)
static int
-nni_req_add_pipe(void *arg, nni_pipe *pipe, void *data)
+nni_req_pipe_init(void **rpp, nni_pipe *pipe, void *rsock)
{
- nni_req_sock *req = arg;
- nni_req_pipe *rp = data;
- int rv;
+ nni_req_pipe *rp;
+ if ((rp = NNI_ALLOC_STRUCT(rp)) == NULL) {
+ return (NNG_ENOMEM);
+ }
rp->pipe = pipe;
rp->sigclose = 0;
- rp->req = req;
-
- nni_mtx_lock(&req->mx);
- nni_list_append(&req->pipes, rp);
- nni_mtx_unlock(&req->mx);
+ rp->req = rsock;
return (0);
}
static void
-nni_req_rem_pipe(void *arg, void *data)
+nni_req_pipe_fini(void *arg)
{
- nni_req_sock *req = arg;
- nni_req_pipe *rp = data;
+ nni_req_pipe *rp = arg;
- nni_mtx_lock(&req->mx);
- nni_list_remove(&req->pipes, rp);
- nni_mtx_unlock(&req->mx);
+ NNI_FREE_STRUCT(rp);
+}
+
+
+static int
+nni_req_pipe_add(void *arg)
+{
+ // We have nothing to do, since we don't need to maintain a global
+ // list of related pipes.
+ NNI_ARG_UNUSED(arg);
+ return (0);
+}
+
+
+static void
+nni_req_pipe_rem(void *arg)
+{
+ // As with add, nothing to do here.
+ NNI_ARG_UNUSED(arg);
}
static void
-nni_req_sender(void *arg)
+nni_req_pipe_send(void *arg)
{
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
@@ -169,7 +177,7 @@ nni_req_sender(void *arg)
static void
-nni_req_receiver(void *arg)
+nni_req_pipe_recv(void *arg)
{
nni_req_pipe *rp = arg;
nni_req_sock *req = rp->req;
@@ -397,19 +405,24 @@ nni_req_recvfilter(void *arg, nni_msg *msg)
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-struct nni_protocol nni_req_protocol = {
+static nni_proto_pipe nni_req_proto_pipe = {
+ .pipe_init = nni_req_pipe_init,
+ .pipe_fini = nni_req_pipe_fini,
+ .pipe_add = nni_req_pipe_add,
+ .pipe_rem = nni_req_pipe_rem,
+ .pipe_send = nni_req_pipe_send,
+ .pipe_recv = nni_req_pipe_recv,
+};
+
+nni_proto nni_req_protocol = {
.proto_self = NNG_PROTO_REQ,
.proto_peer = NNG_PROTO_REP,
.proto_name = "req",
- .proto_create = nni_req_create,
- .proto_destroy = nni_req_destroy,
- .proto_add_pipe = nni_req_add_pipe,
- .proto_rem_pipe = nni_req_rem_pipe,
+ .proto_pipe = &nni_req_proto_pipe,
+ .proto_init = nni_req_init,
+ .proto_fini = nni_req_fini,
.proto_setopt = nni_req_setopt,
.proto_getopt = nni_req_getopt,
- .proto_pipe_size = sizeof (nni_req_pipe),
- .proto_pipe_send = nni_req_sender,
- .proto_pipe_recv = nni_req_receiver,
.proto_recv_filter = nni_req_recvfilter,
.proto_send_filter = nni_req_sendfilter,
};