aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-14 20:44:51 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-14 20:44:51 -0800
commit9e3f9d4d7a953d41210b9d3757fb003573b90308 (patch)
treec2e634c4eab4f836eedfa12e30e2eecd9c1a0009 /src
parent3583d5e407476b8836228c0abc52c400d74aba61 (diff)
downloadnng-9e3f9d4d7a953d41210b9d3757fb003573b90308.tar.gz
nng-9e3f9d4d7a953d41210b9d3757fb003573b90308.tar.bz2
nng-9e3f9d4d7a953d41210b9d3757fb003573b90308.zip
nni_socket_add_pipe and nni_socket_remove_pipe implementation.
Diffstat (limited to 'src')
-rw-r--r--src/core/pipe.c17
-rw-r--r--src/core/pipe.h20
-rw-r--r--src/core/socket.c81
-rw-r--r--src/core/socket.h27
-rw-r--r--src/protocol/pair/pair.c24
5 files changed, 116 insertions, 53 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index a08aab8a..1895b74b 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -29,14 +29,6 @@
* performed in the context of the protocol.
*/
-struct nng_pipe {
- uint32_t p_id;
- struct nni_pipe_ops p_ops;
- void *p_tran;
- nni_list_node_t p_node;
- nni_socket_t p_sock;
-};
-
/* nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. */
uint32_t
nni_pipe_id(nni_pipe_t p)
@@ -64,10 +56,7 @@ nni_pipe_recv(nni_pipe_t p, nng_msg_t *msgp)
void
nni_pipe_close(nni_pipe_t p)
{
- /* XXX: we need to unregister from the parent socket. */
- /* XXX: also unregister from the protocol. */
p->p_ops.p_close(p->p_tran);
- // XXX: nni_sock_remove_pipe(sock, p);
}
uint16_t
@@ -82,9 +71,3 @@ nni_pipe_destroy(nni_pipe_t p)
p->p_ops.p_destroy(p->p_tran);
nni_free(p, sizeof (*p));
}
-
-void
-nni_pipe_list_init(nni_list_t *list)
-{
- NNI_LIST_INIT(list, struct nng_pipe, p_node);
-} \ No newline at end of file
diff --git a/src/core/pipe.h b/src/core/pipe.h
index fa3f2bfe..0cedf5e8 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -24,6 +24,25 @@
#define CORE_PIPE_H
/*
+ * NB: This structure is supplied here for use by the CORE. Use of this library
+ * OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
+ * TRANSPORTS.
+ */
+
+#include "core/transport.h"
+
+struct nng_pipe {
+ uint32_t p_id;
+ struct nni_pipe_ops p_ops;
+ void *p_tran;
+ nni_list_node_t p_sock_node;
+ nni_socket_t p_sock;
+ nni_list_node_t p_ep_node;
+ nni_endpt_t p_ep;
+};
+
+
+/*
* Pipe operations that protocols use.
*/
extern int nni_pipe_recv(nni_pipe_t, nng_msg_t *);
@@ -35,7 +54,6 @@ extern void nni_pipe_close(nni_pipe_t);
* Used only by the socket core - as we don't wish to expose the details
* of the pipe structure outside of pipe.c.
*/
-extern void nni_pipe_list_init(nni_list_t *);
extern int nni_pipe_create(nni_pipe_t *, struct nni_transport *);
extern void nni_pipe_destroy(nni_pipe_t);
diff --git a/src/core/socket.c b/src/core/socket.c
index 094f9ce0..bc18b4a3 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -26,25 +26,6 @@
* Socket implementation.
*/
-struct nng_socket {
- nni_mutex_t s_mx;
-
- nni_msgqueue_t s_uwq; /* Upper write queue. */
- nni_msgqueue_t s_urq; /* Upper read queue. */
-
- struct nni_protocol s_ops;
-
- void *s_data; /* Protocol private. */
-
- /* options */
-
- nni_list_t s_eps;
- nni_list_t s_pipes;
-
- int s_besteffort; /* Best effort mode delivery. */
- int s_senderr; /* Protocol state machine use. */
-};
-
/*
* nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain
* the upper read and write queues.
@@ -76,7 +57,7 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto)
}
sock->s_ops = *ops;
- nni_pipe_list_init(&sock->s_pipes);
+ NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node);
//NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node);
if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) {
@@ -92,11 +73,25 @@ int
nni_socket_close(nni_socket_t sock)
{
nni_pipe_t pipe;
+ nni_endpt_t ep;
nni_msgqueue_close(sock->s_urq);
/* XXX: drain this? */
nni_msgqueue_close(sock->s_uwq);
+ nni_mutex_enter(sock->s_mx);
+ NNI_LIST_FOREACH(&sock->s_eps, ep) {
+ #if 0
+ nni_ep_stop(ep);
+ // OR....
+ nni_mutex_enter(ep->ep_mx);
+ ep->ep_stop = 1;
+ nni_cond_broadcast(ep->ep_cond);
+ nni_mutex_exit(ep->ep_mx);
+ #endif
+ break; /* REMOVE ME */
+ }
+ nni_mutex_exit(sock->s_mx);
/* XXX: close endpoints - no new pipes made... */
/* XXX: protocol shutdown */
@@ -112,10 +107,14 @@ nni_socket_close(nni_socket_t sock)
/* XXX: close remaining pipes */
while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
nni_list_remove(&sock->s_pipes, pipe);
- /* XXX: call nni_pipe_close, then nni_pipe_destroy */
+ /* XXX: nni_pipe_destroy */
}
/* XXX: wait for workers to cease activity */
+ while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ nni_list_remove(&sock->s_eps, ep);
+ /* XXX: nni_ep_destroy(ep); */
+ }
return (0);
}
@@ -167,3 +166,43 @@ nni_socket_protocol(nni_socket_t sock)
{
return (sock->s_ops.proto_self);
}
+
+void
+nni_socket_remove_pipe(nni_socket_t sock, nni_pipe_t pipe)
+{
+ nni_mutex_enter(sock->s_mx);
+ if (pipe->p_sock != sock) {
+ nni_mutex_exit(sock->s_mx);
+ }
+ /*
+ * Remove the pipe from the protocol. Protocols may
+ * keep lists of pipes for managing their topologies.
+ */
+ sock->s_ops.proto_remove_pipe(sock->s_data, pipe);
+
+ /* Now remove it from our own list */
+ nni_list_remove(&sock->s_pipes, pipe);
+ pipe->p_sock = NULL;
+ // XXX: Redial
+ // XXX: also publish event...
+ //if (pipe->p_ep != NULL) {
+ // nn_endpt_remove_pipe(pipe->p_ep, pipe)
+ //}
+ nni_mutex_exit(sock->s_mx);
+}
+
+int
+nni_socket_add_pipe(nni_socket_t sock, nni_pipe_t pipe)
+{
+ int rv;
+ nni_mutex_enter(sock->s_mx);
+ if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) {
+ nni_mutex_exit(sock->s_mx);
+ return (rv);
+ }
+ nni_list_append(&sock->s_pipes, pipe);
+ pipe->p_sock = sock;
+ /* XXX: Publish event */
+ nni_mutex_exit(sock->s_mx);
+ return (0);
+}
diff --git a/src/core/socket.h b/src/core/socket.h
index 30d863e2..cfc42806 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -24,6 +24,31 @@
#define CORE_SOCKET_H
/*
+ * NB: This structure is supplied here for use by the CORE. Use of this library
+ * OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
+ * TRANSPORTS.
+ */
+
+struct nng_socket {
+ nni_mutex_t s_mx;
+
+ nni_msgqueue_t s_uwq; /* Upper write queue. */
+ nni_msgqueue_t s_urq; /* Upper read queue. */
+
+ struct nni_protocol s_ops;
+
+ void *s_data; /* Protocol private. */
+
+ /* options */
+
+ nni_list_t s_eps;
+ nni_list_t s_pipes;
+
+ int s_besteffort; /* Best effort mode delivery. */
+ int s_senderr; /* Protocol state machine use. */
+};
+
+/*
* Internally used socket API. Again, this stuff is not part of our public
* API.
*/
@@ -31,7 +56,7 @@
extern int nni_socket_create(nni_socket_t *, uint16_t);
extern int nni_socket_close(nni_socket_t);
extern int nni_socket_add_pipe(nni_socket_t, nni_pipe_t);
-extern int nni_socket_remove_pipe(nni_socket_t, nni_pipe_t);
+extern void nni_socket_remove_pipe(nni_socket_t, nni_pipe_t);
extern uint16_t nni_socket_protocol(nni_socket_t);
extern int nni_socket_setopt(nni_socket_t, int, const void *, size_t);
extern int nni_socket_getopt(nni_socket_t, int, void *, size_t *);
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 4b8a180b..6bb55a5d 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -183,18 +183,17 @@ pair_sender(void *arg)
for (;;) {
rv = nni_msgqueue_get_sig(uwq, &msg, -1, &pp->sigclose);
if (rv != 0) {
- (void) nni_pipe_close(pipe);
- return;
+ break;
}
rv = nni_pipe_send(pipe, msg);
if (rv != 0) {
nni_msg_free(msg);
- (void) nni_pipe_close(pipe);
- /* signal the other side */
- nni_msgqueue_signal(urq, &pp->sigclose);
- return;
+ break;
}
}
+ nni_msgqueue_signal(urq, &pp->sigclose);
+ nni_pipe_close(pipe);
+ nni_socket_remove_pipe(pair->sock, pipe);
}
static void
@@ -218,18 +217,17 @@ pair_receiver(void *arg)
for (;;) {
rv = nni_pipe_recv(pipe, &msg);
if (rv != 0) {
- nni_msg_free(msg);
- (void) nni_pipe_close(pipe);
- /* signal the other side */
- nni_msgqueue_signal(uwq, &pp->sigclose);
- return;
+ break;
}
rv = nni_msgqueue_put_sig(urq, msg, -1, &pp->sigclose);
if (rv != 0) {
- (void) nni_pipe_close(pipe);
- return;
+ nni_msg_free(msg);
+ break;
}
}
+ nni_msgqueue_signal(uwq, &pp->sigclose);
+ nni_pipe_close(pipe);
+ nni_socket_remove_pipe(pair->sock, pipe);
}
static int