diff options
| author | Garrett D'Amore <garrett@damore.org> | 2019-01-21 22:40:10 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2019-02-16 19:22:27 -0800 |
| commit | 5cf750697624d4fd63cfe26921209d7c30e1a2d2 (patch) | |
| tree | bf11695e5f1ec5e400c87da0cc6ff23935a2eeff /src/transport/ipc | |
| parent | ca655b9db689ee0e655248b1a9f166b8db6cc984 (diff) | |
| download | nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.gz nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.bz2 nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.zip | |
fixes #872 create unified nng_stream API
This is a major change, and includes changes to use a polymorphic
stream API for all transports. There have been related bugs fixed
along the way. Additionally the man pages have changed.
The old non-polymorphic APIs are removed now. This is a breaking
change, but the old APIs were never part of any released public API.
Diffstat (limited to 'src/transport/ipc')
| -rw-r--r-- | src/transport/ipc/ipc.c | 142 |
1 files changed, 72 insertions, 70 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 609b1811..e0d83be0 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -1,7 +1,7 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -27,7 +27,7 @@ typedef struct ipctran_ep ipctran_ep; // ipc_pipe is one end of an IPC connection. struct ipctran_pipe { - nni_ipc_conn * conn; + nng_stream * conn; uint16_t peer; uint16_t proto; size_t rcvmax; @@ -58,17 +58,17 @@ struct ipctran_pipe { }; struct ipctran_ep { - nni_mtx mtx; - nni_sockaddr sa; - size_t rcvmax; - uint16_t proto; - nni_list pipes; - bool fini; - nni_ipc_dialer * dialer; - nni_ipc_listener *listener; - nni_reap_item reap; - nni_dialer * ndialer; - nni_listener * nlistener; + nni_mtx mtx; + nni_sockaddr sa; + size_t rcvmax; + uint16_t proto; + nni_list pipes; + bool fini; + nng_stream_dialer * dialer; + nng_stream_listener *listener; + nni_reap_item reap; + nni_dialer * ndialer; + nni_listener * nlistener; }; static void ipctran_pipe_send_start(ipctran_pipe *); @@ -104,7 +104,7 @@ ipctran_pipe_close(void *arg) nni_aio_close(p->negoaio); nni_aio_close(p->connaio); - nni_ipc_conn_close(p->conn); + nng_stream_close(p->conn); } static void @@ -145,9 +145,7 @@ ipctran_pipe_fini(void *arg) nni_aio_fini(p->txaio); nni_aio_fini(p->negoaio); nni_aio_fini(p->connaio); - if (p->conn != NULL) { - nni_ipc_conn_fini(p->conn); - } + nng_stream_free(p->conn); if (p->rxmsg) { nni_msg_free(p->rxmsg); } @@ -160,7 +158,7 @@ ipctran_pipe_reap(ipctran_pipe *p) { if (!nni_atomic_flag_test_and_set(&p->reaped)) { if (p->conn != NULL) { - nni_ipc_conn_close(p->conn); + nng_stream_close(p->conn); } nni_reap(&p->reap, ipctran_pipe_fini, p); } @@ -195,7 +193,6 @@ ipctran_pipe_alloc(ipctran_pipe **pipep, ipctran_ep *ep) p->proto = ep->proto; p->rcvmax = ep->rcvmax; - p->sa = ep->sa; p->ep = ep; *pipep = p; @@ -244,7 +241,7 @@ ipctran_pipe_conn_cb(void *arg) iov.iov_len = 8; iov.iov_buf = &p->txhead[0]; nni_aio_set_iov(p->negoaio, 1, &iov); - nni_ipc_conn_send(p->conn, p->negoaio); + nng_stream_send(p->conn, p->negoaio); nni_mtx_unlock(&ep->mtx); } @@ -278,7 +275,7 @@ ipctran_pipe_nego_cb(void *arg) iov.iov_buf = &p->txhead[p->gottxhead]; nni_aio_set_iov(aio, 1, &iov); // send it down... - nni_ipc_conn_send(p->conn, aio); + nng_stream_send(p->conn, aio); nni_mtx_unlock(&p->ep->mtx); return; } @@ -287,7 +284,7 @@ ipctran_pipe_nego_cb(void *arg) iov.iov_len = p->wantrxhead - p->gotrxhead; iov.iov_buf = &p->rxhead[p->gotrxhead]; nni_aio_set_iov(aio, 1, &iov); - nni_ipc_conn_recv(p->conn, aio); + nng_stream_recv(p->conn, aio); nni_mtx_unlock(&p->ep->mtx); return; } @@ -343,7 +340,7 @@ ipctran_pipe_send_cb(void *arg) n = nni_aio_count(txaio); nni_aio_iov_advance(txaio, n); if (nni_aio_iov_count(txaio) != 0) { - nni_ipc_conn_send(p->conn, txaio); + nng_stream_send(p->conn, txaio); nni_mtx_unlock(&p->mtx); return; } @@ -385,7 +382,7 @@ ipctran_pipe_recv_cb(void *arg) nni_aio_iov_advance(rxaio, n); if (nni_aio_iov_count(rxaio) != 0) { // Was this a partial read? If so then resubmit for the rest. - nni_ipc_conn_recv(p->conn, rxaio); + nng_stream_recv(p->conn, rxaio); nni_mtx_unlock(&p->mtx); return; } @@ -429,7 +426,7 @@ ipctran_pipe_recv_cb(void *arg) iov.iov_len = (size_t) len; nni_aio_set_iov(rxaio, 1, &iov); - nni_ipc_conn_recv(p->conn, rxaio); + nng_stream_recv(p->conn, rxaio); nni_mtx_unlock(&p->mtx); return; } @@ -531,7 +528,7 @@ ipctran_pipe_send_start(ipctran_pipe *p) niov++; } nni_aio_set_iov(txaio, niov, iov); - nni_ipc_conn_send(p->conn, txaio); + nng_stream_send(p->conn, txaio); } static void @@ -604,7 +601,7 @@ ipctran_pipe_recv_start(ipctran_pipe *p) iov.iov_len = sizeof(p->rxhead); nni_aio_set_iov(rxaio, 1, &iov); - nni_ipc_conn_recv(p->conn, rxaio); + nng_stream_recv(p->conn, rxaio); } static void @@ -670,12 +667,8 @@ ipctran_ep_fini(void *arg) nni_mtx_unlock(&ep->mtx); return; } - if (ep->dialer != NULL) { - nni_ipc_dialer_fini(ep->dialer); - } - if (ep->listener != NULL) { - nni_ipc_listener_fini(ep->listener); - } + nng_stream_dialer_free(ep->dialer); + nng_stream_listener_free(ep->listener); nni_mtx_unlock(&ep->mtx); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); @@ -694,14 +687,14 @@ ipctran_ep_close(void *arg) nni_aio_close(p->txaio); nni_aio_close(p->rxaio); if (p->conn != NULL) { - nni_ipc_conn_close(p->conn); + nng_stream_close(p->conn); } } if (ep->dialer != NULL) { - nni_ipc_dialer_close(ep->dialer); + nng_stream_dialer_close(ep->dialer); } if (ep->listener != NULL) { - nni_ipc_listener_close(ep->listener); + nng_stream_listener_close(ep->listener); } nni_mtx_unlock(&ep->mtx); } @@ -711,7 +704,6 @@ ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) { ipctran_ep *ep; int rv; - size_t sz; nni_sock * sock = nni_dialer_sock(ndialer); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { @@ -720,17 +712,10 @@ ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) nni_mtx_init(&ep->mtx); NNI_LIST_INIT(&ep->pipes, ipctran_pipe, node); - sz = sizeof(ep->sa.s_ipc.sa_path); - ep->sa.s_ipc.sa_family = NNG_AF_IPC; - ep->proto = nni_sock_proto_id(sock); - ep->ndialer = ndialer; - - if (nni_strlcpy(ep->sa.s_ipc.sa_path, url->u_path, sz) >= sz) { - ipctran_ep_fini(ep); - return (NNG_EADDRINVAL); - } + ep->proto = nni_sock_proto_id(sock); + ep->ndialer = ndialer; - if ((rv = nni_ipc_dialer_init(&ep->dialer)) != 0) { + if ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0) { ipctran_ep_fini(ep); return (rv); } @@ -744,7 +729,6 @@ ipctran_ep_init_listener(void **dp, nni_url *url, nni_listener *nlistener) { ipctran_ep *ep; int rv; - size_t sz; nni_sock * sock = nni_listener_sock(nlistener); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { @@ -753,17 +737,10 @@ ipctran_ep_init_listener(void **dp, nni_url *url, nni_listener *nlistener) nni_mtx_init(&ep->mtx); NNI_LIST_INIT(&ep->pipes, ipctran_pipe, node); - sz = sizeof(ep->sa.s_ipc.sa_path); - ep->sa.s_ipc.sa_family = NNG_AF_IPC; - ep->proto = nni_sock_proto_id(sock); - ep->nlistener = nlistener; + ep->proto = nni_sock_proto_id(sock); + ep->nlistener = nlistener; - if (nni_strlcpy(ep->sa.s_ipc.sa_path, url->u_path, sz) >= sz) { - ipctran_ep_fini(ep); - return (NNG_EADDRINVAL); - } - - if ((rv = nni_ipc_listener_init(&ep->listener)) != 0) { + if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) { ipctran_ep_fini(ep); return (rv); } @@ -797,7 +774,7 @@ ipctran_ep_connect(void *arg, nni_aio *aio) return; } p->useraio = aio; - nni_ipc_dialer_dial(ep->dialer, &p->sa, p->connaio); + nng_stream_dialer_dial(ep->dialer, p->connaio); nni_mtx_unlock(&ep->mtx); } @@ -839,7 +816,7 @@ ipctran_ep_bind(void *arg) int rv; nni_mtx_lock(&ep->mtx); - rv = nni_ipc_listener_listen(ep->listener, &ep->sa); + rv = nng_stream_listener_listen(ep->listener); nni_mtx_unlock(&ep->mtx); return (rv); } @@ -869,7 +846,7 @@ ipctran_ep_accept(void *arg, nni_aio *aio) return; } p->useraio = aio; - nni_ipc_listener_accept(ep->listener, p->connaio); + nng_stream_listener_accept(ep->listener, p->connaio); nni_mtx_unlock(&ep->mtx); } @@ -879,8 +856,7 @@ ipctran_pipe_getopt( { ipctran_pipe *p = arg; - // We defer to the platform getopt code for IPC connections. - return (nni_ipc_conn_getopt(p->conn, name, buf, szp, t)); + return (nni_stream_getx(p->conn, name, buf, szp, t)); } static nni_tran_pipe_ops ipctran_pipe_ops = { @@ -915,7 +891,7 @@ ipctran_dialer_getopt( rv = nni_getopt(ipctran_ep_options, name, ep, buf, szp, t); if (rv == NNG_ENOTSUP) { - rv = nni_ipc_dialer_getopt(ep->dialer, name, buf, szp, t); + rv = nni_stream_dialer_getx(ep->dialer, name, buf, szp, t); } return (rv); } @@ -929,8 +905,7 @@ ipctran_dialer_setopt( rv = nni_setopt(ipctran_ep_options, name, ep, buf, sz, t); if (rv == NNG_ENOTSUP) { - rv = nni_ipc_dialer_setopt( - ep != NULL ? ep->dialer : NULL, name, buf, sz, t); + rv = nni_stream_dialer_setx(ep->dialer, name, buf, sz, t); } return (rv); } @@ -944,7 +919,7 @@ ipctran_listener_getopt( rv = nni_getopt(ipctran_ep_options, name, ep, buf, szp, t); if (rv == NNG_ENOTSUP) { - rv = nni_ipc_listener_getopt(ep->listener, name, buf, szp, t); + rv = nni_stream_listener_getx(ep->listener, name, buf, szp, t); } return (rv); } @@ -958,8 +933,34 @@ ipctran_listener_setopt( rv = nni_setopt(ipctran_ep_options, name, ep, buf, sz, t); if (rv == NNG_ENOTSUP) { - rv = nni_ipc_listener_setopt( - ep != NULL ? ep->listener : NULL, name, buf, sz, t); + rv = nni_stream_listener_setx(ep->listener, name, buf, sz, t); + } + return (rv); +} + +static int +ipctran_check_recvmaxsz(const void *v, size_t sz, nni_type t) +{ + return (nni_copyin_size(NULL, v, sz, 0, NNI_MAXSZ, t)); +} + +static nni_chkoption ipctran_checkopts[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_check = ipctran_check_recvmaxsz, + }, + { + .o_name = NULL, + }, +}; + +static int +ipctran_checkopt(const char *name, const void *buf, size_t sz, nni_type t) +{ + int rv; + rv = nni_chkopt(ipctran_checkopts, name, buf, sz, t); + if (rv == NNG_ENOTSUP) { + rv = nni_stream_checkopt("ipc", name, buf, sz, t); } return (rv); } @@ -991,6 +992,7 @@ static nni_tran ipc_tran = { .tran_pipe = &ipctran_pipe_ops, .tran_init = ipctran_init, .tran_fini = ipctran_fini, + .tran_checkopt = ipctran_checkopt, }; int |
