diff options
| author | Garrett D'Amore <garrett@damore.org> | 2021-01-01 11:30:03 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2021-01-01 12:46:17 -0800 |
| commit | ed542ac45e00c9b2faa0b41f3c00de6e291e5678 (patch) | |
| tree | 673924ff077d468e6756529c2c204698d3faa47c /src/sp/transport/inproc | |
| parent | 1413b2421a82cd9b9cde178d44fb60c7893176b0 (diff) | |
| download | nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.gz nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.tar.bz2 nng-ed542ac45e00c9b2faa0b41f3c00de6e291e5678.zip | |
fixes #1345 Restructure the source tree
This is not quite complete, but it sets the stage for other
protocols (such as zmq or mqtt) to be added to the project.
Diffstat (limited to 'src/sp/transport/inproc')
| -rw-r--r-- | src/sp/transport/inproc/CMakeLists.txt | 16 | ||||
| -rw-r--r-- | src/sp/transport/inproc/inproc.c | 692 |
2 files changed, 708 insertions, 0 deletions
diff --git a/src/sp/transport/inproc/CMakeLists.txt b/src/sp/transport/inproc/CMakeLists.txt new file mode 100644 index 00000000..317686bb --- /dev/null +++ b/src/sp/transport/inproc/CMakeLists.txt @@ -0,0 +1,16 @@ +# +# Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +# Copyright 2018 Capitar IT Group BV <info@capitar.com> +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# inproc protocol +nng_directory(inproc) + +nng_sources_if(NNG_TRANSPORT_INPROC inproc.c) +nng_headers_if(NNG_TRANSPORT_INPROC nng/transport/inproc/inproc.h) +nng_defines_if(NNG_TRANSPORT_INPROC NNG_TRANSPORT_INPROC)
\ No newline at end of file diff --git a/src/sp/transport/inproc/inproc.c b/src/sp/transport/inproc/inproc.c new file mode 100644 index 00000000..84e2c625 --- /dev/null +++ b/src/sp/transport/inproc/inproc.c @@ -0,0 +1,692 @@ +// +// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Devolutions <info@devolutions.net> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include <string.h> + +#include "core/nng_impl.h" + +// Inproc transport. This just transports messages from one +// peer to another. The inproc transport is only valid within the same +// process. + +typedef struct inproc_pair inproc_pair; +typedef struct inproc_pipe inproc_pipe; +typedef struct inproc_ep inproc_ep; +typedef struct inproc_queue inproc_queue; + +typedef struct { + nni_mtx mx; + nni_list servers; +} inproc_global; + +// inproc_pipe represents one half of a connection. +struct inproc_pipe { + const char * addr; + inproc_pair * pair; + inproc_queue *recv_queue; + inproc_queue *send_queue; + uint16_t peer; + uint16_t proto; +}; + +struct inproc_queue { + nni_list readers; + nni_list writers; + nni_mtx lock; + bool closed; +}; + +// inproc_pair represents a pair of pipes. Because we control both +// sides of the pipes, we can allocate and free this in one structure. +struct inproc_pair { + nni_atomic_int ref; + inproc_queue queues[2]; +}; + +struct inproc_ep { + const char * addr; + bool listener; + nni_list_node node; + uint16_t proto; + nni_cv cv; + nni_list clients; + nni_list aios; + size_t rcvmax; + nni_mtx mtx; +}; + +// nni_inproc is our global state - this contains the list of active endpoints +// which we use for coordinating rendezvous. +static inproc_global nni_inproc; + +static int +inproc_init(void) +{ + NNI_LIST_INIT(&nni_inproc.servers, inproc_ep, node); + + nni_mtx_init(&nni_inproc.mx); + return (0); +} + +static void +inproc_fini(void) +{ + nni_mtx_fini(&nni_inproc.mx); +} + +// inproc_pair destroy is called when both pipe-ends of the pipe +// have been destroyed. +static void +inproc_pair_destroy(inproc_pair *pair) +{ + for (int i = 0; i < 2; i++) { + nni_mtx_fini(&pair->queues[i].lock); + } + NNI_FREE_STRUCT(pair); +} + +static int +inproc_pipe_alloc(inproc_pipe **pipep, inproc_ep *ep) +{ + inproc_pipe *pipe; + + if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { + return (NNG_ENOMEM); + } + + pipe->proto = ep->proto; + pipe->addr = ep->addr; + *pipep = pipe; + return (0); +} + +static int +inproc_pipe_init(void *arg, nni_pipe *p) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(p); + return (0); +} + +static void +inproc_pipe_fini(void *arg) +{ + inproc_pipe *pipe = arg; + inproc_pair *pair; + + if ((pair = pipe->pair) != NULL) { + // If we are the last peer, then toss the pair structure. + if (nni_atomic_dec_nv(&pair->ref) == 0) { + inproc_pair_destroy(pair); + } + } + + NNI_FREE_STRUCT(pipe); +} + +static void +inproc_queue_run_closed(inproc_queue *queue) +{ + nni_aio *aio; + while (((aio = nni_list_first(&queue->readers)) != NULL) || + ((aio = nni_list_first(&queue->writers)) != NULL)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } +} + +static void +inproc_queue_run(inproc_queue *queue) +{ + if (queue->closed) { + inproc_queue_run_closed(queue); + } + for (;;) { + nni_aio *rd; + nni_aio *wr; + nni_msg *msg; + nni_msg *pu; + + if (((rd = nni_list_first(&queue->readers)) == NULL) || + ((wr = nni_list_first(&queue->writers)) == NULL)) { + return; + } + + msg = nni_aio_get_msg(wr); + NNI_ASSERT(msg != NULL); + + // At this point, we pass success back to the caller. If + // we drop the message for any reason, its accounted on the + // receiver side. + nni_aio_list_remove(wr); + nni_aio_set_msg(wr, NULL); + nni_aio_finish( + wr, 0, nni_msg_len(msg) + nni_msg_header_len(msg)); + + // TODO: We could check the max receive size here. + + // Now the receive side. We need to ensure that we have + // an exclusive copy of the message, and pull the header + // up into the body to match protocol expectations. + if ((pu = nni_msg_pull_up(msg)) == NULL) { + nni_msg_free(msg); + continue; + } + msg = pu; + + nni_aio_list_remove(rd); + nni_aio_set_msg(rd, msg); + nni_aio_finish(rd, 0, nni_msg_len(msg)); + } +} + +static void +inproc_queue_cancel(nni_aio *aio, void *arg, int rv) +{ + inproc_queue *queue = arg; + + nni_mtx_lock(&queue->lock); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&queue->lock); +} + +static void +inproc_pipe_send(void *arg, nni_aio *aio) +{ + inproc_pipe * pipe = arg; + inproc_queue *queue = pipe->send_queue; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&queue->lock); + if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) { + nni_mtx_unlock(&queue->lock); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&queue->writers, aio); + inproc_queue_run(queue); + nni_mtx_unlock(&queue->lock); +} + +static void +inproc_pipe_recv(void *arg, nni_aio *aio) +{ + inproc_pipe * pipe = arg; + inproc_queue *queue = pipe->recv_queue; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&queue->lock); + if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) { + nni_mtx_unlock(&queue->lock); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&queue->readers, aio); + inproc_queue_run(queue); + nni_mtx_unlock(&queue->lock); +} + +static void +inproc_pipe_close(void *arg) +{ + inproc_pipe *pipe = arg; + inproc_pair *pair = pipe->pair; + + for (int i = 0; i < 2; i++) { + inproc_queue *queue = &pair->queues[i]; + nni_mtx_lock(&queue->lock); + queue->closed = true; + inproc_queue_run_closed(queue); + nni_mtx_unlock(&queue->lock); + } +} + +static uint16_t +inproc_pipe_peer(void *arg) +{ + inproc_pipe *pipe = arg; + + return (pipe->peer); +} + +static int +inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) +{ + inproc_pipe *p = arg; + nni_sockaddr sa; + + memset(&sa, 0, sizeof(sa)); + sa.s_inproc.sa_family = NNG_AF_INPROC; + nni_strlcpy(sa.s_inproc.sa_name, p->addr, sizeof(sa.s_inproc.sa_name)); + return (nni_copyout_sockaddr(&sa, buf, szp, t)); +} + +static int +inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer) +{ + inproc_ep *ep; + nni_sock * sock = nni_dialer_sock(ndialer); + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&ep->mtx); + + ep->listener = false; + ep->proto = nni_sock_proto_id(sock); + ep->rcvmax = 0; + NNI_LIST_INIT(&ep->clients, inproc_ep, node); + nni_aio_list_init(&ep->aios); + + ep->addr = url->u_rawurl; // we match on the full URL. + + *epp = ep; + return (0); +} + +static int +inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener) +{ + inproc_ep *ep; + nni_sock * sock = nni_listener_sock(nlistener); + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&ep->mtx); + + ep->listener = true; + ep->proto = nni_sock_proto_id(sock); + ep->rcvmax = 0; + NNI_LIST_INIT(&ep->clients, inproc_ep, node); + nni_aio_list_init(&ep->aios); + + ep->addr = url->u_rawurl; // we match on the full URL. + + *epp = ep; + return (0); +} + +static void +inproc_ep_fini(void *arg) +{ + inproc_ep *ep = arg; + nni_mtx_fini(&ep->mtx); + NNI_FREE_STRUCT(ep); +} + +static void +inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe) +{ + nni_aio_list_remove(aio); + + if ((!ep->listener) && nni_list_empty(&ep->aios)) { + nni_list_node_remove(&ep->node); + } + + if (rv == 0) { + nni_aio_set_output(aio, 0, pipe); + nni_aio_finish(aio, 0, 0); + } else { + NNI_ASSERT(pipe == NULL); + nni_aio_finish_error(aio, rv); + } +} + +static void +inproc_ep_close(void *arg) +{ + inproc_ep *ep = arg; + inproc_ep *client; + nni_aio * aio; + + nni_mtx_lock(&nni_inproc.mx); + if (nni_list_active(&nni_inproc.servers, ep)) { + nni_list_remove(&nni_inproc.servers, ep); + } + // Notify any waiting clients that we are closed. + while ((client = nni_list_first(&ep->clients)) != NULL) { + while ((aio = nni_list_first(&client->aios)) != NULL) { + inproc_conn_finish(aio, NNG_ECONNREFUSED, ep, NULL); + } + nni_list_remove(&ep->clients, client); + } + while ((aio = nni_list_first(&ep->aios)) != NULL) { + inproc_conn_finish(aio, NNG_ECLOSED, ep, NULL); + } + nni_mtx_unlock(&nni_inproc.mx); +} + +static void +inproc_accept_clients(inproc_ep *srv) +{ + inproc_ep *cli, *nclient; + + nclient = nni_list_first(&srv->clients); + while ((cli = nclient) != NULL) { + nni_aio *caio; + nclient = nni_list_next(&srv->clients, nclient); + NNI_LIST_FOREACH (&cli->aios, caio) { + + inproc_pipe *cpipe; + inproc_pipe *spipe; + inproc_pair *pair; + nni_aio * saio; + int rv; + + if ((saio = nni_list_first(&srv->aios)) == NULL) { + // No outstanding accept() calls. + break; + } + + if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) { + inproc_conn_finish( + caio, NNG_ENOMEM, cli, NULL); + inproc_conn_finish( + saio, NNG_ENOMEM, srv, NULL); + continue; + } + for (int i = 0; i < 2; i++) { + nni_aio_list_init(&pair->queues[i].readers); + nni_aio_list_init(&pair->queues[i].writers); + nni_mtx_init(&pair->queues[i].lock); + } + nni_atomic_init(&pair->ref); + nni_atomic_set(&pair->ref, 2); + + spipe = cpipe = NULL; + if (((rv = inproc_pipe_alloc(&cpipe, cli)) != 0) || + ((rv = inproc_pipe_alloc(&spipe, srv)) != 0)) { + + if (cpipe != NULL) { + inproc_pipe_fini(cpipe); + } + if (spipe != NULL) { + inproc_pipe_fini(spipe); + } + inproc_conn_finish(caio, rv, cli, NULL); + inproc_conn_finish(saio, rv, srv, NULL); + inproc_pair_destroy(pair); + continue; + } + + cpipe->peer = spipe->proto; + spipe->peer = cpipe->proto; + cpipe->pair = pair; + spipe->pair = pair; + cpipe->send_queue = &pair->queues[0]; + cpipe->recv_queue = &pair->queues[1]; + spipe->send_queue = &pair->queues[1]; + spipe->recv_queue = &pair->queues[0]; + + inproc_conn_finish(caio, 0, cli, cpipe); + inproc_conn_finish(saio, 0, srv, spipe); + } + + if (nni_list_first(&cli->aios) == NULL) { + // No more outstanding client connects. + // Normally there should only be one. + if (nni_list_active(&srv->clients, cli)) { + nni_list_remove(&srv->clients, cli); + } + } + } +} + +static void +inproc_ep_cancel(nni_aio *aio, void *arg, int rv) +{ + inproc_ep *ep = arg; + + nni_mtx_lock(&nni_inproc.mx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_list_node_remove(&ep->node); + nni_aio_finish_error(aio, rv); + } + nni_mtx_unlock(&nni_inproc.mx); +} + +static void +inproc_ep_connect(void *arg, nni_aio *aio) +{ + inproc_ep *ep = arg; + inproc_ep *server; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&nni_inproc.mx); + + // Find a server. + NNI_LIST_FOREACH (&nni_inproc.servers, server) { + if (strcmp(server->addr, ep->addr) == 0) { + break; + } + } + if (server == NULL) { + nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, NNG_ECONNREFUSED); + return; + } + + // We don't have to worry about the case where a zero timeout + // on connect was specified, as there is no option to specify + // that in the upper API. + if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, rv); + return; + } + + nni_list_append(&server->clients, ep); + nni_aio_list_append(&ep->aios, aio); + + inproc_accept_clients(server); + nni_mtx_unlock(&nni_inproc.mx); +} + +static int +inproc_ep_bind(void *arg) +{ + inproc_ep *ep = arg; + inproc_ep *srch; + nni_list * list = &nni_inproc.servers; + + nni_mtx_lock(&nni_inproc.mx); + NNI_LIST_FOREACH (list, srch) { + if (strcmp(srch->addr, ep->addr) == 0) { + nni_mtx_unlock(&nni_inproc.mx); + return (NNG_EADDRINUSE); + } + } + nni_list_append(list, ep); + nni_mtx_unlock(&nni_inproc.mx); + return (0); +} + +static void +inproc_ep_accept(void *arg, nni_aio *aio) +{ + inproc_ep *ep = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&nni_inproc.mx); + + // We need not worry about the case where a non-blocking + // accept was tried -- there is no API to do such a thing. + if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&nni_inproc.mx); + nni_aio_finish_error(aio, rv); + return; + } + + // We are already on the master list of servers, thanks to bind. + // Insert us into pending server aios, and then run accept list. + nni_aio_list_append(&ep->aios, aio); + inproc_accept_clients(ep); + nni_mtx_unlock(&nni_inproc.mx); +} + +static int +inproc_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + inproc_ep *ep = arg; + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static int +inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + inproc_ep *ep = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { + nni_mtx_lock(&ep->mtx); + ep->rcvmax = val; + nni_mtx_unlock(&ep->mtx); + } + return (rv); +} + +static int +inproc_ep_get_addr(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + inproc_ep * ep = arg; + nng_sockaddr sa; + sa.s_inproc.sa_family = NNG_AF_INPROC; + nni_strlcpy( + sa.s_inproc.sa_name, ep->addr, sizeof(sa.s_inproc.sa_name)); + return (nni_copyout_sockaddr(&sa, v, szp, t)); +} + +static const nni_option inproc_pipe_options[] = { + { + .o_name = NNG_OPT_LOCADDR, + .o_get = inproc_pipe_get_addr, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = inproc_pipe_get_addr, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static int +inproc_pipe_getopt( + void *arg, const char *name, void *v, size_t *szp, nni_type t) +{ + return (nni_getopt(inproc_pipe_options, name, arg, v, szp, t)); +} + +static nni_tran_pipe_ops inproc_pipe_ops = { + .p_init = inproc_pipe_init, + .p_fini = inproc_pipe_fini, + .p_send = inproc_pipe_send, + .p_recv = inproc_pipe_recv, + .p_close = inproc_pipe_close, + .p_peer = inproc_pipe_peer, + .p_getopt = inproc_pipe_getopt, +}; + +static const nni_option inproc_ep_options[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_get = inproc_ep_get_recvmaxsz, + .o_set = inproc_ep_set_recvmaxsz, + }, + { + .o_name = NNG_OPT_LOCADDR, + .o_get = inproc_ep_get_addr, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = inproc_ep_get_addr, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static int +inproc_ep_getopt(void *arg, const char *name, void *v, size_t *szp, nni_type t) +{ + return (nni_getopt(inproc_ep_options, name, arg, v, szp, t)); +} + +static int +inproc_ep_setopt( + void *arg, const char *name, const void *v, size_t sz, nni_type t) +{ + return (nni_setopt(inproc_ep_options, name, arg, v, sz, t)); +} + +static nni_tran_dialer_ops inproc_dialer_ops = { + .d_init = inproc_dialer_init, + .d_fini = inproc_ep_fini, + .d_connect = inproc_ep_connect, + .d_close = inproc_ep_close, + .d_getopt = inproc_ep_getopt, + .d_setopt = inproc_ep_setopt, +}; + +static nni_tran_listener_ops inproc_listener_ops = { + .l_init = inproc_listener_init, + .l_fini = inproc_ep_fini, + .l_bind = inproc_ep_bind, + .l_accept = inproc_ep_accept, + .l_close = inproc_ep_close, + .l_getopt = inproc_ep_getopt, + .l_setopt = inproc_ep_setopt, +}; + +// This is the inproc transport linkage, and should be the only global +// symbol in this entire file. +struct nni_tran nni_inproc_tran = { + .tran_version = NNI_TRANSPORT_VERSION, + .tran_scheme = "inproc", + .tran_dialer = &inproc_dialer_ops, + .tran_listener = &inproc_listener_ops, + .tran_pipe = &inproc_pipe_ops, + .tran_init = inproc_init, + .tran_fini = inproc_fini, +}; + +int +nng_inproc_register(void) +{ + return (nni_tran_register(&nni_inproc_tran)); +} |
