summaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-29 23:49:05 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-29 23:49:05 -0800
commit0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6 (patch)
treeb2ea7f56ae0dc6d32219e695725f55d78b9e0b34 /src/core
parent5d90b485fdb39cac7d1aac2ab8958ecd585ac69b (diff)
downloadnng-0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6.tar.gz
nng-0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6.tar.bz2
nng-0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6.zip
Factor out repeated protocol code into common.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c6
-rw-r--r--src/core/pipe.c129
-rw-r--r--src/core/pipe.h11
-rw-r--r--src/core/protocol.h17
-rw-r--r--src/core/socket.c51
-rw-r--r--src/core/socket.h1
6 files changed, 153 insertions, 62 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index bb3ba268..901831ef 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -130,12 +130,11 @@ nni_endpt_connect(nni_endpt *ep, nni_pipe **pp)
static int
nni_dial_once(nni_endpt *ep)
{
- nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
if (((rv = nni_endpt_connect(ep, &pipe)) == 0) &&
- ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ ((rv = nni_pipe_start(pipe)) == 0)) {
return (0);
}
@@ -281,7 +280,6 @@ static void
nni_listener(void *arg)
{
nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
@@ -332,7 +330,7 @@ nni_listener(void *arg)
pipe = NULL;
if (((rv = nni_endpt_accept(ep, &pipe)) == 0) &&
- ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ ((rv = nni_pipe_start(pipe)) == 0)) {
continue;
}
if (rv == NNG_ECLOSED) {
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 15ae393b..0a3f1d33 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -71,9 +71,19 @@ nni_pipe_peer(nni_pipe *p)
void
nni_pipe_destroy(nni_pipe *p)
{
+ if (p->p_send_thr != NULL) {
+ nni_thread_reap(p->p_send_thr);
+ }
+ if (p->p_recv_thr != NULL) {
+ nni_thread_reap(p->p_recv_thr);
+ }
if (p->p_trandata != NULL) {
p->p_ops.p_destroy(p->p_trandata);
}
+ nni_cond_fini(&p->p_cv);
+ if (p->p_pdata != NULL) {
+ nni_free(p->p_pdata, p->p_psize);
+ }
nni_free(p, sizeof (*p));
}
@@ -82,15 +92,28 @@ int
nni_pipe_create(nni_pipe **pp, nni_endpt *ep)
{
nni_pipe *p;
+ nni_socket *sock = ep->ep_sock;
+ int rv;
if ((p = nni_alloc(sizeof (*p))) == NULL) {
return (NNG_ENOMEM);
}
p->p_trandata = NULL;
- p->p_protdata = NULL;
+ if ((rv = nni_cond_init(&p->p_cv, &sock->s_mx)) != 0) {
+ nni_free(p, sizeof (*p));
+ return (rv);
+ }
+ p->p_psize = sock->s_ops.proto_pipe_size;
+ if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) {
+ nni_cond_fini(&p->p_cv);
+ nni_free(p, sizeof (*p));
+ return (NNG_ENOMEM);
+ }
+ p->p_sock = sock;
p->p_ops = *ep->ep_ops.ep_pipe_ops;
p->p_ep = ep;
- p->p_sock = ep->ep_sock;
+ p->p_active = 0;
+ p->p_abort = 0;
if (ep->ep_dialer != NULL) {
ep->ep_pipe = p;
}
@@ -109,3 +132,105 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
}
return (p->p_ops.p_getopt(p->p_trandata, opt, val, szp));
}
+
+
+static void
+nni_pipe_sender(void *arg)
+{
+ nni_pipe *p = arg;
+
+ nni_mutex_enter(&p->p_sock->s_mx);
+ while ((!p->p_active) && (!p->p_abort)) {
+ nni_cond_wait(&p->p_cv);
+ }
+ if (p->p_abort) {
+ nni_mutex_exit(&p->p_sock->s_mx);
+ return;
+ }
+ nni_mutex_exit(&p->p_sock->s_mx);
+ if (p->p_sock->s_ops.proto_pipe_send != NULL) {
+ p->p_sock->s_ops.proto_pipe_send(p->p_pdata);
+ }
+}
+
+
+static void
+nni_pipe_receiver(void *arg)
+{
+ nni_pipe *p = arg;
+
+ nni_mutex_enter(&p->p_sock->s_mx);
+ while ((!p->p_active) && (!p->p_abort)) {
+ nni_cond_wait(&p->p_cv);
+ }
+ if (p->p_abort) {
+ nni_mutex_exit(&p->p_sock->s_mx);
+ return;
+ }
+ nni_mutex_exit(&p->p_sock->s_mx);
+ if (p->p_sock->s_ops.proto_pipe_recv != NULL) {
+ p->p_sock->s_ops.proto_pipe_recv(p->p_pdata);
+ }
+}
+
+
+int
+nni_pipe_start(nni_pipe *pipe)
+{
+ int rv;
+ int collide;
+ nni_socket *sock = pipe->p_sock;
+
+ nni_mutex_enter(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mutex_exit(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
+
+ do {
+ // We generate a new pipe ID, but we make sure it does not
+ // collide with any we already have. This can only normally
+ // happen if we wrap -- i.e. we've had 4 billion or so pipes.
+ // XXX: consider making this a hash table!!
+ nni_pipe *check;
+ pipe->p_id = nni_plat_nextid() & 0x7FFFFFFF;
+ collide = 0;
+ NNI_LIST_FOREACH (&sock->s_pipes, check) {
+ if (check->p_id == pipe->p_id) {
+ collide = 1;
+ break;
+ }
+ }
+ } while (collide);
+
+ rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_sender, pipe);
+ if (rv != 0) {
+ goto fail;
+ }
+ rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_receiver, pipe);
+ if (rv != 0) {
+ goto fail;
+ }
+
+ rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata);
+ if (rv != 0) {
+ goto fail;
+ }
+ nni_list_append(&sock->s_pipes, pipe);
+ pipe->p_active = 1;
+
+ // XXX: Publish event
+ nni_cond_broadcast(&pipe->p_cv);
+
+ nni_mutex_exit(&sock->s_mx);
+ return (0);
+
+fail:
+ pipe->p_abort = 1;
+ pipe->p_reap = 1;
+ nni_list_append(&sock->s_reaps, pipe);
+ nni_cond_broadcast(&sock->s_cv);
+ nni_cond_broadcast(&pipe->p_cv);
+ nni_mutex_exit(&sock->s_mx);
+ return (rv);
+}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 4594f65b..2b4feb7d 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -20,12 +20,20 @@ struct nng_pipe {
uint32_t p_id;
struct nni_pipe_ops p_ops;
void * p_trandata;
- void * p_protdata;
+ void * p_pdata; // protocol specific data
+ size_t p_psize; // size of protocol data
nni_list_node p_node;
nni_socket * p_sock;
nni_endpt * p_ep;
int p_reap;
int p_active;
+ int p_abort;
+ nni_mutex p_mx;
+ nni_cond p_cv;
+ void (*p_send)(void *);
+ void (*p_recv)(void *);
+ nni_thread * p_send_thr;
+ nni_thread * p_recv_thr;
};
// Pipe operations that protocols use.
@@ -40,6 +48,7 @@ extern int nni_pipe_create(nni_pipe **, nni_endpt *);
extern void nni_pipe_destroy(nni_pipe *);
+extern int nni_pipe_start(nni_pipe *);
extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
#endif // CORE_PIPE_H
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 63683a6b..a8e93ea7 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -20,9 +20,10 @@
// As a consequence, most of the concurrency in nng exists in the protocol
// implementations.
struct nni_protocol {
- uint16_t proto_self; // our 16-bit protocol ID
- uint16_t proto_peer; // who we peer with (protocol ID)
- const char * proto_name; // string version of our name
+ uint16_t proto_self; // our 16-bit protocol ID
+ uint16_t proto_peer; // who we peer with (ID)
+ const char * proto_name; // string version of our name
+ size_t proto_pipe_size; // pipe private data size
//Create protocol instance, which will be stored on the socket.
int (*proto_create)(void **, nni_socket *);
@@ -32,11 +33,17 @@ struct nni_protocol {
// Add and remove pipes. These are called as connections are
// created or destroyed.
- int (*proto_add_pipe)(void *, nni_pipe *, void **);
+ int (*proto_add_pipe)(void *, nni_pipe *, void *);
void (*proto_rem_pipe)(void *, void *);
+ // Thread functions for processing send & receive sides of
+ // protocol pipes. Send may be NULL, but recv should should not.
+ // (Recv needs to detect a closed pipe, if nothing else.)
+ void (*proto_pipe_send)(void *);
+ void (*proto_pipe_recv)(void *);
+
// Option manipulation. These may be NULL.
- int (*proto_setopt)(void *, int, const void *, size_t);
+ int (*proto_setopt)(void *, int, const void *, size_t);
int (*proto_getopt)(void *, int, void *, size_t *);
// Receive filter. This may be NULL, but if it isn't, then
diff --git a/src/core/socket.c b/src/core/socket.c
index 8c4cd339..6fa6712d 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -53,9 +53,9 @@ nni_reaper(void *arg)
// keep lists of pipes for managing their topologies.
// Note that if a protocol has rejected the pipe, it
// won't have any data.
- if (pipe->p_protdata != NULL) {
+ if (pipe->p_active) {
sock->s_ops.proto_rem_pipe(sock->s_data,
- pipe->p_protdata);
+ pipe->p_pdata);
}
// If pipe was a connected (dialer) pipe,
@@ -217,7 +217,6 @@ nni_socket_close(nni_socket *sock)
// Go through and schedule close on all pipes.
while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
nni_list_remove(&sock->s_pipes, pipe);
- pipe->p_active = 0;
pipe->p_reap = 1;
nni_list_append(&sock->s_reaps, pipe);
}
@@ -341,52 +340,6 @@ nni_socket_proto(nni_socket *sock)
int
-nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
-{
- int rv;
- int collide;
-
- nni_mutex_enter(&sock->s_mx);
- if (sock->s_closing) {
- nni_mutex_exit(&sock->s_mx);
- return (NNG_ECLOSED);
- }
-
- do {
- // We generate a new pipe ID, but we make sure it does not
- // collide with any we already have. This can only normally
- // happen if we wrap -- i.e. we've had 4 billion or so pipes.
- // XXX: consider making this a hash table!!
- nni_pipe *check;
- pipe->p_id = nni_plat_nextid() & 0x7FFFFFFF;
- collide = 0;
- NNI_LIST_FOREACH (&sock->s_pipes, check) {
- if (check->p_id == pipe->p_id) {
- collide = 1;
- break;
- }
- }
- } while (collide);
-
- pipe->p_id = nni_plat_nextid();
- rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, &pipe->p_protdata);
- if (rv != 0) {
- pipe->p_reap = 1;
- nni_list_append(&sock->s_reaps, pipe);
- nni_cond_broadcast(&sock->s_cv);
- nni_mutex_exit(&sock->s_mx);
- return (rv);
- }
- nni_list_append(&sock->s_pipes, pipe);
- pipe->p_active = 1;
-
- // XXX: Publish event
- nni_mutex_exit(&sock->s_mx);
- return (0);
-}
-
-
-int
nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags)
{
nni_endpt *ep;
diff --git a/src/core/socket.h b/src/core/socket.h
index 7b49e85f..ff3ad865 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -47,7 +47,6 @@ struct nng_socket {
extern int nni_socket_create(nni_socket **, uint16_t);
extern int nni_socket_close(nni_socket *);
-extern int nni_socket_add_pipe(nni_socket *, nni_pipe *);
extern uint16_t nni_socket_proto(nni_socket *);
extern int nni_socket_setopt(nni_socket *, int, const void *, size_t);
extern int nni_socket_getopt(nni_socket *, int, void *, size_t *);