aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/sp/transport/ipc/ipc.c267
1 files changed, 106 insertions, 161 deletions
diff --git a/src/sp/transport/ipc/ipc.c b/src/sp/transport/ipc/ipc.c
index 803c4b4b..dadb1909 100644
--- a/src/sp/transport/ipc/ipc.c
+++ b/src/sp/transport/ipc/ipc.c
@@ -13,6 +13,7 @@
#include "core/defs.h"
#include "core/nng_impl.h"
+#include "core/pipe.h"
#include "nng/nng.h"
// IPC transport. Platform specific IPC operations must be
@@ -25,29 +26,27 @@ typedef struct ipc_ep ipc_ep;
// ipc_pipe is one end of an IPC connection.
struct ipc_pipe {
- nng_stream *conn;
- uint16_t peer;
- uint16_t proto;
- size_t rcv_max;
- bool closed;
- ipc_ep *ep;
- nni_pipe *pipe;
- nni_list_node node;
- nni_atomic_flag reaped;
- nni_reap_node reap;
- uint8_t tx_head[1 + sizeof(uint64_t)];
- uint8_t rx_head[1 + sizeof(uint64_t)];
- size_t got_tx_head;
- size_t got_rx_head;
- size_t want_tx_head;
- size_t want_rx_head;
- nni_list recv_q;
- nni_list send_q;
- nni_aio tx_aio;
- nni_aio rx_aio;
- nni_aio neg_aio;
- nni_msg *rx_msg;
- nni_mtx mtx;
+ nng_stream *conn;
+ uint16_t peer;
+ uint16_t proto;
+ size_t rcv_max;
+ bool closed;
+ ipc_ep *ep;
+ nni_pipe *pipe;
+ nni_list_node node;
+ uint8_t tx_head[1 + sizeof(uint64_t)];
+ uint8_t rx_head[1 + sizeof(uint64_t)];
+ size_t got_tx_head;
+ size_t got_rx_head;
+ size_t want_tx_head;
+ size_t want_rx_head;
+ nni_list recv_q;
+ nni_list send_q;
+ nni_aio tx_aio;
+ nni_aio rx_aio;
+ nni_aio neg_aio;
+ nni_msg *rx_msg;
+ nni_mtx mtx;
};
struct ipc_ep {
@@ -57,16 +56,15 @@ struct ipc_ep {
bool started;
bool closed;
bool fini;
- int ref_cnt;
nng_stream_dialer *dialer;
nng_stream_listener *listener;
+ nni_listener *nlistener;
+ nni_dialer *ndialer;
nni_aio *user_aio;
- nni_aio *conn_aio;
- nni_aio *time_aio;
- nni_list busy_pipes; // busy pipes -- ones passed to socket
+ nni_aio conn_aio;
+ nni_aio time_aio;
nni_list wait_pipes; // pipes waiting to match to socket
nni_list nego_pipes; // pipes busy negotiating
- nni_reap_node reap;
#ifdef NNG_ENABLE_STATS
nni_stat_item st_rcv_max;
#endif
@@ -77,18 +75,6 @@ static void ipc_pipe_recv_start(ipc_pipe *p);
static void ipc_pipe_send_cb(void *);
static void ipc_pipe_recv_cb(void *);
static void ipc_pipe_nego_cb(void *);
-static void ipc_pipe_fini(void *);
-static void ipc_ep_fini(void *);
-
-static nni_reap_list ipc_ep_reap_list = {
- .rl_offset = offsetof(ipc_ep, reap),
- .rl_func = ipc_ep_fini,
-};
-
-static nni_reap_list ipc_pipe_reap_list = {
- .rl_offset = offsetof(ipc_pipe, reap),
- .rl_func = ipc_pipe_fini,
-};
static void
ipc_tran_init(void)
@@ -119,11 +105,15 @@ ipc_pipe_close(void *arg)
static void
ipc_pipe_stop(void *arg)
{
- ipc_pipe *p = arg;
+ ipc_pipe *p = arg;
+ ipc_ep *ep = p->ep;
nni_aio_stop(&p->rx_aio);
nni_aio_stop(&p->tx_aio);
nni_aio_stop(&p->neg_aio);
+ nni_mtx_lock(&ep->mtx);
+ nni_list_node_remove(&p->node);
+ nni_mtx_unlock(&ep->mtx);
}
static int
@@ -131,6 +121,12 @@ ipc_pipe_init(void *arg, nni_pipe *pipe)
{
ipc_pipe *p = arg;
p->pipe = pipe;
+ nni_mtx_init(&p->mtx);
+ nni_aio_init(&p->tx_aio, ipc_pipe_send_cb, p);
+ nni_aio_init(&p->rx_aio, ipc_pipe_recv_cb, p);
+ nni_aio_init(&p->neg_aio, ipc_pipe_nego_cb, p);
+ nni_aio_list_init(&p->send_q);
+ nni_aio_list_init(&p->recv_q);
return (0);
}
@@ -138,18 +134,8 @@ static void
ipc_pipe_fini(void *arg)
{
ipc_pipe *p = arg;
- ipc_ep *ep;
ipc_pipe_stop(p);
- if ((ep = p->ep) != NULL) {
- nni_mtx_lock(&ep->mtx);
- nni_list_node_remove(&p->node);
- ep->ref_cnt--;
- if (ep->fini && (ep->ref_cnt == 0)) {
- nni_reap(&ipc_ep_reap_list, ep);
- }
- nni_mtx_unlock(&ep->mtx);
- }
nng_stream_free(p->conn);
nni_aio_fini(&p->rx_aio);
nni_aio_fini(&p->tx_aio);
@@ -158,34 +144,6 @@ ipc_pipe_fini(void *arg)
nni_msg_free(p->rx_msg);
}
nni_mtx_fini(&p->mtx);
- NNI_FREE_STRUCT(p);
-}
-
-static void
-ipc_pipe_reap(ipc_pipe *p)
-{
- if (!nni_atomic_flag_test_and_set(&p->reaped)) {
- nni_reap(&ipc_pipe_reap_list, p);
- }
-}
-
-static int
-ipc_pipe_alloc(ipc_pipe **pipe_p)
-{
- ipc_pipe *p;
-
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
- nni_mtx_init(&p->mtx);
- nni_aio_init(&p->tx_aio, ipc_pipe_send_cb, p);
- nni_aio_init(&p->rx_aio, ipc_pipe_recv_cb, p);
- nni_aio_init(&p->neg_aio, ipc_pipe_nego_cb, p);
- nni_aio_list_init(&p->send_q);
- nni_aio_list_init(&p->recv_q);
- nni_atomic_flag_reset(&p->reaped);
- *pipe_p = p;
- return (0);
}
static void
@@ -199,10 +157,9 @@ ipc_ep_match(ipc_ep *ep)
return;
}
nni_list_remove(&ep->wait_pipes, p);
- nni_list_append(&ep->busy_pipes, p);
ep->user_aio = NULL;
p->rcv_max = ep->rcv_max;
- nni_aio_set_output(aio, 0, p);
+ nni_aio_set_output(aio, 0, p->pipe);
nni_aio_finish(aio, 0, 0);
}
@@ -282,7 +239,8 @@ error:
nni_aio_finish_error(user_aio, rv);
}
nni_mtx_unlock(&ep->mtx);
- ipc_pipe_reap(p);
+ nni_pipe_close(p->pipe);
+ nni_pipe_rele(p->pipe);
}
static void
@@ -636,8 +594,6 @@ ipc_pipe_start(ipc_pipe *p, nng_stream *conn, ipc_ep *ep)
{
nni_iov iov;
- ep->ref_cnt++;
-
p->conn = conn;
p->ep = ep;
p->proto = ep->proto;
@@ -668,60 +624,59 @@ ipc_ep_close(void *arg)
ipc_ep *ep = arg;
ipc_pipe *p;
+ nni_aio_close(&ep->time_aio);
+ nni_aio_close(&ep->conn_aio);
+
nni_mtx_lock(&ep->mtx);
ep->closed = true;
- nni_aio_close(ep->time_aio);
if (ep->dialer != NULL) {
nng_stream_dialer_close(ep->dialer);
}
if (ep->listener != NULL) {
nng_stream_listener_close(ep->listener);
}
- NNI_LIST_FOREACH (&ep->nego_pipes, p) {
- ipc_pipe_close(p);
- }
- NNI_LIST_FOREACH (&ep->wait_pipes, p) {
- ipc_pipe_close(p);
- }
- NNI_LIST_FOREACH (&ep->busy_pipes, p) {
- ipc_pipe_close(p);
- }
if (ep->user_aio != NULL) {
nni_aio_finish_error(ep->user_aio, NNG_ECLOSED);
ep->user_aio = NULL;
}
+ NNI_LIST_FOREACH (&ep->nego_pipes, p) {
+ nni_pipe_close(p->pipe);
+ }
+ NNI_LIST_FOREACH (&ep->wait_pipes, p) {
+ nni_pipe_close(p->pipe);
+ }
nni_mtx_unlock(&ep->mtx);
}
static void
+ipc_ep_stop(void *arg)
+{
+ ipc_ep *ep = arg;
+
+ nni_aio_stop(&ep->time_aio);
+ nni_aio_stop(&ep->conn_aio);
+}
+
+static void
ipc_ep_fini(void *arg)
{
ipc_ep *ep = arg;
- nni_mtx_lock(&ep->mtx);
- ep->fini = true;
- if (ep->ref_cnt != 0) {
- nni_mtx_unlock(&ep->mtx);
- return;
- }
- nni_mtx_unlock(&ep->mtx);
- nni_aio_stop(ep->time_aio);
- nni_aio_stop(ep->conn_aio);
+ nni_aio_fini(&ep->time_aio);
+ nni_aio_fini(&ep->conn_aio);
nng_stream_dialer_free(ep->dialer);
nng_stream_listener_free(ep->listener);
- nni_aio_free(ep->time_aio);
- nni_aio_free(ep->conn_aio);
nni_mtx_fini(&ep->mtx);
- NNI_FREE_STRUCT(ep);
}
static void
ipc_ep_timer_cb(void *arg)
{
ipc_ep *ep = arg;
+
nni_mtx_lock(&ep->mtx);
- if (nni_aio_result(ep->time_aio) == 0) {
- nng_stream_listener_accept(ep->listener, ep->conn_aio);
+ if (nni_aio_result(&ep->time_aio) == 0) {
+ nng_stream_listener_accept(ep->listener, &ep->conn_aio);
}
nni_mtx_unlock(&ep->mtx);
}
@@ -730,7 +685,7 @@ static void
ipc_ep_accept_cb(void *arg)
{
ipc_ep *ep = arg;
- nni_aio *aio = ep->conn_aio;
+ nni_aio *aio = &ep->conn_aio;
ipc_pipe *p;
int rv;
nng_stream *conn;
@@ -741,18 +696,21 @@ ipc_ep_accept_cb(void *arg)
}
conn = nni_aio_get_output(aio, 0);
- if ((rv = ipc_pipe_alloc(&p)) != 0) {
+
+ if (ep->closed) {
+ rv = NNG_ECLOSED;
nng_stream_free(conn);
goto error;
}
- if (ep->closed) {
- ipc_pipe_fini(p);
+ rv = nni_pipe_alloc_listener((void **) &p, ep->nlistener);
+ if (rv != 0) {
nng_stream_free(conn);
- rv = NNG_ECLOSED;
goto error;
}
+
ipc_pipe_start(p, conn, ep);
- nng_stream_listener_accept(ep->listener, ep->conn_aio);
+
+ nng_stream_listener_accept(ep->listener, &ep->conn_aio);
nni_mtx_unlock(&ep->mtx);
return;
@@ -765,16 +723,15 @@ error:
}
switch (rv) {
-
+ case NNG_ECLOSED:
+ break;
case NNG_ENOMEM:
case NNG_ENOFILES:
- nng_sleep_aio(10, ep->time_aio);
+ nng_sleep_aio(10, &ep->time_aio);
break;
default:
- if (!ep->closed) {
- nng_stream_listener_accept(ep->listener, ep->conn_aio);
- }
+ nng_stream_listener_accept(ep->listener, &ep->conn_aio);
break;
}
nni_mtx_unlock(&ep->mtx);
@@ -784,56 +741,51 @@ static void
ipc_ep_dial_cb(void *arg)
{
ipc_ep *ep = arg;
- nni_aio *aio = ep->conn_aio;
+ nni_aio *aio = &ep->conn_aio;
+ nni_aio *uaio;
ipc_pipe *p;
int rv;
nng_stream *conn;
+ nni_mtx_lock(&ep->mtx);
if ((rv = nni_aio_result(aio)) != 0) {
goto error;
}
conn = nni_aio_get_output(aio, 0);
- if ((rv = ipc_pipe_alloc(&p)) != 0) {
+
+ if (ep->closed) {
nng_stream_free(conn);
+ rv = NNG_ECLOSED;
goto error;
}
- nni_mtx_lock(&ep->mtx);
- if (ep->closed) {
- ipc_pipe_fini(p);
+ if ((rv = nni_pipe_alloc_dialer((void **) &p, ep->ndialer)) != 0) {
nng_stream_free(conn);
- rv = NNG_ECLOSED;
- nni_mtx_unlock(&ep->mtx);
goto error;
- } else {
- ipc_pipe_start(p, conn, ep);
}
+
+ ipc_pipe_start(p, conn, ep);
nni_mtx_unlock(&ep->mtx);
return;
error:
// Error connecting. We need to pass this straight back
// to the user.
- nni_mtx_lock(&ep->mtx);
- if ((aio = ep->user_aio) != NULL) {
+ if ((uaio = ep->user_aio) != NULL) {
ep->user_aio = NULL;
- nni_aio_finish_error(aio, rv);
+ nni_aio_finish_error(uaio, rv);
}
nni_mtx_unlock(&ep->mtx);
}
-static int
-ipc_ep_init(ipc_ep **epp, nni_sock *sock)
+static void
+ipc_ep_init(ipc_ep *ep, nni_sock *sock, void (*conn_cb)(void *))
{
- ipc_ep *ep;
-
- if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&ep->mtx);
- NNI_LIST_INIT(&ep->busy_pipes, ipc_pipe, node);
NNI_LIST_INIT(&ep->wait_pipes, ipc_pipe, node);
NNI_LIST_INIT(&ep->nego_pipes, ipc_pipe, node);
+ nni_aio_init(&ep->conn_aio, conn_cb, ep);
+ nni_aio_init(&ep->time_aio, ipc_ep_timer_cb, ep);
ep->proto = nni_sock_proto_id(sock);
@@ -847,56 +799,44 @@ ipc_ep_init(ipc_ep **epp, nni_sock *sock)
};
nni_stat_init(&ep->st_rcv_max, &rcv_max_info);
#endif
-
- *epp = ep;
- return (0);
}
static int
ipc_ep_init_dialer(void **dp, nng_url *url, nni_dialer *dialer)
{
- ipc_ep *ep;
+ ipc_ep *ep = (void *) dp;
int rv;
nni_sock *sock = nni_dialer_sock(dialer);
- if ((rv = ipc_ep_init(&ep, sock)) != 0) {
- return (rv);
- }
+ ipc_ep_init(ep, sock, ipc_ep_dial_cb);
+ ep->ndialer = dialer;
- if (((rv = nni_aio_alloc(&ep->conn_aio, ipc_ep_dial_cb, ep)) != 0) ||
- ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) {
- ipc_ep_fini(ep);
+ if ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0) {
return (rv);
}
#ifdef NNG_ENABLE_STATS
nni_dialer_add_stat(dialer, &ep->st_rcv_max);
#endif
- *dp = ep;
return (0);
}
static int
ipc_ep_init_listener(void **dp, nng_url *url, nni_listener *listener)
{
- ipc_ep *ep;
+ ipc_ep *ep = (void *) dp;
int rv;
nni_sock *sock = nni_listener_sock(listener);
- if ((rv = ipc_ep_init(&ep, sock)) != 0) {
- return (rv);
- }
+ ipc_ep_init(ep, sock, ipc_ep_accept_cb);
+ ep->nlistener = listener;
- if (((rv = nni_aio_alloc(&ep->conn_aio, ipc_ep_accept_cb, ep)) != 0) ||
- ((rv = nni_aio_alloc(&ep->time_aio, ipc_ep_timer_cb, ep)) != 0) ||
- ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) {
- ipc_ep_fini(ep);
+ if ((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0) {
return (rv);
}
#ifdef NNG_ENABLE_STATS
nni_listener_add_stat(listener, &ep->st_rcv_max);
#endif
- *dp = ep;
return (0);
}
@@ -939,7 +879,7 @@ ipc_ep_connect(void *arg, nni_aio *aio)
return;
}
ep->user_aio = aio;
- nng_stream_dialer_dial(ep->dialer, ep->conn_aio);
+ nng_stream_dialer_dial(ep->dialer, &ep->conn_aio);
nni_mtx_unlock(&ep->mtx);
}
@@ -1013,7 +953,7 @@ ipc_ep_accept(void *arg, nni_aio *aio)
ep->user_aio = aio;
if (!ep->started) {
ep->started = true;
- nng_stream_listener_accept(ep->listener, ep->conn_aio);
+ nng_stream_listener_accept(ep->listener, &ep->conn_aio);
} else {
ipc_ep_match(ep);
}
@@ -1030,6 +970,7 @@ ipc_pipe_get(void *arg, const char *name, void *buf, size_t *szp, nni_type t)
}
static nni_sp_pipe_ops ipc_tran_pipe_ops = {
+ .p_size = sizeof(ipc_pipe),
.p_init = ipc_pipe_init,
.p_fini = ipc_pipe_fini,
.p_stop = ipc_pipe_stop,
@@ -1117,20 +1058,24 @@ ipc_listener_set_sec_desc(void *arg, void *pdesc)
}
static nni_sp_dialer_ops ipc_dialer_ops = {
+ .d_size = sizeof(ipc_ep),
.d_init = ipc_ep_init_dialer,
.d_fini = ipc_ep_fini,
.d_connect = ipc_ep_connect,
.d_close = ipc_ep_close,
+ .d_stop = ipc_ep_stop,
.d_getopt = ipc_dialer_get,
.d_setopt = ipc_dialer_set,
};
static nni_sp_listener_ops ipc_listener_ops = {
+ .l_size = sizeof(ipc_ep),
.l_init = ipc_ep_init_listener,
.l_fini = ipc_ep_fini,
.l_bind = ipc_ep_bind,
.l_accept = ipc_ep_accept,
.l_close = ipc_ep_close,
+ .l_stop = ipc_ep_stop,
.l_getopt = ipc_listener_get,
.l_setopt = ipc_listener_set,
.l_set_security_descriptor = ipc_listener_set_sec_desc,