aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/sp/transport/ws/websocket.c160
1 files changed, 87 insertions, 73 deletions
diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c
index 16929ec9..276a8b16 100644
--- a/src/sp/transport/ws/websocket.c
+++ b/src/sp/transport/ws/websocket.c
@@ -28,6 +28,7 @@ struct ws_dialer {
nni_mtx mtx;
nni_aio connaio;
nng_stream_dialer *dialer;
+ nni_dialer *ndialer;
bool started;
};
@@ -37,20 +38,26 @@ struct ws_listener {
nni_mtx mtx;
nni_aio accaio;
nng_stream_listener *listener;
+ nni_listener *nlistener;
+ nni_list wait_pipes;
bool started;
};
struct ws_pipe {
- nni_mtx mtx;
- bool closed;
- uint16_t peer;
- nni_aio *user_txaio;
- nni_aio *user_rxaio;
- nni_aio txaio;
- nni_aio rxaio;
- nng_stream *ws;
+ nni_mtx mtx;
+ bool closed;
+ uint16_t peer;
+ nni_aio *user_txaio;
+ nni_aio *user_rxaio;
+ nni_aio txaio;
+ nni_aio rxaio;
+ nng_stream *ws;
+ nni_pipe *npipe;
+ nni_list_node node;
};
+static void wstran_listener_match(ws_listener *l);
+
static void
wstran_pipe_send_cb(void *arg)
{
@@ -189,8 +196,14 @@ wstran_pipe_stop(void *arg)
static int
wstran_pipe_init(void *arg, nni_pipe *pipe)
{
- NNI_ARG_UNUSED(arg);
- NNI_ARG_UNUSED(pipe);
+ ws_pipe *p = arg;
+
+ p->npipe = pipe;
+ nni_mtx_init(&p->mtx);
+
+ // Initialize AIOs.
+ nni_aio_init(&p->txaio, wstran_pipe_send_cb, p);
+ nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p);
return (0);
}
@@ -204,7 +217,6 @@ wstran_pipe_fini(void *arg)
nni_aio_fini(&p->txaio);
nni_mtx_fini(&p->mtx);
- NNI_FREE_STRUCT(p);
}
static void
@@ -218,25 +230,6 @@ wstran_pipe_close(void *arg)
nng_stream_close(p->ws);
}
-static int
-wstran_pipe_alloc(ws_pipe **pipep, void *ws)
-{
- ws_pipe *p;
-
- if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
- return (NNG_ENOMEM);
- }
- p->ws = ws;
- nni_mtx_init(&p->mtx);
-
- // Initialize AIOs.
- nni_aio_init(&p->txaio, wstran_pipe_send_cb, p);
- nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p);
-
- *pipep = p;
- return (0);
-}
-
static uint16_t
wstran_pipe_peer(void *arg)
{
@@ -253,7 +246,6 @@ wstran_listener_bind(void *arg, nng_url *url)
if ((rv = nng_stream_listener_listen(l->listener)) == 0) {
int port;
- l->started = true;
nng_stream_listener_get_int(
l->listener, NNG_OPT_TCP_BOUND_PORT, &port);
url->u_port = (uint32_t) port;
@@ -293,9 +285,11 @@ wstran_listener_accept(void *arg, nni_aio *aio)
return;
}
nni_list_append(&l->aios, aio);
- if (aio == nni_list_first(&l->aios)) {
+ if (!l->started) {
+ l->started = true;
nng_stream_listener_accept(l->listener, &l->accaio);
}
+ wstran_listener_match(l);
nni_mtx_unlock(&l->mtx);
}
@@ -356,6 +350,7 @@ wstran_pipe_getopt(
}
static nni_sp_pipe_ops ws_pipe_ops = {
+ .p_size = sizeof(ws_pipe),
.p_init = wstran_pipe_init,
.p_fini = wstran_pipe_fini,
.p_stop = wstran_pipe_stop,
@@ -383,7 +378,6 @@ wstran_dialer_fini(void *arg)
nng_stream_dialer_free(d->dialer);
nni_aio_fini(&d->connaio);
nni_mtx_fini(&d->mtx);
- NNI_FREE_STRUCT(d);
}
static void
@@ -403,7 +397,6 @@ wstran_listener_fini(void *arg)
nng_stream_listener_free(l->listener);
nni_aio_fini(&l->accaio);
nni_mtx_fini(&l->mtx);
- NNI_FREE_STRUCT(l);
}
static void
@@ -422,7 +415,6 @@ wstran_connect_cb(void *arg)
}
if ((uaio = nni_list_first(&d->aios)) == NULL) {
// The client stopped caring about this!
- nng_stream_stop(ws);
nng_stream_free(ws);
nni_mtx_unlock(&d->mtx);
return;
@@ -431,14 +423,15 @@ wstran_connect_cb(void *arg)
NNI_ASSERT(nni_list_empty(&d->aios));
if ((rv = nni_aio_result(caio)) != 0) {
nni_aio_finish_error(uaio, rv);
- } else if ((rv = wstran_pipe_alloc(&p, ws)) != 0) {
- nng_stream_stop(ws);
+ } else if ((rv = nni_pipe_alloc_dialer((void **) &p, d->ndialer)) !=
+ 0) {
nng_stream_free(ws);
nni_aio_finish_error(uaio, rv);
} else {
p->peer = d->peer;
+ p->ws = ws;
- nni_aio_set_output(uaio, 0, p);
+ nni_aio_set_output(uaio, 0, p->npipe);
nni_aio_finish(uaio, 0, 0);
}
nni_mtx_unlock(&d->mtx);
@@ -457,66 +450,90 @@ static void
wstran_listener_close(void *arg)
{
ws_listener *l = arg;
+ ws_pipe *p;
nni_aio_close(&l->accaio);
+ NNI_LIST_FOREACH (&l->wait_pipes, p) {
+ nni_pipe_close(p->npipe);
+ }
nng_stream_listener_close(l->listener);
}
static void
+wstran_listener_match(ws_listener *l)
+{
+ nni_aio *uaio;
+ ws_pipe *p;
+ if (((uaio = nni_list_first(&l->aios)) == NULL) ||
+ ((p = nni_list_first(&l->wait_pipes)) == NULL)) {
+ return;
+ }
+
+ nni_list_remove(&l->wait_pipes, p);
+ nni_aio_list_remove(uaio);
+
+ nni_aio_set_output(uaio, 0, p->npipe);
+ nni_aio_finish(uaio, 0, 0);
+}
+
+static void
wstran_accept_cb(void *arg)
{
ws_listener *l = arg;
nni_aio *aaio = &l->accaio;
nni_aio *uaio;
int rv;
+ ws_pipe *p;
+ nng_stream *ws;
nni_mtx_lock(&l->mtx);
+
+ ws = nni_aio_get_output(aaio, 0);
uaio = nni_list_first(&l->aios);
if ((rv = nni_aio_result(aaio)) != 0) {
- if (uaio != NULL) {
- nni_aio_list_remove(uaio);
- nni_aio_finish_error(uaio, rv);
- }
- } else {
- nng_stream *ws = nni_aio_get_output(aaio, 0);
- if (uaio != NULL) {
- ws_pipe *p;
- // Make a pipe
- nni_aio_list_remove(uaio);
- if ((rv = wstran_pipe_alloc(&p, ws)) != 0) {
- nng_stream_close(ws);
- nni_aio_finish_error(uaio, rv);
- } else {
- p->peer = l->peer;
-
- nni_aio_set_output(uaio, 0, p);
- nni_aio_finish(uaio, 0, 0);
- }
- }
+ goto error;
}
- if (!nni_list_empty(&l->aios)) {
- nng_stream_listener_accept(l->listener, aaio);
+
+ rv = nni_pipe_alloc_listener((void **) &p, l->nlistener);
+ if (rv != 0) {
+ nng_stream_free(ws);
+ goto error;
}
+ p->peer = l->peer;
+ p->ws = ws;
+
+ nni_list_append(&l->wait_pipes, p);
+ wstran_listener_match(l);
+ nng_stream_listener_accept(l->listener, aaio);
+ nni_mtx_unlock(&l->mtx);
+ return;
+
+error:
+
+ // possibly report this upstream
+ if ((uaio = nni_list_first(&l->aios)) != NULL) {
+ nni_aio_list_remove(uaio);
+ nni_aio_finish_error(uaio, rv);
+ }
+ nng_stream_listener_accept(l->listener, aaio);
nni_mtx_unlock(&l->mtx);
}
static int
wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
{
- ws_dialer *d;
+ ws_dialer *d = (void *) dp;
nni_sock *s = nni_dialer_sock(ndialer);
int rv;
char name[64];
- if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
- return (NNG_ENOMEM);
- }
nni_mtx_init(&d->mtx);
nni_aio_list_init(&d->aios);
nni_aio_init(&d->connaio, wstran_connect_cb, d);
- d->peer = nni_sock_peer_id(s);
+ d->peer = nni_sock_peer_id(s);
+ d->ndialer = ndialer;
snprintf(
name, sizeof(name), "%s.sp.nanomsg.org", nni_sock_peer_name(s));
@@ -526,29 +543,26 @@ wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
d->dialer, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_dialer_set_string(
d->dialer, NNG_OPT_WS_PROTOCOL, name)) != 0)) {
- wstran_dialer_fini(d);
return (rv);
}
- *dp = d;
return (0);
}
static int
wstran_listener_init(void **lp, nng_url *url, nni_listener *listener)
{
- ws_listener *l;
+ ws_listener *l = (void *) lp;
int rv;
nni_sock *s = nni_listener_sock(listener);
char name[64];
- if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
- return (NNG_ENOMEM);
- }
+ l->nlistener = listener;
nni_mtx_init(&l->mtx);
nni_aio_list_init(&l->aios);
nni_aio_init(&l->accaio, wstran_accept_cb, l);
+ NNI_LIST_INIT(&l->wait_pipes, ws_pipe, node);
l->peer = nni_sock_peer_id(s);
@@ -560,10 +574,8 @@ wstran_listener_init(void **lp, nng_url *url, nni_listener *listener)
l->listener, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_listener_set_string(
l->listener, NNG_OPT_WS_PROTOCOL, name)) != 0)) {
- wstran_listener_fini(l);
return (rv);
}
- *lp = l;
return (0);
}
@@ -669,6 +681,7 @@ wstran_listener_set_tls(void *arg, nng_tls_config *tls)
}
static nni_sp_dialer_ops ws_dialer_ops = {
+ .d_size = sizeof(ws_dialer),
.d_init = wstran_dialer_init,
.d_fini = wstran_dialer_fini,
.d_connect = wstran_dialer_connect,
@@ -681,6 +694,7 @@ static nni_sp_dialer_ops ws_dialer_ops = {
};
static nni_sp_listener_ops ws_listener_ops = {
+ .l_size = sizeof(ws_listener),
.l_init = wstran_listener_init,
.l_fini = wstran_listener_fini,
.l_bind = wstran_listener_bind,