diff options
| -rw-r--r-- | src/core/aio.c | 1 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 42 | ||||
| -rw-r--r-- | src/core/pipe.c | 167 | ||||
| -rw-r--r-- | src/core/pipe.h | 5 | ||||
| -rw-r--r-- | src/core/socket.c | 124 | ||||
| -rw-r--r-- | src/core/socket.h | 26 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 39 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 14 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 29 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 31 | ||||
| -rw-r--r-- | tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | tests/scalability.c | 176 |
12 files changed, 500 insertions, 155 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index ffc5ac06..2f871f73 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -41,6 +41,7 @@ nni_aio_init(nni_aio *aio, nni_cb cb, void *arg) void nni_aio_fini(nni_aio *aio) { + nni_taskq_cancel(&aio->a_tqe); nni_cv_fini(&aio->a_cv); nni_mtx_fini(&aio->a_lk); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 9607f562..47b98629 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -388,6 +388,11 @@ nni_msgq_aio_put(nni_msgq *mq, nni_aio *aio) nni_time expire = aio->a_expire; nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&mq->mq_lock); + return; + } nni_list_append(&mq->mq_aio_putq, aio); nni_msgq_run_putq(mq); nni_msgq_run_notify(mq); @@ -406,6 +411,11 @@ nni_msgq_aio_get(nni_msgq *mq, nni_aio *aio) nni_time expire = aio->a_expire; nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + nni_aio_finish(aio, NNG_ECLOSED, 0); + nni_mtx_unlock(&mq->mq_lock); + return; + } nni_list_append(&mq->mq_aio_getq, aio); nni_msgq_run_getq(mq); nni_msgq_run_notify(mq); @@ -428,6 +438,7 @@ nni_msgq_aio_cancel(nni_msgq *mq, nni_aio *aio) // the node from either the getq or the putq list. if (nni_list_active(&mq->mq_aio_getq, aio)) { nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, NNG_ECANCELED, 0); } nni_mtx_unlock(&mq->mq_lock); } @@ -437,6 +448,10 @@ int nni_msgq_canput(nni_msgq *mq) { nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + nni_mtx_unlock(&mq->mq_lock); + return (0); + } if ((mq->mq_len < mq->mq_cap) || (mq->mq_rwait != 0) || // XXX: REMOVE ME (nni_list_first(&mq->mq_aio_getq) != NULL)) { @@ -452,6 +467,10 @@ int nni_msgq_canget(nni_msgq *mq) { nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + nni_mtx_unlock(&mq->mq_lock); + return (0); + } if ((mq->mq_len != 0) || (mq->mq_wwait != 0) || (nni_list_first(&mq->mq_aio_putq) != NULL)) { @@ -470,6 +489,10 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) size_t len = nni_msg_len(msg); nni_mtx_lock(&mq->mq_lock); + if (mq->mq_closed) { + nni_mtx_unlock(&mq->mq_lock); + return (NNG_ECLOSED); + } // The presence of any blocked reader indicates that // the queue is empty, otherwise it would have just taken @@ -804,6 +827,9 @@ nni_msgq_drain(nni_msgq *mq, nni_time expire) void nni_msgq_close(nni_msgq *mq) { + nni_aio *aio; + nni_aio *naio; + nni_mtx_lock(&mq->mq_lock); mq->mq_closed = 1; mq->mq_wwait = 0; @@ -821,6 +847,22 @@ nni_msgq_close(nni_msgq *mq) mq->mq_len--; nni_msg_free(msg); } + + // Let all pending blockers know we are closing the queue. + naio = nni_list_first(&mq->mq_aio_getq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_getq, aio); + nni_list_remove(&mq->mq_aio_getq, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + + naio = nni_list_first(&mq->mq_aio_putq); + while ((aio = naio) != NULL) { + naio = nni_list_next(&mq->mq_aio_putq, aio); + nni_list_remove(&mq->mq_aio_putq, aio); + nni_aio_finish(aio, NNG_ECLOSED, 0); + } + nni_mtx_unlock(&mq->mq_lock); } diff --git a/src/core/pipe.c b/src/core/pipe.c index a401e4e3..18c47c60 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -50,6 +50,30 @@ nni_pipe_aio_send(nni_pipe *p, nni_aio *aio) } +void +nni_pipe_incref(nni_pipe *p) +{ + nni_mtx_lock(&p->p_mtx); + p->p_refcnt++; + nni_mtx_unlock(&p->p_mtx); +} + + +void +nni_pipe_decref(nni_pipe *p) +{ + nni_mtx_lock(&p->p_mtx); + p->p_refcnt--; + if (p->p_refcnt == 0) { + nni_mtx_unlock(&p->p_mtx); + + nni_pipe_destroy(p); + return; + } + nni_mtx_unlock(&p->p_mtx); +} + + // nni_pipe_close closes the underlying connection. It is expected that // subsequent attempts receive or send (including any waiting receive) will // simply return NNG_ECLOSED. @@ -58,37 +82,34 @@ nni_pipe_close(nni_pipe *p) { nni_sock *sock = p->p_sock; + nni_mtx_lock(&p->p_mtx); + if (p->p_reap == 1) { + // We already did a close. + nni_mtx_unlock(&p->p_mtx); + return; + } + p->p_reap = 1; + + // Close the underlying transport. if (p->p_tran_data != NULL) { p->p_tran_ops.pipe_close(p->p_tran_data); } - nni_mtx_lock(&sock->s_mx); - if (!p->p_reap) { - // schedule deferred reap/close - p->p_reap = 1; - nni_list_remove(&sock->s_pipes, p); - nni_list_append(&sock->s_reaps, p); - nni_cv_wake(&sock->s_cv); + // Unregister our ID so nobody else can find it. + if (p->p_id != 0) { + nni_mtx_lock(nni_idlock); + nni_idhash_remove(nni_pipes, p->p_id); + nni_mtx_unlock(nni_idlock); + p->p_id = 0; } - nni_mtx_unlock(&sock->s_mx); -} + nni_mtx_unlock(&p->p_mtx); -// nni_pipe_bail is a special version of close, that is used to abort -// from nni_pipe_start, when it fails. It requires the lock to be held, -// and this prevents us from dropping the lock, possibly leading to race -// conditions. It's critical that this not be called after the pipe is -// started, or deadlock will occur. -static void -nni_pipe_bail(nni_pipe *p) -{ - nni_sock *sock = p->p_sock; - - if (p->p_tran_data != NULL) { - p->p_tran_ops.pipe_close(p->p_tran_data); - } + // Let the socket (and endpoint) know we have closed. + nni_sock_pipe_closed(sock, p); - nni_pipe_destroy(p); + // Drop a reference count, possibly doing deferred destroy. + nni_pipe_decref(p); } @@ -99,25 +120,6 @@ nni_pipe_peer(nni_pipe *p) } -void -nni_pipe_destroy(nni_pipe *p) -{ - int i; - - for (i = 0; i < NNI_MAXWORKERS; i++) { - nni_thr_fini(&p->p_worker_thr[i]); - } - - if (p->p_tran_data != NULL) { - p->p_tran_ops.pipe_destroy(p->p_tran_data); - } - if (p->p_proto_data != NULL) { - p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data); - } - NNI_FREE_STRUCT(p); -} - - int nni_pipe_create(nni_pipe **pp, nni_ep *ep) { @@ -126,15 +128,17 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) const nni_proto_pipe_ops *ops = &sock->s_pipe_ops; void *pdata; int rv; - int i; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_mtx_init(&p->p_mtx)) != 0) { + NNI_FREE_STRUCT(p); + return (rv); + } p->p_sock = sock; p->p_tran_data = NULL; p->p_proto_data = NULL; - p->p_active = 0; p->p_id = 0; NNI_LIST_NODE_INIT(&p->p_node); @@ -143,30 +147,37 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) p->p_tran_ops = *ep->ep_tran->tran_pipe; if ((rv = ops->pipe_init(&pdata, p, sock->s_data)) != 0) { + nni_mtx_fini(&p->p_mtx); NNI_FREE_STRUCT(p); return (rv); } p->p_proto_data = pdata; - - for (i = 0; i < NNI_MAXWORKERS; i++) { - nni_worker fn = ops->pipe_worker[i]; - rv = nni_thr_init(&p->p_worker_thr[i], fn, pdata); - if (rv != 0) { - while (i > 0) { - i--; - nni_thr_fini(&p->p_worker_thr[i]); - } - ops->pipe_fini(pdata); - NNI_FREE_STRUCT(p); - return (rv); - } - } + nni_sock_pipe_add(sock, p); *pp = p; return (0); } +void +nni_pipe_destroy(nni_pipe *p) +{ + NNI_ASSERT(p->p_refcnt == 0); + + // The caller is responsible for ensuring that the pipe + // is not in use by any other consumers. It must not be started + if (p->p_tran_data != NULL) { + p->p_tran_ops.pipe_destroy(p->p_tran_data); + } + if (p->p_proto_data != NULL) { + p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data); + } + nni_sock_pipe_rem(p->p_sock, p); + nni_mtx_fini(&p->p_mtx); + NNI_FREE_STRUCT(p); +} + + int nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) { @@ -179,55 +190,27 @@ nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) int -nni_pipe_start(nni_pipe *pipe) +nni_pipe_start(nni_pipe *p) { int rv; - int i; - nni_sock *sock = pipe->p_sock; - - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_pipe_bail(pipe); - nni_mtx_unlock(&sock->s_mx); - return (NNG_ECLOSED); - } - if (nni_pipe_peer(pipe) != sock->s_peer) { - nni_pipe_bail(pipe); - nni_mtx_unlock(&sock->s_mx); - return (NNG_EPROTO); - } + nni_pipe_incref(p); nni_mtx_lock(nni_idlock); - rv = nni_idhash_alloc(nni_pipes, &pipe->p_id, pipe); + rv = nni_idhash_alloc(nni_pipes, &p->p_id, p); nni_mtx_unlock(nni_idlock); if (rv != 0) { - nni_pipe_bail(pipe); - nni_mtx_unlock(&sock->s_mx); + nni_pipe_close(p); return (rv); } - if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) { - nni_mtx_lock(nni_idlock); - nni_idhash_remove(nni_pipes, pipe->p_id); - pipe->p_id = 0; - nni_mtx_unlock(nni_idlock); - - nni_pipe_bail(pipe); - nni_mtx_unlock(&sock->s_mx); + if ((rv = nni_sock_pipe_ready(p->p_sock, p)) != 0) { + nni_pipe_close(p); return (rv); } - pipe->p_active = 1; - nni_list_append(&sock->s_pipes, pipe); - - for (i = 0; i < NNI_MAXWORKERS; i++) { - nni_thr_run(&pipe->p_worker_thr[i]); - } - // XXX: Publish event - nni_mtx_unlock(&sock->s_mx); return (0); } diff --git a/src/core/pipe.h b/src/core/pipe.h index 3ec4a7a3..6cabf4e7 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -28,7 +28,8 @@ struct nni_pipe { nni_ep * p_ep; int p_reap; int p_active; - nni_thr p_worker_thr[NNI_MAXWORKERS]; + nni_mtx p_mtx; + int p_refcnt; }; // AIO @@ -40,6 +41,8 @@ extern int nni_pipe_recv(nni_pipe *, nng_msg **); extern int nni_pipe_send(nni_pipe *, nng_msg *); extern uint32_t nni_pipe_id(nni_pipe *); extern void nni_pipe_close(nni_pipe *); +extern void nni_pipe_incref(nni_pipe *); +extern void nni_pipe_decref(nni_pipe *); // Used only by the socket core - as we don't wish to expose the details // of the pipe structure outside of pipe.c. diff --git a/src/core/socket.c b/src/core/socket.c index d58e64ba..40fb42bc 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -128,12 +128,113 @@ nni_sock_held_close(nni_sock *sock) } +void +nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe) +{ + nni_mtx_lock(&sock->s_mx); + nni_list_append(&sock->s_pipes, pipe); + nni_mtx_unlock(&sock->s_mx); +} + + +int +nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe) +{ + int rv; + + nni_mtx_lock(&sock->s_mx); + + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + return (NNG_ECLOSED); + } + if (nni_pipe_peer(pipe) != sock->s_peer) { + nni_mtx_unlock(&sock->s_mx); + return (NNG_EPROTO); + } + + if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) { + nni_mtx_unlock(&sock->s_mx); + return (rv); + } + + pipe->p_active = 1; + + nni_list_remove(&sock->s_idles, pipe); + nni_list_append(&sock->s_pipes, pipe); + + nni_mtx_unlock(&sock->s_mx); + + return (0); +} + + +void +nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe) +{ + nni_ep *ep; + + nni_mtx_lock(&sock->s_mx); + + // NB: nni_list_remove doesn't really care *which* list the pipe + // is on, and so if the pipe is already on the idle list these + // two statements are effectively a no-op. + nni_list_remove(&sock->s_pipes, pipe); + nni_list_append(&sock->s_idles, pipe); + + if (pipe->p_active) { + pipe->p_active = 0; + sock->s_pipe_ops.pipe_rem(pipe->p_proto_data); + } + + // Notify the endpoint that the pipe has closed. + if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { + ep->ep_pipe = NULL; + nni_cv_wake(&ep->ep_cv); + } + nni_mtx_unlock(&sock->s_mx); +} + + +void +nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe) +{ + nni_ep *ep; + + nni_mtx_lock(&sock->s_mx); + nni_list_remove(&sock->s_idles, pipe); + + // Notify the endpoint that the pipe has closed - if not already done. + if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) { + ep->ep_pipe = NULL; + nni_cv_wake(&ep->ep_cv); + } + nni_cv_wake(&sock->s_cv); + nni_mtx_unlock(&sock->s_mx); +} + + +void +nni_sock_lock(nni_sock *sock) +{ + nni_mtx_lock(&sock->s_mx); +} + + +void +nni_sock_unlock(nni_sock *sock) +{ + nni_mtx_unlock(&sock->s_mx); +} + + // Because we have to call back into the socket, and possibly also the proto, // and wait for threads to terminate, we do this in a special thread. The // assumption is that closing is always a "fast" operation. static void nni_reaper(void *arg) { +#if 0 nni_sock *sock = arg; for (;;) { @@ -183,6 +284,7 @@ nni_reaper(void *arg) nni_cv_wait(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); } +#endif } @@ -301,7 +403,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) sock->s_reapexit = 0; sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); - NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); + NNI_LIST_INIT(&sock->s_idles, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node); NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node); NNI_LIST_INIT(&sock->s_events, nni_event, e_node); @@ -512,15 +614,14 @@ nni_sock_shutdown(nni_sock *sock) nni_msgq_close(sock->s_urq); nni_msgq_close(sock->s_uwq); - // For each pipe, close the underlying transport, and move it to - // deathrow (the reaplist). + // For each pipe, close the underlying transport. Also move it + // to the idle list so we won't keep looping. while ((pipe = nni_list_first(&sock->s_pipes)) != NULL) { - if (pipe->p_tran_data != NULL) { - pipe->p_tran_ops.pipe_close(pipe->p_tran_data); - } - pipe->p_reap = 1; - nni_list_remove(&sock->s_pipes, pipe); - nni_list_append(&sock->s_reaps, pipe); + nni_pipe_incref(pipe); + nni_mtx_unlock(&sock->s_mx); + nni_pipe_close(pipe); + nni_pipe_decref(pipe); + nni_mtx_lock(&sock->s_mx); } sock->s_sock_ops.sock_close(sock->s_data); @@ -528,6 +629,11 @@ nni_sock_shutdown(nni_sock *sock) sock->s_reapexit = 1; nni_cv_wake(&sock->s_notify_cv); nni_cv_wake(&sock->s_cv); + + while ((nni_list_first(&sock->s_idles) != NULL) || + (nni_list_first(&sock->s_pipes) != NULL)) { + nni_cv_wait(&sock->s_cv); + } nni_mtx_unlock(&sock->s_mx); // Wait for the threads to exit. diff --git a/src/core/socket.h b/src/core/socket.h index d7a7eb5e..22873c3c 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -42,7 +42,8 @@ struct nni_socket { nni_duration s_reconnmax; // max reconnect time nni_list s_eps; // active endpoints - nni_list s_pipes; // pipes for this socket + nni_list s_pipes; // ready pipes (started) + nni_list s_idles; // idle pipes (not ready) nni_list s_events; // pending events nni_list s_notify; // event watchers nni_cv s_notify_cv; // wakes notify thread @@ -89,6 +90,29 @@ extern int nni_sock_dial(nni_sock *, const char *, nni_ep **, int); extern int nni_sock_listen(nni_sock *, const char *, nni_ep **, int); extern uint32_t nni_sock_id(nni_sock *); +extern void nni_sock_lock(nni_sock *); +extern void nni_sock_unlock(nni_sock *); + +// nni_sock_pipe_add is called by the pipe to register the pipe with +// with the socket. The pipe is added to the idle list. +extern void nni_sock_pipe_add(nni_sock *, nni_pipe *); + +// nni_sock_pipe_rem deregisters the pipe from the socket. The socket +// will block during close if there are registered pipes outstanding. +extern void nni_sock_pipe_rem(nni_sock *, nni_pipe *); + +// nni_sock_pipe_ready lets the socket know the pipe is ready for +// business. This also calls the socket/protocol specific add function, +// and it may return an error. A reference count on the pipe is incremented +// on success. The reference count should be dropped by nni_sock_pipe_closed. +extern int nni_sock_pipe_ready(nni_sock *, nni_pipe *); + +// nni_sock_pipe_closed lets the socket know that the pipe is closed. +// This keeps the socket from trying to schedule traffic to it. It +// also lets the endpoint know about it, to possibly restart a dial +// operation. +extern void nni_sock_pipe_closed(nni_sock *, nni_pipe *); + // Set error codes for applications. These are only ever // called from the filter functions in protocols, and thus // already have the socket lock held. diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index e5e2e17b..65eabd87 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -23,6 +23,7 @@ static void nni_pair_send_cb(void *); static void nni_pair_recv_cb(void *); static void nni_pair_getq_cb(void *); static void nni_pair_putq_cb(void *); +static void nni_pair_pipe_fini(void *); // An nni_pair_sock is our per-socket protocol private structure. struct nni_pair_sock { @@ -44,11 +45,10 @@ struct nni_pair_pipe { nni_aio aio_recv; nni_aio aio_getq; nni_aio aio_putq; + int busy; + int closed; }; -static void nni_pair_receiver(void *); -static void nni_pair_sender(void *); - static int nni_pair_sock_init(void **sp, nni_sock *nsock) { @@ -90,22 +90,22 @@ nni_pair_pipe_init(void **pp, nni_pipe *npipe, void *psock) rv = nni_aio_init(&ppipe->aio_send, nni_pair_send_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } rv = nni_aio_init(&ppipe->aio_recv, nni_pair_recv_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } rv = nni_aio_init(&ppipe->aio_getq, nni_pair_getq_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } rv = nni_aio_init(&ppipe->aio_putq, nni_pair_putq_cb, ppipe); if (rv != 0) { - nni_pair_sock_fini(ppipe); + nni_pair_pipe_fini(ppipe); return (rv); } ppipe->npipe = npipe; @@ -120,13 +120,12 @@ nni_pair_pipe_fini(void *arg) { nni_pair_pipe *ppipe = arg; - if (ppipe != NULL) { - nni_aio_fini(&ppipe->aio_send); - nni_aio_fini(&ppipe->aio_recv); - nni_aio_fini(&ppipe->aio_putq); - nni_aio_fini(&ppipe->aio_getq); - NNI_FREE_STRUCT(ppipe); - } + NNI_ASSERT(ppipe->busy >= 0); + nni_aio_fini(&ppipe->aio_send); + nni_aio_fini(&ppipe->aio_recv); + nni_aio_fini(&ppipe->aio_putq); + nni_aio_fini(&ppipe->aio_getq); + NNI_FREE_STRUCT(ppipe); } @@ -142,7 +141,10 @@ nni_pair_pipe_add(void *arg) psock->ppipe = ppipe; // Schedule a getq on the upper, and a read from the pipe. + // Each of these also sets up another hold on the pipe itself. + nni_pipe_incref(ppipe->npipe); nni_msgq_aio_get(psock->uwq, &ppipe->aio_getq); + nni_pipe_incref(ppipe->npipe); nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); return (0); @@ -155,9 +157,10 @@ nni_pair_pipe_rem(void *arg) nni_pair_pipe *ppipe = arg; nni_pair_sock *psock = ppipe->psock; + nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq); + nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); + if (psock->ppipe == ppipe) { - nni_msgq_aio_cancel(psock->uwq, &ppipe->aio_getq); - nni_msgq_aio_cancel(psock->urq, &ppipe->aio_putq); psock->ppipe = NULL; } } @@ -171,6 +174,7 @@ nni_pair_recv_cb(void *arg) if (nni_aio_result(&ppipe->aio_recv) != 0) { nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } @@ -189,6 +193,7 @@ nni_pair_putq_cb(void *arg) nni_msg_free(ppipe->aio_putq.a_msg); ppipe->aio_putq.a_msg = NULL; nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } nni_pipe_aio_recv(ppipe->npipe, &ppipe->aio_recv); @@ -204,6 +209,7 @@ nni_pair_getq_cb(void *arg) if (nni_aio_result(&ppipe->aio_getq) != 0) { nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } @@ -223,6 +229,7 @@ nni_pair_send_cb(void *arg) nni_msg_free(ppipe->aio_send.a_msg); ppipe->aio_send.a_msg = NULL; nni_pipe_close(ppipe->npipe); + nni_pipe_decref(ppipe->npipe); return; } diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 2c658ae8..751a851b 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -182,7 +182,9 @@ nni_rep_pipe_add(void *arg) return (rv); } + nni_pipe_incref(rp->pipe); nni_msgq_aio_get(rp->sendq, &rp->aio_getq); + nni_pipe_incref(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); return (0); } @@ -194,7 +196,7 @@ nni_rep_pipe_rem(void *arg) nni_rep_pipe *rp = arg; nni_rep_sock *rep = rp->rep; - nni_msgq_aio_cancel(rp->sendq, &rp->aio_getq); + nni_msgq_close(rp->sendq); nni_msgq_aio_cancel(rep->urq, &rp->aio_putq); nni_idhash_remove(&rep->pipes, nni_pipe_id(rp->pipe)); } @@ -205,7 +207,6 @@ nni_rep_sock_getq_cb(void *arg) { nni_rep_sock *rep = arg; nni_msgq *uwq = rep->uwq; - nni_mtx *mx = nni_sock_mtx(rep->sock); nni_msg *msg; uint8_t *header; uint32_t id; @@ -241,12 +242,12 @@ nni_rep_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. - nni_mtx_lock(mx); + nni_sock_lock(rep->sock); rv = nni_idhash_find(&rep->pipes, id, (void **) &rp); if (rv == 0) { rv = nni_msgq_tryput(rp->sendq, msg); } - nni_mtx_unlock(mx); + nni_sock_unlock(rep->sock); if (rv != 0) { nni_msg_free(msg); } @@ -263,6 +264,7 @@ nni_rep_pipe_getq_cb(void *arg) if (nni_aio_result(&rp->aio_getq) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -282,6 +284,7 @@ nni_rep_pipe_send_cb(void *arg) nni_msg_free(rp->aio_send.a_msg); rp->aio_send.a_msg = NULL; nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -303,6 +306,7 @@ nni_rep_pipe_recv_cb(void *arg) if (nni_aio_result(&rp->aio_recv) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -353,6 +357,7 @@ malformed: // Failures here are bad enough to warrant to dropping the conn. nni_msg_free(msg); nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); } @@ -365,6 +370,7 @@ nni_rep_pipe_putq_cb(void *arg) nni_msg_free(rp->aio_putq.a_msg); rp->aio_putq.a_msg = NULL; nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index f28db1df..553ef0bf 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -188,7 +188,9 @@ nni_req_pipe_add(void *arg) nni_req_resend(req); } + nni_pipe_incref(rp->pipe); nni_msgq_aio_get(req->uwq, &rp->aio_getq); + nni_pipe_incref(rp->pipe); nni_pipe_aio_recv(rp->pipe, &rp->aio_recv); return (0); } @@ -282,10 +284,12 @@ nni_req_getq_cb(void *arg) // We should be in RAW mode. Cooked mode traffic bypasses // the upper write queue entirely, and should never end up here. // If the mode changes, we may briefly deliver a message, but - // that's ok (there's an inherent race anyway). + // that's ok (there's an inherent race anyway). (One minor + // exception: we wind up here in error state when the uwq is closed.) if (nni_aio_result(&rp->aio_getq) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -301,6 +305,15 @@ static void nni_req_sendraw_cb(void *arg) { nni_req_pipe *rp = arg; + nni_msg *msg; + + if (nni_aio_result(&rp->aio_sendraw) != 0) { + nni_msg_free(rp->aio_sendraw.a_msg); + rp->aio_sendraw.a_msg = NULL; + nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); + return; + } // Sent a message so we just need to look for another one. nni_msgq_aio_get(rp->req->uwq, &rp->aio_getq); @@ -314,6 +327,17 @@ nni_req_sendcooked_cb(void *arg) nni_req_sock *req = rp->req; nni_mtx *mx = nni_sock_mtx(req->sock); + if (nni_aio_result(&rp->aio_sendcooked) != 0) { + // We failed to send... clean up and deal with it. + // We leave ourselves on the busy list for now, which + // means no new asynchronous traffic can occur here. + nni_msg_free(rp->aio_sendcooked.a_msg); + rp->aio_sendcooked.a_msg = NULL; + nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); + return; + } + // Cooked mode. We completed a cooked send, so we need to // reinsert ourselves in the ready list, and possibly schedule // a resend. @@ -335,6 +359,7 @@ nni_req_putq_cb(void *arg) if (nni_aio_result(&rp->aio_putq) != 0) { nni_msg_free(rp->aio_putq.a_msg); nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } rp->aio_putq.a_msg = NULL; @@ -351,6 +376,7 @@ nni_req_recv_cb(void *arg) if (nni_aio_result(&rp->aio_recv) != 0) { nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); return; } @@ -381,6 +407,7 @@ nni_req_recv_cb(void *arg) malformed: nni_msg_free(msg); nni_pipe_close(rp->pipe); + nni_pipe_decref(rp->pipe); } diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 0cc208d4..4e329d10 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -160,35 +160,6 @@ nni_inproc_pipe_aio_recv(void *arg, nni_aio *aio) } -static int -nni_inproc_pipe_send(void *arg, nni_msg *msg) -{ - nni_inproc_pipe *pipe = arg; - char *h; - size_t l; - - // We need to move any header data to the body, because the other - // side won't know what to do otherwise. - h = nni_msg_header(msg); - l = nni_msg_header_len(msg); - if (nni_msg_prepend(msg, h, l) != 0) { - nni_msg_free(msg); - return (0); // Pretend we sent it. - } - nni_msg_trunc_header(msg, l); - return (nni_msgq_put(pipe->wq, msg)); -} - - -static int -nni_inproc_pipe_recv(void *arg, nni_msg **msgp) -{ - nni_inproc_pipe *pipe = arg; - - return (nni_msgq_get(pipe->rq, msgp)); -} - - static uint16_t nni_inproc_pipe_peer(void *arg) { @@ -433,8 +404,6 @@ nni_inproc_ep_accept(void *arg, void **pipep) static nni_tran_pipe nni_inproc_pipe_ops = { .pipe_destroy = nni_inproc_pipe_destroy, - .pipe_send = nni_inproc_pipe_send, - .pipe_recv = nni_inproc_pipe_recv, .pipe_aio_send = nni_inproc_pipe_aio_send, .pipe_aio_recv = nni_inproc_pipe_aio_recv, .pipe_close = nni_inproc_pipe_close, diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ae6401ce..2b0d1427 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -84,6 +84,7 @@ add_nng_test(pubsub 5) add_nng_test(sock 5) add_nng_test(survey 5) add_nng_test(tcp 5) +add_nng_test(scalability 5) # compatbility tests add_nng_compat_test(compat_block 5) diff --git a/tests/scalability.c b/tests/scalability.c new file mode 100644 index 00000000..16f4ba8c --- /dev/null +++ b/tests/scalability.c @@ -0,0 +1,176 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" + +#include <string.h> + +static int count = 1; +static int nthrs = 100; +static char *addr = "inproc:///atscale"; + +static void +client(void *arg) +{ + int *result = arg; + nng_socket s; + int rv; + uint64_t timeo; + nng_msg *msg; + int i; + + *result = 0; + + if ((rv = nng_open(&s, NNG_PROTO_REQ)) != 0) { + *result = rv; + return; + } + + if ((rv = nng_dial(s, addr, NULL, NNG_FLAG_SYNCH)) != 0) { + *result = rv; + nng_close(s); + return; + } + + timeo = 40000; // 4 seconds + if (((rv = nng_setopt(s, NNG_OPT_RCVTIMEO, &timeo, sizeof (timeo))) != 0) || + ((rv = nng_setopt(s, NNG_OPT_SNDTIMEO, &timeo, sizeof (timeo))) != 0)) { + *result = rv; + nng_close(s); + return; + } + + // Sleep for up to a second before issuing requests to avoid saturating + // the CPU with bazillions of requests at the same time. + + if ((rv = nng_msg_alloc(&msg, 0)) != 0) { + *result = rv; + nng_close(s); + return; + } + if ((rv = nng_msg_append(msg, "abc", strlen("abc"))) != 0) { + *result = rv; + nng_msg_free(msg); + nng_close(s); + return; + } + + for (i = 0; i < count; i++) { + // Sleep for up to a 1ms before issuing requests to + // avoid saturating the CPU with bazillions of requests at + // the same time. + nng_usleep(rand() % 1000); + + // Reusing the same message causes problems as a result of + // header reuse. + if ((rv = nng_msg_alloc(&msg, 0)) != 0) { + *result = rv; + nng_close(s); + return; + } + + if ((rv = nng_sendmsg(s, msg, 0)) != 0) { + *result = rv; + nng_msg_free(msg); + nng_close(s); + return; + } + + if ((rv = nng_recvmsg(s, &msg, 0)) != 0) { + *result = rv; + nng_close(s); + return; + } + + nng_msg_free(msg); + } + + nng_close(s); + *result = 0; +} + +void +serve(void *arg) +{ + nng_socket rep = *(nng_socket *)arg; + nng_msg *msg; + + for (;;) { + if (nng_recvmsg(rep, &msg, 0) != 0) { + nng_close(rep); + return; + } + + if (nng_sendmsg(rep, msg, 0) != 0) { + nng_close(rep); + return; + } + } +} + +Main({ + int rv; + void **clients; + void *server; + int *results; + + clients = calloc(nthrs, sizeof (void *)); + results = calloc(nthrs, sizeof (int)); + + Test("Scalability", { + + Convey("Given a server socket", { + nng_socket rep; + int depth = 256; + + So(nng_open(&rep, NNG_PROTO_REP) == 0); + + Reset({ + nng_close(rep); + }) + + So(nng_setopt(rep, NNG_OPT_RCVBUF, &depth, sizeof (depth)) == 0); + So(nng_setopt(rep, NNG_OPT_SNDBUF, &depth, sizeof (depth)) == 0); + So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0); + + So(nng_thread_create(&server, serve, &rep) == 0); + + nng_usleep(100000); + + Convey("We can run many many clients", { + int fails = 0; + int i; + for (i = 0; i < nthrs; i++) { + if ((rv = nng_thread_create(&clients[i], client, &results[i])) != 0) { + printf("thread create failed: %s", nng_strerror(rv)); + break; + } + } + So(i == nthrs); + + for (i = 0; i < nthrs; i++) { + nng_thread_destroy(clients[i]); + fails += (results[i] == 0 ? 0 : 1); + if (results[i] != 0) { + printf("%d (%d): %s\n", + fails, i, + nng_strerror(results[i])); + } + } + So(fails == 0); + + nng_shutdown(rep); + + nng_thread_destroy(server); + }) + }) + + }) +}) |
