diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/pipe.c | 17 | ||||
| -rw-r--r-- | src/core/pipe.h | 20 | ||||
| -rw-r--r-- | src/core/socket.c | 81 | ||||
| -rw-r--r-- | src/core/socket.h | 27 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 24 |
5 files changed, 116 insertions, 53 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c index a08aab8a..1895b74b 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -29,14 +29,6 @@ * performed in the context of the protocol. */ -struct nng_pipe { - uint32_t p_id; - struct nni_pipe_ops p_ops; - void *p_tran; - nni_list_node_t p_node; - nni_socket_t p_sock; -}; - /* nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. */ uint32_t nni_pipe_id(nni_pipe_t p) @@ -64,10 +56,7 @@ nni_pipe_recv(nni_pipe_t p, nng_msg_t *msgp) void nni_pipe_close(nni_pipe_t p) { - /* XXX: we need to unregister from the parent socket. */ - /* XXX: also unregister from the protocol. */ p->p_ops.p_close(p->p_tran); - // XXX: nni_sock_remove_pipe(sock, p); } uint16_t @@ -82,9 +71,3 @@ nni_pipe_destroy(nni_pipe_t p) p->p_ops.p_destroy(p->p_tran); nni_free(p, sizeof (*p)); } - -void -nni_pipe_list_init(nni_list_t *list) -{ - NNI_LIST_INIT(list, struct nng_pipe, p_node); -}
\ No newline at end of file diff --git a/src/core/pipe.h b/src/core/pipe.h index fa3f2bfe..0cedf5e8 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -24,6 +24,25 @@ #define CORE_PIPE_H /* + * NB: This structure is supplied here for use by the CORE. Use of this library + * OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR + * TRANSPORTS. + */ + +#include "core/transport.h" + +struct nng_pipe { + uint32_t p_id; + struct nni_pipe_ops p_ops; + void *p_tran; + nni_list_node_t p_sock_node; + nni_socket_t p_sock; + nni_list_node_t p_ep_node; + nni_endpt_t p_ep; +}; + + +/* * Pipe operations that protocols use. */ extern int nni_pipe_recv(nni_pipe_t, nng_msg_t *); @@ -35,7 +54,6 @@ extern void nni_pipe_close(nni_pipe_t); * Used only by the socket core - as we don't wish to expose the details * of the pipe structure outside of pipe.c. */ -extern void nni_pipe_list_init(nni_list_t *); extern int nni_pipe_create(nni_pipe_t *, struct nni_transport *); extern void nni_pipe_destroy(nni_pipe_t); diff --git a/src/core/socket.c b/src/core/socket.c index 094f9ce0..bc18b4a3 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -26,25 +26,6 @@ * Socket implementation. */ -struct nng_socket { - nni_mutex_t s_mx; - - nni_msgqueue_t s_uwq; /* Upper write queue. */ - nni_msgqueue_t s_urq; /* Upper read queue. */ - - struct nni_protocol s_ops; - - void *s_data; /* Protocol private. */ - - /* options */ - - nni_list_t s_eps; - nni_list_t s_pipes; - - int s_besteffort; /* Best effort mode delivery. */ - int s_senderr; /* Protocol state machine use. */ -}; - /* * nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain * the upper read and write queues. @@ -76,7 +57,7 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto) } sock->s_ops = *ops; - nni_pipe_list_init(&sock->s_pipes); + NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node); //NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node); if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { @@ -92,11 +73,25 @@ int nni_socket_close(nni_socket_t sock) { nni_pipe_t pipe; + nni_endpt_t ep; nni_msgqueue_close(sock->s_urq); /* XXX: drain this? */ nni_msgqueue_close(sock->s_uwq); + nni_mutex_enter(sock->s_mx); + NNI_LIST_FOREACH(&sock->s_eps, ep) { + #if 0 + nni_ep_stop(ep); + // OR.... + nni_mutex_enter(ep->ep_mx); + ep->ep_stop = 1; + nni_cond_broadcast(ep->ep_cond); + nni_mutex_exit(ep->ep_mx); + #endif + break; /* REMOVE ME */ + } + nni_mutex_exit(sock->s_mx); /* XXX: close endpoints - no new pipes made... */ /* XXX: protocol shutdown */ @@ -112,10 +107,14 @@ nni_socket_close(nni_socket_t sock) /* XXX: close remaining pipes */ while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { nni_list_remove(&sock->s_pipes, pipe); - /* XXX: call nni_pipe_close, then nni_pipe_destroy */ + /* XXX: nni_pipe_destroy */ } /* XXX: wait for workers to cease activity */ + while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + nni_list_remove(&sock->s_eps, ep); + /* XXX: nni_ep_destroy(ep); */ + } return (0); } @@ -167,3 +166,43 @@ nni_socket_protocol(nni_socket_t sock) { return (sock->s_ops.proto_self); } + +void +nni_socket_remove_pipe(nni_socket_t sock, nni_pipe_t pipe) +{ + nni_mutex_enter(sock->s_mx); + if (pipe->p_sock != sock) { + nni_mutex_exit(sock->s_mx); + } + /* + * Remove the pipe from the protocol. Protocols may + * keep lists of pipes for managing their topologies. + */ + sock->s_ops.proto_remove_pipe(sock->s_data, pipe); + + /* Now remove it from our own list */ + nni_list_remove(&sock->s_pipes, pipe); + pipe->p_sock = NULL; + // XXX: Redial + // XXX: also publish event... + //if (pipe->p_ep != NULL) { + // nn_endpt_remove_pipe(pipe->p_ep, pipe) + //} + nni_mutex_exit(sock->s_mx); +} + +int +nni_socket_add_pipe(nni_socket_t sock, nni_pipe_t pipe) +{ + int rv; + nni_mutex_enter(sock->s_mx); + if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) { + nni_mutex_exit(sock->s_mx); + return (rv); + } + nni_list_append(&sock->s_pipes, pipe); + pipe->p_sock = sock; + /* XXX: Publish event */ + nni_mutex_exit(sock->s_mx); + return (0); +} diff --git a/src/core/socket.h b/src/core/socket.h index 30d863e2..cfc42806 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -24,6 +24,31 @@ #define CORE_SOCKET_H /* + * NB: This structure is supplied here for use by the CORE. Use of this library + * OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR + * TRANSPORTS. + */ + +struct nng_socket { + nni_mutex_t s_mx; + + nni_msgqueue_t s_uwq; /* Upper write queue. */ + nni_msgqueue_t s_urq; /* Upper read queue. */ + + struct nni_protocol s_ops; + + void *s_data; /* Protocol private. */ + + /* options */ + + nni_list_t s_eps; + nni_list_t s_pipes; + + int s_besteffort; /* Best effort mode delivery. */ + int s_senderr; /* Protocol state machine use. */ +}; + +/* * Internally used socket API. Again, this stuff is not part of our public * API. */ @@ -31,7 +56,7 @@ extern int nni_socket_create(nni_socket_t *, uint16_t); extern int nni_socket_close(nni_socket_t); extern int nni_socket_add_pipe(nni_socket_t, nni_pipe_t); -extern int nni_socket_remove_pipe(nni_socket_t, nni_pipe_t); +extern void nni_socket_remove_pipe(nni_socket_t, nni_pipe_t); extern uint16_t nni_socket_protocol(nni_socket_t); extern int nni_socket_setopt(nni_socket_t, int, const void *, size_t); extern int nni_socket_getopt(nni_socket_t, int, void *, size_t *); diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 4b8a180b..6bb55a5d 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -183,18 +183,17 @@ pair_sender(void *arg) for (;;) { rv = nni_msgqueue_get_sig(uwq, &msg, -1, &pp->sigclose); if (rv != 0) { - (void) nni_pipe_close(pipe); - return; + break; } rv = nni_pipe_send(pipe, msg); if (rv != 0) { nni_msg_free(msg); - (void) nni_pipe_close(pipe); - /* signal the other side */ - nni_msgqueue_signal(urq, &pp->sigclose); - return; + break; } } + nni_msgqueue_signal(urq, &pp->sigclose); + nni_pipe_close(pipe); + nni_socket_remove_pipe(pair->sock, pipe); } static void @@ -218,18 +217,17 @@ pair_receiver(void *arg) for (;;) { rv = nni_pipe_recv(pipe, &msg); if (rv != 0) { - nni_msg_free(msg); - (void) nni_pipe_close(pipe); - /* signal the other side */ - nni_msgqueue_signal(uwq, &pp->sigclose); - return; + break; } rv = nni_msgqueue_put_sig(urq, msg, -1, &pp->sigclose); if (rv != 0) { - (void) nni_pipe_close(pipe); - return; + nni_msg_free(msg); + break; } } + nni_msgqueue_signal(uwq, &pp->sigclose); + nni_pipe_close(pipe); + nni_socket_remove_pipe(pair->sock, pipe); } static int |
