aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/endpt.c395
-rw-r--r--src/core/endpt.h7
-rw-r--r--src/core/pipe.c14
-rw-r--r--src/core/pipe.h2
-rw-r--r--src/core/socket.c87
-rw-r--r--src/core/socket.h2
-rw-r--r--src/nng.c12
-rw-r--r--src/platform/windows/win_iocp.c2
8 files changed, 202 insertions, 319 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 8eb7dd12..fef6ac83 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -16,69 +16,31 @@
// Functionality related to end points.
-static nni_objhash *nni_eps = NULL;
-static void * nni_ep_ctor(uint32_t);
-static void nni_ep_dtor(void *);
+static void nni_ep_accept_start(nni_ep *);
+static void nni_ep_accept_done(void *);
+
+static nni_idhash *nni_eps;
int
nni_ep_sys_init(void)
{
int rv;
- rv = nni_objhash_init(&nni_eps, nni_ep_ctor, nni_ep_dtor);
- if (rv != 0) {
+ if ((rv = nni_idhash_init(&nni_eps)) != 0) {
return (rv);
}
- return (rv);
-}
-
-void
-nni_ep_sys_fini(void)
-{
- nni_objhash_fini(nni_eps);
- nni_eps = NULL;
-}
-
-int
-nni_ep_find(nni_ep **epp, uint32_t id)
-{
- int rv;
- nni_ep *ep;
- if ((rv = nni_init()) != 0) {
- return (rv);
- }
+ nni_idhash_set_limits(
+ nni_eps, 1, 0x7fffffff, nni_random() & 0x7fffffff);
- rv = nni_objhash_find(nni_eps, id, (void **) &ep);
- if (rv != 0) {
- return (NNG_ECLOSED);
- }
- nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed) {
- nni_mtx_unlock(&ep->ep_mtx);
- nni_objhash_unref(nni_eps, id);
- return (NNG_ECLOSED);
- }
- nni_mtx_unlock(&ep->ep_mtx);
- if (epp != NULL) {
- *epp = ep;
- }
return (0);
}
void
-nni_ep_hold(nni_ep *ep)
-{
- int rv;
-
- rv = nni_objhash_find(nni_eps, ep->ep_id, NULL);
- NNI_ASSERT(rv == 0);
-}
-
-void
-nni_ep_rele(nni_ep *ep)
+nni_ep_sys_fini(void)
{
- nni_objhash_unref(nni_eps, ep->ep_id);
+ nni_idhash_fini(nni_eps);
+ nni_eps = NULL;
}
uint32_t
@@ -87,47 +49,20 @@ nni_ep_id(nni_ep *ep)
return (ep->ep_id);
}
-static void *
-nni_ep_ctor(uint32_t id)
-{
- nni_ep *ep;
- int rv;
-
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NULL);
- }
- ep->ep_closed = 0;
- ep->ep_bound = 0;
- ep->ep_pipe = NULL;
- ep->ep_id = id;
- ep->ep_data = NULL;
-
- NNI_LIST_NODE_INIT(&ep->ep_node);
-
- nni_pipe_ep_list_init(&ep->ep_pipes);
-
- if ((rv = nni_mtx_init(&ep->ep_mtx)) != 0) {
- NNI_FREE_STRUCT(ep);
- return (NULL);
- }
-
- if ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) {
- nni_mtx_fini(&ep->ep_mtx);
- NNI_FREE_STRUCT(ep);
- return (NULL);
- }
-
- return (ep);
-}
-
static void
-nni_ep_dtor(void *ptr)
+nni_ep_destroy(nni_ep *ep)
{
- nni_ep *ep = ptr;
-
+ if (ep == NULL) {
+ return;
+ }
+ nni_aio_fini(&ep->ep_acc_aio);
if (ep->ep_data != NULL) {
ep->ep_ops.ep_fini(ep->ep_data);
}
+ if (ep->ep_id != 0) {
+ nni_idhash_remove(nni_eps, ep->ep_id);
+ }
+ nni_thr_fini(&ep->ep_thr);
nni_cv_fini(&ep->ep_cv);
nni_mtx_fini(&ep->ep_mtx);
NNI_FREE_STRUCT(ep);
@@ -148,10 +83,31 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
return (NNG_EINVAL);
}
- rv = nni_objhash_alloc(nni_eps, &id, (void **) &ep);
+ if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ ep->ep_closed = 0;
+ ep->ep_bound = 0;
+ ep->ep_pipe = NULL;
+ ep->ep_id = id;
+ ep->ep_data = NULL;
+
+ NNI_LIST_NODE_INIT(&ep->ep_node);
+
+ nni_pipe_ep_list_init(&ep->ep_pipes);
+
+ if (((rv = nni_mtx_init(&ep->ep_mtx)) != 0) ||
+ ((rv = nni_cv_init(&ep->ep_cv, &ep->ep_mtx)) != 0) ||
+ ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0)) {
+ nni_ep_destroy(ep);
+ return (rv);
+ }
+ rv = nni_aio_init(&ep->ep_acc_aio, nni_ep_accept_done, ep);
if (rv != 0) {
+ nni_ep_destroy(ep);
return (rv);
}
+
ep->ep_sock = sock;
ep->ep_tran = tran;
ep->ep_mode = mode;
@@ -165,12 +121,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
ep->ep_ops = *tran->tran_ep;
if ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock, mode)) != 0) {
- nni_objhash_unref(nni_eps, id);
- return (rv);
- }
-
- if ((rv = nni_sock_ep_add(sock, ep)) != 0) {
- nni_objhash_unref(nni_eps, id);
+ nni_ep_destroy(ep);
return (rv);
}
@@ -179,8 +130,11 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr, int mode)
}
void
-nni_ep_stop(nni_ep *ep)
+nni_ep_close(nni_ep *ep)
{
+ // Abort any in-flight operations.
+ nni_aio_stop(&ep->ep_acc_aio);
+
nni_mtx_lock(&ep->ep_mtx);
if (ep->ep_closed == 0) {
ep->ep_closed = 1;
@@ -190,38 +144,33 @@ nni_ep_stop(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
}
-void
-nni_ep_close(nni_ep *ep)
-{
- nni_ep_stop(ep);
-}
-
-void
-nni_ep_remove(nni_ep *ep)
+static void
+nni_ep_reap(nni_ep *ep)
{
nni_pipe *pipe;
- nni_sock *sock = ep->ep_sock;
- nni_ep_stop(ep);
+ nni_ep_close(ep); // Extra sanity.
- nni_thr_wait(&ep->ep_thr);
+ // Take us off the sock list.
+ nni_sock_ep_remove(ep->ep_sock, ep);
- nni_mtx_lock(&ep->ep_mtx);
- NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
- nni_pipe_close(pipe);
- }
- nni_mtx_unlock(&ep->ep_mtx);
+ nni_ep_destroy(ep);
+}
+void
+nni_ep_stop(nni_ep *ep)
+{
nni_mtx_lock(&ep->ep_mtx);
- while (nni_list_first(&ep->ep_pipes) != NULL) {
- nni_cv_wait(&ep->ep_cv);
+
+ // Protection against recursion.
+ if (ep->ep_stop) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ return;
}
+ ep->ep_stop = 1;
+ nni_taskq_ent_init(&ep->ep_reap_tqe, (nni_cb) nni_ep_reap, ep);
+ nni_taskq_dispatch(NULL, &ep->ep_reap_tqe);
nni_mtx_unlock(&ep->ep_mtx);
-
- nni_sock_ep_remove(sock, ep);
-
- nni_thr_fini(&ep->ep_thr);
- nni_objhash_unref(nni_eps, ep->ep_id);
}
static int
@@ -253,17 +202,20 @@ nni_ep_connect_sync(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
return (NNG_ECLOSED);
}
- rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran);
+ rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
if (rv != 0) {
nni_mtx_unlock(&ep->ep_mtx);
return (rv);
}
+ pipe->p_ep = ep;
nni_list_append(&ep->ep_pipes, pipe);
nni_mtx_unlock(&ep->ep_mtx);
rv = nni_ep_connect_aio(ep, &pipe->p_tran_data);
if (rv != 0) {
- nni_pipe_stop(pipe);
+ if (rv != NNG_ECLOSED) { // HACK ALERT
+ nni_pipe_stop(pipe);
+ }
return (rv);
}
nni_pipe_start(pipe);
@@ -273,24 +225,6 @@ nni_ep_connect_sync(nni_ep *ep)
return (0);
}
-void
-nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe)
-{
- if (ep == NULL) {
- return;
- }
- nni_mtx_lock(&ep->ep_mtx);
- if (nni_list_active(&ep->ep_pipes, pipe)) {
- nni_list_remove(&ep->ep_pipes, pipe);
-
- if (ep->ep_pipe == pipe) {
- ep->ep_pipe = NULL;
- }
- nni_cv_wake(&ep->ep_cv);
- }
- nni_mtx_unlock(&ep->ep_mtx);
-}
-
// nni_dialer is the thread worker that dials in the background.
static void
nni_dialer(void *arg)
@@ -396,131 +330,81 @@ nni_ep_dial(nni_ep *ep, int flags)
return (rv);
}
-static int
-nni_ep_accept_aio(nni_ep *ep, void **pipep)
-{
- nni_aio aio;
- int rv;
+static void nni_ep_accept_start(nni_ep *ep);
- nni_aio_init(&aio, NULL, NULL);
- aio.a_endpt = ep->ep_data;
- ep->ep_ops.ep_accept(ep->ep_data, &aio);
- nni_aio_wait(&aio);
-
- if ((rv = nni_aio_result(&aio)) == 0) {
- *pipep = aio.a_pipe;
- }
- nni_aio_fini(&aio);
- return (rv);
-}
-
-static int
-nni_ep_accept_sync(nni_ep *ep)
+static void
+nni_ep_accept_done(void *arg)
{
- nni_pipe *pipe;
- int rv;
+ nni_ep * ep = arg;
+ nni_aio * aio = &ep->ep_acc_aio;
+ void * tpipe;
+ nni_pipe * pipe;
+ int rv;
+ const nni_tran_pipe *ops;
+
+ ops = ep->ep_tran->tran_pipe;
nni_mtx_lock(&ep->ep_mtx);
- if (ep->ep_closed) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ECLOSED);
+ if ((rv = nni_aio_result(aio)) != 0) {
+ goto done;
}
- rv = nni_pipe_create(&pipe, ep, ep->ep_sock, ep->ep_tran);
- if (rv != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
- nni_list_append(&ep->ep_pipes, pipe);
- nni_mtx_unlock(&ep->ep_mtx);
+ NNI_ASSERT((tpipe = aio->a_pipe) != NULL);
- rv = nni_ep_accept_aio(ep, &pipe->p_tran_data);
+ rv = nni_pipe_create(&pipe, ep->ep_sock, ep->ep_tran);
if (rv != 0) {
- nni_pipe_stop(pipe);
- return (rv);
+ ops->p_fini(tpipe);
+ goto done;
+ }
+
+done:
+
+ switch (rv) {
+ case 0:
+ pipe->p_tran_ops = *ops;
+ pipe->p_tran_data = tpipe;
+ nni_pipe_start(pipe);
+ nni_ep_accept_start(ep);
+ break;
+ case NNG_ECLOSED:
+ case NNG_ECANCELED:
+ // Canceled or closed, no furhter action.
+ break;
+ case NNG_ECONNABORTED:
+ case NNG_ECONNRESET:
+ // These are remote conditions, no cool down.
+ // cooldown = 0;
+ nni_ep_accept_start(ep);
+ break;
+ case NNG_ENOMEM:
+ // We're running low on memory, so its best to wait
+ // a whole second to give the system a chance to
+ // recover memory.
+ // cooldown = 1000000;
+ nni_ep_accept_start(ep);
+ break;
+ default:
+ // other cases... sleep a tiny bit then try again.
+ // cooldown = 1000; 10msec
+ // Add a timeout here instead to avoid spinning.
+ nni_ep_accept_start(ep);
+ break;
}
- nni_pipe_start(pipe);
- return (0);
+ nni_mtx_unlock(&ep->ep_mtx);
}
static void
-nni_listener(void *arg)
+nni_ep_accept_start(nni_ep *ep)
{
- nni_ep *ep = arg;
- int rv;
-
- for (;;) {
- nni_time cooldown;
- nni_mtx_lock(&ep->ep_mtx);
+ nni_aio *aio = &ep->ep_acc_aio;
- // If we didn't bind synchronously, do it now.
- while (!ep->ep_bound && !ep->ep_closed) {
- int rv;
-
- nni_mtx_unlock(&ep->ep_mtx);
- rv = ep->ep_ops.ep_bind(ep->ep_data);
- nni_mtx_lock(&ep->ep_mtx);
-
- if (rv == 0) {
- ep->ep_bound = 1;
- break;
- }
- // Invalid address? Out of memory? Who knows.
- // Try again in a bit (10ms).
- // XXX: PROPER BACKOFF NEEDED
- cooldown = 10000;
- cooldown += nni_clock();
- while (!ep->ep_closed) {
- rv = nni_cv_until(&ep->ep_cv, cooldown);
- if (rv == NNG_ETIMEDOUT) {
- break;
- }
- }
- }
- if (ep->ep_closed) {
- nni_mtx_unlock(&ep->ep_mtx);
- break;
- }
- nni_mtx_unlock(&ep->ep_mtx);
-
- if ((rv = nni_ep_accept_sync(ep)) == 0) {
- // Success! Loop around for the next one.
- continue;
- }
-
- switch (rv) {
- case NNG_ECLOSED:
- // This indicates the listening socket got closed.
- // We just bail.
- return;
-
- case NNG_ECONNABORTED:
- case NNG_ECONNRESET:
- // These are remote conditions, no cool down.
- cooldown = 0;
- break;
- case NNG_ENOMEM:
- // We're running low on memory, so its best to wait
- // a whole second to give the system a chance to
- // recover memory.
- cooldown = 1000000;
- break;
- default:
- // Other cases we sleep just a tiny bit to avoid
- // burning the cpu (e.g. out of files).
- cooldown = 1000; // 1 msec
- break;
- }
- cooldown += nni_clock();
- nni_mtx_lock(&ep->ep_mtx);
- while (!ep->ep_closed) {
- rv = nni_cv_until(&ep->ep_cv, cooldown);
- if (rv == NNG_ETIMEDOUT) {
- break;
- }
- }
- nni_mtx_unlock(&ep->ep_mtx);
+ // Call with the Endpoint lock held.
+ if (ep->ep_closed) {
+ return;
}
+
+ aio->a_endpt = ep->ep_data;
+ ep->ep_ops.ep_accept(ep->ep_data, aio);
}
int
@@ -542,26 +426,17 @@ nni_ep_listen(nni_ep *ep, int flags)
return (NNG_ECLOSED);
}
- if ((rv = nni_thr_init(&ep->ep_thr, nni_listener, ep)) != 0) {
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
- }
-
ep->ep_started = 1;
- if (flags & NNG_FLAG_SYNCH) {
+ rv = ep->ep_ops.ep_bind(ep->ep_data);
+ if (rv != 0) {
+ ep->ep_started = 0;
nni_mtx_unlock(&ep->ep_mtx);
- rv = ep->ep_ops.ep_bind(ep->ep_data);
- if (rv != 0) {
- nni_thr_fini(&ep->ep_thr);
- ep->ep_started = 0;
- return (rv);
- }
- nni_mtx_lock(&ep->ep_mtx);
- ep->ep_bound = 1;
+ return (rv);
}
+ ep->ep_bound = 1;
- nni_thr_run(&ep->ep_thr);
+ nni_ep_accept_start(ep);
nni_mtx_unlock(&ep->ep_mtx);
return (0);
diff --git a/src/core/endpt.h b/src/core/endpt.h
index f37ea7cc..a47586a0 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -29,12 +29,16 @@ struct nni_ep {
nni_thr ep_thr;
int ep_mode;
int ep_started;
+ int ep_stop;
int ep_closed; // full shutdown
int ep_bound; // true if we bound locally
nni_mtx ep_mtx;
nni_cv ep_cv;
nni_pipe * ep_pipe; // Connected pipe (dialers only)
nni_list ep_pipes;
+ nni_aio ep_acc_aio;
+ nni_aio ep_con_aio;
+ nni_taskq_ent ep_reap_tqe;
};
enum nni_ep_mode {
@@ -45,13 +49,10 @@ enum nni_ep_mode {
extern int nni_ep_sys_init(void);
extern void nni_ep_sys_fini(void);
extern int nni_ep_find(nni_ep **, uint32_t);
-extern void nni_ep_hold(nni_ep *);
-extern void nni_ep_rele(nni_ep *);
extern uint32_t nni_ep_id(nni_ep *);
extern int nni_ep_create(nni_ep **, nni_sock *, const char *, int);
extern void nni_ep_stop(nni_ep *);
extern void nni_ep_close(nni_ep *);
-extern void nni_ep_remove(nni_ep *);
extern int nni_ep_dial(nni_ep *, int);
extern int nni_ep_listen(nni_ep *, int);
extern void nni_ep_list_init(nni_list *);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 6fa9ed47..8f2099a9 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -22,8 +22,7 @@ nni_pipe_sys_init(void)
{
int rv;
- rv = nni_idhash_init(&nni_pipes);
- if (rv != 0) {
+ if ((rv = nni_idhash_init(&nni_pipes)) != 0) {
return (rv);
}
@@ -102,6 +101,9 @@ nni_pipe_close(nni_pipe *p)
}
p->p_reap = 1;
+ // abort any pending negotiation/start process.
+ nni_aio_stop(&p->p_start_aio);
+
// Close the underlying transport.
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_close(p->p_tran_data);
@@ -120,9 +122,6 @@ nni_pipe_reap(nni_pipe *p)
// Transport close...
nni_pipe_close(p);
- // Unlink the endpoint and pipe.
- nni_ep_pipe_remove(p->p_ep, p);
-
// Tell the protocol to stop.
nni_sock_pipe_stop(p->p_sock, p);
@@ -140,9 +139,9 @@ nni_pipe_stop(nni_pipe *p)
return;
}
p->p_stop = 1;
- nni_mtx_unlock(&p->p_mtx);
nni_taskq_ent_init(&p->p_reap_tqe, (nni_cb) nni_pipe_reap, p);
nni_taskq_dispatch(NULL, &p->p_reap_tqe);
+ nni_mtx_unlock(&p->p_mtx);
}
uint16_t
@@ -173,7 +172,7 @@ nni_pipe_start_cb(void *arg)
}
int
-nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
+nni_pipe_create(nni_pipe **pp, nni_sock *sock, nni_tran *tran)
{
nni_pipe *p;
int rv;
@@ -202,7 +201,6 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep, nni_sock *sock, nni_tran *tran)
return (rv);
}
p->p_sock = sock;
- p->p_ep = ep;
// Make a copy of the transport ops. We can override entry points
// and we avoid an extra dereference on hot code paths.
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 2f56e788..990dac9d 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -60,7 +60,7 @@ extern void nni_pipe_stop(nni_pipe *);
// Used only by the socket core - as we don't wish to expose the details
// of the pipe structure outside of pipe.c.
-extern int nni_pipe_create(nni_pipe **, nni_ep *, nni_sock *, nni_tran *);
+extern int nni_pipe_create(nni_pipe **, nni_sock *, nni_tran *);
extern uint16_t nni_pipe_proto(nni_pipe *);
extern uint16_t nni_pipe_peer(nni_pipe *);
diff --git a/src/core/socket.c b/src/core/socket.c
index 4be5a856..3a4f36e5 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -109,7 +109,8 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
void
nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe)
{
- void *pdata;
+ void * pdata;
+ nni_ep *ep;
pdata = nni_pipe_get_proto_data(pipe);
@@ -119,10 +120,24 @@ nni_sock_pipe_stop(nni_sock *sock, nni_pipe *pipe)
nni_mtx_unlock(&sock->s_mx);
return;
}
+
+ // Break up the relationship between the EP and the pipe.
+ if ((ep = pipe->p_ep) != NULL) {
+ nni_mtx_lock(&ep->ep_mtx);
+ // During early init, the pipe might not have this set.
+ if (nni_list_active(&ep->ep_pipes, pipe)) {
+ nni_list_remove(&ep->ep_pipes, pipe);
+ }
+ pipe->p_ep = NULL;
+ ep->ep_pipe = NULL; // XXX: remove this soon
+ nni_cv_wake(&ep->ep_cv);
+ nni_mtx_unlock(&ep->ep_mtx);
+ }
+
sock->s_pipe_ops.pipe_stop(pdata);
if (nni_list_active(&sock->s_pipes, pipe)) {
nni_list_remove(&sock->s_pipes, pipe);
- if (sock->s_closing) {
+ if (sock->s_closing && nni_list_empty(&sock->s_pipes)) {
nni_cv_wake(&sock->s_cv);
}
}
@@ -459,10 +474,10 @@ nni_sock_shutdown(nni_sock *sock)
linger = nni_clock() + sock->s_linger;
}
- // Stop the EPs. This prevents new connections from forming but
+ // Close the EPs. This prevents new connections from forming but
// but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_stop(ep);
+ nni_ep_close(ep);
}
nni_mtx_unlock(&sock->s_mx);
@@ -492,15 +507,14 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each pipe, close the underlying transport.
+ // For each pipe, arrange for it to teardown hard. (Close, etc.).
NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
nni_pipe_stop(pipe);
}
- // For each ep, close it; this will also tell it to force any
- // of its pipes to close.
+ // For each ep, arrange for it to teardown hard.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_close(ep);
+ nni_ep_stop(ep);
}
// Wait for the pipes to be reaped (there should not be any because
@@ -511,14 +525,7 @@ nni_sock_shutdown(nni_sock *sock)
// Wait for the eps to be reaped.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
- nni_list_remove(&sock->s_eps, ep);
-
- // This has to be done without the lock held, as the remove
- // operation requires shutting down a thread which might be
- // trying to acquire the socket lock.
- nni_mtx_unlock(&sock->s_mx);
- nni_ep_remove(ep);
- nni_mtx_lock(&sock->s_mx);
+ nni_cv_wait(&sock->s_cv);
}
sock->s_sock_ops.sock_close(sock->s_data);
@@ -535,28 +542,10 @@ nni_sock_shutdown(nni_sock *sock)
return (0);
}
-// nni_sock_ep_add adds a newly created endpoint to the socket. The
-// caller must hold references on the sock and the ep, and not be holding
-// the socket lock. The ep acquires a reference against the sock,
-// which will be dropped later by nni_sock_rem_ep. The endpoint must not
-// already be associated with a socket. (Note, the ep holds the reference
-// on the socket, not the other way around.)
-int
-nni_sock_ep_add(nni_sock *sock, nni_ep *ep)
-{
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_ECLOSED);
- }
- nni_list_append(&sock->s_eps, ep);
- nni_mtx_unlock(&sock->s_mx);
- return (0);
-}
-
void
nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
{
+ nni_pipe *pipe;
// If we're not on the list, then nothing to do. Be idempotent.
// Note that if the ep is not on a list, then we assume that we have
// exclusive access. Therefore the check for being active need not
@@ -564,8 +553,24 @@ nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
if ((sock == NULL) || (!nni_list_active(&sock->s_eps, ep))) {
return;
}
+
+ // This is done under the endpoints lock, although the remove
+ // is done under that as well, we also make sure that we hold
+ // the socket lock in the remove step.
+ nni_mtx_lock(&ep->ep_mtx);
+ NNI_LIST_FOREACH (&ep->ep_pipes, pipe) {
+ nni_pipe_stop(pipe);
+ }
+ while (!nni_list_empty(&ep->ep_pipes)) {
+ nni_cv_wait(&ep->ep_cv);
+ }
+ nni_mtx_unlock(&ep->ep_mtx);
+
nni_mtx_lock(&sock->s_mx);
nni_list_remove(&sock->s_eps, ep);
+ if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) {
+ nni_cv_wake(&sock->s_cv);
+ }
nni_mtx_unlock(&sock->s_mx);
}
@@ -727,12 +732,16 @@ nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
+ nni_mtx_lock(&sock->s_mx);
if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_DIAL)) != 0) {
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+ nni_list_append(&sock->s_eps, ep);
+ nni_mtx_unlock(&sock->s_mx);
if ((rv = nni_ep_dial(ep, flags)) != 0) {
- nni_ep_remove(ep);
+ nni_ep_stop(ep);
} else if (epp != NULL) {
*epp = ep;
}
@@ -746,12 +755,16 @@ nni_sock_listen(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
nni_ep *ep;
int rv;
+ nni_mtx_lock(&sock->s_mx);
if ((rv = nni_ep_create(&ep, sock, addr, NNI_EP_MODE_LISTEN)) != 0) {
+ nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+ nni_list_append(&sock->s_eps, ep);
+ nni_mtx_unlock(&sock->s_mx);
if ((rv = nni_ep_listen(ep, flags)) != 0) {
- nni_ep_remove(ep);
+ nni_ep_stop(ep);
} else if (epp != NULL) {
*epp = ep;
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 76b57f09..2dc06009 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -84,7 +85,6 @@ extern void nni_sock_unlock(nni_sock *);
extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *);
extern void nni_sock_unnotify(nni_sock *, nni_notify *);
-extern int nni_sock_ep_add(nni_sock *, nni_ep *);
extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
extern void nni_sock_pipe_stop(nni_sock *, nni_pipe *);
diff --git a/src/nng.c b/src/nng.c
index e66a3ddc..98442a83 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -257,14 +258,9 @@ nng_listen(nng_socket sid, const char *addr, nng_endpoint *epp, int flags)
int
nng_endpoint_close(nng_endpoint eid)
{
- int rv;
- nni_ep *ep;
-
- if ((rv = nni_ep_find(&ep, eid)) != 0) {
- return (rv);
- }
- nni_ep_close(ep);
- return (0);
+ // XXX: reimplement this properly.
+ NNI_ARG_UNUSED(eid);
+ return (NNG_ENOTSUP);
}
int
diff --git a/src/platform/windows/win_iocp.c b/src/platform/windows/win_iocp.c
index 2635a4fc..d0a39142 100644
--- a/src/platform/windows/win_iocp.c
+++ b/src/platform/windows/win_iocp.c
@@ -186,7 +186,7 @@ nni_win_event_init(nni_win_event *evt, nni_win_event_ops *ops, void *ptr)
int rv;
ZeroMemory(&evt->olpd, sizeof(evt->olpd));
- evt->olpd.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+ evt->olpd.hEvent = CreateEvent(NULL, TRUE, TRUE, NULL);
if (evt->olpd.hEvent == NULL) {
return (nni_win_error(GetLastError()));
}