aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-29 23:49:05 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-29 23:49:05 -0800
commit0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6 (patch)
treeb2ea7f56ae0dc6d32219e695725f55d78b9e0b34 /src
parent5d90b485fdb39cac7d1aac2ab8958ecd585ac69b (diff)
downloadnng-0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6.tar.gz
nng-0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6.tar.bz2
nng-0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6.zip
Factor out repeated protocol code into common.
Diffstat (limited to 'src')
-rw-r--r--src/core/endpt.c6
-rw-r--r--src/core/pipe.c129
-rw-r--r--src/core/pipe.h11
-rw-r--r--src/core/protocol.h17
-rw-r--r--src/core/socket.c51
-rw-r--r--src/core/socket.h1
-rw-r--r--src/protocol/pair/pair.c53
-rw-r--r--src/protocol/reqrep/req.c54
8 files changed, 163 insertions, 159 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index bb3ba268..901831ef 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -130,12 +130,11 @@ nni_endpt_connect(nni_endpt *ep, nni_pipe **pp)
static int
nni_dial_once(nni_endpt *ep)
{
- nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
if (((rv = nni_endpt_connect(ep, &pipe)) == 0) &&
- ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ ((rv = nni_pipe_start(pipe)) == 0)) {
return (0);
}
@@ -281,7 +280,6 @@ static void
nni_listener(void *arg)
{
nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
@@ -332,7 +330,7 @@ nni_listener(void *arg)
pipe = NULL;
if (((rv = nni_endpt_accept(ep, &pipe)) == 0) &&
- ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ ((rv = nni_pipe_start(pipe)) == 0)) {
continue;
}
if (rv == NNG_ECLOSED) {
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 15ae393b..0a3f1d33 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -71,9 +71,19 @@ nni_pipe_peer(nni_pipe *p)
void
nni_pipe_destroy(nni_pipe *p)
{
+ if (p->p_send_thr != NULL) {
+ nni_thread_reap(p->p_send_thr);
+ }
+ if (p->p_recv_thr != NULL) {
+ nni_thread_reap(p->p_recv_thr);
+ }
if (p->p_trandata != NULL) {
p->p_ops.p_destroy(p->p_trandata);
}
+ nni_cond_fini(&p->p_cv);
+ if (p->p_pdata != NULL) {
+ nni_free(p->p_pdata, p->p_psize);
+ }
nni_free(p, sizeof (*p));
}
@@ -82,15 +92,28 @@ int
nni_pipe_create(nni_pipe **pp, nni_endpt *ep)
{
nni_pipe *p;
+ nni_socket *sock = ep->ep_sock;
+ int rv;
if ((p = nni_alloc(sizeof (*p))) == NULL) {
return (NNG_ENOMEM);
}
p->p_trandata = NULL;
- p->p_protdata = NULL;
+ if ((rv = nni_cond_init(&p->p_cv, &sock->s_mx)) != 0) {
+ nni_free(p, sizeof (*p));
+ return (rv);
+ }
+ p->p_psize = sock->s_ops.proto_pipe_size;
+ if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) {
+ nni_cond_fini(&p->p_cv);
+ nni_free(p, sizeof (*p));
+ return (NNG_ENOMEM);
+ }
+ p->p_sock = sock;
p->p_ops = *ep->ep_ops.ep_pipe_ops;
p->p_ep = ep;
- p->p_sock = ep->ep_sock;
+ p->p_active = 0;
+ p->p_abort = 0;
if (ep->ep_dialer != NULL) {
ep->ep_pipe = p;
}
@@ -109,3 +132,105 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
}
return (p->p_ops.p_getopt(p->p_trandata, opt, val, szp));
}
+
+
+static void
+nni_pipe_sender(void *arg)
+{
+ nni_pipe *p = arg;
+
+ nni_mutex_enter(&p->p_sock->s_mx);
+ while ((!p->p_active) && (!p->p_abort)) {
+ nni_cond_wait(&p->p_cv);
+ }
+ if (p->p_abort) {
+ nni_mutex_exit(&p->p_sock->s_mx);
+ return;
+ }
+ nni_mutex_exit(&p->p_sock->s_mx);
+ if (p->p_sock->s_ops.proto_pipe_send != NULL) {
+ p->p_sock->s_ops.proto_pipe_send(p->p_pdata);
+ }
+}
+
+
+static void
+nni_pipe_receiver(void *arg)
+{
+ nni_pipe *p = arg;
+
+ nni_mutex_enter(&p->p_sock->s_mx);
+ while ((!p->p_active) && (!p->p_abort)) {
+ nni_cond_wait(&p->p_cv);
+ }
+ if (p->p_abort) {
+ nni_mutex_exit(&p->p_sock->s_mx);
+ return;
+ }
+ nni_mutex_exit(&p->p_sock->s_mx);
+ if (p->p_sock->s_ops.proto_pipe_recv != NULL) {
+ p->p_sock->s_ops.proto_pipe_recv(p->p_pdata);
+ }
+}
+
+
+int
+nni_pipe_start(nni_pipe *pipe)
+{
+ int rv;
+ int collide;
+ nni_socket *sock = pipe->p_sock;
+
+ nni_mutex_enter(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mutex_exit(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
+
+ do {
+ // We generate a new pipe ID, but we make sure it does not
+ // collide with any we already have. This can only normally
+ // happen if we wrap -- i.e. we've had 4 billion or so pipes.
+ // XXX: consider making this a hash table!!
+ nni_pipe *check;
+ pipe->p_id = nni_plat_nextid() & 0x7FFFFFFF;
+ collide = 0;
+ NNI_LIST_FOREACH (&sock->s_pipes, check) {
+ if (check->p_id == pipe->p_id) {
+ collide = 1;
+ break;
+ }
+ }
+ } while (collide);
+
+ rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_sender, pipe);
+ if (rv != 0) {
+ goto fail;
+ }
+ rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_receiver, pipe);
+ if (rv != 0) {
+ goto fail;
+ }
+
+ rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata);
+ if (rv != 0) {
+ goto fail;
+ }
+ nni_list_append(&sock->s_pipes, pipe);
+ pipe->p_active = 1;
+
+ // XXX: Publish event
+ nni_cond_broadcast(&pipe->p_cv);
+
+ nni_mutex_exit(&sock->s_mx);
+ return (0);
+
+fail:
+ pipe->p_abort = 1;
+ pipe->p_reap = 1;
+ nni_list_append(&sock->s_reaps, pipe);
+ nni_cond_broadcast(&sock->s_cv);
+ nni_cond_broadcast(&pipe->p_cv);
+ nni_mutex_exit(&sock->s_mx);
+ return (rv);
+}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 4594f65b..2b4feb7d 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -20,12 +20,20 @@ struct nng_pipe {
uint32_t p_id;
struct nni_pipe_ops p_ops;
void * p_trandata;
- void * p_protdata;
+ void * p_pdata; // protocol specific data
+ size_t p_psize; // size of protocol data
nni_list_node p_node;
nni_socket * p_sock;
nni_endpt * p_ep;
int p_reap;
int p_active;
+ int p_abort;
+ nni_mutex p_mx;
+ nni_cond p_cv;
+ void (*p_send)(void *);
+ void (*p_recv)(void *);
+ nni_thread * p_send_thr;
+ nni_thread * p_recv_thr;
};
// Pipe operations that protocols use.
@@ -40,6 +48,7 @@ extern int nni_pipe_create(nni_pipe **, nni_endpt *);
extern void nni_pipe_destroy(nni_pipe *);
+extern int nni_pipe_start(nni_pipe *);
extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
#endif // CORE_PIPE_H
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 63683a6b..a8e93ea7 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -20,9 +20,10 @@
// As a consequence, most of the concurrency in nng exists in the protocol
// implementations.
struct nni_protocol {
- uint16_t proto_self; // our 16-bit protocol ID
- uint16_t proto_peer; // who we peer with (protocol ID)
- const char * proto_name; // string version of our name
+ uint16_t proto_self; // our 16-bit protocol ID
+ uint16_t proto_peer; // who we peer with (ID)
+ const char * proto_name; // string version of our name
+ size_t proto_pipe_size; // pipe private data size
//Create protocol instance, which will be stored on the socket.
int (*proto_create)(void **, nni_socket *);
@@ -32,11 +33,17 @@ struct nni_protocol {
// Add and remove pipes. These are called as connections are
// created or destroyed.
- int (*proto_add_pipe)(void *, nni_pipe *, void **);
+ int (*proto_add_pipe)(void *, nni_pipe *, void *);
void (*proto_rem_pipe)(void *, void *);
+ // Thread functions for processing send & receive sides of
+ // protocol pipes. Send may be NULL, but recv should should not.
+ // (Recv needs to detect a closed pipe, if nothing else.)
+ void (*proto_pipe_send)(void *);
+ void (*proto_pipe_recv)(void *);
+
// Option manipulation. These may be NULL.
- int (*proto_setopt)(void *, int, const void *, size_t);
+ int (*proto_setopt)(void *, int, const void *, size_t);
int (*proto_getopt)(void *, int, void *, size_t *);
// Receive filter. This may be NULL, but if it isn't, then
diff --git a/src/core/socket.c b/src/core/socket.c
index 8c4cd339..6fa6712d 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -53,9 +53,9 @@ nni_reaper(void *arg)
// keep lists of pipes for managing their topologies.
// Note that if a protocol has rejected the pipe, it
// won't have any data.
- if (pipe->p_protdata != NULL) {
+ if (pipe->p_active) {
sock->s_ops.proto_rem_pipe(sock->s_data,
- pipe->p_protdata);
+ pipe->p_pdata);
}
// If pipe was a connected (dialer) pipe,
@@ -217,7 +217,6 @@ nni_socket_close(nni_socket *sock)
// Go through and schedule close on all pipes.
while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
nni_list_remove(&sock->s_pipes, pipe);
- pipe->p_active = 0;
pipe->p_reap = 1;
nni_list_append(&sock->s_reaps, pipe);
}
@@ -341,52 +340,6 @@ nni_socket_proto(nni_socket *sock)
int
-nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
-{
- int rv;
- int collide;
-
- nni_mutex_enter(&sock->s_mx);
- if (sock->s_closing) {
- nni_mutex_exit(&sock->s_mx);
- return (NNG_ECLOSED);
- }
-
- do {
- // We generate a new pipe ID, but we make sure it does not
- // collide with any we already have. This can only normally
- // happen if we wrap -- i.e. we've had 4 billion or so pipes.
- // XXX: consider making this a hash table!!
- nni_pipe *check;
- pipe->p_id = nni_plat_nextid() & 0x7FFFFFFF;
- collide = 0;
- NNI_LIST_FOREACH (&sock->s_pipes, check) {
- if (check->p_id == pipe->p_id) {
- collide = 1;
- break;
- }
- }
- } while (collide);
-
- pipe->p_id = nni_plat_nextid();
- rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, &pipe->p_protdata);
- if (rv != 0) {
- pipe->p_reap = 1;
- nni_list_append(&sock->s_reaps, pipe);
- nni_cond_broadcast(&sock->s_cv);
- nni_mutex_exit(&sock->s_mx);
- return (rv);
- }
- nni_list_append(&sock->s_pipes, pipe);
- pipe->p_active = 1;
-
- // XXX: Publish event
- nni_mutex_exit(&sock->s_mx);
- return (0);
-}
-
-
-int
nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags)
{
nni_endpt *ep;
diff --git a/src/core/socket.h b/src/core/socket.h
index 7b49e85f..ff3ad865 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -47,7 +47,6 @@ struct nng_socket {
extern int nni_socket_create(nni_socket **, uint16_t);
extern int nni_socket_close(nni_socket *);
-extern int nni_socket_add_pipe(nni_socket *, nni_pipe *);
extern uint16_t nni_socket_proto(nni_socket *);
extern int nni_socket_setopt(nni_socket *, int, const void *, size_t);
extern int nni_socket_getopt(nni_socket *, int, void *, size_t *);
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 306615b7..3f57c12b 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -35,9 +35,6 @@ struct nni_pair_sock {
struct nni_pair_pipe {
nni_pipe * pipe;
nni_pair_sock * pair;
- int good;
- nni_thread * sthr;
- nni_thread * rthr;
int sigclose;
};
@@ -81,21 +78,14 @@ nni_pair_destroy(void *arg)
static int
-nni_pair_add_pipe(void *arg, nni_pipe *pipe, void **datap)
+nni_pair_add_pipe(void *arg, nni_pipe *pipe, void *data)
{
nni_pair_sock *pair = arg;
- nni_pair_pipe *pp;
+ nni_pair_pipe *pp = data;
int rv;
- if ((pp = nni_alloc(sizeof (*pp))) == NULL) {
- return (NNG_ENOMEM);
- }
-
pp->pipe = pipe;
- pp->good = 0;
pp->sigclose = 0;
- pp->sthr = NULL;
- pp->rthr = NULL;
pp->pair = pair;
nni_mutex_enter(&pair->mx);
@@ -103,27 +93,12 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe, void **datap)
rv = NNG_EBUSY; // Already have a peer, denied.
goto fail;
}
- if ((rv = nni_thread_create(&pp->rthr, nni_pair_receiver, pp)) != 0) {
- goto fail;
- }
- if ((rv = nni_thread_create(&pp->sthr, nni_pair_sender, pp)) != 0) {
- goto fail;
- }
- pp->good = 1;
pair->pipe = pp;
- *datap = pp;
nni_mutex_exit(&pair->mx);
return (0);
fail:
nni_mutex_exit(&pair->mx);
- if (pp->rthr != NULL) {
- nni_thread_reap(pp->rthr);
- }
- if (pp->sthr != NULL) {
- nni_thread_reap(pp->sthr);
- }
- nni_free(pp, sizeof (*pp));
return (rv);
}
@@ -141,13 +116,6 @@ nni_pair_rem_pipe(void *arg, void *data)
}
pair->pipe = NULL;
nni_mutex_exit(&pair->mx);
-
- if (pp->sthr != NULL) {
- (void) nni_thread_reap(pp->sthr);
- }
- if (pp->rthr != NULL) {
- (void) nni_thread_reap(pp->rthr);
- }
}
@@ -162,14 +130,6 @@ nni_pair_sender(void *arg)
nni_msg *msg;
int rv;
- nni_mutex_enter(&pair->mx);
- if (!pp->good) {
- nni_mutex_exit(&pair->mx);
- return;
- }
- nni_mutex_exit(&pair->mx);
-
-
for (;;) {
rv = nni_msgqueue_get_sig(uwq, &msg, &pp->sigclose);
if (rv != 0) {
@@ -197,12 +157,6 @@ nni_pair_receiver(void *arg)
nni_msg *msg;
int rv;
- nni_mutex_enter(&pair->mx);
- if (!pp->good) {
- nni_mutex_exit(&pair->mx);
- return;
- }
- nni_mutex_exit(&pair->mx);
for (;;) {
rv = nni_pipe_recv(pipe, &msg);
if (rv != 0) {
@@ -245,6 +199,9 @@ struct nni_protocol nni_pair_protocol = {
.proto_destroy = nni_pair_destroy,
.proto_add_pipe = nni_pair_add_pipe,
.proto_rem_pipe = nni_pair_rem_pipe,
+ .proto_pipe_size = sizeof (nni_pair_pipe),
+ .proto_pipe_send = nni_pair_sender,
+ .proto_pipe_recv = nni_pair_receiver,
.proto_setopt = nni_pair_setopt,
.proto_getopt = nni_pair_getopt,
.proto_recv_filter = NULL,
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 54ac7b11..366e0c57 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -40,9 +40,6 @@ struct nni_req_sock {
struct nni_req_pipe {
nni_pipe * pipe;
nni_req_sock * req;
- int good;
- nni_thread * sthr;
- nni_thread * rthr;
int sigclose;
nni_list_node node;
};
@@ -115,44 +112,20 @@ nni_req_destroy(void *arg)
static int
-nni_req_add_pipe(void *arg, nni_pipe *pipe, void **datap)
+nni_req_add_pipe(void *arg, nni_pipe *pipe, void *data)
{
nni_req_sock *req = arg;
- nni_req_pipe *rp;
+ nni_req_pipe *rp = data;
int rv;
- if ((rp = nni_alloc(sizeof (*rp))) == NULL) {
- return (NNG_ENOMEM);
- }
rp->pipe = pipe;
- rp->good = 0;
rp->sigclose = 0;
- rp->sthr = NULL;
- rp->rthr = NULL;
rp->req = req;
nni_mutex_enter(&req->mx);
- if ((rv = nni_thread_create(&rp->rthr, nni_req_receiver, rp)) != 0) {
- goto fail;
- }
- if ((rv = nni_thread_create(&rp->sthr, nni_req_sender, rp)) != 0) {
- goto fail;
- }
- rp->good = 1;
nni_list_append(&req->pipes, rp);
- *datap = rp;
nni_mutex_exit(&req->mx);
return (0);
-fail:
- nni_mutex_exit(&req->mx);
- if (rp->rthr) {
- nni_thread_reap(rp->rthr);
- }
- if (rp->sthr) {
- nni_thread_reap(rp->sthr);
- }
- nni_free(rp, sizeof (*rp));
- return (rv);
}
@@ -165,13 +138,6 @@ nni_req_rem_pipe(void *arg, void *data)
nni_mutex_enter(&req->mx);
nni_list_remove(&req->pipes, rp);
nni_mutex_exit(&req->mx);
-
- if (rp->sthr != NULL) {
- (void) nni_thread_reap(rp->sthr);
- }
- if (rp->rthr != NULL) {
- (void) nni_thread_reap(rp->rthr);
- }
}
@@ -186,13 +152,6 @@ nni_req_sender(void *arg)
nni_msg *msg;
int rv;
- nni_mutex_enter(&req->mx);
- if (!rp->good) {
- nni_mutex_exit(&req->mx);
- return;
- }
- nni_mutex_exit(&req->mx);
-
for (;;) {
rv = nni_msgqueue_get_sig(uwq, &msg, &rp->sigclose);
if (rv != 0) {
@@ -220,12 +179,6 @@ nni_req_receiver(void *arg)
nni_msg *msg;
int rv;
- nni_mutex_enter(&req->mx);
- if (!rp->good) {
- nni_mutex_exit(&req->mx);
- return;
- }
- nni_mutex_exit(&req->mx);
for (;;) {
size_t len;
char *body;
@@ -454,6 +407,9 @@ struct nni_protocol nni_req_protocol = {
.proto_rem_pipe = nni_req_rem_pipe,
.proto_setopt = nni_req_setopt,
.proto_getopt = nni_req_getopt,
+ .proto_pipe_size = sizeof (nni_req_pipe),
+ .proto_pipe_send = nni_req_sender,
+ .proto_pipe_recv = nni_req_receiver,
.proto_recv_filter = nni_req_recvfilter,
.proto_send_filter = nni_req_sendfilter,
};