diff options
| author | Garrett D'Amore <garrett@damore.org> | 2019-01-21 22:40:10 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2019-02-16 19:22:27 -0800 |
| commit | 5cf750697624d4fd63cfe26921209d7c30e1a2d2 (patch) | |
| tree | bf11695e5f1ec5e400c87da0cc6ff23935a2eeff /src/platform/posix | |
| parent | ca655b9db689ee0e655248b1a9f166b8db6cc984 (diff) | |
| download | nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.gz nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.tar.bz2 nng-5cf750697624d4fd63cfe26921209d7c30e1a2d2.zip | |
fixes #872 create unified nng_stream API
This is a major change, and includes changes to use a polymorphic
stream API for all transports. There have been related bugs fixed
along the way. Additionally the man pages have changed.
The old non-polymorphic APIs are removed now. This is a breaking
change, but the old APIs were never part of any released public API.
Diffstat (limited to 'src/platform/posix')
| -rw-r--r-- | src/platform/posix/posix_ipc.h | 28 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipcconn.c | 197 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipcdial.c | 100 | ||||
| -rw-r--r-- | src/platform/posix/posix_ipclisten.c | 173 | ||||
| -rw-r--r-- | src/platform/posix/posix_resolv_gai.c | 7 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcp.h | 28 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpconn.c | 186 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcpdial.c | 35 | ||||
| -rw-r--r-- | src/platform/posix/posix_tcplisten.c | 14 |
9 files changed, 410 insertions, 358 deletions
diff --git a/src/platform/posix/posix_ipc.h b/src/platform/posix/posix_ipc.h index bbe11f0d..f570b172 100644 --- a/src/platform/posix/posix_ipc.h +++ b/src/platform/posix/posix_ipc.h @@ -1,7 +1,7 @@ // // Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -10,6 +10,7 @@ // #include "core/nng_impl.h" +#include "core/stream.h" #ifdef NNG_PLATFORM_POSIX #include "platform/posix/posix_aio.h" @@ -17,6 +18,7 @@ #include <sys/types.h> // For mode_t struct nni_ipc_conn { + nng_stream stream; nni_posix_pfd * pfd; nni_list readq; nni_list writeq; @@ -24,27 +26,17 @@ struct nni_ipc_conn { nni_mtx mtx; nni_aio * dial_aio; nni_ipc_dialer *dialer; - nni_reap_item reap; }; struct nni_ipc_dialer { - nni_list connq; // pending connections - bool closed; - nni_mtx mtx; + nng_stream_dialer sd; + nni_list connq; // pending connections + bool closed; + nni_mtx mtx; + nng_sockaddr sa; }; -struct nni_ipc_listener { - nni_posix_pfd *pfd; - nng_sockaddr sa; - nni_list acceptq; - bool started; - bool closed; - char * path; - mode_t perms; - nni_mtx mtx; -}; - -extern int nni_posix_ipc_conn_init(nni_ipc_conn **, nni_posix_pfd *); -extern void nni_posix_ipc_conn_start(nni_ipc_conn *); +extern int nni_posix_ipc_init(nni_ipc_conn **, nni_posix_pfd *); +extern void nni_posix_ipc_start(nni_ipc_conn *); #endif // NNG_PLATFORM_POSIX diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c index 48bd75a4..07ec6213 100644 --- a/src/platform/posix/posix_ipcconn.c +++ b/src/platform/posix/posix_ipcconn.c @@ -1,7 +1,7 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -39,8 +39,10 @@ #include "posix_ipc.h" +typedef struct nni_ipc_conn ipc_conn; + static void -ipc_conn_dowrite(nni_ipc_conn *c) +ipc_dowrite(ipc_conn *c) { nni_aio *aio; int fd; @@ -122,7 +124,7 @@ ipc_conn_dowrite(nni_ipc_conn *c) } static void -ipc_conn_doread(nni_ipc_conn *c) +ipc_doread(ipc_conn *c) { nni_aio *aio; int fd; @@ -199,9 +201,10 @@ ipc_conn_doread(nni_ipc_conn *c) } } -void -nni_ipc_conn_close(nni_ipc_conn *c) +static void +ipc_close(void *arg) { + ipc_conn *c = arg; nni_mtx_lock(&c->mtx); if (!c->closed) { nni_aio *aio; @@ -217,20 +220,20 @@ nni_ipc_conn_close(nni_ipc_conn *c) } static void -ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg) +ipc_cb(nni_posix_pfd *pfd, int events, void *arg) { - nni_ipc_conn *c = arg; + ipc_conn *c = arg; if (events & (POLLHUP | POLLERR | POLLNVAL)) { - nni_ipc_conn_close(c); + ipc_close(c); return; } nni_mtx_lock(&c->mtx); if (events & POLLIN) { - ipc_conn_doread(c); + ipc_doread(c); } if (events & POLLOUT) { - ipc_conn_dowrite(c); + ipc_dowrite(c); } events = 0; if (!nni_list_empty(&c->writeq)) { @@ -246,9 +249,9 @@ ipc_conn_cb(nni_posix_pfd *pfd, int events, void *arg) } static void -ipc_conn_cancel(nni_aio *aio, void *arg, int rv) +ipc_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_conn *c = arg; + ipc_conn *c = arg; nni_mtx_lock(&c->mtx); if (nni_aio_list_active(aio)) { @@ -258,18 +261,18 @@ ipc_conn_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&c->mtx); } -void -nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio) +static void +ipc_send(void *arg, nni_aio *aio) { - - int rv; + ipc_conn *c = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&c->mtx); - if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) { + if ((rv = nni_aio_schedule(aio, ipc_cancel, c)) != 0) { nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); return; @@ -277,7 +280,7 @@ nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio) nni_aio_list_append(&c->writeq, aio); if (nni_list_first(&c->writeq) == aio) { - ipc_conn_dowrite(c); + ipc_dowrite(c); // If we are still the first thing on the list, that // means we didn't finish the job, so arm the poller to // complete us. @@ -288,17 +291,18 @@ nni_ipc_conn_send(nni_ipc_conn *c, nni_aio *aio) nni_mtx_unlock(&c->mtx); } -void -nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio) +static void +ipc_recv(void *arg, nni_aio *aio) { - int rv; + ipc_conn *c = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&c->mtx); - if ((rv = nni_aio_schedule(aio, ipc_conn_cancel, c)) != 0) { + if ((rv = nni_aio_schedule(aio, ipc_cancel, c)) != 0) { nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); return; @@ -310,7 +314,7 @@ nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio) // many cases. We also need not arm a list if it was already // armed. if (nni_list_first(&c->readq) == aio) { - ipc_conn_doread(c); + ipc_doread(c); // If we are still the first thing on the list, that // means we didn't finish the job, so arm the poller to // complete us. @@ -322,8 +326,8 @@ nni_ipc_conn_recv(nni_ipc_conn *c, nni_aio *aio) } static int -ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid, - uint64_t *prid, uint64_t *znid) +ipc_peerid(ipc_conn *c, uint64_t *euid, uint64_t *egid, uint64_t *prid, + uint64_t *znid) { int fd = nni_posix_pfd_fd(c->pfd); #if defined(NNG_HAVE_GETPEEREID) @@ -403,43 +407,43 @@ ipc_conn_peerid(nni_ipc_conn *c, uint64_t *euid, uint64_t *egid, #endif } -int -ipc_conn_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t) +static int +ipc_get_peer_uid(void *arg, void *buf, size_t *szp, nni_type t) { - nni_ipc_conn *c = arg; - int rv; - uint64_t ignore; - uint64_t id; + ipc_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id; - if ((rv = ipc_conn_peerid(c, &id, &ignore, &ignore, &ignore)) != 0) { + if ((rv = ipc_peerid(c, &id, &ignore, &ignore, &ignore)) != 0) { return (rv); } return (nni_copyout_u64(id, buf, szp, t)); } static int -ipc_conn_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t) +ipc_get_peer_gid(void *arg, void *buf, size_t *szp, nni_type t) { - nni_ipc_conn *c = arg; - int rv; - uint64_t ignore; - uint64_t id; + ipc_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id; - if ((rv = ipc_conn_peerid(c, &ignore, &id, &ignore, &ignore)) != 0) { + if ((rv = ipc_peerid(c, &ignore, &id, &ignore, &ignore)) != 0) { return (rv); } return (nni_copyout_u64(id, buf, szp, t)); } static int -ipc_conn_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t) +ipc_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t) { - nni_ipc_conn *c = arg; - int rv; - uint64_t ignore; - uint64_t id; + ipc_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id; - if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) { + if ((rv = ipc_peerid(c, &ignore, &ignore, &ignore, &id)) != 0) { return (rv); } if (id == (uint64_t) -1) { @@ -450,14 +454,14 @@ ipc_conn_get_peer_zoneid(void *arg, void *buf, size_t *szp, nni_type t) } static int -ipc_conn_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t) +ipc_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t) { - nni_ipc_conn *c = arg; - int rv; - uint64_t ignore; - uint64_t id; + ipc_conn *c = arg; + int rv; + uint64_t ignore; + uint64_t id; - if ((rv = ipc_conn_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) { + if ((rv = ipc_peerid(c, &ignore, &ignore, &id, &ignore)) != 0) { return (rv); } if (id == (uint64_t) -1) { @@ -468,9 +472,9 @@ ipc_conn_get_peer_pid(void *arg, void *buf, size_t *szp, nni_type t) } static int -ipc_conn_get_addr(void *arg, void *buf, size_t *szp, nni_type t) +ipc_get_addr(void *arg, void *buf, size_t *szp, nni_type t) { - nni_ipc_conn * c = arg; + ipc_conn * c = arg; nni_sockaddr sa; struct sockaddr_storage ss; socklen_t sslen = sizeof(ss); @@ -486,36 +490,17 @@ ipc_conn_get_addr(void *arg, void *buf, size_t *szp, nni_type t) return (nni_copyout_sockaddr(&sa, buf, szp, t)); } -int -nni_posix_ipc_conn_init(nni_ipc_conn **cp, nni_posix_pfd *pfd) -{ - nni_ipc_conn *c; - - if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { - return (NNG_ENOMEM); - } - - c->closed = false; - c->pfd = pfd; - - nni_mtx_init(&c->mtx); - nni_aio_list_init(&c->readq); - nni_aio_list_init(&c->writeq); - - *cp = c; - return (0); -} - void -nni_posix_ipc_conn_start(nni_ipc_conn *c) +nni_posix_ipc_start(nni_ipc_conn *c) { - nni_posix_pfd_set_cb(c->pfd, ipc_conn_cb, c); + nni_posix_pfd_set_cb(c->pfd, ipc_cb, c); } -void -nni_ipc_conn_fini(nni_ipc_conn *c) +static void +ipc_free(void *arg) { - nni_ipc_conn_close(c); + ipc_conn *c = arg; + ipc_close(c); nni_posix_pfd_fini(c->pfd); nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN c->pfd = NULL; @@ -525,46 +510,72 @@ nni_ipc_conn_fini(nni_ipc_conn *c) NNI_FREE_STRUCT(c); } -static const nni_option ipc_conn_options[] = { +static const nni_option ipc_options[] = { { .o_name = NNG_OPT_LOCADDR, - .o_get = ipc_conn_get_addr, + .o_get = ipc_get_addr, }, { .o_name = NNG_OPT_REMADDR, - .o_get = ipc_conn_get_addr, + .o_get = ipc_get_addr, }, { .o_name = NNG_OPT_IPC_PEER_PID, - .o_get = ipc_conn_get_peer_pid, + .o_get = ipc_get_peer_pid, }, { .o_name = NNG_OPT_IPC_PEER_UID, - .o_get = ipc_conn_get_peer_uid, + .o_get = ipc_get_peer_uid, }, { .o_name = NNG_OPT_IPC_PEER_GID, - .o_get = ipc_conn_get_peer_gid, + .o_get = ipc_get_peer_gid, }, { .o_name = NNG_OPT_IPC_PEER_ZONEID, - .o_get = ipc_conn_get_peer_zoneid, + .o_get = ipc_get_peer_zoneid, }, { .o_name = NULL, }, }; -int -nni_ipc_conn_getopt( - nni_ipc_conn *c, const char *name, void *val, size_t *szp, nni_type t) +static int +ipc_getx(void *arg, const char *name, void *val, size_t *szp, nni_type t) { - return (nni_getopt(ipc_conn_options, name, c, val, szp, t)); + ipc_conn *c = arg; + return (nni_getopt(ipc_options, name, c, val, szp, t)); +} + +static int +ipc_setx(void *arg, const char *name, const void *val, size_t sz, nni_type t) +{ + ipc_conn *c = arg; + return (nni_setopt(ipc_options, name, c, val, sz, t)); } int -nni_ipc_conn_setopt( - nni_ipc_conn *c, const char *name, const void *val, size_t sz, nni_type t) +nni_posix_ipc_init(nni_ipc_conn **cp, nni_posix_pfd *pfd) { - return (nni_setopt(ipc_conn_options, name, c, val, sz, t)); -}
\ No newline at end of file + ipc_conn *c; + + if ((c = NNI_ALLOC_STRUCT(c)) == NULL) { + return (NNG_ENOMEM); + } + + c->closed = false; + c->pfd = pfd; + c->stream.s_free = ipc_free; + c->stream.s_close = ipc_close; + c->stream.s_send = ipc_send; + c->stream.s_recv = ipc_recv; + c->stream.s_getx = ipc_getx; + c->stream.s_setx = ipc_setx; + + nni_mtx_init(&c->mtx); + nni_aio_list_init(&c->readq); + nni_aio_list_init(&c->writeq); + + *cp = c; + return (0); +} diff --git a/src/platform/posix/posix_ipcdial.c b/src/platform/posix/posix_ipcdial.c index d3dc2109..3182b390 100644 --- a/src/platform/posix/posix_ipcdial.c +++ b/src/platform/posix/posix_ipcdial.c @@ -1,7 +1,7 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -29,25 +29,13 @@ #include "posix_ipc.h" -// Dialer stuff. -int -nni_ipc_dialer_init(nni_ipc_dialer **dp) -{ - nni_ipc_dialer *d; +typedef struct nni_ipc_dialer ipc_dialer; - if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { - return (NNG_ENOMEM); - } - nni_mtx_init(&d->mtx); - d->closed = false; - nni_aio_list_init(&d->connq); - *dp = d; - return (0); -} - -void -nni_ipc_dialer_close(nni_ipc_dialer *d) +// Dialer stuff. +static void +ipc_dialer_close(void *arg) { + ipc_dialer *d = arg; nni_mtx_lock(&d->mtx); if (!d->closed) { nni_aio *aio; @@ -58,9 +46,8 @@ nni_ipc_dialer_close(nni_ipc_dialer *d) if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { c->dial_aio = NULL; nni_aio_set_prov_extra(aio, 0, NULL); - nni_ipc_conn_close(c); - nni_reap( - &c->reap, (nni_cb) nni_ipc_conn_fini, c); + nng_stream_close(&c->stream); + nng_stream_free(&c->stream); } nni_aio_finish_error(aio, NNG_ECLOSED); } @@ -68,10 +55,11 @@ nni_ipc_dialer_close(nni_ipc_dialer *d) nni_mtx_unlock(&d->mtx); } -void -nni_ipc_dialer_fini(nni_ipc_dialer *d) +static void +ipc_dialer_free(void *arg) { - nni_ipc_dialer_close(d); + ipc_dialer *d = arg; + ipc_dialer_close(d); nni_mtx_fini(&d->mtx); NNI_FREE_STRUCT(d); } @@ -94,7 +82,7 @@ ipc_dialer_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, rv); - nni_ipc_conn_fini(c); + nng_stream_free(&c->stream); } static void @@ -137,13 +125,13 @@ ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg) nni_mtx_unlock(&d->mtx); if (rv != 0) { - nni_ipc_conn_close(c); - nni_ipc_conn_fini(c); + nng_stream_close(&c->stream); + nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); return; } - nni_posix_ipc_conn_start(c); + nni_posix_ipc_start(c); nni_aio_set_output(aio, 0, c); nni_aio_finish(aio, 0, 0); } @@ -151,8 +139,9 @@ ipc_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg) // We don't give local address binding support. Outbound dialers always // get an ephemeral port. void -nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +ipc_dialer_dial(void *arg, nni_aio *aio) { + ipc_dialer * d = arg; nni_ipc_conn * c; nni_posix_pfd * pfd = NULL; struct sockaddr_storage ss; @@ -164,7 +153,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio) return; } - if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + if (((sslen = nni_posix_nn2sockaddr(&ss, &d->sa)) == 0) || (ss.ss_family != AF_UNIX)) { nni_aio_finish_error(aio, NNG_EADDRINVAL); return; @@ -182,7 +171,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } - if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) { + if ((rv = nni_posix_ipc_init(&c, pfd)) != 0) { nni_posix_pfd_fini(pfd); nni_aio_finish_error(aio, rv); return; @@ -222,7 +211,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio) // on loopback, and probably not on every platform. nni_aio_set_prov_extra(aio, 0, NULL); nni_mtx_unlock(&d->mtx); - nni_posix_ipc_conn_start(c); + nni_posix_ipc_start(c); nni_aio_set_output(aio, 0, c); nni_aio_finish(aio, 0, 0); return; @@ -230,7 +219,7 @@ nni_ipc_dialer_dial(nni_ipc_dialer *d, const nni_sockaddr *sa, nni_aio *aio) error: nni_aio_set_prov_extra(aio, 0, NULL); nni_mtx_unlock(&d->mtx); - nni_reap(&c->reap, (nni_cb) nni_ipc_conn_fini, c); + nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); } @@ -241,15 +230,44 @@ static const nni_option ipc_dialer_options[] = { }; int -nni_ipc_dialer_getopt( - nni_ipc_dialer *d, const char *name, void *buf, size_t *szp, nni_type t) +ipc_dialer_getx(void *arg, const char *nm, void *buf, size_t *szp, nni_type t) +{ + ipc_dialer *d = arg; + return (nni_getopt(ipc_dialer_options, nm, d, buf, szp, t)); +} + +int +ipc_dialer_setx( + void *arg, const char *nm, const void *buf, size_t sz, nni_type t) { - return (nni_getopt(ipc_dialer_options, name, d, buf, szp, t)); + ipc_dialer *d = arg; + return (nni_setopt(ipc_dialer_options, nm, d, buf, sz, t)); } int -nni_ipc_dialer_setopt(nni_ipc_dialer *d, const char *name, const void *buf, - size_t sz, nni_type t) +nni_ipc_dialer_alloc(nng_stream_dialer **dp, const nng_url *url) { - return (nni_setopt(ipc_dialer_options, name, d, buf, sz, t)); -}
\ No newline at end of file + ipc_dialer *d; + + if ((strcmp(url->u_scheme, "ipc") != 0) || (url->u_path == NULL) || + (strlen(url->u_path) == 0) || + (strlen(url->u_path) >= NNG_MAXADDRLEN)) { + return (NNG_EADDRINVAL); + } + if ((d = NNI_ALLOC_STRUCT(d)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&d->mtx); + nni_aio_list_init(&d->connq); + d->closed = false; + d->sa.s_ipc.sa_family = NNG_AF_IPC; + strcpy(d->sa.s_ipc.sa_path, url->u_path); + d->sd.sd_free = ipc_dialer_free; + d->sd.sd_close = ipc_dialer_close; + d->sd.sd_dial = ipc_dialer_dial; + d->sd.sd_getx = ipc_dialer_getx; + d->sd.sd_setx = ipc_dialer_setx; + + *dp = (void *) d; + return (0); +} diff --git a/src/platform/posix/posix_ipclisten.c b/src/platform/posix/posix_ipclisten.c index 11b56ab0..2b5d0edd 100644 --- a/src/platform/posix/posix_ipclisten.c +++ b/src/platform/posix/posix_ipclisten.c @@ -1,7 +1,7 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> -// Copyright 2018 Devolutions <info@devolutions.net> +// Copyright 2019 Devolutions <info@devolutions.net> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -29,28 +29,20 @@ #include "posix_ipc.h" -int -nni_ipc_listener_init(nni_ipc_listener **lp) -{ - nni_ipc_listener *l; - if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { - return (NNG_ENOMEM); - } - - nni_mtx_init(&l->mtx); - - l->pfd = NULL; - l->closed = false; - l->started = false; - l->perms = 0; - - nni_aio_list_init(&l->acceptq); - *lp = l; - return (0); -} +typedef struct { + nng_stream_listener sl; + nni_posix_pfd * pfd; + nng_sockaddr sa; + nni_list acceptq; + bool started; + bool closed; + char * path; + mode_t perms; + nni_mtx mtx; +} ipc_listener; static void -ipc_listener_doclose(nni_ipc_listener *l) +ipc_listener_doclose(ipc_listener *l) { nni_aio *aio; char * path; @@ -71,16 +63,17 @@ ipc_listener_doclose(nni_ipc_listener *l) } } -void -nni_ipc_listener_close(nni_ipc_listener *l) +static void +ipc_listener_close(void *arg) { + ipc_listener *l = arg; nni_mtx_lock(&l->mtx); ipc_listener_doclose(l); nni_mtx_unlock(&l->mtx); } static void -ipc_listener_doaccept(nni_ipc_listener *l) +ipc_listener_doaccept(ipc_listener *l) { nni_aio *aio; @@ -138,7 +131,7 @@ ipc_listener_doaccept(nni_ipc_listener *l) continue; } - if ((rv = nni_posix_ipc_conn_init(&c, pfd)) != 0) { + if ((rv = nni_posix_ipc_init(&c, pfd)) != 0) { nni_posix_pfd_fini(pfd); nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -146,7 +139,7 @@ ipc_listener_doaccept(nni_ipc_listener *l) } nni_aio_list_remove(aio); - nni_posix_ipc_conn_start(c); + nni_posix_ipc_start(c); nni_aio_set_output(aio, 0, c); nni_aio_finish(aio, 0, 0); } @@ -155,7 +148,7 @@ ipc_listener_doaccept(nni_ipc_listener *l) static void ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg) { - nni_ipc_listener *l = arg; + ipc_listener *l = arg; NNI_ARG_UNUSED(pfd); nni_mtx_lock(&l->mtx); @@ -173,7 +166,7 @@ ipc_listener_cb(nni_posix_pfd *pfd, int events, void *arg) static void ipc_listener_cancel(nni_aio *aio, void *arg, int rv) { - nni_ipc_listener *l = arg; + ipc_listener *l = arg; // This is dead easy, because we'll ignore the completion if there // isn't anything to do the accept on! @@ -222,16 +215,16 @@ ipc_remove_stale(const char *path) static int ipc_listener_get_addr(void *arg, void *buf, size_t *szp, nni_type t) { - nni_ipc_listener *l = arg; + ipc_listener *l = arg; return (nni_copyout_sockaddr(&l->sa, buf, szp, t)); } static int ipc_listener_set_perms(void *arg, const void *buf, size_t sz, nni_type t) { - nni_ipc_listener *l = arg; - int mode; - int rv; + ipc_listener *l = arg; + int mode; + int rv; if ((rv = nni_copyin_int(&mode, buf, sz, 0, S_IFMT, t)) != 0) { return (rv); @@ -239,16 +232,14 @@ ipc_listener_set_perms(void *arg, const void *buf, size_t sz, nni_type t) if ((mode & S_IFMT) != 0) { return (NNG_EINVAL); } - if (l != NULL) { - mode |= S_IFSOCK; // set IFSOCK to ensure non-zero - nni_mtx_lock(&l->mtx); - if (l->started) { - nni_mtx_unlock(&l->mtx); - return (NNG_EBUSY); - } - l->perms = mode; + mode |= S_IFSOCK; // set IFSOCK to ensure non-zero + nni_mtx_lock(&l->mtx); + if (l->started) { nni_mtx_unlock(&l->mtx); + return (NNG_EBUSY); } + l->perms = mode; + nni_mtx_unlock(&l->mtx); return (0); } @@ -266,23 +257,26 @@ static const nni_option ipc_listener_options[] = { }, }; -int -nni_ipc_listener_getopt( - nni_ipc_listener *l, const char *name, void *buf, size_t *szp, nni_type t) +static int +ipc_listener_getx( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) { + ipc_listener *l = arg; return (nni_getopt(ipc_listener_options, name, l, buf, szp, t)); } -int -nni_ipc_listener_setopt(nni_ipc_listener *l, const char *name, const void *buf, - size_t sz, nni_type t) +static int +ipc_listener_setx( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) { + ipc_listener *l = arg; return (nni_setopt(ipc_listener_options, name, l, buf, sz, t)); } int -nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa) +ipc_listener_listen(void *arg) { + ipc_listener * l = arg; socklen_t len; struct sockaddr_storage ss; int rv; @@ -290,7 +284,7 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa) nni_posix_pfd * pfd; char * path; - if (((len = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + if (((len = nni_posix_nn2sockaddr(&ss, &l->sa)) == 0) || (ss.ss_family != AF_UNIX)) { return (NNG_EADDRINVAL); } @@ -304,7 +298,7 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa) nni_mtx_unlock(&l->mtx); return (NNG_ECLOSED); } - path = nni_strdup(sa->s_ipc.sa_path); + path = nni_strdup(l->sa.s_ipc.sa_path); if (path == NULL) { return (NNG_ENOMEM); } @@ -352,15 +346,15 @@ nni_ipc_listener_listen(nni_ipc_listener *l, const nni_sockaddr *sa) l->pfd = pfd; l->started = true; l->path = path; - l->sa = *sa; nni_mtx_unlock(&l->mtx); return (0); } -void -nni_ipc_listener_fini(nni_ipc_listener *l) +static void +ipc_listener_free(void *arg) { + ipc_listener * l = arg; nni_posix_pfd *pfd; nni_mtx_lock(&l->mtx); @@ -375,10 +369,11 @@ nni_ipc_listener_fini(nni_ipc_listener *l) NNI_FREE_STRUCT(l); } -void -nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio) +static void +ipc_listener_accept(void *arg, nni_aio *aio) { - int rv; + ipc_listener *l = arg; + int rv; // Accept is simpler than the connect case. With accept we just // need to wait for the socket to be readable to indicate an incoming @@ -410,3 +405,69 @@ nni_ipc_listener_accept(nni_ipc_listener *l, nni_aio *aio) } nni_mtx_unlock(&l->mtx); } + +int +nni_ipc_listener_alloc(nng_stream_listener **lp, const nng_url *url) +{ + ipc_listener *l; + + if ((strcmp(url->u_scheme, "ipc") != 0) || (url->u_path == NULL) || + (strlen(url->u_path) == 0) || + (strlen(url->u_path) >= NNG_MAXADDRLEN)) { + return (NNG_EADDRINVAL); + } + + if ((l = NNI_ALLOC_STRUCT(l)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_init(&l->mtx); + nni_aio_list_init(&l->acceptq); + + l->pfd = NULL; + l->closed = false; + l->started = false; + l->perms = 0; + l->sa.s_ipc.sa_family = NNG_AF_IPC; + strcpy(l->sa.s_ipc.sa_path, url->u_path); + l->sl.sl_free = ipc_listener_free; + l->sl.sl_close = ipc_listener_close; + l->sl.sl_listen = ipc_listener_listen; + l->sl.sl_accept = ipc_listener_accept; + l->sl.sl_getx = ipc_listener_getx; + l->sl.sl_setx = ipc_listener_setx; + + *lp = (void *) l; + return (0); +} + +static int +ipc_check_perms(const void *buf, size_t sz, nni_type t) +{ + int32_t mode; + int rv; + + if ((rv = nni_copyin_int(&mode, buf, sz, 0, S_IFMT, t)) != 0) { + return (rv); + } + if ((mode & S_IFMT) != 0) { + return (NNG_EINVAL); + } + return (0); +} + +static const nni_chkoption ipc_chkopts[] = { + { + .o_name = NNG_OPT_IPC_PERMISSIONS, + .o_check = ipc_check_perms, + }, + { + .o_name = NULL, + }, +}; + +int +nni_ipc_checkopt(const char *name, const void *data, size_t sz, nni_type t) +{ + return (nni_chkopt(ipc_chkopts, name, data, sz, t)); +} diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index b4d63b59..bb6db188 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a @@ -321,10 +321,11 @@ resolv_worker(void *notused) // Check to make sure we were not canceled. if ((aio = item->aio) != NULL) { - nng_sockaddr *sa = nni_aio_get_input(aio, 0); + nni_aio_set_prov_extra(aio, 0, NULL); item->aio = NULL; - memcpy(sa, &item->sa, sizeof(*sa)); + + nni_aio_set_sockaddr(aio, &item->sa); nni_aio_finish(aio, rv, 0); NNI_FREE_STRUCT(item); diff --git a/src/platform/posix/posix_tcp.h b/src/platform/posix/posix_tcp.h index 788fcbf8..1638df61 100644 --- a/src/platform/posix/posix_tcp.h +++ b/src/platform/posix/posix_tcp.h @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2018 Devolutions <info@devolutions.net> // @@ -14,6 +14,7 @@ #include "platform/posix/posix_aio.h" struct nni_tcp_conn { + nng_stream stream; nni_posix_pfd * pfd; nni_list readq; nni_list writeq; @@ -23,26 +24,5 @@ struct nni_tcp_conn { nni_tcp_dialer *dialer; nni_reap_item reap; }; - -struct nni_tcp_dialer { - nni_list connq; // pending connections - bool closed; - bool nodelay; - bool keepalive; - struct sockaddr_storage src; - size_t srclen; - nni_mtx mtx; -}; - -struct nni_tcp_listener { - nni_posix_pfd *pfd; - nni_list acceptq; - bool started; - bool closed; - bool nodelay; - bool keepalive; - nni_mtx mtx; -}; - -extern int nni_posix_tcp_conn_init(nni_tcp_conn **, nni_posix_pfd *); -extern void nni_posix_tcp_conn_start(nni_tcp_conn *, int, int); +extern int nni_posix_tcp_init(nni_tcp_conn **, nni_posix_pfd *); +extern void nni_posix_tcp_start(nni_tcp_conn *, int, int); diff --git a/src/platform/posix/posix_tcpconn.c b/src/platform/posix/posix_tcpconn.c index ef6ee8e3..0d3c274d 100644 --- a/src/platform/posix/posix_tcpconn.c +++ b/src/platform/posix/posix_tcpconn.c @@ -1,5 +1,5 @@ // -// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2019 Staysail Systems, Inc. <info@staysail.tech> // Copyright 2018 Capitar IT Group BV <info@capitar.com> // Copyright 2019 Devolutions <info@devolutions.net> // @@ -36,7 +36,7 @@ #include "posix_tcp.h" static void -tcp_conn_dowrite(nni_tcp_conn *c) +tcp_dowrite(nni_tcp_conn *c) { nni_aio *aio; int fd; @@ -118,7 +118,7 @@ tcp_conn_dowrite(nni_tcp_conn *c) } static void -tcp_conn_doread(nni_tcp_conn *c) +tcp_doread(nni_tcp_conn *c) { nni_aio *aio; int fd; @@ -195,9 +195,10 @@ tcp_conn_doread(nni_tcp_conn *c) } } -void -nni_tcp_conn_close(nni_tcp_conn *c) +static void +tcp_close(void *arg) { + nni_tcp_conn *c = arg; nni_mtx_lock(&c->mtx); if (!c->closed) { nni_aio *aio; @@ -212,21 +213,44 @@ nni_tcp_conn_close(nni_tcp_conn *c) nni_mtx_unlock(&c->mtx); } +// tcp_fini may block briefly waiting for the pollq thread. +// To get that out of our context, we simply reap this. +static void +tcp_fini(void *arg) +{ + nni_tcp_conn *c = arg; + tcp_close(c); + nni_posix_pfd_fini(c->pfd); + nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN + c->pfd = NULL; + nni_mtx_unlock(&c->mtx); + nni_mtx_fini(&c->mtx); + + NNI_FREE_STRUCT(c); +} + static void -tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg) +tcp_free(void *arg) +{ + nni_tcp_conn *c = arg; + nni_reap(&c->reap, tcp_fini, arg); +} + +static void +tcp_cb(nni_posix_pfd *pfd, int events, void *arg) { nni_tcp_conn *c = arg; if (events & (POLLHUP | POLLERR | POLLNVAL)) { - nni_tcp_conn_close(c); + tcp_close(c); return; } nni_mtx_lock(&c->mtx); if (events & POLLIN) { - tcp_conn_doread(c); + tcp_doread(c); } if (events & POLLOUT) { - tcp_conn_dowrite(c); + tcp_dowrite(c); } events = 0; if (!nni_list_empty(&c->writeq)) { @@ -242,7 +266,7 @@ tcp_conn_cb(nni_posix_pfd *pfd, int events, void *arg) } static void -tcp_conn_cancel(nni_aio *aio, void *arg, int rv) +tcp_cancel(nni_aio *aio, void *arg, int rv) { nni_tcp_conn *c = arg; @@ -254,18 +278,18 @@ tcp_conn_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&c->mtx); } -void -nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) +static void +tcp_send(void *arg, nni_aio *aio) { - - int rv; + nni_tcp_conn *c = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&c->mtx); - if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) { nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); return; @@ -273,7 +297,7 @@ nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) nni_aio_list_append(&c->writeq, aio); if (nni_list_first(&c->writeq) == aio) { - tcp_conn_dowrite(c); + tcp_dowrite(c); // If we are still the first thing on the list, that // means we didn't finish the job, so arm the poller to // complete us. @@ -284,17 +308,18 @@ nni_tcp_conn_send(nni_tcp_conn *c, nni_aio *aio) nni_mtx_unlock(&c->mtx); } -void -nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) +static void +tcp_recv(void *arg, nni_aio *aio) { - int rv; + nni_tcp_conn *c = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&c->mtx); - if ((rv = nni_aio_schedule(aio, tcp_conn_cancel, c)) != 0) { + if ((rv = nni_aio_schedule(aio, tcp_cancel, c)) != 0) { nni_mtx_unlock(&c->mtx); nni_aio_finish_error(aio, rv); return; @@ -306,7 +331,7 @@ nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) // many cases. We also need not arm a list if it was already // armed. if (nni_list_first(&c->readq) == aio) { - tcp_conn_doread(c); + tcp_doread(c); // If we are still the first thing on the list, that // means we didn't finish the job, so arm the poller to // complete us. @@ -317,59 +342,8 @@ nni_tcp_conn_recv(nni_tcp_conn *c, nni_aio *aio) nni_mtx_unlock(&c->mtx); } -int -nni_tcp_conn_peername(nni_tcp_conn *c, nni_sockaddr *sa) -{ - struct sockaddr_storage ss; - socklen_t sslen = sizeof(ss); - int fd = nni_posix_pfd_fd(c->pfd); - - if (getpeername(fd, (void *) &ss, &sslen) != 0) { - return (nni_plat_errno(errno)); - } - return (nni_posix_sockaddr2nn(sa, &ss)); -} - -int -nni_tcp_conn_sockname(nni_tcp_conn *c, nni_sockaddr *sa) -{ - struct sockaddr_storage ss; - socklen_t sslen = sizeof(ss); - int fd = nni_posix_pfd_fd(c->pfd); - - if (getsockname(fd, (void *) &ss, &sslen) != 0) { - return (nni_plat_errno(errno)); - } - return (nni_posix_sockaddr2nn(sa, &ss)); -} - -int -nni_tcp_conn_set_keepalive(nni_tcp_conn *c, bool keep) -{ - int val = keep ? 1 : 0; - int fd = nni_posix_pfd_fd(c->pfd); - - if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) != 0) { - return (nni_plat_errno(errno)); - } - return (0); -} - -int -nni_tcp_conn_set_nodelay(nni_tcp_conn *c, bool nodelay) -{ - - int val = nodelay ? 1 : 0; - int fd = nni_posix_pfd_fd(c->pfd); - - if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) != 0) { - return (nni_plat_errno(errno)); - } - return (0); -} - static int -tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t) +tcp_get_peername(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn * c = arg; struct sockaddr_storage ss; @@ -388,7 +362,7 @@ tcp_conn_get_peername(void *arg, void *buf, size_t *szp, nni_type t) } static int -tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t) +tcp_get_sockname(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn * c = arg; struct sockaddr_storage ss; @@ -407,7 +381,7 @@ tcp_conn_get_sockname(void *arg, void *buf, size_t *szp, nni_type t) } static int -tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t) +tcp_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t) { nni_tcp_conn *c = arg; int fd; @@ -427,7 +401,7 @@ tcp_conn_set_nodelay(void *arg, const void *buf, size_t sz, nni_type t) } static int -tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t) +tcp_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t) { nni_tcp_conn *c = arg; int fd; @@ -447,7 +421,7 @@ tcp_conn_set_keepalive(void *arg, const void *buf, size_t sz, nni_type t) } static int -tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t) +tcp_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn *c = arg; int fd = nni_posix_pfd_fd(c->pfd); @@ -462,7 +436,7 @@ tcp_conn_get_nodelay(void *arg, void *buf, size_t *szp, nni_type t) } static int -tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t) +tcp_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t) { nni_tcp_conn *c = arg; int fd = nni_posix_pfd_fd(c->pfd); @@ -476,46 +450,46 @@ tcp_conn_get_keepalive(void *arg, void *buf, size_t *szp, nni_type t) return (nni_copyout_bool(val, buf, szp, t)); } -static const nni_option tcp_conn_options[] = { +static const nni_option tcp_options[] = { { .o_name = NNG_OPT_REMADDR, - .o_get = tcp_conn_get_peername, + .o_get = tcp_get_peername, }, { .o_name = NNG_OPT_LOCADDR, - .o_get = tcp_conn_get_sockname, + .o_get = tcp_get_sockname, }, { .o_name = NNG_OPT_TCP_NODELAY, - .o_get = tcp_conn_get_nodelay, - .o_set = tcp_conn_set_nodelay, + .o_get = tcp_get_nodelay, + .o_set = tcp_set_nodelay, }, { .o_name = NNG_OPT_TCP_KEEPALIVE, - .o_get = tcp_conn_get_keepalive, - .o_set = tcp_conn_set_keepalive, + .o_get = tcp_get_keepalive, + .o_set = tcp_set_keepalive, }, { .o_name = NULL, }, }; -int -nni_tcp_conn_getopt( - nni_tcp_conn *c, const char *name, void *buf, size_t *szp, nni_type t) +static int +tcp_getx(void *arg, const char *name, void *buf, size_t *szp, nni_type t) { - return (nni_getopt(tcp_conn_options, name, c, buf, szp, t)); + nni_tcp_conn *c = arg; + return (nni_getopt(tcp_options, name, c, buf, szp, t)); } -int -nni_tcp_conn_setopt( - nni_tcp_conn *c, const char *name, const void *buf, size_t sz, nni_type t) +static int +tcp_setx(void *arg, const char *name, const void *buf, size_t sz, nni_type t) { - return (nni_setopt(tcp_conn_options, name, c, buf, sz, t)); + nni_tcp_conn *c = arg; + return (nni_setopt(tcp_options, name, c, buf, sz, t)); } int -nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd) +nni_posix_tcp_init(nni_tcp_conn **cp, nni_posix_pfd *pfd) { nni_tcp_conn *c; @@ -530,12 +504,19 @@ nni_posix_tcp_conn_init(nni_tcp_conn **cp, nni_posix_pfd *pfd) nni_aio_list_init(&c->readq); nni_aio_list_init(&c->writeq); + c->stream.s_free = tcp_free; + c->stream.s_close = tcp_close; + c->stream.s_recv = tcp_recv; + c->stream.s_send = tcp_send; + c->stream.s_getx = tcp_getx; + c->stream.s_setx = tcp_setx; + *cp = c; return (0); } void -nni_posix_tcp_conn_start(nni_tcp_conn *c, int nodelay, int keepalive) +nni_posix_tcp_start(nni_tcp_conn *c, int nodelay, int keepalive) { // Configure the initial socket options. (void) setsockopt(nni_posix_pfd_fd(c->pfd), IPPROTO_TCP, TCP_NODELAY, @@ -543,18 +524,5 @@ nni_posix_tcp_conn_start(nni_tcp_conn *c, int nodelay, int keepalive) (void) setsockopt(nni_posix_pfd_fd(c->pfd), SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(int)); - nni_posix_pfd_set_cb(c->pfd, tcp_conn_cb, c); -} - -void -nni_tcp_conn_fini(nni_tcp_conn *c) -{ - nni_tcp_conn_close(c); - nni_posix_pfd_fini(c->pfd); - nni_mtx_lock(&c->mtx); // not strictly needed, but shut up TSAN - c->pfd = NULL; - nni_mtx_unlock(&c->mtx); - nni_mtx_fini(&c->mtx); - - NNI_FREE_STRUCT(c); + nni_posix_pfd_set_cb(c->pfd, tcp_cb, c); } diff --git a/src/platform/posix/posix_tcpdial.c b/src/platform/posix/posix_tcpdial.c index cfb3482c..21ad862d 100644 --- a/src/platform/posix/posix_tcpdial.c +++ b/src/platform/posix/posix_tcpdial.c @@ -29,6 +29,16 @@ #include "posix_tcp.h" +struct nni_tcp_dialer { + nni_list connq; // pending connections + bool closed; + bool nodelay; + bool keepalive; + struct sockaddr_storage src; + size_t srclen; + nni_mtx mtx; +}; + // Dialer stuff. int nni_tcp_dialer_init(nni_tcp_dialer **dp) @@ -58,9 +68,8 @@ nni_tcp_dialer_close(nni_tcp_dialer *d) if ((c = nni_aio_get_prov_extra(aio, 0)) != NULL) { c->dial_aio = NULL; nni_aio_set_prov_extra(aio, 0, NULL); - nni_tcp_conn_close(c); - nni_reap( - &c->reap, (nni_cb) nni_tcp_conn_fini, c); + nng_stream_close(&c->stream); + nng_stream_free(&c->stream); } nni_aio_finish_error(aio, NNG_ECLOSED); } @@ -94,7 +103,7 @@ tcp_dialer_cancel(nni_aio *aio, void *arg, int rv) nni_mtx_unlock(&d->mtx); nni_aio_finish_error(aio, rv); - nni_tcp_conn_fini(c); + nng_stream_free(&c->stream); } static void @@ -142,13 +151,13 @@ tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg) nni_mtx_unlock(&d->mtx); if (rv != 0) { - nni_tcp_conn_close(c); - nni_tcp_conn_fini(c); + nng_stream_close(&c->stream); + nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); return; } - nni_posix_tcp_conn_start(c, nd, ka); + nni_posix_tcp_start(c, nd, ka); nni_aio_set_output(aio, 0, c); nni_aio_finish(aio, 0, 0); } @@ -156,7 +165,7 @@ tcp_dialer_cb(nni_posix_pfd *pfd, int ev, void *arg) // We don't give local address binding support. Outbound dialers always // get an ephemeral port. void -nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) +nni_tcp_dial(nni_tcp_dialer *d, nni_aio *aio) { nni_tcp_conn * c; nni_posix_pfd * pfd = NULL; @@ -166,12 +175,14 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) int rv; int ka; int nd; + nng_sockaddr sa; if (nni_aio_begin(aio) != 0) { return; } - if (((sslen = nni_posix_nn2sockaddr(&ss, sa)) == 0) || + nni_aio_get_sockaddr(aio, &sa); + if (((sslen = nni_posix_nn2sockaddr(&ss, &sa)) == 0) || ((ss.ss_family != AF_INET) && (ss.ss_family != AF_INET6))) { nni_aio_finish_error(aio, NNG_EADDRINVAL); return; @@ -189,7 +200,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) nni_aio_finish_error(aio, rv); return; } - if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + if ((rv = nni_posix_tcp_init(&c, pfd)) != 0) { nni_posix_pfd_fini(pfd); nni_aio_finish_error(aio, rv); return; @@ -232,7 +243,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) nd = d->nodelay ? 1 : 0; ka = d->keepalive ? 1 : 0; nni_mtx_unlock(&d->mtx); - nni_posix_tcp_conn_start(c, nd, ka); + nni_posix_tcp_start(c, nd, ka); nni_aio_set_output(aio, 0, c); nni_aio_finish(aio, 0, 0); return; @@ -240,7 +251,7 @@ nni_tcp_dialer_dial(nni_tcp_dialer *d, const nni_sockaddr *sa, nni_aio *aio) error: nni_aio_set_prov_extra(aio, 0, NULL); nni_mtx_unlock(&d->mtx); - nni_reap(&c->reap, (nni_cb) nni_tcp_conn_fini, c); + nng_stream_free(&c->stream); nni_aio_finish_error(aio, rv); } diff --git a/src/platform/posix/posix_tcplisten.c b/src/platform/posix/posix_tcplisten.c index 1e1b84b1..1edeccbc 100644 --- a/src/platform/posix/posix_tcplisten.c +++ b/src/platform/posix/posix_tcplisten.c @@ -29,6 +29,16 @@ #include "posix_tcp.h" +struct nni_tcp_listener { + nni_posix_pfd *pfd; + nni_list acceptq; + bool started; + bool closed; + bool nodelay; + bool keepalive; + nni_mtx mtx; +}; + int nni_tcp_listener_init(nni_tcp_listener **lp) { @@ -133,7 +143,7 @@ tcp_listener_doaccept(nni_tcp_listener *l) continue; } - if ((rv = nni_posix_tcp_conn_init(&c, pfd)) != 0) { + if ((rv = nni_posix_tcp_init(&c, pfd)) != 0) { nni_posix_pfd_fini(pfd); nni_aio_list_remove(aio); nni_aio_finish_error(aio, rv); @@ -143,7 +153,7 @@ tcp_listener_doaccept(nni_tcp_listener *l) ka = l->keepalive ? 1 : 0; nd = l->nodelay ? 1 : 0; nni_aio_list_remove(aio); - nni_posix_tcp_conn_start(c, nd, ka); + nni_posix_tcp_start(c, nd, ka); nni_aio_set_output(aio, 0, c); nni_aio_finish(aio, 0, 0); } |
