aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-15 01:47:12 -0700
committerGitHub <noreply@github.com>2018-05-15 01:47:12 -0700
commit1d033484ee1a2ec26d3eead073e7bc0f889ffdf4 (patch)
tree15d3897d405cb0beb1ada6270ecf70241451ca70 /src
parent16b4c4019c7b7904de171c588ed8c72ca732d2cf (diff)
downloadnng-1d033484ee1a2ec26d3eead073e7bc0f889ffdf4.tar.gz
nng-1d033484ee1a2ec26d3eead073e7bc0f889ffdf4.tar.bz2
nng-1d033484ee1a2ec26d3eead073e7bc0f889ffdf4.zip
fixes #419 want to nni_aio_stop without blocking (#428)
* fixes #419 want to nni_aio_stop without blocking This actually introduces an nni_aio_close() API that causes nni_aio_begin to return NNG_ECLOSED, while scheduling a callback on the AIO to do an NNG_ECLOSED as well. This should be called in non-blocking close() contexts instead of nni_aio_stop(), and the cases where we call nni_aio_fini() multiple times are updated updated to add nni_aio_stop() calls on all "interlinked" aios before finalizing them. Furthermore, we call nni_aio_close() as soon as practical in the close path. This closes an annoying race condition where the callback from a lower subsystem could wind up rescheduling an operation that we wanted to abort.
Diffstat (limited to 'src')
-rw-r--r--src/core/aio.c45
-rw-r--r--src/core/aio.h6
-rw-r--r--src/core/defs.h4
-rw-r--r--src/core/device.c4
-rw-r--r--src/core/endpt.c64
-rw-r--r--src/core/pipe.c93
-rw-r--r--src/core/pipe.h1
-rw-r--r--src/core/protocol.h19
-rw-r--r--src/core/socket.c54
-rw-r--r--src/core/socket.h7
-rw-r--r--src/core/transport.c2
-rw-r--r--src/core/transport.h8
-rw-r--r--src/protocol/bus0/bus.c26
-rw-r--r--src/protocol/pair0/pair.c22
-rw-r--r--src/protocol/pair1/pair.c27
-rw-r--r--src/protocol/pipeline0/pull.c16
-rw-r--r--src/protocol/pipeline0/push.c19
-rw-r--r--src/protocol/pubsub0/pub.c23
-rw-r--r--src/protocol/pubsub0/sub.c16
-rw-r--r--src/protocol/reqrep0/rep.c16
-rw-r--r--src/protocol/reqrep0/req.c19
-rw-r--r--src/protocol/reqrep0/xrep.c25
-rw-r--r--src/protocol/reqrep0/xreq.c25
-rw-r--r--src/protocol/survey0/respond.c18
-rw-r--r--src/protocol/survey0/survey.c19
-rw-r--r--src/protocol/survey0/xrespond.c26
-rw-r--r--src/protocol/survey0/xsurvey.c25
-rw-r--r--src/supplemental/http/http_conn.c40
-rw-r--r--src/supplemental/http/http_server.c16
-rw-r--r--src/supplemental/tls/mbedtls/tls.c3
-rw-r--r--src/supplemental/websocket/websocket.c11
-rw-r--r--src/transport/inproc/inproc.c6
-rw-r--r--src/transport/ipc/ipc.c14
-rw-r--r--src/transport/tcp/tcp.c18
-rw-r--r--src/transport/tls/tls.c14
-rw-r--r--src/transport/ws/websocket.c16
-rw-r--r--src/transport/zerotier/zerotier.c6
37 files changed, 495 insertions, 278 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a5b6c088..2f90aa34 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -63,10 +63,10 @@ struct nng_aio {
nni_duration a_timeout; // Relative timeout
// These fields are private to the aio framework.
- bool a_stop; // shutting down (no new operations)
- bool a_sleep; // sleeping with no action
- int a_sleeprv; // result when sleep wakes
- int a_cancelrv; // if canceled between begin and schedule
+ bool a_stop; // shutting down (no new operations)
+ bool a_closed; // close called, but not fini (yet)
+ bool a_sleep; // sleeping with no action
+ int a_sleeprv; // result when sleep wakes
nni_task *a_task;
// Read/write operations.
@@ -205,6 +205,24 @@ nni_aio_stop(nni_aio *aio)
}
void
+nni_aio_close(nni_aio *aio)
+{
+ if (aio != NULL) {
+ nni_aio_cancelfn cancelfn;
+
+ nni_mtx_lock(&nni_aio_lk);
+ cancelfn = aio->a_prov_cancel;
+ aio->a_prov_cancel = NULL;
+ aio->a_closed = true;
+ nni_mtx_unlock(&nni_aio_lk);
+
+ if (cancelfn != NULL) {
+ cancelfn(aio, NNG_ECLOSED);
+ }
+ }
+}
+
+void
nni_aio_set_timeout(nni_aio *aio, nni_duration when)
{
aio->a_timeout = when;
@@ -306,11 +324,18 @@ nni_aio_begin(nni_aio *aio)
aio->a_count = 0;
aio->a_prov_cancel = NULL;
aio->a_prov_data = NULL;
- aio->a_cancelrv = 0;
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
nni_task_prep(aio->a_task);
+ if (aio->a_closed) {
+ aio->a_result = NNG_ECLOSED;
+ aio->a_expire = NNI_TIME_NEVER;
+ aio->a_sleep = false;
+ nni_mtx_unlock(&nni_aio_lk);
+ nni_task_dispatch(aio->a_task);
+ return (NNG_ECLOSED);
+ }
nni_mtx_unlock(&nni_aio_lk);
return (0);
}
@@ -318,7 +343,6 @@ nni_aio_begin(nni_aio *aio)
int
nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
{
- int rv;
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
@@ -339,19 +363,16 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
nni_mtx_unlock(&nni_aio_lk);
return (NNG_ECANCELED);
}
- if ((rv = aio->a_cancelrv) != 0) {
+ if (aio->a_closed) {
nni_mtx_unlock(&nni_aio_lk);
- return (rv);
+ return (NNG_ECLOSED);
}
// If cancellation occurred in between "begin" and "schedule",
// then cancel it right now.
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
- if ((rv = aio->a_cancelrv) != 0) {
- aio->a_expire = 0;
- nni_aio_expire_add(aio);
- } else if (aio->a_expire != NNI_TIME_NEVER) {
+ if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
diff --git a/src/core/aio.h b/src/core/aio.h
index 2ed0fb5b..5462c40b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -42,6 +42,12 @@ extern void nni_aio_fini(nni_aio *);
// use nni_aio_cancel instead.)
extern void nni_aio_stop(nni_aio *);
+// nni_aio_close closes the aio for further activity. It aborts any in-progress
+// transaction (if it can), and future calls nni_aio_begin or nni_aio_schedule
+// with both result in NNG_ECLOSED. The expectation is that protocols call this
+// for all their aios in a stop routine, before calling fini on any of them.
+extern void nni_aio_close(nni_aio *);
+
// nni_aio_set_data sets user data. This should only be done by the
// consumer, initiating the I/O. The intention is to be able to store
// additional data for use when the operation callback is executed.
diff --git a/src/core/defs.h b/src/core/defs.h
index b9c9b7e5..29d1b826 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -45,9 +45,9 @@ typedef struct nni_ctx nni_ctx;
typedef struct nni_ep nni_ep;
typedef struct nni_pipe nni_pipe;
typedef struct nni_tran nni_tran;
-typedef struct nni_tran_ep nni_tran_ep;
+typedef struct nni_tran_ep_ops nni_tran_ep_ops;
typedef struct nni_tran_ep_option nni_tran_ep_option;
-typedef struct nni_tran_pipe nni_tran_pipe;
+typedef struct nni_tran_pipe_ops nni_tran_pipe_ops;
typedef struct nni_tran_pipe_option nni_tran_pipe_option;
typedef struct nni_proto_ctx_option nni_proto_ctx_option;
diff --git a/src/core/device.c b/src/core/device.c
index 1f3bf233..0fd23add 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -120,8 +120,8 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
if ((s1 == NULL) || (s2 == NULL)) {
return (NNG_EINVAL);
}
- if ((nni_sock_peer(s1) != nni_sock_proto(s2)) ||
- (nni_sock_peer(s2) != nni_sock_proto(s1))) {
+ if ((nni_sock_peer_id(s1) != nni_sock_proto_id(s2)) ||
+ (nni_sock_peer_id(s2) != nni_sock_proto_id(s1))) {
return (NNG_EINVAL);
}
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 7593fb42..87566d42 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -15,30 +15,30 @@
#include <string.h>
struct nni_ep {
- nni_tran_ep ep_ops; // transport ops
- nni_tran * ep_tran; // transport pointer
- void * ep_data; // transport private
- uint64_t ep_id; // endpoint id
- nni_list_node ep_node; // per socket list
- nni_sock * ep_sock;
- nni_url * ep_url;
- int ep_mode;
- int ep_started;
- int ep_closed; // full shutdown
- int ep_closing; // close pending (waiting on refcnt)
- int ep_refcnt;
- bool ep_tmo_run;
- nni_mtx ep_mtx;
- nni_cv ep_cv;
- nni_list ep_pipes;
- nni_aio * ep_acc_aio;
- nni_aio * ep_con_aio;
- nni_aio * ep_con_syn; // used for sync connect
- nni_aio * ep_tmo_aio; // backoff timer
- nni_duration ep_maxrtime; // maximum time for reconnect
- nni_duration ep_currtime; // current time for reconnect
- nni_duration ep_inirtime; // initial time for reconnect
- nni_time ep_conntime; // time of last good connect
+ nni_tran_ep_ops ep_ops; // transport ops
+ nni_tran * ep_tran; // transport pointer
+ void * ep_data; // transport private
+ uint64_t ep_id; // endpoint id
+ nni_list_node ep_node; // per socket list
+ nni_sock * ep_sock;
+ nni_url * ep_url;
+ int ep_mode;
+ int ep_started;
+ int ep_closed; // full shutdown
+ int ep_closing; // close pending (waiting on refcnt)
+ int ep_refcnt;
+ bool ep_tmo_run;
+ nni_mtx ep_mtx;
+ nni_cv ep_cv;
+ nni_list ep_pipes;
+ nni_aio * ep_acc_aio;
+ nni_aio * ep_con_aio;
+ nni_aio * ep_con_syn; // used for sync connect
+ nni_aio * ep_tmo_aio; // backoff timer
+ nni_duration ep_maxrtime; // maximum time for reconnect
+ nni_duration ep_currtime; // current time for reconnect
+ nni_duration ep_inirtime; // initial time for reconnect
+ nni_time ep_conntime; // time of last good connect
};
// Functionality related to end points.
@@ -249,10 +249,10 @@ nni_ep_shutdown(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
// Abort any remaining in-flight operations.
- nni_aio_abort(ep->ep_acc_aio, NNG_ECLOSED);
- nni_aio_abort(ep->ep_con_aio, NNG_ECLOSED);
- nni_aio_abort(ep->ep_con_syn, NNG_ECLOSED);
- nni_aio_abort(ep->ep_tmo_aio, NNG_ECLOSED);
+ nni_aio_close(ep->ep_acc_aio);
+ nni_aio_close(ep->ep_con_aio);
+ nni_aio_close(ep->ep_con_syn);
+ nni_aio_close(ep->ep_tmo_aio);
// Stop the underlying transport.
ep->ep_ops.ep_close(ep->ep_data);
@@ -276,10 +276,10 @@ nni_ep_close(nni_ep *ep)
nni_ep_shutdown(ep);
- nni_aio_stop(ep->ep_acc_aio);
- nni_aio_stop(ep->ep_con_aio);
- nni_aio_stop(ep->ep_con_syn);
- nni_aio_stop(ep->ep_tmo_aio);
+ nni_aio_close(ep->ep_acc_aio);
+ nni_aio_close(ep->ep_con_aio);
+ nni_aio_close(ep->ep_con_syn);
+ nni_aio_close(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
NNI_LIST_FOREACH (&ep->ep_pipes, p) {
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 010d306d..ba3027cf 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -18,21 +18,22 @@
// performed in the context of the protocol.
struct nni_pipe {
- uint32_t p_id;
- nni_tran_pipe p_tran_ops;
- void * p_tran_data;
- void * p_proto_data;
- nni_list_node p_sock_node;
- nni_list_node p_ep_node;
- nni_sock * p_sock;
- nni_ep * p_ep;
- bool p_closed;
- bool p_stop;
- int p_refcnt;
- nni_mtx p_mtx;
- nni_cv p_cv;
- nni_list_node p_reap_node;
- nni_aio * p_start_aio;
+ uint32_t p_id;
+ nni_tran_pipe_ops p_tran_ops;
+ nni_proto_pipe_ops p_proto_ops;
+ void * p_tran_data;
+ void * p_proto_data;
+ nni_list_node p_sock_node;
+ nni_list_node p_ep_node;
+ nni_sock * p_sock;
+ nni_ep * p_ep;
+ bool p_closed;
+ bool p_stop;
+ int p_refcnt;
+ nni_mtx p_mtx;
+ nni_cv p_cv;
+ nni_list_node p_reap_node;
+ nni_aio * p_start_aio;
};
static nni_idhash *nni_pipes;
@@ -106,6 +107,10 @@ nni_pipe_destroy(nni_pipe *p)
// Stop any pending negotiation.
nni_aio_stop(p->p_start_aio);
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_stop(p->p_proto_data);
+ }
+
// We have exclusive access at this point, so we can check if
// we are still on any lists.
if (nni_list_node_active(&p->p_ep_node)) {
@@ -126,6 +131,9 @@ nni_pipe_destroy(nni_pipe *p)
}
nni_mtx_unlock(&nni_pipe_lk);
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_fini(p->p_proto_data);
+ }
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
@@ -189,6 +197,9 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
void
nni_pipe_close(nni_pipe *p)
{
+ // abort any pending negotiation/start process.
+ nni_aio_close(p->p_start_aio);
+
nni_mtx_lock(&p->p_mtx);
if (p->p_closed) {
// We already did a close.
@@ -196,16 +207,16 @@ nni_pipe_close(nni_pipe *p)
return;
}
p->p_closed = true;
+ nni_mtx_unlock(&p->p_mtx);
+
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_close(p->p_proto_data);
+ }
// Close the underlying transport.
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_close(p->p_tran_data);
}
-
- nni_mtx_unlock(&p->p_mtx);
-
- // abort any pending negotiation/start process.
- nni_aio_abort(p->p_start_aio, NNG_ECLOSED);
}
bool
@@ -222,7 +233,6 @@ void
nni_pipe_stop(nni_pipe *p)
{
// Guard against recursive calls.
- nni_pipe_close(p);
nni_mtx_lock(&p->p_mtx);
if (p->p_stop) {
nni_mtx_unlock(&p->p_mtx);
@@ -231,6 +241,8 @@ nni_pipe_stop(nni_pipe *p)
p->p_stop = true;
nni_mtx_unlock(&p->p_mtx);
+ nni_pipe_close(p);
+
// Put it on the reaplist for async cleanup
nni_mtx_lock(&nni_pipe_reap_lk);
nni_list_append(&nni_pipe_reap_list, p);
@@ -250,12 +262,9 @@ nni_pipe_start_cb(void *arg)
nni_pipe *p = arg;
nni_aio * aio = p->p_start_aio;
- if (nni_aio_result(aio) != 0) {
- nni_pipe_stop(p);
- return;
- }
-
- if (nni_sock_pipe_start(p->p_sock, p) != 0) {
+ if ((nni_aio_result(aio) != 0) ||
+ (nni_sock_pipe_start(p->p_sock, p) != 0) ||
+ (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
nni_pipe_stop(p);
}
}
@@ -263,10 +272,12 @@ nni_pipe_start_cb(void *arg)
int
nni_pipe_create(nni_ep *ep, void *tdata)
{
- nni_pipe *p;
- int rv;
- nni_tran *tran = nni_ep_tran(ep);
- nni_sock *sock = nni_ep_sock(ep);
+ nni_pipe * p;
+ int rv;
+ nni_tran * tran = nni_ep_tran(ep);
+ nni_sock * sock = nni_ep_sock(ep);
+ void * sdata = nni_sock_proto_data(sock);
+ nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -277,6 +288,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
// Make a private copy of the transport ops.
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
+ p->p_proto_ops = *pops;
p->p_proto_data = NULL;
p->p_ep = ep;
p->p_sock = sock;
@@ -290,6 +302,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
+
if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) {
uint64_t id;
nni_mtx_lock(&nni_pipe_lk);
@@ -299,11 +312,19 @@ nni_pipe_create(nni_ep *ep, void *tdata)
nni_mtx_unlock(&nni_pipe_lk);
}
- if ((rv != 0) || ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
- ((rv = nni_sock_pipe_add(sock, p)) != 0)) {
+ if ((rv != 0) ||
+ ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) ||
+ ((rv = nni_ep_pipe_add(ep, p)) != 0)) {
nni_pipe_destroy(p);
+ return (rv);
}
+ // At this point the endpoint knows about it, and the protocol
+ // might too, so on failure we have to tear it down fully as if done
+ // after a successful result.
+ if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
+ nni_pipe_stop(p);
+ }
return (rv);
}
@@ -339,12 +360,6 @@ nni_pipe_get_proto_data(nni_pipe *p)
}
void
-nni_pipe_set_proto_data(nni_pipe *p, void *data)
-{
- p->p_proto_data = data;
-}
-
-void
nni_pipe_sock_list_init(nni_list *list)
{
NNI_LIST_INIT(list, nni_pipe, p_sock_node);
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 18a59ddb..4ccbeae2 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -65,7 +65,6 @@ extern int nni_pipe_getopt(nni_pipe *, const char *, void *, size_t *, int);
// nni_pipe_get_proto_data gets the protocol private data set with the
// nni_pipe_set_proto_data function. No locking is performed.
extern void *nni_pipe_get_proto_data(nni_pipe *);
-extern void nni_pipe_set_proto_data(nni_pipe *, void *);
// nni_pipe_sock_list_init initializes a list of pipes, to be used by
// a per-socket list.
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 964aee1a..9c3b4d33 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -34,16 +34,19 @@ struct nni_proto_pipe_ops {
// pipe_start is called to register a pipe with the protocol. The
// protocol can reject this, for example if another pipe is already
- // active on a 1:1 protocol. The protocol may not block during this,
- // as the socket lock is held.
+ // active on a 1:1 protocol. The protocol may not block during this.
int (*pipe_start)(void *);
- // pipe_stop is called to unregister a pipe from the protocol.
- // Threads may still acccess data structures, so the protocol
- // should not free anything yet. This is called with the socket
- // lock held, so the protocol may not call back into the socket, and
- // must not block. This operation must be idempotent, and may
- // be called even if pipe_start was not.
+ // pipe_close is an idempotent, non-blocking, operation, called
+ // when the pipe is being closed. Any operations pending on the
+ // pipe should be canceled with NNG_ECLOSED. (Best option is to
+ // use nng_aio_close() on them)
+ void (*pipe_close)(void *);
+
+ // pipe_stop is called during finalization, to ensure that
+ // the protocol is absolutely finished with the pipe. It should
+ // wait if necessary to ensure that the pipe is not referenced
+ // anymore by the protocol. It should not destroy resources.
void (*pipe_stop)(void *);
};
diff --git a/src/core/socket.c b/src/core/socket.c
index 66580300..f14c15fd 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -253,13 +253,13 @@ nni_sock_setopt_sockname(nni_sock *s, const void *buf, size_t sz, int typ)
static int
nni_sock_getopt_proto(nni_sock *s, void *buf, size_t *szp, int typ)
{
- return (nni_copyout_int(nni_sock_proto(s), buf, szp, typ));
+ return (nni_copyout_int(nni_sock_proto_id(s), buf, szp, typ));
}
static int
nni_sock_getopt_peer(nni_sock *s, void *buf, size_t *szp, int typ)
{
- return (nni_copyout_int(nni_sock_peer(s), buf, szp, typ));
+ return (nni_copyout_int(nni_sock_peer_id(s), buf, szp, typ));
}
static int
@@ -434,9 +434,7 @@ nni_sock_rele(nni_sock *s)
int
nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe)
{
- void * pdata = nni_pipe_get_proto_data(pipe);
nng_pipe_cb cb;
- int rv;
NNI_ASSERT(s != NULL);
nni_mtx_lock(&s->s_mx);
@@ -464,26 +462,13 @@ nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe)
return (NNG_ECLOSED);
}
- // Protocol can reject for other reasons.
- // This must be the last operation, until this point
- // the protocol has not actually "seen" the pipe.
- rv = s->s_pipe_ops.pipe_start(pdata);
-
nni_mtx_unlock(&s->s_mx);
- return (rv);
+ return (0);
}
int
nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
{
- int rv;
- void *pdata;
-
- if ((rv = s->s_pipe_ops.pipe_init(&pdata, p, s->s_data)) != 0) {
- return (rv);
- }
- nni_pipe_set_proto_data(p, pdata);
-
// Initialize protocol pipe data.
nni_mtx_lock(&s->s_mx);
if (s->s_closing) {
@@ -503,7 +488,6 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
void
nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
{
- void * pdata;
nng_pipe_cb cb;
nni_mtx_lock(&sock->s_mx);
@@ -515,14 +499,8 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
cb(p, NNG_PIPE_REM, arg);
nni_mtx_lock(&sock->s_mx);
}
- pdata = nni_pipe_get_proto_data(pipe);
- if (pdata != NULL) {
- sock->s_pipe_ops.pipe_stop(pdata);
- nni_pipe_set_proto_data(pipe, NULL);
- if (nni_list_active(&sock->s_pipes, pipe)) {
- nni_list_remove(&sock->s_pipes, pipe);
- }
- sock->s_pipe_ops.pipe_fini(pdata);
+ if (nni_list_active(&sock->s_pipes, pipe)) {
+ nni_list_remove(&sock->s_pipes, pipe);
}
if (sock->s_closing && nni_list_empty(&sock->s_pipes)) {
nni_cv_wake(&sock->s_cv);
@@ -589,9 +567,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_ASSERT(s->s_sock_ops.sock_open != NULL);
NNI_ASSERT(s->s_sock_ops.sock_close != NULL);
- NNI_ASSERT(s->s_pipe_ops.pipe_start != NULL);
- NNI_ASSERT(s->s_pipe_ops.pipe_stop != NULL);
-
NNI_LIST_NODE_INIT(&s->s_node);
NNI_LIST_INIT(&s->s_options, nni_sockopt, node);
NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
@@ -883,15 +858,16 @@ nni_sock_recv(nni_sock *sock, nni_aio *aio)
sock->s_sock_ops.sock_recv(sock->s_data, aio);
}
-// nni_sock_protocol returns the socket's 16-bit protocol number.
+// nni_sock_proto_id returns the socket's 16-bit protocol number.
uint16_t
-nni_sock_proto(nni_sock *sock)
+nni_sock_proto_id(nni_sock *sock)
{
return (sock->s_self_id.p_id);
}
+// nni_sock_peer_id returns the socket peer's 16-bit protocol number.
uint16_t
-nni_sock_peer(nni_sock *sock)
+nni_sock_peer_id(nni_sock *sock)
{
return (sock->s_peer_id.p_id);
}
@@ -908,6 +884,18 @@ nni_sock_peer_name(nni_sock *sock)
return (sock->s_peer_id.p_name);
}
+struct nni_proto_pipe_ops *
+nni_sock_proto_pipe_ops(nni_sock *sock)
+{
+ return (&sock->s_pipe_ops);
+}
+
+void *
+nni_sock_proto_data(nni_sock *sock)
+{
+ return (sock->s_data);
+}
+
void
nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
{
diff --git a/src/core/socket.h b/src/core/socket.h
index bccda5b3..22a13ef7 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -20,10 +20,13 @@ extern int nni_sock_open(nni_sock **, const nni_proto *);
extern void nni_sock_close(nni_sock *);
extern void nni_sock_closeall(void);
extern int nni_sock_shutdown(nni_sock *);
-extern uint16_t nni_sock_proto(nni_sock *);
-extern uint16_t nni_sock_peer(nni_sock *);
+extern uint16_t nni_sock_proto_id(nni_sock *);
+extern uint16_t nni_sock_peer_id(nni_sock *);
extern const char *nni_sock_proto_name(nni_sock *);
extern const char *nni_sock_peer_name(nni_sock *);
+extern void * nni_sock_proto_data(nni_sock *);
+
+extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *);
extern int nni_sock_setopt(
nni_sock *, const char *, const void *, size_t, int);
diff --git a/src/core/transport.c b/src/core/transport.c
index 94e5caa1..0cd84461 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -118,7 +118,7 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz, int typ)
nni_mtx_lock(&nni_tran_lk);
NNI_LIST_FOREACH (&nni_tran_list, t) {
- const nni_tran_ep * ep;
+ const nni_tran_ep_ops * ep;
const nni_tran_ep_option *eo;
// Generally we look for endpoint options.
diff --git a/src/core/transport.h b/src/core/transport.h
index b1fecaa2..96976efd 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -23,10 +23,10 @@ struct nni_tran {
const char *tran_scheme;
// tran_ep links our endpoint-specific operations.
- const nni_tran_ep *tran_ep;
+ const nni_tran_ep_ops *tran_ep;
// tran_pipe links our pipe-specific operations.
- const nni_tran_pipe *tran_pipe;
+ const nni_tran_pipe_ops *tran_pipe;
// tran_init, if not NULL, is called once during library
// initialization.
@@ -77,7 +77,7 @@ struct nni_tran_ep_option {
// For a given endpoint, the framework holds a lock so that each entry
// point is run exclusively of the others. (Transports must still guard
// against any asynchronous operations they manage themselves, though.)
-struct nni_tran_ep {
+struct nni_tran_ep_ops {
// ep_init creates a vanilla endpoint. The value created is
// used for the first argument for all other endpoint functions.
int (*ep_init)(void **, nni_url *, nni_sock *, int);
@@ -128,7 +128,7 @@ struct nni_tran_pipe_option {
// with socket locks held, so it is forbidden for the transport to call
// back into the socket at this point. (Which is one reason pointers back
// to socket or even enclosing pipe state, are not provided.)
-struct nni_tran_pipe {
+struct nni_tran_pipe_ops {
// p_fini destroys the pipe. This should clean up all local
// resources, including closing files and freeing memory, used by
// the pipe. After this call returns, the system will not make
diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c
index 2a2a1228..01f20130 100644
--- a/src/protocol/bus0/bus.c
+++ b/src/protocol/bus0/bus.c
@@ -67,7 +67,6 @@ bus0_sock_fini(void *arg)
{
bus0_sock *s = arg;
- nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
nni_mtx_fini(&s->mtx);
NNI_FREE_STRUCT(s);
@@ -108,7 +107,18 @@ bus0_sock_close(void *arg)
{
bus0_sock *s = arg;
- nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+ nni_aio_close(s->aio_getq);
+}
+
+static void
+bus0_pipe_stop(void *arg)
+{
+ bus0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
}
static void
@@ -168,18 +178,17 @@ bus0_pipe_start(void *arg)
}
static void
-bus0_pipe_stop(void *arg)
+bus0_pipe_close(void *arg)
{
bus0_pipe *p = arg;
bus0_sock *s = p->psock;
+ nni_aio_close(p->aio_getq);
+ nni_aio_close(p->aio_send);
+ nni_aio_close(p->aio_recv);
+ nni_aio_close(p->aio_putq);
nni_msgq_close(p->sendq);
- nni_aio_stop(p->aio_getq);
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_putq);
-
nni_mtx_lock(&s->mtx);
if (nni_list_active(&s->pipes, p)) {
nni_list_remove(&s->pipes, p);
@@ -351,6 +360,7 @@ static nni_proto_pipe_ops bus0_pipe_ops = {
.pipe_init = bus0_pipe_init,
.pipe_fini = bus0_pipe_fini,
.pipe_start = bus0_pipe_start,
+ .pipe_close = bus0_pipe_close,
.pipe_stop = bus0_pipe_stop,
};
diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c
index e275e52c..2fb42df5 100644
--- a/src/protocol/pair0/pair.c
+++ b/src/protocol/pair0/pair.c
@@ -78,6 +78,17 @@ pair0_sock_fini(void *arg)
}
static void
+pair0_pipe_stop(void *arg)
+{
+ pair0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_getq);
+}
+
+static void
pair0_pipe_fini(void *arg)
{
pair0_pipe *p = arg;
@@ -135,15 +146,15 @@ pair0_pipe_start(void *arg)
}
static void
-pair0_pipe_stop(void *arg)
+pair0_pipe_close(void *arg)
{
pair0_pipe *p = arg;
pair0_sock *s = p->psock;
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_putq);
- nni_aio_stop(p->aio_getq);
+ nni_aio_close(p->aio_send);
+ nni_aio_close(p->aio_recv);
+ nni_aio_close(p->aio_putq);
+ nni_aio_close(p->aio_getq);
nni_mtx_lock(&s->mtx);
if (s->ppipe == p) {
@@ -249,6 +260,7 @@ static nni_proto_pipe_ops pair0_pipe_ops = {
.pipe_init = pair0_pipe_init,
.pipe_fini = pair0_pipe_fini,
.pipe_start = pair0_pipe_start,
+ .pipe_close = pair0_pipe_close,
.pipe_stop = pair0_pipe_stop,
};
diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c
index a3c01d46..ab01e451 100644
--- a/src/protocol/pair1/pair.c
+++ b/src/protocol/pair1/pair.c
@@ -117,9 +117,21 @@ pair1_sock_init_raw(void **sp, nni_sock *nsock)
}
static void
+pair1_pipe_stop(void *arg)
+{
+ pair1_pipe *p = arg;
+
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_getq);
+}
+
+static void
pair1_pipe_fini(void *arg)
{
pair1_pipe *p = arg;
+
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
nni_aio_fini(p->aio_putq);
@@ -198,21 +210,22 @@ pair1_pipe_start(void *arg)
}
static void
-pair1_pipe_stop(void *arg)
+pair1_pipe_close(void *arg)
{
pair1_pipe *p = arg;
pair1_sock *s = p->psock;
+ nni_aio_close(p->aio_send);
+ nni_aio_close(p->aio_recv);
+ nni_aio_close(p->aio_putq);
+ nni_aio_close(p->aio_getq);
+
nni_mtx_lock(&s->mtx);
nni_idhash_remove(s->pipes, nni_pipe_id(p->npipe));
nni_list_node_remove(&p->node);
nni_mtx_unlock(&s->mtx);
nni_msgq_close(p->sendq);
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_putq);
- nni_aio_stop(p->aio_getq);
}
static void
@@ -405,7 +418,8 @@ pair1_sock_open(void *arg)
static void
pair1_sock_close(void *arg)
{
- NNI_ARG_UNUSED(arg);
+ pair1_sock *s = arg;
+ nni_aio_close(s->aio_getq);
}
static int
@@ -464,6 +478,7 @@ static nni_proto_pipe_ops pair1_pipe_ops = {
.pipe_init = pair1_pipe_init,
.pipe_fini = pair1_pipe_fini,
.pipe_start = pair1_pipe_start,
+ .pipe_close = pair1_pipe_close,
.pipe_stop = pair1_pipe_stop,
};
diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c
index c5017d50..81f6c137 100644
--- a/src/protocol/pipeline0/pull.c
+++ b/src/protocol/pipeline0/pull.c
@@ -68,6 +68,15 @@ pull0_sock_fini(void *arg)
}
static void
+pull0_pipe_stop(void *arg)
+{
+ pull0_pipe *p = arg;
+
+ nni_aio_stop(p->putq_aio);
+ nni_aio_stop(p->recv_aio);
+}
+
+static void
pull0_pipe_fini(void *arg)
{
pull0_pipe *p = arg;
@@ -110,12 +119,12 @@ pull0_pipe_start(void *arg)
}
static void
-pull0_pipe_stop(void *arg)
+pull0_pipe_close(void *arg)
{
pull0_pipe *p = arg;
- nni_aio_stop(p->putq_aio);
- nni_aio_stop(p->recv_aio);
+ nni_aio_close(p->putq_aio);
+ nni_aio_close(p->recv_aio);
}
static void
@@ -198,6 +207,7 @@ static nni_proto_pipe_ops pull0_pipe_ops = {
.pipe_init = pull0_pipe_init,
.pipe_fini = pull0_pipe_fini,
.pipe_start = pull0_pipe_start,
+ .pipe_close = pull0_pipe_close,
.pipe_stop = pull0_pipe_stop,
};
diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c
index 2ad657b6..e413cf46 100644
--- a/src/protocol/pipeline0/push.c
+++ b/src/protocol/pipeline0/push.c
@@ -83,6 +83,16 @@ push0_sock_close(void *arg)
}
static void
+push0_pipe_stop(void *arg)
+{
+ push0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_getq);
+}
+
+static void
push0_pipe_fini(void *arg)
{
push0_pipe *p = arg;
@@ -136,13 +146,13 @@ push0_pipe_start(void *arg)
}
static void
-push0_pipe_stop(void *arg)
+push0_pipe_close(void *arg)
{
push0_pipe *p = arg;
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_getq);
+ nni_aio_close(p->aio_recv);
+ nni_aio_close(p->aio_send);
+ nni_aio_close(p->aio_getq);
}
static void
@@ -214,6 +224,7 @@ static nni_proto_pipe_ops push0_pipe_ops = {
.pipe_init = push0_pipe_init,
.pipe_fini = push0_pipe_fini,
.pipe_start = push0_pipe_start,
+ .pipe_close = push0_pipe_close,
.pipe_stop = push0_pipe_stop,
};
diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c
index 45f4b7d9..4db48754 100644
--- a/src/protocol/pubsub0/pub.c
+++ b/src/protocol/pubsub0/pub.c
@@ -61,7 +61,6 @@ pub0_sock_fini(void *arg)
{
pub0_sock *s = arg;
- nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
nni_mtx_fini(&s->mtx);
NNI_FREE_STRUCT(s);
@@ -103,13 +102,24 @@ pub0_sock_close(void *arg)
{
pub0_sock *s = arg;
- nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+ nni_aio_close(s->aio_getq);
+}
+
+static void
+pub0_pipe_stop(void *arg)
+{
+ pub0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
}
static void
pub0_pipe_fini(void *arg)
{
pub0_pipe *p = arg;
+
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
@@ -164,14 +174,14 @@ pub0_pipe_start(void *arg)
}
static void
-pub0_pipe_stop(void *arg)
+pub0_pipe_close(void *arg)
{
pub0_pipe *p = arg;
pub0_sock *s = p->pub;
- nni_aio_stop(p->aio_getq);
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
+ nni_aio_close(p->aio_getq);
+ nni_aio_close(p->aio_send);
+ nni_aio_close(p->aio_recv);
nni_msgq_close(p->sendq);
@@ -290,6 +300,7 @@ static nni_proto_pipe_ops pub0_pipe_ops = {
.pipe_init = pub0_pipe_init,
.pipe_fini = pub0_pipe_fini,
.pipe_start = pub0_pipe_start,
+ .pipe_close = pub0_pipe_close,
.pipe_stop = pub0_pipe_stop,
};
diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c
index b41b33ea..c244b0ad 100644
--- a/src/protocol/pubsub0/sub.c
+++ b/src/protocol/pubsub0/sub.c
@@ -99,6 +99,15 @@ sub0_sock_close(void *arg)
}
static void
+sub0_pipe_stop(void *arg)
+{
+ sub0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_recv);
+}
+
+static void
sub0_pipe_fini(void *arg)
{
sub0_pipe *p = arg;
@@ -139,12 +148,12 @@ sub0_pipe_start(void *arg)
}
static void
-sub0_pipe_stop(void *arg)
+sub0_pipe_close(void *arg)
{
sub0_pipe *p = arg;
- nni_aio_stop(p->aio_putq);
- nni_aio_stop(p->aio_recv);
+ nni_aio_close(p->aio_putq);
+ nni_aio_close(p->aio_recv);
}
static void
@@ -338,6 +347,7 @@ static nni_proto_pipe_ops sub0_pipe_ops = {
.pipe_init = sub0_pipe_init,
.pipe_fini = sub0_pipe_fini,
.pipe_start = sub0_pipe_start,
+ .pipe_close = sub0_pipe_close,
.pipe_stop = sub0_pipe_stop,
};
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c
index 965cbea7..c483b777 100644
--- a/src/protocol/reqrep0/rep.c
+++ b/src/protocol/reqrep0/rep.c
@@ -297,6 +297,15 @@ rep0_sock_close(void *arg)
}
static void
+rep0_pipe_stop(void *arg)
+{
+ rep0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+}
+
+static void
rep0_pipe_fini(void *arg)
{
rep0_pipe *p = arg;
@@ -347,14 +356,14 @@ rep0_pipe_start(void *arg)
}
static void
-rep0_pipe_stop(void *arg)
+rep0_pipe_close(void *arg)
{
rep0_pipe *p = arg;
rep0_sock *s = p->rep;
rep0_ctx * ctx;
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
+ nni_aio_close(p->aio_send);
+ nni_aio_close(p->aio_recv);
nni_mtx_lock(&s->lk);
if (nni_list_active(&s->recvpipes, p)) {
@@ -647,6 +656,7 @@ static nni_proto_pipe_ops rep0_pipe_ops = {
.pipe_init = rep0_pipe_init,
.pipe_fini = rep0_pipe_fini,
.pipe_start = rep0_pipe_start,
+ .pipe_close = rep0_pipe_close,
.pipe_stop = rep0_pipe_stop,
};
diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c
index bbb0b886..fc9d7271 100644
--- a/src/protocol/reqrep0/req.c
+++ b/src/protocol/reqrep0/req.c
@@ -186,6 +186,15 @@ req0_sock_fini(void *arg)
}
static void
+req0_pipe_stop(void *arg)
+{
+ req0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_send);
+}
+
+static void
req0_pipe_fini(void *arg)
{
req0_pipe *p = arg;
@@ -243,17 +252,14 @@ req0_pipe_start(void *arg)
}
static void
-req0_pipe_stop(void *arg)
+req0_pipe_close(void *arg)
{
req0_pipe *p = arg;
req0_sock *s = p->req;
req0_ctx * ctx;
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_send);
-
- // At this point there should not be any further AIOs running.
- // Further, any completion tasks have completed.
+ nni_aio_close(p->aio_recv);
+ nni_aio_close(p->aio_send);
nni_mtx_lock(&s->mtx);
// This removes the node from either busypipes or readypipes.
@@ -837,6 +843,7 @@ static nni_proto_pipe_ops req0_pipe_ops = {
.pipe_init = req0_pipe_init,
.pipe_fini = req0_pipe_fini,
.pipe_start = req0_pipe_start,
+ .pipe_close = req0_pipe_close,
.pipe_stop = req0_pipe_stop,
};
diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c
index 4773677e..6dcfe6be 100644
--- a/src/protocol/reqrep0/xrep.c
+++ b/src/protocol/reqrep0/xrep.c
@@ -62,7 +62,6 @@ xrep0_sock_fini(void *arg)
{
xrep0_sock *s = arg;
- nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->lk);
@@ -108,7 +107,18 @@ xrep0_sock_close(void *arg)
{
xrep0_sock *s = arg;
- nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+ nni_aio_close(s->aio_getq);
+}
+
+static void
+xrep0_pipe_stop(void *arg)
+{
+ xrep0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
}
static void
@@ -178,16 +188,16 @@ xrep0_pipe_start(void *arg)
}
static void
-xrep0_pipe_stop(void *arg)
+xrep0_pipe_close(void *arg)
{
xrep0_pipe *p = arg;
xrep0_sock *s = p->rep;
+ nni_aio_close(p->aio_getq);
+ nni_aio_close(p->aio_send);
+ nni_aio_close(p->aio_recv);
+ nni_aio_close(p->aio_putq);
nni_msgq_close(p->sendq);
- nni_aio_stop(p->aio_getq);
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_putq);
nni_mtx_lock(&s->lk);
nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe));
@@ -389,6 +399,7 @@ static nni_proto_pipe_ops xrep0_pipe_ops = {
.pipe_init = xrep0_pipe_init,
.pipe_fini = xrep0_pipe_fini,
.pipe_start = xrep0_pipe_start,
+ .pipe_close = xrep0_pipe_close,
.pipe_stop = xrep0_pipe_stop,
};
diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c
index 13ae7418..793411af 100644
--- a/src/protocol/reqrep0/xreq.c
+++ b/src/protocol/reqrep0/xreq.c
@@ -90,6 +90,17 @@ xreq0_sock_fini(void *arg)
}
static void
+xreq0_pipe_stop(void *arg)
+{
+ xreq0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_send);
+}
+
+static void
xreq0_pipe_fini(void *arg)
{
xreq0_pipe *p = arg;
@@ -140,17 +151,14 @@ xreq0_pipe_start(void *arg)
}
static void
-xreq0_pipe_stop(void *arg)
+xreq0_pipe_close(void *arg)
{
xreq0_pipe *p = arg;
- nni_aio_stop(p->aio_getq);
- nni_aio_stop(p->aio_putq);
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_send);
-
- // At this point there should not be any further AIOs running.
- // Further, any completion tasks have completed.
+ nni_aio_close(p->aio_getq);
+ nni_aio_close(p->aio_putq);
+ nni_aio_close(p->aio_recv);
+ nni_aio_close(p->aio_send);
}
// For raw mode we can just let the pipes "contend" via getq to get a
@@ -277,6 +285,7 @@ static nni_proto_pipe_ops xreq0_pipe_ops = {
.pipe_init = xreq0_pipe_init,
.pipe_fini = xreq0_pipe_fini,
.pipe_start = xreq0_pipe_start,
+ .pipe_close = xreq0_pipe_close,
.pipe_stop = xreq0_pipe_stop,
};
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c
index e553f6ce..7738a8b7 100644
--- a/src/protocol/survey0/respond.c
+++ b/src/protocol/survey0/respond.c
@@ -290,6 +290,15 @@ resp0_sock_close(void *arg)
}
static void
+resp0_pipe_stop(void *arg)
+{
+ resp0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+}
+
+static void
resp0_pipe_fini(void *arg)
{
resp0_pipe *p = arg;
@@ -344,12 +353,15 @@ resp0_pipe_start(void *arg)
}
static void
-resp0_pipe_stop(void *arg)
+resp0_pipe_close(void *arg)
{
resp0_pipe *p = arg;
resp0_sock *s = p->psock;
resp0_ctx * ctx;
+ nni_aio_close(p->aio_send);
+ nni_aio_close(p->aio_recv);
+
nni_mtx_lock(&s->mtx);
while ((ctx = nni_list_first(&p->sendq)) != NULL) {
nni_aio *aio;
@@ -369,9 +381,6 @@ resp0_pipe_stop(void *arg)
}
nni_idhash_remove(s->pipes, p->id);
nni_mtx_unlock(&s->mtx);
-
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
}
static void
@@ -626,6 +635,7 @@ static nni_proto_pipe_ops resp0_pipe_ops = {
.pipe_init = resp0_pipe_init,
.pipe_fini = resp0_pipe_fini,
.pipe_start = resp0_pipe_start,
+ .pipe_close = resp0_pipe_close,
.pipe_stop = resp0_pipe_stop,
};
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index e725d2b3..51bce0c8 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -284,6 +284,16 @@ surv0_sock_close(void *arg)
}
static void
+surv0_pipe_stop(void *arg)
+{
+ surv0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+}
+
+static void
surv0_pipe_fini(void *arg)
{
surv0_pipe *p = arg;
@@ -338,14 +348,14 @@ surv0_pipe_start(void *arg)
}
static void
-surv0_pipe_stop(void *arg)
+surv0_pipe_close(void *arg)
{
surv0_pipe *p = arg;
surv0_sock *s = p->sock;
- nni_aio_stop(p->aio_getq);
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
+ nni_aio_close(p->aio_getq);
+ nni_aio_close(p->aio_send);
+ nni_aio_close(p->aio_recv);
nni_msgq_close(p->sendq);
@@ -532,6 +542,7 @@ static nni_proto_pipe_ops surv0_pipe_ops = {
.pipe_init = surv0_pipe_init,
.pipe_fini = surv0_pipe_fini,
.pipe_start = surv0_pipe_start,
+ .pipe_close = surv0_pipe_close,
.pipe_stop = surv0_pipe_stop,
};
diff --git a/src/protocol/survey0/xrespond.c b/src/protocol/survey0/xrespond.c
index 7aaed6da..bcbbcbc7 100644
--- a/src/protocol/survey0/xrespond.c
+++ b/src/protocol/survey0/xrespond.c
@@ -63,7 +63,6 @@ xresp0_sock_fini(void *arg)
{
xresp0_sock *s = arg;
- nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
nni_idhash_fini(s->pipes);
nni_mtx_fini(&s->mtx);
@@ -107,7 +106,18 @@ xresp0_sock_close(void *arg)
{
xresp0_sock *s = arg;
- nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+ nni_aio_close(s->aio_getq);
+}
+
+static void
+xresp0_pipe_stop(void *arg)
+{
+ xresp0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_putq);
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
}
static void
@@ -170,16 +180,17 @@ xresp0_pipe_start(void *arg)
}
static void
-xresp0_pipe_stop(void *arg)
+xresp0_pipe_close(void *arg)
{
xresp0_pipe *p = arg;
xresp0_sock *s = p->psock;
+ nni_aio_close(p->aio_putq);
+ nni_aio_close(p->aio_getq);
+ nni_aio_close(p->aio_send);
+ nni_aio_close(p->aio_recv);
+
nni_msgq_close(p->sendq);
- nni_aio_stop(p->aio_putq);
- nni_aio_stop(p->aio_getq);
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
nni_mtx_lock(&s->mtx);
nni_idhash_remove(s->pipes, p->id);
@@ -366,6 +377,7 @@ static nni_proto_pipe_ops xresp0_pipe_ops = {
.pipe_init = xresp0_pipe_init,
.pipe_fini = xresp0_pipe_fini,
.pipe_start = xresp0_pipe_start,
+ .pipe_close = xresp0_pipe_close,
.pipe_stop = xresp0_pipe_stop,
};
diff --git a/src/protocol/survey0/xsurvey.c b/src/protocol/survey0/xsurvey.c
index cf311b15..47ebef3c 100644
--- a/src/protocol/survey0/xsurvey.c
+++ b/src/protocol/survey0/xsurvey.c
@@ -61,7 +61,6 @@ xsurv0_sock_fini(void *arg)
{
xsurv0_sock *s = arg;
- nni_aio_stop(s->aio_getq);
nni_aio_fini(s->aio_getq);
nni_mtx_fini(&s->mtx);
NNI_FREE_STRUCT(s);
@@ -104,7 +103,18 @@ xsurv0_sock_close(void *arg)
{
xsurv0_sock *s = arg;
- nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+ nni_aio_close(s->aio_getq);
+}
+
+static void
+xsurv0_pipe_stop(void *arg)
+{
+ xsurv0_pipe *p = arg;
+
+ nni_aio_stop(p->aio_getq);
+ nni_aio_stop(p->aio_send);
+ nni_aio_stop(p->aio_recv);
+ nni_aio_stop(p->aio_putq);
}
static void
@@ -166,15 +176,15 @@ xsurv0_pipe_start(void *arg)
}
static void
-xsurv0_pipe_stop(void *arg)
+xsurv0_pipe_close(void *arg)
{
xsurv0_pipe *p = arg;
xsurv0_sock *s = p->psock;
- nni_aio_stop(p->aio_getq);
- nni_aio_stop(p->aio_send);
- nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_putq);
+ nni_aio_close(p->aio_getq);
+ nni_aio_close(p->aio_send);
+ nni_aio_close(p->aio_recv);
+ nni_aio_close(p->aio_putq);
nni_msgq_close(p->sendq);
@@ -338,6 +348,7 @@ static nni_proto_pipe_ops xsurv0_pipe_ops = {
.pipe_init = xsurv0_pipe_init,
.pipe_fini = xsurv0_pipe_fini,
.pipe_start = xsurv0_pipe_start,
+ .pipe_close = xsurv0_pipe_close,
.pipe_stop = xsurv0_pipe_stop,
};
diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c
index f3b16370..15d1f776 100644
--- a/src/supplemental/http/http_conn.c
+++ b/src/supplemental/http/http_conn.c
@@ -110,22 +110,26 @@ http_close(nni_http_conn *conn)
}
conn->closed = true;
- if (nni_list_first(&conn->wrq)) {
- nni_aio_abort(conn->wr_aio, NNG_ECLOSED);
- // Abort all operations except the one in flight.
- while ((aio = nni_list_last(&conn->wrq)) !=
- nni_list_first(&conn->wrq)) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
+ nni_aio_close(conn->wr_aio);
+ nni_aio_close(conn->rd_aio);
+
+ if ((aio = conn->rd_uaio) != NULL) {
+ conn->rd_uaio = NULL;
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
- if (nni_list_first(&conn->rdq)) {
- nni_aio_abort(conn->rd_aio, NNG_ECLOSED);
- while ((aio = nni_list_last(&conn->rdq)) !=
- nni_list_first(&conn->rdq)) {
- nni_aio_list_remove(aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
- }
+ if ((aio = conn->wr_uaio) != NULL) {
+ conn->wr_uaio = NULL;
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+
+ // Abort all operations except the one in flight.
+ while ((aio = nni_list_first(&conn->wrq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ while ((aio = nni_list_first(&conn->rdq)) != NULL) {
+ nni_aio_list_remove(aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
}
if (conn->sock != NULL) {
@@ -668,6 +672,9 @@ nni_http_tls_verified(nni_http_conn *conn)
void
nni_http_conn_fini(nni_http_conn *conn)
{
+ nni_aio_stop(conn->wr_aio);
+ nni_aio_stop(conn->rd_aio);
+
nni_mtx_lock(&conn->mtx);
http_close(conn);
if ((conn->sock != NULL) && (conn->fini != NULL)) {
@@ -675,8 +682,7 @@ nni_http_conn_fini(nni_http_conn *conn)
conn->sock = NULL;
}
nni_mtx_unlock(&conn->mtx);
- nni_aio_stop(conn->wr_aio);
- nni_aio_stop(conn->rd_aio);
+
nni_aio_fini(conn->wr_aio);
nni_aio_fini(conn->rd_aio);
nni_free(conn->rd_buf, conn->rd_bufsz);
diff --git a/src/supplemental/http/http_server.c b/src/supplemental/http/http_server.c
index c92de586..c7738aee 100644
--- a/src/supplemental/http/http_server.c
+++ b/src/supplemental/http/http_server.c
@@ -229,12 +229,6 @@ http_sconn_reap(void *arg)
}
static void
-http_sconn_fini(http_sconn *sc)
-{
- nni_reap(&sc->reap, http_sconn_reap, sc);
-}
-
-static void
http_sconn_close_locked(http_sconn *sc)
{
nni_http_conn *conn;
@@ -245,15 +239,15 @@ http_sconn_close_locked(http_sconn *sc)
NNI_ASSERT(!sc->finished);
sc->closed = true;
- nni_aio_abort(sc->rxaio, NNG_ECLOSED);
- nni_aio_abort(sc->txaio, NNG_ECLOSED);
- nni_aio_abort(sc->txdataio, NNG_ECLOSED);
- nni_aio_abort(sc->cbaio, NNG_ECLOSED);
+ nni_aio_close(sc->rxaio);
+ nni_aio_close(sc->txaio);
+ nni_aio_close(sc->txdataio);
+ nni_aio_close(sc->cbaio);
if ((conn = sc->conn) != NULL) {
nni_http_conn_close(conn);
}
- http_sconn_fini(sc);
+ nni_reap(&sc->reap, http_sconn_reap, sc);
}
static void
diff --git a/src/supplemental/tls/mbedtls/tls.c b/src/supplemental/tls/mbedtls/tls.c
index f721d4aa..cdd226cd 100644
--- a/src/supplemental/tls/mbedtls/tls.c
+++ b/src/supplemental/tls/mbedtls/tls.c
@@ -761,6 +761,9 @@ nni_tls_close(nni_tls *tp)
{
nni_aio *aio;
+ nni_aio_close(tp->tcp_send);
+ nni_aio_close(tp->tcp_recv);
+
nni_mtx_lock(&tp->lk);
tp->tls_closed = true;
diff --git a/src/supplemental/websocket/websocket.c b/src/supplemental/websocket/websocket.c
index 9a3f5519..6d4d3c13 100644
--- a/src/supplemental/websocket/websocket.c
+++ b/src/supplemental/websocket/websocket.c
@@ -407,14 +407,15 @@ ws_close_cb(void *arg)
nni_ws *ws = arg;
ws_msg *wm;
+ nni_aio_close(ws->txaio);
+ nni_aio_close(ws->rxaio);
+ nni_aio_close(ws->httpaio);
+
// Either we sent a close frame, or we didn't. Either way,
// we are done, and its time to abort everything else.
nni_mtx_lock(&ws->mtx);
nni_http_conn_close(ws->http);
- nni_aio_abort(ws->txaio, NNG_ECLOSED);
- nni_aio_abort(ws->rxaio, NNG_ECLOSED);
- nni_aio_abort(ws->httpaio, NNG_ECLOSED);
// This list (receive) should be empty.
while ((wm = nni_list_first(&ws->rxmsgs)) != NULL) {
@@ -464,8 +465,8 @@ ws_close(nni_ws *ws, uint16_t code)
// pending connect request.
if (!ws->closed) {
// ABORT connection negotiation.
- nni_aio_abort(ws->connaio, NNG_ECLOSED);
- nni_aio_abort(ws->httpaio, NNG_ECLOSED);
+ nni_aio_close(ws->connaio);
+ nni_aio_close(ws->httpaio);
ws_send_close(ws, code);
}
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index 0f159d3a..bc51d971 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -198,7 +198,7 @@ nni_inproc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
}
ep->mode = mode;
- ep->proto = nni_sock_proto(sock);
+ ep->proto = nni_sock_proto_id(sock);
NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node);
nni_aio_list_init(&ep->aios);
@@ -452,7 +452,7 @@ static nni_tran_pipe_option nni_inproc_pipe_options[] = {
},
};
-static nni_tran_pipe nni_inproc_pipe_ops = {
+static nni_tran_pipe_ops nni_inproc_pipe_ops = {
.p_fini = nni_inproc_pipe_fini,
.p_send = nni_inproc_pipe_send,
.p_recv = nni_inproc_pipe_recv,
@@ -468,7 +468,7 @@ static nni_tran_ep_option nni_inproc_ep_options[] = {
},
};
-static nni_tran_ep nni_inproc_ep_ops = {
+static nni_tran_ep_ops nni_inproc_ep_ops = {
.ep_init = nni_inproc_ep_init,
.ep_fini = nni_inproc_ep_fini,
.ep_connect = nni_inproc_ep_connect,
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 2347e24c..1740bfcb 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -81,6 +81,10 @@ nni_ipc_pipe_close(void *arg)
{
nni_ipc_pipe *pipe = arg;
+ nni_aio_close(pipe->rxaio);
+ nni_aio_close(pipe->txaio);
+ nni_aio_close(pipe->negaio);
+
nni_plat_ipc_pipe_close(pipe->ipp);
}
@@ -644,7 +648,7 @@ nni_ipc_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
nni_ipc_ep_fini(ep);
return (rv);
}
- ep->proto = nni_sock_proto(sock);
+ ep->proto = nni_sock_proto_id(sock);
*epp = ep;
return (0);
@@ -655,11 +659,11 @@ nni_ipc_ep_close(void *arg)
{
nni_ipc_ep *ep = arg;
+ nni_aio_close(ep->aio);
+
nni_mtx_lock(&ep->mtx);
nni_plat_ipc_ep_close(ep->iep);
nni_mtx_unlock(&ep->mtx);
-
- nni_aio_stop(ep->aio);
}
static int
@@ -883,7 +887,7 @@ static nni_tran_pipe_option nni_ipc_pipe_options[] = {
},
};
-static nni_tran_pipe nni_ipc_pipe_ops = {
+static nni_tran_pipe_ops nni_ipc_pipe_ops = {
.p_fini = nni_ipc_pipe_fini,
.p_start = nni_ipc_pipe_start,
.p_send = nni_ipc_pipe_send,
@@ -924,7 +928,7 @@ static nni_tran_ep_option nni_ipc_ep_options[] = {
},
};
-static nni_tran_ep nni_ipc_ep_ops = {
+static nni_tran_ep_ops nni_ipc_ep_ops = {
.ep_init = nni_ipc_ep_init,
.ep_fini = nni_ipc_ep_fini,
.ep_connect = nni_ipc_ep_connect,
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index f2cdf8ac..22217699 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -81,9 +81,13 @@ nni_tcp_tran_fini(void)
static void
nni_tcp_pipe_close(void *arg)
{
- nni_tcp_pipe *pipe = arg;
+ nni_tcp_pipe *p = arg;
+
+ nni_aio_close(p->rxaio);
+ nni_aio_close(p->txaio);
+ nni_aio_close(p->negaio);
- nni_plat_tcp_pipe_close(pipe->tpp);
+ nni_plat_tcp_pipe_close(p->tpp);
}
static void
@@ -666,7 +670,7 @@ nni_tcp_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
nni_tcp_ep_fini(ep);
return (rv);
}
- ep->proto = nni_sock_proto(sock);
+ ep->proto = nni_sock_proto_id(sock);
ep->mode = mode;
ep->nodelay = true;
ep->keepalive = false;
@@ -680,11 +684,11 @@ nni_tcp_ep_close(void *arg)
{
nni_tcp_ep *ep = arg;
+ nni_aio_close(ep->aio);
+
nni_mtx_lock(&ep->mtx);
nni_plat_tcp_ep_close(ep->tep);
nni_mtx_unlock(&ep->mtx);
-
- nni_aio_stop(ep->aio);
}
static int
@@ -910,7 +914,7 @@ static nni_tran_pipe_option nni_tcp_pipe_options[] = {
},
};
-static nni_tran_pipe nni_tcp_pipe_ops = {
+static nni_tran_pipe_ops nni_tcp_pipe_ops = {
.p_fini = nni_tcp_pipe_fini,
.p_start = nni_tcp_pipe_start,
.p_send = nni_tcp_pipe_send,
@@ -951,7 +955,7 @@ static nni_tran_ep_option nni_tcp_ep_options[] = {
},
};
-static nni_tran_ep nni_tcp_ep_ops = {
+static nni_tran_ep_ops nni_tcp_ep_ops = {
.ep_init = nni_tcp_ep_init,
.ep_fini = nni_tcp_ep_fini,
.ep_connect = nni_tcp_ep_connect,
diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c
index c863a85e..21557270 100644
--- a/src/transport/tls/tls.c
+++ b/src/transport/tls/tls.c
@@ -91,6 +91,10 @@ nni_tls_pipe_close(void *arg)
{
nni_tls_pipe *p = arg;
+ nni_aio_close(p->rxaio);
+ nni_aio_close(p->txaio);
+ nni_aio_close(p->negaio);
+
nni_tls_close(p->tls);
}
@@ -687,7 +691,7 @@ nni_tls_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
return (rv);
}
}
- ep->proto = nni_sock_proto(sock);
+ ep->proto = nni_sock_proto_id(sock);
ep->authmode = authmode;
*epp = ep;
@@ -699,11 +703,11 @@ nni_tls_ep_close(void *arg)
{
nni_tls_ep *ep = arg;
+ nni_aio_close(ep->aio);
+
nni_mtx_lock(&ep->mtx);
nni_plat_tcp_ep_close(ep->tep);
nni_mtx_unlock(&ep->mtx);
-
- nni_aio_stop(ep->aio);
}
static int
@@ -1036,7 +1040,7 @@ static nni_tran_pipe_option nni_tls_pipe_options[] = {
},
};
-static nni_tran_pipe nni_tls_pipe_ops = {
+static nni_tran_pipe_ops nni_tls_pipe_ops = {
.p_fini = nni_tls_pipe_fini,
.p_start = nni_tls_pipe_start,
.p_send = nni_tls_pipe_send,
@@ -1107,7 +1111,7 @@ static nni_tran_ep_option nni_tls_ep_options[] = {
},
};
-static nni_tran_ep nni_tls_ep_ops = {
+static nni_tran_ep_ops nni_tls_ep_ops = {
.ep_init = nni_tls_ep_init,
.ep_fini = nni_tls_ep_fini,
.ep_connect = nni_tls_ep_connect,
diff --git a/src/transport/ws/websocket.c b/src/transport/ws/websocket.c
index 12f3aeb5..a4081b25 100644
--- a/src/transport/ws/websocket.c
+++ b/src/transport/ws/websocket.c
@@ -205,6 +205,9 @@ ws_pipe_close(void *arg)
{
ws_pipe *p = arg;
+ nni_aio_close(p->rxaio);
+ nni_aio_close(p->txaio);
+
nni_mtx_lock(&p->mtx);
nni_ws_close(p->ws);
nni_mtx_unlock(&p->mtx);
@@ -588,7 +591,7 @@ static nni_tran_pipe_option ws_pipe_options[] = {
}
};
-static nni_tran_pipe ws_pipe_ops = {
+static nni_tran_pipe_ops ws_pipe_ops = {
.p_fini = ws_pipe_fini,
.p_send = ws_pipe_send,
.p_recv = ws_pipe_recv,
@@ -690,6 +693,9 @@ ws_ep_close(void *arg)
{
ws_ep *ep = arg;
+ nni_aio_close(ep->accaio);
+ nni_aio_close(ep->connaio);
+
if (ep->mode == NNI_EP_MODE_LISTEN) {
nni_ws_listener_close(ep->listener);
} else {
@@ -750,8 +756,8 @@ ws_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
nni_aio_list_init(&ep->aios);
ep->mode = mode;
- ep->lproto = nni_sock_proto(sock);
- ep->rproto = nni_sock_peer(sock);
+ ep->lproto = nni_sock_proto_id(sock);
+ ep->rproto = nni_sock_peer_id(sock);
if (mode == NNI_EP_MODE_DIAL) {
pname = nni_sock_peer_name(sock);
@@ -795,7 +801,7 @@ ws_tran_fini(void)
{
}
-static nni_tran_ep ws_ep_ops = {
+static nni_tran_ep_ops ws_ep_ops = {
.ep_init = ws_ep_init,
.ep_fini = ws_ep_fini,
.ep_connect = ws_ep_connect,
@@ -1015,7 +1021,7 @@ static nni_tran_ep_option wss_ep_options[] = {
},
};
-static nni_tran_ep wss_ep_ops = {
+static nni_tran_ep_ops wss_ep_ops = {
.ep_init = ws_ep_init,
.ep_fini = ws_ep_fini,
.ep_connect = ws_ep_connect,
diff --git a/src/transport/zerotier/zerotier.c b/src/transport/zerotier/zerotier.c
index f7139b94..46fe476e 100644
--- a/src/transport/zerotier/zerotier.c
+++ b/src/transport/zerotier/zerotier.c
@@ -2182,7 +2182,7 @@ zt_ep_init(void **epp, nni_url *url, nni_sock *sock, int mode)
ep->ze_ping_time = zt_ping_time;
ep->ze_conn_time = zt_conn_time;
ep->ze_conn_tries = zt_conn_tries;
- ep->ze_proto = nni_sock_proto(sock);
+ ep->ze_proto = nni_sock_proto_id(sock);
nni_aio_list_init(&ep->ze_aios);
@@ -2888,7 +2888,7 @@ static nni_tran_pipe_option zt_pipe_options[] = {
},
};
-static nni_tran_pipe zt_pipe_ops = {
+static nni_tran_pipe_ops zt_pipe_ops = {
.p_fini = zt_pipe_fini,
.p_start = zt_pipe_start,
.p_send = zt_pipe_send,
@@ -2983,7 +2983,7 @@ static nni_tran_ep_option zt_ep_options[] = {
},
};
-static nni_tran_ep zt_ep_ops = {
+static nni_tran_ep_ops zt_ep_ops = {
.ep_init = zt_ep_init,
.ep_fini = zt_ep_fini,
.ep_connect = zt_ep_connect,