diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-04-04 13:36:54 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-04-10 15:40:00 -0700 |
| commit | 5f7289e1f8e1427c9214c8e3e96ad56b1f868d53 (patch) | |
| tree | 39debf4ecde234b2a0be19c9cb15628cc32c2edb /src/core | |
| parent | 56f1bf30e61c53646dd2f8425da7c7fa0d97b3e1 (diff) | |
| download | nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.gz nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.bz2 nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.zip | |
fixes #334 Separate context for state machines from sockets
This provides context support for REQ and REP sockets.
More discussion around this is in the issue itself.
Optionally we would like to extend this to the surveyor pattern.
Note that we specifically do not support pollable descriptors
for non-default contexts, and the results of using file descriptors
for polling (NNG_OPT_SENDFD and NNG_OPT_RECVFD) is undefined.
In the future, it might be nice to figure out how to factor in
optional use of a message queue for users who want more buffering,
but we think there is little need for this with cooked mode.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 2 | ||||
| -rw-r--r-- | src/core/defs.h | 3 | ||||
| -rw-r--r-- | src/core/idhash.h | 2 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/pollable.c | 101 | ||||
| -rw-r--r-- | src/core/pollable.h | 26 | ||||
| -rw-r--r-- | src/core/protocol.h | 45 | ||||
| -rw-r--r-- | src/core/socket.c | 354 | ||||
| -rw-r--r-- | src/core/socket.h | 42 |
9 files changed, 532 insertions, 44 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 9cf04217..3027b0c1 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -324,6 +324,7 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) } if (!aio->a_sleep) { + // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { case NNG_DURATION_ZERO: aio->a_expire = NNI_TIME_ZERO; @@ -338,7 +339,6 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) } } - // Convert the relative timeout to an absolute timeout. if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } diff --git a/src/core/defs.h b/src/core/defs.h index f64d3df7..1d656bb2 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -41,6 +41,7 @@ typedef struct nng_notify nni_notify; // These are our own names. typedef struct nni_socket nni_sock; +typedef struct nni_ctx nni_ctx; typedef struct nni_ep nni_ep; typedef struct nni_pipe nni_pipe; typedef struct nni_tran nni_tran; @@ -49,6 +50,8 @@ typedef struct nni_tran_ep_option nni_tran_ep_option; typedef struct nni_tran_pipe nni_tran_pipe; typedef struct nni_tran_pipe_option nni_tran_pipe_option; +typedef struct nni_proto_ctx_option nni_proto_ctx_option; +typedef struct nni_proto_ctx_ops nni_proto_ctx_ops; typedef struct nni_proto_sock_ops nni_proto_sock_ops; typedef struct nni_proto_pipe_ops nni_proto_pipe_ops; typedef struct nni_proto_sock_option nni_proto_sock_option; diff --git a/src/core/idhash.h b/src/core/idhash.h index c2cecf81..9dbdd45e 100644 --- a/src/core/idhash.h +++ b/src/core/idhash.h @@ -29,8 +29,6 @@ typedef struct nni_idhash_entry nni_idhash_entry; extern int nni_idhash_init(nni_idhash **); extern void nni_idhash_fini(nni_idhash *); extern void nni_idhash_set_limits(nni_idhash *, uint64_t, uint64_t, uint64_t); -extern int nni_idhash_create(nni_idhash **); -extern void nni_idhash_destroy(nni_idhash *); extern int nni_idhash_find(nni_idhash *, uint64_t, void **); extern int nni_idhash_remove(nni_idhash *, uint64_t); extern int nni_idhash_insert(nni_idhash *, uint64_t, void *); diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 5c750ec7..fdb2ce94 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -37,6 +37,7 @@ #include "core/msgqueue.h" #include "core/options.h" #include "core/panic.h" +#include "core/pollable.h" #include "core/protocol.h" #include "core/random.h" #include "core/reap.h" diff --git a/src/core/pollable.c b/src/core/pollable.c new file mode 100644 index 00000000..b5cecf37 --- /dev/null +++ b/src/core/pollable.c @@ -0,0 +1,101 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +struct nni_pollable { + int p_rfd; + int p_wfd; + nni_mtx p_lock; + bool p_raised; + bool p_open; +}; + +int +nni_pollable_alloc(nni_pollable **pp) +{ + nni_pollable *p; + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + p->p_open = false; + p->p_raised = false; + nni_mtx_init(&p->p_lock); + *pp = p; + return (0); +} + +void +nni_pollable_free(nni_pollable *p) +{ + if (p == NULL) { + return; + } + if (p->p_open) { + nni_plat_pipe_close(p->p_rfd, p->p_wfd); + } + nni_mtx_fini(&p->p_lock); + NNI_FREE_STRUCT(p); +} + +void +nni_pollable_raise(nni_pollable *p) +{ + if (p == NULL) { + return; + } + nni_mtx_lock(&p->p_lock); + p->p_raised = true; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_raise(p->p_wfd); + return; + } + nni_mtx_unlock(&p->p_lock); +} + +void +nni_pollable_clear(nni_pollable *p) +{ + if (p == NULL) { + return; + } + nni_mtx_lock(&p->p_lock); + p->p_raised = false; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_clear(p->p_rfd); + return; + } + nni_mtx_unlock(&p->p_lock); +} + +int +nni_pollable_getfd(nni_pollable *p, int *fdp) +{ + if (p == NULL) { + return (NNG_EINVAL); + } + nni_mtx_lock(&p->p_lock); + if (!p->p_open) { + int rv; + if ((rv = nni_plat_pipe_open(&p->p_wfd, &p->p_rfd)) != 0) { + nni_mtx_unlock(&p->p_lock); + return (rv); + } + p->p_open = true; + if (p->p_raised) { + nni_plat_pipe_raise(p->p_wfd); + } + } + nni_mtx_unlock(&p->p_lock); + *fdp = p->p_rfd; + return (0); +} diff --git a/src/core/pollable.h b/src/core/pollable.h new file mode 100644 index 00000000..50ec9bf6 --- /dev/null +++ b/src/core/pollable.h @@ -0,0 +1,26 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_POLLABLE_H +#define CORE_POLLABLE_H + +#include "core/defs.h" +#include "core/list.h" + +// For the sake of simplicity, we just maintain a single global timer thread. + +typedef struct nni_pollable nni_pollable; + +extern int nni_pollable_alloc(nni_pollable **); +extern void nni_pollable_free(nni_pollable *); +extern void nni_pollable_raise(nni_pollable *); +extern void nni_pollable_clear(nni_pollable *); +extern int nni_pollable_getfd(nni_pollable *, int *); + +#endif // CORE_POLLABLE_H diff --git a/src/core/protocol.h b/src/core/protocol.h index 9b241138..964aee1a 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -47,6 +47,39 @@ struct nni_proto_pipe_ops { void (*pipe_stop)(void *); }; +struct nni_proto_ctx_option { + const char *co_name; + int co_type; + int (*co_getopt)(void *, void *, size_t *, int); + int (*co_setopt)(void *, const void *, size_t, int); +}; + +struct nni_proto_ctx_ops { + // ctx_init creates a new context. The second argument is the + // protocol specific socket structure. + int (*ctx_init)(void **, void *); + + // ctx_fini destroys a context. + void (*ctx_fini)(void *); + + // ctx_recv is an asynchronous recv. + void (*ctx_recv)(void *, nni_aio *); + + // ctx_send is an asynchronous send. + void (*ctx_send)(void *, nni_aio *); + + // ctx_drain drains the context, signaling the aio when done. + // This should prevent any further receives from completing, + // and only sends that had already been submitted should be + // permitted to continue. It may be NULL for protocols where + // draining without an ability to receive makes no sense + // (e.g. REQ or SURVEY). + void (*ctx_drain)(void *, nni_aio *); + + // ctx_options array. + nni_proto_ctx_option *ctx_options; +}; + struct nni_proto_sock_option { const char *pso_name; int pso_type; @@ -87,6 +120,12 @@ struct nni_proto_sock_ops { // should return NULL, otherwise the message (possibly modified). nni_msg *(*sock_filter)(void *, nni_msg *); + // Socket draining is intended to permit protocols to "drain" + // before exiting. For protocols where draining makes no + // sense, this may be NULL. (Example: REQ and SURVEYOR should + // not drain, because they cannot receive a reply!) + void (*sock_drain)(void *, nni_aio *); + // Options. Must not be NULL. Final entry should have NULL name. nni_proto_sock_option *sock_options; }; @@ -103,6 +142,7 @@ struct nni_proto { uint32_t proto_flags; // Protocol flags const nni_proto_sock_ops *proto_sock_ops; // Per-socket opeations const nni_proto_pipe_ops *proto_pipe_ops; // Per-pipe operations. + const nni_proto_ctx_ops * proto_ctx_ops; // Context operations. // proto_init, if not NULL, provides a function that initializes // global values. The main purpose of this may be to initialize @@ -128,11 +168,14 @@ struct nni_proto { // These flags determine which operations make sense. We use them so that // we can reject attempts to create notification fds for operations that make // no sense. Also, we can detect raw mode, thereby providing handling for -// that at the socket layer (NNG_PROTO_FLAG_RAW). +// that at the socket layer (NNG_PROTO_FLAG_RAW). Finally, we provide the +// NNI_PROTO_FLAG_NOMSGQ flag for protocols that do not use the upper write +// or upper read queues. #define NNI_PROTO_FLAG_RCV 1 // Protocol can receive #define NNI_PROTO_FLAG_SND 2 // Protocol can send #define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv #define NNI_PROTO_FLAG_RAW 4 // Protocol is raw +#define NNI_PROTO_FLAG_NOMSGQ 8 // Protocol bypasses the upper queues // nni_proto_open is called by the protocol to create a socket instance // with its ops vector. The intent is that applications will only see diff --git a/src/core/socket.c b/src/core/socket.c index 62950b1c..67a2991c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -18,6 +18,19 @@ static nni_list nni_sock_list; static nni_idhash *nni_sock_hash; static nni_mtx nni_sock_lk; +static nni_idhash *nni_ctx_hash; + +struct nni_ctx { + nni_list_node c_node; + nni_sock * c_sock; + nni_proto_ctx_ops c_ops; + void * c_data; + bool c_closed; + unsigned c_refcnt; // protected by global lock + uint32_t c_id; + nng_duration c_sndtimeo; + nng_duration c_rcvtimeo; +}; typedef struct nni_socket_option { const char *so_name; @@ -53,6 +66,7 @@ struct nni_socket { nni_proto_pipe_ops s_pipe_ops; nni_proto_sock_ops s_sock_ops; + nni_proto_ctx_ops s_ctx_ops; // options nni_duration s_linger; // linger time @@ -66,15 +80,19 @@ struct nni_socket { nni_list s_eps; // active endpoints nni_list s_pipes; // active pipes + nni_list s_ctxs; // active contexts (protected by global nni_sock_lk) - int s_ep_pend; // EP dial/listen in progress - int s_closing; // Socket is closing - int s_closed; // Socket closed, protected by global lock + int s_ep_pend; // EP dial/listen in progress + int s_closing; // Socket is closing + int s_closed; // Socket closed, protected by global lock + bool s_ctxwait; // Waiting for contexts to close. nni_notifyfd s_send_fd; nni_notifyfd s_recv_fd; }; +static void nni_ctx_destroy(nni_ctx *); + static void nni_sock_can_send_cb(void *arg, int flags) { @@ -99,32 +117,6 @@ nni_sock_can_recv_cb(void *arg, int flags) } } -void -nni_sock_set_sendable(nni_sock *s, bool cansend) -{ - nni_notifyfd *fd = &s->s_send_fd; - if (fd->sn_init) { - if (cansend) { - nni_plat_pipe_raise(fd->sn_wfd); - } else { - nni_plat_pipe_clear(fd->sn_rfd); - } - } -} - -void -nni_sock_set_recvable(nni_sock *s, bool canrecv) -{ - nni_notifyfd *fd = &s->s_recv_fd; - if (fd->sn_init) { - if (canrecv) { - nni_plat_pipe_raise(fd->sn_wfd); - } else { - nni_plat_pipe_clear(fd->sn_rfd); - } - } -} - static int nni_sock_get_fd(nni_sock *s, int flag, int *fdp) { @@ -159,7 +151,15 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp) return (rv); } - nni_msgq_set_cb(mq, cb, fd); + // Only set the callback on the message queue if we are + // using it. The message queue automatically updates + // the pipe when the callback is first established. + // If we are not using the message queue, then we have + // to update the initial state explicitly ourselves. + + if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) { + nni_msgq_set_cb(mq, cb, fd); + } fd->sn_init = 1; } @@ -563,6 +563,10 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) s->s_sock_ops = *proto->proto_sock_ops; s->s_pipe_ops = *proto->proto_pipe_ops; + if (proto->proto_ctx_ops != NULL) { + s->s_ctx_ops = *proto->proto_ctx_ops; + } + NNI_ASSERT(s->s_sock_ops.sock_open != NULL); NNI_ASSERT(s->s_sock_ops.sock_close != NULL); @@ -571,6 +575,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_NODE_INIT(&s->s_node); NNI_LIST_INIT(&s->s_options, nni_sockopt, node); + NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); + nni_pipe_sock_list_init(&s->s_pipes); nni_ep_list_init(&s->s_eps); nni_mtx_init(&s->s_mx); @@ -613,19 +619,27 @@ nni_sock_sys_init(void) NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node); nni_mtx_init(&nni_sock_lk); - if ((rv = nni_idhash_init(&nni_sock_hash)) != 0) { + if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) || + ((rv = nni_idhash_init(&nni_ctx_hash)) != 0)) { nni_sock_sys_fini(); - } else { - nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); + return (rv); } - return (rv); + nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); + nni_idhash_set_limits(nni_ctx_hash, 1, 0x7fffffff, 1); + return (0); } void nni_sock_sys_fini(void) { - nni_idhash_fini(nni_sock_hash); - nni_sock_hash = NULL; + if (nni_sock_hash != NULL) { + nni_idhash_fini(nni_sock_hash); + nni_sock_hash = NULL; + } + if (nni_ctx_hash != NULL) { + nni_idhash_fini(nni_ctx_hash); + nni_ctx_hash = NULL; + } nni_mtx_fini(&nni_sock_lk); } @@ -653,10 +667,11 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) s->s_sock_ops.sock_open(s->s_data); *sockp = s; } + nni_mtx_unlock(&nni_sock_lk); + // Set the sockname. (void) snprintf( s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id); - nni_mtx_unlock(&nni_sock_lk); return (rv); } @@ -672,6 +687,8 @@ nni_sock_shutdown(nni_sock *sock) nni_ep * ep; nni_ep * nep; nni_time linger; + nni_ctx * ctx; + nni_ctx * nctx; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -697,16 +714,51 @@ nni_sock_shutdown(nni_sock *sock) } nni_mtx_unlock(&sock->s_mx); + // We now mark any owned contexts as closing. + // XXX: Add context draining support here! + nni_mtx_lock(&nni_sock_lk); + nctx = nni_list_first(&sock->s_ctxs); + while ((ctx = nctx) != NULL) { + nctx = nni_list_next(&sock->s_ctxs, ctx); + ctx->c_closed = true; + if (ctx->c_refcnt == 0) { + // No open operations. So close it. + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_list_remove(&sock->s_ctxs, ctx); + nni_ctx_destroy(ctx); + } + // If still has a reference count, then wait for last + // reference to close before nuking it. + } + nni_mtx_unlock(&nni_sock_lk); + + // XXX: Add protocol specific drain here. This should replace the + // msgq_drain feature below. Probably msgq_drain will need to + // be changed to take an AIO for completion. + // We drain the upper write queue. This is just like closing it, // except that the protocol gets a chance to get the messages and // push them down to the transport. This operation can *block* - // until the linger time has expired. - nni_msgq_drain(sock->s_uwq, linger); + // until the linger time has expired. We only do this for sendable + // sockets that are actually using the message queue of course. + if ((nni_sock_flags(sock) & + (NNI_PROTO_FLAG_NOMSGQ | NNI_PROTO_FLAG_SND)) == + NNI_PROTO_FLAG_SND) { + nni_msgq_drain(sock->s_uwq, linger); + } // Generally, unless the protocol is blocked trying to perform // writes (e.g. a slow reader on the other side), it should be // trying to shut things down. We wait to give it // a chance to do so gracefully. + + nni_mtx_lock(&nni_sock_lk); + while (!nni_list_empty(&sock->s_ctxs)) { + sock->s_ctxwait = true; + nni_cv_wait(&sock->s_close_cv); + } + nni_mtx_unlock(&nni_sock_lk); + nni_mtx_lock(&sock->s_mx); while (nni_list_first(&sock->s_pipes) != NULL) { if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) { @@ -790,7 +842,8 @@ nni_sock_close(nni_sock *s) // Wait for all other references to drop. Note that we // have a reference already (from our caller). - while (s->s_refcnt > 1) { + s->s_ctxwait = true; + while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_ctxs))) { nni_cv_wait(&s->s_close_cv); } nni_mtx_unlock(&nni_sock_lk); @@ -1147,3 +1200,224 @@ nni_sock_flags(nni_sock *sock) { return (sock->s_flags); } + +int +nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing) +{ + int rv; + nni_ctx *ctx; + + if ((rv = nni_init()) != 0) { + return (rv); + } + nni_mtx_lock(&nni_sock_lk); + if ((rv = nni_idhash_find(nni_ctx_hash, id, (void **) &ctx)) == 0) { + // We refuse a reference if either the socket is closed, + // or the context is closed. (If the socket is closed, + // and we are only getting the reference so we can close it, + // then we still allow. In the case the only valid operation + // will be to close the socket.) + if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) { + rv = NNG_ECLOSED; + } else { + ctx->c_refcnt++; + *ctxp = ctx; + } + } + nni_mtx_unlock(&nni_sock_lk); + + if (rv == NNG_ENOENT) { + rv = NNG_ECLOSED; + } + + return (rv); +} + +static void +nni_ctx_destroy(nni_ctx *ctx) +{ + if (ctx->c_data != NULL) { + ctx->c_ops.ctx_fini(ctx->c_data); + } + + // Let the socket go, our hold on it is done. + NNI_FREE_STRUCT(ctx); +} + +void +nni_ctx_rele(nni_ctx *ctx) +{ + nni_sock *sock = ctx->c_sock; + nni_mtx_lock(&nni_sock_lk); + ctx->c_refcnt--; + if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) { + // Either still have an active reference, or not actually + // closing yet. + nni_mtx_unlock(&nni_sock_lk); + return; + } + + // Remove us from the hash, so we can't be found any more. + // This allows our ID to be reused later, although the system + // tries to avoid ID reuse. + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_list_remove(&sock->s_ctxs, ctx); + if (sock->s_closed || sock->s_ctxwait) { + nni_cv_wake(&sock->s_close_cv); + } + nni_mtx_unlock(&nni_sock_lk); + + nni_ctx_destroy(ctx); +} + +int +nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) +{ + nni_ctx *ctx; + int rv; + uint64_t id; + + if (sock->s_ctx_ops.ctx_init == NULL) { + return (NNG_ENOTSUP); + } + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_lock(&nni_sock_lk); + if (sock->s_closed) { + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (NNG_ECLOSED); + } + if ((rv = nni_idhash_alloc(nni_ctx_hash, &id, ctx)) != 0) { + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (rv); + } + ctx->c_id = (uint32_t) id; + + if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) { + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (rv); + } + + ctx->c_closed = false; + ctx->c_refcnt = 1; // Caller implicitly gets a reference. + ctx->c_sock = sock; + ctx->c_ops = sock->s_ctx_ops; + ctx->c_rcvtimeo = sock->s_rcvtimeo; + ctx->c_sndtimeo = sock->s_sndtimeo; + + nni_list_append(&sock->s_ctxs, ctx); + nni_mtx_unlock(&nni_sock_lk); + + // Paranoia, fixing a possible race in close. Don't let us + // give back a context if the socket is being shutdown (it might + // not have reached the "closed" state yet.) + nni_mtx_lock(&sock->s_mx); + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + nni_ctx_rele(ctx); + return (NNG_ECLOSED); + } + nni_mtx_unlock(&sock->s_mx); + *ctxp = ctx; + + return (0); +} + +void +nni_ctx_close(nni_ctx *ctx) +{ + nni_mtx_lock(&nni_sock_lk); + ctx->c_closed = true; + nni_mtx_unlock(&nni_sock_lk); + + nni_ctx_rele(ctx); +} + +uint32_t +nni_ctx_id(nni_ctx *ctx) +{ + return (ctx->c_id); +} + +void +nni_ctx_send(nni_ctx *ctx, nni_aio *aio) +{ + nni_aio_normalize_timeout(aio, ctx->c_sndtimeo); + ctx->c_ops.ctx_send(ctx->c_data, aio); +} + +void +nni_ctx_recv(nni_ctx *ctx, nni_aio *aio) +{ + nni_aio_normalize_timeout(aio, ctx->c_rcvtimeo); + ctx->c_ops.ctx_recv(ctx->c_data, aio); +} + +int +nni_ctx_getopt(nni_ctx *ctx, const char *opt, void *v, size_t *szp, int typ) +{ + nni_sock * sock = ctx->c_sock; + nni_proto_ctx_option *co; + int rv; + + nni_mtx_lock(&sock->s_mx); + if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) { + rv = nni_copyout_ms(ctx->c_rcvtimeo, v, szp, typ); + } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) { + rv = nni_copyout_ms(ctx->c_sndtimeo, v, szp, typ); + } else { + rv = NNG_ENOTSUP; + for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) { + if (strcmp(opt, co->co_name) != 0) { + continue; + } + if (co->co_getopt == NULL) { + rv = NNG_EWRITEONLY; + break; + } + rv = co->co_getopt(ctx->c_data, v, szp, typ); + break; + } + } + + nni_mtx_unlock(&sock->s_mx); + return (rv); +} + +int +nni_ctx_setopt( + nni_ctx *ctx, const char *opt, const void *v, size_t sz, int typ) +{ + nni_sock * sock = ctx->c_sock; + nni_proto_ctx_option *co; + int rv; + + nni_mtx_lock(&sock->s_mx); + if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) { + rv = nni_copyin_ms(&ctx->c_rcvtimeo, v, sz, typ); + } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) { + rv = nni_copyin_ms(&ctx->c_sndtimeo, v, sz, typ); + } else { + rv = NNG_ENOTSUP; + for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) { + if (strcmp(opt, co->co_name) != 0) { + continue; + } + if (co->co_setopt == NULL) { + rv = NNG_EREADONLY; + break; + } + rv = co->co_setopt(ctx->c_data, v, sz, typ); + break; + } + } + + nni_mtx_unlock(&sock->s_mx); + return (rv); +} diff --git a/src/core/socket.h b/src/core/socket.h index 3af6bb9e..87bf1374 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -63,4 +63,46 @@ extern void nni_sock_reconntimes(nni_sock *, nni_duration *, nni_duration *); // nni_sock_flags returns the socket flags, used to indicate whether read // and or write are appropriate for the protocol. extern uint32_t nni_sock_flags(nni_sock *); + +// nni_ctx_open is used to open/create a new context structure. +// Contexts are not supported by most protocols, but for those that do, +// this can offer some improvements for massive concurrency/scalability. +// Returns NNG_ENOTSUP for protocols that lack context support. This adds +// another reference (hold) on the socket on success, and the newly +// created context is also held by the caller. Not supported by raw mode +// sockets (will also return NNG_ENOTSUP). +extern int nni_ctx_open(nni_ctx **, nni_sock *); + +// nni_ctx_find finds a context given its id. The last argument should +// be true if the context is acquired merely to close it, false otherwise. +// (If the socket for the context is being closed, then this will return +// NNG_ECLOSED unless the final argument is true.) +extern int nni_ctx_find(nni_ctx **, uint32_t, bool); + +// nni_ctx_rele is called to release a hold on the context. These holds +// are acquired by either nni_ctx_open or nni_ctx_find. If the context +// is being closed (nni_ctx_close was called), and this is the last reference, +// then the underlying context is freed, and the implicit socket hold +// by the context is also released. +extern void nni_ctx_rele(nni_ctx *); + +// nni_ctx_close is used to close the context. It also implictly releases +// the context. +extern void nni_ctx_close(nni_ctx *); + +// nni_ctx_id returns the context ID, which can be used with nni_ctx_find. +extern uint32_t nni_ctx_id(nni_ctx *); + +// nni_ctx_recv is an asychronous receive. +extern void nni_ctx_recv(nni_ctx *, nni_aio *); + +// nni_ctx_send is an asychronous receive. +extern void nni_ctx_send(nni_ctx *, nni_aio *); + +// nni_ctx_getopt is used to get a context option. +extern int nni_ctx_getopt(nni_ctx *, const char *, void *, size_t *, int); + +// nni_ctx_setopt is used to set a context option. +extern int nni_ctx_setopt(nni_ctx *, const char *, const void *, size_t, int); + #endif // CORE_SOCKET_H |
