diff options
| -rw-r--r-- | src/core/transport.h | 11 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 9 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 10 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 10 |
4 files changed, 21 insertions, 19 deletions
diff --git a/src/core/transport.h b/src/core/transport.h index 05f2e7f7..dd4da299 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -89,18 +89,25 @@ struct nni_tran_pipe { // further calls on the same pipe. void (*p_fini)(void *); + // p_start starts the pipe running. This gives the transport a + // chance to hook into any transport specific negotiation phase. + // The pipe will not have its p_send or p_recv calls started, and + // will not be access by the "socket" until the pipe has indicated + // its readiness by finishing the aio. + void (*p_start)(void *, nni_aio *); + // p_aio_send queues the message for transmit. If this fails, then // the caller may try again with the same message (or free it). If // the call succeeds, then the transport has taken ownership of the // message, and the caller may not use it again. The transport will // have the responsibility to free the message (nng_msg_free()) when // it is finished with it. - int (*p_send)(void *, nni_aio *); + void (*p_send)(void *, nni_aio *); // p_recv schedules a message receive. This will be performed even for // cases where no data is expected, to allow detection of a remote // disconnect. - int (*p_recv)(void *, nni_aio *); + void (*p_recv)(void *, nni_aio *); // p_close closes the pipe. Further recv or send operations should // return back NNG_ECLOSED. diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index b1208e25..52ed582a 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -154,7 +154,7 @@ nni_inproc_pipe_fini(void *arg) } -static int +static void nni_inproc_pipe_send(void *arg, nni_aio *aio) { nni_inproc_pipe *pipe = arg; @@ -168,21 +168,20 @@ nni_inproc_pipe_send(void *arg, nni_aio *aio) h = nni_msg_header(msg); l = nni_msg_header_len(msg); if ((rv = nni_msg_prepend(msg, h, l)) != 0) { - return (rv); + nni_aio_finish(aio, rv, aio->a_count); + return; } nni_msg_trunc_header(msg, l); nni_msgq_aio_put(pipe->wq, aio); - return (0); } -static int +static void nni_inproc_pipe_recv(void *arg, nni_aio *aio) { nni_inproc_pipe *pipe = arg; nni_msgq_aio_get(pipe->rq, aio); - return (0); } diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 3d8c4466..7a486b4d 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -339,7 +339,7 @@ nni_ipc_cancel_tx(nni_aio *aio) } -static int +static void nni_ipc_pipe_send(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; @@ -351,7 +351,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) nni_mtx_lock(&pipe->mtx); if (nni_aio_start(aio, nni_ipc_cancel_tx, pipe) != 0) { nni_mtx_unlock(&pipe->mtx); - return (0); + return; } pipe->user_txaio = aio; @@ -369,7 +369,6 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) nni_plat_ipc_send(pipe->isp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); - return (0); } @@ -387,7 +386,7 @@ nni_ipc_cancel_rx(nni_aio *aio) } -static int +static void nni_ipc_pipe_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; @@ -396,7 +395,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) if (nni_aio_start(aio, nni_ipc_cancel_rx, pipe) != 0) { nni_mtx_unlock(&pipe->mtx); - return (0); + return; } pipe->user_rxaio = aio; @@ -409,7 +408,6 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) nni_plat_ipc_recv(pipe->isp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); - return (0); } diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index b6520080..86b151cf 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -325,7 +325,7 @@ nni_tcp_cancel_tx(nni_aio *aio) } -static int +static void nni_tcp_pipe_send(void *arg, nni_aio *aio) { nni_tcp_pipe *pipe = arg; @@ -338,7 +338,7 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) if (nni_aio_start(aio, nni_tcp_cancel_tx, pipe) != 0) { nni_mtx_unlock(&pipe->mtx); - return (0); + return; } pipe->user_txaio = aio; @@ -355,7 +355,6 @@ nni_tcp_pipe_send(void *arg, nni_aio *aio) nni_plat_tcp_aio_send(pipe->tsp, &pipe->txaio); nni_mtx_unlock(&pipe->mtx); - return (0); } @@ -373,7 +372,7 @@ nni_tcp_cancel_rx(nni_aio *aio) } -static int +static void nni_tcp_pipe_recv(void *arg, nni_aio *aio) { nni_tcp_pipe *pipe = arg; @@ -382,7 +381,7 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) if (nni_aio_start(aio, nni_tcp_cancel_rx, pipe) != 0) { nni_mtx_unlock(&pipe->mtx); - return (0); + return; } pipe->user_rxaio = aio; @@ -395,7 +394,6 @@ nni_tcp_pipe_recv(void *arg, nni_aio *aio) nni_plat_tcp_aio_recv(pipe->tsp, &pipe->rxaio); nni_mtx_unlock(&pipe->mtx); - return (0); } |
