diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-01-04 10:24:05 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2020-01-04 10:56:40 -0800 |
| commit | 382b4cff3abd5ccb282ba420ef1f7c7d171ec91a (patch) | |
| tree | 6860e1cceb52a7dab2763001eb27edf95a0e7246 /src/protocol/pubsub0 | |
| parent | bcc3814b58e9b198344bdaf6e7a916a354841275 (diff) | |
| download | nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.tar.gz nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.tar.bz2 nng-382b4cff3abd5ccb282ba420ef1f7c7d171ec91a.zip | |
fixes #1105 pollable can be inlined, and use atomics
This also introduces an nni_atomic_cas64 to help with lock-free designs.
Some mechanical renaming was done in some of the protocols for spelling.
Diffstat (limited to 'src/protocol/pubsub0')
| -rw-r--r-- | src/protocol/pubsub0/pub.c | 2 | ||||
| -rw-r--r-- | src/protocol/pubsub0/sub.c | 174 |
2 files changed, 87 insertions, 89 deletions
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index 22195c52..a42e95ff 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -289,7 +289,7 @@ pub0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_type t) int fd; int rv; nni_mtx_lock(&sock->mtx); - // PUB sockets are *always* sendable. + // PUB sockets are *always* writable. nni_pollable_raise(sock->sendable); rv = nni_pollable_getfd(sock->sendable, &fd); nni_mtx_unlock(&sock->mtx); diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 0a367216..56da98f8 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -29,10 +29,10 @@ #endif // By default we accept 128 messages. -#define SUB0_DEFAULT_QLEN 128 +#define SUB0_DEFAULT_RECV_BUF_LEN 128 // By default, prefer new messages when the queue is full. -#define SUB0_DEFAULT_PREFNEW true +#define SUB0_DEFAULT_PREFER_NEW true typedef struct sub0_pipe sub0_pipe; typedef struct sub0_sock sub0_sock; @@ -53,21 +53,21 @@ struct sub0_topic { struct sub0_ctx { nni_list_node node; sub0_sock * sock; - nni_list topics; // XXX: Consider replacing with patricia trie - nni_list raios; // sub context could have multiple pending recvs + nni_list topics; // TODO: Consider patricia trie + nni_list recv_queue; // can have multiple pending receives bool closed; nni_lmq lmq; - bool prefnew; + bool prefer_new; }; // sub0_sock is our per-socket protocol private structure. struct sub0_sock { - nni_pollable *recvable; - sub0_ctx ctx; // default context - nni_list ctxs; // all contexts - size_t recvbuflen; - bool prefnew; - nni_mtx lk; + nni_pollable readable; + sub0_ctx master; // default context + nni_list contexts; // all contexts + size_t recv_buf_len; + bool prefer_new; + nni_mtx lk; }; // sub0_pipe is our per-pipe protocol private structure. @@ -83,8 +83,8 @@ sub0_ctx_cancel(nng_aio *aio, void *arg, int rv) sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; nni_mtx_lock(&sock->lk); - if (nni_list_active(&ctx->raios, aio)) { - nni_list_remove(&ctx->raios, aio); + if (nni_list_active(&ctx->recv_queue, aio)) { + nni_list_remove(&ctx->recv_queue, aio); nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&sock->lk); @@ -116,15 +116,15 @@ sub0_ctx_recv(void *arg, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } - nni_list_append(&ctx->raios, aio); + nni_list_append(&ctx->recv_queue, aio); nni_mtx_unlock(&sock->lk); return; } (void) nni_lmq_getq(&ctx->lmq, &msg); - if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->ctx)) { - nni_pollable_clear(sock->recvable); + if (nni_lmq_empty(&ctx->lmq) && (ctx == &sock->master)) { + nni_pollable_clear(&sock->readable); } nni_aio_set_msg(aio, msg); nni_mtx_unlock(&sock->lk); @@ -149,8 +149,8 @@ sub0_ctx_close(void *arg) nni_mtx_lock(&sock->lk); ctx->closed = true; - while ((aio = nni_list_first(&ctx->raios)) != NULL) { - nni_list_remove(&ctx->raios, aio); + while ((aio = nni_list_first(&ctx->recv_queue)) != NULL) { + nni_list_remove(&ctx->recv_queue, aio); nni_aio_finish_error(aio, NNG_ECLOSED); } nni_mtx_unlock(&sock->lk); @@ -166,7 +166,7 @@ sub0_ctx_fini(void *arg) sub0_ctx_close(ctx); nni_mtx_lock(&sock->lk); - nni_list_remove(&sock->ctxs, ctx); + nni_list_remove(&sock->contexts, ctx); nni_mtx_unlock(&sock->lk); while ((topic = nni_list_first(&ctx->topics)) != 0) { @@ -179,29 +179,29 @@ sub0_ctx_fini(void *arg) } static int -sub0_ctx_init(void *carg, void *sarg) +sub0_ctx_init(void *ctx_arg, void *sock_arg) { - sub0_sock *sock = sarg; - sub0_ctx * ctx = carg; + sub0_sock *sock = sock_arg; + sub0_ctx * ctx = ctx_arg; size_t len; - bool prefnew; + bool prefer_new; int rv; nni_mtx_lock(&sock->lk); - len = sock->recvbuflen; - prefnew = sock->prefnew; + len = sock->recv_buf_len; + prefer_new = sock->prefer_new; if ((rv = nni_lmq_init(&ctx->lmq, len)) != 0) { return (rv); } - ctx->prefnew = prefnew; + ctx->prefer_new = prefer_new; - nni_aio_list_init(&ctx->raios); + nni_aio_list_init(&ctx->recv_queue); NNI_LIST_INIT(&ctx->topics, sub0_topic, node); ctx->sock = sock; - nni_list_append(&sock->ctxs, ctx); + nni_list_append(&sock->contexts, ctx); nni_mtx_unlock(&sock->lk); return (0); @@ -212,28 +212,26 @@ sub0_sock_fini(void *arg) { sub0_sock *sock = arg; - sub0_ctx_fini(&sock->ctx); - if (sock->recvable != NULL) { - nni_pollable_free(sock->recvable); - } + sub0_ctx_fini(&sock->master); + nni_pollable_fini(&sock->readable); nni_mtx_fini(&sock->lk); } static int -sub0_sock_init(void *arg, nni_sock *nsock) +sub0_sock_init(void *arg, nni_sock *unused) { sub0_sock *sock = arg; int rv; - NNI_ARG_UNUSED(nsock); + NNI_ARG_UNUSED(unused); - NNI_LIST_INIT(&sock->ctxs, sub0_ctx, node); + NNI_LIST_INIT(&sock->contexts, sub0_ctx, node); nni_mtx_init(&sock->lk); - sock->recvbuflen = SUB0_DEFAULT_QLEN; - sock->prefnew = SUB0_DEFAULT_PREFNEW; + sock->recv_buf_len = SUB0_DEFAULT_RECV_BUF_LEN; + sock->prefer_new = SUB0_DEFAULT_PREFER_NEW; + nni_pollable_init(&sock->readable); - if (((rv = sub0_ctx_init(&sock->ctx, sock)) != 0) || - ((rv = nni_pollable_alloc(&sock->recvable)) != 0)) { + if ((rv = sub0_ctx_init(&sock->master, sock)) != 0) { sub0_sock_fini(sock); return (rv); } @@ -251,7 +249,7 @@ static void sub0_sock_close(void *arg) { sub0_sock *sock = arg; - sub0_ctx_close(&sock->ctx); + sub0_ctx_close(&sock->master); } static void @@ -357,10 +355,10 @@ sub0_recv_cb(void *arg) nni_mtx_lock(&sock->lk); // Go through all contexts. We will try to send up. - NNI_LIST_FOREACH (&sock->ctxs, ctx) { + NNI_LIST_FOREACH (&sock->contexts, ctx) { nni_msg *dup; - if (nni_lmq_full(&ctx->lmq) && !ctx->prefnew) { + if (nni_lmq_full(&ctx->lmq) && !ctx->prefer_new) { // Cannot deliver here, as receive buffer is full. continue; } @@ -371,7 +369,7 @@ sub0_recv_cb(void *arg) // Special optimization (for the case where only one context), // including when no contexts are in use, we avoid duplication. - if (ctx == nni_list_last(&sock->ctxs)) { + if (ctx == nni_list_last(&sock->contexts)) { dup = msg; msg = NULL; } else if (nni_msg_dup(&dup, msg) != 0) { @@ -382,9 +380,9 @@ sub0_recv_cb(void *arg) // message and it is intended for us. submatch = true; - if (!nni_list_empty(&ctx->raios)) { - aio = nni_list_first(&ctx->raios); - nni_list_remove(&ctx->raios, aio); + if (!nni_list_empty(&ctx->recv_queue)) { + aio = nni_list_first(&ctx->recv_queue); + nni_list_remove(&ctx->recv_queue, aio); nni_aio_set_msg(aio, dup); // Save for synchronous completion @@ -414,14 +412,14 @@ sub0_recv_cb(void *arg) } if (submatch) { - nni_pollable_raise(sock->recvable); + nni_pollable_raise(&sock->readable); } nni_pipe_recv(p->pipe, p->aio_recv); } static int -sub0_ctx_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t) +sub0_ctx_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; @@ -434,7 +432,7 @@ sub0_ctx_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t) } static int -sub0_ctx_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t) +sub0_ctx_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; @@ -452,8 +450,8 @@ sub0_ctx_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t) // If we change the socket, then this will change the queue for // any new contexts. (Previously constructed contexts are unaffected.) - if (&sock->ctx == ctx) { - sock->recvbuflen = (size_t) val; + if (&sock->master == ctx) { + sock->recv_buf_len = (size_t) val; } nni_mtx_unlock(&sock->lk); return (0); @@ -470,7 +468,7 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) sub0_ctx * ctx = arg; sub0_sock * sock = ctx->sock; sub0_topic *topic; - sub0_topic *newtopic; + sub0_topic *new_topic; NNI_ARG_UNUSED(t); nni_mtx_lock(&sock->lk); @@ -484,18 +482,18 @@ sub0_ctx_subscribe(void *arg, const void *buf, size_t sz, nni_type t) return (0); } } - if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) { + if ((new_topic = NNI_ALLOC_STRUCT(new_topic)) == NULL) { nni_mtx_unlock(&sock->lk); return (NNG_ENOMEM); } - if ((sz > 0) && ((newtopic->buf = nni_alloc(sz)) == NULL)) { + if ((sz > 0) && ((new_topic->buf = nni_alloc(sz)) == NULL)) { nni_mtx_unlock(&sock->lk); - NNI_FREE_STRUCT(newtopic); + NNI_FREE_STRUCT(new_topic); return (NNG_ENOMEM); } - memcpy(newtopic->buf, buf, sz); - newtopic->len = sz; - nni_list_append(&ctx->topics, newtopic); + memcpy(new_topic->buf, buf, sz); + new_topic->len = sz; + nni_list_append(&ctx->topics, new_topic); nni_mtx_unlock(&sock->lk); return (0); } @@ -547,21 +545,21 @@ sub0_ctx_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) } static int -sub0_ctx_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t) +sub0_ctx_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; bool val; nni_mtx_lock(&sock->lk); - val = ctx->prefnew; + val = ctx->prefer_new; nni_mtx_unlock(&sock->lk); return (nni_copyout_bool(val, buf, szp, t)); } static int -sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) +sub0_ctx_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) { sub0_ctx * ctx = arg; sub0_sock *sock = ctx->sock; @@ -573,9 +571,9 @@ sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) } nni_mtx_lock(&sock->lk); - ctx->prefnew = val; - if (&sock->ctx == ctx) { - sock->prefnew = val; + ctx->prefer_new = val; + if (&sock->master == ctx) { + sock->prefer_new = val; } nni_mtx_unlock(&sock->lk); @@ -585,8 +583,8 @@ sub0_ctx_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) static nni_option sub0_ctx_options[] = { { .o_name = NNG_OPT_RECVBUF, - .o_get = sub0_ctx_get_recvbuf, - .o_set = sub0_ctx_set_recvbuf, + .o_get = sub0_ctx_get_recv_buf_len, + .o_set = sub0_ctx_set_recv_buf_len, }, { .o_name = NNG_OPT_SUB_SUBSCRIBE, @@ -598,8 +596,8 @@ static nni_option sub0_ctx_options[] = { }, { .o_name = NNG_OPT_SUB_PREFNEW, - .o_get = sub0_ctx_get_prefnew, - .o_set = sub0_ctx_set_prefnew, + .o_get = sub0_ctx_get_prefer_new, + .o_set = sub0_ctx_set_prefer_new, }, { .o_name = NULL, @@ -620,62 +618,62 @@ sub0_sock_recv(void *arg, nni_aio *aio) { sub0_sock *sock = arg; - sub0_ctx_recv(&sock->ctx, aio); + sub0_ctx_recv(&sock->master, aio); } static int -sub0_sock_get_recvfd(void *arg, void *buf, size_t *szp, nni_opt_type t) +sub0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) { sub0_sock *sock = arg; int rv; int fd; - if ((rv = nni_pollable_getfd(sock->recvable, &fd)) != 0) { + if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) { return (rv); } return (nni_copyout_int(fd, buf, szp, t)); } static int -sub0_sock_get_recvbuf(void *arg, void *buf, size_t *szp, nni_type t) +sub0_sock_get_recv_buf_len(void *arg, void *buf, size_t *szp, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_get_recvbuf(&sock->ctx, buf, szp, t)); + return (sub0_ctx_get_recv_buf_len(&sock->master, buf, szp, t)); } static int -sub0_sock_set_recvbuf(void *arg, const void *buf, size_t sz, nni_type t) +sub0_sock_set_recv_buf_len(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_set_recvbuf(&sock->ctx, buf, sz, t)); + return (sub0_ctx_set_recv_buf_len(&sock->master, buf, sz, t)); } static int sub0_sock_subscribe(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_subscribe(&sock->ctx, buf, sz, t)); + return (sub0_ctx_subscribe(&sock->master, buf, sz, t)); } static int sub0_sock_unsubscribe(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_unsubscribe(&sock->ctx, buf, sz, t)); + return (sub0_ctx_unsubscribe(&sock->master, buf, sz, t)); } static int -sub0_sock_get_prefnew(void *arg, void *buf, size_t *szp, nni_type t) +sub0_sock_get_prefer_new(void *arg, void *buf, size_t *szp, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_get_prefnew(&sock->ctx, buf, szp, t)); + return (sub0_ctx_get_prefer_new(&sock->master, buf, szp, t)); } static int -sub0_sock_set_prefnew(void *arg, const void *buf, size_t sz, nni_type t) +sub0_sock_set_prefer_new(void *arg, const void *buf, size_t sz, nni_type t) { sub0_sock *sock = arg; - return (sub0_ctx_set_prefnew(&sock->ctx, buf, sz, t)); + return (sub0_ctx_set_prefer_new(&sock->master, buf, sz, t)); } // This is the global protocol structure -- our linkage to the core. @@ -709,17 +707,17 @@ static nni_option sub0_sock_options[] = { }, { .o_name = NNG_OPT_RECVFD, - .o_get = sub0_sock_get_recvfd, + .o_get = sub0_sock_get_recv_fd, }, { .o_name = NNG_OPT_RECVBUF, - .o_get = sub0_sock_get_recvbuf, - .o_set = sub0_sock_set_recvbuf, + .o_get = sub0_sock_get_recv_buf_len, + .o_set = sub0_sock_set_recv_buf_len, }, { .o_name = NNG_OPT_SUB_PREFNEW, - .o_get = sub0_sock_get_prefnew, - .o_set = sub0_sock_set_prefnew, + .o_get = sub0_sock_get_prefer_new, + .o_set = sub0_sock_set_prefer_new, }, // terminate list { @@ -749,7 +747,7 @@ static nni_proto sub0_proto = { }; int -nng_sub0_open(nng_socket *sidp) +nng_sub0_open(nng_socket *sock) { - return (nni_proto_open(sidp, &sub0_proto)); + return (nni_proto_open(sock, &sub0_proto)); } |
