diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-01-24 17:38:16 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-02-01 16:11:38 -0800 |
| commit | 3dae30ed5e543dc73fc993334ef56b9b157b9b3c (patch) | |
| tree | d7e294b5d544aa18e8fc8749abfe605a05fa4bd7 /src/transport/ipc | |
| parent | 5914e40c2ff7fcf346c90705785f3fb7650a9fdc (diff) | |
| download | nng-3dae30ed5e543dc73fc993334ef56b9b157b9b3c.tar.gz nng-3dae30ed5e543dc73fc993334ef56b9b157b9b3c.tar.bz2 nng-3dae30ed5e543dc73fc993334ef56b9b157b9b3c.zip | |
fixes #173 Define public HTTP server API
This introduces enough of the HTTP API to support fully server
applications, including creation of websocket style protocols,
pluggable handlers, and so forth.
We have also introduced scatter/gather I/O (rudimentary) for
aios, and made other enhancements to the AIO framework. The
internals of the AIOs themselves are now fully private, and we
have eliminated the aio->a_addr member, with plans to remove the
pipe and possibly message members as well.
A few other minor issues were found and fixed as well.
The HTTP API includes request, response, and connection objects,
which can be used with both servers and clients. It also defines
the HTTP server and handler objects, which support server applications.
Support for client applications will require a client object to be
exposed, and that should be happening shortly.
None of this is "documented" yet, bug again, we will follow up shortly.
Diffstat (limited to 'src/transport/ipc')
| -rw-r--r-- | src/transport/ipc/ipc.c | 124 |
1 files changed, 52 insertions, 72 deletions
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 9e8a3829..6475e43b 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -28,7 +28,7 @@ struct nni_ipc_pipe { uint16_t peer; uint16_t proto; size_t rcvmax; - nng_sockaddr sa; + nni_sockaddr sa; uint8_t txhead[1 + sizeof(uint64_t)]; uint8_t rxhead[1 + sizeof(uint64_t)]; @@ -133,7 +133,7 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) static void nni_ipc_cancel_start(nni_aio *aio, int rv) { - nni_ipc_pipe *pipe = aio->a_prov_data; + nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); if (pipe->user_negaio != aio) { @@ -143,7 +143,7 @@ nni_ipc_cancel_start(nni_aio *aio, int rv) pipe->user_negaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(pipe->negaio, rv); + nni_aio_abort(pipe->negaio, rv); nni_aio_finish_error(aio, rv); } @@ -167,18 +167,20 @@ nni_ipc_pipe_nego_cb(void *arg) } if (pipe->gottxhead < pipe->wanttxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = pipe->wanttxhead - pipe->gottxhead; - aio->a_iov[0].iov_buf = &pipe->txhead[pipe->gottxhead]; + nni_iov iov; + iov.iov_len = pipe->wanttxhead - pipe->gottxhead; + iov.iov_buf = &pipe->txhead[pipe->gottxhead]; + nni_aio_set_iov(aio, 1, &iov); // send it down... nni_plat_ipc_pipe_send(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; } if (pipe->gotrxhead < pipe->wantrxhead) { - aio->a_niov = 1; - aio->a_iov[0].iov_len = pipe->wantrxhead - pipe->gotrxhead; - aio->a_iov[0].iov_buf = &pipe->rxhead[pipe->gotrxhead]; + nni_iov iov; + iov.iov_len = pipe->wantrxhead - pipe->gotrxhead; + iov.iov_buf = &pipe->rxhead[pipe->gotrxhead]; + nni_aio_set_iov(aio, 1, &iov); nni_plat_ipc_pipe_recv(pipe->ipp, aio); nni_mtx_unlock(&pipe->mtx); return; @@ -229,20 +231,8 @@ nni_ipc_pipe_send_cb(void *arg) } n = nni_aio_count(txaio); - while (n) { - NNI_ASSERT(txaio->a_niov != 0); - if (txaio->a_iov[0].iov_len > n) { - txaio->a_iov[0].iov_len -= n; - NNI_INCPTR(txaio->a_iov[0].iov_buf, n); - break; - } - n -= txaio->a_iov[0].iov_len; - for (int i = 0; i < txaio->a_niov; i++) { - txaio->a_iov[i] = txaio->a_iov[i + 1]; - } - txaio->a_niov--; - } - if ((txaio->a_niov != 0) && (txaio->a_iov[0].iov_len != 0)) { + nni_aio_iov_advance(txaio, n); + if (nni_aio_iov_count(txaio) != 0) { nni_plat_ipc_pipe_send(pipe->ipp, txaio); nni_mtx_unlock(&pipe->mtx); return; @@ -281,22 +271,9 @@ nni_ipc_pipe_recv_cb(void *arg) } n = nni_aio_count(rxaio); - while (n) { - NNI_ASSERT(rxaio->a_niov != 0); - if (rxaio->a_iov[0].iov_len > n) { - rxaio->a_iov[0].iov_len -= n; - NNI_INCPTR(rxaio->a_iov[0].iov_buf, n); - break; - } - n -= rxaio->a_iov[0].iov_len; - for (int i = 0; i < rxaio->a_niov; i++) { - rxaio->a_iov[i] = rxaio->a_iov[i + 1]; - } - rxaio->a_niov--; - } - - // Was this a partial read? If so then resubmit for the rest. - if ((rxaio->a_niov != 0) && (rxaio->a_iov[0].iov_len != 0)) { + nni_aio_iov_advance(rxaio, n); + if (nni_aio_iov_count(rxaio) != 0) { + // Was this a partial read? If so then resubmit for the rest. nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); return; @@ -329,17 +306,17 @@ nni_ipc_pipe_recv_cb(void *arg) // lock for the read side in the future, so that we allow // transmits to proceed normally. In practice this is // unlikely to be much of an issue though. - if ((rv = nng_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { + if ((rv = nni_msg_alloc(&pipe->rxmsg, (size_t) len)) != 0) { goto recv_error; } if (len != 0) { + nni_iov iov; // Submit the rest of the data for a read -- we want to // read the entire message now. - rxaio->a_iov[0].iov_buf = nni_msg_body(pipe->rxmsg); - rxaio->a_iov[0].iov_len = (size_t) len; - rxaio->a_niov = 1; - + iov.iov_buf = nni_msg_body(pipe->rxmsg); + iov.iov_len = (size_t) len; + nni_aio_set_iov(rxaio, 1, &iov); nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); return; @@ -368,7 +345,7 @@ recv_error: static void nni_ipc_cancel_tx(nni_aio *aio, int rv) { - nni_ipc_pipe *pipe = aio->a_prov_data; + nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); if (pipe->user_txaio != aio) { @@ -378,7 +355,7 @@ nni_ipc_cancel_tx(nni_aio *aio, int rv) pipe->user_txaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(pipe->txaio, rv); + nni_aio_abort(pipe->txaio, rv); nni_aio_finish_error(aio, rv); } @@ -390,6 +367,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) uint64_t len; nni_aio * txaio; int niov; + nni_iov iov[3]; len = nni_msg_len(msg) + nni_msg_header_len(msg); @@ -404,22 +382,22 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) pipe->txhead[0] = 1; // message type, 1. NNI_PUT64(pipe->txhead + 1, len); - txaio = pipe->txaio; - niov = 0; - txaio->a_iov[niov].iov_buf = pipe->txhead; - txaio->a_iov[niov].iov_len = sizeof(pipe->txhead); + txaio = pipe->txaio; + niov = 0; + iov[0].iov_buf = pipe->txhead; + iov[0].iov_len = sizeof(pipe->txhead); niov++; if (nni_msg_header_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_header(msg); - txaio->a_iov[niov].iov_len = nni_msg_header_len(msg); + iov[niov].iov_buf = nni_msg_header(msg); + iov[niov].iov_len = nni_msg_header_len(msg); niov++; } if (nni_msg_len(msg) > 0) { - txaio->a_iov[niov].iov_buf = nni_msg_body(msg); - txaio->a_iov[niov].iov_len = nni_msg_len(msg); + iov[niov].iov_buf = nni_msg_body(msg); + iov[niov].iov_len = nni_msg_len(msg); niov++; } - txaio->a_niov = niov; + nni_aio_set_iov(txaio, niov, iov); nni_plat_ipc_pipe_send(pipe->ipp, txaio); nni_mtx_unlock(&pipe->mtx); @@ -428,7 +406,7 @@ nni_ipc_pipe_send(void *arg, nni_aio *aio) static void nni_ipc_cancel_rx(nni_aio *aio, int rv) { - nni_ipc_pipe *pipe = aio->a_prov_data; + nni_ipc_pipe *pipe = nni_aio_get_prov_data(aio); nni_mtx_lock(&pipe->mtx); if (pipe->user_rxaio != aio) { @@ -438,7 +416,7 @@ nni_ipc_cancel_rx(nni_aio *aio, int rv) pipe->user_rxaio = NULL; nni_mtx_unlock(&pipe->mtx); - nni_aio_cancel(pipe->rxaio, rv); + nni_aio_abort(pipe->rxaio, rv); nni_aio_finish_error(aio, rv); } @@ -447,6 +425,7 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) { nni_ipc_pipe *pipe = arg; nni_aio * rxaio; + nni_iov iov; nni_mtx_lock(&pipe->mtx); @@ -459,10 +438,10 @@ nni_ipc_pipe_recv(void *arg, nni_aio *aio) NNI_ASSERT(pipe->rxmsg == NULL); // Schedule a read of the IPC header. - rxaio = pipe->rxaio; - rxaio->a_iov[0].iov_buf = pipe->rxhead; - rxaio->a_iov[0].iov_len = sizeof(pipe->rxhead); - rxaio->a_niov = 1; + rxaio = pipe->rxaio; + iov.iov_buf = pipe->rxhead; + iov.iov_len = sizeof(pipe->rxhead); + nni_aio_set_iov(rxaio, 1, &iov); nni_plat_ipc_pipe_recv(pipe->ipp, rxaio); nni_mtx_unlock(&pipe->mtx); @@ -474,6 +453,7 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) nni_ipc_pipe *pipe = arg; int rv; nni_aio * negaio; + nni_iov iov; nni_mtx_lock(&pipe->mtx); pipe->txhead[0] = 0; @@ -483,15 +463,15 @@ nni_ipc_pipe_start(void *arg, nni_aio *aio) NNI_PUT16(&pipe->txhead[4], pipe->proto); NNI_PUT16(&pipe->txhead[6], 0); - pipe->user_negaio = aio; - pipe->gotrxhead = 0; - pipe->gottxhead = 0; - pipe->wantrxhead = 8; - pipe->wanttxhead = 8; - negaio = pipe->negaio; - negaio->a_niov = 1; - negaio->a_iov[0].iov_len = 8; - negaio->a_iov[0].iov_buf = &pipe->txhead[0]; + pipe->user_negaio = aio; + pipe->gotrxhead = 0; + pipe->gottxhead = 0; + pipe->wantrxhead = 8; + pipe->wanttxhead = 8; + negaio = pipe->negaio; + iov.iov_len = 8; + iov.iov_buf = &pipe->txhead[0]; + nni_aio_set_iov(negaio, 1, &iov); rv = nni_aio_start(aio, nni_ipc_cancel_start, pipe); if (rv != 0) { nni_mtx_unlock(&pipe->mtx); @@ -638,7 +618,7 @@ nni_ipc_ep_cb(void *arg) static void nni_ipc_cancel_ep(nni_aio *aio, int rv) { - nni_ipc_ep *ep = aio->a_prov_data; + nni_ipc_ep *ep = nni_aio_get_prov_data(aio); NNI_ASSERT(rv != 0); nni_mtx_lock(&ep->mtx); @@ -649,7 +629,7 @@ nni_ipc_cancel_ep(nni_aio *aio, int rv) ep->user_aio = NULL; nni_mtx_unlock(&ep->mtx); - nni_aio_cancel(ep->aio, rv); + nni_aio_abort(ep->aio, rv); nni_aio_finish_error(aio, rv); } |
