aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-25 18:08:44 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-25 18:08:44 -0800
commit0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c (patch)
tree1098c7f4976033bb311b45c378079700c9330b62 /src/core
parent64de60d98e8e4a896f9d13e4aa70343f329d88b4 (diff)
downloadnng-0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c.tar.gz
nng-0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c.tar.bz2
nng-0cd2fa7310f1fdf45443a8a9e3335658b1c3c64c.zip
Substantial fixes for listen & dialers.
At this point listening and dialing operations appear to function properly. As part of this I had to break the close logic up since otherwise we had a loop trying to reap a thread from itself. So there is now a separate reaper thread for pipes per-socket. I also changed lists to be a bit more rigid, and allocations now zero memory initially. (We had bugs due to uninitialized memory, and rather than hunt them all down, lets just init them to sane zero values.)
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c198
-rw-r--r--src/core/endpt.h5
-rw-r--r--src/core/list.c17
-rw-r--r--src/core/list.h5
-rw-r--r--src/core/panic.c2
-rw-r--r--src/core/pipe.c42
-rw-r--r--src/core/pipe.h9
-rw-r--r--src/core/protocol.h4
-rw-r--r--src/core/socket.c239
-rw-r--r--src/core/socket.h6
10 files changed, 333 insertions, 194 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 1c5f4db5..24d92206 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -36,7 +36,9 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
ep->ep_listener = NULL;
ep->ep_close = 0;
ep->ep_start = 0;
+ ep->ep_bound = 0;
ep->ep_pipe = NULL;
+ NNI_LIST_NODE_INIT(&ep->ep_node);
if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) {
nni_free(ep, sizeof (*ep));
return (NNG_ENOMEM);
@@ -105,13 +107,21 @@ nni_endpt_close(nni_endpt *ep)
}
-int
-nni_endpt_bind(nni_endpt *ep)
+static int
+nni_endpt_connect(nni_endpt *ep, nni_pipe **pp)
{
- if (ep->ep_close) {
- return (NNG_ECLOSED);
+ nni_pipe *pipe;
+ int rv;
+
+ if ((rv = nni_pipe_create(&pipe, ep)) != 0) {
+ return (rv);
}
- return (ep->ep_ops.ep_bind(ep->ep_data));
+ if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_trandata)) != 0) {
+ nni_pipe_destroy(pipe);
+ return (rv);
+ }
+ *pp = pipe;
+ return (0);
}
@@ -124,34 +134,12 @@ nni_dial_once(nni_endpt *ep)
nni_pipe *pipe;
int rv;
- pipe = NULL;
-
- if (ep->ep_close) {
- return (NNG_ECLOSED);
- }
- if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
- return (rv);
- }
- if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_data)) != 0) {
- nni_pipe_destroy(pipe);
- return (rv);
+ if (((rv = nni_endpt_connect(ep, &pipe)) == 0) &&
+ ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ return (0);
}
- if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) {
- nni_pipe_destroy(pipe);
- return (rv);
- }
-
- nni_mutex_enter(&ep->ep_mx);
- if (!ep->ep_close) {
- // Set up the linkage so that when the pipe closes
- // we can notify the dialer to redial.
- pipe->p_ep = ep;
- ep->ep_pipe = pipe;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- return (0);
+ return (rv);
}
@@ -160,7 +148,6 @@ static void
nni_dialer(void *arg)
{
nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
nni_time cooldown;
@@ -180,6 +167,10 @@ nni_dialer(void *arg)
while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
nni_cond_wait(&ep->ep_cv);
}
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
nni_mutex_exit(&ep->ep_mx);
rv = nni_dial_once(ep);
@@ -190,9 +181,12 @@ nni_dialer(void *arg)
case NNG_ENOMEM:
cooldown = 1000000;
break;
+ case NNG_ECLOSED:
+ break;
default:
// XXX: THIS NEEDS TO BE A PROPER BACKOFF.
- cooldown = 100000;
+ printf("COOLING DOWN!!\n");
+ cooldown = 1000000;
break;
}
// we inject a delay so we don't just spin hard on
@@ -200,9 +194,12 @@ nni_dialer(void *arg)
// wait even longer, since the system needs time to
// release resources.
cooldown += nni_clock();
+ nni_mutex_enter(&ep->ep_mx);
while (!ep->ep_close) {
+ // We need a different condvar...
nni_cond_waituntil(&ep->ep_cv, cooldown);
}
+ nni_mutex_exit(&ep->ep_mx);
}
}
@@ -212,15 +209,13 @@ nni_endpt_dial(nni_endpt *ep, int flags)
{
int rv = 0;
nni_thread *reap = NULL;
- nni_socket *sock = ep->ep_sock;
- nni_mutex_enter(&sock->s_mx);
nni_mutex_enter(&ep->ep_mx);
if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
rv = NNG_EBUSY;
goto out;
}
- if (sock->s_closing || ep->ep_close) {
+ if (ep->ep_close) {
rv = NNG_ECLOSED;
goto out;
}
@@ -233,10 +228,9 @@ nni_endpt_dial(nni_endpt *ep, int flags)
}
if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
rv = nni_dial_once(ep);
- nni_mutex_enter(&sock->s_mx);
nni_mutex_enter(&ep->ep_mx);
+
if (rv == 0) {
ep->ep_start = 1;
} else {
@@ -250,7 +244,6 @@ nni_endpt_dial(nni_endpt *ep, int flags)
}
out:
nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
if (reap != NULL) {
nni_thread_reap(reap);
@@ -269,14 +262,135 @@ nni_endpt_accept(nni_endpt *ep, nni_pipe **pp)
if (ep->ep_close) {
return (NNG_ECLOSED);
}
- if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
+ if ((rv = nni_pipe_create(&pipe, ep)) != 0) {
return (rv);
}
- if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_data)) != 0) {
+ if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_trandata)) != 0) {
nni_pipe_destroy(pipe);
return (rv);
}
- pipe->p_ep = ep;
*pp = pipe;
return (0);
}
+
+
+static void
+nni_listener(void *arg)
+{
+ nni_endpt *ep = arg;
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ if (ep->ep_stop || ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ return;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+ for (;;) {
+ nni_time cooldown;
+ nni_mutex_enter(&ep->ep_mx);
+
+ // If we didn't bind synchronously, do it now.
+ while (!ep->ep_bound && !ep->ep_close) {
+ int rv;
+
+ nni_mutex_exit(&ep->ep_mx);
+ rv = ep->ep_ops.ep_bind(ep->ep_data);
+ nni_mutex_enter(&ep->ep_mx);
+
+ if (rv == 0) {
+ ep->ep_bound = 1;
+ } else {
+ // Invalid address? Out of memory? Who knows.
+ // Try again in a bit (10ms).
+ // XXX: PROPER BACKOFF NEEDED
+ cooldown = 10000;
+ cooldown += nni_clock();
+ while (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv,
+ cooldown);
+ }
+ }
+ }
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ pipe = NULL;
+
+ if (((rv = nni_endpt_accept(ep, &pipe)) == 0) &&
+ ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ continue;
+ }
+ if (rv == NNG_ECLOSED) {
+ break;
+ }
+ cooldown = 1000; // 1 ms cooldown
+ if (rv == NNG_ENOMEM) {
+ // For out of memory, we need to give more
+ // time for the system to reclaim resources.
+ cooldown = 100000; // 100ms
+ }
+ nni_mutex_enter(&ep->ep_mx);
+ if (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv, cooldown);
+ }
+ nni_mutex_exit(&ep->ep_mx);
+ }
+}
+
+
+int
+nni_endpt_listen(nni_endpt *ep, int flags)
+{
+ int rv = 0;
+ nni_thread *reap = NULL;
+
+ nni_mutex_enter(&ep->ep_mx);
+ if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
+ rv = NNG_EBUSY;
+ goto out;
+ }
+ if (ep->ep_close) {
+ rv = NNG_ECLOSED;
+ goto out;
+ }
+
+ ep->ep_stop = 0;
+ ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1;
+ if (nni_thread_create(&ep->ep_listener, nni_listener, ep) != 0) {
+ rv = NNG_ENOMEM;
+ goto out;
+ }
+ if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
+ nni_mutex_exit(&ep->ep_mx);
+ rv = ep->ep_ops.ep_bind(ep->ep_data);
+ nni_mutex_enter(&ep->ep_mx);
+ if (rv == 0) {
+ ep->ep_bound = 1;
+ ep->ep_start = 1;
+ } else {
+ // This will cause the thread to exit instead of
+ // starting.
+ ep->ep_stop = 1;
+ reap = ep->ep_listener;
+ ep->ep_listener = NULL;
+ }
+ nni_cond_signal(&ep->ep_cv);
+ }
+out:
+ nni_mutex_exit(&ep->ep_mx);
+
+ if (reap != NULL) {
+ nni_thread_reap(reap);
+ }
+
+ return (rv);
+}
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 71d3ff59..36f55de2 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -18,7 +18,7 @@
struct nng_endpt {
nni_endpt_ops ep_ops;
void * ep_data; // Transport private
- nni_list_node ep_sock_node; // Per socket list
+ nni_list_node ep_node; // Per socket list
nni_socket * ep_sock;
char ep_addr[NNG_MAXADDRLEN];
nni_thread * ep_dialer;
@@ -26,6 +26,7 @@ struct nng_endpt {
int ep_stop; // thread exits before start
int ep_start; // start thread running
int ep_close; // full shutdown
+ int ep_bound; // true if we bound locally
nni_mutex ep_mx;
nni_cond ep_cv;
nni_pipe * ep_pipe; // Connected pipe (dialers only)
@@ -33,9 +34,9 @@ struct nng_endpt {
extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *);
extern void nni_endpt_destroy(nni_endpt *);
-extern int nni_endpt_listen(nni_endpt *);
extern int nni_endpt_accept(nni_endpt *, nni_pipe **);
extern void nni_endpt_close(nni_endpt *);
extern int nni_endpt_dial(nni_endpt *, int);
+extern int nni_endpt_listen(nni_endpt *, int);
#endif // CORE_ENDPT_H
diff --git a/src/core/list.c b/src/core/list.c
index 69a74db6..8d6c3ace 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -58,6 +58,9 @@ nni_list_append(nni_list *list, void *item)
{
nni_list_node *node = NODE(list, item);
+ if ((node->ln_next != NULL) || (node->ln_prev != NULL)) {
+ nni_panic("appending node already on a list or not inited");
+ }
node->ln_prev = list->ll_head.ln_prev;
node->ln_next = &list->ll_head;
node->ln_next->ln_prev = node;
@@ -70,6 +73,9 @@ nni_list_prepend(nni_list *list, void *item)
{
nni_list_node *node = NODE(list, item);
+ if ((node->ln_next != NULL) || (node->ln_prev != NULL)) {
+ nni_panic("prepending node already on a list or not inited");
+ }
node->ln_next = list->ll_head.ln_next;
node->ln_prev = &list->ll_head;
node->ln_next->ln_prev = node;
@@ -108,13 +114,6 @@ nni_list_remove(nni_list *list, void *item)
node->ln_prev->ln_next = node->ln_next;
node->ln_next->ln_prev = node->ln_prev;
-}
-
-
-void
-nni_list_node_init(nni_list *list, void *item)
-{
- nni_list_node *node = NODE(list, item);
-
- node->ln_prev = node->ln_next = NULL;
+ node->ln_next = NULL;
+ node->ln_prev = NULL;
}
diff --git a/src/core/list.h b/src/core/list.h
index 822d9621..9d95a527 100644
--- a/src/core/list.h
+++ b/src/core/list.h
@@ -28,6 +28,10 @@ extern void nni_list_init_offset(nni_list *list, size_t offset);
#define NNI_LIST_INIT(list, type, field) \
nni_list_init_offset(list, offsetof(type, field))
+
+#define NNI_LIST_NODE_INIT(node) \
+ { (node)->ln_prev = (node)->ln_next = 0; }
+
extern void *nni_list_first(nni_list *);
extern void *nni_list_last(nni_list *);
extern void nni_list_append(nni_list *, void *);
@@ -35,7 +39,6 @@ extern void nni_list_prepend(nni_list *, void *);
extern void *nni_list_next(nni_list *, void *);
extern void *nni_list_prev(nni_list *, void *);
extern void nni_list_remove(nni_list *, void *);
-extern void nni_list_node_init(nni_list *, void *);
#define NNI_LIST_FOREACH(l, it) \
for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it))
diff --git a/src/core/panic.c b/src/core/panic.c
index d0d6078d..2205779f 100644
--- a/src/core/panic.c
+++ b/src/core/panic.c
@@ -19,7 +19,7 @@
#include "core/nng_impl.h"
// Panic handling.
-static void
+void
nni_show_backtrace(void)
{
#if NNG_HAVE_BACKTRACE
diff --git a/src/core/pipe.c b/src/core/pipe.c
index cc92e6fe..0a7bbed1 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -25,14 +25,14 @@ nni_pipe_id(nni_pipe *p)
int
nni_pipe_send(nni_pipe *p, nng_msg *msg)
{
- return (p->p_ops.p_send(p->p_data, msg));
+ return (p->p_ops.p_send(p->p_trandata, msg));
}
int
nni_pipe_recv(nni_pipe *p, nng_msg **msgp)
{
- return (p->p_ops.p_recv(p->p_data, msgp));
+ return (p->p_ops.p_recv(p->p_trandata, msgp));
}
@@ -42,38 +42,60 @@ nni_pipe_recv(nni_pipe *p, nng_msg **msgp)
void
nni_pipe_close(nni_pipe *p)
{
- p->p_ops.p_close(p->p_data);
+ nni_socket *sock = p->p_sock;
+
+ p->p_ops.p_close(p->p_trandata);
+
+ nni_mutex_enter(&sock->s_mx);
+ if (!p->p_reap) {
+ // schedule deferred reap/close
+ p->p_reap = 1;
+ if (p->p_active) {
+ nni_list_remove(&sock->s_pipes, p);
+ p->p_active = 0;
+ }
+ nni_list_append(&sock->s_reaps, p);
+ nni_cond_broadcast(&sock->s_cv);
+ }
+ nni_mutex_exit(&sock->s_mx);
}
uint16_t
nni_pipe_peer(nni_pipe *p)
{
- return (p->p_ops.p_peer(p->p_data));
+ return (p->p_ops.p_peer(p->p_trandata));
}
void
nni_pipe_destroy(nni_pipe *p)
{
- if (p->p_data != NULL) {
- p->p_ops.p_destroy(p->p_data);
+ if (p->p_trandata != NULL) {
+ p->p_ops.p_destroy(p->p_trandata);
}
nni_free(p, sizeof (*p));
}
int
-nni_pipe_create(nni_pipe **pp, const nni_pipe_ops *ops)
+nni_pipe_create(nni_pipe **pp, nni_endpt *ep)
{
nni_pipe *p;
if ((p = nni_alloc(sizeof (*p))) == NULL) {
return (NNG_ENOMEM);
}
- p->p_data = NULL;
- p->p_ops = *ops;
+ p->p_trandata = NULL;
+ p->p_protdata = NULL;
+ p->p_ops = *ep->ep_ops.ep_pipe_ops;
p->p_id = nni_plat_nextid();
+ p->p_ep = ep;
+ p->p_sock = ep->ep_sock;
+ if (ep->ep_dialer != NULL) {
+ ep->ep_pipe = p;
+ }
+ NNI_LIST_NODE_INIT(&p->p_node);
*pp = p;
return (0);
}
@@ -86,5 +108,5 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
if (p->p_ops.p_getopt == NULL) {
return (NNG_ENOTSUP);
}
- return (p->p_ops.p_getopt(p->p_data, opt, val, szp));
+ return (p->p_ops.p_getopt(p->p_trandata, opt, val, szp));
}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 5349bbb1..4594f65b 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -19,10 +19,13 @@
struct nng_pipe {
uint32_t p_id;
struct nni_pipe_ops p_ops;
- void * p_data;
- nni_list_node p_sock_node;
+ void * p_trandata;
+ void * p_protdata;
+ nni_list_node p_node;
nni_socket * p_sock;
nni_endpt * p_ep;
+ int p_reap;
+ int p_active;
};
// Pipe operations that protocols use.
@@ -33,7 +36,7 @@ extern void nni_pipe_close(nni_pipe *);
// Used only by the socket core - as we don't wish to expose the details
// of the pipe structure outside of pipe.c.
-extern int nni_pipe_create(nni_pipe **, const nni_pipe_ops *);
+extern int nni_pipe_create(nni_pipe **, nni_endpt *);
extern void nni_pipe_destroy(nni_pipe *);
diff --git a/src/core/protocol.h b/src/core/protocol.h
index f73825d7..63683a6b 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -32,8 +32,8 @@ struct nni_protocol {
// Add and remove pipes. These are called as connections are
// created or destroyed.
- int (*proto_add_pipe)(void *, nni_pipe *);
- int (*proto_rem_pipe)(void *, nni_pipe *);
+ int (*proto_add_pipe)(void *, nni_pipe *, void **);
+ void (*proto_rem_pipe)(void *, void *);
// Option manipulation. These may be NULL.
int (*proto_setopt)(void *, int, const void *, size_t);
diff --git a/src/core/socket.c b/src/core/socket.c
index 9263b142..ce5dbf3c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -29,6 +29,62 @@ nni_socket_recvq(nni_socket *s)
}
+// Because we have to call back into the socket, and possibly also the proto,
+// and wait for threads to terminate, we do this in a special thread. The
+// assumption is that closing is always a "fast" operation.
+static void
+nni_reaper(void *arg)
+{
+ nni_socket *sock = arg;
+
+ for (;;) {
+ nni_pipe *pipe;
+ nni_endpt *ep;
+
+ nni_mutex_enter(&sock->s_mx);
+ if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) {
+ nni_list_remove(&sock->s_reaps, pipe);
+ nni_mutex_exit(&sock->s_mx);
+
+ // This should already have been done.
+ pipe->p_ops.p_close(pipe->p_trandata);
+
+ // Remove the pipe from the protocol. Protocols may
+ // keep lists of pipes for managing their topologies.
+ // Note that if a protocol has rejected the pipe, it
+ // won't have any data.
+ if (pipe->p_protdata != NULL) {
+ sock->s_ops.proto_rem_pipe(sock->s_data,
+ pipe->p_protdata);
+ }
+
+ // If pipe was a connected (dialer) pipe,
+ // then let the endpoint know so it can try to
+ // reestablish the connection.
+ if ((ep = pipe->p_ep) != NULL) {
+ ep->ep_pipe = NULL;
+ pipe->p_ep = NULL;
+ nni_mutex_enter(&ep->ep_mx);
+ nni_cond_signal(&ep->ep_cv);
+ nni_mutex_exit(&ep->ep_mx);
+ }
+
+ // XXX: also publish event...
+ nni_pipe_destroy(pipe);
+ continue;
+ }
+
+ if (sock->s_reaper == NULL) {
+ nni_mutex_exit(&sock->s_mx);
+ break;
+ }
+
+ nni_cond_wait(&sock->s_cv);
+ nni_mutex_exit(&sock->s_mx);
+ }
+}
+
+
// nn_socket_create creates the underlying socket.
int
nni_socket_create(nni_socket **sockp, uint16_t proto)
@@ -47,6 +103,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
sock->s_linger = 0;
sock->s_sndtimeo = -1;
sock->s_rcvtimeo = -1;
+ sock->s_closing = 0;
sock->s_reconn = NNI_SECOND;
sock->s_reconnmax = NNI_SECOND;
@@ -60,8 +117,13 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
return (rv);
}
- NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_sock_node);
- NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_sock_node);
+ NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
+ NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
+ NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node);
+
+ if ((rv = nni_thread_create(&sock->s_reaper, nni_reaper, sock)) != 0) {
+ goto fail;
+ }
if (((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) ||
((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0)) {
@@ -81,6 +143,14 @@ fail:
if (sock->s_uwq != NULL) {
nni_msgqueue_destroy(sock->s_uwq);
}
+ if (sock->s_reaper != NULL) {
+ nni_thread *reap = sock->s_reaper;
+ nni_mutex_enter(&sock->s_mx);
+ sock->s_reaper = NULL;
+ nni_cond_broadcast(&sock->s_cv);
+ nni_mutex_exit(&sock->s_mx);
+ nni_thread_reap(reap);
+ }
nni_cond_fini(&sock->s_cv);
nni_mutex_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
@@ -95,6 +165,7 @@ nni_socket_close(nni_socket *sock)
nni_pipe *pipe;
nni_endpt *ep;
nni_time linger;
+ nni_thread *reaper;
nni_mutex_enter(&sock->s_mx);
// Mark us closing, so no more EPs or changes can occur.
@@ -143,18 +214,18 @@ nni_socket_close(nni_socket *sock)
// safely while we hold the lock.
nni_msgqueue_close(sock->s_urq);
- // Go through and close all the pipes.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_close(pipe);
+ // Go through and schedule close on all pipes.
+ while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
+ nni_list_remove(&sock->s_pipes, pipe);
+ pipe->p_active = 0;
+ pipe->p_reap = 1;
+ nni_list_append(&sock->s_reaps, pipe);
}
- // At this point, the protocols should have all their operations
- // failing, if they have any remaining, and they should be returning
- // any pipes back to us very quickly. We'll wait for them to finish,
- // as it MUST occur shortly.
- while (nni_list_first(&sock->s_pipes) != NULL) {
- nni_cond_wait(&sock->s_cv);
- }
+ // Tell the reaper it's done once it finishes. Also kick it off.
+ reaper = sock->s_reaper;
+ sock->s_reaper = NULL;
+ nni_cond_broadcast(&sock->s_cv);
// We already told the endpoints to shutdown. We just
// need to reap them now.
@@ -167,6 +238,9 @@ nni_socket_close(nni_socket *sock)
}
nni_mutex_exit(&sock->s_mx);
+ // Wait for the reaper to exit.
+ nni_thread_reap(reaper);
+
// At this point nothing else should be referencing us.
// The protocol needs to clean up its state.
sock->s_ops.proto_destroy(sock->s_data);
@@ -264,49 +338,8 @@ nni_socket_proto(nni_socket *sock)
}
-// nni_socket_rem_pipe removes the pipe from the socket. This is often
-// called by the protocol when a pipe is removed due to close.
-void
-nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
-{
- nni_endpt *ep;
-
- nni_mutex_enter(&sock->s_mx);
- if (pipe->p_sock != sock) {
- nni_mutex_exit(&sock->s_mx);
- }
-
- // Remove the pipe from the protocol. Protocols may
- // keep lists of pipes for managing their topologies.
- sock->s_ops.proto_rem_pipe(sock->s_data, pipe);
-
- // Now remove it from our own list.
- nni_list_remove(&sock->s_pipes, pipe);
- pipe->p_sock = NULL;
-
- // If we were a connected (dialer) pipe, then let the endpoint
- // know so it can try to reestablish the connection.
- if ((ep = pipe->p_ep) != NULL) {
- ep->ep_pipe = NULL;
- pipe->p_ep = NULL;
- nni_mutex_enter(&ep->ep_mx);
- nni_cond_signal(&ep->ep_cv);
- nni_mutex_exit(&ep->ep_mx);
- }
-
- // XXX: also publish event...
- nni_pipe_destroy(pipe);
-
- // If we're closing, wake the socket if we finished draining.
- if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) {
- nni_cond_broadcast(&sock->s_cv);
- }
- nni_mutex_exit(&sock->s_mx);
-}
-
-
int
-nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
+nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
{
int rv;
@@ -315,13 +348,17 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
nni_mutex_exit(&sock->s_mx);
return (NNG_ECLOSED);
}
- if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) {
+ rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, &pipe->p_protdata);
+ if (rv != 0) {
+ pipe->p_reap = 1;
+ nni_list_append(&sock->s_reaps, pipe);
+ nni_cond_broadcast(&sock->s_cv);
nni_mutex_exit(&sock->s_mx);
return (rv);
}
nni_list_append(&sock->s_pipes, pipe);
+ pipe->p_active = 1;
- pipe->p_sock = sock;
// XXX: Publish event
nni_mutex_exit(&sock->s_mx);
return (0);
@@ -341,82 +378,40 @@ nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags)
if (rv != 0) {
nni_endpt_close(ep);
nni_endpt_destroy(ep);
- } else if (epp != NULL) {
- *epp = ep;
- }
- return (rv);
-}
-
-
-static void
-nni_socket_accepter(void *arg)
-{
- nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
- nni_pipe *pipe;
- int rv;
-
- for (;;) {
- nni_mutex_enter(&ep->ep_mx);
- if (ep->ep_close) {
- nni_mutex_exit(&ep->ep_mx);
- break;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- pipe = NULL;
-
- if (((rv = nni_endpt_accept(ep, &pipe)) != 0) ||
- ((rv = nni_socket_add_pipe(sock, pipe, 0)) != 0)) {
- if (rv == NNG_ECLOSED) {
- break;
- }
- if (pipe != NULL) {
- nni_pipe_destroy(pipe);
- }
- // XXX: Publish accept error event...
-
- // If we can't allocate memory, don't spin, so that
- // things get a chance to release memory later.
- // Other errors, like ECONNRESET, should not recur.
- // (If we find otherwise we can inject a short sleep
- // here of about 1 ms without too much penalty.)
- if (rv == NNG_ENOMEM) {
- nni_usleep(100000);
- }
+ } else {
+ if (epp != NULL) {
+ *epp = ep;
}
+ nni_mutex_enter(&sock->s_mx);
+ nni_list_append(&sock->s_eps, ep);
+ nni_mutex_exit(&sock->s_mx);
}
+ return (rv);
}
int
-nni_socket_accept(nni_socket *sock, nni_endpt *ep)
+nni_socket_listen(nni_socket *sock, const char *addr, nni_endpt **epp,
+ int flags)
{
- int rv = 0;
+ nni_endpt *ep;
+ int rv;
- nni_mutex_enter(&sock->s_mx);
- nni_mutex_enter(&ep->ep_mx);
- if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
- rv = NNG_EBUSY;
- goto out;
- }
- if (ep->ep_sock != sock) { // Should never happen
- rv = NNG_EINVAL;
- goto out;
- }
- if (sock->s_closing) {
- rv = NNG_ECLOSED;
- goto out;
+ if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) {
+ return (rv);
}
- if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) {
- rv = NNG_ENOMEM;
- goto out;
+ rv = nni_endpt_listen(ep, flags);
+ if (rv != 0) {
+ nni_endpt_close(ep);
+ nni_endpt_destroy(ep);
+ } else {
+ if (epp != NULL) {
+ *epp = ep;
+ }
+ nni_mutex_enter(&sock->s_mx);
+ nni_list_append(&sock->s_eps, ep);
+ nni_mutex_exit(&sock->s_mx);
}
- nni_list_append(&sock->s_eps, ep);
-out:
- nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
-
return (rv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 4e1ea166..37a0e5eb 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -34,6 +34,9 @@ struct nng_socket {
nni_list s_eps; // active endpoints
nni_list s_pipes; // pipes for this socket
+ nni_list s_reaps; // pipes to reap
+ nni_thread * s_reaper;
+
int s_closing; // Socket is closing
int s_besteffort; // Best effort mode delivery
int s_senderr; // Protocol state machine use
@@ -44,8 +47,7 @@ struct nng_socket {
extern int nni_socket_create(nni_socket **, uint16_t);
extern int nni_socket_close(nni_socket *);
-extern int nni_socket_add_pipe(nni_socket *, nni_pipe *, int);
-extern void nni_socket_rem_pipe(nni_socket *, nni_pipe *);
+extern int nni_socket_add_pipe(nni_socket *, nni_pipe *);
extern uint16_t nni_socket_proto(nni_socket *);
extern int nni_socket_setopt(nni_socket *, int, const void *, size_t);
extern int nni_socket_getopt(nni_socket *, int, void *, size_t *);