From 0f5ed9c09bcbe0c0dabdf48ea14e85d418b3d8b6 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 29 Dec 2016 23:49:05 -0800 Subject: Factor out repeated protocol code into common. --- src/core/endpt.c | 6 +-- src/core/pipe.c | 129 +++++++++++++++++++++++++++++++++++++++++++++++++++- src/core/pipe.h | 11 ++++- src/core/protocol.h | 17 +++++-- src/core/socket.c | 51 +-------------------- src/core/socket.h | 1 - 6 files changed, 153 insertions(+), 62 deletions(-) (limited to 'src/core') diff --git a/src/core/endpt.c b/src/core/endpt.c index bb3ba268..901831ef 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -130,12 +130,11 @@ nni_endpt_connect(nni_endpt *ep, nni_pipe **pp) static int nni_dial_once(nni_endpt *ep) { - nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; if (((rv = nni_endpt_connect(ep, &pipe)) == 0) && - ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) { + ((rv = nni_pipe_start(pipe)) == 0)) { return (0); } @@ -281,7 +280,6 @@ static void nni_listener(void *arg) { nni_endpt *ep = arg; - nni_socket *sock = ep->ep_sock; nni_pipe *pipe; int rv; @@ -332,7 +330,7 @@ nni_listener(void *arg) pipe = NULL; if (((rv = nni_endpt_accept(ep, &pipe)) == 0) && - ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) { + ((rv = nni_pipe_start(pipe)) == 0)) { continue; } if (rv == NNG_ECLOSED) { diff --git a/src/core/pipe.c b/src/core/pipe.c index 15ae393b..0a3f1d33 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -71,9 +71,19 @@ nni_pipe_peer(nni_pipe *p) void nni_pipe_destroy(nni_pipe *p) { + if (p->p_send_thr != NULL) { + nni_thread_reap(p->p_send_thr); + } + if (p->p_recv_thr != NULL) { + nni_thread_reap(p->p_recv_thr); + } if (p->p_trandata != NULL) { p->p_ops.p_destroy(p->p_trandata); } + nni_cond_fini(&p->p_cv); + if (p->p_pdata != NULL) { + nni_free(p->p_pdata, p->p_psize); + } nni_free(p, sizeof (*p)); } @@ -82,15 +92,28 @@ int nni_pipe_create(nni_pipe **pp, nni_endpt *ep) { nni_pipe *p; + nni_socket *sock = ep->ep_sock; + int rv; if ((p = nni_alloc(sizeof (*p))) == NULL) { return (NNG_ENOMEM); } p->p_trandata = NULL; - p->p_protdata = NULL; + if ((rv = nni_cond_init(&p->p_cv, &sock->s_mx)) != 0) { + nni_free(p, sizeof (*p)); + return (rv); + } + p->p_psize = sock->s_ops.proto_pipe_size; + if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) { + nni_cond_fini(&p->p_cv); + nni_free(p, sizeof (*p)); + return (NNG_ENOMEM); + } + p->p_sock = sock; p->p_ops = *ep->ep_ops.ep_pipe_ops; p->p_ep = ep; - p->p_sock = ep->ep_sock; + p->p_active = 0; + p->p_abort = 0; if (ep->ep_dialer != NULL) { ep->ep_pipe = p; } @@ -109,3 +132,105 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) } return (p->p_ops.p_getopt(p->p_trandata, opt, val, szp)); } + + +static void +nni_pipe_sender(void *arg) +{ + nni_pipe *p = arg; + + nni_mutex_enter(&p->p_sock->s_mx); + while ((!p->p_active) && (!p->p_abort)) { + nni_cond_wait(&p->p_cv); + } + if (p->p_abort) { + nni_mutex_exit(&p->p_sock->s_mx); + return; + } + nni_mutex_exit(&p->p_sock->s_mx); + if (p->p_sock->s_ops.proto_pipe_send != NULL) { + p->p_sock->s_ops.proto_pipe_send(p->p_pdata); + } +} + + +static void +nni_pipe_receiver(void *arg) +{ + nni_pipe *p = arg; + + nni_mutex_enter(&p->p_sock->s_mx); + while ((!p->p_active) && (!p->p_abort)) { + nni_cond_wait(&p->p_cv); + } + if (p->p_abort) { + nni_mutex_exit(&p->p_sock->s_mx); + return; + } + nni_mutex_exit(&p->p_sock->s_mx); + if (p->p_sock->s_ops.proto_pipe_recv != NULL) { + p->p_sock->s_ops.proto_pipe_recv(p->p_pdata); + } +} + + +int +nni_pipe_start(nni_pipe *pipe) +{ + int rv; + int collide; + nni_socket *sock = pipe->p_sock; + + nni_mutex_enter(&sock->s_mx); + if (sock->s_closing) { + nni_mutex_exit(&sock->s_mx); + return (NNG_ECLOSED); + } + + do { + // We generate a new pipe ID, but we make sure it does not + // collide with any we already have. This can only normally + // happen if we wrap -- i.e. we've had 4 billion or so pipes. + // XXX: consider making this a hash table!! + nni_pipe *check; + pipe->p_id = nni_plat_nextid() & 0x7FFFFFFF; + collide = 0; + NNI_LIST_FOREACH (&sock->s_pipes, check) { + if (check->p_id == pipe->p_id) { + collide = 1; + break; + } + } + } while (collide); + + rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_sender, pipe); + if (rv != 0) { + goto fail; + } + rv = nni_thread_create(&pipe->p_send_thr, nni_pipe_receiver, pipe); + if (rv != 0) { + goto fail; + } + + rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata); + if (rv != 0) { + goto fail; + } + nni_list_append(&sock->s_pipes, pipe); + pipe->p_active = 1; + + // XXX: Publish event + nni_cond_broadcast(&pipe->p_cv); + + nni_mutex_exit(&sock->s_mx); + return (0); + +fail: + pipe->p_abort = 1; + pipe->p_reap = 1; + nni_list_append(&sock->s_reaps, pipe); + nni_cond_broadcast(&sock->s_cv); + nni_cond_broadcast(&pipe->p_cv); + nni_mutex_exit(&sock->s_mx); + return (rv); +} diff --git a/src/core/pipe.h b/src/core/pipe.h index 4594f65b..2b4feb7d 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -20,12 +20,20 @@ struct nng_pipe { uint32_t p_id; struct nni_pipe_ops p_ops; void * p_trandata; - void * p_protdata; + void * p_pdata; // protocol specific data + size_t p_psize; // size of protocol data nni_list_node p_node; nni_socket * p_sock; nni_endpt * p_ep; int p_reap; int p_active; + int p_abort; + nni_mutex p_mx; + nni_cond p_cv; + void (*p_send)(void *); + void (*p_recv)(void *); + nni_thread * p_send_thr; + nni_thread * p_recv_thr; }; // Pipe operations that protocols use. @@ -40,6 +48,7 @@ extern int nni_pipe_create(nni_pipe **, nni_endpt *); extern void nni_pipe_destroy(nni_pipe *); +extern int nni_pipe_start(nni_pipe *); extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep); #endif // CORE_PIPE_H diff --git a/src/core/protocol.h b/src/core/protocol.h index 63683a6b..a8e93ea7 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -20,9 +20,10 @@ // As a consequence, most of the concurrency in nng exists in the protocol // implementations. struct nni_protocol { - uint16_t proto_self; // our 16-bit protocol ID - uint16_t proto_peer; // who we peer with (protocol ID) - const char * proto_name; // string version of our name + uint16_t proto_self; // our 16-bit protocol ID + uint16_t proto_peer; // who we peer with (ID) + const char * proto_name; // string version of our name + size_t proto_pipe_size; // pipe private data size //Create protocol instance, which will be stored on the socket. int (*proto_create)(void **, nni_socket *); @@ -32,11 +33,17 @@ struct nni_protocol { // Add and remove pipes. These are called as connections are // created or destroyed. - int (*proto_add_pipe)(void *, nni_pipe *, void **); + int (*proto_add_pipe)(void *, nni_pipe *, void *); void (*proto_rem_pipe)(void *, void *); + // Thread functions for processing send & receive sides of + // protocol pipes. Send may be NULL, but recv should should not. + // (Recv needs to detect a closed pipe, if nothing else.) + void (*proto_pipe_send)(void *); + void (*proto_pipe_recv)(void *); + // Option manipulation. These may be NULL. - int (*proto_setopt)(void *, int, const void *, size_t); + int (*proto_setopt)(void *, int, const void *, size_t); int (*proto_getopt)(void *, int, void *, size_t *); // Receive filter. This may be NULL, but if it isn't, then diff --git a/src/core/socket.c b/src/core/socket.c index 8c4cd339..6fa6712d 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -53,9 +53,9 @@ nni_reaper(void *arg) // keep lists of pipes for managing their topologies. // Note that if a protocol has rejected the pipe, it // won't have any data. - if (pipe->p_protdata != NULL) { + if (pipe->p_active) { sock->s_ops.proto_rem_pipe(sock->s_data, - pipe->p_protdata); + pipe->p_pdata); } // If pipe was a connected (dialer) pipe, @@ -217,7 +217,6 @@ nni_socket_close(nni_socket *sock) // Go through and schedule close on all pipes. while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { nni_list_remove(&sock->s_pipes, pipe); - pipe->p_active = 0; pipe->p_reap = 1; nni_list_append(&sock->s_reaps, pipe); } @@ -340,52 +339,6 @@ nni_socket_proto(nni_socket *sock) } -int -nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) -{ - int rv; - int collide; - - nni_mutex_enter(&sock->s_mx); - if (sock->s_closing) { - nni_mutex_exit(&sock->s_mx); - return (NNG_ECLOSED); - } - - do { - // We generate a new pipe ID, but we make sure it does not - // collide with any we already have. This can only normally - // happen if we wrap -- i.e. we've had 4 billion or so pipes. - // XXX: consider making this a hash table!! - nni_pipe *check; - pipe->p_id = nni_plat_nextid() & 0x7FFFFFFF; - collide = 0; - NNI_LIST_FOREACH (&sock->s_pipes, check) { - if (check->p_id == pipe->p_id) { - collide = 1; - break; - } - } - } while (collide); - - pipe->p_id = nni_plat_nextid(); - rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, &pipe->p_protdata); - if (rv != 0) { - pipe->p_reap = 1; - nni_list_append(&sock->s_reaps, pipe); - nni_cond_broadcast(&sock->s_cv); - nni_mutex_exit(&sock->s_mx); - return (rv); - } - nni_list_append(&sock->s_pipes, pipe); - pipe->p_active = 1; - - // XXX: Publish event - nni_mutex_exit(&sock->s_mx); - return (0); -} - - int nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags) { diff --git a/src/core/socket.h b/src/core/socket.h index 7b49e85f..ff3ad865 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -47,7 +47,6 @@ struct nng_socket { extern int nni_socket_create(nni_socket **, uint16_t); extern int nni_socket_close(nni_socket *); -extern int nni_socket_add_pipe(nni_socket *, nni_pipe *); extern uint16_t nni_socket_proto(nni_socket *); extern int nni_socket_setopt(nni_socket *, int, const void *, size_t); extern int nni_socket_getopt(nni_socket *, int, void *, size_t *); -- cgit v1.2.3-70-g09d2