aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c198
-rw-r--r--src/core/endpt.h5
-rw-r--r--src/core/list.c17
-rw-r--r--src/core/list.h5
-rw-r--r--src/core/panic.c2
-rw-r--r--src/core/pipe.c42
-rw-r--r--src/core/pipe.h9
-rw-r--r--src/core/protocol.h4
-rw-r--r--src/core/socket.c239
-rw-r--r--src/core/socket.h6
10 files changed, 333 insertions, 194 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 1c5f4db5..24d92206 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -36,7 +36,9 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
ep->ep_listener = NULL;
ep->ep_close = 0;
ep->ep_start = 0;
+ ep->ep_bound = 0;
ep->ep_pipe = NULL;
+ NNI_LIST_NODE_INIT(&ep->ep_node);
if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) {
nni_free(ep, sizeof (*ep));
return (NNG_ENOMEM);
@@ -105,13 +107,21 @@ nni_endpt_close(nni_endpt *ep)
}
-int
-nni_endpt_bind(nni_endpt *ep)
+static int
+nni_endpt_connect(nni_endpt *ep, nni_pipe **pp)
{
- if (ep->ep_close) {
- return (NNG_ECLOSED);
+ nni_pipe *pipe;
+ int rv;
+
+ if ((rv = nni_pipe_create(&pipe, ep)) != 0) {
+ return (rv);
}
- return (ep->ep_ops.ep_bind(ep->ep_data));
+ if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_trandata)) != 0) {
+ nni_pipe_destroy(pipe);
+ return (rv);
+ }
+ *pp = pipe;
+ return (0);
}
@@ -124,34 +134,12 @@ nni_dial_once(nni_endpt *ep)
nni_pipe *pipe;
int rv;
- pipe = NULL;
-
- if (ep->ep_close) {
- return (NNG_ECLOSED);
- }
- if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
- return (rv);
- }
- if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_data)) != 0) {
- nni_pipe_destroy(pipe);
- return (rv);
+ if (((rv = nni_endpt_connect(ep, &pipe)) == 0) &&
+ ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ return (0);
}
- if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) {
- nni_pipe_destroy(pipe);
- return (rv);
- }
-
- nni_mutex_enter(&ep->ep_mx);
- if (!ep->ep_close) {
- // Set up the linkage so that when the pipe closes
- // we can notify the dialer to redial.
- pipe->p_ep = ep;
- ep->ep_pipe = pipe;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- return (0);
+ return (rv);
}
@@ -160,7 +148,6 @@ static void
nni_dialer(void *arg)
{
nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
nni_time cooldown;
@@ -180,6 +167,10 @@ nni_dialer(void *arg)
while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
nni_cond_wait(&ep->ep_cv);
}
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
nni_mutex_exit(&ep->ep_mx);
rv = nni_dial_once(ep);
@@ -190,9 +181,12 @@ nni_dialer(void *arg)
case NNG_ENOMEM:
cooldown = 1000000;
break;
+ case NNG_ECLOSED:
+ break;
default:
// XXX: THIS NEEDS TO BE A PROPER BACKOFF.
- cooldown = 100000;
+ printf("COOLING DOWN!!\n");
+ cooldown = 1000000;
break;
}
// we inject a delay so we don't just spin hard on
@@ -200,9 +194,12 @@ nni_dialer(void *arg)
// wait even longer, since the system needs time to
// release resources.
cooldown += nni_clock();
+ nni_mutex_enter(&ep->ep_mx);
while (!ep->ep_close) {
+ // We need a different condvar...
nni_cond_waituntil(&ep->ep_cv, cooldown);
}
+ nni_mutex_exit(&ep->ep_mx);
}
}
@@ -212,15 +209,13 @@ nni_endpt_dial(nni_endpt *ep, int flags)
{
int rv = 0;
nni_thread *reap = NULL;
- nni_socket *sock = ep->ep_sock;
- nni_mutex_enter(&sock->s_mx);
nni_mutex_enter(&ep->ep_mx);
if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
rv = NNG_EBUSY;
goto out;
}
- if (sock->s_closing || ep->ep_close) {
+ if (ep->ep_close) {
rv = NNG_ECLOSED;
goto out;
}
@@ -233,10 +228,9 @@ nni_endpt_dial(nni_endpt *ep, int flags)
}
if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
rv = nni_dial_once(ep);
- nni_mutex_enter(&sock->s_mx);
nni_mutex_enter(&ep->ep_mx);
+
if (rv == 0) {
ep->ep_start = 1;
} else {
@@ -250,7 +244,6 @@ nni_endpt_dial(nni_endpt *ep, int flags)
}
out:
nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
if (reap != NULL) {
nni_thread_reap(reap);
@@ -269,14 +262,135 @@ nni_endpt_accept(nni_endpt *ep, nni_pipe **pp)
if (ep->ep_close) {
return (NNG_ECLOSED);
}
- if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
+ if ((rv = nni_pipe_create(&pipe, ep)) != 0) {
return (rv);
}
- if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_data)) != 0) {
+ if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_trandata)) != 0) {
nni_pipe_destroy(pipe);
return (rv);
}
- pipe->p_ep = ep;
*pp = pipe;
return (0);
}
+
+
+static void
+nni_listener(void *arg)
+{
+ nni_endpt *ep = arg;
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ if (ep->ep_stop || ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ return;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+ for (;;) {
+ nni_time cooldown;
+ nni_mutex_enter(&ep->ep_mx);
+
+ // If we didn't bind synchronously, do it now.
+ while (!ep->ep_bound && !ep->ep_close) {
+ int rv;
+
+ nni_mutex_exit(&ep->ep_mx);
+ rv = ep->ep_ops.ep_bind(ep->ep_data);
+ nni_mutex_enter(&ep->ep_mx);
+
+ if (rv == 0) {
+ ep->ep_bound = 1;
+ } else {
+ // Invalid address? Out of memory? Who knows.
+ // Try again in a bit (10ms).
+ // XXX: PROPER BACKOFF NEEDED
+ cooldown = 10000;
+ cooldown += nni_clock();
+ while (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv,
+ cooldown);
+ }
+ }
+ }
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ pipe = NULL;
+
+ if (((rv = nni_endpt_accept(ep, &pipe)) == 0) &&
+ ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ continue;
+ }
+ if (rv == NNG_ECLOSED) {
+ break;
+ }
+ cooldown = 1000; // 1 ms cooldown
+ if (rv == NNG_ENOMEM) {
+ // For out of memory, we need to give more
+ // time for the system to reclaim resources.
+ cooldown = 100000; // 100ms
+ }
+ nni_mutex_enter(&ep->ep_mx);
+ if (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv, cooldown);
+ }
+ nni_mutex_exit(&ep->ep_mx);
+ }
+}
+
+
+int
+nni_endpt_listen(nni_endpt *ep, int flags)
+{
+ int rv = 0;
+ nni_thread *reap = NULL;
+
+ nni_mutex_enter(&ep->ep_mx);
+ if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
+ rv = NNG_EBUSY;
+ goto out;
+ }
+ if (ep->ep_close) {
+ rv = NNG_ECLOSED;
+ goto out;
+ }
+
+ ep->ep_stop = 0;
+ ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1;
+ if (nni_thread_create(&ep->ep_listener, nni_listener, ep) != 0) {
+ rv = NNG_ENOMEM;
+ goto out;
+ }
+ if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
+ nni_mutex_exit(&ep->ep_mx);
+ rv = ep->ep_ops.ep_bind(ep->ep_data);
+ nni_mutex_enter(&ep->ep_mx);
+ if (rv == 0) {
+ ep->ep_bound = 1;
+ ep->ep_start = 1;
+ } else {
+ // This will cause the thread to exit instead of
+ // starting.
+ ep->ep_stop = 1;
+ reap = ep->ep_listener;
+ ep->ep_listener = NULL;
+ }
+ nni_cond_signal(&ep->ep_cv);
+ }
+out:
+ nni_mutex_exit(&ep->ep_mx);
+
+ if (reap != NULL) {
+ nni_thread_reap(reap);
+ }
+
+ return (rv);
+}
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 71d3ff59..36f55de2 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -18,7 +18,7 @@
struct nng_endpt {
nni_endpt_ops ep_ops;
void * ep_data; // Transport private
- nni_list_node ep_sock_node; // Per socket list
+ nni_list_node ep_node; // Per socket list
nni_socket * ep_sock;
char ep_addr[NNG_MAXADDRLEN];
nni_thread * ep_dialer;
@@ -26,6 +26,7 @@ struct nng_endpt {
int ep_stop; // thread exits before start
int ep_start; // start thread running
int ep_close; // full shutdown
+ int ep_bound; // true if we bound locally
nni_mutex ep_mx;
nni_cond ep_cv;
nni_pipe * ep_pipe; // Connected pipe (dialers only)
@@ -33,9 +34,9 @@ struct nng_endpt {
extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *);
extern void nni_endpt_destroy(nni_endpt *);
-extern int nni_endpt_listen(nni_endpt *);
extern int nni_endpt_accept(nni_endpt *, nni_pipe **);
extern void nni_endpt_close(nni_endpt *);
extern int nni_endpt_dial(nni_endpt *, int);
+extern int nni_endpt_listen(nni_endpt *, int);
#endif // CORE_ENDPT_H
diff --git a/src/core/list.c b/src/core/list.c
index 69a74db6..8d6c3ace 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -58,6 +58,9 @@ nni_list_append(nni_list *list, void *item)
{
nni_list_node *node = NODE(list, item);
+ if ((node->ln_next != NULL) || (node->ln_prev != NULL)) {
+ nni_panic("appending node already on a list or not inited");
+ }
node->ln_prev = list->ll_head.ln_prev;
node->ln_next = &list->ll_head;
node->ln_next->ln_prev = node;
@@ -70,6 +73,9 @@ nni_list_prepend(nni_list *list, void *item)
{
nni_list_node *node = NODE(list, item);
+ if ((node->ln_next != NULL) || (node->ln_prev != NULL)) {
+ nni_panic("prepending node already on a list or not inited");
+ }
node->ln_next = list->ll_head.ln_next;
node->ln_prev = &list->ll_head;
node->ln_next->ln_prev = node;
@@ -108,13 +114,6 @@ nni_list_remove(nni_list *list, void *item)
node->ln_prev->ln_next = node->ln_next;
node->ln_next->ln_prev = node->ln_prev;
-}
-
-
-void
-nni_list_node_init(nni_list *list, void *item)
-{
- nni_list_node *node = NODE(list, item);
-
- node->ln_prev = node->ln_next = NULL;
+ node->ln_next = NULL;
+ node->ln_prev = NULL;
}
diff --git a/src/core/list.h b/src/core/list.h
index 822d9621..9d95a527 100644
--- a/src/core/list.h
+++ b/src/core/list.h
@@ -28,6 +28,10 @@ extern void nni_list_init_offset(nni_list *list, size_t offset);
#define NNI_LIST_INIT(list, type, field) \
nni_list_init_offset(list, offsetof(type, field))
+
+#define NNI_LIST_NODE_INIT(node) \
+ { (node)->ln_prev = (node)->ln_next = 0; }
+
extern void *nni_list_first(nni_list *);
extern void *nni_list_last(nni_list *);
extern void nni_list_append(nni_list *, void *);
@@ -35,7 +39,6 @@ extern void nni_list_prepend(nni_list *, void *);
extern void *nni_list_next(nni_list *, void *);
extern void *nni_list_prev(nni_list *, void *);
extern void nni_list_remove(nni_list *, void *);
-extern void nni_list_node_init(nni_list *, void *);
#define NNI_LIST_FOREACH(l, it) \
for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it))
diff --git a/src/core/panic.c b/src/core/panic.c
index d0d6078d..2205779f 100644
--- a/src/core/panic.c
+++ b/src/core/panic.c
@@ -19,7 +19,7 @@
#include "core/nng_impl.h"
// Panic handling.
-static void
+void
nni_show_backtrace(void)
{
#if NNG_HAVE_BACKTRACE
diff --git a/src/core/pipe.c b/src/core/pipe.c
index cc92e6fe..0a7bbed1 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -25,14 +25,14 @@ nni_pipe_id(nni_pipe *p)
int
nni_pipe_send(nni_pipe *p, nng_msg *msg)
{
- return (p->p_ops.p_send(p->p_data, msg));
+ return (p->p_ops.p_send(p->p_trandata, msg));
}
int
nni_pipe_recv(nni_pipe *p, nng_msg **msgp)
{
- return (p->p_ops.p_recv(p->p_data, msgp));
+ return (p->p_ops.p_recv(p->p_trandata, msgp));
}
@@ -42,38 +42,60 @@ nni_pipe_recv(nni_pipe *p, nng_msg **msgp)
void
nni_pipe_close(nni_pipe *p)
{
- p->p_ops.p_close(p->p_data);
+ nni_socket *sock = p->p_sock;
+
+ p->p_ops.p_close(p->p_trandata);
+
+ nni_mutex_enter(&sock->s_mx);
+ if (!p->p_reap) {
+ // schedule deferred reap/close
+ p->p_reap = 1;
+ if (p->p_active) {
+ nni_list_remove(&sock->s_pipes, p);
+ p->p_active = 0;
+ }
+ nni_list_append(&sock->s_reaps, p);
+ nni_cond_broadcast(&sock->s_cv);
+ }
+ nni_mutex_exit(&sock->s_mx);
}
uint16_t
nni_pipe_peer(nni_pipe *p)
{
- return (p->p_ops.p_peer(p->p_data));
+ return (p->p_ops.p_peer(p->p_trandata));
}
void
nni_pipe_destroy(nni_pipe *p)
{
- if (p->p_data != NULL) {
- p->p_ops.p_destroy(p->p_data);
+ if (p->p_trandata != NULL) {
+ p->p_ops.p_destroy(p->p_trandata);
}
nni_free(p, sizeof (*p));
}
int
-nni_pipe_create(nni_pipe **pp, const nni_pipe_ops *ops)
+nni_pipe_create(nni_pipe **pp, nni_endpt *ep)
{
nni_pipe *p;
if ((p = nni_alloc(sizeof (*p))) == NULL) {
return (NNG_ENOMEM);
}
- p->p_data = NULL;
- p->p_ops = *ops;
+ p->p_trandata = NULL;
+ p->p_protdata = NULL;
+ p->p_ops = *ep->ep_ops.ep_pipe_ops;
p->p_id = nni_plat_nextid();
+ p->p_ep = ep;
+ p->p_sock = ep->ep_sock;
+ if (ep->ep_dialer != NULL) {
+ ep->ep_pipe = p;
+ }
+ NNI_LIST_NODE_INIT(&p->p_node);
*pp = p;
return (0);
}
@@ -86,5 +108,5 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
if (p->p_ops.p_getopt == NULL) {
return (NNG_ENOTSUP);
}
- return (p->p_ops.p_getopt(p->p_data, opt, val, szp));
+ return (p->p_ops.p_getopt(p->p_trandata, opt, val, szp));
}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 5349bbb1..4594f65b 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -19,10 +19,13 @@
struct nng_pipe {
uint32_t p_id;
struct nni_pipe_ops p_ops;
- void * p_data;
- nni_list_node p_sock_node;
+ void * p_trandata;
+ void * p_protdata;
+ nni_list_node p_node;
nni_socket * p_sock;
nni_endpt * p_ep;
+ int p_reap;
+ int p_active;
};
// Pipe operations that protocols use.
@@ -33,7 +36,7 @@ extern void nni_pipe_close(nni_pipe *);
// Used only by the socket core - as we don't wish to expose the details
// of the pipe structure outside of pipe.c.
-extern int nni_pipe_create(nni_pipe **, const nni_pipe_ops *);
+extern int nni_pipe_create(nni_pipe **, nni_endpt *);
extern void nni_pipe_destroy(nni_pipe *);
diff --git a/src/core/protocol.h b/src/core/protocol.h
index f73825d7..63683a6b 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -32,8 +32,8 @@ struct nni_protocol {
// Add and remove pipes. These are called as connections are
// created or destroyed.
- int (*proto_add_pipe)(void *, nni_pipe *);
- int (*proto_rem_pipe)(void *, nni_pipe *);
+ int (*proto_add_pipe)(void *, nni_pipe *, void **);
+ void (*proto_rem_pipe)(void *, void *);
// Option manipulation. These may be NULL.
int (*proto_setopt)(void *, int, const void *, size_t);
diff --git a/src/core/socket.c b/src/core/socket.c
index 9263b142..ce5dbf3c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -29,6 +29,62 @@ nni_socket_recvq(nni_socket *s)
}
+// Because we have to call back into the socket, and possibly also the proto,
+// and wait for threads to terminate, we do this in a special thread. The
+// assumption is that closing is always a "fast" operation.
+static void
+nni_reaper(void *arg)
+{
+ nni_socket *sock = arg;
+
+ for (;;) {
+ nni_pipe *pipe;
+ nni_endpt *ep;
+
+ nni_mutex_enter(&sock->s_mx);
+ if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) {
+ nni_list_remove(&sock->s_reaps, pipe);
+ nni_mutex_exit(&sock->s_mx);
+
+ // This should already have been done.
+ pipe->p_ops.p_close(pipe->p_trandata);
+
+ // Remove the pipe from the protocol. Protocols may
+ // keep lists of pipes for managing their topologies.
+ // Note that if a protocol has rejected the pipe, it
+ // won't have any data.
+ if (pipe->p_protdata != NULL) {
+ sock->s_ops.proto_rem_pipe(sock->s_data,
+ pipe->p_protdata);
+ }
+
+ // If pipe was a connected (dialer) pipe,
+ // then let the endpoint know so it can try to
+ // reestablish the connection.
+ if ((ep = pipe->p_ep) != NULL) {
+ ep->ep_pipe = NULL;
+ pipe->p_ep = NULL;
+ nni_mutex_enter(&ep->ep_mx);
+ nni_cond_signal(&ep->ep_cv);
+ nni_mutex_exit(&ep->ep_mx);
+ }
+
+ // XXX: also publish event...
+ nni_pipe_destroy(pipe);
+ continue;
+ }
+
+ if (sock->s_reaper == NULL) {
+ nni_mutex_exit(&sock->s_mx);
+ break;
+ }
+
+ nni_cond_wait(&sock->s_cv);
+ nni_mutex_exit(&sock->s_mx);
+ }
+}
+
+
// nn_socket_create creates the underlying socket.
int
nni_socket_create(nni_socket **sockp, uint16_t proto)
@@ -47,6 +103,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
sock->s_linger = 0;
sock->s_sndtimeo = -1;
sock->s_rcvtimeo = -1;
+ sock->s_closing = 0;
sock->s_reconn = NNI_SECOND;
sock->s_reconnmax = NNI_SECOND;
@@ -60,8 +117,13 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
return (rv);
}
- NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_sock_node);
- NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_sock_node);
+ NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
+ NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
+ NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node);
+
+ if ((rv = nni_thread_create(&sock->s_reaper, nni_reaper, sock)) != 0) {
+ goto fail;
+ }
if (((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) ||
((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0)) {
@@ -81,6 +143,14 @@ fail:
if (sock->s_uwq != NULL) {
nni_msgqueue_destroy(sock->s_uwq);
}
+ if (sock->s_reaper != NULL) {
+ nni_thread *reap = sock->s_reaper;
+ nni_mutex_enter(&sock->s_mx);
+ sock->s_reaper = NULL;
+ nni_cond_broadcast(&sock->s_cv);
+ nni_mutex_exit(&sock->s_mx);
+ nni_thread_reap(reap);
+ }
nni_cond_fini(&sock->s_cv);
nni_mutex_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
@@ -95,6 +165,7 @@ nni_socket_close(nni_socket *sock)
nni_pipe *pipe;
nni_endpt *ep;
nni_time linger;
+ nni_thread *reaper;
nni_mutex_enter(&sock->s_mx);
// Mark us closing, so no more EPs or changes can occur.
@@ -143,18 +214,18 @@ nni_socket_close(nni_socket *sock)
// safely while we hold the lock.
nni_msgqueue_close(sock->s_urq);
- // Go through and close all the pipes.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_close(pipe);
+ // Go through and schedule close on all pipes.
+ while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
+ nni_list_remove(&sock->s_pipes, pipe);
+ pipe->p_active = 0;
+ pipe->p_reap = 1;
+ nni_list_append(&sock->s_reaps, pipe);
}
- // At this point, the protocols should have all their operations
- // failing, if they have any remaining, and they should be returning
- // any pipes back to us very quickly. We'll wait for them to finish,
- // as it MUST occur shortly.
- while (nni_list_first(&sock->s_pipes) != NULL) {
- nni_cond_wait(&sock->s_cv);
- }
+ // Tell the reaper it's done once it finishes. Also kick it off.
+ reaper = sock->s_reaper;
+ sock->s_reaper = NULL;
+ nni_cond_broadcast(&sock->s_cv);
// We already told the endpoints to shutdown. We just
// need to reap them now.
@@ -167,6 +238,9 @@ nni_socket_close(nni_socket *sock)
}
nni_mutex_exit(&sock->s_mx);
+ // Wait for the reaper to exit.
+ nni_thread_reap(reaper);
+
// At this point nothing else should be referencing us.
// The protocol needs to clean up its state.
sock->s_ops.proto_destroy(sock->s_data);
@@ -264,49 +338,8 @@ nni_socket_proto(nni_socket *sock)
}
-// nni_socket_rem_pipe removes the pipe from the socket. This is often
-// called by the protocol when a pipe is removed due to close.
-void
-nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
-{
- nni_endpt *ep;
-
- nni_mutex_enter(&sock->s_mx);
- if (pipe->p_sock != sock) {
- nni_mutex_exit(&sock->s_mx);
- }
-
- // Remove the pipe from the protocol. Protocols may
- // keep lists of pipes for managing their topologies.
- sock->s_ops.proto_rem_pipe(sock->s_data, pipe);
-
- // Now remove it from our own list.
- nni_list_remove(&sock->s_pipes, pipe);
- pipe->p_sock = NULL;
-
- // If we were a connected (dialer) pipe, then let the endpoint
- // know so it can try to reestablish the connection.
- if ((ep = pipe->p_ep) != NULL) {
- ep->ep_pipe = NULL;
- pipe->p_ep = NULL;
- nni_mutex_enter(&ep->ep_mx);
- nni_cond_signal(&ep->ep_cv);
- nni_mutex_exit(&ep->ep_mx);
- }
-
- // XXX: also publish event...
- nni_pipe_destroy(pipe);
-
- // If we're closing, wake the socket if we finished draining.
- if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) {
- nni_cond_broadcast(&sock->s_cv);
- }
- nni_mutex_exit(&sock->s_mx);
-}
-
-
int
-nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
+nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
{
int rv;
@@ -315,13 +348,17 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
nni_mutex_exit(&sock->s_mx);
return (NNG_ECLOSED);
}
- if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) {
+ rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, &pipe->p_protdata);
+ if (rv != 0) {
+ pipe->p_reap = 1;
+ nni_list_append(&sock->s_reaps, pipe);
+ nni_cond_broadcast(&sock->s_cv);
nni_mutex_exit(&sock->s_mx);
return (rv);
}
nni_list_append(&sock->s_pipes, pipe);
+ pipe->p_active = 1;
- pipe->p_sock = sock;
// XXX: Publish event
nni_mutex_exit(&sock->s_mx);
return (0);
@@ -341,82 +378,40 @@ nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags)
if (rv != 0) {
nni_endpt_close(ep);
nni_endpt_destroy(ep);
- } else if (epp != NULL) {
- *epp = ep;
- }
- return (rv);
-}
-
-
-static void
-nni_socket_accepter(void *arg)
-{
- nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
- nni_pipe *pipe;
- int rv;
-
- for (;;) {
- nni_mutex_enter(&ep->ep_mx);
- if (ep->ep_close) {
- nni_mutex_exit(&ep->ep_mx);
- break;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- pipe = NULL;
-
- if (((rv = nni_endpt_accept(ep, &pipe)) != 0) ||
- ((rv = nni_socket_add_pipe(sock, pipe, 0)) != 0)) {
- if (rv == NNG_ECLOSED) {
- break;
- }
- if (pipe != NULL) {
- nni_pipe_destroy(pipe);
- }
- // XXX: Publish accept error event...
-
- // If we can't allocate memory, don't spin, so that
- // things get a chance to release memory later.
- // Other errors, like ECONNRESET, should not recur.
- // (If we find otherwise we can inject a short sleep
- // here of about 1 ms without too much penalty.)
- if (rv == NNG_ENOMEM) {
- nni_usleep(100000);
- }
+ } else {
+ if (epp != NULL) {
+ *epp = ep;
}
+ nni_mutex_enter(&sock->s_mx);
+ nni_list_append(&sock->s_eps, ep);
+ nni_mutex_exit(&sock->s_mx);
}
+ return (rv);
}
int
-nni_socket_accept(nni_socket *sock, nni_endpt *ep)
+nni_socket_listen(nni_socket *sock, const char *addr, nni_endpt **epp,
+ int flags)
{
- int rv = 0;
+ nni_endpt *ep;
+ int rv;
- nni_mutex_enter(&sock->s_mx);
- nni_mutex_enter(&ep->ep_mx);
- if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
- rv = NNG_EBUSY;
- goto out;
- }
- if (ep->ep_sock != sock) { // Should never happen
- rv = NNG_EINVAL;
- goto out;
- }
- if (sock->s_closing) {
- rv = NNG_ECLOSED;
- goto out;
+ if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) {
+ return (rv);
}
- if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) {
- rv = NNG_ENOMEM;
- goto out;
+ rv = nni_endpt_listen(ep, flags);
+ if (rv != 0) {
+ nni_endpt_close(ep);
+ nni_endpt_destroy(ep);
+ } else {
+ if (epp != NULL) {
+ *epp = ep;
+ }
+ nni_mutex_enter(&sock->s_mx);
+ nni_list_append(&sock->s_eps, ep);
+ nni_mutex_exit(&sock->s_mx);
}
- nni_list_append(&sock->s_eps, ep);
-out:
- nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
-
return (rv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 4e1ea166..37a0e5eb 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -34,6 +34,9 @@ struct nng_socket {
nni_list s_eps; // active endpoints
nni_list s_pipes; // pipes for this socket
+ nni_list s_reaps; // pipes to reap
+ nni_thread * s_reaper;
+
int s_closing; // Socket is closing
int s_besteffort; // Best effort mode delivery
int s_senderr; // Protocol state machine use
@@ -44,8 +47,7 @@ struct nng_socket {
extern int nni_socket_create(nni_socket **, uint16_t);
extern int nni_socket_close(nni_socket *);
-extern int nni_socket_add_pipe(nni_socket *, nni_pipe *, int);
-extern void nni_socket_rem_pipe(nni_socket *, nni_pipe *);
+extern int nni_socket_add_pipe(nni_socket *, nni_pipe *);
extern uint16_t nni_socket_proto(nni_socket *);
extern int nni_socket_setopt(nni_socket *, int, const void *, size_t);
extern int nni_socket_getopt(nni_socket *, int, void *, size_t *);