From 9e3f9d4d7a953d41210b9d3757fb003573b90308 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 14 Dec 2016 20:44:51 -0800 Subject: nni_socket_add_pipe and nni_socket_remove_pipe implementation. --- src/core/socket.c | 81 ++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 60 insertions(+), 21 deletions(-) (limited to 'src/core/socket.c') diff --git a/src/core/socket.c b/src/core/socket.c index 094f9ce0..bc18b4a3 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -26,25 +26,6 @@ * Socket implementation. */ -struct nng_socket { - nni_mutex_t s_mx; - - nni_msgqueue_t s_uwq; /* Upper write queue. */ - nni_msgqueue_t s_urq; /* Upper read queue. */ - - struct nni_protocol s_ops; - - void *s_data; /* Protocol private. */ - - /* options */ - - nni_list_t s_eps; - nni_list_t s_pipes; - - int s_besteffort; /* Best effort mode delivery. */ - int s_senderr; /* Protocol state machine use. */ -}; - /* * nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain * the upper read and write queues. @@ -76,7 +57,7 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto) } sock->s_ops = *ops; - nni_pipe_list_init(&sock->s_pipes); + NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node); //NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { @@ -92,11 +73,25 @@ int nni_socket_close(nni_socket_t sock) { nni_pipe_t pipe; + nni_endpt_t ep; nni_msgqueue_close(sock->s_urq); /* XXX: drain this? */ nni_msgqueue_close(sock->s_uwq); + nni_mutex_enter(sock->s_mx); + NNI_LIST_FOREACH(&sock->s_eps, ep) { + #if 0 + nni_ep_stop(ep); + // OR.... + nni_mutex_enter(ep->ep_mx); + ep->ep_stop = 1; + nni_cond_broadcast(ep->ep_cond); + nni_mutex_exit(ep->ep_mx); + #endif + break; /* REMOVE ME */ + } + nni_mutex_exit(sock->s_mx); /* XXX: close endpoints - no new pipes made... */ /* XXX: protocol shutdown */ @@ -112,10 +107,14 @@ nni_socket_close(nni_socket_t sock) /* XXX: close remaining pipes */ while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { nni_list_remove(&sock->s_pipes, pipe); - /* XXX: call nni_pipe_close, then nni_pipe_destroy */ + /* XXX: nni_pipe_destroy */ } /* XXX: wait for workers to cease activity */ + while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + nni_list_remove(&sock->s_eps, ep); + /* XXX: nni_ep_destroy(ep); */ + } return (0); } @@ -167,3 +166,43 @@ nni_socket_protocol(nni_socket_t sock) { return (sock->s_ops.proto_self); } + +void +nni_socket_remove_pipe(nni_socket_t sock, nni_pipe_t pipe) +{ + nni_mutex_enter(sock->s_mx); + if (pipe->p_sock != sock) { + nni_mutex_exit(sock->s_mx); + } + /* + * Remove the pipe from the protocol. Protocols may + * keep lists of pipes for managing their topologies. + */ + sock->s_ops.proto_remove_pipe(sock->s_data, pipe); + + /* Now remove it from our own list */ + nni_list_remove(&sock->s_pipes, pipe); + pipe->p_sock = NULL; + // XXX: Redial + // XXX: also publish event... + //if (pipe->p_ep != NULL) { + // nn_endpt_remove_pipe(pipe->p_ep, pipe) + //} + nni_mutex_exit(sock->s_mx); +} + +int +nni_socket_add_pipe(nni_socket_t sock, nni_pipe_t pipe) +{ + int rv; + nni_mutex_enter(sock->s_mx); + if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) { + nni_mutex_exit(sock->s_mx); + return (rv); + } + nni_list_append(&sock->s_pipes, pipe); + pipe->p_sock = sock; + /* XXX: Publish event */ + nni_mutex_exit(sock->s_mx); + return (0); +} -- cgit v1.2.3-70-g09d2