Diffstat (limited to 'src/core/socket.c')
-rw-r--r--  src/core/socket.c  354
1 file changed, 314 insertions(+), 40 deletions(-)
diff --git a/src/core/socket.c b/src/core/socket.c
index 62950b1c..67a2991c 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -18,6 +18,19 @@
static nni_list nni_sock_list;
static nni_idhash *nni_sock_hash;
static nni_mtx nni_sock_lk;
+static nni_idhash *nni_ctx_hash;
+
+struct nni_ctx {
+ nni_list_node c_node;
+ nni_sock * c_sock;
+ nni_proto_ctx_ops c_ops;
+ void * c_data;
+ bool c_closed;
+ unsigned c_refcnt; // protected by global lock
+ uint32_t c_id;
+ nng_duration c_sndtimeo;
+ nng_duration c_rcvtimeo;
+};
typedef struct nni_socket_option {
const char *so_name;
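
[Reader's note] The nni_ctx structure above carries its own copy of the
protocol's context ops plus per-context send/receive timeouts, and is
reference counted under the global nni_sock_lk. A minimal sketch of the
intended lifecycle, using only the nni_ctx_* functions added later in
this diff; the aio handling is illustrative and assumes the usual
internal helpers (nni_aio_init, nni_aio_set_msg, nni_aio_wait,
nni_aio_result), with error/cleanup paths abbreviated:

        // Sketch only; assumes an already-opened socket `sock`.
        nni_ctx *ctx;
        nni_aio *aio;
        nni_msg *msg;
        int      rv;

        if ((rv = nni_ctx_open(&ctx, sock)) != 0) {
                return (rv); // NNG_ENOTSUP if the protocol lacks ctx_init
        }
        if (((rv = nni_aio_init(&aio, NULL, NULL)) != 0) ||
            ((rv = nni_msg_alloc(&msg, 0)) != 0)) {
                nni_ctx_rele(ctx);
                return (rv);
        }
        nni_aio_set_msg(aio, msg);
        nni_ctx_send(ctx, aio); // applies ctx->c_sndtimeo to the aio
        nni_aio_wait(aio);
        rv = nni_aio_result(aio);
        nni_aio_fini(aio);
        nni_ctx_rele(ctx);      // drop the caller's implicit reference
        return (rv);
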
@@ -53,6 +66,7 @@ struct nni_socket {
nni_proto_pipe_ops s_pipe_ops;
nni_proto_sock_ops s_sock_ops;
+ nni_proto_ctx_ops s_ctx_ops;
// options
nni_duration s_linger; // linger time
@@ -66,15 +80,19 @@ struct nni_socket {
nni_list s_eps; // active endpoints
nni_list s_pipes; // active pipes
+ nni_list s_ctxs; // active contexts (protected by global nni_sock_lk)
- int s_ep_pend; // EP dial/listen in progress
- int s_closing; // Socket is closing
- int s_closed; // Socket closed, protected by global lock
+ int s_ep_pend; // EP dial/listen in progress
+ int s_closing; // Socket is closing
+ int s_closed; // Socket closed, protected by global lock
+ bool s_ctxwait; // Waiting for contexts to close.
nni_notifyfd s_send_fd;
nni_notifyfd s_recv_fd;
};
+static void nni_ctx_destroy(nni_ctx *);
+
static void
nni_sock_can_send_cb(void *arg, int flags)
{
@@ -99,32 +117,6 @@ nni_sock_can_recv_cb(void *arg, int flags)
}
}
-void
-nni_sock_set_sendable(nni_sock *s, bool cansend)
-{
- nni_notifyfd *fd = &s->s_send_fd;
- if (fd->sn_init) {
- if (cansend) {
- nni_plat_pipe_raise(fd->sn_wfd);
- } else {
- nni_plat_pipe_clear(fd->sn_rfd);
- }
- }
-}
-
-void
-nni_sock_set_recvable(nni_sock *s, bool canrecv)
-{
- nni_notifyfd *fd = &s->s_recv_fd;
- if (fd->sn_init) {
- if (canrecv) {
- nni_plat_pipe_raise(fd->sn_wfd);
- } else {
- nni_plat_pipe_clear(fd->sn_rfd);
- }
- }
-}
-
static int
nni_sock_get_fd(nni_sock *s, int flag, int *fdp)
{
@@ -159,7 +151,15 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp)
return (rv);
}
- nni_msgq_set_cb(mq, cb, fd);
+ // Only set the callback on the message queue if we are
+ // using it. The message queue automatically updates
+ // the pipe when the callback is first established.
+ // If we are not using the message queue, then we have
+ // to update the initial state explicitly ourselves.
+
+ if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) {
+ nni_msgq_set_cb(mq, cb, fd);
+ }
fd->sn_init = 1;
}
@@ -563,6 +563,10 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
s->s_sock_ops = *proto->proto_sock_ops;
s->s_pipe_ops = *proto->proto_pipe_ops;
+ if (proto->proto_ctx_ops != NULL) {
+ s->s_ctx_ops = *proto->proto_ctx_ops;
+ }
+
NNI_ASSERT(s->s_sock_ops.sock_open != NULL);
NNI_ASSERT(s->s_sock_ops.sock_close != NULL);
@@ -571,6 +575,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_NODE_INIT(&s->s_node);
NNI_LIST_INIT(&s->s_options, nni_sockopt, node);
+ NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
+
nni_pipe_sock_list_init(&s->s_pipes);
nni_ep_list_init(&s->s_eps);
nni_mtx_init(&s->s_mx);
@@ -613,19 +619,27 @@ nni_sock_sys_init(void)
NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node);
nni_mtx_init(&nni_sock_lk);
- if ((rv = nni_idhash_init(&nni_sock_hash)) != 0) {
+ if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) ||
+ ((rv = nni_idhash_init(&nni_ctx_hash)) != 0)) {
nni_sock_sys_fini();
- } else {
- nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1);
+ return (rv);
}
- return (rv);
+ nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1);
+ nni_idhash_set_limits(nni_ctx_hash, 1, 0x7fffffff, 1);
+ return (0);
}
void
nni_sock_sys_fini(void)
{
- nni_idhash_fini(nni_sock_hash);
- nni_sock_hash = NULL;
+ if (nni_sock_hash != NULL) {
+ nni_idhash_fini(nni_sock_hash);
+ nni_sock_hash = NULL;
+ }
+ if (nni_ctx_hash != NULL) {
+ nni_idhash_fini(nni_ctx_hash);
+ nni_ctx_hash = NULL;
+ }
nni_mtx_fini(&nni_sock_lk);
}
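
[Reader's note] The reworked pair above follows a common C init/unwind
idiom: nni_sock_sys_init chains both idhash allocations in a single
condition and calls nni_sock_sys_fini on the first failure, which is safe
only because fini now tolerates partially-initialized state. The shape of
the idiom, with placeholder names (foo/bar are not real APIs):

        static foo_t *f;
        static bar_t *b;

        int
        sys_init(void)
        {
                int rv;

                // First failure short-circuits; rv holds its result.
                if (((rv = foo_init(&f)) != 0) ||
                    ((rv = bar_init(&b)) != 0)) {
                        sys_fini(); // safe on partial init
                        return (rv);
                }
                return (0);
        }

        void
        sys_fini(void)
        {
                // NULL checks make teardown idempotent.
                if (f != NULL) {
                        foo_fini(f);
                        f = NULL;
                }
                if (b != NULL) {
                        bar_fini(b);
                        b = NULL;
                }
        }
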
@@ -653,10 +667,11 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
s->s_sock_ops.sock_open(s->s_data);
*sockp = s;
}
+ nni_mtx_unlock(&nni_sock_lk);
+
// Set the sockname.
(void) snprintf(
s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id);
- nni_mtx_unlock(&nni_sock_lk);
return (rv);
}
@@ -672,6 +687,8 @@ nni_sock_shutdown(nni_sock *sock)
nni_ep * ep;
nni_ep * nep;
nni_time linger;
+ nni_ctx * ctx;
+ nni_ctx * nctx;
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
@@ -697,16 +714,51 @@ nni_sock_shutdown(nni_sock *sock)
}
nni_mtx_unlock(&sock->s_mx);
+ // We now mark any owned contexts as closing.
+ // XXX: Add context draining support here!
+ nni_mtx_lock(&nni_sock_lk);
+ nctx = nni_list_first(&sock->s_ctxs);
+ while ((ctx = nctx) != NULL) {
+ nctx = nni_list_next(&sock->s_ctxs, ctx);
+ ctx->c_closed = true;
+ if (ctx->c_refcnt == 0) {
+ // No open operations. So close it.
+ nni_idhash_remove(nni_ctx_hash, ctx->c_id);
+ nni_list_remove(&sock->s_ctxs, ctx);
+ nni_ctx_destroy(ctx);
+ }
+ // If the context still has references, we wait below for the
+ // last reference to be released before it is destroyed.
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
+ // XXX: Add protocol specific drain here. This should replace the
+ // msgq_drain feature below. Probably msgq_drain will need to
+ // be changed to take an AIO for completion.
+
// We drain the upper write queue. This is just like closing it,
// except that the protocol gets a chance to get the messages and
// push them down to the transport. This operation can *block*
- // until the linger time has expired.
- nni_msgq_drain(sock->s_uwq, linger);
+ // until the linger time has expired. Of course, we only do this
+ // for sendable sockets that are actually using the message queue.
+ if ((nni_sock_flags(sock) &
+ (NNI_PROTO_FLAG_NOMSGQ | NNI_PROTO_FLAG_SND)) ==
+ NNI_PROTO_FLAG_SND) {
+ nni_msgq_drain(sock->s_uwq, linger);
+ }
// Generally, unless the protocol is blocked trying to perform
// writes (e.g. a slow reader on the other side), it should be
// trying to shut things down. We wait to give it
// a chance to do so gracefully.
+
+ nni_mtx_lock(&nni_sock_lk);
+ while (!nni_list_empty(&sock->s_ctxs)) {
+ sock->s_ctxwait = true;
+ nni_cv_wait(&sock->s_close_cv);
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
nni_mtx_lock(&sock->s_mx);
while (nni_list_first(&sock->s_pipes) != NULL) {
if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
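
[Reader's note] The new s_ctxs wait above pairs with the wakeup in
nni_ctx_rele further down: shutdown marks every context closed, destroys
those with no outstanding references, then blocks on s_close_cv until the
list drains. The handshake in miniature, with placeholder names (the real
code uses nni_sock_lk, s_ctxs, s_ctxwait, and s_close_cv; item_t is a
stand-in for nni_ctx):

        static nni_mtx  lk;
        static nni_cv   cv;    // initialized via nni_cv_init(&cv, &lk)
        static nni_list items;
        static bool     waiting;

        void
        wait_for_drain(void)            // the nni_sock_shutdown side
        {
                nni_mtx_lock(&lk);
                while (!nni_list_empty(&items)) {
                        waiting = true; // corresponds to s_ctxwait
                        nni_cv_wait(&cv); // woken by release_one()
                }
                nni_mtx_unlock(&lk);
        }

        void
        release_one(item_t *it)         // the nni_ctx_rele side
        {
                nni_mtx_lock(&lk);
                nni_list_remove(&items, it);
                if (waiting) {
                        nni_cv_wake(&cv);
                }
                nni_mtx_unlock(&lk);
        }
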
@@ -790,7 +842,8 @@ nni_sock_close(nni_sock *s)
// Wait for all other references to drop. Note that we
// have a reference already (from our caller).
- while (s->s_refcnt > 1) {
+ s->s_ctxwait = true;
+ while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_ctxs))) {
nni_cv_wait(&s->s_close_cv);
}
nni_mtx_unlock(&nni_sock_lk);
@@ -1147,3 +1200,224 @@ nni_sock_flags(nni_sock *sock)
{
return (sock->s_flags);
}
+
+int
+nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing)
+{
+ int rv;
+ nni_ctx *ctx;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(&nni_sock_lk);
+ if ((rv = nni_idhash_find(nni_ctx_hash, id, (void **) &ctx)) == 0) {
+ // We refuse a reference if either the socket is closed,
+ // or the context is closed. (If the socket is closed,
+ // and we are only getting the reference so we can close it,
+ // then we still allow it. In that case the only valid
+ // operation will be to close the socket.)
+ if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) {
+ rv = NNG_ECLOSED;
+ } else {
+ ctx->c_refcnt++;
+ *ctxp = ctx;
+ }
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
+ if (rv == NNG_ENOENT) {
+ rv = NNG_ECLOSED;
+ }
+
+ return (rv);
+}
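
[Reader's note] The closing flag above exists for exactly one kind of
caller: code that needs the handle only to finish tearing it down. A
hypothetical close-by-id wrapper showing how nni_ctx_find pairs with
nni_ctx_close (which itself drops the reference via nni_ctx_rele):

        int
        ctx_close_by_id(uint32_t id)
        {
                nni_ctx *ctx;
                int      rv;

                // closing = true: permitted even if the owning
                // socket is already closed.
                if ((rv = nni_ctx_find(&ctx, id, true)) != 0) {
                        return (rv); // NNG_ECLOSED if already gone
                }
                nni_ctx_close(ctx); // marks closed, releases our ref
                return (0);
        }
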
+
+static void
+nni_ctx_destroy(nni_ctx *ctx)
+{
+ if (ctx->c_data != NULL) {
+ ctx->c_ops.ctx_fini(ctx->c_data);
+ }
+
+ // Let the socket go, our hold on it is done.
+ NNI_FREE_STRUCT(ctx);
+}
+
+void
+nni_ctx_rele(nni_ctx *ctx)
+{
+ nni_sock *sock = ctx->c_sock;
+ nni_mtx_lock(&nni_sock_lk);
+ ctx->c_refcnt--;
+ if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) {
+ // Either still have an active reference, or not actually
+ // closing yet.
+ nni_mtx_unlock(&nni_sock_lk);
+ return;
+ }
+
+ // Remove us from the hash, so we can't be found any more.
+ // This allows our ID to be reused later, although the system
+ // tries to avoid ID reuse.
+ nni_idhash_remove(nni_ctx_hash, ctx->c_id);
+ nni_list_remove(&sock->s_ctxs, ctx);
+ if (sock->s_closed || sock->s_ctxwait) {
+ nni_cv_wake(&sock->s_close_cv);
+ }
+ nni_mtx_unlock(&nni_sock_lk);
+
+ nni_ctx_destroy(ctx);
+}
+
+int
+nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
+{
+ nni_ctx *ctx;
+ int rv;
+ uint64_t id;
+
+ if (sock->s_ctx_ops.ctx_init == NULL) {
+ return (NNG_ENOTSUP);
+ }
+ if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ nni_mtx_lock(&nni_sock_lk);
+ if (sock->s_closed) {
+ nni_mtx_unlock(&nni_sock_lk);
+ NNI_FREE_STRUCT(ctx);
+ return (NNG_ECLOSED);
+ }
+ if ((rv = nni_idhash_alloc(nni_ctx_hash, &id, ctx)) != 0) {
+ nni_mtx_unlock(&nni_sock_lk);
+ NNI_FREE_STRUCT(ctx);
+ return (rv);
+ }
+ ctx->c_id = (uint32_t) id;
+
+ if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) {
+ nni_idhash_remove(nni_ctx_hash, ctx->c_id);
+ nni_mtx_unlock(&nni_sock_lk);
+ NNI_FREE_STRUCT(ctx);
+ return (rv);
+ }
+
+ ctx->c_closed = false;
+ ctx->c_refcnt = 1; // Caller implicitly gets a reference.
+ ctx->c_sock = sock;
+ ctx->c_ops = sock->s_ctx_ops;
+ ctx->c_rcvtimeo = sock->s_rcvtimeo;
+ ctx->c_sndtimeo = sock->s_sndtimeo;
+
+ nni_list_append(&sock->s_ctxs, ctx);
+ nni_mtx_unlock(&nni_sock_lk);
+
+ // Paranoia, fixing a possible race in close. Don't let us
+ // give back a context if the socket is being shut down (it
+ // might not have reached the "closed" state yet).
+ nni_mtx_lock(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_ctx_rele(ctx);
+ return (NNG_ECLOSED);
+ }
+ nni_mtx_unlock(&sock->s_mx);
+ *ctxp = ctx;
+
+ return (0);
+}
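
[Reader's note] Note the discipline in nni_ctx_open: the id allocation and
ctx_init both happen under nni_sock_lk, each failure path unwinds exactly
what succeeded, and the final s_closing re-check under s_mx closes the race
with nni_sock_shutdown. A hypothetical caller (the public API surface is
outside this diff) might keep only the id, since a context persists until
explicitly closed; this sketch assumes the existing nni_sock_find and
nni_sock_rele helpers:

        int
        ctx_open_by_sock_id(uint32_t *idp, uint32_t sock_id)
        {
                nni_sock *s;
                nni_ctx * ctx;
                int       rv;

                if ((rv = nni_sock_find(&s, sock_id)) != 0) {
                        return (rv);
                }
                rv = nni_ctx_open(&ctx, s);
                nni_sock_rele(s);
                if (rv == 0) {
                        *idp = nni_ctx_id(ctx);
                        // The ctx outlives our reference; it is only
                        // destroyed after nni_ctx_close marks it closed.
                        nni_ctx_rele(ctx);
                }
                return (rv);
        }
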
+
+void
+nni_ctx_close(nni_ctx *ctx)
+{
+ nni_mtx_lock(&nni_sock_lk);
+ ctx->c_closed = true;
+ nni_mtx_unlock(&nni_sock_lk);
+
+ nni_ctx_rele(ctx);
+}
+
+uint32_t
+nni_ctx_id(nni_ctx *ctx)
+{
+ return (ctx->c_id);
+}
+
+void
+nni_ctx_send(nni_ctx *ctx, nni_aio *aio)
+{
+ nni_aio_normalize_timeout(aio, ctx->c_sndtimeo);
+ ctx->c_ops.ctx_send(ctx->c_data, aio);
+}
+
+void
+nni_ctx_recv(nni_ctx *ctx, nni_aio *aio)
+{
+ nni_aio_normalize_timeout(aio, ctx->c_rcvtimeo);
+ ctx->c_ops.ctx_recv(ctx->c_data, aio);
+}
+
+int
+nni_ctx_getopt(nni_ctx *ctx, const char *opt, void *v, size_t *szp, int typ)
+{
+ nni_sock * sock = ctx->c_sock;
+ nni_proto_ctx_option *co;
+ int rv;
+
+ nni_mtx_lock(&sock->s_mx);
+ if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) {
+ rv = nni_copyout_ms(ctx->c_rcvtimeo, v, szp, typ);
+ } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) {
+ rv = nni_copyout_ms(ctx->c_sndtimeo, v, szp, typ);
+ } else {
+ rv = NNG_ENOTSUP;
+ for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) {
+ if (strcmp(opt, co->co_name) != 0) {
+ continue;
+ }
+ if (co->co_getopt == NULL) {
+ rv = NNG_EWRITEONLY;
+ break;
+ }
+ rv = co->co_getopt(ctx->c_data, v, szp, typ);
+ break;
+ }
+ }
+
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+}
+
+int
+nni_ctx_setopt(
+ nni_ctx *ctx, const char *opt, const void *v, size_t sz, int typ)
+{
+ nni_sock * sock = ctx->c_sock;
+ nni_proto_ctx_option *co;
+ int rv;
+
+ nni_mtx_lock(&sock->s_mx);
+ if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) {
+ rv = nni_copyin_ms(&ctx->c_rcvtimeo, v, sz, typ);
+ } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) {
+ rv = nni_copyin_ms(&ctx->c_sndtimeo, v, sz, typ);
+ } else {
+ rv = NNG_ENOTSUP;
+ for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) {
+ if (strcmp(opt, co->co_name) != 0) {
+ continue;
+ }
+ if (co->co_setopt == NULL) {
+ rv = NNG_EREADONLY;
+ break;
+ }
+ rv = co->co_setopt(ctx->c_data, v, sz, typ);
+ break;
+ }
+ }
+
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
+}
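
[Reader's note] Both option functions above walk the protocol's
NULL-terminated nni_proto_ctx_option table, distinguishing read-only and
write-only entries by which handler is missing (NNG_EREADONLY /
NNG_EWRITEONLY). A hedged sketch of how a protocol might declare such a
table; the struct, option choice, and handlers are hypothetical, the
handler signatures match what the loops above call, and the
nni_copyin_int/nni_copyout_int helpers are assumed to behave as elsewhere
in the tree:

        typedef struct example_ctx {
                int ttl;
        } example_ctx;

        static int
        example_ctx_getopt_ttl(void *data, void *v, size_t *szp, int typ)
        {
                example_ctx *c = data;
                return (nni_copyout_int(c->ttl, v, szp, typ));
        }

        static int
        example_ctx_setopt_ttl(
            void *data, const void *v, size_t sz, int typ)
        {
                example_ctx *c = data;
                return (nni_copyin_int(&c->ttl, v, sz, 1, 255, typ));
        }

        static nni_proto_ctx_option example_ctx_options[] = {
                {
                    .co_name   = NNG_OPT_MAXTTL,
                    .co_getopt = example_ctx_getopt_ttl,
                    .co_setopt = example_ctx_setopt_ttl,
                },
                // co_name == NULL terminates the table, as the
                // loops above require.
                { .co_name = NULL },
        };
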