diff options
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/aio.c | 2 | ||||
| -rw-r--r-- | src/core/defs.h | 3 | ||||
| -rw-r--r-- | src/core/idhash.h | 2 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/pollable.c | 101 | ||||
| -rw-r--r-- | src/core/pollable.h | 26 | ||||
| -rw-r--r-- | src/core/protocol.h | 45 | ||||
| -rw-r--r-- | src/core/socket.c | 354 | ||||
| -rw-r--r-- | src/core/socket.h | 42 |
9 files changed, 532 insertions, 44 deletions
diff --git a/src/core/aio.c b/src/core/aio.c index 9cf04217..3027b0c1 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -324,6 +324,7 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) } if (!aio->a_sleep) { + // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { case NNG_DURATION_ZERO: aio->a_expire = NNI_TIME_ZERO; @@ -338,7 +339,6 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) } } - // Convert the relative timeout to an absolute timeout. if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } diff --git a/src/core/defs.h b/src/core/defs.h index f64d3df7..1d656bb2 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -41,6 +41,7 @@ typedef struct nng_notify nni_notify; // These are our own names. typedef struct nni_socket nni_sock; +typedef struct nni_ctx nni_ctx; typedef struct nni_ep nni_ep; typedef struct nni_pipe nni_pipe; typedef struct nni_tran nni_tran; @@ -49,6 +50,8 @@ typedef struct nni_tran_ep_option nni_tran_ep_option; typedef struct nni_tran_pipe nni_tran_pipe; typedef struct nni_tran_pipe_option nni_tran_pipe_option; +typedef struct nni_proto_ctx_option nni_proto_ctx_option; +typedef struct nni_proto_ctx_ops nni_proto_ctx_ops; typedef struct nni_proto_sock_ops nni_proto_sock_ops; typedef struct nni_proto_pipe_ops nni_proto_pipe_ops; typedef struct nni_proto_sock_option nni_proto_sock_option; diff --git a/src/core/idhash.h b/src/core/idhash.h index c2cecf81..9dbdd45e 100644 --- a/src/core/idhash.h +++ b/src/core/idhash.h @@ -29,8 +29,6 @@ typedef struct nni_idhash_entry nni_idhash_entry; extern int nni_idhash_init(nni_idhash **); extern void nni_idhash_fini(nni_idhash *); extern void nni_idhash_set_limits(nni_idhash *, uint64_t, uint64_t, uint64_t); -extern int nni_idhash_create(nni_idhash **); -extern void nni_idhash_destroy(nni_idhash *); extern int nni_idhash_find(nni_idhash *, uint64_t, void **); extern int nni_idhash_remove(nni_idhash *, uint64_t); extern int nni_idhash_insert(nni_idhash *, uint64_t, void *); diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 5c750ec7..fdb2ce94 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -37,6 +37,7 @@ #include "core/msgqueue.h" #include "core/options.h" #include "core/panic.h" +#include "core/pollable.h" #include "core/protocol.h" #include "core/random.h" #include "core/reap.h" diff --git a/src/core/pollable.c b/src/core/pollable.c new file mode 100644 index 00000000..b5cecf37 --- /dev/null +++ b/src/core/pollable.c @@ -0,0 +1,101 @@ +// +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +struct nni_pollable { + int p_rfd; + int p_wfd; + nni_mtx p_lock; + bool p_raised; + bool p_open; +}; + +int +nni_pollable_alloc(nni_pollable **pp) +{ + nni_pollable *p; + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + p->p_open = false; + p->p_raised = false; + nni_mtx_init(&p->p_lock); + *pp = p; + return (0); +} + +void +nni_pollable_free(nni_pollable *p) +{ + if (p == NULL) { + return; + } + if (p->p_open) { + nni_plat_pipe_close(p->p_rfd, p->p_wfd); + } + nni_mtx_fini(&p->p_lock); + NNI_FREE_STRUCT(p); +} + +void +nni_pollable_raise(nni_pollable *p) +{ + if (p == NULL) { + return; + } + nni_mtx_lock(&p->p_lock); + p->p_raised = true; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_raise(p->p_wfd); + return; + } + nni_mtx_unlock(&p->p_lock); +} + +void +nni_pollable_clear(nni_pollable *p) +{ + if (p == NULL) { + return; + } + nni_mtx_lock(&p->p_lock); + p->p_raised = false; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_clear(p->p_rfd); + return; + } + nni_mtx_unlock(&p->p_lock); +} + +int +nni_pollable_getfd(nni_pollable *p, int *fdp) +{ + if (p == NULL) { + return (NNG_EINVAL); + } + nni_mtx_lock(&p->p_lock); + if (!p->p_open) { + int rv; + if ((rv = nni_plat_pipe_open(&p->p_wfd, &p->p_rfd)) != 0) { + nni_mtx_unlock(&p->p_lock); + return (rv); + } + p->p_open = true; + if (p->p_raised) { + nni_plat_pipe_raise(p->p_wfd); + } + } + nni_mtx_unlock(&p->p_lock); + *fdp = p->p_rfd; + return (0); +} diff --git a/src/core/pollable.h b/src/core/pollable.h new file mode 100644 index 00000000..50ec9bf6 --- /dev/null +++ b/src/core/pollable.h @@ -0,0 +1,26 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_POLLABLE_H +#define CORE_POLLABLE_H + +#include "core/defs.h" +#include "core/list.h" + +// For the sake of simplicity, we just maintain a single global timer thread. + +typedef struct nni_pollable nni_pollable; + +extern int nni_pollable_alloc(nni_pollable **); +extern void nni_pollable_free(nni_pollable *); +extern void nni_pollable_raise(nni_pollable *); +extern void nni_pollable_clear(nni_pollable *); +extern int nni_pollable_getfd(nni_pollable *, int *); + +#endif // CORE_POLLABLE_H diff --git a/src/core/protocol.h b/src/core/protocol.h index 9b241138..964aee1a 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -47,6 +47,39 @@ struct nni_proto_pipe_ops { void (*pipe_stop)(void *); }; +struct nni_proto_ctx_option { + const char *co_name; + int co_type; + int (*co_getopt)(void *, void *, size_t *, int); + int (*co_setopt)(void *, const void *, size_t, int); +}; + +struct nni_proto_ctx_ops { + // ctx_init creates a new context. The second argument is the + // protocol specific socket structure. + int (*ctx_init)(void **, void *); + + // ctx_fini destroys a context. + void (*ctx_fini)(void *); + + // ctx_recv is an asynchronous recv. + void (*ctx_recv)(void *, nni_aio *); + + // ctx_send is an asynchronous send. + void (*ctx_send)(void *, nni_aio *); + + // ctx_drain drains the context, signaling the aio when done. + // This should prevent any further receives from completing, + // and only sends that had already been submitted should be + // permitted to continue. It may be NULL for protocols where + // draining without an ability to receive makes no sense + // (e.g. REQ or SURVEY). + void (*ctx_drain)(void *, nni_aio *); + + // ctx_options array. + nni_proto_ctx_option *ctx_options; +}; + struct nni_proto_sock_option { const char *pso_name; int pso_type; @@ -87,6 +120,12 @@ struct nni_proto_sock_ops { // should return NULL, otherwise the message (possibly modified). nni_msg *(*sock_filter)(void *, nni_msg *); + // Socket draining is intended to permit protocols to "drain" + // before exiting. For protocols where draining makes no + // sense, this may be NULL. (Example: REQ and SURVEYOR should + // not drain, because they cannot receive a reply!) + void (*sock_drain)(void *, nni_aio *); + // Options. Must not be NULL. Final entry should have NULL name. nni_proto_sock_option *sock_options; }; @@ -103,6 +142,7 @@ struct nni_proto { uint32_t proto_flags; // Protocol flags const nni_proto_sock_ops *proto_sock_ops; // Per-socket opeations const nni_proto_pipe_ops *proto_pipe_ops; // Per-pipe operations. + const nni_proto_ctx_ops * proto_ctx_ops; // Context operations. // proto_init, if not NULL, provides a function that initializes // global values. The main purpose of this may be to initialize @@ -128,11 +168,14 @@ struct nni_proto { // These flags determine which operations make sense. We use them so that // we can reject attempts to create notification fds for operations that make // no sense. Also, we can detect raw mode, thereby providing handling for -// that at the socket layer (NNG_PROTO_FLAG_RAW). +// that at the socket layer (NNG_PROTO_FLAG_RAW). Finally, we provide the +// NNI_PROTO_FLAG_NOMSGQ flag for protocols that do not use the upper write +// or upper read queues. #define NNI_PROTO_FLAG_RCV 1 // Protocol can receive #define NNI_PROTO_FLAG_SND 2 // Protocol can send #define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv #define NNI_PROTO_FLAG_RAW 4 // Protocol is raw +#define NNI_PROTO_FLAG_NOMSGQ 8 // Protocol bypasses the upper queues // nni_proto_open is called by the protocol to create a socket instance // with its ops vector. The intent is that applications will only see diff --git a/src/core/socket.c b/src/core/socket.c index 62950b1c..67a2991c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -18,6 +18,19 @@ static nni_list nni_sock_list; static nni_idhash *nni_sock_hash; static nni_mtx nni_sock_lk; +static nni_idhash *nni_ctx_hash; + +struct nni_ctx { + nni_list_node c_node; + nni_sock * c_sock; + nni_proto_ctx_ops c_ops; + void * c_data; + bool c_closed; + unsigned c_refcnt; // protected by global lock + uint32_t c_id; + nng_duration c_sndtimeo; + nng_duration c_rcvtimeo; +}; typedef struct nni_socket_option { const char *so_name; @@ -53,6 +66,7 @@ struct nni_socket { nni_proto_pipe_ops s_pipe_ops; nni_proto_sock_ops s_sock_ops; + nni_proto_ctx_ops s_ctx_ops; // options nni_duration s_linger; // linger time @@ -66,15 +80,19 @@ struct nni_socket { nni_list s_eps; // active endpoints nni_list s_pipes; // active pipes + nni_list s_ctxs; // active contexts (protected by global nni_sock_lk) - int s_ep_pend; // EP dial/listen in progress - int s_closing; // Socket is closing - int s_closed; // Socket closed, protected by global lock + int s_ep_pend; // EP dial/listen in progress + int s_closing; // Socket is closing + int s_closed; // Socket closed, protected by global lock + bool s_ctxwait; // Waiting for contexts to close. nni_notifyfd s_send_fd; nni_notifyfd s_recv_fd; }; +static void nni_ctx_destroy(nni_ctx *); + static void nni_sock_can_send_cb(void *arg, int flags) { @@ -99,32 +117,6 @@ nni_sock_can_recv_cb(void *arg, int flags) } } -void -nni_sock_set_sendable(nni_sock *s, bool cansend) -{ - nni_notifyfd *fd = &s->s_send_fd; - if (fd->sn_init) { - if (cansend) { - nni_plat_pipe_raise(fd->sn_wfd); - } else { - nni_plat_pipe_clear(fd->sn_rfd); - } - } -} - -void -nni_sock_set_recvable(nni_sock *s, bool canrecv) -{ - nni_notifyfd *fd = &s->s_recv_fd; - if (fd->sn_init) { - if (canrecv) { - nni_plat_pipe_raise(fd->sn_wfd); - } else { - nni_plat_pipe_clear(fd->sn_rfd); - } - } -} - static int nni_sock_get_fd(nni_sock *s, int flag, int *fdp) { @@ -159,7 +151,15 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp) return (rv); } - nni_msgq_set_cb(mq, cb, fd); + // Only set the callback on the message queue if we are + // using it. The message queue automatically updates + // the pipe when the callback is first established. + // If we are not using the message queue, then we have + // to update the initial state explicitly ourselves. + + if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) { + nni_msgq_set_cb(mq, cb, fd); + } fd->sn_init = 1; } @@ -563,6 +563,10 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) s->s_sock_ops = *proto->proto_sock_ops; s->s_pipe_ops = *proto->proto_pipe_ops; + if (proto->proto_ctx_ops != NULL) { + s->s_ctx_ops = *proto->proto_ctx_ops; + } + NNI_ASSERT(s->s_sock_ops.sock_open != NULL); NNI_ASSERT(s->s_sock_ops.sock_close != NULL); @@ -571,6 +575,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_NODE_INIT(&s->s_node); NNI_LIST_INIT(&s->s_options, nni_sockopt, node); + NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); + nni_pipe_sock_list_init(&s->s_pipes); nni_ep_list_init(&s->s_eps); nni_mtx_init(&s->s_mx); @@ -613,19 +619,27 @@ nni_sock_sys_init(void) NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node); nni_mtx_init(&nni_sock_lk); - if ((rv = nni_idhash_init(&nni_sock_hash)) != 0) { + if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) || + ((rv = nni_idhash_init(&nni_ctx_hash)) != 0)) { nni_sock_sys_fini(); - } else { - nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); + return (rv); } - return (rv); + nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); + nni_idhash_set_limits(nni_ctx_hash, 1, 0x7fffffff, 1); + return (0); } void nni_sock_sys_fini(void) { - nni_idhash_fini(nni_sock_hash); - nni_sock_hash = NULL; + if (nni_sock_hash != NULL) { + nni_idhash_fini(nni_sock_hash); + nni_sock_hash = NULL; + } + if (nni_ctx_hash != NULL) { + nni_idhash_fini(nni_ctx_hash); + nni_ctx_hash = NULL; + } nni_mtx_fini(&nni_sock_lk); } @@ -653,10 +667,11 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) s->s_sock_ops.sock_open(s->s_data); *sockp = s; } + nni_mtx_unlock(&nni_sock_lk); + // Set the sockname. (void) snprintf( s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id); - nni_mtx_unlock(&nni_sock_lk); return (rv); } @@ -672,6 +687,8 @@ nni_sock_shutdown(nni_sock *sock) nni_ep * ep; nni_ep * nep; nni_time linger; + nni_ctx * ctx; + nni_ctx * nctx; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -697,16 +714,51 @@ nni_sock_shutdown(nni_sock *sock) } nni_mtx_unlock(&sock->s_mx); + // We now mark any owned contexts as closing. + // XXX: Add context draining support here! + nni_mtx_lock(&nni_sock_lk); + nctx = nni_list_first(&sock->s_ctxs); + while ((ctx = nctx) != NULL) { + nctx = nni_list_next(&sock->s_ctxs, ctx); + ctx->c_closed = true; + if (ctx->c_refcnt == 0) { + // No open operations. So close it. + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_list_remove(&sock->s_ctxs, ctx); + nni_ctx_destroy(ctx); + } + // If still has a reference count, then wait for last + // reference to close before nuking it. + } + nni_mtx_unlock(&nni_sock_lk); + + // XXX: Add protocol specific drain here. This should replace the + // msgq_drain feature below. Probably msgq_drain will need to + // be changed to take an AIO for completion. + // We drain the upper write queue. This is just like closing it, // except that the protocol gets a chance to get the messages and // push them down to the transport. This operation can *block* - // until the linger time has expired. - nni_msgq_drain(sock->s_uwq, linger); + // until the linger time has expired. We only do this for sendable + // sockets that are actually using the message queue of course. + if ((nni_sock_flags(sock) & + (NNI_PROTO_FLAG_NOMSGQ | NNI_PROTO_FLAG_SND)) == + NNI_PROTO_FLAG_SND) { + nni_msgq_drain(sock->s_uwq, linger); + } // Generally, unless the protocol is blocked trying to perform // writes (e.g. a slow reader on the other side), it should be // trying to shut things down. We wait to give it // a chance to do so gracefully. + + nni_mtx_lock(&nni_sock_lk); + while (!nni_list_empty(&sock->s_ctxs)) { + sock->s_ctxwait = true; + nni_cv_wait(&sock->s_close_cv); + } + nni_mtx_unlock(&nni_sock_lk); + nni_mtx_lock(&sock->s_mx); while (nni_list_first(&sock->s_pipes) != NULL) { if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) { @@ -790,7 +842,8 @@ nni_sock_close(nni_sock *s) // Wait for all other references to drop. Note that we // have a reference already (from our caller). - while (s->s_refcnt > 1) { + s->s_ctxwait = true; + while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_ctxs))) { nni_cv_wait(&s->s_close_cv); } nni_mtx_unlock(&nni_sock_lk); @@ -1147,3 +1200,224 @@ nni_sock_flags(nni_sock *sock) { return (sock->s_flags); } + +int +nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing) +{ + int rv; + nni_ctx *ctx; + + if ((rv = nni_init()) != 0) { + return (rv); + } + nni_mtx_lock(&nni_sock_lk); + if ((rv = nni_idhash_find(nni_ctx_hash, id, (void **) &ctx)) == 0) { + // We refuse a reference if either the socket is closed, + // or the context is closed. (If the socket is closed, + // and we are only getting the reference so we can close it, + // then we still allow. In the case the only valid operation + // will be to close the socket.) + if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) { + rv = NNG_ECLOSED; + } else { + ctx->c_refcnt++; + *ctxp = ctx; + } + } + nni_mtx_unlock(&nni_sock_lk); + + if (rv == NNG_ENOENT) { + rv = NNG_ECLOSED; + } + + return (rv); +} + +static void +nni_ctx_destroy(nni_ctx *ctx) +{ + if (ctx->c_data != NULL) { + ctx->c_ops.ctx_fini(ctx->c_data); + } + + // Let the socket go, our hold on it is done. + NNI_FREE_STRUCT(ctx); +} + +void +nni_ctx_rele(nni_ctx *ctx) +{ + nni_sock *sock = ctx->c_sock; + nni_mtx_lock(&nni_sock_lk); + ctx->c_refcnt--; + if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) { + // Either still have an active reference, or not actually + // closing yet. + nni_mtx_unlock(&nni_sock_lk); + return; + } + + // Remove us from the hash, so we can't be found any more. + // This allows our ID to be reused later, although the system + // tries to avoid ID reuse. + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_list_remove(&sock->s_ctxs, ctx); + if (sock->s_closed || sock->s_ctxwait) { + nni_cv_wake(&sock->s_close_cv); + } + nni_mtx_unlock(&nni_sock_lk); + + nni_ctx_destroy(ctx); +} + +int +nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) +{ + nni_ctx *ctx; + int rv; + uint64_t id; + + if (sock->s_ctx_ops.ctx_init == NULL) { + return (NNG_ENOTSUP); + } + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_lock(&nni_sock_lk); + if (sock->s_closed) { + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (NNG_ECLOSED); + } + if ((rv = nni_idhash_alloc(nni_ctx_hash, &id, ctx)) != 0) { + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (rv); + } + ctx->c_id = (uint32_t) id; + + if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) { + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (rv); + } + + ctx->c_closed = false; + ctx->c_refcnt = 1; // Caller implicitly gets a reference. + ctx->c_sock = sock; + ctx->c_ops = sock->s_ctx_ops; + ctx->c_rcvtimeo = sock->s_rcvtimeo; + ctx->c_sndtimeo = sock->s_sndtimeo; + + nni_list_append(&sock->s_ctxs, ctx); + nni_mtx_unlock(&nni_sock_lk); + + // Paranoia, fixing a possible race in close. Don't let us + // give back a context if the socket is being shutdown (it might + // not have reached the "closed" state yet.) + nni_mtx_lock(&sock->s_mx); + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + nni_ctx_rele(ctx); + return (NNG_ECLOSED); + } + nni_mtx_unlock(&sock->s_mx); + *ctxp = ctx; + + return (0); +} + +void +nni_ctx_close(nni_ctx *ctx) +{ + nni_mtx_lock(&nni_sock_lk); + ctx->c_closed = true; + nni_mtx_unlock(&nni_sock_lk); + + nni_ctx_rele(ctx); +} + +uint32_t +nni_ctx_id(nni_ctx *ctx) +{ + return (ctx->c_id); +} + +void +nni_ctx_send(nni_ctx *ctx, nni_aio *aio) +{ + nni_aio_normalize_timeout(aio, ctx->c_sndtimeo); + ctx->c_ops.ctx_send(ctx->c_data, aio); +} + +void +nni_ctx_recv(nni_ctx *ctx, nni_aio *aio) +{ + nni_aio_normalize_timeout(aio, ctx->c_rcvtimeo); + ctx->c_ops.ctx_recv(ctx->c_data, aio); +} + +int +nni_ctx_getopt(nni_ctx *ctx, const char *opt, void *v, size_t *szp, int typ) +{ + nni_sock * sock = ctx->c_sock; + nni_proto_ctx_option *co; + int rv; + + nni_mtx_lock(&sock->s_mx); + if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) { + rv = nni_copyout_ms(ctx->c_rcvtimeo, v, szp, typ); + } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) { + rv = nni_copyout_ms(ctx->c_sndtimeo, v, szp, typ); + } else { + rv = NNG_ENOTSUP; + for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) { + if (strcmp(opt, co->co_name) != 0) { + continue; + } + if (co->co_getopt == NULL) { + rv = NNG_EWRITEONLY; + break; + } + rv = co->co_getopt(ctx->c_data, v, szp, typ); + break; + } + } + + nni_mtx_unlock(&sock->s_mx); + return (rv); +} + +int +nni_ctx_setopt( + nni_ctx *ctx, const char *opt, const void *v, size_t sz, int typ) +{ + nni_sock * sock = ctx->c_sock; + nni_proto_ctx_option *co; + int rv; + + nni_mtx_lock(&sock->s_mx); + if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) { + rv = nni_copyin_ms(&ctx->c_rcvtimeo, v, sz, typ); + } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) { + rv = nni_copyin_ms(&ctx->c_sndtimeo, v, sz, typ); + } else { + rv = NNG_ENOTSUP; + for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) { + if (strcmp(opt, co->co_name) != 0) { + continue; + } + if (co->co_setopt == NULL) { + rv = NNG_EREADONLY; + break; + } + rv = co->co_setopt(ctx->c_data, v, sz, typ); + break; + } + } + + nni_mtx_unlock(&sock->s_mx); + return (rv); +} diff --git a/src/core/socket.h b/src/core/socket.h index 3af6bb9e..87bf1374 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -63,4 +63,46 @@ extern void nni_sock_reconntimes(nni_sock *, nni_duration *, nni_duration *); // nni_sock_flags returns the socket flags, used to indicate whether read // and or write are appropriate for the protocol. extern uint32_t nni_sock_flags(nni_sock *); + +// nni_ctx_open is used to open/create a new context structure. +// Contexts are not supported by most protocols, but for those that do, +// this can offer some improvements for massive concurrency/scalability. +// Returns NNG_ENOTSUP for protocols that lack context support. This adds +// another reference (hold) on the socket on success, and the newly +// created context is also held by the caller. Not supported by raw mode +// sockets (will also return NNG_ENOTSUP). +extern int nni_ctx_open(nni_ctx **, nni_sock *); + +// nni_ctx_find finds a context given its id. The last argument should +// be true if the context is acquired merely to close it, false otherwise. +// (If the socket for the context is being closed, then this will return +// NNG_ECLOSED unless the final argument is true.) +extern int nni_ctx_find(nni_ctx **, uint32_t, bool); + +// nni_ctx_rele is called to release a hold on the context. These holds +// are acquired by either nni_ctx_open or nni_ctx_find. If the context +// is being closed (nni_ctx_close was called), and this is the last reference, +// then the underlying context is freed, and the implicit socket hold +// by the context is also released. +extern void nni_ctx_rele(nni_ctx *); + +// nni_ctx_close is used to close the context. It also implictly releases +// the context. +extern void nni_ctx_close(nni_ctx *); + +// nni_ctx_id returns the context ID, which can be used with nni_ctx_find. +extern uint32_t nni_ctx_id(nni_ctx *); + +// nni_ctx_recv is an asychronous receive. +extern void nni_ctx_recv(nni_ctx *, nni_aio *); + +// nni_ctx_send is an asychronous receive. +extern void nni_ctx_send(nni_ctx *, nni_aio *); + +// nni_ctx_getopt is used to get a context option. +extern int nni_ctx_getopt(nni_ctx *, const char *, void *, size_t *, int); + +// nni_ctx_setopt is used to set a context option. +extern int nni_ctx_setopt(nni_ctx *, const char *, const void *, size_t, int); + #endif // CORE_SOCKET_H |
