From ed542ac45e00c9b2faa0b41f3c00de6e291e5678 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 1 Jan 2021 11:30:03 -0800 Subject: fixes #1345 Restructure the source tree This is not quite complete, but it sets the stage for other protocols (such as zmq or mqtt) to be added to the project. --- src/sp/transport/inproc/inproc.c | 692 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 692 insertions(+) create mode 100644 src/sp/transport/inproc/inproc.c (limited to 'src/sp/transport/inproc/inproc.c') diff --git a/src/sp/transport/inproc/inproc.c b/src/sp/transport/inproc/inproc.c new file mode 100644 index 00000000..84e2c625 --- /dev/null +++ b/src/sp/transport/inproc/inproc.c @@ -0,0 +1,692 @@ +// +// Copyright 2020 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// Copyright 2018 Devolutions +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include + +#include "core/nng_impl.h" + +// Inproc transport. This just transports messages from one +// peer to another. The inproc transport is only valid within the same +// process. + +typedef struct inproc_pair inproc_pair; +typedef struct inproc_pipe inproc_pipe; +typedef struct inproc_ep inproc_ep; +typedef struct inproc_queue inproc_queue; + +typedef struct { + nni_mtx mx; + nni_list servers; +} inproc_global; + +// inproc_pipe represents one half of a connection. +struct inproc_pipe { + const char * addr; + inproc_pair * pair; + inproc_queue *recv_queue; + inproc_queue *send_queue; + uint16_t peer; + uint16_t proto; +}; + +struct inproc_queue { + nni_list readers; + nni_list writers; + nni_mtx lock; + bool closed; +}; + +// inproc_pair represents a pair of pipes. Because we control both +// sides of the pipes, we can allocate and free this in one structure. +struct inproc_pair { + nni_atomic_int ref; + inproc_queue queues[2]; +}; + +struct inproc_ep { + const char * addr; + bool listener; + nni_list_node node; + uint16_t proto; + nni_cv cv; + nni_list clients; + nni_list aios; + size_t rcvmax; + nni_mtx mtx; +}; + +// nni_inproc is our global state - this contains the list of active endpoints +// which we use for coordinating rendezvous. +static inproc_global nni_inproc; + +static int +inproc_init(void) +{ + NNI_LIST_INIT(&nni_inproc.servers, inproc_ep, node); + + nni_mtx_init(&nni_inproc.mx); + return (0); +} + +static void +inproc_fini(void) +{ + nni_mtx_fini(&nni_inproc.mx); +} + +// inproc_pair destroy is called when both pipe-ends of the pipe +// have been destroyed. +static void +inproc_pair_destroy(inproc_pair *pair) +{ + for (int i = 0; i < 2; i++) { + nni_mtx_fini(&pair->queues[i].lock); + } + NNI_FREE_STRUCT(pair); +} + +static int +inproc_pipe_alloc(inproc_pipe **pipep, inproc_ep *ep) +{ + inproc_pipe *pipe; + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + + pipe->proto = ep->proto; + pipe->addr = ep->addr; + *pipep = pipe; + return (0); +} + +static int +inproc_pipe_init(void *arg, nni_pipe *p) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(p); + return (0); +} + +static void +inproc_pipe_fini(void *arg) +{ + inproc_pipe *pipe = arg; + inproc_pair *pair; + + if ((pair = pipe->pair) != NULL) { + // If we are the last peer, then toss the pair structure. + if (nni_atomic_dec_nv(&pair->ref) == 0) { + inproc_pair_destroy(pair); + } + } + + NNI_FREE_STRUCT(pipe); +} + +static void +inproc_queue_run_closed(inproc_queue *queue) +{ + nni_aio *aio; + while (((aio = nni_list_first(&queue->readers)) != NULL) || + ((aio = nni_list_first(&queue->writers)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } +} + +static void +inproc_queue_run(inproc_queue *queue) +{ + if (queue->closed) { + inproc_queue_run_closed(queue); + } + for (;;) { + nni_aio *rd; + nni_aio *wr; + nni_msg *msg; + nni_msg *pu; + + if (((rd = nni_list_first(&queue->readers)) == NULL) || + ((wr = nni_list_first(&queue->writers)) == NULL)) { + return; + } + + msg = nni_aio_get_msg(wr); + NNI_ASSERT(msg != NULL); + + // At this point, we pass success back to the caller. If + // we drop the message for any reason, its accounted on the + // receiver side. + nni_aio_list_remove(wr); + nni_aio_set_msg(wr, NULL); + nni_aio_finish( + wr, 0, nni_msg_len(msg) + nni_msg_header_len(msg)); + + // TODO: We could check the max receive size here. + + // Now the receive side. We need to ensure that we have + // an exclusive copy of the message, and pull the header + // up into the body to match protocol expectations. + if ((pu = nni_msg_pull_up(msg)) == NULL) { + nni_msg_free(msg); + continue; + } + msg = pu; + + nni_aio_list_remove(rd); + nni_aio_set_msg(rd, msg); + nni_aio_finish(rd, 0, nni_msg_len(msg)); + } +} + +static void +inproc_queue_cancel(nni_aio *aio, void *arg, int rv) +{ + inproc_queue *queue = arg; + + nni_mtx_lock(&queue->lock); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&queue->lock); +} + +static void +inproc_pipe_send(void *arg, nni_aio *aio) +{ + inproc_pipe * pipe = arg; + inproc_queue *queue = pipe->send_queue; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&queue->lock); + if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) { + nni_mtx_unlock(&queue->lock); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&queue->writers, aio); + inproc_queue_run(queue); + nni_mtx_unlock(&queue->lock); +} + +static void +inproc_pipe_recv(void *arg, nni_aio *aio) +{ + inproc_pipe * pipe = arg; + inproc_queue *queue = pipe->recv_queue; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&queue->lock); + if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) { + nni_mtx_unlock(&queue->lock); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&queue->readers, aio); + inproc_queue_run(queue); + nni_mtx_unlock(&queue->lock); +} + +static void +inproc_pipe_close(void *arg) +{ + inproc_pipe *pipe = arg; + inproc_pair *pair = pipe->pair; + + for (int i = 0; i < 2; i++) { + inproc_queue *queue = &pair->queues[i]; + nni_mtx_lock(&queue->lock); + queue->closed = true; + inproc_queue_run_closed(queue); + nni_mtx_unlock(&queue->lock); + } +} + +static uint16_t +inproc_pipe_peer(void *arg) +{ + inproc_pipe *pipe = arg; + + return (pipe->peer); +} + +static int +inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + inproc_pipe *p = arg; + nni_sockaddr sa; + + memset(&sa, 0, sizeof(sa)); + sa.s_inproc.sa_family = NNG_AF_INPROC; + nni_strlcpy(sa.s_inproc.sa_name, p->addr, sizeof(sa.s_inproc.sa_name)); + return (nni_copyout_sockaddr(&sa, buf, szp, t)); +} + +static int +inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer) +{ + inproc_ep *ep; + nni_sock * sock = nni_dialer_sock(ndialer); + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&ep->mtx); + + ep->listener = false; + ep->proto = nni_sock_proto_id(sock); + ep->rcvmax = 0; + NNI_LIST_INIT(&ep->clients, inproc_ep, node); + nni_aio_list_init(&ep->aios); + + ep->addr = url->u_rawurl; // we match on the full URL. + + *epp = ep; + return (0); +} + +static int +inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener) +{ + inproc_ep *ep; + nni_sock * sock = nni_listener_sock(nlistener); + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&ep->mtx); + + ep->listener = true; + ep->proto = nni_sock_proto_id(sock); + ep->rcvmax = 0; + NNI_LIST_INIT(&ep->clients, inproc_ep, node); + nni_aio_list_init(&ep->aios); + + ep->addr = url->u_rawurl; // we match on the full URL. + + *epp = ep; + return (0); +} + +static void +inproc_ep_fini(void *arg) +{ + inproc_ep *ep = arg; + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + +static void +inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe) +{ + nni_aio_list_remove(aio); + + if ((!ep->listener) && nni_list_empty(&ep->aios)) { + nni_list_node_remove(&ep->node); + } + + if (rv == 0) { + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); + } else { + NNI_ASSERT(pipe == NULL); + nni_aio_finish_error(aio, rv); + } +} + +static void +inproc_ep_close(void *arg) +{ + inproc_ep *ep = arg; + inproc_ep *client; + nni_aio * aio; + + nni_mtx_lock(&nni_inproc.mx); + if (nni_list_active(&nni_inproc.servers, ep)) { + nni_list_remove(&nni_inproc.servers, ep); + } + // Notify any waiting clients that we are closed. + while ((client = nni_list_first(&ep->clients)) != NULL) { + while ((aio = nni_list_first(&client->aios)) != NULL) { + inproc_conn_finish(aio, NNG_ECONNREFUSED, ep, NULL); + } + nni_list_remove(&ep->clients, client); + } + while ((aio = nni_list_first(&ep->aios)) != NULL) { + inproc_conn_finish(aio, NNG_ECLOSED, ep, NULL); + } + nni_mtx_unlock(&nni_inproc.mx); +} + +static void +inproc_accept_clients(inproc_ep *srv) +{ + inproc_ep *cli, *nclient; + + nclient = nni_list_first(&srv->clients); + while ((cli = nclient) != NULL) { + nni_aio *caio; + nclient = nni_list_next(&srv->clients, nclient); + NNI_LIST_FOREACH (&cli->aios, caio) { + + inproc_pipe *cpipe; + inproc_pipe *spipe; + inproc_pair *pair; + nni_aio * saio; + int rv; + + if ((saio = nni_list_first(&srv->aios)) == NULL) { + // No outstanding accept() calls. + break; + } + + if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { + inproc_conn_finish( + caio, NNG_ENOMEM, cli, NULL); + inproc_conn_finish( + saio, NNG_ENOMEM, srv, NULL); + continue; + } + for (int i = 0; i < 2; i++) { + nni_aio_list_init(&pair->queues[i].readers); + nni_aio_list_init(&pair->queues[i].writers); + nni_mtx_init(&pair->queues[i].lock); + } + nni_atomic_init(&pair->ref); + nni_atomic_set(&pair->ref, 2); + + spipe = cpipe = NULL; + if (((rv = inproc_pipe_alloc(&cpipe, cli)) != 0) || + ((rv = inproc_pipe_alloc(&spipe, srv)) != 0)) { + + if (cpipe != NULL) { + inproc_pipe_fini(cpipe); + } + if (spipe != NULL) { + inproc_pipe_fini(spipe); + } + inproc_conn_finish(caio, rv, cli, NULL); + inproc_conn_finish(saio, rv, srv, NULL); + inproc_pair_destroy(pair); + continue; + } + + cpipe->peer = spipe->proto; + spipe->peer = cpipe->proto; + cpipe->pair = pair; + spipe->pair = pair; + cpipe->send_queue = &pair->queues[0]; + cpipe->recv_queue = &pair->queues[1]; + spipe->send_queue = &pair->queues[1]; + spipe->recv_queue = &pair->queues[0]; + + inproc_conn_finish(caio, 0, cli, cpipe); + inproc_conn_finish(saio, 0, srv, spipe); + } + + if (nni_list_first(&cli->aios) == NULL) { + // No more outstanding client connects. + // Normally there should only be one. + if (nni_list_active(&srv->clients, cli)) { + nni_list_remove(&srv->clients, cli); + } + } + } +} + +static void +inproc_ep_cancel(nni_aio *aio, void *arg, int rv) +{ + inproc_ep *ep = arg; + + nni_mtx_lock(&nni_inproc.mx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_list_node_remove(&ep->node); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&nni_inproc.mx); +} + +static void +inproc_ep_connect(void *arg, nni_aio *aio) +{ + inproc_ep *ep = arg; + inproc_ep *server; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&nni_inproc.mx); + + // Find a server. + NNI_LIST_FOREACH (&nni_inproc.servers, server) { + if (strcmp(server->addr, ep->addr) == 0) { + break; + } + } + if (server == NULL) { + nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, NNG_ECONNREFUSED); + return; + } + + // We don't have to worry about the case where a zero timeout + // on connect was specified, as there is no option to specify + // that in the upper API. + if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, rv); + return; + } + + nni_list_append(&server->clients, ep); + nni_aio_list_append(&ep->aios, aio); + + inproc_accept_clients(server); + nni_mtx_unlock(&nni_inproc.mx); +} + +static int +inproc_ep_bind(void *arg) +{ + inproc_ep *ep = arg; + inproc_ep *srch; + nni_list * list = &nni_inproc.servers; + + nni_mtx_lock(&nni_inproc.mx); + NNI_LIST_FOREACH (list, srch) { + if (strcmp(srch->addr, ep->addr) == 0) { + nni_mtx_unlock(&nni_inproc.mx); + return (NNG_EADDRINUSE); + } + } + nni_list_append(list, ep); + nni_mtx_unlock(&nni_inproc.mx); + return (0); +} + +static void +inproc_ep_accept(void *arg, nni_aio *aio) +{ + inproc_ep *ep = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&nni_inproc.mx); + + // We need not worry about the case where a non-blocking + // accept was tried -- there is no API to do such a thing. + if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, rv); + return; + } + + // We are already on the master list of servers, thanks to bind. + // Insert us into pending server aios, and then run accept list. + nni_aio_list_append(&ep->aios, aio); + inproc_accept_clients(ep); + nni_mtx_unlock(&nni_inproc.mx); +} + +static int +inproc_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + inproc_ep *ep = arg; + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static int +inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + inproc_ep *ep = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&ep->mtx); + ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); + } + return (rv); +} + +static int +inproc_ep_get_addr(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + inproc_ep * ep = arg; + nng_sockaddr sa; + sa.s_inproc.sa_family = NNG_AF_INPROC; + nni_strlcpy( + sa.s_inproc.sa_name, ep->addr, sizeof(sa.s_inproc.sa_name)); + return (nni_copyout_sockaddr(&sa, v, szp, t)); +} + +static const nni_option inproc_pipe_options[] = { + { + .o_name = NNG_OPT_LOCADDR, + .o_get = inproc_pipe_get_addr, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = inproc_pipe_get_addr, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static int +inproc_pipe_getopt( + void *arg, const char *name, void *v, size_t *szp, nni_type t) +{ + return (nni_getopt(inproc_pipe_options, name, arg, v, szp, t)); +} + +static nni_tran_pipe_ops inproc_pipe_ops = { + .p_init = inproc_pipe_init, + .p_fini = inproc_pipe_fini, + .p_send = inproc_pipe_send, + .p_recv = inproc_pipe_recv, + .p_close = inproc_pipe_close, + .p_peer = inproc_pipe_peer, + .p_getopt = inproc_pipe_getopt, +}; + +static const nni_option inproc_ep_options[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_get = inproc_ep_get_recvmaxsz, + .o_set = inproc_ep_set_recvmaxsz, + }, + { + .o_name = NNG_OPT_LOCADDR, + .o_get = inproc_ep_get_addr, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = inproc_ep_get_addr, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static int +inproc_ep_getopt(void *arg, const char *name, void *v, size_t *szp, nni_type t) +{ + return (nni_getopt(inproc_ep_options, name, arg, v, szp, t)); +} + +static int +inproc_ep_setopt( + void *arg, const char *name, const void *v, size_t sz, nni_type t) +{ + return (nni_setopt(inproc_ep_options, name, arg, v, sz, t)); +} + +static nni_tran_dialer_ops inproc_dialer_ops = { + .d_init = inproc_dialer_init, + .d_fini = inproc_ep_fini, + .d_connect = inproc_ep_connect, + .d_close = inproc_ep_close, + .d_getopt = inproc_ep_getopt, + .d_setopt = inproc_ep_setopt, +}; + +static nni_tran_listener_ops inproc_listener_ops = { + .l_init = inproc_listener_init, + .l_fini = inproc_ep_fini, + .l_bind = inproc_ep_bind, + .l_accept = inproc_ep_accept, + .l_close = inproc_ep_close, + .l_getopt = inproc_ep_getopt, + .l_setopt = inproc_ep_setopt, +}; + +// This is the inproc transport linkage, and should be the only global +// symbol in this entire file. +struct nni_tran nni_inproc_tran = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "inproc", + .tran_dialer = &inproc_dialer_ops, + .tran_listener = &inproc_listener_ops, + .tran_pipe = &inproc_pipe_ops, + .tran_init = inproc_init, + .tran_fini = inproc_fini, +}; + +int +nng_inproc_register(void) +{ + return (nni_tran_register(&nni_inproc_tran)); +} -- cgit v1.2.3-70-g09d2