// // Copyright 2025 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // Copyright 2018 Devolutions // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // #include #include "core/nng_impl.h" // Inproc transport. This just transports messages from one // peer to another. The inproc transport is only valid within the same // process. typedef struct inproc_pair inproc_pair; typedef struct inproc_pipe inproc_pipe; typedef struct inproc_ep inproc_ep; typedef struct inproc_queue inproc_queue; typedef struct { nni_mtx mx; nni_list servers; } inproc_global; // inproc_pipe represents one half of a connection. struct inproc_pipe { const char *addr; inproc_pair *pair; inproc_queue *recv_queue; inproc_queue *send_queue; uint16_t peer; uint16_t proto; nni_pipe *pipe; nng_sockaddr sa; }; struct inproc_queue { nni_list readers; nni_list writers; nni_mtx lock; bool closed; }; // inproc_pair represents a pair of pipes. Because we control both // sides of the pipes, we can allocate and free this in one structure. struct inproc_pair { nni_refcnt ref; inproc_queue queues[2]; }; struct inproc_ep { const char *addr; nni_listener *listener; nni_dialer *dialer; nni_list_node node; uint16_t proto; nni_cv cv; nni_list clients; nni_list aios; size_t rcvmax; nni_mtx mtx; }; // nni_inproc is our global state - this contains the list of active endpoints // which we use for coordinating rendezvous. static inproc_global nni_inproc = { .servers = NNI_LIST_INITIALIZER(nni_inproc.servers, inproc_ep, node), .mx = NNI_MTX_INITIALIZER, }; static void inproc_init(void) { } static void inproc_fini(void) { } // inproc_pair destroy is called when both pipe-ends of the pipe // have been destroyed. 
static void
inproc_pair_destroy(void *arg)
{
	inproc_pair *pair = arg;

	// Both ends have released their reference; tear down the
	// per-direction queue locks and free the combined structure.
	nni_mtx_fini(&pair->queues[0].lock);
	nni_mtx_fini(&pair->queues[1].lock);
	NNI_FREE_STRUCT(pair);
}

// Attach the upper-layer pipe handle to our half of the connection.
static int
inproc_pipe_init(void *arg, nni_pipe *p)
{
	inproc_pipe *pipe = arg;
	pipe->pipe        = p;
	return (0);
}

static void
inproc_pipe_stop(void *arg)
{
	NNI_ARG_UNUSED(arg);
}

static void
inproc_pipe_fini(void *arg)
{
	inproc_pipe *pipe = arg;
	inproc_pair *pair;

	// pair may be NULL if the pipe was never fully wired up.
	if ((pair = pipe->pair) != NULL) {
		// If we are the last peer, then toss the pair structure.
		nni_refcnt_rele(&pair->ref);
	}
}

// Fail every queued reader and writer with NNG_ECLOSED.
// Caller must hold the queue lock.
static void
inproc_queue_run_closed(inproc_queue *queue)
{
	nni_aio *aio;
	while (((aio = nni_list_first(&queue->readers)) != NULL) ||
	    ((aio = nni_list_first(&queue->writers)) != NULL)) {
		nni_aio_list_remove(aio);
		nni_aio_finish_error(aio, NNG_ECLOSED);
	}
}

// Match pending writers against pending readers, moving one message
// per iteration, until one side runs dry.  Caller must hold the
// queue lock.
static void
inproc_queue_run(inproc_queue *queue)
{
	if (queue->closed) {
		inproc_queue_run_closed(queue);
	}
	for (;;) {
		nni_aio *rd;
		nni_aio *wr;
		nni_msg *msg;
		nni_msg *pu;

		if (((rd = nni_list_first(&queue->readers)) == NULL) ||
		    ((wr = nni_list_first(&queue->writers)) == NULL)) {
			return;
		}
		msg = nni_aio_get_msg(wr);
		NNI_ASSERT(msg != NULL);

		// At this point, we pass success back to the caller.  If
		// we drop the message for any reason, it's accounted on the
		// receiver side.
		nni_aio_list_remove(wr);
		nni_aio_set_msg(wr, NULL);
		nni_aio_finish(
		    wr, 0, nni_msg_len(msg) + nni_msg_header_len(msg));

		// TODO: We could check the max receive size here.

		// Now the receive side.  We need to ensure that we have
		// an exclusive copy of the message, and pull the header
		// up into the body to match protocol expectations.
		if ((pu = nni_msg_pull_up(msg)) == NULL) {
			// Allocation failure: the message is silently
			// dropped (sender already saw success above).
			nni_msg_free(msg);
			continue;
		}
		msg = pu;

		nni_aio_list_remove(rd);
		nni_aio_set_msg(rd, msg);
		nni_aio_finish(rd, 0, nni_msg_len(msg));
	}
}

// Cancellation callback for send/recv aios parked on a queue.
static void
inproc_queue_cancel(nni_aio *aio, void *arg, nng_err rv)
{
	inproc_queue *queue = arg;
	nni_mtx_lock(&queue->lock);
	if (nni_aio_list_active(aio)) {
		nni_aio_list_remove(aio);
		nni_aio_finish_error(aio, rv);
	}
	nni_mtx_unlock(&queue->lock);
}

// Queue a send aio on our send queue and try to match it against a
// waiting reader on the other end.
static void
inproc_pipe_send(void *arg, nni_aio *aio)
{
	inproc_pipe  *pipe  = arg;
	inproc_queue *queue = pipe->send_queue;

	nni_aio_reset(aio);
	nni_mtx_lock(&queue->lock);
	if (!nni_aio_start(aio, inproc_queue_cancel, queue)) {
		nni_mtx_unlock(&queue->lock);
		return;
	}
	nni_aio_list_append(&queue->writers, aio);
	inproc_queue_run(queue);
	nni_mtx_unlock(&queue->lock);
}

// Queue a receive aio on our receive queue; mirror image of send.
static void
inproc_pipe_recv(void *arg, nni_aio *aio)
{
	inproc_pipe  *pipe  = arg;
	inproc_queue *queue = pipe->recv_queue;

	nni_aio_reset(aio);
	nni_mtx_lock(&queue->lock);
	if (!nni_aio_start(aio, inproc_queue_cancel, queue)) {
		nni_mtx_unlock(&queue->lock);
		return;
	}
	nni_aio_list_append(&queue->readers, aio);
	inproc_queue_run(queue);
	nni_mtx_unlock(&queue->lock);
}

// Close both directions: once either end closes, all pending and
// future operations on either queue fail with NNG_ECLOSED.
static void
inproc_pipe_close(void *arg)
{
	inproc_pipe *pipe = arg;
	inproc_pair *pair = pipe->pair;

	for (int i = 0; i < 2; i++) {
		inproc_queue *queue = &pair->queues[i];
		nni_mtx_lock(&queue->lock);
		queue->closed = true;
		inproc_queue_run_closed(queue);
		nni_mtx_unlock(&queue->lock);
	}
}

static uint16_t
inproc_pipe_peer(void *arg)
{
	inproc_pipe *pipe = arg;

	return (pipe->peer);
}

// Common endpoint setup shared by dialer and listener init.
// The caller has already set ep->dialer or ep->listener.
static void
inproc_ep_init(inproc_ep *ep, nni_sock *sock, const nng_url *url)
{
	nni_mtx_init(&ep->mtx);
	ep->proto  = nni_sock_proto_id(sock);
	ep->rcvmax = 0;
	NNI_LIST_INIT(&ep->clients, inproc_ep, node);
	nni_aio_list_init(&ep->aios);

	ep->addr = url->u_path; // we match on the URL path.
}

static nng_err
inproc_dialer_init(void *arg, nng_url *url, nni_dialer *ndialer)
{
	inproc_ep *ep = arg;

	ep->dialer = ndialer;
	inproc_ep_init(ep, nni_dialer_sock(ndialer), url);
	return (NNG_OK);
}

static nng_err
inproc_listener_init(void *arg, nng_url *url, nni_listener *nlistener)
{
	inproc_ep *ep = arg;

	ep->listener = nlistener;
	inproc_ep_init(ep, nni_listener_sock(nlistener), url);
	return (NNG_OK);
}

static void
inproc_ep_stop(void *arg)
{
	NNI_ARG_UNUSED(arg);
}

static void
inproc_ep_fini(void *arg)
{
	inproc_ep *ep = arg;
	nni_mtx_fini(&ep->mtx);
}

// Complete one pending connect/accept aio.  On success (rv == 0) the
// pipe is handed to the aio; on failure pipe must be NULL.  A dialer
// endpoint (ep->listener == NULL) with no remaining aios is unlinked
// from whatever list its node is on.  Caller holds nni_inproc.mx.
static void
inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe)
{
	nni_aio_list_remove(aio);

	if ((ep->listener == NULL) && nni_list_empty(&ep->aios)) {
		nni_list_node_remove(&ep->node);
	}

	if (rv == 0) {
		nni_aio_set_output(aio, 0, pipe->pipe);
		nni_aio_finish(aio, 0, 0);
	} else {
		NNI_ASSERT(pipe == NULL);
		nni_aio_finish_error(aio, rv);
	}
}

// Close an endpoint (dialer or listener): deregister from the global
// server list, refuse any clients waiting on us, and fail our own
// pending aios.
static void
inproc_ep_close(void *arg)
{
	inproc_ep *ep = arg;
	inproc_ep *client;
	nni_aio   *aio;

	nni_mtx_lock(&nni_inproc.mx);
	if (nni_list_active(&nni_inproc.servers, ep)) {
		nni_list_remove(&nni_inproc.servers, ep);
	}
	// Notify any waiting clients that we are closed.
	while ((client = nni_list_first(&ep->clients)) != NULL) {
		while ((aio = nni_list_first(&client->aios)) != NULL) {
			// NOTE(review): the closing ep (not the client) is
			// passed here; the client node is removed explicitly
			// just below.
			inproc_conn_finish(aio, NNG_ECONNREFUSED, ep, NULL);
		}
		nni_list_remove(&ep->clients, client);
	}
	while ((aio = nni_list_first(&ep->aios)) != NULL) {
		inproc_conn_finish(aio, NNG_ECLOSED, ep, NULL);
	}
	nni_mtx_unlock(&nni_inproc.mx);
}

// Rendezvous: pair each pending client connect aio with a pending
// server accept aio, building the pipe pair for each match.
// Caller holds nni_inproc.mx.
static void
inproc_accept_clients(inproc_ep *srv)
{
	inproc_ep *cli, *nclient;

	// Walk with a saved "next" pointer since cli may be unlinked
	// from srv->clients as we go.
	nclient = nni_list_first(&srv->clients);
	while ((cli = nclient) != NULL) {
		nni_aio *caio;
		nclient = nni_list_next(&srv->clients, nclient);
		NNI_LIST_FOREACH (&cli->aios, caio) {
			inproc_pipe *cpipe;
			inproc_pipe *spipe;
			inproc_pair *pair;
			nni_aio     *saio;
			int          rv;

			if ((saio = nni_list_first(&srv->aios)) == NULL) {
				// No outstanding accept() calls.
				break;
			}

			if ((pair = NNI_ALLOC_STRUCT(pair)) == NULL) {
				inproc_conn_finish(
				    caio, NNG_ENOMEM, cli, NULL);
				inproc_conn_finish(
				    saio, NNG_ENOMEM, srv, NULL);
				continue;
			}
			for (int i = 0; i < 2; i++) {
				nni_aio_list_init(&pair->queues[i].readers);
				nni_aio_list_init(&pair->queues[i].writers);
				nni_mtx_init(&pair->queues[i].lock);
			}
			// One reference per pipe end; the pair is freed
			// when both ends are finalized.
			nni_refcnt_init(
			    &pair->ref, 2, pair, inproc_pair_destroy);

			spipe = cpipe = NULL;
			if (((rv = nni_pipe_alloc_dialer(
			          (void **) &cpipe, cli->dialer)) != 0) ||
			    ((rv = nni_pipe_alloc_listener(
			          (void **) &spipe, srv->listener)) != 0)) {
				// Drop a pair reference for each end that
				// was allocated (its fini will release) or
				// not (release here directly).
				if (cpipe != NULL) {
					nni_pipe_close(cpipe->pipe);
					nni_pipe_rele(cpipe->pipe);
				} else {
					nni_refcnt_rele(&pair->ref);
				}
				if (spipe != NULL) {
					nni_pipe_close(spipe->pipe);
					nni_pipe_rele(spipe->pipe);
				} else {
					nni_refcnt_rele(&pair->ref);
				}
				inproc_conn_finish(caio, rv, cli, NULL);
				inproc_conn_finish(saio, rv, srv, NULL);
				continue;
			}

			cpipe->proto = cli->proto;
			cpipe->addr  = cli->addr;
			spipe->proto = srv->proto;
			spipe->addr  = srv->addr;
			cpipe->peer  = spipe->proto;
			spipe->peer  = cpipe->proto;
			cpipe->pair  = pair;
			spipe->pair  = pair;

			// NOTE(review): cpipe->pair is set only on success;
			// on the allocated-then-failed path above, pipe fini
			// sees pair == NULL, hence the explicit releases.

			// queues[0] carries client->server traffic,
			// queues[1] the reverse.
			cpipe->send_queue = &pair->queues[0];
			cpipe->recv_queue = &pair->queues[1];
			spipe->send_queue = &pair->queues[1];
			spipe->recv_queue = &pair->queues[0];

			cpipe->sa.s_inproc.sa_family = NNG_AF_INPROC;
			nni_strlcpy(cpipe->sa.s_inproc.sa_name, cpipe->addr,
			    sizeof(cpipe->sa.s_inproc.sa_name));
			spipe->sa.s_inproc.sa_family = NNG_AF_INPROC;
			nni_strlcpy(spipe->sa.s_inproc.sa_name, spipe->addr,
			    sizeof(spipe->sa.s_inproc.sa_name));

			inproc_conn_finish(caio, 0, cli, cpipe);
			inproc_conn_finish(saio, 0, srv, spipe);
		}

		if (nni_list_first(&cli->aios) == NULL) {
			// No more outstanding client connects.
			// Normally there should only be one.
			if (nni_list_active(&srv->clients, cli)) {
				nni_list_remove(&srv->clients, cli);
			}
		}
	}
}

// Cancellation callback for connect/accept aios; also unlinks the
// endpoint node (a dialer waiting on a server's client list).
static void
inproc_ep_cancel(nni_aio *aio, void *arg, nng_err rv)
{
	inproc_ep *ep = arg;

	nni_mtx_lock(&nni_inproc.mx);
	if (nni_aio_list_active(aio)) {
		nni_aio_list_remove(aio);
		nni_list_node_remove(&ep->node);
		nni_aio_finish_error(aio, rv);
	}
	nni_mtx_unlock(&nni_inproc.mx);
}

// Dial: find a bound server with a matching address, park ourselves on
// its client list, and attempt immediate rendezvous.
static void
inproc_ep_connect(void *arg, nni_aio *aio)
{
	inproc_ep *ep = arg;
	inproc_ep *server;

	nni_aio_reset(aio);
	nni_mtx_lock(&nni_inproc.mx);

	// Find a server.
	NNI_LIST_FOREACH (&nni_inproc.servers, server) {
		if (strcmp(server->addr, ep->addr) == 0) {
			break;
		}
	}
	if (server == NULL) {
		nni_mtx_unlock(&nni_inproc.mx);
		nni_aio_finish_error(aio, NNG_ECONNREFUSED);
		return;
	}

	// We don't have to worry about the case where a zero timeout
	// on connect was specified, as there is no option to specify
	// that in the upper API.
	if (!nni_aio_start(aio, inproc_ep_cancel, ep)) {
		nni_mtx_unlock(&nni_inproc.mx);
		return;
	}

	nni_list_append(&server->clients, ep);
	nni_aio_list_append(&ep->aios, aio);

	inproc_accept_clients(server);
	nni_mtx_unlock(&nni_inproc.mx);
}

// Bind: register on the global server list, rejecting duplicate
// addresses with NNG_EADDRINUSE.
static nng_err
inproc_ep_bind(void *arg, nng_url *url)
{
	inproc_ep *ep = arg;
	inproc_ep *srch;
	nni_list  *list = &nni_inproc.servers;
	NNI_ARG_UNUSED(url);

	nni_mtx_lock(&nni_inproc.mx);
	NNI_LIST_FOREACH (list, srch) {
		if (strcmp(srch->addr, ep->addr) == 0) {
			nni_mtx_unlock(&nni_inproc.mx);
			return (NNG_EADDRINUSE);
		}
	}
	nni_list_append(list, ep);
	nni_mtx_unlock(&nni_inproc.mx);
	return (NNG_OK);
}

// Accept: queue the aio and attempt rendezvous with waiting dialers.
static void
inproc_ep_accept(void *arg, nni_aio *aio)
{
	inproc_ep *ep = arg;

	nni_aio_reset(aio);
	nni_mtx_lock(&nni_inproc.mx);

	// We need not worry about the case where a non-blocking
	// accept was tried -- there is no API to do such a thing.
	if (!nni_aio_start(aio, inproc_ep_cancel, ep)) {
		nni_mtx_unlock(&nni_inproc.mx);
		return;
	}

	// We are already on the master list of servers, thanks to bind.
	// Insert us into pending server aios, and then run accept list.
	nni_aio_list_append(&ep->aios, aio);
	inproc_accept_clients(ep);
	nni_mtx_unlock(&nni_inproc.mx);
}

static nng_err
inproc_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t)
{
	inproc_ep *ep = arg;
	nng_err    rv;
	nni_mtx_lock(&ep->mtx);
	rv = nni_copyout_size(ep->rcvmax, v, szp, t);
	nni_mtx_unlock(&ep->mtx);
	return (rv);
}

// Stores the option; note the value is not currently enforced on the
// receive path (see the TODO in inproc_queue_run).
static nng_err
inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t)
{
	inproc_ep *ep = arg;
	size_t     val;
	nng_err    rv;
	if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == NNG_OK) {
		nni_mtx_lock(&ep->mtx);
		ep->rcvmax = val;
		nni_mtx_unlock(&ep->mtx);
	}
	return (rv);
}

static const nni_option inproc_pipe_options[] = {
	// terminate list
	{
	    .o_name = NULL,
	},
};

static nng_err
inproc_pipe_getopt(
    void *arg, const char *name, void *v, size_t *szp, nni_type t)
{
	return (nni_getopt(inproc_pipe_options, name, arg, v, szp, t));
}

// Both ends of an inproc pipe report their own inproc address; the
// same accessor serves as self and peer address callback below.
static const nng_sockaddr *
inproc_pipe_addr(void *arg)
{
	inproc_pipe *p = arg;
	return (&p->sa);
}

static size_t
inproc_pipe_size(void)
{
	return (sizeof(inproc_pipe));
}

static nni_sp_pipe_ops inproc_pipe_ops = {
	.p_size      = inproc_pipe_size,
	.p_init      = inproc_pipe_init,
	.p_fini      = inproc_pipe_fini,
	.p_send      = inproc_pipe_send,
	.p_recv      = inproc_pipe_recv,
	.p_close     = inproc_pipe_close,
	.p_stop      = inproc_pipe_stop,
	.p_peer      = inproc_pipe_peer,
	.p_getopt    = inproc_pipe_getopt,
	.p_peer_addr = inproc_pipe_addr,
	.p_self_addr = inproc_pipe_addr,
};

static const nni_option inproc_ep_options[] = {
	{
	    .o_name = NNG_OPT_RECVMAXSZ,
	    .o_get  = inproc_ep_get_recvmaxsz,
	    .o_set  = inproc_ep_set_recvmaxsz,
	},
	// terminate list
	{
	    .o_name = NULL,
	},
};

static nng_err
inproc_ep_getopt(void *arg, const char *name, void *v, size_t *szp, nni_type t)
{
	return (nni_getopt(inproc_ep_options, name, arg, v, szp, t));
}

static nng_err
inproc_ep_setopt(
    void *arg, const char *name, const void *v, size_t sz, nni_type t)
{
	return (nni_setopt(inproc_ep_options, name, arg, v, sz, t));
}

static nni_sp_dialer_ops inproc_dialer_ops = {
	.d_size    = sizeof(inproc_ep),
	.d_init    = inproc_dialer_init,
	.d_fini    = inproc_ep_fini,
	.d_connect = inproc_ep_connect,
	.d_close   = inproc_ep_close,
	.d_stop    = inproc_ep_stop,
	.d_getopt  = inproc_ep_getopt,
	.d_setopt  = inproc_ep_setopt,
};

static nni_sp_listener_ops inproc_listener_ops = {
	.l_size   = sizeof(inproc_ep),
	.l_init   = inproc_listener_init,
	.l_fini   = inproc_ep_fini,
	.l_bind   = inproc_ep_bind,
	.l_accept = inproc_ep_accept,
	.l_close  = inproc_ep_close,
	.l_stop   = inproc_ep_stop,
	.l_getopt = inproc_ep_getopt,
	.l_setopt = inproc_ep_setopt,
};

// This is the inproc transport linkage, and should be the only global
// symbol in this entire file.
struct nni_sp_tran nni_inproc_tran = {
	.tran_scheme   = "inproc",
	.tran_dialer   = &inproc_dialer_ops,
	.tran_listener = &inproc_listener_ops,
	.tran_pipe     = &inproc_pipe_ops,
	.tran_init     = inproc_init,
	.tran_fini     = inproc_fini,
};

void
nni_sp_inproc_register(void)
{
	nni_sp_tran_register(&nni_inproc_tran);
}