diff options
| -rw-r--r-- | src/core/platform.h | 15 | ||||
| -rw-r--r-- | src/core/pollable.c | 103 | ||||
| -rw-r--r-- | src/core/pollable.h | 16 | ||||
| -rw-r--r-- | src/platform/posix/posix_atomic.c | 27 | ||||
| -rw-r--r-- | src/platform/windows/win_thread.c | 8 | ||||
| -rw-r--r-- | src/protocol/pair1/pair.c | 6 | ||||
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 2 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 174 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 39 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep_test.c | 2 | ||||
| -rw-r--r-- | src/protocol/reqrep0/req.c | 392 | ||||
| -rw-r--r-- | src/protocol/survey0/respond.c | 47 | ||||
| -rw-r--r-- | src/protocol/survey0/survey.c | 27 |
13 files changed, 453 insertions, 405 deletions
diff --git a/src/core/platform.h b/src/core/platform.h index 355ef7eb..53d25137 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -1,5 +1,5 @@ // -// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2018 Devolutions <info@devolutions.net> // @@ -189,13 +189,18 @@ extern uint64_t nni_atomic_swap64(nni_atomic_u64 *, uint64_t); extern uint64_t nni_atomic_dec64_nv(nni_atomic_u64 *); extern void nni_atomic_inc64(nni_atomic_u64 *); +// nni_atomic_cas64 is a compare and swap. The second argument is the +// value to compare against, and the third is the new value. Returns +// true if the value was set. +extern bool nni_atomic_cas64(nni_atomic_u64 *, uint64_t, uint64_t); + // // Clock Support // // nn_plat_clock returns a number of milliseconds since some arbitrary time // in the past. The values returned by nni_clock must use the same base -// as the times used in nni_plat_cond_waituntil. The nni_plat_clock() must +// as the times used in nni_plat_cond_until. The nni_plat_clock() must // return values > 0, and must return values smaller than 2^63. (We could // relax this last constraint, but there is no reason to, and leaves us the // option of using negative values for other purposes in the future.) @@ -213,9 +218,9 @@ uint32_t nni_random(void); // nni_plat_init is called to allow the platform the chance to // do any necessary initialization. This routine MUST be idempotent, -// and threadsafe, and will be called before any other API calls, and +// and thread-safe, and will be called before any other API calls, and // may be called at any point thereafter. It is permitted to return -// an error if some critical failure inializing the platform occurs, +// an error if some critical failure initializing the platform occurs, // but once this succeeds, all future calls must succeed as well, unless // nni_plat_fini has been called. // @@ -274,7 +279,7 @@ extern int nni_tcp_dialer_getopt( extern int nni_tcp_listener_init(nni_tcp_listener **); // nni_tcp_listener_fini frees the listener and all associated resources. -// It implictly closes the listener as well. +// It implicitly closes the listener as well. extern void nni_tcp_listener_fini(nni_tcp_listener *); // nni_tcp_listener_close closes the listener. This will unbind diff --git a/src/core/pollable.c b/src/core/pollable.c index a121ba3f..fb6af0f5 100644 --- a/src/core/pollable.c +++ b/src/core/pollable.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -10,13 +10,33 @@ #include "core/nng_impl.h" -struct nni_pollable { - int p_rfd; - int p_wfd; - nni_mtx p_lock; - bool p_raised; - bool p_open; -}; +// We pack the wfd and rfd into a uint64_t so that we can update the pair +// atomically and use nni_atomic_cas64, to be lock free. +#define WFD(fds) ((int) ((fds) &0xffffffffu)) +#define RFD(fds) ((int) (((fds) >> 32u) & 0xffffffffu)) +#define FD_JOIN(wfd, rfd) ((uint64_t)(wfd) + ((uint64_t)(rfd) << 32u)) + +void +nni_pollable_init(nni_pollable *p) +{ + nni_atomic_init_bool(&p->p_raised); + nni_atomic_set64(&p->p_fds, (uint64_t) -1); +} + +void +nni_pollable_fini(nni_pollable *p) +{ + uint64_t fds; + + fds = nni_atomic_get64(&p->p_fds); + if (fds != (uint64_t) -1) { + int rfd, wfd; + // Read in the high order, write in the low order. + rfd = RFD(fds); + wfd = WFD(fds); + nni_plat_pipe_close(rfd, wfd); + } +} int nni_pollable_alloc(nni_pollable **pp) @@ -25,9 +45,7 @@ nni_pollable_alloc(nni_pollable **pp) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - p->p_open = false; - p->p_raised = false; - nni_mtx_init(&p->p_lock); + nni_pollable_init(p); *pp = p; return (0); } @@ -38,10 +56,7 @@ nni_pollable_free(nni_pollable *p) if (p == NULL) { return; } - if (p->p_open) { - nni_plat_pipe_close(p->p_rfd, p->p_wfd); - } - nni_mtx_fini(&p->p_lock); + nni_pollable_fini(p); NNI_FREE_STRUCT(p); } @@ -51,16 +66,12 @@ nni_pollable_raise(nni_pollable *p) if (p == NULL) { return; } - nni_mtx_lock(&p->p_lock); - if (!p->p_raised) { - p->p_raised = true; - if (p->p_open) { - nni_mtx_unlock(&p->p_lock); - nni_plat_pipe_raise(p->p_wfd); - return; + if (!nni_atomic_swap_bool(&p->p_raised, true)) { + uint64_t fds; + if ((fds = nni_atomic_get64(&p->p_fds)) != (uint64_t) -1) { + nni_plat_pipe_raise(WFD(fds)); } } - nni_mtx_unlock(&p->p_lock); } void @@ -69,16 +80,12 @@ nni_pollable_clear(nni_pollable *p) if (p == NULL) { return; } - nni_mtx_lock(&p->p_lock); - if (p->p_raised) { - p->p_raised = false; - if (p->p_open) { - nni_mtx_unlock(&p->p_lock); - nni_plat_pipe_clear(p->p_rfd); - return; + if (nni_atomic_swap_bool(&p->p_raised, false)) { + uint64_t fds; + if ((fds = nni_atomic_get64(&p->p_fds)) != (uint64_t) -1) { + nni_plat_pipe_clear(RFD(fds)); } } - nni_mtx_unlock(&p->p_lock); } int @@ -87,19 +94,31 @@ nni_pollable_getfd(nni_pollable *p, int *fdp) if (p == NULL) { return (NNG_EINVAL); } - nni_mtx_lock(&p->p_lock); - if (!p->p_open) { - int rv; - if ((rv = nni_plat_pipe_open(&p->p_wfd, &p->p_rfd)) != 0) { - nni_mtx_unlock(&p->p_lock); + + for (;;) { + int rfd; + int wfd; + int rv; + uint64_t fds; + + if ((fds = nni_atomic_get64(&p->p_fds)) != (uint64_t) -1) { + *fdp = RFD(fds); + return (0); + } + if ((rv = nni_plat_pipe_open(&wfd, &rfd)) != 0) { return (rv); } - p->p_open = true; - if (p->p_raised) { - nni_plat_pipe_raise(p->p_wfd); + fds = FD_JOIN(wfd, rfd); + + if (nni_atomic_cas64(&p->p_fds, (uint64_t) -1, fds)) { + if (nni_atomic_get_bool(&p->p_raised)) { + nni_plat_pipe_raise(wfd); + } + *fdp = rfd; + return (0); } + + // Someone beat us. Close ours, and try again. + nni_plat_pipe_close(wfd, rfd); } - nni_mtx_unlock(&p->p_lock); - *fdp = p->p_rfd; - return (0); } diff --git a/src/core/pollable.h b/src/core/pollable.h index 50ec9bf6..a71a9693 100644 --- a/src/core/pollable.h +++ b/src/core/pollable.h @@ -1,5 +1,5 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -13,8 +13,6 @@ #include "core/defs.h" #include "core/list.h" -// For the sake of simplicity, we just maintain a single global timer thread. - typedef struct nni_pollable nni_pollable; extern int nni_pollable_alloc(nni_pollable **); @@ -23,4 +21,16 @@ extern void nni_pollable_raise(nni_pollable *); extern void nni_pollable_clear(nni_pollable *); extern int nni_pollable_getfd(nni_pollable *, int *); +// nni_pollable implementation details are private. Only here for inlining. +// We have joined to the write and read file descriptors into a a single +// atomic 64 so we can update them together (and we can use cas to be sure +// that such updates are always safe.) +struct nni_pollable { + nni_atomic_u64 p_fds; + nni_atomic_bool p_raised; +}; + +extern void nni_pollable_init(nni_pollable *); +extern void nni_pollable_fini(nni_pollable *); + #endif // CORE_POLLABLE_H diff --git a/src/platform/posix/posix_atomic.c b/src/platform/posix/posix_atomic.c index b1183865..0d866071 100644 --- a/src/platform/posix/posix_atomic.c +++ b/src/platform/posix/posix_atomic.c @@ -17,6 +17,7 @@ #ifdef NNG_HAVE_STDATOMIC #include <stdatomic.h> + bool nni_atomic_flag_test_and_set(nni_atomic_flag *f) { @@ -45,7 +46,6 @@ bool nni_atomic_swap_bool(nni_atomic_bool *v, bool b) { return (atomic_exchange(&v->v, b)); - } void @@ -108,6 +108,16 @@ nni_atomic_dec64_nv(nni_atomic_u64 *v) return (ov - 1); } +bool +nni_atomic_cas64(nni_atomic_u64 *v, uint64_t comp, uint64_t new) +{ + // It's possible that uint_fast64_t is not the same type underneath + // as uint64_t. (Would be unusual!) + uint_fast64_t cv = (uint_fast64_t) comp; + uint_fast64_t nv = (uint_fast64_t) new; + return (atomic_compare_exchange_strong(&v->v, &cv, nv)); +} + #else #include <pthread.h> @@ -156,7 +166,7 @@ nni_atomic_swap_bool(nni_atomic_bool *b, bool n) { bool v; pthread_mutex_lock(&plat_atomic_lock); - v = b->b; + v = b->b; b->b = n; pthread_mutex_unlock(&plat_atomic_lock); return (v); @@ -238,6 +248,19 @@ nni_atomic_dec64_nv(nni_atomic_u64 *v) return (nv); } +bool +nni_atomic_cas64(nni_atomic_u64 *v, uint64_t comp, uint64_t new) +{ + bool result = false; + pthread_mutex_lock(&plat_atomic_lock); + if (v->v == comp) { + v->v = new; + result = true; + } + pthread_mutex_unlock(&plat_atomic_lock); + return (result); +} + #endif #endif // NNG_PLATFORM_POSIX diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index 70ba73cb..0b77e078 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -213,6 +213,14 @@ nni_atomic_dec64_nv(nni_atomic_u64 *v) #endif } +bool +nni_atomic_cas64(nni_atomic_u64 *v, uint64_t comp, uint64_t new) +{ + uint64_t old; + old = InterlockedCompareExchange64(&v->v, (LONG64)new, (LONG64)comp); + return (old == comp); +} + static unsigned int __stdcall nni_plat_thr_main(void *arg) { nni_plat_thr *thr = arg; diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index df24d77b..2838cb5d 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -213,7 +213,7 @@ pair1_pipe_start(void *arg) nni_mtx_unlock(&s->mtx); // Schedule a getq. In polyamorous mode we get on the per pipe - // sendq, as the socket distributes to us. In monogamous mode + // send_queue, as the socket distributes to us. In monogamous mode // we bypass and get from the upper writeq directly (saving a // set of context switches). if (s->poly) { @@ -343,7 +343,7 @@ pair1_sock_getq_cb(void *arg) // Try a non-blocking send. If this fails we just discard the // message. We have to do this to avoid head-of-line blocking // for messages sent to other pipes. Note that there is some - // buffering in the sendq. + // buffering in the send_queue. if (nni_msgq_tryput(p->sendq, msg) != 0) { nni_msg_free(msg); } @@ -426,7 +426,7 @@ pair1_pipe_send_cb(void *arg) return; } - // In polyamorous mode, we want to get from the sendq; in + // In polyamorous mode, we want to get from the send_queue; in // monogamous we get from upper writeq. nni_msgq_aio_get(s->poly ? p->sendq : s->uwq, p->aio_getq); } diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index 22195c52..a42e95ff 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -289,7 +289,7 @@ pub0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_type t) int fd; int rv; nni_mtx_lock(&sock->mtx); - // PUB sockets are *always* sendable. + // PUB sockets are *always* writable. nni_pollable_raise(sock->sendable); rv = nni_pollable_getfd(sock->sendable, &fd); nni_mtx_unlock(&sock->mtx); diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 0a367216..56da98f8 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -29,10 +29,10 @@ #endif // By default we accept 128 messages. -#define SUB0_DEFAULT_QLEN 128 +#define SUB0_DEFAULT_RECV_BUF_LEN 128 // By default, prefer new messages when the queue is full. -#define SUB0_DEFAULT_PREFNEW true +#define SUB0_DEFAULT_PREFER_NEW true typedef struct sub0_pipe sub0_pipe; typedef struct sub0_sock sub0_sock; @@ -53,21 +53,21 @@ struct sub0_topic { struct sub0_ctx { nni_list_node node; sub0_sock * sock; - nni_list topics; // XXX: Consider replacing with patricia trie - nni_list raios; // sub context could have multiple pending recvs + nni_list topics; // TODO: Consider patricia trie + nni_list recv_queue; // can have multiple pending receives bool closed; nni_lmq lmq; - bool prefnew; + bool prefer_new; }; // sub0_sock is our per-socket protocol private structure. struct sub0_sock { - nni_pollable *recvable; - sub0_ctx ctx; // default context - nni_list ctxs; // all contexts - size_t recvbuflen; - bool prefnew; - nni_mtx lk; + nni_pollable readable; + sub0_ctx master; // default context + nni_list contexts; // all contexts + size_t recv_buf_len; + bool prefer_new; + nni_mtx lk; }; // sub0_pipe is our per-pipe protocol private structure. @@ -83,8 +83,8 @@ sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; nni_mtx_lock(&sock->lk); - if (nni_list_active(&ctx->raios, aio)) { - nni_list_remove(&ctx->raios, aio); + if (nni_list_active(&ctx->recv_queue, aio)) { + nni_list_remove(&ctx->recv_queue, aio); nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&sock->lk); @@ -116,15 +116,15 @@ sub0_ctx_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } - nni_list_append(&ctx->raios, aio); + nni_list_append(&ctx->recv_queue, aio); nni_mtx_unlock(&sock->lk); return; } (void) nni_lmq_getq(&ctx->lmq, &msg); - if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->ctx)) { - nni_pollable_clear(sock->recvable); + if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { + nni_pollable_clear(&sock->readable); } nni_aio_set_msg(aio, msg); nni_mtx_unlock(&sock->lk); @@ -149,8 +149,8 @@ sub0_ctx_close(void *arg) nni_mtx_lock(&sock->lk); ctx->closed = true; - while ((aio = nni_list_first(&ctx->raios)) != NULL) { - nni_list_remove(&ctx->raios, aio); + while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { + nni_list_remove(&ctx->recv_queue, aio); nni_aio_finish_error(aio, NNG_ECLOSED); } nni_mtx_unlock(&sock->lk); @@ -166,7 +166,7 @@ sub0_ctx_fini(void *arg) sub0_ctx_close(ctx); nni_mtx_lock(&sock->lk); - nni_list_remove(&sock->ctxs, ctx); + nni_list_remove(&sock->contexts, ctx); nni_mtx_unlock(&sock->lk); while ((topic = nni_list_first(&ctx->topics)) != 0) { @@ -179,29 +179,29 @@ sub0_ctx_fini(void *arg) } static int -sub0_ctx_init(void *carg, void *sarg) +sub0_ctx_init(void *ctx_arg, void *sock_arg) { - sub0_sock *sock = sarg; - sub0_ctx * ctx = carg; + sub0_sock *sock = sock_arg; + sub0_ctx * ctx = ctx_arg; size_t len; - bool prefnew; + bool prefer_new; int rv; nni_mtx_lock(&sock->lk); - len = sock->recvbuflen; - prefnew = sock->prefnew; + len = sock->recv_buf_len; + prefer_new = sock->prefer_new; if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) { return (rv); } - ctx->prefnew = prefnew; + ctx->prefer_new = prefer_new; - nni_aio_list_init(&ctx->raios); + nni_aio_list_init(&ctx->recv_queue); NNI_LIST_INIT(&ctx->topics, sub0_topic, node); ctx->sock = sock; - nni_list_append(&sock->ctxs, ctx); + nni_list_append(&sock->contexts, ctx); nni_mtx_unlock(&sock->lk); return (0); @@ -212,28 +212,26 @@ sub0_sock_fini(void *arg) { sub0_sock *sock = arg; - sub0_ctx_fini(&sock->ctx); - if (sock->recvable != NULL) { - nni_pollable_free(sock->recvable); - } + sub0_ctx_fini(&sock->master); + nni_pollable_fini(&sock->readable); nni_mtx_fini(&sock->lk); } static int -sub0_sock_init(void *arg, nni_sock *nsock) +sub0_sock_init(void *arg, nni_sock *unused) { sub0_sock *sock = arg; int rv; - NNI_ARG_UNUSED(nsock); + NNI_ARG_UNUSED(unused); - NNI_LIST_INIT(&sock->ctxs, sub0_ctx, node); + NNI_LIST_INIT(&sock->contexts, sub0_ctx, node); nni_mtx_init(&sock->lk); - sock->recvbuflen = SUB0_DEFAULT_QLEN; - sock->prefnew = SUB0_DEFAULT_PREFNEW; + sock->recv_buf_len = SUB0_DEFAULT_RECV_BUF_LEN; + sock->prefer_new = SUB0_DEFAULT_PREFER_NEW; + nni_pollable_init(&sock->readable); - if (((rv = sub0_ctx_init(&sock->ctx, sock)) != 0) || - ((rv = nni_pollable_alloc(&sock->recvable)) != 0)) { + if ((rv = sub0_ctx_init(&sock->master, sock)) != 0) { sub0_sock_fini(sock); return (rv); } @@ -251,7 +249,7 @@ static void sub0_sock_close(void *arg) { sub0_sock *sock = arg; - sub0_ctx_close(&sock->ctx); + sub0_ctx_close(&sock->master); } static void @@ -357,10 +355,10 @@ sub0_recv_cb(void *arg) nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. - NNI_LIST_FOREACH (&sock->ctxs, ctx) { + NNI_LIST_FOREACH (&sock->contexts, ctx) { nni_msg *dup; - if (nni_lmq_full(&ctx->lmq) && !ctx->prefnew) { + if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { // Cannot deliver here, as receive buffer is full. continue; } @@ -371,7 +369,7 @@ sub0_recv_cb(void *arg) // Special optimization (for the case where only one context), // including when no contexts are in use, we avoid duplication. - if (ctx == nni_list_last(&sock->ctxs)) { + if (ctx == nni_list_last(&sock->contexts)) { dup = msg; msg = NULL; } else if (nni_msg_dup(&dup, msg) != 0) { @@ -382,9 +380,9 @@ sub0_recv_cb(void *arg) // message and it is intended for us. submatch = true; - if (!nni_list_empty(&ctx->raios)) { - aio = nni_list_first(&ctx->raios); - nni_list_remove(&ctx->raios, aio); + if (!nni_list_empty(&ctx->recv_queue)) { + aio = nni_list_first(&ctx->recv_queue); + nni_list_remove(&ctx->recv_queue, aio); nni_aio_set_msg(aio, dup); // Save for synchronous completion @@ -414,14 +412,14 @@ sub0_recv_cb(void *arg) } if (submatch) { - nni_pollable_raise(sock->recvable); + nni_pollable_raise(&sock->readable); } nni_pipe_recv(p->pipe, p->aio_recv); } static int -sub0_ctx_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t) +sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; @@ -434,7 +432,7 @@ sub0_ctx_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t) } static int -sub0_ctx_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t) +sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; @@ -452,8 +450,8 @@ sub0_ctx_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t) // If we change the socket, then this will change the queue for // any new contexts. (Previously constructed contexts are unaffected.) - if (&sock->ctx == ctx) { - sock->recvbuflen = (size_t) val; + if (&sock->master == ctx) { + sock->recv_buf_len = (size_t) val; } nni_mtx_unlock(&sock->lk); return (0); @@ -470,7 +468,7 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) sub0_ctx * ctx = arg; sub0_sock * sock = ctx->sock; sub0_topic *topic; - sub0_topic *newtopic; + sub0_topic *new_topic; NNI_ARG_UNUSED(t); nni_mtx_lock(&sock->lk); @@ -484,18 +482,18 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) return (0); } } - if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) { + if ((new_topic = NNI_ALLOC_STRUCT(new_topic)) == NULL) { nni_mtx_unlock(&sock->lk); return (NNG_ENOMEM); } - if ((sz > 0) && ((newtopic->buf = nni_alloc(sz)) == NULL)) { + if ((sz > 0) && ((new_topic->buf = nni_alloc(sz)) == NULL)) { nni_mtx_unlock(&sock->lk); - NNI_FREE_STRUCT(newtopic); + NNI_FREE_STRUCT(new_topic); return (NNG_ENOMEM); } - memcpy(newtopic->buf, buf, sz); - newtopic->len = sz; - nni_list_append(&ctx->topics, newtopic); + memcpy(new_topic->buf, buf, sz); + new_topic->len = sz; + nni_list_append(&ctx->topics, new_topic); nni_mtx_unlock(&sock->lk); return (0); } @@ -547,21 +545,21 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) } static int -sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t) +sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; bool val; nni_mtx_lock(&sock->lk); - val = ctx->prefnew; + val = ctx->prefer_new; nni_mtx_unlock(&sock->lk); return (nni_copyout_bool(val, buf, szp, t)); } static int -sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) +sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; @@ -573,9 +571,9 @@ sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) } nni_mtx_lock(&sock->lk); - ctx->prefnew = val; - if (&sock->ctx == ctx) { - sock->prefnew = val; + ctx->prefer_new = val; + if (&sock->master == ctx) { + sock->prefer_new = val; } nni_mtx_unlock(&sock->lk); @@ -585,8 +583,8 @@ sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) static nni_option sub0_ctx_options[] = { { .o_name = NNG_OPT_RECVBUF, - .o_get = sub0_ctx_get_recvbuf, - .o_set = sub0_ctx_set_recvbuf, + .o_get = sub0_ctx_get_recv_buf_len, + .o_set = sub0_ctx_set_recv_buf_len, }, { .o_name = NNG_OPT_SUB_SUBSCRIBE, @@ -598,8 +596,8 @@ static nni_option sub0_ctx_options[] = { }, { .o_name = NNG_OPT_SUB_PREFNEW, - .o_get = sub0_ctx_get_prefnew, - .o_set = sub0_ctx_set_prefnew, + .o_get = sub0_ctx_get_prefer_new, + .o_set = sub0_ctx_set_prefer_new, }, { .o_name = NULL, @@ -620,62 +618,62 @@ sub0_sock_recv(void *arg, nni_aio *aio) { sub0_sock *sock = arg; - sub0_ctx_recv(&sock->ctx, aio); + sub0_ctx_recv(&sock->master, aio); } static int -sub0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) +sub0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) { sub0_sock *sock = arg; int rv; int fd; - if ((rv = nni_pollable_getfd(sock->recvable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); } static int -sub0_sock_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t) +sub0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_get_recvbuf(&sock->ctx, buf, szp, t)); + return (sub0_ctx_get_recv_buf_len(&sock->master, buf, szp, t)); } static int -sub0_sock_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t) +sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_set_recvbuf(&sock->ctx, buf, sz, t)); + return (sub0_ctx_set_recv_buf_len(&sock->master, buf, sz, t)); } static int sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_subscribe(&sock->ctx, buf, sz, t)); + return (sub0_ctx_subscribe(&sock->master, buf, sz, t)); } static int sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_unsubscribe(&sock->ctx, buf, sz, t)); + return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t)); } static int -sub0_sock_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t) +sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_get_prefnew(&sock->ctx, buf, szp, t)); + return (sub0_ctx_get_prefer_new(&sock->master, buf, szp, t)); } static int -sub0_sock_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) +sub0_sock_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_set_prefnew(&sock->ctx, buf, sz, t)); + return (sub0_ctx_set_prefer_new(&sock->master, buf, sz, t)); } // This is the global protocol structure -- our linkage to the core. @@ -709,17 +707,17 @@ static nni_option sub0_sock_options[] = { }, { .o_name = NNG_OPT_RECVFD, - .o_get = sub0_sock_get_recvfd, + .o_get = sub0_sock_get_recv_fd, }, { .o_name = NNG_OPT_RECVBUF, - .o_get = sub0_sock_get_recvbuf, - .o_set = sub0_sock_set_recvbuf, + .o_get = sub0_sock_get_recv_buf_len, + .o_set = sub0_sock_set_recv_buf_len, }, { .o_name = NNG_OPT_SUB_PREFNEW, - .o_get = sub0_sock_get_prefnew, - .o_set = sub0_sock_set_prefnew, + .o_get = sub0_sock_get_prefer_new, + .o_set = sub0_sock_set_prefer_new, }, // terminate list { @@ -749,7 +747,7 @@ static nni_proto sub0_proto = { }; int -nng_sub0_open(nng_socket *sidp) +nng_sub0_open(nng_socket *sock) { - return (nni_proto_open(sidp, &sub0_proto)); + return (nni_proto_open(sock, &sub0_proto)); } diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 328babbc..a715ab59 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -36,13 +36,13 @@ static void rep0_pipe_fini(void *); struct rep0_ctx { rep0_sock * sock; - size_t btrace_len; uint32_t pipe_id; rep0_pipe * spipe; // send pipe nni_aio * saio; // send aio nni_aio * raio; // recv aio nni_list_node sqnode; nni_list_node rqnode; + size_t btrace_len; uint32_t btrace[256]; // backtrace buffer }; @@ -54,8 +54,8 @@ struct rep0_sock { nni_list recvpipes; // list of pipes with data to receive nni_list recvq; rep0_ctx ctx; - nni_pollable *recvable; - nni_pollable *sendable; + nni_pollable readable; + nni_pollable writable; }; // rep0_pipe is our per-pipe protocol private structure. @@ -167,7 +167,7 @@ rep0_ctx_send(void *arg, nni_aio *aio) // to send on the socket (root context). That's because // we will have finished (successfully or otherwise) the // reply for the single request we got. - nni_pollable_clear(s->sendable); + nni_pollable_clear(&s->writable); } if (len == 0) { @@ -220,8 +220,8 @@ rep0_sock_fini(void *arg) nni_idhash_fini(s->pipes); rep0_ctx_fini(&s->ctx); - nni_pollable_free(s->sendable); - nni_pollable_free(s->recvable); + nni_pollable_fini(&s->writable); + nni_pollable_fini(&s->readable); nni_mtx_fini(&s->lk); } @@ -246,13 +246,10 @@ rep0_sock_init(void *arg, nni_sock *sock) (void) rep0_ctx_init(&s->ctx, s); - // We start off without being either readable or pollable. + // We start off without being either readable or writable. // Readability comes when there is something on the socket. - if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || - ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { - rep0_sock_fini(s); - return (rv); - } + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); return (0); } @@ -331,7 +328,7 @@ rep0_pipe_start(void *arg) return (rv); } // By definition, we have not received a request yet on this pipe, - // so it cannot cause us to become sendable. + // so it cannot cause us to become writable. nni_pipe_recv(p->pipe, p->aio_recv); return (0); } @@ -367,7 +364,7 @@ rep0_pipe_close(void *arg) if (p->id == s->ctx.pipe_id) { // We "can" send. (Well, not really, but we will happily // accept a message and discard it.) - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); nni_mtx_unlock(&s->lk); @@ -395,7 +392,7 @@ rep0_pipe_send_cb(void *arg) // Nothing else to send. if (p->id == s->ctx.pipe_id) { // Mark us ready for the other side to send! - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_mtx_unlock(&s->lk); return; @@ -469,11 +466,11 @@ rep0_ctx_recv(void *arg, nni_aio *aio) nni_aio_set_msg(p->aio_recv, NULL); nni_list_remove(&s->recvpipes, p); if (nni_list_empty(&s->recvpipes)) { - nni_pollable_clear(s->recvable); + nni_pollable_clear(&s->readable); } nni_pipe_recv(p->pipe, p->aio_recv); if ((ctx == &s->ctx) && !p->busy) { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } len = nni_msg_header_len(msg); @@ -547,7 +544,7 @@ rep0_pipe_recv_cb(void *arg) if ((ctx = nni_list_first(&s->recvq)) == NULL) { // No one waiting to receive yet, holding pattern. nni_list_append(&s->recvpipes, p); - nni_pollable_raise(s->recvable); + nni_pollable_raise(&s->readable); nni_mtx_unlock(&s->lk); return; } @@ -557,7 +554,7 @@ rep0_pipe_recv_cb(void *arg) ctx->raio = NULL; nni_aio_set_msg(p->aio_recv, NULL); if ((ctx == &s->ctx) && !p->busy) { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } // schedule another receive @@ -603,7 +600,7 @@ rep0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); @@ -616,7 +613,7 @@ rep0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { return (rv); } diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c index 5fe9cb17..879d6ae4 100644 --- a/src/protocol/reqrep0/rep_test.c +++ b/src/protocol/reqrep0/rep_test.c @@ -74,7 +74,7 @@ test_rep_poll_writeable(void) // Still not writable. TEST_CHECK(testutil_pollfd(fd) == false); - // If we get a job, *then* we become writeable + // If we get a job, *then* we become writable TEST_NNG_SEND_STR(req, "abc"); TEST_NNG_RECV_STR(rep, "abc"); TEST_CHECK(testutil_pollfd(fd) == true); diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 4326f411..33629abc 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -29,7 +29,7 @@ typedef struct req0_pipe req0_pipe; typedef struct req0_sock req0_sock; typedef struct req0_ctx req0_ctx; -static void req0_run_sendq(req0_sock *, nni_list *); +static void req0_run_send_queue(req0_sock *, nni_list *); static void req0_ctx_reset(req0_ctx *); static void req0_ctx_timeout(void *); static void req0_pipe_fini(void *); @@ -40,35 +40,35 @@ static int req0_ctx_init(void *, void *); // socket, but keeps track of its own outstanding replays, the request ID, // and so forth. struct req0_ctx { - nni_list_node snode; - nni_list_node sqnode; // node on the sendq - nni_list_node pnode; // node on the pipe list - uint32_t reqid; req0_sock * sock; - nni_aio * raio; // user aio waiting to receive - only one! - nni_aio * saio; - nng_msg * reqmsg; // request message - size_t reqlen; - nng_msg * repmsg; // reply message + nni_list_node sock_node; // node on the socket context list + nni_list_node send_node; // node on the send_queue + nni_list_node pipe_node; // node on the pipe list + uint32_t request_id; // request ID, without high bit set + nni_aio * recv_aio; // user aio waiting to recv - only one! + nni_aio * send_aio; // user aio waiting to send + nng_msg * req_msg; // request message + size_t req_len; // length of request message (for stats) + nng_msg * rep_msg; // reply message nni_timer_node timer; nni_duration retry; }; // A req0_sock is our per-socket protocol private structure. struct req0_sock { - nni_duration retry; - bool closed; - int ttl; - req0_ctx ctx; // base socket ctx - nni_list readypipes; - nni_list busypipes; - nni_list stoppipes; - nni_list ctxs; - nni_list sendq; // contexts waiting to send. - nni_idhash * reqids; // contexts by request ID - nni_pollable *recvable; - nni_pollable *sendable; - nni_mtx mtx; + nni_duration retry; + bool closed; + int ttl; + req0_ctx master; // base socket master + nni_list ready_pipes; + nni_list busy_pipes; + nni_list stop_pipes; + nni_list contexts; + nni_list send_queue; // contexts waiting to send. + nni_idhash * requests; // contexts by request ID + nni_pollable readable; + nni_pollable writable; + nni_mtx mtx; }; // A req0_pipe is our per-pipe protocol private structure. @@ -76,7 +76,7 @@ struct req0_pipe { nni_pipe * pipe; req0_sock * req; nni_list_node node; - nni_list ctxs; // ctxs with pending traffic + nni_list contexts; // contexts with pending traffic bool closed; nni_aio * aio_send; nni_aio * aio_recv; @@ -94,7 +94,7 @@ req0_sock_init(void *arg, nni_sock *sock) NNI_ARG_UNUSED(sock); - if ((rv = nni_idhash_init(&s->reqids)) != 0) { + if ((rv = nni_idhash_init(&s->requests)) != 0) { return (rv); } @@ -102,26 +102,23 @@ req0_sock_init(void *arg, nni_sock *sock) // We start at a random point, to minimize likelihood of // accidental collision across restarts. nni_idhash_set_limits( - s->reqids, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u); + s->requests, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u); nni_mtx_init(&s->mtx); - NNI_LIST_INIT(&s->readypipes, req0_pipe, node); - NNI_LIST_INIT(&s->busypipes, req0_pipe, node); - NNI_LIST_INIT(&s->stoppipes, req0_pipe, node); - NNI_LIST_INIT(&s->sendq, req0_ctx, sqnode); - NNI_LIST_INIT(&s->ctxs, req0_ctx, snode); + NNI_LIST_INIT(&s->ready_pipes, req0_pipe, node); + NNI_LIST_INIT(&s->busy_pipes, req0_pipe, node); + NNI_LIST_INIT(&s->stop_pipes, req0_pipe, node); + NNI_LIST_INIT(&s->send_queue, req0_ctx, send_node); + NNI_LIST_INIT(&s->contexts, req0_ctx, sock_node); // this is "semi random" start for request IDs. s->retry = NNI_SECOND * 60; - (void) req0_ctx_init(&s->ctx, s); + (void) req0_ctx_init(&s->master, s); - if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || - ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { - req0_sock_fini(s); - return (rv); - } + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); s->ttl = 8; return (0); @@ -141,10 +138,10 @@ req0_sock_close(void *arg) nni_mtx_lock(&s->mtx); s->closed = true; - NNI_LIST_FOREACH (&s->ctxs, ctx) { - if (ctx->raio != NULL) { - nni_aio_finish_error(ctx->raio, NNG_ECLOSED); - ctx->raio = NULL; + NNI_LIST_FOREACH (&s->contexts, ctx) { + if (ctx->recv_aio != NULL) { + nni_aio_finish_error(ctx->recv_aio, NNG_ECLOSED); + ctx->recv_aio = NULL; req0_ctx_reset(ctx); } } @@ -157,15 +154,15 @@ req0_sock_fini(void *arg) req0_sock *s = arg; nni_mtx_lock(&s->mtx); - NNI_ASSERT(nni_list_empty(&s->busypipes)); - NNI_ASSERT(nni_list_empty(&s->stoppipes)); - NNI_ASSERT(nni_list_empty(&s->readypipes)); + NNI_ASSERT(nni_list_empty(&s->busy_pipes)); + NNI_ASSERT(nni_list_empty(&s->stop_pipes)); + NNI_ASSERT(nni_list_empty(&s->ready_pipes)); nni_mtx_unlock(&s->mtx); - req0_ctx_fini(&s->ctx); - nni_pollable_free(s->recvable); - nni_pollable_free(s->sendable); - nni_idhash_fini(s->reqids); + req0_ctx_fini(&s->master); + nni_pollable_fini(&s->readable); + nni_pollable_fini(&s->writable); + nni_idhash_fini(s->requests); nni_mtx_fini(&s->mtx); } @@ -204,7 +201,7 @@ req0_pipe_init(void *arg, nni_pipe *pipe, void *s) } NNI_LIST_NODE_INIT(&p->node); - NNI_LIST_INIT(&p->ctxs, req0_ctx, pnode); + NNI_LIST_INIT(&p->contexts, req0_ctx, pipe_node); p->pipe = pipe; p->req = s; return (0); @@ -225,9 +222,9 @@ req0_pipe_start(void *arg) nni_mtx_unlock(&s->mtx); return (NNG_ECLOSED); } - nni_list_append(&s->readypipes, p); - nni_pollable_raise(s->sendable); - req0_run_sendq(s, NULL); + nni_list_append(&s->ready_pipes, p); + nni_pollable_raise(&s->writable); + req0_run_send_queue(s, NULL); nni_mtx_unlock(&s->mtx); nni_pipe_recv(p->pipe, p->aio_recv); @@ -245,22 +242,22 @@ req0_pipe_close(void *arg) nni_aio_close(p->aio_send); nni_mtx_lock(&s->mtx); - // This removes the node from either busypipes or readypipes. + // This removes the node from either busy_pipes or ready_pipes. // It doesn't much matter which. We stick the pipe on the stop // list, so that we can wait for that to close down safely. p->closed = true; nni_list_node_remove(&p->node); - nni_list_append(&s->stoppipes, p); - if (nni_list_empty(&s->readypipes)) { - nni_pollable_clear(s->sendable); + nni_list_append(&s->stop_pipes, p); + if (nni_list_empty(&s->ready_pipes)) { + nni_pollable_clear(&s->writable); } - while ((ctx = nni_list_first(&p->ctxs)) != NULL) { - nni_list_remove(&p->ctxs, ctx); + while ((ctx = nni_list_first(&p->contexts)) != NULL) { + nni_list_remove(&p->contexts, ctx); // Reset the timer on this so it expires immediately. // This is actually easier than canceling the timer and - // running the sendq separately. (In particular, it avoids - // a potential deadlock on cancelling the timer.) + // running the send_queue separately. (In particular, it + // avoids a potential deadlock on cancelling the timer.) nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO); } nni_mtx_unlock(&s->mtx); @@ -268,7 +265,7 @@ req0_pipe_close(void *arg) // For cooked mode, we use a context, and send out that way. This // completely bypasses the upper write queue. Each context keeps one -// message pending; these are "scheduled" via the sendq. The sendq +// message pending; these are "scheduled" via the send_queue. The send_queue // is ordered, so FIFO ordering between contexts is provided for. static void @@ -277,9 +274,9 @@ req0_send_cb(void *arg) req0_pipe *p = arg; req0_sock *s = p->req; nni_aio * aio; - nni_list aios; + nni_list send_list; - nni_aio_list_init(&aios); + nni_aio_list_init(&send_list); if (nni_aio_result(p->aio_send) != 0) { // We failed to send... clean up and deal with it. nni_msg_free(nni_aio_get_msg(p->aio_send)); @@ -289,7 +286,7 @@ req0_send_cb(void *arg) } // We completed a cooked send, so we need to reinsert ourselves - // in the ready list, and re-run the sendq. + // in the ready list, and re-run the send_queue. nni_mtx_lock(&s->mtx); if (p->closed || s->closed) { @@ -298,16 +295,16 @@ req0_send_cb(void *arg) nni_mtx_unlock(&s->mtx); return; } - nni_list_remove(&s->busypipes, p); - nni_list_append(&s->readypipes, p); - if (nni_list_empty(&s->sendq)) { - nni_pollable_raise(s->sendable); + nni_list_remove(&s->busy_pipes, p); + nni_list_append(&s->ready_pipes, p); + if (nni_list_empty(&s->send_queue)) { + nni_pollable_raise(&s->writable); } - req0_run_sendq(s, &aios); + req0_run_send_queue(s, &send_list); nni_mtx_unlock(&s->mtx); - while ((aio = nni_list_first(&aios)) != NULL) { - nni_list_remove(&aios, aio); + while ((aio = nni_list_first(&send_list)) != NULL) { + nni_list_remove(&send_list, aio); nni_aio_finish_synch(aio, 0, 0); } } @@ -350,8 +347,8 @@ req0_recv_cb(void *arg) nni_pipe_recv(p->pipe, p->aio_recv); // Look for a context to receive it. - if ((nni_idhash_find(s->reqids, id, (void **) &ctx) != 0) || - (ctx->saio != NULL) || (ctx->repmsg != NULL)) { + if ((nni_idhash_find(s->requests, id, (void **) &ctx) != 0) || + (ctx->send_aio != NULL) || (ctx->rep_msg != NULL)) { nni_mtx_unlock(&s->mtx); // No waiting context, we have not sent the request out to // the wire yet, or context already has a reply ready. @@ -361,25 +358,25 @@ req0_recv_cb(void *arg) } // We have our match, so we can remove this. - nni_list_node_remove(&ctx->sqnode); - nni_idhash_remove(s->reqids, id); - ctx->reqid = 0; - if (ctx->reqmsg != NULL) { - nni_msg_free(ctx->reqmsg); - ctx->reqmsg = NULL; + nni_list_node_remove(&ctx->send_node); + nni_idhash_remove(s->requests, id); + ctx->request_id = 0; + if (ctx->req_msg != NULL) { + nni_msg_free(ctx->req_msg); + ctx->req_msg = NULL; } // Is there an aio waiting for us? - if ((aio = ctx->raio) != NULL) { - ctx->raio = NULL; + if ((aio = ctx->recv_aio) != NULL) { + ctx->recv_aio = NULL; nni_mtx_unlock(&s->mtx); nni_aio_set_msg(aio, msg); nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); } else { // No AIO, so stash msg. Receive will pick it up later. - ctx->repmsg = msg; - if (ctx == &s->ctx) { - nni_pollable_raise(s->recvable); + ctx->rep_msg = msg; + if (ctx == &s->master) { + nni_pollable_raise(&s->readable); } nni_mtx_unlock(&s->mtx); } @@ -397,28 +394,28 @@ req0_ctx_timeout(void *arg) req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); - if ((ctx->reqmsg != NULL) && (!s->closed)) { - if (!nni_list_node_active(&ctx->sqnode)) { - nni_list_append(&s->sendq, ctx); + if ((ctx->req_msg != NULL) && (!s->closed)) { + if (!nni_list_node_active(&ctx->send_node)) { + nni_list_append(&s->send_queue, ctx); } - req0_run_sendq(s, NULL); + req0_run_send_queue(s, NULL); } nni_mtx_unlock(&s->mtx); } static int -req0_ctx_init(void *carg, void *sarg) +req0_ctx_init(void *arg, void *sock) { - req0_sock *s = sarg; - req0_ctx * ctx = carg; + req0_sock *s = sock; + req0_ctx * ctx = arg; nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx); nni_mtx_lock(&s->mtx); - ctx->sock = s; - ctx->raio = NULL; - ctx->retry = s->retry; - nni_list_append(&s->ctxs, ctx); + ctx->sock = s; + ctx->recv_aio = NULL; + ctx->retry = s->retry; + nni_list_append(&s->contexts, ctx); nni_mtx_unlock(&s->mtx); return (0); @@ -432,18 +429,18 @@ req0_ctx_fini(void *arg) nni_aio * aio; nni_mtx_lock(&s->mtx); - if ((aio = ctx->raio) != NULL) { - ctx->raio = NULL; + if ((aio = ctx->recv_aio) != NULL) { + ctx->recv_aio = NULL; nni_aio_finish_error(aio, NNG_ECLOSED); } - if ((aio = ctx->saio) != NULL) { - ctx->saio = NULL; - nni_aio_set_msg(aio, ctx->reqmsg); - ctx->reqmsg = NULL; + if ((aio = ctx->send_aio) != NULL) { + ctx->send_aio = NULL; + nni_aio_set_msg(aio, ctx->req_msg); + ctx->req_msg = NULL; nni_aio_finish_error(aio, NNG_ECLOSED); } req0_ctx_reset(ctx); - nni_list_remove(&s->ctxs, ctx); + nni_list_remove(&s->contexts, ctx); nni_mtx_unlock(&s->mtx); nni_timer_cancel(&ctx->timer); @@ -451,31 +448,31 @@ req0_ctx_fini(void *arg) } static int -req0_ctx_set_resendtime(void *arg, const void *buf, size_t sz, nni_opt_type t) +req0_ctx_set_resend_time(void *arg, const void *buf, size_t sz, nni_opt_type t) { req0_ctx *ctx = arg; return (nni_copyin_ms(&ctx->retry, buf, sz, t)); } static int -req0_ctx_get_resendtime(void *arg, void *buf, size_t *szp, nni_opt_type t) +req0_ctx_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t) { req0_ctx *ctx = arg; return (nni_copyout_ms(ctx->retry, buf, szp, t)); } static void -req0_run_sendq(req0_sock *s, nni_list *aiolist) +req0_run_send_queue(req0_sock *s, nni_list *send_list) { req0_ctx *ctx; nni_aio * aio; // Note: This routine should be called with the socket lock held. - while ((ctx = nni_list_first(&s->sendq)) != NULL) { + while ((ctx = nni_list_first(&s->send_queue)) != NULL) { nni_msg * msg; req0_pipe *p; - if ((p = nni_list_first(&s->readypipes)) == NULL) { + if ((p = nni_list_first(&s->ready_pipes)) == NULL) { return; } @@ -484,18 +481,18 @@ req0_run_sendq(req0_sock *s, nni_list *aiolist) // be dropped, we rely on the resend timer to pick it up. // We also notify the completion callback if this is the // first send attempt. - nni_list_remove(&s->sendq, ctx); + nni_list_remove(&s->send_queue, ctx); // Schedule a resubmit timer. We only do this if we got // a pipe to send to. Otherwise, we should get handled - // the next time that the sendq is run. We don't do this + // the next time that the send_queue is run. We don't do this // if the retry is "disabled" with NNG_DURATION_INFINITE. if (ctx->retry > 0) { nni_timer_schedule( &ctx->timer, nni_clock() + ctx->retry); } - if (nni_msg_dup(&msg, ctx->reqmsg) != 0) { + if (nni_msg_dup(&msg, ctx->req_msg) != 0) { // Oops. Well, keep trying each context; maybe // one of them will get lucky. continue; @@ -504,27 +501,27 @@ req0_run_sendq(req0_sock *s, nni_list *aiolist) // Put us on the pipe list of active contexts. // This gives the pipe a chance to kick a resubmit // if the pipe is removed. - nni_list_node_remove(&ctx->pnode); - nni_list_append(&p->ctxs, ctx); + nni_list_node_remove(&ctx->pipe_node); + nni_list_append(&p->contexts, ctx); - nni_list_remove(&s->readypipes, p); - nni_list_append(&s->busypipes, p); + nni_list_remove(&s->ready_pipes, p); + nni_list_append(&s->busy_pipes, p); - if ((aio = ctx->saio) != NULL) { - ctx->saio = NULL; - nni_aio_bump_count(aio, ctx->reqlen); + if ((aio = ctx->send_aio) != NULL) { + ctx->send_aio = NULL; + nni_aio_bump_count(aio, ctx->req_len); // If the list was passed in, we want to do a // synchronous completion later. - if (aiolist != NULL) { - nni_list_append(aiolist, aio); + if (send_list != NULL) { + nni_list_append(send_list, aio); } else { nni_aio_finish(aio, 0, 0); } - if (ctx == &s->ctx) { - if (nni_list_empty(&s->readypipes)) { - nni_pollable_clear(s->sendable); + if (ctx == &s->master) { + if (nni_list_empty(&s->ready_pipes)) { + nni_pollable_clear(&s->writable); } else { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } } } @@ -543,26 +540,26 @@ req0_ctx_reset(req0_ctx *ctx) // We cannot safely "wait" using nni_timer_cancel, but this removes // any scheduled timer activation. If the timeout is already running // concurrently, it will still run. It should do nothing, because - // we toss the reqmsg. There is still a very narrow race if the + // we toss the request. There is still a very narrow race if the // timeout fires, but doesn't actually start running before we // both finish this function, *and* manage to reschedule another // request. The consequence of that occurring is that the request // will be emitted on the wire twice. This is not actually tragic. nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER); - nni_list_node_remove(&ctx->pnode); - nni_list_node_remove(&ctx->sqnode); - if (ctx->reqid != 0) { - nni_idhash_remove(s->reqids, ctx->reqid); - ctx->reqid = 0; + nni_list_node_remove(&ctx->pipe_node); + nni_list_node_remove(&ctx->send_node); + if (ctx->request_id != 0) { + nni_idhash_remove(s->requests, ctx->request_id); + ctx->request_id = 0; } - if (ctx->reqmsg != NULL) { - nni_msg_free(ctx->reqmsg); - ctx->reqmsg = NULL; + if (ctx->req_msg != NULL) { + nni_msg_free(ctx->req_msg); + ctx->req_msg = NULL; } - if (ctx->repmsg != NULL) { - nni_msg_free(ctx->repmsg); - ctx->repmsg = NULL; + if (ctx->rep_msg != NULL) { + nni_msg_free(ctx->rep_msg); + ctx->rep_msg = NULL; } } @@ -573,12 +570,12 @@ req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv) req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); - if (ctx->raio != aio) { + if (ctx->recv_aio != aio) { // already completed, ignore this. nni_mtx_unlock(&s->mtx); return; } - ctx->raio = NULL; + ctx->recv_aio = NULL; // Cancellation of a pending receive is treated as aborting the // entire state machine. This allows us to preserve the semantic of @@ -608,8 +605,8 @@ req0_ctx_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - if ((ctx->raio != NULL) || - ((ctx->reqmsg == NULL) && (ctx->repmsg == NULL))) { + if ((ctx->recv_aio != NULL) || + ((ctx->req_msg == NULL) && (ctx->rep_msg == NULL))) { // We have already got a pending receive or have not // tried to send a request yet. // Either of these violate our basic state assumptions. @@ -618,7 +615,7 @@ req0_ctx_recv(void *arg, nni_aio *aio) return; } - if ((msg = ctx->repmsg) == NULL) { + if ((msg = ctx->rep_msg) == NULL) { int rv; rv = nni_aio_schedule(aio, req0_ctx_cancel_recv, ctx); if (rv != 0) { @@ -626,17 +623,17 @@ req0_ctx_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } - ctx->raio = aio; + ctx->recv_aio = aio; nni_mtx_unlock(&s->mtx); return; } - ctx->repmsg = NULL; + ctx->rep_msg = NULL; // We have got a message to pass up, yay! nni_aio_set_msg(aio, msg); - if (ctx == &s->ctx) { - nni_pollable_clear(s->recvable); + if (ctx == &s->master) { + nni_pollable_clear(&s->readable); } nni_mtx_unlock(&s->mtx); nni_aio_finish(aio, 0, nni_msg_len(msg)); @@ -649,7 +646,7 @@ req0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); - if (ctx->saio != aio) { + if (ctx->send_aio != aio) { // already completed, ignore this. nni_mtx_unlock(&s->mtx); return; @@ -657,12 +654,12 @@ req0_ctx_cancel_send(nni_aio *aio, void *arg, int rv) // There should not be a pending reply, because we canceled // it while we were waiting. - NNI_ASSERT(ctx->raio == NULL); - ctx->saio = NULL; + NNI_ASSERT(ctx->recv_aio == NULL); + ctx->send_aio = NULL; // Restore the message back to the aio. - nni_aio_set_msg(aio, ctx->reqmsg); - nni_msg_header_clear(ctx->reqmsg); - ctx->reqmsg = NULL; + nni_aio_set_msg(aio, ctx->req_msg); + nni_msg_header_clear(ctx->req_msg); + ctx->req_msg = NULL; // Cancellation of a pending receive is treated as aborting the // entire state machine. This allows us to preserve the semantic of @@ -696,55 +693,55 @@ req0_ctx_send(void *arg, nni_aio *aio) } // Sending a new request cancels the old one, including any // outstanding reply. - if (ctx->raio != NULL) { - nni_aio_finish_error(ctx->raio, NNG_ECANCELED); - ctx->raio = NULL; + if (ctx->recv_aio != NULL) { + nni_aio_finish_error(ctx->recv_aio, NNG_ECANCELED); + ctx->recv_aio = NULL; } - if (ctx->saio != NULL) { - nni_aio_set_msg(ctx->saio, ctx->reqmsg); - nni_msg_header_clear(ctx->reqmsg); - ctx->reqmsg = NULL; - nni_aio_finish_error(ctx->saio, NNG_ECANCELED); - ctx->saio = NULL; - nni_list_remove(&s->sendq, ctx); + if (ctx->send_aio != NULL) { + nni_aio_set_msg(ctx->send_aio, ctx->req_msg); + nni_msg_header_clear(ctx->req_msg); + ctx->req_msg = NULL; + nni_aio_finish_error(ctx->send_aio, NNG_ECANCELED); + ctx->send_aio = NULL; + nni_list_remove(&s->send_queue, ctx); } // This resets the entire state machine. req0_ctx_reset(ctx); // Insert us on the per ID hash list, so that receives can find us. - if ((rv = nni_idhash_alloc(s->reqids, &id, ctx)) != 0) { + if ((rv = nni_idhash_alloc(s->requests, &id, ctx)) != 0) { nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); return; } - ctx->reqid = (uint32_t) id; - if ((rv = nni_msg_header_append_u32(msg, ctx->reqid)) != 0) { - nni_idhash_remove(s->reqids, id); + ctx->request_id = (uint32_t) id; + if ((rv = nni_msg_header_append_u32(msg, ctx->request_id)) != 0) { + nni_idhash_remove(s->requests, id); nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); return; } // If no pipes are ready, and the request was a poll (no background - // schedule), then fail it. Should be NNG_TIMEDOUT. + // schedule), then fail it. Should be NNG_ETIMEDOUT. rv = nni_aio_schedule(aio, req0_ctx_cancel_send, ctx); - if ((rv != 0) && (nni_list_empty(&s->readypipes))) { - nni_idhash_remove(s->reqids, id); + if ((rv != 0) && (nni_list_empty(&s->ready_pipes))) { + nni_idhash_remove(s->requests, id); nni_mtx_unlock(&s->mtx); nni_aio_finish_error(aio, rv); return; } - ctx->reqlen = nni_msg_len(msg); - ctx->reqmsg = msg; - ctx->saio = aio; + ctx->req_len = nni_msg_len(msg); + ctx->req_msg = msg; + ctx->send_aio = aio; nni_aio_set_msg(aio, NULL); - // Stick us on the sendq list. - nni_list_append(&s->sendq, ctx); + // Stick us on the send_queue list. + nni_list_append(&s->send_queue, ctx); - // Note that this will be synchronous if the readypipes list was + // Note that this will be synchronous if the ready_pipes list was // not empty. - req0_run_sendq(s, NULL); + req0_run_send_queue(s, NULL); nni_mtx_unlock(&s->mtx); } @@ -752,68 +749,69 @@ static void req0_sock_send(void *arg, nni_aio *aio) { req0_sock *s = arg; - req0_ctx_send(&s->ctx, aio); + req0_ctx_send(&s->master, aio); } static void req0_sock_recv(void *arg, nni_aio *aio) { req0_sock *s = arg; - req0_ctx_recv(&s->ctx, aio); + req0_ctx_recv(&s->master, aio); } static int -req0_sock_set_maxttl(void *arg, const void *buf, size_t sz, nni_opt_type t) +req0_sock_set_max_ttl(void *arg, const void *buf, size_t sz, nni_opt_type t) { req0_sock *s = arg; return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, t)); } static int -req0_sock_get_maxttl(void *arg, void *buf, size_t *szp, nni_opt_type t) +req0_sock_get_max_ttl(void *arg, void *buf, size_t *szp, nni_opt_type t) { req0_sock *s = arg; return (nni_copyout_int(s->ttl, buf, szp, t)); } static int -req0_sock_set_resendtime(void *arg, const void *buf, size_t sz, nni_opt_type t) +req0_sock_set_resend_time( + void *arg, const void *buf, size_t sz, nni_opt_type t) { req0_sock *s = arg; int rv; - rv = req0_ctx_set_resendtime(&s->ctx, buf, sz, t); - s->retry = s->ctx.retry; + rv = req0_ctx_set_resend_time(&s->master, buf, sz, t); + s->retry = s->master.retry; return (rv); } static int -req0_sock_get_resendtime(void *arg, void *buf, size_t *szp, nni_opt_type t) +req0_sock_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t) { req0_sock *s = arg; - return (req0_ctx_get_resendtime(&s->ctx, buf, szp, t)); + return (req0_ctx_get_resend_time(&s->master, buf, szp, t)); } static int -req0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t) +req0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) { req0_sock *s = arg; int rv; int fd; - if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); } static int -req0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) +req0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) { req0_sock *s = arg; int rv; int fd; - if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { return (rv); } @@ -832,8 +830,8 @@ static nni_proto_pipe_ops req0_pipe_ops = { static nni_option req0_ctx_options[] = { { .o_name = NNG_OPT_REQ_RESENDTIME, - .o_get = req0_ctx_get_resendtime, - .o_set = req0_ctx_set_resendtime, + .o_get = req0_ctx_get_resend_time, + .o_set = req0_ctx_set_resend_time, }, { .o_name = NULL, @@ -852,21 +850,21 @@ static nni_proto_ctx_ops req0_ctx_ops = { static nni_option req0_sock_options[] = { { .o_name = NNG_OPT_MAXTTL, - .o_get = req0_sock_get_maxttl, - .o_set = req0_sock_set_maxttl, + .o_get = req0_sock_get_max_ttl, + .o_set = req0_sock_set_max_ttl, }, { .o_name = NNG_OPT_REQ_RESENDTIME, - .o_get = req0_sock_get_resendtime, - .o_set = req0_sock_set_resendtime, + .o_get = req0_sock_get_resend_time, + .o_set = req0_sock_set_resend_time, }, { .o_name = NNG_OPT_RECVFD, - .o_get = req0_sock_get_recvfd, + .o_get = req0_sock_get_recv_fd, }, { .o_name = NNG_OPT_SENDFD, - .o_get = req0_sock_get_sendfd, + .o_get = req0_sock_get_send_fd, }, // terminate list { @@ -896,7 +894,7 @@ static nni_proto req0_proto = { }; int -nng_req0_open(nng_socket *sidp) +nng_req0_open(nng_socket *sock) { - return (nni_proto_open(sidp, &req0_proto)); + return (nni_proto_open(sock, &req0_proto)); } diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index ccd25242..b4ffc917 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -48,14 +48,14 @@ struct resp0_ctx { // resp0_sock is our per-socket protocol private structure. struct resp0_sock { - nni_mtx mtx; - int ttl; - nni_idhash * pipes; - resp0_ctx ctx; - nni_list recvpipes; - nni_list recvq; - nni_pollable *recvable; - nni_pollable *sendable; + nni_mtx mtx; + int ttl; + nni_idhash * pipes; + resp0_ctx ctx; + nni_list recvpipes; + nni_list recvq; + nni_pollable readable; + nni_pollable writable; }; // resp0_pipe is our per-pipe protocol private structure. @@ -155,7 +155,7 @@ resp0_ctx_send(void *arg, nni_aio *aio) if (ctx == &s->ctx) { // We can't send anymore, because only one send per request. - nni_pollable_clear(s->sendable); + nni_pollable_clear(&s->writable); } nni_mtx_lock(&s->mtx); @@ -215,8 +215,8 @@ resp0_sock_fini(void *arg) nni_idhash_fini(s->pipes); resp0_ctx_fini(&s->ctx); - nni_pollable_free(s->sendable); - nni_pollable_free(s->recvable); + nni_pollable_fini(&s->writable); + nni_pollable_fini(&s->readable); nni_mtx_fini(&s->mtx); } @@ -241,13 +241,10 @@ resp0_sock_init(void *arg, nni_sock *nsock) (void) resp0_ctx_init(&s->ctx, s); - // We start off without being either readable or pollable. + // We start off without being either readable or writable. // Readability comes when there is something on the socket. - if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || - ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { - resp0_sock_fini(s); - return (rv); - } + nni_pollable_init(&s->writable); + nni_pollable_init(&s->readable); return (0); } @@ -357,7 +354,7 @@ resp0_pipe_close(void *arg) if (p->id == s->ctx.pipe_id) { // Make sure user space knows they can send a message to us, // which we will happily discard. - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_idhash_remove(s->pipes, p->id); nni_mtx_unlock(&s->mtx); @@ -385,7 +382,7 @@ resp0_pipe_send_cb(void *arg) // Nothing else to send. if (p->id == s->ctx.pipe_id) { // Mark us ready for the other side to send! - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_mtx_unlock(&s->mtx); return; @@ -459,7 +456,7 @@ resp0_ctx_recv(void *arg, nni_aio *aio) nni_aio_set_msg(p->aio_recv, NULL); nni_list_remove(&s->recvpipes, p); if (nni_list_empty(&s->recvpipes)) { - nni_pollable_clear(s->recvable); + nni_pollable_clear(&s->readable); } nni_pipe_recv(p->npipe, p->aio_recv); @@ -468,7 +465,7 @@ resp0_ctx_recv(void *arg, nni_aio *aio) ctx->btrace_len = len; ctx->pipe_id = p->id; if (ctx == &s->ctx) { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_mtx_unlock(&s->mtx); @@ -530,7 +527,7 @@ resp0_pipe_recv_cb(void *arg) if ((ctx = nni_list_first(&s->recvq)) == NULL) { // No one blocked in recv, stall. nni_list_append(&s->recvpipes, p); - nni_pollable_raise(s->recvable); + nni_pollable_raise(&s->readable); nni_mtx_unlock(&s->mtx); return; } @@ -549,7 +546,7 @@ resp0_pipe_recv_cb(void *arg) ctx->pipe_id = p->id; if ((ctx == &s->ctx) && (!p->busy)) { - nni_pollable_raise(s->sendable); + nni_pollable_raise(&s->writable); } nni_mtx_unlock(&s->mtx); @@ -584,7 +581,7 @@ resp0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->writable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); @@ -597,7 +594,7 @@ resp0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&s->readable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index be0ee55e..8aa05dd4 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -52,7 +52,7 @@ struct surv0_sock { nni_mtx mtx; surv0_ctx ctx; nni_idhash * surveys; - nni_pollable *sendable; + nni_pollable writable; }; // surv0_pipe is our per-pipe protocol private structure. @@ -221,7 +221,7 @@ surv0_sock_fini(void *arg) surv0_ctx_fini(&sock->ctx); nni_idhash_fini(sock->surveys); - nni_pollable_free(sock->sendable); + nni_pollable_fini(&sock->writable); nni_mtx_fini(&sock->mtx); } @@ -235,6 +235,9 @@ surv0_sock_init(void *arg, nni_sock *nsock) NNI_LIST_INIT(&sock->pipes, surv0_pipe, node); nni_mtx_init(&sock->mtx); + nni_pollable_init(&sock->writable); + // We are always writable. + nni_pollable_raise(&sock->writable); if (((rv = nni_idhash_init(&sock->surveys)) != 0) || ((rv = surv0_ctx_init(&sock->ctx, sock)) != 0)) { @@ -249,7 +252,7 @@ surv0_sock_init(void *arg, nni_sock *nsock) nni_random() | 0x80000000u); sock->ctx.survtime = NNI_SECOND; - sock->ttl = 8; + sock->ttl = 8; return (0); } @@ -478,17 +481,7 @@ surv0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_opt_type t) int rv; int fd; - nni_mtx_lock(&sock->mtx); - if (sock->sendable == NULL) { - if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) { - nni_mtx_unlock(&sock->mtx); - return (rv); - } - // We are always sendable. - nni_pollable_raise(sock->sendable); - } - nni_mtx_unlock(&sock->mtx); - if ((rv = nni_pollable_getfd(sock->sendable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&sock->writable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); @@ -498,12 +491,12 @@ static int surv0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) { surv0_sock * sock = arg; - nni_pollable *recvable; + nni_pollable *readable; int rv; int fd; - if (((rv = nni_msgq_get_recvable(sock->ctx.rq, &recvable)) != 0) || - ((rv = nni_pollable_getfd(recvable, &fd)) != 0)) { + if (((rv = nni_msgq_get_recvable(sock->ctx.rq, &readable)) != 0) || + ((rv = nni_pollable_getfd(readable, &fd)) != 0)) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); |
