diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 354 |
1 files changed, 314 insertions, 40 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 62950b1c..67a2991c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -18,6 +18,19 @@ static nni_list nni_sock_list; static nni_idhash *nni_sock_hash; static nni_mtx nni_sock_lk; +static nni_idhash *nni_ctx_hash; + +struct nni_ctx { + nni_list_node c_node; + nni_sock * c_sock; + nni_proto_ctx_ops c_ops; + void * c_data; + bool c_closed; + unsigned c_refcnt; // protected by global lock + uint32_t c_id; + nng_duration c_sndtimeo; + nng_duration c_rcvtimeo; +}; typedef struct nni_socket_option { const char *so_name; @@ -53,6 +66,7 @@ struct nni_socket { nni_proto_pipe_ops s_pipe_ops; nni_proto_sock_ops s_sock_ops; + nni_proto_ctx_ops s_ctx_ops; // options nni_duration s_linger; // linger time @@ -66,15 +80,19 @@ struct nni_socket { nni_list s_eps; // active endpoints nni_list s_pipes; // active pipes + nni_list s_ctxs; // active contexts (protected by global nni_sock_lk) - int s_ep_pend; // EP dial/listen in progress - int s_closing; // Socket is closing - int s_closed; // Socket closed, protected by global lock + int s_ep_pend; // EP dial/listen in progress + int s_closing; // Socket is closing + int s_closed; // Socket closed, protected by global lock + bool s_ctxwait; // Waiting for contexts to close. nni_notifyfd s_send_fd; nni_notifyfd s_recv_fd; }; +static void nni_ctx_destroy(nni_ctx *); + static void nni_sock_can_send_cb(void *arg, int flags) { @@ -99,32 +117,6 @@ nni_sock_can_recv_cb(void *arg, int flags) } } -void -nni_sock_set_sendable(nni_sock *s, bool cansend) -{ - nni_notifyfd *fd = &s->s_send_fd; - if (fd->sn_init) { - if (cansend) { - nni_plat_pipe_raise(fd->sn_wfd); - } else { - nni_plat_pipe_clear(fd->sn_rfd); - } - } -} - -void -nni_sock_set_recvable(nni_sock *s, bool canrecv) -{ - nni_notifyfd *fd = &s->s_recv_fd; - if (fd->sn_init) { - if (canrecv) { - nni_plat_pipe_raise(fd->sn_wfd); - } else { - nni_plat_pipe_clear(fd->sn_rfd); - } - } -} - static int nni_sock_get_fd(nni_sock *s, int flag, int *fdp) { @@ -159,7 +151,15 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp) return (rv); } - nni_msgq_set_cb(mq, cb, fd); + // Only set the callback on the message queue if we are + // using it. The message queue automatically updates + // the pipe when the callback is first established. + // If we are not using the message queue, then we have + // to update the initial state explicitly ourselves. + + if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) { + nni_msgq_set_cb(mq, cb, fd); + } fd->sn_init = 1; } @@ -563,6 +563,10 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) s->s_sock_ops = *proto->proto_sock_ops; s->s_pipe_ops = *proto->proto_pipe_ops; + if (proto->proto_ctx_ops != NULL) { + s->s_ctx_ops = *proto->proto_ctx_ops; + } + NNI_ASSERT(s->s_sock_ops.sock_open != NULL); NNI_ASSERT(s->s_sock_ops.sock_close != NULL); @@ -571,6 +575,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_NODE_INIT(&s->s_node); NNI_LIST_INIT(&s->s_options, nni_sockopt, node); + NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); + nni_pipe_sock_list_init(&s->s_pipes); nni_ep_list_init(&s->s_eps); nni_mtx_init(&s->s_mx); @@ -613,19 +619,27 @@ nni_sock_sys_init(void) NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node); nni_mtx_init(&nni_sock_lk); - if ((rv = nni_idhash_init(&nni_sock_hash)) != 0) { + if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) || + ((rv = nni_idhash_init(&nni_ctx_hash)) != 0)) { nni_sock_sys_fini(); - } else { - nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); + return (rv); } - return (rv); + nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); + nni_idhash_set_limits(nni_ctx_hash, 1, 0x7fffffff, 1); + return (0); } void nni_sock_sys_fini(void) { - nni_idhash_fini(nni_sock_hash); - nni_sock_hash = NULL; + if (nni_sock_hash != NULL) { + nni_idhash_fini(nni_sock_hash); + nni_sock_hash = NULL; + } + if (nni_ctx_hash != NULL) { + nni_idhash_fini(nni_ctx_hash); + nni_ctx_hash = NULL; + } nni_mtx_fini(&nni_sock_lk); } @@ -653,10 +667,11 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) s->s_sock_ops.sock_open(s->s_data); *sockp = s; } + nni_mtx_unlock(&nni_sock_lk); + // Set the sockname. (void) snprintf( s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id); - nni_mtx_unlock(&nni_sock_lk); return (rv); } @@ -672,6 +687,8 @@ nni_sock_shutdown(nni_sock *sock) nni_ep * ep; nni_ep * nep; nni_time linger; + nni_ctx * ctx; + nni_ctx * nctx; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -697,16 +714,51 @@ nni_sock_shutdown(nni_sock *sock) } nni_mtx_unlock(&sock->s_mx); + // We now mark any owned contexts as closing. + // XXX: Add context draining support here! + nni_mtx_lock(&nni_sock_lk); + nctx = nni_list_first(&sock->s_ctxs); + while ((ctx = nctx) != NULL) { + nctx = nni_list_next(&sock->s_ctxs, ctx); + ctx->c_closed = true; + if (ctx->c_refcnt == 0) { + // No open operations. So close it. + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_list_remove(&sock->s_ctxs, ctx); + nni_ctx_destroy(ctx); + } + // If still has a reference count, then wait for last + // reference to close before nuking it. + } + nni_mtx_unlock(&nni_sock_lk); + + // XXX: Add protocol specific drain here. This should replace the + // msgq_drain feature below. Probably msgq_drain will need to + // be changed to take an AIO for completion. + // We drain the upper write queue. This is just like closing it, // except that the protocol gets a chance to get the messages and // push them down to the transport. This operation can *block* - // until the linger time has expired. - nni_msgq_drain(sock->s_uwq, linger); + // until the linger time has expired. We only do this for sendable + // sockets that are actually using the message queue of course. + if ((nni_sock_flags(sock) & + (NNI_PROTO_FLAG_NOMSGQ | NNI_PROTO_FLAG_SND)) == + NNI_PROTO_FLAG_SND) { + nni_msgq_drain(sock->s_uwq, linger); + } // Generally, unless the protocol is blocked trying to perform // writes (e.g. a slow reader on the other side), it should be // trying to shut things down. We wait to give it // a chance to do so gracefully. + + nni_mtx_lock(&nni_sock_lk); + while (!nni_list_empty(&sock->s_ctxs)) { + sock->s_ctxwait = true; + nni_cv_wait(&sock->s_close_cv); + } + nni_mtx_unlock(&nni_sock_lk); + nni_mtx_lock(&sock->s_mx); while (nni_list_first(&sock->s_pipes) != NULL) { if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) { @@ -790,7 +842,8 @@ nni_sock_close(nni_sock *s) // Wait for all other references to drop. Note that we // have a reference already (from our caller). - while (s->s_refcnt > 1) { + s->s_ctxwait = true; + while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_ctxs))) { nni_cv_wait(&s->s_close_cv); } nni_mtx_unlock(&nni_sock_lk); @@ -1147,3 +1200,224 @@ nni_sock_flags(nni_sock *sock) { return (sock->s_flags); } + +int +nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing) +{ + int rv; + nni_ctx *ctx; + + if ((rv = nni_init()) != 0) { + return (rv); + } + nni_mtx_lock(&nni_sock_lk); + if ((rv = nni_idhash_find(nni_ctx_hash, id, (void **) &ctx)) == 0) { + // We refuse a reference if either the socket is closed, + // or the context is closed. (If the socket is closed, + // and we are only getting the reference so we can close it, + // then we still allow. In the case the only valid operation + // will be to close the socket.) + if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) { + rv = NNG_ECLOSED; + } else { + ctx->c_refcnt++; + *ctxp = ctx; + } + } + nni_mtx_unlock(&nni_sock_lk); + + if (rv == NNG_ENOENT) { + rv = NNG_ECLOSED; + } + + return (rv); +} + +static void +nni_ctx_destroy(nni_ctx *ctx) +{ + if (ctx->c_data != NULL) { + ctx->c_ops.ctx_fini(ctx->c_data); + } + + // Let the socket go, our hold on it is done. + NNI_FREE_STRUCT(ctx); +} + +void +nni_ctx_rele(nni_ctx *ctx) +{ + nni_sock *sock = ctx->c_sock; + nni_mtx_lock(&nni_sock_lk); + ctx->c_refcnt--; + if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) { + // Either still have an active reference, or not actually + // closing yet. + nni_mtx_unlock(&nni_sock_lk); + return; + } + + // Remove us from the hash, so we can't be found any more. + // This allows our ID to be reused later, although the system + // tries to avoid ID reuse. + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_list_remove(&sock->s_ctxs, ctx); + if (sock->s_closed || sock->s_ctxwait) { + nni_cv_wake(&sock->s_close_cv); + } + nni_mtx_unlock(&nni_sock_lk); + + nni_ctx_destroy(ctx); +} + +int +nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) +{ + nni_ctx *ctx; + int rv; + uint64_t id; + + if (sock->s_ctx_ops.ctx_init == NULL) { + return (NNG_ENOTSUP); + } + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_lock(&nni_sock_lk); + if (sock->s_closed) { + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (NNG_ECLOSED); + } + if ((rv = nni_idhash_alloc(nni_ctx_hash, &id, ctx)) != 0) { + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (rv); + } + ctx->c_id = (uint32_t) id; + + if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) { + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (rv); + } + + ctx->c_closed = false; + ctx->c_refcnt = 1; // Caller implicitly gets a reference. + ctx->c_sock = sock; + ctx->c_ops = sock->s_ctx_ops; + ctx->c_rcvtimeo = sock->s_rcvtimeo; + ctx->c_sndtimeo = sock->s_sndtimeo; + + nni_list_append(&sock->s_ctxs, ctx); + nni_mtx_unlock(&nni_sock_lk); + + // Paranoia, fixing a possible race in close. Don't let us + // give back a context if the socket is being shutdown (it might + // not have reached the "closed" state yet.) + nni_mtx_lock(&sock->s_mx); + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + nni_ctx_rele(ctx); + return (NNG_ECLOSED); + } + nni_mtx_unlock(&sock->s_mx); + *ctxp = ctx; + + return (0); +} + +void +nni_ctx_close(nni_ctx *ctx) +{ + nni_mtx_lock(&nni_sock_lk); + ctx->c_closed = true; + nni_mtx_unlock(&nni_sock_lk); + + nni_ctx_rele(ctx); +} + +uint32_t +nni_ctx_id(nni_ctx *ctx) +{ + return (ctx->c_id); +} + +void +nni_ctx_send(nni_ctx *ctx, nni_aio *aio) +{ + nni_aio_normalize_timeout(aio, ctx->c_sndtimeo); + ctx->c_ops.ctx_send(ctx->c_data, aio); +} + +void +nni_ctx_recv(nni_ctx *ctx, nni_aio *aio) +{ + nni_aio_normalize_timeout(aio, ctx->c_rcvtimeo); + ctx->c_ops.ctx_recv(ctx->c_data, aio); +} + +int +nni_ctx_getopt(nni_ctx *ctx, const char *opt, void *v, size_t *szp, int typ) +{ + nni_sock * sock = ctx->c_sock; + nni_proto_ctx_option *co; + int rv; + + nni_mtx_lock(&sock->s_mx); + if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) { + rv = nni_copyout_ms(ctx->c_rcvtimeo, v, szp, typ); + } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) { + rv = nni_copyout_ms(ctx->c_sndtimeo, v, szp, typ); + } else { + rv = NNG_ENOTSUP; + for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) { + if (strcmp(opt, co->co_name) != 0) { + continue; + } + if (co->co_getopt == NULL) { + rv = NNG_EWRITEONLY; + break; + } + rv = co->co_getopt(ctx->c_data, v, szp, typ); + break; + } + } + + nni_mtx_unlock(&sock->s_mx); + return (rv); +} + +int +nni_ctx_setopt( + nni_ctx *ctx, const char *opt, const void *v, size_t sz, int typ) +{ + nni_sock * sock = ctx->c_sock; + nni_proto_ctx_option *co; + int rv; + + nni_mtx_lock(&sock->s_mx); + if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) { + rv = nni_copyin_ms(&ctx->c_rcvtimeo, v, sz, typ); + } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) { + rv = nni_copyin_ms(&ctx->c_sndtimeo, v, sz, typ); + } else { + rv = NNG_ENOTSUP; + for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) { + if (strcmp(opt, co->co_name) != 0) { + continue; + } + if (co->co_setopt == NULL) { + rv = NNG_EREADONLY; + break; + } + rv = co->co_setopt(ctx->c_data, v, sz, typ); + break; + } + } + + nni_mtx_unlock(&sock->s_mx); + return (rv); +} |
