diff options
| author | Garrett D'Amore <garrett@damore.org> | 2019-02-18 20:15:16 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2019-02-23 11:50:17 -0800 |
| commit | 64e784237d143aa032311942bc44abd22e1e4114 (patch) | |
| tree | e3ea529d5e5adfd022773ab207622cc2247f057c /src/transport/ipc | |
| parent | d210ef96517c1462bc058c95bced8c27b5e19c4f (diff) | |
| download | nng-64e784237d143aa032311942bc44abd22e1e4114.tar.gz nng-64e784237d143aa032311942bc44abd22e1e4114.tar.bz2 nng-64e784237d143aa032311942bc44abd22e1e4114.zip | |
fixes #848 server hang waiting for client handshake
fixes #698 Need TCP stats
fixes #699 Need IPC stats
fixes #701 Need TLS stats
This commit addresses a problem when negotiating using one of the stream
based negotiation APIs -- a slow or misbehaving peer can prevent well
behaved ones from establishing a connection. The fix is a fairly
significant change in how these transports link up, and it does rely
on the fact that the socket only has a single accept() or connect()
pending at a time (on a given endpoint that is).
While here, we have completely revamped the way transport statistics are
done, offering a standard API for collecting these statistics.
Unfortunately, this completely borks the statistics for inproc. As we
are planning to change the way inproc works soon, in order to provide
more control and work on performance fixes for the message queue, we feel
this is an acceptable trade-off. Furthermore, almost nobody uses inproc
for anything, and even fewer people are making use of the statistics
at this time.
Diffstat (limited to 'src/transport/ipc')
| -rw-r--r-- | src/transport/ipc/ipc.c | 476 |
1 files changed, 321 insertions, 155 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index e0d83be0..0d8f12ae 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -38,23 +38,20 @@ struct ipctran_pipe { nni_list_node node; nni_atomic_flag reaped; nni_reap_item reap; - - uint8_t txhead[1 + sizeof(uint64_t)]; - uint8_t rxhead[1 + sizeof(uint64_t)]; - size_t gottxhead; - size_t gotrxhead; - size_t wanttxhead; - size_t wantrxhead; - - nni_list recvq; - nni_list sendq; - nni_aio *useraio; - nni_aio *txaio; - nni_aio *rxaio; - nni_aio *negoaio; - nni_aio *connaio; - nni_msg *rxmsg; - nni_mtx mtx; + uint8_t txhead[1 + sizeof(uint64_t)]; + uint8_t rxhead[1 + sizeof(uint64_t)]; + size_t gottxhead; + size_t gotrxhead; + size_t wanttxhead; + size_t wantrxhead; + nni_list recvq; + nni_list sendq; + nni_aio * useraio; + nni_aio * txaio; + nni_aio * rxaio; + nni_aio * negoaio; + nni_msg * rxmsg; + nni_mtx mtx; }; struct ipctran_ep { @@ -62,13 +59,22 @@ struct ipctran_ep { nni_sockaddr sa; size_t rcvmax; uint16_t proto; - nni_list pipes; + bool started; + bool closed; bool fini; + int refcnt; nng_stream_dialer * dialer; nng_stream_listener *listener; + nni_aio * useraio; + nni_aio * connaio; + nni_aio * timeaio; + nni_list busypipes; // busy pipes -- ones passed to socket + nni_list waitpipes; // pipes waiting to match to socket + nni_list negopipes; // pipes busy negotiating nni_reap_item reap; nni_dialer * ndialer; nni_listener * nlistener; + nni_stat_item st_rcvmaxsz; }; static void ipctran_pipe_send_start(ipctran_pipe *); @@ -76,7 +82,6 @@ static void ipctran_pipe_recv_start(ipctran_pipe *); static void ipctran_pipe_send_cb(void *); static void ipctran_pipe_recv_cb(void *); static void ipctran_pipe_nego_cb(void *); -static void ipctran_pipe_conn_cb(void *); static void ipctran_ep_fini(void *); static int @@ -102,7 +107,6 @@ ipctran_pipe_close(void *arg) nni_aio_close(p->rxaio); nni_aio_close(p->txaio); nni_aio_close(p->negoaio); - nni_aio_close(p->connaio); nng_stream_close(p->conn); } @@ -115,7 +119,6 @@ ipctran_pipe_stop(void *arg) nni_aio_stop(p->rxaio); nni_aio_stop(p->txaio); nni_aio_stop(p->negoaio); - nni_aio_stop(p->connaio); } static int @@ -135,8 +138,9 @@ ipctran_pipe_fini(void *arg) ipctran_pipe_stop(p); if ((ep = p->ep) != NULL) { nni_mtx_lock(&ep->mtx); - nni_list_remove(&ep->pipes, p); - if (ep->fini && nni_list_empty(&ep->pipes)) { + nni_list_node_remove(&p->node); + ep->refcnt--; + if (ep->fini && (ep->refcnt == 0)) { nni_reap(&ep->reap, ipctran_ep_fini, ep); } nni_mtx_unlock(&ep->mtx); @@ -144,7 +148,6 @@ ipctran_pipe_fini(void *arg) nni_aio_fini(p->rxaio); nni_aio_fini(p->txaio); nni_aio_fini(p->negoaio); - nni_aio_fini(p->connaio); nng_stream_free(p->conn); if (p->rxmsg) { nni_msg_free(p->rxmsg); @@ -165,7 +168,7 @@ ipctran_pipe_reap(ipctran_pipe *p) } static int -ipctran_pipe_alloc(ipctran_pipe **pipep, ipctran_ep *ep) +ipctran_pipe_alloc(ipctran_pipe **pipep) { ipctran_pipe *p; int rv; @@ -176,7 +179,6 @@ ipctran_pipe_alloc(ipctran_pipe **pipep, ipctran_ep *ep) nni_mtx_init(&p->mtx); if (((rv = nni_aio_init(&p->txaio, ipctran_pipe_send_cb, p)) != 0) || ((rv = nni_aio_init(&p->rxaio, ipctran_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->connaio, ipctran_pipe_conn_cb, p)) != 0) || ((rv = nni_aio_init(&p->negoaio, ipctran_pipe_nego_cb, p)) != 0)) { ipctran_pipe_fini(p); return (rv); @@ -184,81 +186,38 @@ ipctran_pipe_alloc(ipctran_pipe **pipep, ipctran_ep *ep) nni_aio_list_init(&p->sendq); nni_aio_list_init(&p->recvq); nni_atomic_flag_reset(&p->reaped); - nni_list_append(&ep->pipes, p); - - // 5 seconds each for connection and negotiation; should be more than - // sufficient. - nni_aio_set_timeout(p->connaio, 5000); - nni_aio_set_timeout(p->negoaio, 5000); - - p->proto = ep->proto; - p->rcvmax = ep->rcvmax; - p->ep = ep; - *pipep = p; return (0); } static void -ipctran_pipe_conn_cb(void *arg) +ipctran_ep_match(ipctran_ep *ep) { - ipctran_pipe *p = arg; - ipctran_ep * ep = p->ep; - nni_aio * aio = p->connaio; - nni_aio * uaio; - nni_iov iov; - int rv; + nni_aio * aio; + ipctran_pipe *p; - nni_mtx_lock(&ep->mtx); - uaio = p->useraio; - if ((rv = nni_aio_result(aio)) == 0) { - p->conn = nni_aio_get_output(aio, 0); - } - if (uaio == NULL) { - nni_mtx_unlock(&ep->mtx); - ipctran_pipe_reap(p); + if (((aio = ep->useraio) == NULL) || + ((p = nni_list_first(&ep->waitpipes)) == NULL)) { return; } - if (rv != 0) { - p->useraio = NULL; - nni_mtx_unlock(&ep->mtx); - nni_aio_finish_error(uaio, rv); - ipctran_pipe_reap(p); - return; - } - p->conn = nni_aio_get_output(aio, 0); - p->txhead[0] = 0; - p->txhead[1] = 'S'; - p->txhead[2] = 'P'; - p->txhead[3] = 0; - NNI_PUT16(&p->txhead[4], p->proto); - NNI_PUT16(&p->txhead[6], 0); - - p->gotrxhead = 0; - p->gottxhead = 0; - p->wantrxhead = 8; - p->wanttxhead = 8; - iov.iov_len = 8; - iov.iov_buf = &p->txhead[0]; - nni_aio_set_iov(p->negoaio, 1, &iov); - nng_stream_send(p->conn, p->negoaio); - nni_mtx_unlock(&ep->mtx); + nni_list_remove(&ep->waitpipes, p); + nni_list_append(&ep->busypipes, p); + ep->useraio = NULL; + p->rcvmax = ep->rcvmax; + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); } static void ipctran_pipe_nego_cb(void *arg) { ipctran_pipe *p = arg; + ipctran_ep * ep = p->ep; nni_aio * aio = p->negoaio; nni_aio * uaio; int rv; - nni_mtx_lock(&p->ep->mtx); - if ((uaio = p->useraio) == NULL) { - nni_mtx_unlock(&p->ep->mtx); - ipctran_pipe_reap(p); - return; - } + nni_mtx_lock(&ep->mtx); if ((rv = nni_aio_result(aio)) != 0) { goto error; } @@ -298,16 +257,33 @@ ipctran_pipe_nego_cb(void *arg) } NNI_GET16(&p->rxhead[4], p->peer); - p->useraio = NULL; - nni_mtx_unlock(&p->ep->mtx); - nni_aio_set_output(uaio, 0, p); - nni_aio_finish(uaio, 0, 0); + + // We are all ready now. We put this in the wait list, and + // then try to run the matcher. + nni_list_remove(&ep->negopipes, p); + nni_list_append(&ep->waitpipes, p); + + ipctran_ep_match(ep); + nni_mtx_unlock(&ep->mtx); return; error: p->useraio = NULL; - nni_mtx_unlock(&p->ep->mtx); - nni_aio_finish_error(uaio, rv); + + if (ep->ndialer != NULL) { + nni_dialer_bump_error(ep->ndialer, rv); + } else { + nni_listener_bump_error(ep->nlistener, rv); + } + + nng_stream_close(p->conn); + // If we are waiting to negotiate on a client side, then a failure + // here has to be passed to the user app. + if ((ep->dialer != NULL) && ((uaio = ep->useraio) != NULL)) { + ep->useraio = NULL; + nni_aio_finish_error(uaio, rv); + } + nni_mtx_unlock(&ep->mtx); ipctran_pipe_reap(p); } @@ -323,6 +299,7 @@ ipctran_pipe_send_cb(void *arg) nni_mtx_lock(&p->mtx); if ((rv = nni_aio_result(txaio)) != 0) { + nni_pipe_bump_error(p->npipe, rv); // Intentionally we do not queue up another transfer. // There's an excellent chance that the pipe is no longer // usable, with a partial transfer. @@ -349,10 +326,11 @@ ipctran_pipe_send_cb(void *arg) nni_aio_list_remove(aio); ipctran_pipe_send_start(p); - nni_mtx_unlock(&p->mtx); - msg = nni_aio_get_msg(aio); n = nni_msg_len(msg); + nni_pipe_bump_tx(p->npipe, n); + nni_mtx_unlock(&p->mtx); + nni_aio_set_msg(aio, NULL); nni_msg_free(msg); nni_aio_finish_synch(aio, 0, n); @@ -439,11 +417,13 @@ ipctran_pipe_recv_cb(void *arg) nni_aio_list_remove(aio); msg = p->rxmsg; p->rxmsg = NULL; + n = nni_msg_len(msg); + nni_pipe_bump_rx(p->npipe, n); ipctran_pipe_recv_start(p); nni_mtx_unlock(&p->mtx); nni_aio_set_msg(aio, msg); - nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); + nni_aio_finish_synch(aio, 0, n); return; error: @@ -453,6 +433,7 @@ error: } msg = p->rxmsg; p->rxmsg = NULL; + nni_pipe_bump_error(p->npipe, rv); // Intentionally, we do not queue up another receive. // The protocol should notice this error and close the pipe. nni_mtx_unlock(&p->mtx); @@ -641,19 +622,65 @@ ipctran_pipe_peer(void *arg) } static void -ipctran_pipe_conn_cancel(nni_aio *aio, void *arg, int rv) +ipctran_pipe_start(ipctran_pipe *p, nng_stream *conn, ipctran_ep *ep) { - ipctran_pipe *p = arg; + nni_iov iov; - nni_mtx_lock(&p->ep->mtx); - if (aio == p->useraio) { - nni_aio_close(p->negoaio); - nni_aio_close(p->connaio); - p->useraio = NULL; - nni_aio_finish_error(aio, rv); - ipctran_pipe_reap(p); + ep->refcnt++; + + p->conn = conn; + p->ep = ep; + p->proto = ep->proto; + + p->txhead[0] = 0; + p->txhead[1] = 'S'; + p->txhead[2] = 'P'; + p->txhead[3] = 0; + NNI_PUT16(&p->txhead[4], p->proto); + NNI_PUT16(&p->txhead[6], 0); + + p->gotrxhead = 0; + p->gottxhead = 0; + p->wantrxhead = 8; + p->wanttxhead = 8; + iov.iov_len = 8; + iov.iov_buf = &p->txhead[0]; + nni_aio_set_iov(p->negoaio, 1, &iov); + nni_list_append(&ep->negopipes, p); + + nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate + nng_stream_send(p->conn, p->negoaio); +} + +static void +ipctran_ep_close(void *arg) +{ + ipctran_ep * ep = arg; + ipctran_pipe *p; + + nni_mtx_lock(&ep->mtx); + ep->closed = true; + nni_aio_close(ep->timeaio); + if (ep->dialer != NULL) { + nng_stream_dialer_close(ep->dialer); + } + if (ep->listener != NULL) { + nng_stream_listener_close(ep->listener); + } + NNI_LIST_FOREACH (&ep->negopipes, p) { + ipctran_pipe_close(p); + } + NNI_LIST_FOREACH (&ep->waitpipes, p) { + ipctran_pipe_close(p); + } + NNI_LIST_FOREACH (&ep->busypipes, p) { + ipctran_pipe_close(p); } - nni_mtx_unlock(&p->ep->mtx); + if (ep->useraio != NULL) { + nni_aio_finish_error(ep->useraio, NNG_ECLOSED); + ep->useraio = NULL; + } + nni_mtx_unlock(&ep->mtx); } static void @@ -663,63 +690,163 @@ ipctran_ep_fini(void *arg) nni_mtx_lock(&ep->mtx); ep->fini = true; - if (!nni_list_empty(&ep->pipes)) { + if (ep->refcnt != 0) { nni_mtx_unlock(&ep->mtx); return; } + nni_mtx_unlock(&ep->mtx); + nni_aio_stop(ep->timeaio); + nni_aio_stop(ep->connaio); nng_stream_dialer_free(ep->dialer); nng_stream_listener_free(ep->listener); - nni_mtx_unlock(&ep->mtx); + nni_aio_fini(ep->timeaio); + nni_aio_fini(ep->connaio); nni_mtx_fini(&ep->mtx); NNI_FREE_STRUCT(ep); } static void -ipctran_ep_close(void *arg) +ipctran_timer_cb(void *arg) { - ipctran_ep * ep = arg; + ipctran_ep *ep = arg; + nni_mtx_lock(&ep->mtx); + if (nni_aio_result(ep->timeaio) == 0) { + nng_stream_listener_accept(ep->listener, ep->connaio); + } + nni_mtx_unlock(&ep->mtx); +} + +static void +ipctran_accept_cb(void *arg) +{ + ipctran_ep * ep = arg; + nni_aio * aio = ep->connaio; ipctran_pipe *p; + int rv; + nng_stream * conn; nni_mtx_lock(&ep->mtx); - NNI_LIST_FOREACH (&ep->pipes, p) { - nni_aio_close(p->negoaio); - nni_aio_close(p->connaio); - nni_aio_close(p->txaio); - nni_aio_close(p->rxaio); - if (p->conn != NULL) { - nng_stream_close(p->conn); + if ((rv = nni_aio_result(aio)) != 0) { + goto error; + } + + conn = nni_aio_get_output(aio, 0); + if ((rv = ipctran_pipe_alloc(&p)) != 0) { + nng_stream_free(conn); + goto error; + } + if (ep->closed) { + ipctran_pipe_fini(p); + nng_stream_free(conn); + rv = NNG_ECLOSED; + goto error; + } + ipctran_pipe_start(p, conn, ep); + nng_stream_listener_accept(ep->listener, ep->connaio); + nni_mtx_unlock(&ep->mtx); + return; + +error: + nni_listener_bump_error(ep->nlistener, rv); + switch (rv) { + + case NNG_ENOMEM: + nng_sleep_aio(10, ep->timeaio); + break; + + default: + if (!ep->closed) { + nng_stream_listener_accept(ep->listener, ep->connaio); } + break; } - if (ep->dialer != NULL) { - nng_stream_dialer_close(ep->dialer); + nni_mtx_unlock(&ep->mtx); +} + +static void +ipctran_dial_cb(void *arg) +{ + ipctran_ep * ep = arg; + nni_aio * aio = ep->connaio; + ipctran_pipe *p; + int rv; + nng_stream * conn; + + if ((rv = nni_aio_result(aio)) != 0) { + goto error; } - if (ep->listener != NULL) { - nng_stream_listener_close(ep->listener); + + conn = nni_aio_get_output(aio, 0); + if ((rv = ipctran_pipe_alloc(&p)) != 0) { + nng_stream_free(conn); + goto error; + } + nni_mtx_lock(&ep->mtx); + if (ep->closed) { + ipctran_pipe_fini(p); + nng_stream_free(conn); + rv = NNG_ECLOSED; + } else { + ipctran_pipe_start(p, conn, ep); + } + nni_mtx_unlock(&ep->mtx); + return; + +error: + // Error connecting. We need to pass this straight back + // to the user. + nni_dialer_bump_error(ep->ndialer, rv); + nni_mtx_lock(&ep->mtx); + if ((aio = ep->useraio) != NULL) { + ep->useraio = NULL; + nni_aio_finish_error(aio, rv); } nni_mtx_unlock(&ep->mtx); + return; } static int -ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) +ipctran_ep_init(ipctran_ep **epp, nni_sock *sock) { ipctran_ep *ep; - int rv; - nni_sock * sock = nni_dialer_sock(ndialer); if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&ep->mtx); - NNI_LIST_INIT(&ep->pipes, ipctran_pipe, node); + NNI_LIST_INIT(&ep->busypipes, ipctran_pipe, node); + NNI_LIST_INIT(&ep->waitpipes, ipctran_pipe, node); + NNI_LIST_INIT(&ep->negopipes, ipctran_pipe, node); + + ep->proto = nni_sock_proto_id(sock); + + nni_stat_init(&ep->st_rcvmaxsz, "rcvmaxsz", "maximum receive size"); + nni_stat_set_type(&ep->st_rcvmaxsz, NNG_STAT_LEVEL); + nni_stat_set_unit(&ep->st_rcvmaxsz, NNG_UNIT_BYTES); + + *epp = ep; + return (0); +} + +static int +ipctran_ep_init_dialer(void **dp, nni_url *url, nni_dialer *ndialer) +{ + ipctran_ep *ep; + int rv; + nni_sock * sock = nni_dialer_sock(ndialer); - ep->proto = nni_sock_proto_id(sock); + if ((rv = ipctran_ep_init(&ep, sock)) != 0) { + return (rv); + } ep->ndialer = ndialer; - if ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0) { + if (((rv = nni_aio_init(&ep->connaio, ipctran_dial_cb, ep)) != 0) || + ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) { ipctran_ep_fini(ep); return (rv); } + nni_dialer_add_stat(ndialer, &ep->st_rcvmaxsz); *dp = ep; return (0); } @@ -731,50 +858,71 @@ ipctran_ep_init_listener(void **dp, nni_url *url, nni_listener *nlistener) int rv; nni_sock * sock = nni_listener_sock(nlistener); - if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { - return (NNG_ENOMEM); + if ((rv = ipctran_ep_init(&ep, sock)) != 0) { + return (rv); } - nni_mtx_init(&ep->mtx); - NNI_LIST_INIT(&ep->pipes, ipctran_pipe, node); - - ep->proto = nni_sock_proto_id(sock); ep->nlistener = nlistener; - if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) { + if (((rv = nni_aio_init(&ep->connaio, ipctran_accept_cb, ep)) != 0) || + ((rv = nni_aio_init(&ep->timeaio, ipctran_timer_cb, ep)) != 0) || + ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) { ipctran_ep_fini(ep); return (rv); } + nni_listener_add_stat(nlistener, &ep->st_rcvmaxsz); *dp = ep; return (0); } static void +ipctran_ep_cancel(nni_aio *aio, void *arg, int rv) +{ + ipctran_ep *ep = arg; + nni_mtx_lock(&ep->mtx); + if (aio == ep->useraio) { + ep->useraio = NULL; + nni_aio_finish_error(aio, rv); + if (ep->ndialer) { + nni_dialer_bump_error(ep->ndialer, rv); + } else { + nni_listener_bump_error(ep->nlistener, rv); + } + } + nni_mtx_unlock(&ep->mtx); +} + +static void ipctran_ep_connect(void *arg, nni_aio *aio) { - ipctran_ep * ep = arg; - ipctran_pipe *p = NULL; - int rv; + ipctran_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&ep->mtx); - if ((rv = ipctran_pipe_alloc(&p, ep)) != 0) { + if (ep->closed) { nni_mtx_unlock(&ep->mtx); - nni_aio_finish_error(aio, rv); + nni_aio_finish_error(aio, NNG_ECLOSED); + nni_dialer_bump_error(ep->ndialer, NNG_ECLOSED); return; } - if ((rv = nni_aio_schedule(aio, ipctran_pipe_conn_cancel, p)) != 0) { - nni_list_remove(&ep->pipes, p); - p->ep = NULL; + if (ep->useraio != NULL) { nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_EBUSY); + nni_dialer_bump_error(ep->ndialer, NNG_EBUSY); + return; + } + + if ((rv = nni_aio_schedule(aio, ipctran_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_dialer_bump_error(ep->ndialer, NNG_EBUSY); nni_aio_finish_error(aio, rv); - ipctran_pipe_fini(p); return; } - p->useraio = aio; - nng_stream_dialer_dial(ep->dialer, p->connaio); + ep->useraio = aio; + nng_stream_dialer_dial(ep->dialer, ep->connaio); nni_mtx_unlock(&ep->mtx); } @@ -795,15 +943,21 @@ ipctran_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_type t) ipctran_ep *ep = arg; size_t val; int rv; - if (((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) && - (ep != NULL)) { + if ((rv = nni_copyin_size(&val, v, sz, 0, NNI_MAXSZ, t)) == 0) { ipctran_pipe *p; nni_mtx_lock(&ep->mtx); ep->rcvmax = val; - NNI_LIST_FOREACH (&ep->pipes, p) { + NNI_LIST_FOREACH (&ep->waitpipes, p) { + p->rcvmax = val; + } + NNI_LIST_FOREACH (&ep->negopipes, p) { + p->rcvmax = val; + } + NNI_LIST_FOREACH (&ep->busypipes, p) { p->rcvmax = val; } + nni_stat_set_value(&ep->st_rcvmaxsz, val); nni_mtx_unlock(&ep->mtx); } return (rv); @@ -816,7 +970,9 @@ ipctran_ep_bind(void *arg) int rv; nni_mtx_lock(&ep->mtx); - rv = nng_stream_listener_listen(ep->listener); + if ((rv = nng_stream_listener_listen(ep->listener)) != 0) { + nni_listener_bump_error(ep->nlistener, rv); + } nni_mtx_unlock(&ep->mtx); return (rv); } @@ -824,29 +980,39 @@ ipctran_ep_bind(void *arg) static void ipctran_ep_accept(void *arg, nni_aio *aio) { - ipctran_ep * ep = arg; - ipctran_pipe *p = NULL; - int rv; + ipctran_ep *ep = arg; + int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&ep->mtx); - if ((rv = ipctran_pipe_alloc(&p, ep)) != 0) { + if (ep->closed) { + nni_aio_finish_error(aio, NNG_ECLOSED); + nni_listener_bump_error(ep->nlistener, NNG_ECLOSED); nni_mtx_unlock(&ep->mtx); - nni_aio_finish_error(aio, rv); return; } - if ((rv = nni_aio_schedule(aio, ipctran_pipe_conn_cancel, p)) != 0) { - nni_list_remove(&ep->pipes, p); - p->ep = NULL; + if (ep->useraio != NULL) { + nni_aio_finish_error(aio, NNG_EBUSY); + nni_listener_bump_error(ep->nlistener, NNG_EBUSY); + nni_mtx_unlock(&ep->mtx); + return; + } + if ((rv = nni_aio_schedule(aio, ipctran_ep_cancel, ep)) != 0) { nni_mtx_unlock(&ep->mtx); nni_aio_finish_error(aio, rv); - ipctran_pipe_fini(p); + nni_listener_bump_error(ep->nlistener, rv); return; } - p->useraio = aio; - nng_stream_listener_accept(ep->listener, p->connaio); + ep->useraio = aio; + if (!ep->started) { + ep->started = true; + nng_stream_listener_accept(ep->listener, ep->connaio); + } else { + ipctran_ep_match(ep); + } + nni_mtx_unlock(&ep->mtx); } |
