diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-12-29 20:13:11 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-12-31 14:23:34 -0800 |
| commit | e0fff1f9c45f5486fc2e7eeb49b4462c3bb2dad4 (patch) | |
| tree | fe6914c8b06e87880e72cb27486971bf7fc056ff /src/transport/inproc | |
| parent | 97118a423a48ecea816f37cb86378c5bafba2d51 (diff) | |
| download | nng-e0fff1f9c45f5486fc2e7eeb49b4462c3bb2dad4.tar.gz nng-e0fff1f9c45f5486fc2e7eeb49b4462c3bb2dad4.tar.bz2 nng-e0fff1f9c45f5486fc2e7eeb49b4462c3bb2dad4.zip | |
fixes #835 inproc should use new option API
Some more changes to use nni_type instead of nni_opt_type
are included as well.
Diffstat (limited to 'src/transport/inproc')
| -rw-r--r-- | src/transport/inproc/inproc.c | 327 |
1 files changed, 184 insertions, 143 deletions
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index 798a946f..40ab9107 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -1,6 +1,7 @@ // // Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -18,47 +19,47 @@ // peer to another. The inproc transport is only valid within the same // process. -typedef struct nni_inproc_pair nni_inproc_pair; -typedef struct nni_inproc_pipe nni_inproc_pipe; -typedef struct nni_inproc_ep nni_inproc_ep; +typedef struct inproc_pair inproc_pair; +typedef struct inproc_pipe inproc_pipe; +typedef struct inproc_ep inproc_ep; typedef struct { nni_mtx mx; nni_list servers; -} nni_inproc_global; - -// nni_inproc_pipe represents one half of a connection. -struct nni_inproc_pipe { - const char * addr; - nni_inproc_pair *pair; - nni_msgq * rq; - nni_msgq * wq; - nni_pipe * npipe; - uint16_t peer; - uint16_t proto; - size_t rcvmax; - nni_stat_item st_rxbytes; - nni_stat_item st_txbytes; - nni_stat_item st_rxmsgs; - nni_stat_item st_txmsgs; - nni_stat_item st_rxdiscards; - nni_stat_item st_txdiscards; - nni_stat_item st_rxerrs; - nni_stat_item st_txerrs; - nni_stat_item st_rxoversize; - nni_stat_item st_rcvmaxsz; +} inproc_global; + +// inproc_pipe represents one half of a connection. +struct inproc_pipe { + const char * addr; + inproc_pair * pair; + nni_msgq * rq; + nni_msgq * wq; + nni_pipe * npipe; + uint16_t peer; + uint16_t proto; + size_t rcvmax; + nni_stat_item st_rxbytes; + nni_stat_item st_txbytes; + nni_stat_item st_rxmsgs; + nni_stat_item st_txmsgs; + nni_stat_item st_rxdiscards; + nni_stat_item st_txdiscards; + nni_stat_item st_rxerrs; + nni_stat_item st_txerrs; + nni_stat_item st_rxoversize; + nni_stat_item st_rcvmaxsz; }; -// nni_inproc_pair represents a pair of pipes. Because we control both +// inproc_pair represents a pair of pipes. Because we control both // sides of the pipes, we can allocate and free this in one structure. -struct nni_inproc_pair { - nni_mtx mx; - int refcnt; - nni_msgq * q[2]; - nni_inproc_pipe *pipes[2]; +struct inproc_pair { + nni_mtx mx; + int refcnt; + nni_msgq * q[2]; + inproc_pipe *pipes[2]; }; -struct nni_inproc_ep { +struct inproc_ep { const char * addr; bool listener; nni_list_node node; @@ -75,27 +76,27 @@ struct nni_inproc_ep { // nni_inproc is our global state - this contains the list of active endpoints // which we use for coordinating rendezvous. -static nni_inproc_global nni_inproc; +static inproc_global nni_inproc; static int -nni_inproc_init(void) +inproc_init(void) { - NNI_LIST_INIT(&nni_inproc.servers, nni_inproc_ep, node); + NNI_LIST_INIT(&nni_inproc.servers, inproc_ep, node); nni_mtx_init(&nni_inproc.mx); return (0); } static void -nni_inproc_fini(void) +inproc_fini(void) { nni_mtx_fini(&nni_inproc.mx); } static void -nni_inproc_pipe_close(void *arg) +inproc_pipe_close(void *arg) { - nni_inproc_pipe *pipe = arg; + inproc_pipe *pipe = arg; if (pipe->rq != NULL) { nni_msgq_close(pipe->rq); @@ -105,10 +106,10 @@ nni_inproc_pipe_close(void *arg) } } -// nni_inproc_pair destroy is called when both pipe-ends of the pipe +// inproc_pair destroy is called when both pipe-ends of the pipe // have been destroyed. static void -nni_inproc_pair_destroy(nni_inproc_pair *pair) +inproc_pair_destroy(inproc_pair *pair) { nni_msgq_fini(pair->q[0]); nni_msgq_fini(pair->q[1]); @@ -117,9 +118,9 @@ nni_inproc_pair_destroy(nni_inproc_pair *pair) } static int -nni_inproc_pipe_alloc(nni_inproc_pipe **pipep, nni_inproc_ep *ep) +inproc_pipe_alloc(inproc_pipe **pipep, inproc_ep *ep) { - nni_inproc_pipe *pipe; + inproc_pipe *pipe; if ((pipe = NNI_ALLOC_STRUCT(pipe)) == NULL) { return (NNG_ENOMEM); @@ -189,10 +190,10 @@ inproc_get_rxerrs(nni_stat_item *st, void *arg) #endif static int -nni_inproc_pipe_init(void *arg, nni_pipe *p) +inproc_pipe_init(void *arg, nni_pipe *p) { - nni_inproc_pipe *pipe = arg; - pipe->npipe = p; + inproc_pipe *pipe = arg; + pipe->npipe = p; nni_stat_init(&pipe->st_rxbytes, "rxbytes", "bytes received (raw)"); nni_stat_set_update(&pipe->st_rxbytes, inproc_get_rxbytes, pipe->rq); @@ -252,10 +253,10 @@ nni_inproc_pipe_init(void *arg, nni_pipe *p) } static void -nni_inproc_pipe_fini(void *arg) +inproc_pipe_fini(void *arg) { - nni_inproc_pipe *pipe = arg; - nni_inproc_pair *pair; + inproc_pipe *pipe = arg; + inproc_pair *pair; if ((pair = pipe->pair) != NULL) { // If we are the last peer, then toss the pair structure. @@ -268,7 +269,7 @@ nni_inproc_pipe_fini(void *arg) pair->refcnt--; if (pair->refcnt == 0) { nni_mtx_unlock(&pair->mx); - nni_inproc_pair_destroy(pair); + inproc_pair_destroy(pair); } else { nni_mtx_unlock(&pair->mx); } @@ -278,13 +279,13 @@ nni_inproc_pipe_fini(void *arg) } static void -nni_inproc_pipe_send(void *arg, nni_aio *aio) +inproc_pipe_send(void *arg, nni_aio *aio) { - nni_inproc_pipe *pipe = arg; - nni_msg * msg = nni_aio_get_msg(aio); - char * h; - size_t l; - int rv; + inproc_pipe *pipe = arg; + nni_msg * msg = nni_aio_get_msg(aio); + char * h; + size_t l; + int rv; // We need to move any header data to the body, because the other // side won't know what to do otherwise. @@ -299,26 +300,26 @@ nni_inproc_pipe_send(void *arg, nni_aio *aio) } static void -nni_inproc_pipe_recv(void *arg, nni_aio *aio) +inproc_pipe_recv(void *arg, nni_aio *aio) { - nni_inproc_pipe *pipe = arg; + inproc_pipe *pipe = arg; nni_msgq_aio_get(pipe->rq, aio); } static uint16_t -nni_inproc_pipe_peer(void *arg) +inproc_pipe_peer(void *arg) { - nni_inproc_pipe *pipe = arg; + inproc_pipe *pipe = arg; return (pipe->peer); } static int -nni_inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) +inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) { - nni_inproc_pipe *p = arg; - nni_sockaddr sa; + inproc_pipe *p = arg; + nni_sockaddr sa; memset(&sa, 0, sizeof(sa)); sa.s_inproc.sa_family = NNG_AF_INPROC; @@ -327,10 +328,10 @@ nni_inproc_pipe_get_addr(void *arg, void *buf, size_t *szp, nni_opt_type t) } static int -nni_inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer) +inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer) { - nni_inproc_ep *ep; - nni_sock * sock = nni_dialer_sock(ndialer); + inproc_ep *ep; + nni_sock * sock = nni_dialer_sock(ndialer); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); @@ -341,7 +342,7 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer) ep->proto = nni_sock_proto_id(sock); ep->rcvmax = 0; ep->ndialer = ndialer; - NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); + NNI_LIST_INIT(&ep->clients, inproc_ep, node); nni_aio_list_init(&ep->aios); ep->addr = url->u_rawurl; // we match on the full URL. @@ -358,10 +359,10 @@ nni_inproc_dialer_init(void **epp, nni_url *url, nni_dialer *ndialer) } static int -nni_inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener) +inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener) { - nni_inproc_ep *ep; - nni_sock * sock = nni_listener_sock(nlistener); + inproc_ep *ep; + nni_sock * sock = nni_listener_sock(nlistener); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); @@ -372,7 +373,7 @@ nni_inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener) ep->proto = nni_sock_proto_id(sock); ep->rcvmax = 0; ep->nlistener = nlistener; - NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); + NNI_LIST_INIT(&ep->clients, inproc_ep, node); nni_aio_list_init(&ep->aios); ep->addr = url->u_rawurl; // we match on the full URL. @@ -388,16 +389,15 @@ nni_inproc_listener_init(void **epp, nni_url *url, nni_listener *nlistener) } static void -nni_inproc_ep_fini(void *arg) +inproc_ep_fini(void *arg) { - nni_inproc_ep *ep = arg; + inproc_ep *ep = arg; nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); } static void -inproc_conn_finish( - nni_aio *aio, int rv, nni_inproc_ep *ep, nni_inproc_pipe *pipe) +inproc_conn_finish(nni_aio *aio, int rv, inproc_ep *ep, inproc_pipe *pipe) { nni_aio_list_remove(aio); @@ -417,7 +417,7 @@ inproc_conn_finish( static nni_msg * inproc_filter(void *arg, nni_msg *msg) { - nni_inproc_pipe *p = arg; + inproc_pipe *p = arg; if (p->rcvmax && (nni_msg_len(msg) > p->rcvmax)) { nni_stat_inc_atomic(&p->st_rxoversize, 1); nni_msg_free(msg); @@ -427,11 +427,11 @@ inproc_filter(void *arg, nni_msg *msg) } static void -nni_inproc_ep_close(void *arg) +inproc_ep_close(void *arg) { - nni_inproc_ep *ep = arg; - nni_inproc_ep *client; - nni_aio * aio; + inproc_ep *ep = arg; + inproc_ep *client; + nni_aio * aio; nni_mtx_lock(&nni_inproc.mx); if (nni_list_active(&nni_inproc.servers, ep)) { @@ -451,9 +451,9 @@ nni_inproc_ep_close(void *arg) } static void -nni_inproc_accept_clients(nni_inproc_ep *srv) +inproc_accept_clients(inproc_ep *srv) { - nni_inproc_ep *cli, *nclient; + inproc_ep *cli, *nclient; nclient = nni_list_first(&srv->clients); while ((cli = nclient) != NULL) { @@ -461,11 +461,11 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) nclient = nni_list_next(&srv->clients, nclient); NNI_LIST_FOREACH (&cli->aios, caio) { - nni_inproc_pipe *cpipe; - nni_inproc_pipe *spipe; - nni_inproc_pair *pair; - nni_aio * saio; - int rv; + inproc_pipe *cpipe; + inproc_pipe *spipe; + inproc_pair *pair; + nni_aio * saio; + int rv; if ((saio = nni_list_first(&srv->aios)) == NULL) { // No outstanding accept() calls. @@ -482,20 +482,20 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) nni_mtx_init(&pair->mx); spipe = cpipe = NULL; - if (((rv = nni_inproc_pipe_alloc(&cpipe, cli)) != 0) || - ((rv = nni_inproc_pipe_alloc(&spipe, srv)) != 0) || + if (((rv = inproc_pipe_alloc(&cpipe, cli)) != 0) || + ((rv = inproc_pipe_alloc(&spipe, srv)) != 0) || ((rv = nni_msgq_init(&pair->q[0], 1)) != 0) || ((rv = nni_msgq_init(&pair->q[1], 1)) != 0)) { if (cpipe != NULL) { - nni_inproc_pipe_fini(cpipe); + inproc_pipe_fini(cpipe); } if (spipe != NULL) { - nni_inproc_pipe_fini(spipe); + inproc_pipe_fini(spipe); } inproc_conn_finish(caio, rv, cli, NULL); inproc_conn_finish(saio, rv, srv, NULL); - nni_inproc_pair_destroy(pair); + inproc_pair_destroy(pair); continue; } @@ -525,9 +525,9 @@ nni_inproc_accept_clients(nni_inproc_ep *srv) } static void -nni_inproc_ep_cancel(nni_aio *aio, void *arg, int rv) +inproc_ep_cancel(nni_aio *aio, void *arg, int rv) { - nni_inproc_ep *ep = arg; + inproc_ep *ep = arg; nni_mtx_lock(&nni_inproc.mx); if (nni_aio_list_active(aio)) { @@ -539,11 +539,11 @@ nni_inproc_ep_cancel(nni_aio *aio, void *arg, int rv) } static void -nni_inproc_ep_connect(void *arg, nni_aio *aio) +inproc_ep_connect(void *arg, nni_aio *aio) { - nni_inproc_ep *ep = arg; - nni_inproc_ep *server; - int rv; + inproc_ep *ep = arg; + inproc_ep *server; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -566,7 +566,7 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) // We don't have to worry about the case where a zero timeout // on connect was specified, as there is no option to specify // that in the upper API. - if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) { + if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { nni_mtx_unlock(&nni_inproc.mx); nni_aio_finish_error(aio, rv); return; @@ -575,16 +575,16 @@ nni_inproc_ep_connect(void *arg, nni_aio *aio) nni_list_append(&server->clients, ep); nni_aio_list_append(&ep->aios, aio); - nni_inproc_accept_clients(server); + inproc_accept_clients(server); nni_mtx_unlock(&nni_inproc.mx); } static int -nni_inproc_ep_bind(void *arg) +inproc_ep_bind(void *arg) { - nni_inproc_ep *ep = arg; - nni_inproc_ep *srch; - nni_list * list = &nni_inproc.servers; + inproc_ep *ep = arg; + inproc_ep *srch; + nni_list * list = &nni_inproc.servers; nni_mtx_lock(&nni_inproc.mx); NNI_LIST_FOREACH (list, srch) { @@ -599,10 +599,10 @@ nni_inproc_ep_bind(void *arg) } static void -nni_inproc_ep_accept(void *arg, nni_aio *aio) +inproc_ep_accept(void *arg, nni_aio *aio) { - nni_inproc_ep *ep = arg; - int rv; + inproc_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; @@ -612,7 +612,7 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) // We need not worry about the case where a non-blocking // accept was tried -- there is no API to do such a thing. - if ((rv = nni_aio_schedule(aio, nni_inproc_ep_cancel, ep)) != 0) { + if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { nni_mtx_unlock(&nni_inproc.mx); nni_aio_finish_error(aio, rv); return; @@ -621,15 +621,15 @@ nni_inproc_ep_accept(void *arg, nni_aio *aio) // We are already on the master list of servers, thanks to bind. // Insert us into pending server aios, and then run accept list. nni_aio_list_append(&ep->aios, aio); - nni_inproc_accept_clients(ep); + inproc_accept_clients(ep); nni_mtx_unlock(&nni_inproc.mx); } static int inproc_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) { - nni_inproc_ep *ep = arg; - int rv; + inproc_ep *ep = arg; + int rv; nni_mtx_lock(&ep->mtx); rv = nni_copyout_size(ep->rcvmax, v, szp, t); nni_mtx_unlock(&ep->mtx); @@ -639,9 +639,9 @@ inproc_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) static int inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) { - nni_inproc_ep *ep = arg; - size_t val; - int rv; + inproc_ep *ep = arg; + size_t val; + int rv; if (((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) && (ep != NULL)) { nni_mtx_lock(&ep->mtx); @@ -652,14 +652,25 @@ inproc_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) return (rv); } -static nni_option nni_inproc_pipe_options[] = { +static int +inproc_ep_get_addr(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + inproc_ep * ep = arg; + nng_sockaddr sa; + sa.s_inproc.sa_family = NNG_AF_INPROC; + nni_strlcpy( + sa.s_inproc.sa_name, ep->addr, sizeof(sa.s_inproc.sa_name)); + return (nni_copyout_sockaddr(&sa, v, szp, t)); +} + +static const nni_option inproc_pipe_options[] = { { .o_name = NNG_OPT_LOCADDR, - .o_get = nni_inproc_pipe_get_addr, + .o_get = inproc_pipe_get_addr, }, { .o_name = NNG_OPT_REMADDR, - .o_get = nni_inproc_pipe_get_addr, + .o_get = inproc_pipe_get_addr, }, // terminate list { @@ -667,43 +678,73 @@ static nni_option nni_inproc_pipe_options[] = { }, }; -static nni_tran_pipe_ops nni_inproc_pipe_ops = { - .p_init = nni_inproc_pipe_init, - .p_fini = nni_inproc_pipe_fini, - .p_send = nni_inproc_pipe_send, - .p_recv = nni_inproc_pipe_recv, - .p_close = nni_inproc_pipe_close, - .p_peer = nni_inproc_pipe_peer, - .p_options = nni_inproc_pipe_options, +static int +inproc_pipe_getopt( + void *arg, const char *name, void *v, size_t *szp, nni_type t) +{ + return (nni_getopt(inproc_pipe_options, name, arg, v, szp, t)); +} + +static nni_tran_pipe_ops inproc_pipe_ops = { + .p_init = inproc_pipe_init, + .p_fini = inproc_pipe_fini, + .p_send = inproc_pipe_send, + .p_recv = inproc_pipe_recv, + .p_close = inproc_pipe_close, + .p_peer = inproc_pipe_peer, + .p_getopt = inproc_pipe_getopt, }; -static nni_option nni_inproc_ep_options[] = { +static const nni_option inproc_ep_options[] = { { .o_name = NNG_OPT_RECVMAXSZ, .o_get = inproc_ep_get_recvmaxsz, .o_set = inproc_ep_set_recvmaxsz, }, + { + .o_name = NNG_OPT_LOCADDR, + .o_get = inproc_ep_get_addr, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = inproc_ep_get_addr, + }, // terminate list { .o_name = NULL, }, }; -static nni_tran_dialer_ops nni_inproc_dialer_ops = { - .d_init = nni_inproc_dialer_init, - .d_fini = nni_inproc_ep_fini, - .d_connect = nni_inproc_ep_connect, - .d_close = nni_inproc_ep_close, - .d_options = nni_inproc_ep_options, +static int +inproc_ep_getopt(void *arg, const char *name, void *v, size_t *szp, nni_type t) +{ + return (nni_getopt(inproc_ep_options, name, arg, v, szp, t)); +} + +static int +inproc_ep_setopt( + void *arg, const char *name, const void *v, size_t sz, nni_type t) +{ + return (nni_setopt(inproc_ep_options, name, arg, v, sz, t)); +} + +static nni_tran_dialer_ops inproc_dialer_ops = { + .d_init = inproc_dialer_init, + .d_fini = inproc_ep_fini, + .d_connect = inproc_ep_connect, + .d_close = inproc_ep_close, + .d_getopt = inproc_ep_getopt, + .d_setopt = inproc_ep_setopt, }; -static nni_tran_listener_ops nni_inproc_listener_ops = { - .l_init = nni_inproc_listener_init, - .l_fini = nni_inproc_ep_fini, - .l_bind = nni_inproc_ep_bind, - .l_accept = nni_inproc_ep_accept, - .l_close = nni_inproc_ep_close, - .l_options = nni_inproc_ep_options, +static nni_tran_listener_ops inproc_listener_ops = { + .l_init = inproc_listener_init, + .l_fini = inproc_ep_fini, + .l_bind = inproc_ep_bind, + .l_accept = inproc_ep_accept, + .l_close = inproc_ep_close, + .l_getopt = inproc_ep_getopt, + .l_setopt = inproc_ep_setopt, }; // This is the inproc transport linkage, and should be the only global @@ -711,11 +752,11 @@ static nni_tran_listener_ops nni_inproc_listener_ops = { struct nni_tran nni_inproc_tran = { .tran_version = NNI_TRANSPORT_VERSION, .tran_scheme = "inproc", - .tran_dialer = &nni_inproc_dialer_ops, - .tran_listener = &nni_inproc_listener_ops, - .tran_pipe = &nni_inproc_pipe_ops, - .tran_init = nni_inproc_init, - .tran_fini = nni_inproc_fini, + .tran_dialer = &inproc_dialer_ops, + .tran_listener = &inproc_listener_ops, + .tran_pipe = &inproc_pipe_ops, + .tran_init = inproc_init, + .tran_fini = inproc_fini, }; int |
