-rw-r--r--  src/core/aio.c                |   1
-rw-r--r--  src/core/msgqueue.c           |  42
-rw-r--r--  src/core/pipe.c               | 167
-rw-r--r--  src/core/pipe.h               |   5
-rw-r--r--  src/core/socket.c             | 124
-rw-r--r--  src/core/socket.h             |  26
-rw-r--r--  src/protocol/pair/pair.c      |  39
-rw-r--r--  src/protocol/reqrep/rep.c     |  14
-rw-r--r--  src/protocol/reqrep/req.c     |  29
-rw-r--r--  src/transport/inproc/inproc.c |  31
-rw-r--r--  tests/CMakeLists.txt          |   1
-rw-r--r--  tests/scalability.c           | 176
12 files changed, 500 insertions(+), 155 deletions(-)
diff --git a/src/core/aio.c b/src/core/aio.c
index ffc5ac06..2f871f73 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -41,6 +41,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg)
void
nni_aio_fini(nni_aio *aio)
{
+ nni_taskq_cancel(&aio->a_tqe);
nni_cv_fini(&aio->a_cv);
nni_mtx_fini(&aio->a_lk);
}
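The nni_taskq_cancel() call added to nni_aio_fini() ensures that a completion callback still sitting on the task queue can never run after the aio's lock and condition variable have been torn down. The shape of that teardown pattern, as a minimal self-contained sketch (the types and functions below are illustrative stand-ins, not the nng internals):

    #include <pthread.h>
    #include <stddef.h>

    /* Illustrative stand-in for an internal task queue entry; not nng API. */
    typedef struct task {
        struct task *next;
        void       (*cb)(void *);
        void        *arg;
    } task;

    static pthread_mutex_t tq_lock = PTHREAD_MUTEX_INITIALIZER;
    static task            *tq_head;

    /* Unlink a task if it has not been dispatched yet, so its callback can
     * no longer fire against state that is about to be destroyed. */
    static void
    taskq_cancel(task *t)
    {
        task **pp;

        pthread_mutex_lock(&tq_lock);
        for (pp = &tq_head; *pp != NULL; pp = &(*pp)->next) {
            if (*pp == t) {
                *pp = t->next;
                break;
            }
        }
        pthread_mutex_unlock(&tq_lock);
    }

    typedef struct my_aio {
        task            a_tqe;  /* queued completion callback */
        pthread_mutex_t a_lk;
        pthread_cond_t  a_cv;
    } my_aio;

    /* Finalize: cancel any still-queued completion first, then destroy the
     * synchronization objects that callback would have used. */
    static void
    my_aio_fini(my_aio *aio)
    {
        taskq_cancel(&aio->a_tqe);
        pthread_cond_destroy(&aio->a_cv);
        pthread_mutex_destroy(&aio->a_lk);
    }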
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 9607f562..47b98629 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -388,6 +388,11 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio)
nni_time expire = aio->a_expire;
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
nni_list_append(&mq->mq_aio_putq, aio);
nni_msgq_run_putq(mq);
nni_msgq_run_notify(mq);
@@ -406,6 +411,11 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio)
nni_time expire = aio->a_expire;
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ nni_mtx_unlock(&mq->mq_lock);
+ return;
+ }
nni_list_append(&mq->mq_aio_getq, aio);
nni_msgq_run_getq(mq);
nni_msgq_run_notify(mq);
@@ -428,6 +438,7 @@ nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio)
// the node from either the getq or the putq list.
if (nni_list_active(&mq->mq_aio_getq, aio)) {
nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, NNG_ECANCELED, 0);
}
nni_mtx_unlock(&mq->mq_lock);
}
@@ -437,6 +448,10 @@ int
nni_msgq_canput(nni_msgq *mq)
{
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+ }
if ((mq->mq_len < mq->mq_cap) ||
(mq->mq_rwait != 0) || // XXX: REMOVE ME
(nni_list_first(&mq->mq_aio_getq) != NULL)) {
@@ -452,6 +467,10 @@ int
nni_msgq_canget(nni_msgq *mq)
{
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (0);
+ }
if ((mq->mq_len != 0) ||
(mq->mq_wwait != 0) ||
(nni_list_first(&mq->mq_aio_putq) != NULL)) {
@@ -470,6 +489,10 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
size_t len = nni_msg_len(msg);
nni_mtx_lock(&mq->mq_lock);
+ if (mq->mq_closed) {
+ nni_mtx_unlock(&mq->mq_lock);
+ return (NNG_ECLOSED);
+ }
// The presence of any blocked reader indicates that
// the queue is empty, otherwise it would have just taken
@@ -804,6 +827,9 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire)
void
nni_msgq_close(nni_msgq *mq)
{
+ nni_aio *aio;
+ nni_aio *naio;
+
nni_mtx_lock(&mq->mq_lock);
mq->mq_closed = 1;
mq->mq_wwait = 0;
@@ -821,6 +847,22 @@ nni_msgq_close(nni_msgq *mq)
mq->mq_len--;
nni_msg_free(msg);
}
+
+ // Let all pending blockers know we are closing the queue.
+ naio = nni_list_first(&mq->mq_aio_getq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_getq, aio);
+ nni_list_remove(&mq->mq_aio_getq, aio);
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ }
+
+ naio = nni_list_first(&mq->mq_aio_putq);
+ while ((aio = naio) != NULL) {
+ naio = nni_list_next(&mq->mq_aio_putq, aio);
+ nni_list_remove(&mq->mq_aio_putq, aio);
+ nni_aio_finish(aio, NNG_ECLOSED, 0);
+ }
+
nni_mtx_unlock(&mq->mq_lock);
}
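Beyond rejecting new submissions once mq_closed is set, nni_msgq_close now walks the pending get and put lists and completes every waiting aio with NNG_ECLOSED, using the save-the-successor idiom so that removing the current node during iteration stays safe. In isolation the idiom looks like this (illustrative list type and error code, not nng's nni_list):

    #include <stddef.h>

    #define MY_ECLOSED 7          /* stand-in for NNG_ECLOSED */

    typedef struct waiter {
        struct waiter *next;
        int            result;    /* completion status handed to the waiter */
    } waiter;

    /* Complete every pending waiter with "closed", unlinking as we go.  The
     * successor is captured before the current node is finished, because
     * finishing it hands the node back to its owner, who may reuse it. */
    static void
    drain_on_close(waiter **headp)
    {
        waiter *w;
        waiter *nw;

        nw = *headp;
        while ((w = nw) != NULL) {
            nw        = w->next;  /* save successor first */
            w->next   = NULL;     /* unlink from the list */
            w->result = MY_ECLOSED;
        }
        *headp = NULL;
    }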
diff --git a/src/core/pipe.c b/src/core/pipe.c
index a401e4e3..18c47c60 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -50,6 +50,30 @@ nni_pipe_aio_send(nni_pipe *p, nni_aio *aio)
}
+void
+nni_pipe_incref(nni_pipe *p)
+{
+ nni_mtx_lock(&p->p_mtx);
+ p->p_refcnt++;
+ nni_mtx_unlock(&p->p_mtx);
+}
+
+
+void
+nni_pipe_decref(nni_pipe *p)
+{
+ nni_mtx_lock(&p->p_mtx);
+ p->p_refcnt--;
+ if (p->p_refcnt == 0) {
+ nni_mtx_unlock(&p->p_mtx);
+
+ nni_pipe_destroy(p);
+ return;
+ }
+ nni_mtx_unlock(&p->p_mtx);
+}
+
+
// nni_pipe_close closes the underlying connection. It is expected that
// subsequent attempts receive or send (including any waiting receive) will
// simply return NNG_ECLOSED.
@@ -58,37 +82,34 @@ nni_pipe_close(nni_pipe *p)
{
nni_sock *sock = p->p_sock;
+ nni_mtx_lock(&p->p_mtx);
+ if (p->p_reap == 1) {
+ // We already did a close.
+ nni_mtx_unlock(&p->p_mtx);
+ return;
+ }
+ p->p_reap = 1;
+
+ // Close the underlying transport.
if (p->p_tran_data != NULL) {
p->p_tran_ops.pipe_close(p->p_tran_data);
}
- nni_mtx_lock(&sock->s_mx);
- if (!p->p_reap) {
- // schedule deferred reap/close
- p->p_reap = 1;
- nni_list_remove(&sock->s_pipes, p);
- nni_list_append(&sock->s_reaps, p);
- nni_cv_wake(&sock->s_cv);
+ // Unregister our ID so nobody else can find it.
+ if (p->p_id != 0) {
+ nni_mtx_lock(nni_idlock);
+ nni_idhash_remove(nni_pipes, p->p_id);
+ nni_mtx_unlock(nni_idlock);
+ p->p_id = 0;
}
- nni_mtx_unlock(&sock->s_mx);
-}
+ nni_mtx_unlock(&p->p_mtx);
-// nni_pipe_bail is a special version of close, that is used to abort
-// from nni_pipe_start, when it fails. It requires the lock to be held,
-// and this prevents us from dropping the lock, possibly leading to race
-// conditions. It's critical that this not be called after the pipe is
-// started, or deadlock will occur.
-static void
-nni_pipe_bail(nni_pipe *p)
-{
- nni_sock *sock = p->p_sock;
-
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.pipe_close(p->p_tran_data);
- }
+ // Let the socket (and endpoint) know we have closed.
+ nni_sock_pipe_closed(sock, p);
- nni_pipe_destroy(p);
+ // Drop a reference count, possibly doing deferred destroy.
+ nni_pipe_decref(p);
}
@@ -99,25 +120,6 @@ nni_pipe_peer(nni_pipe *p)
}
-void
-nni_pipe_destroy(nni_pipe *p)
-{
- int i;
-
- for (i = 0; i < NNI_MAXWORKERS; i++) {
- nni_thr_fini(&p->p_worker_thr[i]);
- }
-
- if (p->p_tran_data != NULL) {
- p->p_tran_ops.pipe_destroy(p->p_tran_data);
- }
- if (p->p_proto_data != NULL) {
- p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data);
- }
- NNI_FREE_STRUCT(p);
-}
-
-
int
nni_pipe_create(nni_pipe **pp, nni_ep *ep)
{
@@ -126,15 +128,17 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
const nni_proto_pipe_ops *ops = &sock->s_pipe_ops;
void *pdata;
int rv;
- int i;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
+ if ((rv = nni_mtx_init(&p->p_mtx)) != 0) {
+ NNI_FREE_STRUCT(p);
+ return (rv);
+ }
p->p_sock = sock;
p->p_tran_data = NULL;
p->p_proto_data = NULL;
- p->p_active = 0;
p->p_id = 0;
NNI_LIST_NODE_INIT(&p->p_node);
@@ -143,30 +147,37 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
p->p_tran_ops = *ep->ep_tran->tran_pipe;
if ((rv = ops->pipe_init(&pdata, p, sock->s_data)) != 0) {
+ nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
return (rv);
}
p->p_proto_data = pdata;
-
- for (i = 0; i < NNI_MAXWORKERS; i++) {
- nni_worker fn = ops->pipe_worker[i];
- rv = nni_thr_init(&p->p_worker_thr[i], fn, pdata);
- if (rv != 0) {
- while (i > 0) {
- i--;
- nni_thr_fini(&p->p_worker_thr[i]);
- }
- ops->pipe_fini(pdata);
- NNI_FREE_STRUCT(p);
- return (rv);
- }
- }
+ nni_sock_pipe_add(sock, p);
*pp = p;
return (0);
}
+void
+nni_pipe_destroy(nni_pipe *p)
+{
+ NNI_ASSERT(p->p_refcnt == 0);
+
+ // The caller is responsible for ensuring that the pipe
+ // is not in use by any other consumers. It must not be started.
+ if (p->p_tran_data != NULL) {
+ p->p_tran_ops.pipe_destroy(p->p_tran_data);
+ }
+ if (p->p_proto_data != NULL) {
+ p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data);
+ }
+ nni_sock_pipe_rem(p->p_sock, p);
+ nni_mtx_fini(&p->p_mtx);
+ NNI_FREE_STRUCT(p);
+}
+
+
int
nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
{
@@ -179,55 +190,27 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
int
-nni_pipe_start(nni_pipe *pipe)
+nni_pipe_start(nni_pipe *p)
{
int rv;
- int i;
- nni_sock *sock = pipe->p_sock;
-
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_pipe_bail(pipe);
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_ECLOSED);
- }
- if (nni_pipe_peer(pipe) != sock->s_peer) {
- nni_pipe_bail(pipe);
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_EPROTO);
- }
+ nni_pipe_incref(p);
nni_mtx_lock(nni_idlock);
- rv = nni_idhash_alloc(nni_pipes, &pipe->p_id, pipe);
+ rv = nni_idhash_alloc(nni_pipes, &p->p_id, p);
nni_mtx_unlock(nni_idlock);
if (rv != 0) {
- nni_pipe_bail(pipe);
- nni_mtx_unlock(&sock->s_mx);
+ nni_pipe_close(p);
return (rv);
}
- if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) {
- nni_mtx_lock(nni_idlock);
- nni_idhash_remove(nni_pipes, pipe->p_id);
- pipe->p_id = 0;
- nni_mtx_unlock(nni_idlock);
-
- nni_pipe_bail(pipe);
- nni_mtx_unlock(&sock->s_mx);
+ if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) {
+ nni_pipe_close(p);
return (rv);
}
- pipe->p_active = 1;
- nni_list_append(&sock->s_pipes, pipe);
-
- for (i = 0; i < NNI_MAXWORKERS; i++) {
- nni_thr_run(&pipe->p_worker_thr[i]);
- }
-
// XXX: Publish event
- nni_mtx_unlock(&sock->s_mx);
return (0);
}
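The pipe now carries its own mutex, a reference count, and an idempotent close: nni_pipe_destroy only runs when the last reference is dropped, and a second nni_pipe_close is a no-op thanks to the p_reap flag. Reduced to its essentials, the pattern looks like this (pthread-based sketch; names are illustrative, not nng code):

    #include <pthread.h>
    #include <stdlib.h>

    typedef struct obj {
        pthread_mutex_t lock;
        int             refcnt;
        int             closed;   /* analogue of p_reap */
    } obj;

    static void
    obj_destroy(obj *o)
    {
        pthread_mutex_destroy(&o->lock);
        free(o);
    }

    static void
    obj_incref(obj *o)
    {
        pthread_mutex_lock(&o->lock);
        o->refcnt++;
        pthread_mutex_unlock(&o->lock);
    }

    /* Drop a reference; the last one out performs the deferred destroy. */
    static void
    obj_decref(obj *o)
    {
        int last;

        pthread_mutex_lock(&o->lock);
        last = (--o->refcnt == 0);
        pthread_mutex_unlock(&o->lock);
        if (last) {
            obj_destroy(o);
        }
    }

    /* Close is safe to call more than once; only the first call does work.
     * It drops the reference taken when the object was started. */
    static void
    obj_close(obj *o)
    {
        pthread_mutex_lock(&o->lock);
        if (o->closed) {
            pthread_mutex_unlock(&o->lock);
            return;
        }
        o->closed = 1;
        pthread_mutex_unlock(&o->lock);
        obj_decref(o);
    }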
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 3ec4a7a3..6cabf4e7 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -28,7 +28,8 @@ struct nni_pipe {
nni_ep * p_ep;
int p_reap;
int p_active;
- nni_thr p_worker_thr[NNI_MAXWORKERS];
+ nni_mtx p_mtx;
+ int p_refcnt;
};
// AIO
@@ -40,6 +41,8 @@ extern int nni_pipe_recv(nni_pipe *, nng_msg **);
extern int nni_pipe_send(nni_pipe *, nng_msg *);
extern uint32_t nni_pipe_id(nni_pipe *);
extern void nni_pipe_close(nni_pipe *);
+extern void nni_pipe_incref(nni_pipe *);
+extern void nni_pipe_decref(nni_pipe *);
// Used only by the socket core - as we don't wish to expose the details
// of the pipe structure outside of pipe.c.
diff --git a/src/core/socket.c b/src/core/socket.c
index d58e64ba..40fb42bc 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -128,12 +128,113 @@ nni_sock_held_close(nni_sock *sock)
}
+void
+nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
+{
+ nni_mtx_lock(&sock->s_mx);
+ nni_list_append(&sock->s_pipes, pipe);
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
+int
+nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
+{
+ int rv;
+
+ nni_mtx_lock(&sock->s_mx);
+
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
+ if (nni_pipe_peer(pipe) != sock->s_peer) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (NNG_EPROTO);
+ }
+
+ if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+ }
+
+ pipe->p_active = 1;
+
+ nni_list_remove(&sock->s_idles, pipe);
+ nni_list_append(&sock->s_pipes, pipe);
+
+ nni_mtx_unlock(&sock->s_mx);
+
+ return (0);
+}
+
+
+void
+nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe)
+{
+ nni_ep *ep;
+
+ nni_mtx_lock(&sock->s_mx);
+
+ // NB: nni_list_remove doesn't really care *which* list the pipe
+ // is on, and so if the pipe is already on the idle list these
+ // two statements are effectively a no-op.
+ nni_list_remove(&sock->s_pipes, pipe);
+ nni_list_append(&sock->s_idles, pipe);
+
+ if (pipe->p_active) {
+ pipe->p_active = 0;
+ sock->s_pipe_ops.pipe_rem(pipe->p_proto_data);
+ }
+
+ // Notify the endpoint that the pipe has closed.
+ if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) {
+ ep->ep_pipe = NULL;
+ nni_cv_wake(&ep->ep_cv);
+ }
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
+void
+nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe)
+{
+ nni_ep *ep;
+
+ nni_mtx_lock(&sock->s_mx);
+ nni_list_remove(&sock->s_idles, pipe);
+
+ // Notify the endpoint that the pipe has closed - if not already done.
+ if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) {
+ ep->ep_pipe = NULL;
+ nni_cv_wake(&ep->ep_cv);
+ }
+ nni_cv_wake(&sock->s_cv);
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
+void
+nni_sock_lock(nni_sock *sock)
+{
+ nni_mtx_lock(&sock->s_mx);
+}
+
+
+void
+nni_sock_unlock(nni_sock *sock)
+{
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
// Because we have to call back into the socket, and possibly also the proto,
// and wait for threads to terminate, we do this in a special thread. The
// assumption is that closing is always a "fast" operation.
static void
nni_reaper(void *arg)
{
+#if 0
nni_sock *sock = arg;
for (;;) {
@@ -183,6 +284,7 @@ nni_reaper(void *arg)
nni_cv_wait(&sock->s_cv);
nni_mtx_unlock(&sock->s_mx);
}
+#endif
}
@@ -301,7 +403,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
sock->s_reapexit = 0;
sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
- NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
+ NNI_LIST_INIT(&sock->s_idles, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node);
NNI_LIST_INIT(&sock->s_events, nni_event, e_node);
@@ -512,15 +614,14 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // For each pipe, close the underlying transport, and move it to
- // deathrow (the reaplist).
+ // For each pipe, close the underlying transport. Also move it
+ // to the idle list so we won't keep looping.
while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) {
- if (pipe->p_tran_data != NULL) {
- pipe->p_tran_ops.pipe_close(pipe->p_tran_data);
- }
- pipe->p_reap = 1;
- nni_list_remove(&sock->s_pipes, pipe);
- nni_list_append(&sock->s_reaps, pipe);
+ nni_pipe_incref(pipe);
+ nni_mtx_unlock(&sock->s_mx);
+ nni_pipe_close(pipe);
+ nni_pipe_decref(pipe);
+ nni_mtx_lock(&sock->s_mx);
}
sock->s_sock_ops.sock_close(sock->s_data);
@@ -528,6 +629,11 @@ nni_sock_shutdown(nni_sock *sock)
sock->s_reapexit = 1;
nni_cv_wake(&sock->s_notify_cv);
nni_cv_wake(&sock->s_cv);
+
+ while ((nni_list_first(&sock->s_idles) != NULL) ||
+ (nni_list_first(&sock->s_pipes) != NULL)) {
+ nni_cv_wait(&sock->s_cv);
+ }
nni_mtx_unlock(&sock->s_mx);
// Wait for the threads to exit.
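nni_sock_shutdown now closes each pipe with the socket lock temporarily dropped (holding a reference so the pipe cannot vanish underneath it), and then waits on the socket's condition variable until both the ready and idle lists have drained; nni_sock_pipe_rem wakes that waiter as each pipe is finally destroyed. The wait-until-drained idiom, in isolation (pthread-based sketch, names illustrative):

    #include <pthread.h>

    static pthread_mutex_t lk = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  cv = PTHREAD_COND_INITIALIZER;
    static int             npipes;   /* pipes still registered with the socket */

    /* Called as each pipe is finally destroyed (cf. nni_sock_pipe_rem):
     * deregister it and wake anyone blocked in shutdown. */
    static void
    pipe_deregister(void)
    {
        pthread_mutex_lock(&lk);
        npipes--;
        pthread_cond_broadcast(&cv);
        pthread_mutex_unlock(&lk);
    }

    /* Shutdown blocks until every registered pipe is gone, mirroring the
     * new wait loop over s_pipes/s_idles in nni_sock_shutdown. */
    static void
    wait_for_pipes(void)
    {
        pthread_mutex_lock(&lk);
        while (npipes > 0) {
            pthread_cond_wait(&cv, &lk);
        }
        pthread_mutex_unlock(&lk);
    }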
diff --git a/src/core/socket.h b/src/core/socket.h
index d7a7eb5e..22873c3c 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -42,7 +42,8 @@ struct nni_socket {
nni_duration s_reconnmax; // max reconnect time
nni_list s_eps; // active endpoints
- nni_list s_pipes; // pipes for this socket
+ nni_list s_pipes; // ready pipes (started)
+ nni_list s_idles; // idle pipes (not ready)
nni_list s_events; // pending events
nni_list s_notify; // event watchers
nni_cv s_notify_cv; // wakes notify thread
@@ -89,6 +90,29 @@ extern int nni_sock_dial(nni_sock *, const char *, nni_ep **, int);
extern int nni_sock_listen(nni_sock *, const char *, nni_ep **, int);
extern uint32_t nni_sock_id(nni_sock *);
+extern void nni_sock_lock(nni_sock *);
+extern void nni_sock_unlock(nni_sock *);
+
+// nni_sock_pipe_add is called by the pipe to register the pipe
+// with the socket. The pipe is added to the idle list.
+extern void nni_sock_pipe_add(nni_sock *, nni_pipe *);
+
+// nni_sock_pipe_rem deregisters the pipe from the socket. The socket
+// will block during close if there are registered pipes outstanding.
+extern void nni_sock_pipe_rem(nni_sock *, nni_pipe *);
+
+// nni_sock_pipe_ready lets the socket know the pipe is ready for
+// business. This also calls the socket/protocol specific add function,
+// and it may return an error. A reference count on the pipe is incremented
+// on success. The reference count should be dropped by nni_sock_pipe_closed.
+extern int nni_sock_pipe_ready(nni_sock *, nni_pipe *);
+
+// nni_sock_pipe_closed lets the socket know that the pipe is closed.
+// This keeps the socket from trying to schedule traffic to it. It
+// also lets the endpoint know about it, to possibly restart a dial
+// operation.
+extern void nni_sock_pipe_closed(nni_sock *, nni_pipe *);
+
// Set error codes for applications. These are only ever
// called from the filter functions in protocols, and thus
// already have the socket lock held.
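Taken together, these hooks give each pipe a fixed lifecycle against its socket. As a rough ordering sketch (assumes the internal core headers; error handling omitted, so this is not literal nng source):

    /* Derived from pipe.c/socket.c in this change; illustrative only. */
    void
    pipe_lifecycle_sketch(nni_sock *sock, nni_pipe *pipe)
    {
        nni_sock_pipe_add(sock, pipe);           /* nni_pipe_create: register with the socket */
        (void) nni_sock_pipe_ready(sock, pipe);  /* nni_pipe_start: protocol pipe_add runs    */
        /* ... traffic flows until the pipe is closed ... */
        nni_sock_pipe_closed(sock, pipe);        /* nni_pipe_close: protocol pipe_rem runs    */
        nni_sock_pipe_rem(sock, pipe);           /* nni_pipe_destroy: deregister; shutdown waiters wake */
    }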
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index e5e2e17b..65eabd87 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -23,6 +23,7 @@ static void nni_pair_send_cb(void *);
static void nni_pair_recv_cb(void *);
static void nni_pair_getq_cb(void *);
static void nni_pair_putq_cb(void *);
+static void nni_pair_pipe_fini(void *);
// An nni_pair_sock is our per-socket protocol private structure.
struct nni_pair_sock {
@@ -44,11 +45,10 @@ struct nni_pair_pipe {
nni_aio aio_recv;
nni_aio aio_getq;
nni_aio aio_putq;
+ int busy;
+ int closed;
};
-static void nni_pair_receiver(void *);
-static void nni_pair_sender(void *);
-
static int
nni_pair_sock_init(void **sp, nni_sock *nsock)
{
@@ -90,22 +90,22 @@ nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock)
rv = nni_aio_init(&ppipe->aio_send, nni_pair_send_cb, ppipe);
if (rv != 0) {
- nni_pair_sock_fini(ppipe);
+ nni_pair_pipe_fini(ppipe);
return (rv);
}
rv = nni_aio_init(&ppipe->aio_recv, nni_pair_recv_cb, ppipe);
if (rv != 0) {
- nni_pair_sock_fini(ppipe);
+ nni_pair_pipe_fini(ppipe);
return (rv);
}
rv = nni_aio_init(&ppipe->aio_getq, nni_pair_getq_cb, ppipe);
if (rv != 0) {
- nni_pair_sock_fini(ppipe);
+ nni_pair_pipe_fini(ppipe);
return (rv);
}
rv = nni_aio_init(&ppipe->aio_putq, nni_pair_putq_cb, ppipe);
if (rv != 0) {
- nni_pair_sock_fini(ppipe);
+ nni_pair_pipe_fini(ppipe);
return (rv);
}
ppipe->npipe = npipe;
@@ -120,13 +120,12 @@ nni_pair_pipe_fini(void *arg)
{
nni_pair_pipe *ppipe = arg;
- if (ppipe != NULL) {
- nni_aio_fini(&ppipe->aio_send);
- nni_aio_fini(&ppipe->aio_recv);
- nni_aio_fini(&ppipe->aio_putq);
- nni_aio_fini(&ppipe->aio_getq);
- NNI_FREE_STRUCT(ppipe);
- }
+ NNI_ASSERT(ppipe->busy >= 0);
+ nni_aio_fini(&ppipe->aio_send);
+ nni_aio_fini(&ppipe->aio_recv);
+ nni_aio_fini(&ppipe->aio_putq);
+ nni_aio_fini(&ppipe->aio_getq);
+ NNI_FREE_STRUCT(ppipe);
}
@@ -142,7 +141,10 @@ nni_pair_pipe_add(void *arg)
psock->ppipe = ppipe;
// Schedule a getq on the upper, and a read from the pipe.
+ // Each of these also sets up another hold on the pipe itself.
+ nni_pipe_incref(ppipe->npipe);
nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq);
+ nni_pipe_incref(ppipe->npipe);
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
return (0);
@@ -155,9 +157,10 @@ nni_pair_pipe_rem(void *arg)
nni_pair_pipe *ppipe = arg;
nni_pair_sock *psock = ppipe->psock;
+ nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq);
+ nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq);
+
if (psock->ppipe == ppipe) {
- nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq);
- nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq);
psock->ppipe = NULL;
}
}
@@ -171,6 +174,7 @@ nni_pair_recv_cb(void *arg)
if (nni_aio_result(&ppipe->aio_recv) != 0) {
nni_pipe_close(ppipe->npipe);
+ nni_pipe_decref(ppipe->npipe);
return;
}
@@ -189,6 +193,7 @@ nni_pair_putq_cb(void *arg)
nni_msg_free(ppipe->aio_putq.a_msg);
ppipe->aio_putq.a_msg = NULL;
nni_pipe_close(ppipe->npipe);
+ nni_pipe_decref(ppipe->npipe);
return;
}
nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv);
@@ -204,6 +209,7 @@ nni_pair_getq_cb(void *arg)
if (nni_aio_result(&ppipe->aio_getq) != 0) {
nni_pipe_close(ppipe->npipe);
+ nni_pipe_decref(ppipe->npipe);
return;
}
@@ -223,6 +229,7 @@ nni_pair_send_cb(void *arg)
nni_msg_free(ppipe->aio_send.a_msg);
ppipe->aio_send.a_msg = NULL;
nni_pipe_close(ppipe->npipe);
+ nni_pipe_decref(ppipe->npipe);
return;
}
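The pair protocol now pins the pipe once per outstanding asynchronous chain: nni_pair_pipe_add takes a reference before scheduling the upper-queue get and the pipe receive, and each completion callback drops its chain's reference once it stops rescheduling (after closing the pipe on error). The discipline, stripped of the protocol details (conn_hold, conn_release, and schedule_recv are assumed placeholder primitives, not nng functions):

    /* conn_hold/conn_release pin and unpin a connection; schedule_recv
     * starts an async receive whose completion later invokes recv_done. */
    typedef struct conn conn;

    void conn_hold(conn *);
    void conn_release(conn *);
    void schedule_recv(conn *);

    /* Pipe added: each asynchronous chain pins the connection once before
     * its first operation is scheduled (cf. nni_pipe_incref before
     * nni_msgq_aio_get and nni_pipe_aio_recv in nni_pair_pipe_add). */
    void
    chain_start(conn *c)
    {
        conn_hold(c);
        schedule_recv(c);
    }

    /* Completion: on success the chain reschedules itself and keeps its
     * pin; on error it stops and releases the pin (cf. nni_pipe_close
     * followed by nni_pipe_decref in nni_pair_recv_cb). */
    void
    recv_done(conn *c, int error)
    {
        if (error != 0) {
            conn_release(c);
            return;
        }
        schedule_recv(c);
    }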
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 2c658ae8..751a851b 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -182,7 +182,9 @@ nni_rep_pipe_add(void *arg)
return (rv);
}
+ nni_pipe_incref(rp->pipe);
nni_msgq_aio_get(rp->sendq, &rp->aio_getq);
+ nni_pipe_incref(rp->pipe);
nni_pipe_aio_recv(rp->pipe, &rp->aio_recv);
return (0);
}
@@ -194,7 +196,7 @@ nni_rep_pipe_rem(void *arg)
nni_rep_pipe *rp = arg;
nni_rep_sock *rep = rp->rep;
- nni_msgq_aio_cancel(rp->sendq, &rp->aio_getq);
+ nni_msgq_close(rp->sendq);
nni_msgq_aio_cancel(rep->urq, &rp->aio_putq);
nni_idhash_remove(&rep->pipes, nni_pipe_id(rp->pipe));
}
@@ -205,7 +207,6 @@ nni_rep_sock_getq_cb(void *arg)
{
nni_rep_sock *rep = arg;
nni_msgq *uwq = rep->uwq;
- nni_mtx *mx = nni_sock_mtx(rep->sock);
nni_msg *msg;
uint8_t *header;
uint32_t id;
@@ -241,12 +242,12 @@ nni_rep_sock_getq_cb(void *arg)
// Look for the pipe, and attempt to put the message there
// (nonblocking) if we can. If we can't for any reason, then we
// free the message.
- nni_mtx_lock(mx);
+ nni_sock_lock(rep->sock);
rv = nni_idhash_find(&rep->pipes, id, (void **) &rp);
if (rv == 0) {
rv = nni_msgq_tryput(rp->sendq, msg);
}
- nni_mtx_unlock(mx);
+ nni_sock_unlock(rep->sock);
if (rv != 0) {
nni_msg_free(msg);
}
@@ -263,6 +264,7 @@ nni_rep_pipe_getq_cb(void *arg)
if (nni_aio_result(&rp->aio_getq) != 0) {
nni_pipe_close(rp->pipe);
+ nni_pipe_decref(rp->pipe);
return;
}
@@ -282,6 +284,7 @@ nni_rep_pipe_send_cb(void *arg)
nni_msg_free(rp->aio_send.a_msg);
rp->aio_send.a_msg = NULL;
nni_pipe_close(rp->pipe);
+ nni_pipe_decref(rp->pipe);
return;
}
@@ -303,6 +306,7 @@ nni_rep_pipe_recv_cb(void *arg)
if (nni_aio_result(&rp->aio_recv) != 0) {
nni_pipe_close(rp->pipe);
+ nni_pipe_decref(rp->pipe);
return;
}
@@ -353,6 +357,7 @@ malformed:
// Failures here are bad enough to warrant to dropping the conn.
nni_msg_free(msg);
nni_pipe_close(rp->pipe);
+ nni_pipe_decref(rp->pipe);
}
@@ -365,6 +370,7 @@ nni_rep_pipe_putq_cb(void *arg)
nni_msg_free(rp->aio_putq.a_msg);
rp->aio_putq.a_msg = NULL;
nni_pipe_close(rp->pipe);
+ nni_pipe_decref(rp->pipe);
return;
}
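In the REP getq callback the reply is now delivered under the socket lock (via the new nni_sock_lock/nni_sock_unlock wrappers) with nni_msgq_tryput, which either accepts the message immediately or fails without blocking; on failure the message is simply dropped. A bounded non-blocking put looks roughly like this (illustrative queue type and error codes, not nng's msgqueue):

    #include <pthread.h>

    #define MY_EAGAIN  8   /* stand-in error codes, not the nng values */
    #define MY_ECLOSED 7

    typedef struct queue {
        pthread_mutex_t lock;
        void           *items[16];
        int             len;
        int             cap;     /* set by whoever initializes the queue (<= 16) */
        int             closed;
    } queue;

    /* Non-blocking put: either the message is accepted right now, or the
     * call fails and the caller keeps ownership (and, in rep.c above,
     * frees the message). */
    static int
    queue_tryput(queue *q, void *msg)
    {
        int rv = 0;

        pthread_mutex_lock(&q->lock);
        if (q->closed) {
            rv = MY_ECLOSED;
        } else if (q->len >= q->cap) {
            rv = MY_EAGAIN;
        } else {
            q->items[q->len++] = msg;
        }
        pthread_mutex_unlock(&q->lock);
        return (rv);
    }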
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index f28db1df..553ef0bf 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -188,7 +188,9 @@ nni_req_pipe_add(void *arg)
nni_req_resend(req);
}
+ nni_pipe_incref(rp->pipe);
nni_msgq_aio_get(req->uwq, &rp->aio_getq);
+ nni_pipe_incref(rp->pipe);
nni_pipe_aio_recv(rp->pipe, &rp->aio_recv);
return (0);
}
@@ -282,10 +284,12 @@ nni_req_getq_cb(void *arg)
// We should be in RAW mode. Cooked mode traffic bypasses
// the upper write queue entirely, and should never end up here.
// If the mode changes, we may briefly deliver a message, but
- // that's ok (there's an inherent race anyway).
+ // that's ok (there's an inherent race anyway). (One minor
+ // exception: we wind up here in error state when the uwq is closed.)
if (nni_aio_result(&rp->aio_getq) != 0) {
nni_pipe_close(rp->pipe);
+ nni_pipe_decref(rp->pipe);
return;
}
@@ -301,6 +305,15 @@ static void
nni_req_sendraw_cb(void *arg)
{
nni_req_pipe *rp = arg;
+ nni_msg *msg;
+
+ if (nni_aio_result(&rp->aio_sendraw) != 0) {
+ nni_msg_free(rp->aio_sendraw.a_msg);
+ rp->aio_sendraw.a_msg = NULL;
+ nni_pipe_close(rp->pipe);
+ nni_pipe_decref(rp->pipe);
+ return;
+ }
// Sent a message so we just need to look for another one.
nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq);
@@ -314,6 +327,17 @@ nni_req_sendcooked_cb(void *arg)
nni_req_sock *req = rp->req;
nni_mtx *mx = nni_sock_mtx(req->sock);
+ if (nni_aio_result(&rp->aio_sendcooked) != 0) {
+ // We failed to send... clean up and deal with it.
+ // We leave ourselves on the busy list for now, which
+ // means no new asynchronous traffic can occur here.
+ nni_msg_free(rp->aio_sendcooked.a_msg);
+ rp->aio_sendcooked.a_msg = NULL;
+ nni_pipe_close(rp->pipe);
+ nni_pipe_decref(rp->pipe);
+ return;
+ }
+
// Cooked mode. We completed a cooked send, so we need to
// reinsert ourselves in the ready list, and possibly schedule
// a resend.
@@ -335,6 +359,7 @@ nni_req_putq_cb(void *arg)
if (nni_aio_result(&rp->aio_putq) != 0) {
nni_msg_free(rp->aio_putq.a_msg);
nni_pipe_close(rp->pipe);
+ nni_pipe_decref(rp->pipe);
return;
}
rp->aio_putq.a_msg = NULL;
@@ -351,6 +376,7 @@ nni_req_recv_cb(void *arg)
if (nni_aio_result(&rp->aio_recv) != 0) {
nni_pipe_close(rp->pipe);
+ nni_pipe_decref(rp->pipe);
return;
}
@@ -381,6 +407,7 @@ nni_req_recv_cb(void *arg)
malformed:
nni_msg_free(msg);
nni_pipe_close(rp->pipe);
+ nni_pipe_decref(rp->pipe);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 0cc208d4..4e329d10 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -160,35 +160,6 @@ nni_inproc_pipe_aio_recv(void *arg, nni_aio *aio)
}
-static int
-nni_inproc_pipe_send(void *arg, nni_msg *msg)
-{
- nni_inproc_pipe *pipe = arg;
- char *h;
- size_t l;
-
- // We need to move any header data to the body, because the other
- // side won't know what to do otherwise.
- h = nni_msg_header(msg);
- l = nni_msg_header_len(msg);
- if (nni_msg_prepend(msg, h, l) != 0) {
- nni_msg_free(msg);
- return (0); // Pretend we sent it.
- }
- nni_msg_trunc_header(msg, l);
- return (nni_msgq_put(pipe->wq, msg));
-}
-
-
-static int
-nni_inproc_pipe_recv(void *arg, nni_msg **msgp)
-{
- nni_inproc_pipe *pipe = arg;
-
- return (nni_msgq_get(pipe->rq, msgp));
-}
-
-
static uint16_t
nni_inproc_pipe_peer(void *arg)
{
@@ -433,8 +404,6 @@ nni_inproc_ep_accept(void *arg, void **pipep)
static nni_tran_pipe nni_inproc_pipe_ops = {
.pipe_destroy = nni_inproc_pipe_destroy,
- .pipe_send = nni_inproc_pipe_send,
- .pipe_recv = nni_inproc_pipe_recv,
.pipe_aio_send = nni_inproc_pipe_aio_send,
.pipe_aio_recv = nni_inproc_pipe_aio_recv,
.pipe_close = nni_inproc_pipe_close,
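With the synchronous entry points removed, an inproc-style transport only supplies the asynchronous send and receive hooks alongside the lifecycle functions. Roughly (a sketch using only the member names visible in this diff; it assumes the internal transport headers are in scope, the my_* functions are hypothetical, and the real structure has additional members filled in elsewhere):

    extern void my_pipe_destroy(void *);
    extern void my_pipe_close(void *);
    extern void my_pipe_aio_send(void *, nni_aio *);
    extern void my_pipe_aio_recv(void *, nni_aio *);

    static nni_tran_pipe my_tran_pipe_ops = {
        .pipe_destroy  = my_pipe_destroy,
        .pipe_aio_send = my_pipe_aio_send,   /* the aio path is the only data path now */
        .pipe_aio_recv = my_pipe_aio_recv,
        .pipe_close    = my_pipe_close,
    };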
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index ae6401ce..2b0d1427 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -84,6 +84,7 @@ add_nng_test(pubsub 5)
add_nng_test(sock 5)
add_nng_test(survey 5)
add_nng_test(tcp 5)
+add_nng_test(scalability 5)
# compatbility tests
add_nng_compat_test(compat_block 5)
diff --git a/tests/scalability.c b/tests/scalability.c
new file mode 100644
index 00000000..16f4ba8c
--- /dev/null
+++ b/tests/scalability.c
@@ -0,0 +1,176 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "convey.h"
+#include "nng.h"
+
+#include <string.h>
+
+static int count = 1;
+static int nthrs = 100;
+static char *addr = "inproc:///atscale";
+
+static void
+client(void *arg)
+{
+ int *result = arg;
+ nng_socket s;
+ int rv;
+ uint64_t timeo;
+ nng_msg *msg;
+ int i;
+
+ *result = 0;
+
+ if ((rv = nng_open(&s, NNG_PROTO_REQ)) != 0) {
+ *result = rv;
+ return;
+ }
+
+ if ((rv = nng_dial(s, addr, NULL, NNG_FLAG_SYNCH)) != 0) {
+ *result = rv;
+ nng_close(s);
+ return;
+ }
+
+ timeo = 40000; // 4 seconds
+ if (((rv = nng_setopt(s, NNG_OPT_RCVTIMEO, &timeo, sizeof (timeo))) != 0) ||
+ ((rv = nng_setopt(s, NNG_OPT_SNDTIMEO, &timeo, sizeof (timeo))) != 0)) {
+ *result = rv;
+ nng_close(s);
+ return;
+ }
+
+ for (i = 0; i < count; i++) {
+ // Sleep for up to 1 ms before issuing requests to
+ // avoid saturating the CPU with bazillions of requests at
+ // the same time.
+ nng_usleep(rand() % 1000);
+
+ // Reusing the same message causes problems as a result of
+ // header reuse.
+ if ((rv = nng_msg_alloc(&msg, 0)) != 0) {
+ *result = rv;
+ nng_close(s);
+ return;
+ }
+
+ if ((rv = nng_sendmsg(s, msg, 0)) != 0) {
+ *result = rv;
+ nng_msg_free(msg);
+ nng_close(s);
+ return;
+ }
+
+ if ((rv = nng_recvmsg(s, &msg, 0)) != 0) {
+ *result = rv;
+ nng_close(s);
+ return;
+ }
+
+ nng_msg_free(msg);
+ }
+
+ nng_close(s);
+ *result = 0;
+}
+
+void
+serve(void *arg)
+{
+ nng_socket rep = *(nng_socket *)arg;
+ nng_msg *msg;
+
+ for (;;) {
+ if (nng_recvmsg(rep, &msg, 0) != 0) {
+ nng_close(rep);
+ return;
+ }
+
+ if (nng_sendmsg(rep, msg, 0) != 0) {
+ nng_close(rep);
+ return;
+ }
+ }
+}
+
+Main({
+ int rv;
+ void **clients;
+ void *server;
+ int *results;
+
+ clients = calloc(nthrs, sizeof (void *));
+ results = calloc(nthrs, sizeof (int));
+
+ Test("Scalability", {
+
+ Convey("Given a server socket", {
+ nng_socket rep;
+ int depth = 256;
+
+ So(nng_open(&rep, NNG_PROTO_REP) == 0);
+
+ Reset({
+ nng_close(rep);
+ })
+
+ So(nng_setopt(rep, NNG_OPT_RCVBUF, &depth, sizeof (depth)) == 0);
+ So(nng_setopt(rep, NNG_OPT_SNDBUF, &depth, sizeof (depth)) == 0);
+ So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0);
+
+ So(nng_thread_create(&server, serve, &rep) == 0);
+
+ nng_usleep(100000);
+
+ Convey("We can run many many clients", {
+ int fails = 0;
+ int i;
+ for (i = 0; i < nthrs; i++) {
+ if ((rv = nng_thread_create(&clients[i], client, &results[i])) != 0) {
+ printf("thread create failed: %s", nng_strerror(rv));
+ break;
+ }
+ }
+ So(i == nthrs);
+
+ for (i = 0; i < nthrs; i++) {
+ nng_thread_destroy(clients[i]);
+ fails += (results[i] == 0 ? 0 : 1);
+ if (results[i] != 0) {
+ printf("%d (%d): %s\n",
+ fails, i,
+ nng_strerror(results[i]));
+ }
+ }
+ So(fails == 0);
+
+ nng_shutdown(rep);
+
+ nng_thread_destroy(server);
+ })
+ })
+
+ })
+})