aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c2
-rw-r--r--src/core/defs.h3
-rw-r--r--src/core/idhash.h2
-rw-r--r--src/core/nng_impl.h1
-rw-r--r--src/core/pollable.c101
-rw-r--r--src/core/pollable.h26
-rw-r--r--src/core/protocol.h45
-rw-r--r--src/core/socket.c354
-rw-r--r--src/core/socket.h42
9 files changed, 532 insertions, 44 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index 9cf04217..3027b0c1 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -324,6 +324,7 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
}
if (!aio->a_sleep) {
+ // Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
aio->a_expire = NNI_TIME_ZERO;
@@ -338,7 +339,6 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
}
}
- // Convert the relative timeout to an absolute timeout.
if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
diff --git a/src/core/defs.h b/src/core/defs.h
index f64d3df7..1d656bb2 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -41,6 +41,7 @@ typedef struct nng_notify nni_notify;
// These are our own names.
typedef struct nni_socket nni_sock;
+typedef struct nni_ctx nni_ctx;
typedef struct nni_ep nni_ep;
typedef struct nni_pipe nni_pipe;
typedef struct nni_tran nni_tran;
@@ -49,6 +50,8 @@ typedef struct nni_tran_ep_option nni_tran_ep_option;
typedef struct nni_tran_pipe nni_tran_pipe;
typedef struct nni_tran_pipe_option nni_tran_pipe_option;
+typedef struct nni_proto_ctx_option nni_proto_ctx_option;
+typedef struct nni_proto_ctx_ops nni_proto_ctx_ops;
typedef struct nni_proto_sock_ops nni_proto_sock_ops;
typedef struct nni_proto_pipe_ops nni_proto_pipe_ops;
typedef struct nni_proto_sock_option nni_proto_sock_option;
diff --git a/src/core/idhash.h b/src/core/idhash.h
index c2cecf81..9dbdd45e 100644
--- a/src/core/idhash.h
+++ b/src/core/idhash.h
@@ -29,8 +29,6 @@ typedef struct nni_idhash_entry nni_idhash_entry;
extern int nni_idhash_init(nni_idhash **);
extern void nni_idhash_fini(nni_idhash *);
extern void nni_idhash_set_limits(nni_idhash *, uint64_t, uint64_t, uint64_t);
-extern int nni_idhash_create(nni_idhash **);
-extern void nni_idhash_destroy(nni_idhash *);
extern int nni_idhash_find(nni_idhash *, uint64_t, void **);
extern int nni_idhash_remove(nni_idhash *, uint64_t);
extern int nni_idhash_insert(nni_idhash *, uint64_t, void *);
diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h
index 5c750ec7..fdb2ce94 100644
--- a/src/core/nng_impl.h
+++ b/src/core/nng_impl.h
@@ -37,6 +37,7 @@
#include "core/msgqueue.h"
#include "core/options.h"
#include "core/panic.h"
+#include "core/pollable.h"
#include "core/protocol.h"
#include "core/random.h"
#include "core/reap.h"
diff --git a/src/core/pollable.c b/src/core/pollable.c
new file mode 100644
index 00000000..b5cecf37
--- /dev/null
+++ b/src/core/pollable.c
@@ -0,0 +1,101 @@
+//
+// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2018 Capitar IT Group BV <info@capitar.com>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#include "core/nng_impl.h"
+
+struct nni_pollable {
+ int p_rfd;
+ int p_wfd;
+ nni_mtx p_lock;
+ bool p_raised;
+ bool p_open;
+};
+
+int
+nni_pollable_alloc(nni_pollable **pp)
+{
+ nni_pollable *p;
+ if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ p->p_open = false;
+ p->p_raised = false;
+ nni_mtx_init(&p->p_lock);
+ *pp = p;
+ return (0);
+}
+
+void
+nni_pollable_free(nni_pollable *p)
+{
+ if (p == NULL) {
+ return;
+ }
+ if (p->p_open) {
+ nni_plat_pipe_close(p->p_rfd, p->p_wfd);
+ }
+ nni_mtx_fini(&p->p_lock);
+ NNI_FREE_STRUCT(p);
+}
+
+void
+nni_pollable_raise(nni_pollable *p)
+{
+ if (p == NULL) {
+ return;
+ }
+ nni_mtx_lock(&p->p_lock);
+ p->p_raised = true;
+ if (p->p_open) {
+ nni_mtx_unlock(&p->p_lock);
+ nni_plat_pipe_raise(p->p_wfd);
+ return;
+ }
+ nni_mtx_unlock(&p->p_lock);
+}
+
+void
+nni_pollable_clear(nni_pollable *p)
+{
+ if (p == NULL) {
+ return;
+ }
+ nni_mtx_lock(&p->p_lock);
+ p->p_raised = false;
+ if (p->p_open) {
+ nni_mtx_unlock(&p->p_lock);
+ nni_plat_pipe_clear(p->p_rfd);
+ return;
+ }
+ nni_mtx_unlock(&p->p_lock);
+}
+
+int
+nni_pollable_getfd(nni_pollable *p, int *fdp)
+{
+ if (p == NULL) {
+ return (NNG_EINVAL);
+ }
+ nni_mtx_lock(&p->p_lock);
+ if (!p->p_open) {
+ int rv;
+ if ((rv = nni_plat_pipe_open(&p->p_wfd, &p->p_rfd)) != 0) {
+ nni_mtx_unlock(&p->p_lock);
+ return (rv);
+ }
+ p->p_open = true;
+ if (p->p_raised) {
+ nni_plat_pipe_raise(p->p_wfd);
+ }
+ }
+ nni_mtx_unlock(&p->p_lock);
+ *fdp = p->p_rfd;
+ return (0);
+}
diff --git a/src/core/pollable.h b/src/core/pollable.h
new file mode 100644
index 00000000..50ec9bf6
--- /dev/null
+++ b/src/core/pollable.h
@@ -0,0 +1,26 @@
+//
+// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
+
+#ifndef CORE_POLLABLE_H
+#define CORE_POLLABLE_H
+
+#include "core/defs.h"
+#include "core/list.h"
+
+// For the sake of simplicity, we just maintain a single global timer thread.
+
+typedef struct nni_pollable nni_pollable;
+
+extern int nni_pollable_alloc(nni_pollable **);
+extern void nni_pollable_free(nni_pollable *);
+extern void nni_pollable_raise(nni_pollable *);
+extern void nni_pollable_clear(nni_pollable *);
+extern int nni_pollable_getfd(nni_pollable *, int *);
+
+#endif // CORE_POLLABLE_H
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 9b241138..964aee1a 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -47,6 +47,39 @@ struct nni_proto_pipe_ops {
void (*pipe_stop)(void *);
};
+struct nni_proto_ctx_option {
+ const char *co_name;
+ int co_type;
+ int (*co_getopt)(void *, void *, size_t *, int);
+ int (*co_setopt)(void *, const void *, size_t, int);
+};
+
+struct nni_proto_ctx_ops {
+ // ctx_init creates a new context. The second argument is the
+ // protocol specific socket structure.
+ int (*ctx_init)(void **, void *);
+
+ // ctx_fini destroys a context.
+ void (*ctx_fini)(void *);
+
+ // ctx_recv is an asynchronous recv.
+ void (*ctx_recv)(void *, nni_aio *);
+
+ // ctx_send is an asynchronous send.
+ void (*ctx_send)(void *, nni_aio *);
+
+ // ctx_drain drains the context, signaling the aio when done.
+ // This should prevent any further receives from completing,
+ // and only sends that had already been submitted should be
+ // permitted to continue. It may be NULL for protocols where
+ // draining without an ability to receive makes no sense
+ // (e.g. REQ or SURVEY).
+ void (*ctx_drain)(void *, nni_aio *);
+
+ // ctx_options array.
+ nni_proto_ctx_option *ctx_options;
+};
+
struct nni_proto_sock_option {
const char *pso_name;
int pso_type;
@@ -87,6 +120,12 @@ struct nni_proto_sock_ops {
// should return NULL, otherwise the message (possibly modified).
nni_msg *(*sock_filter)(void *, nni_msg *);
+ // Socket draining is intended to permit protocols to "drain"
+ // before exiting. For protocols where draining makes no
+ // sense, this may be NULL. (Example: REQ and SURVEYOR should
+ // not drain, because they cannot receive a reply!)
+ void (*sock_drain)(void *, nni_aio *);
+
// Options. Must not be NULL. Final entry should have NULL name.
nni_proto_sock_option *sock_options;
};
@@ -103,6 +142,7 @@ struct nni_proto {
uint32_t proto_flags; // Protocol flags
const nni_proto_sock_ops *proto_sock_ops; // Per-socket opeations
const nni_proto_pipe_ops *proto_pipe_ops; // Per-pipe operations.
+ const nni_proto_ctx_ops * proto_ctx_ops; // Context operations.
// proto_init, if not NULL, provides a function that initializes
// global values. The main purpose of this may be to initialize
@@ -128,11 +168,14 @@ struct nni_proto {
// These flags determine which operations make sense. We use them so that
// we can reject attempts to create notification fds for operations that make
// no sense. Also, we can detect raw mode, thereby providing handling for
-// that at the socket layer (NNG_PROTO_FLAG_RAW).
+// that at the socket layer (NNG_PROTO_FLAG_RAW). Finally, we provide the
+// NNI_PROTO_FLAG_NOMSGQ flag for protocols that do not use the upper write
+// or upper read queues.
#define NNI_PROTO_FLAG_RCV 1 // Protocol can receive
#define NNI_PROTO_FLAG_SND 2 // Protocol can send
#define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv
#define NNI_PROTO_FLAG_RAW 4 // Protocol is raw
+#define NNI_PROTO_FLAG_NOMSGQ 8 // Protocol bypasses the upper queues
// nni_proto_open is called by the protocol to create a socket instance
// with its ops vector. The intent is that applications will only see
diff --git a/src/core/socket.c b/src/core/socket.c
index 62950b1c..67a2991c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -18,6 +18,19 @@
static nni_list nni_sock_list;
static nni_idhash *nni_sock_hash;
static nni_mtx nni_sock_lk;
+static nni_idhash *nni_ctx_hash;
+
+struct nni_ctx {
+ nni_list_node c_node;
+ nni_sock * c_sock;
+ nni_proto_ctx_ops c_ops;
+ void * c_data;
+ bool c_closed;
+ unsigned c_refcnt; // protected by global lock
+ uint32_t c_id;
+ nng_duration c_sndtimeo;
+ nng_duration c_rcvtimeo;
+};
typedef struct nni_socket_option {
const char *so_name;
@@ -53,6 +66,7 @@ struct nni_socket {
nni_proto_pipe_ops s_pipe_ops;
nni_proto_sock_ops s_sock_ops;
+ nni_proto_ctx_ops s_ctx_ops;
// options
nni_duration s_linger; // linger time
@@ -66,15 +80,19 @@ struct nni_socket {
nni_list s_eps; // active endpoints
nni_list s_pipes; // active pipes
+ nni_list s_ctxs; // active contexts (protected by global nni_sock_lk)
- int s_ep_pend; // EP dial/listen in progress
- int s_closing; // Socket is closing
- int s_closed; // Socket closed, protected by global lock
+ int s_ep_pend; // EP dial/listen in progress
+ int s_closing; // Socket is closing
+ int s_closed; // Socket closed, protected by global lock
+ bool s_ctxwait; // Waiting for contexts to close.
nni_notifyfd s_send_fd;
nni_notifyfd s_recv_fd;
};
+static void nni_ctx_destroy(nni_ctx *);
+
static void
nni_sock_can_send_cb(void *arg, int flags)
{
@@ -99,32 +117,6 @@ nni_sock_can_recv_cb(void *arg, int flags)
}
}
-void
-nni_sock_set_sendable(nni_sock *s, bool cansend)
-{
- nni_notifyfd *fd = &s->s_send_fd;
- if (fd->sn_init) {
- if (cansend) {
- nni_plat_pipe_raise(fd->sn_wfd);
- } else {
- nni_plat_pipe_clear(fd->sn_rfd);
- }
- }
-}
-
-void
-nni_sock_set_recvable(nni_sock *s, bool canrecv)
-{
- nni_notifyfd *fd = &s->s_recv_fd;
- if (fd->sn_init) {
- if (canrecv) {
- nni_plat_pipe_raise(fd->sn_wfd);
- } else {
- nni_plat_pipe_clear(fd->sn_rfd);
- }
- }
-}
-
static int
nni_sock_get_fd(nni_sock *s, int flag, int *fdp)
{
@@ -159,7 +151,15 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp)
return (rv);
}
- nni_msgq_set_cb(mq, cb, fd);
+ // Only set the callback on the message queue if we are
+ // using it. The message queue automatically updates
+ // the pipe when the callback is first established.
+ // If we are not using the message queue, then we have
+ // to update the initial state explicitly ourselves.
+
+ if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) {
+ nni_msgq_set_cb(mq, cb, fd);
+ }
fd->sn_init = 1;
}
@@ -563,6 +563,10 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
s->s_sock_ops = *proto->proto_sock_ops;
s->s_pipe_ops = *proto->proto_pipe_ops;
+ if (proto->proto_ctx_ops != NULL) {
+ s->s_ctx_ops = *proto->proto_ctx_ops;
+ }
+
NNI_ASSERT(s->s_sock_ops.sock_open != NULL);
NNI_ASSERT(s->s_sock_ops.sock_close != NULL);
@@ -571,6 +575,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_NODE_INIT(&s->s_node);
NNI_LIST_INIT(&s->s_options, nni_sockopt, node);
+ NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
+
nni_pipe_sock_list_init(&s->s_pipes);
nni_ep_list_init(&s->s_eps);
nni_mtx_init(&s->s_mx);
@@ -613,19 +619,27 @@ nni_sock_sys_init(void)
NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node);
nni_mtx_init(&nni_sock_lk);
- if ((rv = nni_idhash_init(&nni_sock_hash)) != 0) {
+ if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) ||
+ ((rv = nni_idhash_init(&nni_ctx_hash)) != 0)) {
nni_sock_sys_fini();
- } else {
- nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1);
+ return (rv);
}
- return (rv);
+ nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1);
+ nni_idhash_set_limits(nni_ctx_hash, 1, 0x7fffffff, 1);
+ return (0);
}
void
nni_sock_sys_fini(void)
{
- nni_idhash_fini(nni_sock_hash);
- nni_sock_hash = NULL;
+ if (nni_sock_hash != NULL) {
+ nni_idhash_fini(nni_sock_hash);
+ nni_sock_hash = NULL;
+ }
+ if (nni_ctx_hash != NULL) {
+ nni_idhash_fini(nni_ctx_hash);
+ nni_ctx_hash = NULL;
+ }
nni_mtx_fini(&nni_sock_lk);
}
@@ -653,10 +667,11 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
s->s_sock_ops.sock_open(s->s_data);
*sockp = s;
}
+ nni_mtx_unlock(&nni_sock_lk);
+
// Set the sockname.
(void) snprintf(
s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id);
- nni_mtx_unlock(&nni_sock_lk);
return (rv);
}
@@ -672,6 +687,8 @@ nni_sock_shutdown(nni_sock *sock)
nni_ep * ep;
nni_ep * nep;
nni_time linger;
+ nni_ctx * ctx;
+ nni_ctx * nctx;
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
@@ -697,16 +714,51 @@ nni_sock_shutdown(nni_sock *sock)
}
nni_mtx_unlock(&sock->s_mx);
+ // We now mark any owned contexts as closing.
+ // XXX: Add context draining support here!
+ nni_mtx_lock(&nni_sock_lk);
+ nctx = nni_list_first(&sock->s_ctxs);
+ while ((ctx = nctx) != NULL) {
+ nctx = nni_list_next(&sock->s_ctxs, ctx);
+ ctx->c_closed = true;
+ if (ctx->c_refcnt == 0) {
+ // No open operations. So close it.
+ nni_idhash_remove(nni_ctx_hash, ctx->c_id);
+ nni_list_remove(&sock->s_ctxs, ctx);
+ nni_ctx_destroy(ctx);
+ }
+ // If still has a reference count, then wait for last
+ // reference to close before nuking it.
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
+ // XXX: Add protocol specific drain here. This should replace the
+ // msgq_drain feature below. Probably msgq_drain will need to
+ // be changed to take an AIO for completion.
+
// We drain the upper write queue. This is just like closing it,
// except that the protocol gets a chance to get the messages and
// push them down to the transport. This operation can *block*
- // until the linger time has expired.
- nni_msgq_drain(sock->s_uwq, linger);
+ // until the linger time has expired. We only do this for sendable
+ // sockets that are actually using the message queue of course.
+ if ((nni_sock_flags(sock) &
+ (NNI_PROTO_FLAG_NOMSGQ | NNI_PROTO_FLAG_SND)) ==
+ NNI_PROTO_FLAG_SND) {
+ nni_msgq_drain(sock->s_uwq, linger);
+ }
// Generally, unless the protocol is blocked trying to perform
// writes (e.g. a slow reader on the other side), it should be
// trying to shut things down. We wait to give it
// a chance to do so gracefully.
+
+ nni_mtx_lock(&nni_sock_lk);
+ while (!nni_list_empty(&sock->s_ctxs)) {
+ sock->s_ctxwait = true;
+ nni_cv_wait(&sock->s_close_cv);
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
nni_mtx_lock(&sock->s_mx);
while (nni_list_first(&sock->s_pipes) != NULL) {
if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
@@ -790,7 +842,8 @@ nni_sock_close(nni_sock *s)
// Wait for all other references to drop. Note that we
// have a reference already (from our caller).
- while (s->s_refcnt > 1) {
+ s->s_ctxwait = true;
+ while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_ctxs))) {
nni_cv_wait(&s->s_close_cv);
}
nni_mtx_unlock(&nni_sock_lk);
@@ -1147,3 +1200,224 @@ nni_sock_flags(nni_sock *sock)
{
return (sock->s_flags);
}
+
+int
+nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing)
+{
+ int rv;
+ nni_ctx *ctx;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&nni_sock_lk);
+ if ((rv = nni_idhash_find(nni_ctx_hash, id, (void **) &ctx)) == 0) {
+ // We refuse a reference if either the socket is closed,
+ // or the context is closed. (If the socket is closed,
+ // and we are only getting the reference so we can close it,
+ // then we still allow. In the case the only valid operation
+ // will be to close the socket.)
+ if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) {
+ rv = NNG_ECLOSED;
+ } else {
+ ctx->c_refcnt++;
+ *ctxp = ctx;
+ }
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
+ if (rv == NNG_ENOENT) {
+ rv = NNG_ECLOSED;
+ }
+
+ return (rv);
+}
+
+static void
+nni_ctx_destroy(nni_ctx *ctx)
+{
+ if (ctx->c_data != NULL) {
+ ctx->c_ops.ctx_fini(ctx->c_data);
+ }
+
+ // Let the socket go, our hold on it is done.
+ NNI_FREE_STRUCT(ctx);
+}
+
+void
+nni_ctx_rele(nni_ctx *ctx)
+{
+ nni_sock *sock = ctx->c_sock;
+ nni_mtx_lock(&nni_sock_lk);
+ ctx->c_refcnt--;
+ if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) {
+ // Either still have an active reference, or not actually
+ // closing yet.
+ nni_mtx_unlock(&nni_sock_lk);
+ return;
+ }
+
+ // Remove us from the hash, so we can't be found any more.
+ // This allows our ID to be reused later, although the system
+ // tries to avoid ID reuse.
+ nni_idhash_remove(nni_ctx_hash, ctx->c_id);
+ nni_list_remove(&sock->s_ctxs, ctx);
+ if (sock->s_closed || sock->s_ctxwait) {
+ nni_cv_wake(&sock->s_close_cv);
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
+ nni_ctx_destroy(ctx);
+}
+
+int
+nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
+{
+ nni_ctx *ctx;
+ int rv;
+ uint64_t id;
+
+ if (sock->s_ctx_ops.ctx_init == NULL) {
+ return (NNG_ENOTSUP);
+ }
+ if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_lock(&nni_sock_lk);
+ if (sock->s_closed) {
+ nni_mtx_unlock(&nni_sock_lk);
+ NNI_FREE_STRUCT(ctx);
+ return (NNG_ECLOSED);
+ }
+ if ((rv = nni_idhash_alloc(nni_ctx_hash, &id, ctx)) != 0) {
+ nni_mtx_unlock(&nni_sock_lk);
+ NNI_FREE_STRUCT(ctx);
+ return (rv);
+ }
+ ctx->c_id = (uint32_t) id;
+
+ if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) {
+ nni_idhash_remove(nni_ctx_hash, ctx->c_id);
+ nni_mtx_unlock(&nni_sock_lk);
+ NNI_FREE_STRUCT(ctx);
+ return (rv);
+ }
+
+ ctx->c_closed = false;
+ ctx->c_refcnt = 1; // Caller implicitly gets a reference.
+ ctx->c_sock = sock;
+ ctx->c_ops = sock->s_ctx_ops;
+ ctx->c_rcvtimeo = sock->s_rcvtimeo;
+ ctx->c_sndtimeo = sock->s_sndtimeo;
+
+ nni_list_append(&sock->s_ctxs, ctx);
+ nni_mtx_unlock(&nni_sock_lk);
+
+ // Paranoia, fixing a possible race in close. Don't let us
+ // give back a context if the socket is being shutdown (it might
+ // not have reached the "closed" state yet.)
+ nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_ctx_rele(ctx);
+ return (NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&sock->s_mx);
+ *ctxp = ctx;
+
+ return (0);
+}
+
+void
+nni_ctx_close(nni_ctx *ctx)
+{
+ nni_mtx_lock(&nni_sock_lk);
+ ctx->c_closed = true;
+ nni_mtx_unlock(&nni_sock_lk);
+
+ nni_ctx_rele(ctx);
+}
+
+uint32_t
+nni_ctx_id(nni_ctx *ctx)
+{
+ return (ctx->c_id);
+}
+
+void
+nni_ctx_send(nni_ctx *ctx, nni_aio *aio)
+{
+ nni_aio_normalize_timeout(aio, ctx->c_sndtimeo);
+ ctx->c_ops.ctx_send(ctx->c_data, aio);
+}
+
+void
+nni_ctx_recv(nni_ctx *ctx, nni_aio *aio)
+{
+ nni_aio_normalize_timeout(aio, ctx->c_rcvtimeo);
+ ctx->c_ops.ctx_recv(ctx->c_data, aio);
+}
+
+int
+nni_ctx_getopt(nni_ctx *ctx, const char *opt, void *v, size_t *szp, int typ)
+{
+ nni_sock * sock = ctx->c_sock;
+ nni_proto_ctx_option *co;
+ int rv;
+
+ nni_mtx_lock(&sock->s_mx);
+ if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) {
+ rv = nni_copyout_ms(ctx->c_rcvtimeo, v, szp, typ);
+ } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) {
+ rv = nni_copyout_ms(ctx->c_sndtimeo, v, szp, typ);
+ } else {
+ rv = NNG_ENOTSUP;
+ for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) {
+ if (strcmp(opt, co->co_name) != 0) {
+ continue;
+ }
+ if (co->co_getopt == NULL) {
+ rv = NNG_EWRITEONLY;
+ break;
+ }
+ rv = co->co_getopt(ctx->c_data, v, szp, typ);
+ break;
+ }
+ }
+
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+}
+
+int
+nni_ctx_setopt(
+ nni_ctx *ctx, const char *opt, const void *v, size_t sz, int typ)
+{
+ nni_sock * sock = ctx->c_sock;
+ nni_proto_ctx_option *co;
+ int rv;
+
+ nni_mtx_lock(&sock->s_mx);
+ if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) {
+ rv = nni_copyin_ms(&ctx->c_rcvtimeo, v, sz, typ);
+ } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) {
+ rv = nni_copyin_ms(&ctx->c_sndtimeo, v, sz, typ);
+ } else {
+ rv = NNG_ENOTSUP;
+ for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) {
+ if (strcmp(opt, co->co_name) != 0) {
+ continue;
+ }
+ if (co->co_setopt == NULL) {
+ rv = NNG_EREADONLY;
+ break;
+ }
+ rv = co->co_setopt(ctx->c_data, v, sz, typ);
+ break;
+ }
+ }
+
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+}
diff --git a/src/core/socket.h b/src/core/socket.h
index 3af6bb9e..87bf1374 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -63,4 +63,46 @@ extern void nni_sock_reconntimes(nni_sock *, nni_duration *, nni_duration *);
// nni_sock_flags returns the socket flags, used to indicate whether read
// and or write are appropriate for the protocol.
extern uint32_t nni_sock_flags(nni_sock *);
+
+// nni_ctx_open is used to open/create a new context structure.
+// Contexts are not supported by most protocols, but for those that do,
+// this can offer some improvements for massive concurrency/scalability.
+// Returns NNG_ENOTSUP for protocols that lack context support. This adds
+// another reference (hold) on the socket on success, and the newly
+// created context is also held by the caller. Not supported by raw mode
+// sockets (will also return NNG_ENOTSUP).
+extern int nni_ctx_open(nni_ctx **, nni_sock *);
+
+// nni_ctx_find finds a context given its id. The last argument should
+// be true if the context is acquired merely to close it, false otherwise.
+// (If the socket for the context is being closed, then this will return
+// NNG_ECLOSED unless the final argument is true.)
+extern int nni_ctx_find(nni_ctx **, uint32_t, bool);
+
+// nni_ctx_rele is called to release a hold on the context. These holds
+// are acquired by either nni_ctx_open or nni_ctx_find. If the context
+// is being closed (nni_ctx_close was called), and this is the last reference,
+// then the underlying context is freed, and the implicit socket hold
+// by the context is also released.
+extern void nni_ctx_rele(nni_ctx *);
+
+// nni_ctx_close is used to close the context. It also implictly releases
+// the context.
+extern void nni_ctx_close(nni_ctx *);
+
+// nni_ctx_id returns the context ID, which can be used with nni_ctx_find.
+extern uint32_t nni_ctx_id(nni_ctx *);
+
+// nni_ctx_recv is an asychronous receive.
+extern void nni_ctx_recv(nni_ctx *, nni_aio *);
+
+// nni_ctx_send is an asychronous receive.
+extern void nni_ctx_send(nni_ctx *, nni_aio *);
+
+// nni_ctx_getopt is used to get a context option.
+extern int nni_ctx_getopt(nni_ctx *, const char *, void *, size_t *, int);
+
+// nni_ctx_setopt is used to set a context option.
+extern int nni_ctx_setopt(nni_ctx *, const char *, const void *, size_t, int);
+
#endif // CORE_SOCKET_H