aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-14 20:44:51 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-14 20:44:51 -0800
commit9e3f9d4d7a953d41210b9d3757fb003573b90308 (patch)
treec2e634c4eab4f836eedfa12e30e2eecd9c1a0009 /src/core/socket.c
parent3583d5e407476b8836228c0abc52c400d74aba61 (diff)
downloadnng-9e3f9d4d7a953d41210b9d3757fb003573b90308.tar.gz
nng-9e3f9d4d7a953d41210b9d3757fb003573b90308.tar.bz2
nng-9e3f9d4d7a953d41210b9d3757fb003573b90308.zip
nni_socket_add_pipe and nni_socket_remove_pipe implementation.
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c81
1 files changed, 60 insertions, 21 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 094f9ce0..bc18b4a3 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -26,25 +26,6 @@
* Socket implementation.
*/
-struct nng_socket {
- nni_mutex_t s_mx;
-
- nni_msgqueue_t s_uwq; /* Upper write queue. */
- nni_msgqueue_t s_urq; /* Upper read queue. */
-
- struct nni_protocol s_ops;
-
- void *s_data; /* Protocol private. */
-
- /* options */
-
- nni_list_t s_eps;
- nni_list_t s_pipes;
-
- int s_besteffort; /* Best effort mode delivery. */
- int s_senderr; /* Protocol state machine use. */
-};
-
/*
* nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain
* the upper read and write queues.
@@ -76,7 +57,7 @@ nni_socket_create(nni_socket_t *sockp, uint16_t proto)
}
sock->s_ops = *ops;
- nni_pipe_list_init(&sock->s_pipes);
+ NNI_LIST_INIT(&sock->s_pipes, struct nng_pipe, p_sock_node);
//NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node);
if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) {
@@ -92,11 +73,25 @@ int
nni_socket_close(nni_socket_t sock)
{
nni_pipe_t pipe;
+ nni_endpt_t ep;
nni_msgqueue_close(sock->s_urq);
/* XXX: drain this? */
nni_msgqueue_close(sock->s_uwq);
+ nni_mutex_enter(sock->s_mx);
+ NNI_LIST_FOREACH(&sock->s_eps, ep) {
+ #if 0
+ nni_ep_stop(ep);
+ // OR....
+ nni_mutex_enter(ep->ep_mx);
+ ep->ep_stop = 1;
+ nni_cond_broadcast(ep->ep_cond);
+ nni_mutex_exit(ep->ep_mx);
+ #endif
+ break; /* REMOVE ME */
+ }
+ nni_mutex_exit(sock->s_mx);
/* XXX: close endpoints - no new pipes made... */
/* XXX: protocol shutdown */
@@ -112,10 +107,14 @@ nni_socket_close(nni_socket_t sock)
/* XXX: close remaining pipes */
while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
nni_list_remove(&sock->s_pipes, pipe);
- /* XXX: call nni_pipe_close, then nni_pipe_destroy */
+ /* XXX: nni_pipe_destroy */
}
/* XXX: wait for workers to cease activity */
+ while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ nni_list_remove(&sock->s_eps, ep);
+ /* XXX: nni_ep_destroy(ep); */
+ }
return (0);
}
@@ -167,3 +166,43 @@ nni_socket_protocol(nni_socket_t sock)
{
return (sock->s_ops.proto_self);
}
+
+void
+nni_socket_remove_pipe(nni_socket_t sock, nni_pipe_t pipe)
+{
+ nni_mutex_enter(sock->s_mx);
+ if (pipe->p_sock != sock) {
+ nni_mutex_exit(sock->s_mx);
+ }
+ /*
+ * Remove the pipe from the protocol. Protocols may
+ * keep lists of pipes for managing their topologies.
+ */
+ sock->s_ops.proto_remove_pipe(sock->s_data, pipe);
+
+ /* Now remove it from our own list */
+ nni_list_remove(&sock->s_pipes, pipe);
+ pipe->p_sock = NULL;
+ // XXX: Redial
+ // XXX: also publish event...
+ //if (pipe->p_ep != NULL) {
+ // nn_endpt_remove_pipe(pipe->p_ep, pipe)
+ //}
+ nni_mutex_exit(sock->s_mx);
+}
+
+int
+nni_socket_add_pipe(nni_socket_t sock, nni_pipe_t pipe)
+{
+ int rv;
+ nni_mutex_enter(sock->s_mx);
+ if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) {
+ nni_mutex_exit(sock->s_mx);
+ return (rv);
+ }
+ nni_list_append(&sock->s_pipes, pipe);
+ pipe->p_sock = sock;
+ /* XXX: Publish event */
+ nni_mutex_exit(sock->s_mx);
+ return (0);
+}