aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-04-04 13:36:54 -0700
committerGarrett D'Amore <garrett@damore.org>2018-04-10 15:40:00 -0700
commit5f7289e1f8e1427c9214c8e3e96ad56b1f868d53 (patch)
tree39debf4ecde234b2a0be19c9cb15628cc32c2edb /src/core/socket.c
parent56f1bf30e61c53646dd2f8425da7c7fa0d97b3e1 (diff)
downloadnng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.gz
nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.bz2
nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.zip
fixes #334 Separate context for state machines from sockets
This provides context support for REQ and REP sockets. More discussion around this is in the issue itself. Optionally we would like to extend this to the surveyor pattern. Note that we specifically do not support pollable descriptors for non-default contexts, and the results of using file descriptors for polling (NNG_OPT_SENDFD and NNG_OPT_RECVFD) is undefined. In the future, it might be nice to figure out how to factor in optional use of a message queue for users who want more buffering, but we think there is little need for this with cooked mode.
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c354
1 files changed, 314 insertions, 40 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 62950b1c..67a2991c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -18,6 +18,19 @@
static nni_list nni_sock_list;
static nni_idhash *nni_sock_hash;
static nni_mtx nni_sock_lk;
+static nni_idhash *nni_ctx_hash;
+
+struct nni_ctx {
+ nni_list_node c_node;
+ nni_sock * c_sock;
+ nni_proto_ctx_ops c_ops;
+ void * c_data;
+ bool c_closed;
+ unsigned c_refcnt; // protected by global lock
+ uint32_t c_id;
+ nng_duration c_sndtimeo;
+ nng_duration c_rcvtimeo;
+};
typedef struct nni_socket_option {
const char *so_name;
@@ -53,6 +66,7 @@ struct nni_socket {
nni_proto_pipe_ops s_pipe_ops;
nni_proto_sock_ops s_sock_ops;
+ nni_proto_ctx_ops s_ctx_ops;
// options
nni_duration s_linger; // linger time
@@ -66,15 +80,19 @@ struct nni_socket {
nni_list s_eps; // active endpoints
nni_list s_pipes; // active pipes
+ nni_list s_ctxs; // active contexts (protected by global nni_sock_lk)
- int s_ep_pend; // EP dial/listen in progress
- int s_closing; // Socket is closing
- int s_closed; // Socket closed, protected by global lock
+ int s_ep_pend; // EP dial/listen in progress
+ int s_closing; // Socket is closing
+ int s_closed; // Socket closed, protected by global lock
+ bool s_ctxwait; // Waiting for contexts to close.
nni_notifyfd s_send_fd;
nni_notifyfd s_recv_fd;
};
+static void nni_ctx_destroy(nni_ctx *);
+
static void
nni_sock_can_send_cb(void *arg, int flags)
{
@@ -99,32 +117,6 @@ nni_sock_can_recv_cb(void *arg, int flags)
}
}
-void
-nni_sock_set_sendable(nni_sock *s, bool cansend)
-{
- nni_notifyfd *fd = &s->s_send_fd;
- if (fd->sn_init) {
- if (cansend) {
- nni_plat_pipe_raise(fd->sn_wfd);
- } else {
- nni_plat_pipe_clear(fd->sn_rfd);
- }
- }
-}
-
-void
-nni_sock_set_recvable(nni_sock *s, bool canrecv)
-{
- nni_notifyfd *fd = &s->s_recv_fd;
- if (fd->sn_init) {
- if (canrecv) {
- nni_plat_pipe_raise(fd->sn_wfd);
- } else {
- nni_plat_pipe_clear(fd->sn_rfd);
- }
- }
-}
-
static int
nni_sock_get_fd(nni_sock *s, int flag, int *fdp)
{
@@ -159,7 +151,15 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp)
return (rv);
}
- nni_msgq_set_cb(mq, cb, fd);
+ // Only set the callback on the message queue if we are
+ // using it. The message queue automatically updates
+ // the pipe when the callback is first established.
+ // If we are not using the message queue, then we have
+ // to update the initial state explicitly ourselves.
+
+ if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) {
+ nni_msgq_set_cb(mq, cb, fd);
+ }
fd->sn_init = 1;
}
@@ -563,6 +563,10 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
s->s_sock_ops = *proto->proto_sock_ops;
s->s_pipe_ops = *proto->proto_pipe_ops;
+ if (proto->proto_ctx_ops != NULL) {
+ s->s_ctx_ops = *proto->proto_ctx_ops;
+ }
+
NNI_ASSERT(s->s_sock_ops.sock_open != NULL);
NNI_ASSERT(s->s_sock_ops.sock_close != NULL);
@@ -571,6 +575,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_NODE_INIT(&s->s_node);
NNI_LIST_INIT(&s->s_options, nni_sockopt, node);
+ NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
+
nni_pipe_sock_list_init(&s->s_pipes);
nni_ep_list_init(&s->s_eps);
nni_mtx_init(&s->s_mx);
@@ -613,19 +619,27 @@ nni_sock_sys_init(void)
NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node);
nni_mtx_init(&nni_sock_lk);
- if ((rv = nni_idhash_init(&nni_sock_hash)) != 0) {
+ if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) ||
+ ((rv = nni_idhash_init(&nni_ctx_hash)) != 0)) {
nni_sock_sys_fini();
- } else {
- nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1);
+ return (rv);
}
- return (rv);
+ nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1);
+ nni_idhash_set_limits(nni_ctx_hash, 1, 0x7fffffff, 1);
+ return (0);
}
void
nni_sock_sys_fini(void)
{
- nni_idhash_fini(nni_sock_hash);
- nni_sock_hash = NULL;
+ if (nni_sock_hash != NULL) {
+ nni_idhash_fini(nni_sock_hash);
+ nni_sock_hash = NULL;
+ }
+ if (nni_ctx_hash != NULL) {
+ nni_idhash_fini(nni_ctx_hash);
+ nni_ctx_hash = NULL;
+ }
nni_mtx_fini(&nni_sock_lk);
}
@@ -653,10 +667,11 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
s->s_sock_ops.sock_open(s->s_data);
*sockp = s;
}
+ nni_mtx_unlock(&nni_sock_lk);
+
// Set the sockname.
(void) snprintf(
s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id);
- nni_mtx_unlock(&nni_sock_lk);
return (rv);
}
@@ -672,6 +687,8 @@ nni_sock_shutdown(nni_sock *sock)
nni_ep * ep;
nni_ep * nep;
nni_time linger;
+ nni_ctx * ctx;
+ nni_ctx * nctx;
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
@@ -697,16 +714,51 @@ nni_sock_shutdown(nni_sock *sock)
}
nni_mtx_unlock(&sock->s_mx);
+ // We now mark any owned contexts as closing.
+ // XXX: Add context draining support here!
+ nni_mtx_lock(&nni_sock_lk);
+ nctx = nni_list_first(&sock->s_ctxs);
+ while ((ctx = nctx) != NULL) {
+ nctx = nni_list_next(&sock->s_ctxs, ctx);
+ ctx->c_closed = true;
+ if (ctx->c_refcnt == 0) {
+ // No open operations. So close it.
+ nni_idhash_remove(nni_ctx_hash, ctx->c_id);
+ nni_list_remove(&sock->s_ctxs, ctx);
+ nni_ctx_destroy(ctx);
+ }
+ // If still has a reference count, then wait for last
+ // reference to close before nuking it.
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
+ // XXX: Add protocol specific drain here. This should replace the
+ // msgq_drain feature below. Probably msgq_drain will need to
+ // be changed to take an AIO for completion.
+
// We drain the upper write queue. This is just like closing it,
// except that the protocol gets a chance to get the messages and
// push them down to the transport. This operation can *block*
- // until the linger time has expired.
- nni_msgq_drain(sock->s_uwq, linger);
+ // until the linger time has expired. We only do this for sendable
+ // sockets that are actually using the message queue of course.
+ if ((nni_sock_flags(sock) &
+ (NNI_PROTO_FLAG_NOMSGQ | NNI_PROTO_FLAG_SND)) ==
+ NNI_PROTO_FLAG_SND) {
+ nni_msgq_drain(sock->s_uwq, linger);
+ }
// Generally, unless the protocol is blocked trying to perform
// writes (e.g. a slow reader on the other side), it should be
// trying to shut things down. We wait to give it
// a chance to do so gracefully.
+
+ nni_mtx_lock(&nni_sock_lk);
+ while (!nni_list_empty(&sock->s_ctxs)) {
+ sock->s_ctxwait = true;
+ nni_cv_wait(&sock->s_close_cv);
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
nni_mtx_lock(&sock->s_mx);
while (nni_list_first(&sock->s_pipes) != NULL) {
if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
@@ -790,7 +842,8 @@ nni_sock_close(nni_sock *s)
// Wait for all other references to drop. Note that we
// have a reference already (from our caller).
- while (s->s_refcnt > 1) {
+ s->s_ctxwait = true;
+ while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_ctxs))) {
nni_cv_wait(&s->s_close_cv);
}
nni_mtx_unlock(&nni_sock_lk);
@@ -1147,3 +1200,224 @@ nni_sock_flags(nni_sock *sock)
{
return (sock->s_flags);
}
+
+int
+nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing)
+{
+ int rv;
+ nni_ctx *ctx;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&nni_sock_lk);
+ if ((rv = nni_idhash_find(nni_ctx_hash, id, (void **) &ctx)) == 0) {
+ // We refuse a reference if either the socket is closed,
+ // or the context is closed. (If the socket is closed,
+ // and we are only getting the reference so we can close it,
+ // then we still allow. In the case the only valid operation
+ // will be to close the socket.)
+ if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) {
+ rv = NNG_ECLOSED;
+ } else {
+ ctx->c_refcnt++;
+ *ctxp = ctx;
+ }
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
+ if (rv == NNG_ENOENT) {
+ rv = NNG_ECLOSED;
+ }
+
+ return (rv);
+}
+
+static void
+nni_ctx_destroy(nni_ctx *ctx)
+{
+ if (ctx->c_data != NULL) {
+ ctx->c_ops.ctx_fini(ctx->c_data);
+ }
+
+ // Let the socket go, our hold on it is done.
+ NNI_FREE_STRUCT(ctx);
+}
+
+void
+nni_ctx_rele(nni_ctx *ctx)
+{
+ nni_sock *sock = ctx->c_sock;
+ nni_mtx_lock(&nni_sock_lk);
+ ctx->c_refcnt--;
+ if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) {
+ // Either still have an active reference, or not actually
+ // closing yet.
+ nni_mtx_unlock(&nni_sock_lk);
+ return;
+ }
+
+ // Remove us from the hash, so we can't be found any more.
+ // This allows our ID to be reused later, although the system
+ // tries to avoid ID reuse.
+ nni_idhash_remove(nni_ctx_hash, ctx->c_id);
+ nni_list_remove(&sock->s_ctxs, ctx);
+ if (sock->s_closed || sock->s_ctxwait) {
+ nni_cv_wake(&sock->s_close_cv);
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
+ nni_ctx_destroy(ctx);
+}
+
+int
+nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
+{
+ nni_ctx *ctx;
+ int rv;
+ uint64_t id;
+
+ if (sock->s_ctx_ops.ctx_init == NULL) {
+ return (NNG_ENOTSUP);
+ }
+ if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_lock(&nni_sock_lk);
+ if (sock->s_closed) {
+ nni_mtx_unlock(&nni_sock_lk);
+ NNI_FREE_STRUCT(ctx);
+ return (NNG_ECLOSED);
+ }
+ if ((rv = nni_idhash_alloc(nni_ctx_hash, &id, ctx)) != 0) {
+ nni_mtx_unlock(&nni_sock_lk);
+ NNI_FREE_STRUCT(ctx);
+ return (rv);
+ }
+ ctx->c_id = (uint32_t) id;
+
+ if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) {
+ nni_idhash_remove(nni_ctx_hash, ctx->c_id);
+ nni_mtx_unlock(&nni_sock_lk);
+ NNI_FREE_STRUCT(ctx);
+ return (rv);
+ }
+
+ ctx->c_closed = false;
+ ctx->c_refcnt = 1; // Caller implicitly gets a reference.
+ ctx->c_sock = sock;
+ ctx->c_ops = sock->s_ctx_ops;
+ ctx->c_rcvtimeo = sock->s_rcvtimeo;
+ ctx->c_sndtimeo = sock->s_sndtimeo;
+
+ nni_list_append(&sock->s_ctxs, ctx);
+ nni_mtx_unlock(&nni_sock_lk);
+
+ // Paranoia, fixing a possible race in close. Don't let us
+ // give back a context if the socket is being shutdown (it might
+ // not have reached the "closed" state yet.)
+ nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_ctx_rele(ctx);
+ return (NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&sock->s_mx);
+ *ctxp = ctx;
+
+ return (0);
+}
+
+void
+nni_ctx_close(nni_ctx *ctx)
+{
+ nni_mtx_lock(&nni_sock_lk);
+ ctx->c_closed = true;
+ nni_mtx_unlock(&nni_sock_lk);
+
+ nni_ctx_rele(ctx);
+}
+
+uint32_t
+nni_ctx_id(nni_ctx *ctx)
+{
+ return (ctx->c_id);
+}
+
+void
+nni_ctx_send(nni_ctx *ctx, nni_aio *aio)
+{
+ nni_aio_normalize_timeout(aio, ctx->c_sndtimeo);
+ ctx->c_ops.ctx_send(ctx->c_data, aio);
+}
+
+void
+nni_ctx_recv(nni_ctx *ctx, nni_aio *aio)
+{
+ nni_aio_normalize_timeout(aio, ctx->c_rcvtimeo);
+ ctx->c_ops.ctx_recv(ctx->c_data, aio);
+}
+
+int
+nni_ctx_getopt(nni_ctx *ctx, const char *opt, void *v, size_t *szp, int typ)
+{
+ nni_sock * sock = ctx->c_sock;
+ nni_proto_ctx_option *co;
+ int rv;
+
+ nni_mtx_lock(&sock->s_mx);
+ if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) {
+ rv = nni_copyout_ms(ctx->c_rcvtimeo, v, szp, typ);
+ } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) {
+ rv = nni_copyout_ms(ctx->c_sndtimeo, v, szp, typ);
+ } else {
+ rv = NNG_ENOTSUP;
+ for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) {
+ if (strcmp(opt, co->co_name) != 0) {
+ continue;
+ }
+ if (co->co_getopt == NULL) {
+ rv = NNG_EWRITEONLY;
+ break;
+ }
+ rv = co->co_getopt(ctx->c_data, v, szp, typ);
+ break;
+ }
+ }
+
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+}
+
+int
+nni_ctx_setopt(
+ nni_ctx *ctx, const char *opt, const void *v, size_t sz, int typ)
+{
+ nni_sock * sock = ctx->c_sock;
+ nni_proto_ctx_option *co;
+ int rv;
+
+ nni_mtx_lock(&sock->s_mx);
+ if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) {
+ rv = nni_copyin_ms(&ctx->c_rcvtimeo, v, sz, typ);
+ } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) {
+ rv = nni_copyin_ms(&ctx->c_sndtimeo, v, sz, typ);
+ } else {
+ rv = NNG_ENOTSUP;
+ for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) {
+ if (strcmp(opt, co->co_name) != 0) {
+ continue;
+ }
+ if (co->co_setopt == NULL) {
+ rv = NNG_EREADONLY;
+ break;
+ }
+ rv = co->co_setopt(ctx->c_data, v, sz, typ);
+ break;
+ }
+ }
+
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+}