summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/endpt.c198
-rw-r--r--src/core/endpt.h5
-rw-r--r--src/core/list.c17
-rw-r--r--src/core/list.h5
-rw-r--r--src/core/panic.c2
-rw-r--r--src/core/pipe.c42
-rw-r--r--src/core/pipe.h9
-rw-r--r--src/core/protocol.h4
-rw-r--r--src/core/socket.c239
-rw-r--r--src/core/socket.h6
-rw-r--r--src/nng.c8
-rw-r--r--src/platform/posix/posix_alloc.c2
-rw-r--r--src/platform/posix/posix_synch.c6
-rw-r--r--src/platform/posix/posix_thread.c2
-rw-r--r--src/protocol/pair/pair.c52
-rw-r--r--src/transport/inproc/inproc.c14
16 files changed, 384 insertions, 227 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 1c5f4db5..24d92206 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -36,7 +36,9 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr)
ep->ep_listener = NULL;
ep->ep_close = 0;
ep->ep_start = 0;
+ ep->ep_bound = 0;
ep->ep_pipe = NULL;
+ NNI_LIST_NODE_INIT(&ep->ep_node);
if ((rv = nni_mutex_init(&ep->ep_mx)) != 0) {
nni_free(ep, sizeof (*ep));
return (NNG_ENOMEM);
@@ -105,13 +107,21 @@ nni_endpt_close(nni_endpt *ep)
}
-int
-nni_endpt_bind(nni_endpt *ep)
+static int
+nni_endpt_connect(nni_endpt *ep, nni_pipe **pp)
{
- if (ep->ep_close) {
- return (NNG_ECLOSED);
+ nni_pipe *pipe;
+ int rv;
+
+ if ((rv = nni_pipe_create(&pipe, ep)) != 0) {
+ return (rv);
}
- return (ep->ep_ops.ep_bind(ep->ep_data));
+ if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_trandata)) != 0) {
+ nni_pipe_destroy(pipe);
+ return (rv);
+ }
+ *pp = pipe;
+ return (0);
}
@@ -124,34 +134,12 @@ nni_dial_once(nni_endpt *ep)
nni_pipe *pipe;
int rv;
- pipe = NULL;
-
- if (ep->ep_close) {
- return (NNG_ECLOSED);
- }
- if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
- return (rv);
- }
- if ((rv = ep->ep_ops.ep_connect(ep->ep_data, &pipe->p_data)) != 0) {
- nni_pipe_destroy(pipe);
- return (rv);
+ if (((rv = nni_endpt_connect(ep, &pipe)) == 0) &&
+ ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ return (0);
}
- if ((rv = nni_socket_add_pipe(sock, pipe, 1)) != 0) {
- nni_pipe_destroy(pipe);
- return (rv);
- }
-
- nni_mutex_enter(&ep->ep_mx);
- if (!ep->ep_close) {
- // Set up the linkage so that when the pipe closes
- // we can notify the dialer to redial.
- pipe->p_ep = ep;
- ep->ep_pipe = pipe;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- return (0);
+ return (rv);
}
@@ -160,7 +148,6 @@ static void
nni_dialer(void *arg)
{
nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
nni_pipe *pipe;
int rv;
nni_time cooldown;
@@ -180,6 +167,10 @@ nni_dialer(void *arg)
while ((!ep->ep_close) && (ep->ep_pipe != NULL)) {
nni_cond_wait(&ep->ep_cv);
}
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
nni_mutex_exit(&ep->ep_mx);
rv = nni_dial_once(ep);
@@ -190,9 +181,12 @@ nni_dialer(void *arg)
case NNG_ENOMEM:
cooldown = 1000000;
break;
+ case NNG_ECLOSED:
+ break;
default:
// XXX: THIS NEEDS TO BE A PROPER BACKOFF.
- cooldown = 100000;
+ printf("COOLING DOWN!!\n");
+ cooldown = 1000000;
break;
}
// we inject a delay so we don't just spin hard on
@@ -200,9 +194,12 @@ nni_dialer(void *arg)
// wait even longer, since the system needs time to
// release resources.
cooldown += nni_clock();
+ nni_mutex_enter(&ep->ep_mx);
while (!ep->ep_close) {
+ // We need a different condvar...
nni_cond_waituntil(&ep->ep_cv, cooldown);
}
+ nni_mutex_exit(&ep->ep_mx);
}
}
@@ -212,15 +209,13 @@ nni_endpt_dial(nni_endpt *ep, int flags)
{
int rv = 0;
nni_thread *reap = NULL;
- nni_socket *sock = ep->ep_sock;
- nni_mutex_enter(&sock->s_mx);
nni_mutex_enter(&ep->ep_mx);
if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
rv = NNG_EBUSY;
goto out;
}
- if (sock->s_closing || ep->ep_close) {
+ if (ep->ep_close) {
rv = NNG_ECLOSED;
goto out;
}
@@ -233,10 +228,9 @@ nni_endpt_dial(nni_endpt *ep, int flags)
}
if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
rv = nni_dial_once(ep);
- nni_mutex_enter(&sock->s_mx);
nni_mutex_enter(&ep->ep_mx);
+
if (rv == 0) {
ep->ep_start = 1;
} else {
@@ -250,7 +244,6 @@ nni_endpt_dial(nni_endpt *ep, int flags)
}
out:
nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
if (reap != NULL) {
nni_thread_reap(reap);
@@ -269,14 +262,135 @@ nni_endpt_accept(nni_endpt *ep, nni_pipe **pp)
if (ep->ep_close) {
return (NNG_ECLOSED);
}
- if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
+ if ((rv = nni_pipe_create(&pipe, ep)) != 0) {
return (rv);
}
- if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_data)) != 0) {
+ if ((rv = ep->ep_ops.ep_accept(ep->ep_data, &pipe->p_trandata)) != 0) {
nni_pipe_destroy(pipe);
return (rv);
}
- pipe->p_ep = ep;
*pp = pipe;
return (0);
}
+
+
+static void
+nni_listener(void *arg)
+{
+ nni_endpt *ep = arg;
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_start) && (!ep->ep_close) && (!ep->ep_stop)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ if (ep->ep_stop || ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ return;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+ for (;;) {
+ nni_time cooldown;
+ nni_mutex_enter(&ep->ep_mx);
+
+ // If we didn't bind synchronously, do it now.
+ while (!ep->ep_bound && !ep->ep_close) {
+ int rv;
+
+ nni_mutex_exit(&ep->ep_mx);
+ rv = ep->ep_ops.ep_bind(ep->ep_data);
+ nni_mutex_enter(&ep->ep_mx);
+
+ if (rv == 0) {
+ ep->ep_bound = 1;
+ } else {
+ // Invalid address? Out of memory? Who knows.
+ // Try again in a bit (10ms).
+ // XXX: PROPER BACKOFF NEEDED
+ cooldown = 10000;
+ cooldown += nni_clock();
+ while (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv,
+ cooldown);
+ }
+ }
+ }
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ pipe = NULL;
+
+ if (((rv = nni_endpt_accept(ep, &pipe)) == 0) &&
+ ((rv = nni_socket_add_pipe(sock, pipe)) == 0)) {
+ continue;
+ }
+ if (rv == NNG_ECLOSED) {
+ break;
+ }
+ cooldown = 1000; // 1 ms cooldown
+ if (rv == NNG_ENOMEM) {
+ // For out of memory, we need to give more
+ // time for the system to reclaim resources.
+ cooldown = 100000; // 100ms
+ }
+ nni_mutex_enter(&ep->ep_mx);
+ if (!ep->ep_close) {
+ nni_cond_waituntil(&ep->ep_cv, cooldown);
+ }
+ nni_mutex_exit(&ep->ep_mx);
+ }
+}
+
+
+int
+nni_endpt_listen(nni_endpt *ep, int flags)
+{
+ int rv = 0;
+ nni_thread *reap = NULL;
+
+ nni_mutex_enter(&ep->ep_mx);
+ if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
+ rv = NNG_EBUSY;
+ goto out;
+ }
+ if (ep->ep_close) {
+ rv = NNG_ECLOSED;
+ goto out;
+ }
+
+ ep->ep_stop = 0;
+ ep->ep_start = (flags & NNG_FLAG_SYNCH) ? 0 : 1;
+ if (nni_thread_create(&ep->ep_listener, nni_listener, ep) != 0) {
+ rv = NNG_ENOMEM;
+ goto out;
+ }
+ if ((rv == 0) && (flags & NNG_FLAG_SYNCH)) {
+ nni_mutex_exit(&ep->ep_mx);
+ rv = ep->ep_ops.ep_bind(ep->ep_data);
+ nni_mutex_enter(&ep->ep_mx);
+ if (rv == 0) {
+ ep->ep_bound = 1;
+ ep->ep_start = 1;
+ } else {
+ // This will cause the thread to exit instead of
+ // starting.
+ ep->ep_stop = 1;
+ reap = ep->ep_listener;
+ ep->ep_listener = NULL;
+ }
+ nni_cond_signal(&ep->ep_cv);
+ }
+out:
+ nni_mutex_exit(&ep->ep_mx);
+
+ if (reap != NULL) {
+ nni_thread_reap(reap);
+ }
+
+ return (rv);
+}
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 71d3ff59..36f55de2 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -18,7 +18,7 @@
struct nng_endpt {
nni_endpt_ops ep_ops;
void * ep_data; // Transport private
- nni_list_node ep_sock_node; // Per socket list
+ nni_list_node ep_node; // Per socket list
nni_socket * ep_sock;
char ep_addr[NNG_MAXADDRLEN];
nni_thread * ep_dialer;
@@ -26,6 +26,7 @@ struct nng_endpt {
int ep_stop; // thread exits before start
int ep_start; // start thread running
int ep_close; // full shutdown
+ int ep_bound; // true if we bound locally
nni_mutex ep_mx;
nni_cond ep_cv;
nni_pipe * ep_pipe; // Connected pipe (dialers only)
@@ -33,9 +34,9 @@ struct nng_endpt {
extern int nni_endpt_create(nni_endpt **, nni_socket *, const char *);
extern void nni_endpt_destroy(nni_endpt *);
-extern int nni_endpt_listen(nni_endpt *);
extern int nni_endpt_accept(nni_endpt *, nni_pipe **);
extern void nni_endpt_close(nni_endpt *);
extern int nni_endpt_dial(nni_endpt *, int);
+extern int nni_endpt_listen(nni_endpt *, int);
#endif // CORE_ENDPT_H
diff --git a/src/core/list.c b/src/core/list.c
index 69a74db6..8d6c3ace 100644
--- a/src/core/list.c
+++ b/src/core/list.c
@@ -58,6 +58,9 @@ nni_list_append(nni_list *list, void *item)
{
nni_list_node *node = NODE(list, item);
+ if ((node->ln_next != NULL) || (node->ln_prev != NULL)) {
+ nni_panic("appending node already on a list or not inited");
+ }
node->ln_prev = list->ll_head.ln_prev;
node->ln_next = &list->ll_head;
node->ln_next->ln_prev = node;
@@ -70,6 +73,9 @@ nni_list_prepend(nni_list *list, void *item)
{
nni_list_node *node = NODE(list, item);
+ if ((node->ln_next != NULL) || (node->ln_prev != NULL)) {
+ nni_panic("prepending node already on a list or not inited");
+ }
node->ln_next = list->ll_head.ln_next;
node->ln_prev = &list->ll_head;
node->ln_next->ln_prev = node;
@@ -108,13 +114,6 @@ nni_list_remove(nni_list *list, void *item)
node->ln_prev->ln_next = node->ln_next;
node->ln_next->ln_prev = node->ln_prev;
-}
-
-
-void
-nni_list_node_init(nni_list *list, void *item)
-{
- nni_list_node *node = NODE(list, item);
-
- node->ln_prev = node->ln_next = NULL;
+ node->ln_next = NULL;
+ node->ln_prev = NULL;
}
diff --git a/src/core/list.h b/src/core/list.h
index 822d9621..9d95a527 100644
--- a/src/core/list.h
+++ b/src/core/list.h
@@ -28,6 +28,10 @@ extern void nni_list_init_offset(nni_list *list, size_t offset);
#define NNI_LIST_INIT(list, type, field) \
nni_list_init_offset(list, offsetof(type, field))
+
+#define NNI_LIST_NODE_INIT(node) \
+ { (node)->ln_prev = (node)->ln_next = 0; }
+
extern void *nni_list_first(nni_list *);
extern void *nni_list_last(nni_list *);
extern void nni_list_append(nni_list *, void *);
@@ -35,7 +39,6 @@ extern void nni_list_prepend(nni_list *, void *);
extern void *nni_list_next(nni_list *, void *);
extern void *nni_list_prev(nni_list *, void *);
extern void nni_list_remove(nni_list *, void *);
-extern void nni_list_node_init(nni_list *, void *);
#define NNI_LIST_FOREACH(l, it) \
for (it = nni_list_first(l); it != NULL; it = nni_list_next(l, it))
diff --git a/src/core/panic.c b/src/core/panic.c
index d0d6078d..2205779f 100644
--- a/src/core/panic.c
+++ b/src/core/panic.c
@@ -19,7 +19,7 @@
#include "core/nng_impl.h"
// Panic handling.
-static void
+void
nni_show_backtrace(void)
{
#if NNG_HAVE_BACKTRACE
diff --git a/src/core/pipe.c b/src/core/pipe.c
index cc92e6fe..0a7bbed1 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -25,14 +25,14 @@ nni_pipe_id(nni_pipe *p)
int
nni_pipe_send(nni_pipe *p, nng_msg *msg)
{
- return (p->p_ops.p_send(p->p_data, msg));
+ return (p->p_ops.p_send(p->p_trandata, msg));
}
int
nni_pipe_recv(nni_pipe *p, nng_msg **msgp)
{
- return (p->p_ops.p_recv(p->p_data, msgp));
+ return (p->p_ops.p_recv(p->p_trandata, msgp));
}
@@ -42,38 +42,60 @@ nni_pipe_recv(nni_pipe *p, nng_msg **msgp)
void
nni_pipe_close(nni_pipe *p)
{
- p->p_ops.p_close(p->p_data);
+ nni_socket *sock = p->p_sock;
+
+ p->p_ops.p_close(p->p_trandata);
+
+ nni_mutex_enter(&sock->s_mx);
+ if (!p->p_reap) {
+ // schedule deferred reap/close
+ p->p_reap = 1;
+ if (p->p_active) {
+ nni_list_remove(&sock->s_pipes, p);
+ p->p_active = 0;
+ }
+ nni_list_append(&sock->s_reaps, p);
+ nni_cond_broadcast(&sock->s_cv);
+ }
+ nni_mutex_exit(&sock->s_mx);
}
uint16_t
nni_pipe_peer(nni_pipe *p)
{
- return (p->p_ops.p_peer(p->p_data));
+ return (p->p_ops.p_peer(p->p_trandata));
}
void
nni_pipe_destroy(nni_pipe *p)
{
- if (p->p_data != NULL) {
- p->p_ops.p_destroy(p->p_data);
+ if (p->p_trandata != NULL) {
+ p->p_ops.p_destroy(p->p_trandata);
}
nni_free(p, sizeof (*p));
}
int
-nni_pipe_create(nni_pipe **pp, const nni_pipe_ops *ops)
+nni_pipe_create(nni_pipe **pp, nni_endpt *ep)
{
nni_pipe *p;
if ((p = nni_alloc(sizeof (*p))) == NULL) {
return (NNG_ENOMEM);
}
- p->p_data = NULL;
- p->p_ops = *ops;
+ p->p_trandata = NULL;
+ p->p_protdata = NULL;
+ p->p_ops = *ep->ep_ops.ep_pipe_ops;
p->p_id = nni_plat_nextid();
+ p->p_ep = ep;
+ p->p_sock = ep->ep_sock;
+ if (ep->ep_dialer != NULL) {
+ ep->ep_pipe = p;
+ }
+ NNI_LIST_NODE_INIT(&p->p_node);
*pp = p;
return (0);
}
@@ -86,5 +108,5 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
if (p->p_ops.p_getopt == NULL) {
return (NNG_ENOTSUP);
}
- return (p->p_ops.p_getopt(p->p_data, opt, val, szp));
+ return (p->p_ops.p_getopt(p->p_trandata, opt, val, szp));
}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 5349bbb1..4594f65b 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -19,10 +19,13 @@
struct nng_pipe {
uint32_t p_id;
struct nni_pipe_ops p_ops;
- void * p_data;
- nni_list_node p_sock_node;
+ void * p_trandata;
+ void * p_protdata;
+ nni_list_node p_node;
nni_socket * p_sock;
nni_endpt * p_ep;
+ int p_reap;
+ int p_active;
};
// Pipe operations that protocols use.
@@ -33,7 +36,7 @@ extern void nni_pipe_close(nni_pipe *);
// Used only by the socket core - as we don't wish to expose the details
// of the pipe structure outside of pipe.c.
-extern int nni_pipe_create(nni_pipe **, const nni_pipe_ops *);
+extern int nni_pipe_create(nni_pipe **, nni_endpt *);
extern void nni_pipe_destroy(nni_pipe *);
diff --git a/src/core/protocol.h b/src/core/protocol.h
index f73825d7..63683a6b 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -32,8 +32,8 @@ struct nni_protocol {
// Add and remove pipes. These are called as connections are
// created or destroyed.
- int (*proto_add_pipe)(void *, nni_pipe *);
- int (*proto_rem_pipe)(void *, nni_pipe *);
+ int (*proto_add_pipe)(void *, nni_pipe *, void **);
+ void (*proto_rem_pipe)(void *, void *);
// Option manipulation. These may be NULL.
int (*proto_setopt)(void *, int, const void *, size_t);
diff --git a/src/core/socket.c b/src/core/socket.c
index 9263b142..ce5dbf3c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -29,6 +29,62 @@ nni_socket_recvq(nni_socket *s)
}
+// Because we have to call back into the socket, and possibly also the proto,
+// and wait for threads to terminate, we do this in a special thread. The
+// assumption is that closing is always a "fast" operation.
+static void
+nni_reaper(void *arg)
+{
+ nni_socket *sock = arg;
+
+ for (;;) {
+ nni_pipe *pipe;
+ nni_endpt *ep;
+
+ nni_mutex_enter(&sock->s_mx);
+ if ((pipe = nni_list_first(&sock->s_reaps)) != NULL) {
+ nni_list_remove(&sock->s_reaps, pipe);
+ nni_mutex_exit(&sock->s_mx);
+
+ // This should already have been done.
+ pipe->p_ops.p_close(pipe->p_trandata);
+
+ // Remove the pipe from the protocol. Protocols may
+ // keep lists of pipes for managing their topologies.
+ // Note that if a protocol has rejected the pipe, it
+ // won't have any data.
+ if (pipe->p_protdata != NULL) {
+ sock->s_ops.proto_rem_pipe(sock->s_data,
+ pipe->p_protdata);
+ }
+
+ // If pipe was a connected (dialer) pipe,
+ // then let the endpoint know so it can try to
+ // reestablish the connection.
+ if ((ep = pipe->p_ep) != NULL) {
+ ep->ep_pipe = NULL;
+ pipe->p_ep = NULL;
+ nni_mutex_enter(&ep->ep_mx);
+ nni_cond_signal(&ep->ep_cv);
+ nni_mutex_exit(&ep->ep_mx);
+ }
+
+ // XXX: also publish event...
+ nni_pipe_destroy(pipe);
+ continue;
+ }
+
+ if (sock->s_reaper == NULL) {
+ nni_mutex_exit(&sock->s_mx);
+ break;
+ }
+
+ nni_cond_wait(&sock->s_cv);
+ nni_mutex_exit(&sock->s_mx);
+ }
+}
+
+
// nn_socket_create creates the underlying socket.
int
nni_socket_create(nni_socket **sockp, uint16_t proto)
@@ -47,6 +103,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
sock->s_linger = 0;
sock->s_sndtimeo = -1;
sock->s_rcvtimeo = -1;
+ sock->s_closing = 0;
sock->s_reconn = NNI_SECOND;
sock->s_reconnmax = NNI_SECOND;
@@ -60,8 +117,13 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
return (rv);
}
- NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_sock_node);
- NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_sock_node);
+ NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
+ NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
+ NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_node);
+
+ if ((rv = nni_thread_create(&sock->s_reaper, nni_reaper, sock)) != 0) {
+ goto fail;
+ }
if (((rv = nni_msgqueue_create(&sock->s_uwq, 0)) != 0) ||
((rv = nni_msgqueue_create(&sock->s_urq, 0)) != 0)) {
@@ -81,6 +143,14 @@ fail:
if (sock->s_uwq != NULL) {
nni_msgqueue_destroy(sock->s_uwq);
}
+ if (sock->s_reaper != NULL) {
+ nni_thread *reap = sock->s_reaper;
+ nni_mutex_enter(&sock->s_mx);
+ sock->s_reaper = NULL;
+ nni_cond_broadcast(&sock->s_cv);
+ nni_mutex_exit(&sock->s_mx);
+ nni_thread_reap(reap);
+ }
nni_cond_fini(&sock->s_cv);
nni_mutex_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
@@ -95,6 +165,7 @@ nni_socket_close(nni_socket *sock)
nni_pipe *pipe;
nni_endpt *ep;
nni_time linger;
+ nni_thread *reaper;
nni_mutex_enter(&sock->s_mx);
// Mark us closing, so no more EPs or changes can occur.
@@ -143,18 +214,18 @@ nni_socket_close(nni_socket *sock)
// safely while we hold the lock.
nni_msgqueue_close(sock->s_urq);
- // Go through and close all the pipes.
- NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
- nni_pipe_close(pipe);
+ // Go through and schedule close on all pipes.
+ while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
+ nni_list_remove(&sock->s_pipes, pipe);
+ pipe->p_active = 0;
+ pipe->p_reap = 1;
+ nni_list_append(&sock->s_reaps, pipe);
}
- // At this point, the protocols should have all their operations
- // failing, if they have any remaining, and they should be returning
- // any pipes back to us very quickly. We'll wait for them to finish,
- // as it MUST occur shortly.
- while (nni_list_first(&sock->s_pipes) != NULL) {
- nni_cond_wait(&sock->s_cv);
- }
+ // Tell the reaper it's done once it finishes. Also kick it off.
+ reaper = sock->s_reaper;
+ sock->s_reaper = NULL;
+ nni_cond_broadcast(&sock->s_cv);
// We already told the endpoints to shutdown. We just
// need to reap them now.
@@ -167,6 +238,9 @@ nni_socket_close(nni_socket *sock)
}
nni_mutex_exit(&sock->s_mx);
+ // Wait for the reaper to exit.
+ nni_thread_reap(reaper);
+
// At this point nothing else should be referencing us.
// The protocol needs to clean up its state.
sock->s_ops.proto_destroy(sock->s_data);
@@ -264,49 +338,8 @@ nni_socket_proto(nni_socket *sock)
}
-// nni_socket_rem_pipe removes the pipe from the socket. This is often
-// called by the protocol when a pipe is removed due to close.
-void
-nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
-{
- nni_endpt *ep;
-
- nni_mutex_enter(&sock->s_mx);
- if (pipe->p_sock != sock) {
- nni_mutex_exit(&sock->s_mx);
- }
-
- // Remove the pipe from the protocol. Protocols may
- // keep lists of pipes for managing their topologies.
- sock->s_ops.proto_rem_pipe(sock->s_data, pipe);
-
- // Now remove it from our own list.
- nni_list_remove(&sock->s_pipes, pipe);
- pipe->p_sock = NULL;
-
- // If we were a connected (dialer) pipe, then let the endpoint
- // know so it can try to reestablish the connection.
- if ((ep = pipe->p_ep) != NULL) {
- ep->ep_pipe = NULL;
- pipe->p_ep = NULL;
- nni_mutex_enter(&ep->ep_mx);
- nni_cond_signal(&ep->ep_cv);
- nni_mutex_exit(&ep->ep_mx);
- }
-
- // XXX: also publish event...
- nni_pipe_destroy(pipe);
-
- // If we're closing, wake the socket if we finished draining.
- if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) {
- nni_cond_broadcast(&sock->s_cv);
- }
- nni_mutex_exit(&sock->s_mx);
-}
-
-
int
-nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
+nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
{
int rv;
@@ -315,13 +348,17 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe, int dialer)
nni_mutex_exit(&sock->s_mx);
return (NNG_ECLOSED);
}
- if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) {
+ rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, &pipe->p_protdata);
+ if (rv != 0) {
+ pipe->p_reap = 1;
+ nni_list_append(&sock->s_reaps, pipe);
+ nni_cond_broadcast(&sock->s_cv);
nni_mutex_exit(&sock->s_mx);
return (rv);
}
nni_list_append(&sock->s_pipes, pipe);
+ pipe->p_active = 1;
- pipe->p_sock = sock;
// XXX: Publish event
nni_mutex_exit(&sock->s_mx);
return (0);
@@ -341,82 +378,40 @@ nni_socket_dial(nni_socket *sock, const char *addr, nni_endpt **epp, int flags)
if (rv != 0) {
nni_endpt_close(ep);
nni_endpt_destroy(ep);
- } else if (epp != NULL) {
- *epp = ep;
- }
- return (rv);
-}
-
-
-static void
-nni_socket_accepter(void *arg)
-{
- nni_endpt *ep = arg;
- nni_socket *sock = ep->ep_sock;
- nni_pipe *pipe;
- int rv;
-
- for (;;) {
- nni_mutex_enter(&ep->ep_mx);
- if (ep->ep_close) {
- nni_mutex_exit(&ep->ep_mx);
- break;
- }
- nni_mutex_exit(&ep->ep_mx);
-
- pipe = NULL;
-
- if (((rv = nni_endpt_accept(ep, &pipe)) != 0) ||
- ((rv = nni_socket_add_pipe(sock, pipe, 0)) != 0)) {
- if (rv == NNG_ECLOSED) {
- break;
- }
- if (pipe != NULL) {
- nni_pipe_destroy(pipe);
- }
- // XXX: Publish accept error event...
-
- // If we can't allocate memory, don't spin, so that
- // things get a chance to release memory later.
- // Other errors, like ECONNRESET, should not recur.
- // (If we find otherwise we can inject a short sleep
- // here of about 1 ms without too much penalty.)
- if (rv == NNG_ENOMEM) {
- nni_usleep(100000);
- }
+ } else {
+ if (epp != NULL) {
+ *epp = ep;
}
+ nni_mutex_enter(&sock->s_mx);
+ nni_list_append(&sock->s_eps, ep);
+ nni_mutex_exit(&sock->s_mx);
}
+ return (rv);
}
int
-nni_socket_accept(nni_socket *sock, nni_endpt *ep)
+nni_socket_listen(nni_socket *sock, const char *addr, nni_endpt **epp,
+ int flags)
{
- int rv = 0;
+ nni_endpt *ep;
+ int rv;
- nni_mutex_enter(&sock->s_mx);
- nni_mutex_enter(&ep->ep_mx);
- if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) {
- rv = NNG_EBUSY;
- goto out;
- }
- if (ep->ep_sock != sock) { // Should never happen
- rv = NNG_EINVAL;
- goto out;
- }
- if (sock->s_closing) {
- rv = NNG_ECLOSED;
- goto out;
+ if ((rv = nni_endpt_create(&ep, sock, addr)) != 0) {
+ return (rv);
}
- if (nni_thread_create(&ep->ep_dialer, nni_socket_accepter, ep) != 0) {
- rv = NNG_ENOMEM;
- goto out;
+ rv = nni_endpt_listen(ep, flags);
+ if (rv != 0) {
+ nni_endpt_close(ep);
+ nni_endpt_destroy(ep);
+ } else {
+ if (epp != NULL) {
+ *epp = ep;
+ }
+ nni_mutex_enter(&sock->s_mx);
+ nni_list_append(&sock->s_eps, ep);
+ nni_mutex_exit(&sock->s_mx);
}
- nni_list_append(&sock->s_eps, ep);
-out:
- nni_mutex_exit(&ep->ep_mx);
- nni_mutex_exit(&sock->s_mx);
-
return (rv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 4e1ea166..37a0e5eb 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -34,6 +34,9 @@ struct nng_socket {
nni_list s_eps; // active endpoints
nni_list s_pipes; // pipes for this socket
+ nni_list s_reaps; // pipes to reap
+ nni_thread * s_reaper;
+
int s_closing; // Socket is closing
int s_besteffort; // Best effort mode delivery
int s_senderr; // Protocol state machine use
@@ -44,8 +47,7 @@ struct nng_socket {
extern int nni_socket_create(nni_socket **, uint16_t);
extern int nni_socket_close(nni_socket *);
-extern int nni_socket_add_pipe(nni_socket *, nni_pipe *, int);
-extern void nni_socket_rem_pipe(nni_socket *, nni_pipe *);
+extern int nni_socket_add_pipe(nni_socket *, nni_pipe *);
extern uint16_t nni_socket_proto(nni_socket *);
extern int nni_socket_setopt(nni_socket *, int, const void *, size_t);
extern int nni_socket_getopt(nni_socket *, int, void *, size_t *);
diff --git a/src/nng.c b/src/nng.c
index 8cf0d04f..fb2d0861 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -104,6 +104,14 @@ nng_dial(nng_socket *s, const char *addr, nng_endpt **epp, int flags)
int
+nng_listen(nng_socket *s, const char *addr, nng_endpt **epp, int flags)
+{
+ NNI_INIT_INT();
+ return (nni_socket_listen(s, addr, epp, flags));
+}
+
+
+int
nng_setopt(nng_socket *s, int opt, const void *val, size_t sz)
{
NNI_INIT_INT();
diff --git a/src/platform/posix/posix_alloc.c b/src/platform/posix/posix_alloc.c
index 83fe305d..98a76669 100644
--- a/src/platform/posix/posix_alloc.c
+++ b/src/platform/posix/posix_alloc.c
@@ -17,7 +17,7 @@
void *
nni_alloc(size_t size)
{
- return (malloc(size));
+ return (calloc(1, size));
}
diff --git a/src/platform/posix/posix_synch.c b/src/platform/posix/posix_synch.c
index 2fb92915..0526eb7c 100644
--- a/src/platform/posix/posix_synch.c
+++ b/src/platform/posix/posix_synch.c
@@ -45,8 +45,10 @@ nni_mutex_fini(nni_mutex *mp)
void
nni_mutex_enter(nni_mutex *m)
{
- if (pthread_mutex_lock(&m->mx) != 0) {
- nni_panic("pthread_mutex_lock failed");
+ int rv;
+
+ if ((rv = pthread_mutex_lock(&m->mx)) != 0) {
+ nni_panic("pthread_mutex_lock failed: %s", strerror(rv));
}
}
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 3fd9bfe7..f5cad7a9 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -80,7 +80,7 @@ nni_thread_reap(nni_thread *thr)
int rv;
if ((rv = pthread_join(thr->tid, NULL)) != 0) {
- nni_panic("pthread_thread: %s", strerror(errno));
+ nni_panic("pthread_thread: %s", strerror(rv));
}
nni_free(thr, sizeof (*thr));
}
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index 692f4b0e..98995186 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -16,27 +16,30 @@
// While a peer is connected to the server, all other peer connection
// attempts are discarded.
+typedef struct nni_pair_pipe nni_pair_pipe;
+typedef struct nni_pair_sock nni_pair_sock;
+
// An nni_pair_sock is our per-socket protocol private structure.
-typedef struct nni_pair_sock {
+struct nni_pair_sock {
nni_socket * sock;
- nni_pipe * pipe;
+ nni_pair_pipe * pipe;
nni_mutex mx;
nni_msgqueue * uwq;
nni_msgqueue * urq;
-} nni_pair_sock;
+};
// An nni_pair_pipe is our per-pipe protocol private structure. We keep
// one of these even though in theory we'd only have a single underlying
// pipe. The separate data structure is more like other protocols that do
// manage multiple pipes.
-typedef struct nni_pair_pipe {
+struct nni_pair_pipe {
nni_pipe * pipe;
nni_pair_sock * pair;
int good;
nni_thread * sthr;
nni_thread * rthr;
int sigclose;
-} nni_pair_pipe;
+};
static void nni_pair_receiver(void *);
static void nni_pair_sender(void *);
@@ -55,6 +58,7 @@ nni_pair_create(void **pairp, nni_socket *sock)
return (rv);
}
pair->sock = sock;
+ pair->pipe = NULL;
pair->uwq = nni_socket_sendq(sock);
pair->urq = nni_socket_recvq(sock);
*pairp = pair;
@@ -77,7 +81,7 @@ nni_pair_destroy(void *arg)
static int
-nni_pair_add_pipe(void *arg, nni_pipe *pipe)
+nni_pair_add_pipe(void *arg, nni_pipe *pipe, void **datap)
{
nni_pair_sock *pair = arg;
nni_pair_pipe *pp;
@@ -89,6 +93,7 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe)
pp->sigclose = 0;
pp->sthr = NULL;
pp->rthr = NULL;
+ pp->pair = pair;
nni_mutex_enter(&pair->mx);
if (pair->pipe != NULL) {
@@ -106,31 +111,33 @@ nni_pair_add_pipe(void *arg, nni_pipe *pipe)
return (rv);
}
pp->good = 1;
- pair->pipe = pipe;
+ pair->pipe = pp;
+ *datap = pp;
nni_mutex_exit(&pair->mx);
- return (NNG_EINVAL);
+ return (0);
}
-static int
-nni_pair_rem_pipe(void *arg, nni_pipe *pipe)
+static void
+nni_pair_rem_pipe(void *arg, void *data)
{
- nni_pair_pipe *pp = arg;
- nni_pair_sock *pair = pp->pair;
+ nni_pair_sock *pair = arg;
+ nni_pair_pipe *pp = data;
- if (pp->sthr) {
- (void) nni_thread_reap(pp->sthr);
- }
- if (pp->rthr) {
- (void) nni_thread_reap(pp->rthr);
- }
nni_mutex_enter(&pair->mx);
- if (pair->pipe != pipe) {
+ if (pair->pipe != pp) {
nni_mutex_exit(&pair->mx);
- return (NNG_EINVAL);
+ return;
}
+ pair->pipe = NULL;
nni_mutex_exit(&pair->mx);
- return (NNG_EINVAL);
+
+ if (pp->sthr != NULL) {
+ (void) nni_thread_reap(pp->sthr);
+ }
+ if (pp->rthr != NULL) {
+ (void) nni_thread_reap(pp->rthr);
+ }
}
@@ -166,7 +173,6 @@ nni_pair_sender(void *arg)
}
nni_msgqueue_signal(urq, &pp->sigclose);
nni_pipe_close(pipe);
- nni_socket_rem_pipe(pair->sock, pipe);
}
@@ -187,7 +193,6 @@ nni_pair_receiver(void *arg)
return;
}
nni_mutex_exit(&pair->mx);
-
for (;;) {
rv = nni_pipe_recv(pipe, &msg);
if (rv != 0) {
@@ -201,7 +206,6 @@ nni_pair_receiver(void *arg)
}
nni_msgqueue_signal(uwq, &pp->sigclose);
nni_pipe_close(pipe);
- nni_socket_rem_pipe(pair->sock, pipe);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 0fc0a72c..5a9c0472 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -199,8 +199,7 @@ nni_inproc_ep_create(void **epp, const char *url, uint16_t proto)
ep->mode = NNI_INPROC_EP_IDLE;
ep->closed = 0;
ep->proto = proto;
- nni_list_node_init(&nni_inproc.eps, ep);
- nni_list_append(&nni_inproc.eps, ep);
+ NNI_LIST_NODE_INIT(&ep->node);
(void) snprintf(ep->addr, sizeof (ep->addr), "%s", url);
*epp = ep;
return (0);
@@ -227,7 +226,10 @@ nni_inproc_ep_close(void *arg)
nni_mutex_enter(&nni_inproc.mx);
if (!ep->closed) {
ep->closed = 1;
- nni_list_remove(&nni_inproc.eps, ep);
+ if ((ep->mode == NNI_INPROC_EP_LISTEN) ||
+ (ep->mode == NNI_INPROC_EP_DIAL)) {
+ nni_list_remove(&nni_inproc.eps, ep);
+ }
nni_cond_broadcast(&nni_inproc.cv);
}
nni_mutex_exit(&nni_inproc.mx);
@@ -273,7 +275,6 @@ nni_inproc_ep_connect(void *arg, void **pipep)
nni_cond_wait(&nni_inproc.cv);
}
// NB: The acceptor or closer removes us from the list.
- ep->mode = NNI_INPROC_EP_IDLE;
*pipep = ep->cpipe;
nni_mutex_exit(&nni_inproc.mx);
return (ep->closed ? NNG_ECLOSED : 0);
@@ -333,7 +334,7 @@ nni_inproc_ep_accept(void *arg, void **pipep)
return (rv);
}
if (((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0) ||
- ((rv = nni_msgqueue_create(&pair->q[0], 4)) != 0)) {
+ ((rv = nni_msgqueue_create(&pair->q[1], 4)) != 0)) {
nni_inproc_pair_destroy(pair);
return (rv);
}
@@ -360,6 +361,9 @@ nni_inproc_ep_accept(void *arg, void **pipep)
}
nni_cond_wait(&nni_inproc.cv);
}
+
+ nni_list_remove(&nni_inproc.eps, srch);
+ srch->mode = NNI_INPROC_EP_IDLE;
(void) snprintf(pair->addr, sizeof (pair->addr), "%s", ep->addr);
pair->pipe[0].rq = pair->pipe[1].wq = pair->q[0];
pair->pipe[1].rq = pair->pipe[0].wq = pair->q[1];