aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-05-15 01:47:12 -0700
committerGitHub <noreply@github.com>2018-05-15 01:47:12 -0700
commit1d033484ee1a2ec26d3eead073e7bc0f889ffdf4 (patch)
tree15d3897d405cb0beb1ada6270ecf70241451ca70 /src/core
parent16b4c4019c7b7904de171c588ed8c72ca732d2cf (diff)
downloadnng-1d033484ee1a2ec26d3eead073e7bc0f889ffdf4.tar.gz
nng-1d033484ee1a2ec26d3eead073e7bc0f889ffdf4.tar.bz2
nng-1d033484ee1a2ec26d3eead073e7bc0f889ffdf4.zip
fixes #419 want to nni_aio_stop without blocking (#428)
* fixes #419 want to nni_aio_stop without blocking This actually introduces an nni_aio_close() API that causes nni_aio_begin to return NNG_ECLOSED, while scheduling a callback on the AIO to do an NNG_ECLOSED as well. This should be called in non-blocking close() contexts instead of nni_aio_stop(), and the cases where we call nni_aio_fini() multiple times are updated updated to add nni_aio_stop() calls on all "interlinked" aios before finalizing them. Furthermore, we call nni_aio_close() as soon as practical in the close path. This closes an annoying race condition where the callback from a lower subsystem could wind up rescheduling an operation that we wanted to abort.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/aio.c45
-rw-r--r--src/core/aio.h6
-rw-r--r--src/core/defs.h4
-rw-r--r--src/core/device.c4
-rw-r--r--src/core/endpt.c64
-rw-r--r--src/core/pipe.c93
-rw-r--r--src/core/pipe.h1
-rw-r--r--src/core/protocol.h19
-rw-r--r--src/core/socket.c54
-rw-r--r--src/core/socket.h7
-rw-r--r--src/core/transport.c2
-rw-r--r--src/core/transport.h8
12 files changed, 171 insertions, 136 deletions
diff --git a/src/core/aio.c b/src/core/aio.c
index a5b6c088..2f90aa34 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -63,10 +63,10 @@ struct nng_aio {
nni_duration a_timeout; // Relative timeout
// These fields are private to the aio framework.
- bool a_stop; // shutting down (no new operations)
- bool a_sleep; // sleeping with no action
- int a_sleeprv; // result when sleep wakes
- int a_cancelrv; // if canceled between begin and schedule
+ bool a_stop; // shutting down (no new operations)
+ bool a_closed; // close called, but not fini (yet)
+ bool a_sleep; // sleeping with no action
+ int a_sleeprv; // result when sleep wakes
nni_task *a_task;
// Read/write operations.
@@ -205,6 +205,24 @@ nni_aio_stop(nni_aio *aio)
}
void
+nni_aio_close(nni_aio *aio)
+{
+ if (aio != NULL) {
+ nni_aio_cancelfn cancelfn;
+
+ nni_mtx_lock(&nni_aio_lk);
+ cancelfn = aio->a_prov_cancel;
+ aio->a_prov_cancel = NULL;
+ aio->a_closed = true;
+ nni_mtx_unlock(&nni_aio_lk);
+
+ if (cancelfn != NULL) {
+ cancelfn(aio, NNG_ECLOSED);
+ }
+ }
+}
+
+void
nni_aio_set_timeout(nni_aio *aio, nni_duration when)
{
aio->a_timeout = when;
@@ -306,11 +324,18 @@ nni_aio_begin(nni_aio *aio)
aio->a_count = 0;
aio->a_prov_cancel = NULL;
aio->a_prov_data = NULL;
- aio->a_cancelrv = 0;
for (unsigned i = 0; i < NNI_NUM_ELEMENTS(aio->a_outputs); i++) {
aio->a_outputs[i] = NULL;
}
nni_task_prep(aio->a_task);
+ if (aio->a_closed) {
+ aio->a_result = NNG_ECLOSED;
+ aio->a_expire = NNI_TIME_NEVER;
+ aio->a_sleep = false;
+ nni_mtx_unlock(&nni_aio_lk);
+ nni_task_dispatch(aio->a_task);
+ return (NNG_ECLOSED);
+ }
nni_mtx_unlock(&nni_aio_lk);
return (0);
}
@@ -318,7 +343,6 @@ nni_aio_begin(nni_aio *aio)
int
nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
{
- int rv;
if (!aio->a_sleep) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
@@ -339,19 +363,16 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data)
nni_mtx_unlock(&nni_aio_lk);
return (NNG_ECANCELED);
}
- if ((rv = aio->a_cancelrv) != 0) {
+ if (aio->a_closed) {
nni_mtx_unlock(&nni_aio_lk);
- return (rv);
+ return (NNG_ECLOSED);
}
// If cancellation occurred in between "begin" and "schedule",
// then cancel it right now.
aio->a_prov_cancel = cancelfn;
aio->a_prov_data = data;
- if ((rv = aio->a_cancelrv) != 0) {
- aio->a_expire = 0;
- nni_aio_expire_add(aio);
- } else if (aio->a_expire != NNI_TIME_NEVER) {
+ if (aio->a_expire != NNI_TIME_NEVER) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&nni_aio_lk);
diff --git a/src/core/aio.h b/src/core/aio.h
index 2ed0fb5b..5462c40b 100644
--- a/src/core/aio.h
+++ b/src/core/aio.h
@@ -42,6 +42,12 @@ extern void nni_aio_fini(nni_aio *);
// use nni_aio_cancel instead.)
extern void nni_aio_stop(nni_aio *);
+// nni_aio_close closes the aio for further activity. It aborts any in-progress
+// transaction (if it can), and future calls nni_aio_begin or nni_aio_schedule
+// with both result in NNG_ECLOSED. The expectation is that protocols call this
+// for all their aios in a stop routine, before calling fini on any of them.
+extern void nni_aio_close(nni_aio *);
+
// nni_aio_set_data sets user data. This should only be done by the
// consumer, initiating the I/O. The intention is to be able to store
// additional data for use when the operation callback is executed.
diff --git a/src/core/defs.h b/src/core/defs.h
index b9c9b7e5..29d1b826 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -45,9 +45,9 @@ typedef struct nni_ctx nni_ctx;
typedef struct nni_ep nni_ep;
typedef struct nni_pipe nni_pipe;
typedef struct nni_tran nni_tran;
-typedef struct nni_tran_ep nni_tran_ep;
+typedef struct nni_tran_ep_ops nni_tran_ep_ops;
typedef struct nni_tran_ep_option nni_tran_ep_option;
-typedef struct nni_tran_pipe nni_tran_pipe;
+typedef struct nni_tran_pipe_ops nni_tran_pipe_ops;
typedef struct nni_tran_pipe_option nni_tran_pipe_option;
typedef struct nni_proto_ctx_option nni_proto_ctx_option;
diff --git a/src/core/device.c b/src/core/device.c
index 1f3bf233..0fd23add 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -120,8 +120,8 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2)
if ((s1 == NULL) || (s2 == NULL)) {
return (NNG_EINVAL);
}
- if ((nni_sock_peer(s1) != nni_sock_proto(s2)) ||
- (nni_sock_peer(s2) != nni_sock_proto(s1))) {
+ if ((nni_sock_peer_id(s1) != nni_sock_proto_id(s2)) ||
+ (nni_sock_peer_id(s2) != nni_sock_proto_id(s1))) {
return (NNG_EINVAL);
}
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 7593fb42..87566d42 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -15,30 +15,30 @@
#include <string.h>
struct nni_ep {
- nni_tran_ep ep_ops; // transport ops
- nni_tran * ep_tran; // transport pointer
- void * ep_data; // transport private
- uint64_t ep_id; // endpoint id
- nni_list_node ep_node; // per socket list
- nni_sock * ep_sock;
- nni_url * ep_url;
- int ep_mode;
- int ep_started;
- int ep_closed; // full shutdown
- int ep_closing; // close pending (waiting on refcnt)
- int ep_refcnt;
- bool ep_tmo_run;
- nni_mtx ep_mtx;
- nni_cv ep_cv;
- nni_list ep_pipes;
- nni_aio * ep_acc_aio;
- nni_aio * ep_con_aio;
- nni_aio * ep_con_syn; // used for sync connect
- nni_aio * ep_tmo_aio; // backoff timer
- nni_duration ep_maxrtime; // maximum time for reconnect
- nni_duration ep_currtime; // current time for reconnect
- nni_duration ep_inirtime; // initial time for reconnect
- nni_time ep_conntime; // time of last good connect
+ nni_tran_ep_ops ep_ops; // transport ops
+ nni_tran * ep_tran; // transport pointer
+ void * ep_data; // transport private
+ uint64_t ep_id; // endpoint id
+ nni_list_node ep_node; // per socket list
+ nni_sock * ep_sock;
+ nni_url * ep_url;
+ int ep_mode;
+ int ep_started;
+ int ep_closed; // full shutdown
+ int ep_closing; // close pending (waiting on refcnt)
+ int ep_refcnt;
+ bool ep_tmo_run;
+ nni_mtx ep_mtx;
+ nni_cv ep_cv;
+ nni_list ep_pipes;
+ nni_aio * ep_acc_aio;
+ nni_aio * ep_con_aio;
+ nni_aio * ep_con_syn; // used for sync connect
+ nni_aio * ep_tmo_aio; // backoff timer
+ nni_duration ep_maxrtime; // maximum time for reconnect
+ nni_duration ep_currtime; // current time for reconnect
+ nni_duration ep_inirtime; // initial time for reconnect
+ nni_time ep_conntime; // time of last good connect
};
// Functionality related to end points.
@@ -249,10 +249,10 @@ nni_ep_shutdown(nni_ep *ep)
nni_mtx_unlock(&ep->ep_mtx);
// Abort any remaining in-flight operations.
- nni_aio_abort(ep->ep_acc_aio, NNG_ECLOSED);
- nni_aio_abort(ep->ep_con_aio, NNG_ECLOSED);
- nni_aio_abort(ep->ep_con_syn, NNG_ECLOSED);
- nni_aio_abort(ep->ep_tmo_aio, NNG_ECLOSED);
+ nni_aio_close(ep->ep_acc_aio);
+ nni_aio_close(ep->ep_con_aio);
+ nni_aio_close(ep->ep_con_syn);
+ nni_aio_close(ep->ep_tmo_aio);
// Stop the underlying transport.
ep->ep_ops.ep_close(ep->ep_data);
@@ -276,10 +276,10 @@ nni_ep_close(nni_ep *ep)
nni_ep_shutdown(ep);
- nni_aio_stop(ep->ep_acc_aio);
- nni_aio_stop(ep->ep_con_aio);
- nni_aio_stop(ep->ep_con_syn);
- nni_aio_stop(ep->ep_tmo_aio);
+ nni_aio_close(ep->ep_acc_aio);
+ nni_aio_close(ep->ep_con_aio);
+ nni_aio_close(ep->ep_con_syn);
+ nni_aio_close(ep->ep_tmo_aio);
nni_mtx_lock(&ep->ep_mtx);
NNI_LIST_FOREACH (&ep->ep_pipes, p) {
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 010d306d..ba3027cf 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -18,21 +18,22 @@
// performed in the context of the protocol.
struct nni_pipe {
- uint32_t p_id;
- nni_tran_pipe p_tran_ops;
- void * p_tran_data;
- void * p_proto_data;
- nni_list_node p_sock_node;
- nni_list_node p_ep_node;
- nni_sock * p_sock;
- nni_ep * p_ep;
- bool p_closed;
- bool p_stop;
- int p_refcnt;
- nni_mtx p_mtx;
- nni_cv p_cv;
- nni_list_node p_reap_node;
- nni_aio * p_start_aio;
+ uint32_t p_id;
+ nni_tran_pipe_ops p_tran_ops;
+ nni_proto_pipe_ops p_proto_ops;
+ void * p_tran_data;
+ void * p_proto_data;
+ nni_list_node p_sock_node;
+ nni_list_node p_ep_node;
+ nni_sock * p_sock;
+ nni_ep * p_ep;
+ bool p_closed;
+ bool p_stop;
+ int p_refcnt;
+ nni_mtx p_mtx;
+ nni_cv p_cv;
+ nni_list_node p_reap_node;
+ nni_aio * p_start_aio;
};
static nni_idhash *nni_pipes;
@@ -106,6 +107,10 @@ nni_pipe_destroy(nni_pipe *p)
// Stop any pending negotiation.
nni_aio_stop(p->p_start_aio);
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_stop(p->p_proto_data);
+ }
+
// We have exclusive access at this point, so we can check if
// we are still on any lists.
if (nni_list_node_active(&p->p_ep_node)) {
@@ -126,6 +131,9 @@ nni_pipe_destroy(nni_pipe *p)
}
nni_mtx_unlock(&nni_pipe_lk);
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_fini(p->p_proto_data);
+ }
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
@@ -189,6 +197,9 @@ nni_pipe_send(nni_pipe *p, nni_aio *aio)
void
nni_pipe_close(nni_pipe *p)
{
+ // abort any pending negotiation/start process.
+ nni_aio_close(p->p_start_aio);
+
nni_mtx_lock(&p->p_mtx);
if (p->p_closed) {
// We already did a close.
@@ -196,16 +207,16 @@ nni_pipe_close(nni_pipe *p)
return;
}
p->p_closed = true;
+ nni_mtx_unlock(&p->p_mtx);
+
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_close(p->p_proto_data);
+ }
// Close the underlying transport.
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_close(p->p_tran_data);
}
-
- nni_mtx_unlock(&p->p_mtx);
-
- // abort any pending negotiation/start process.
- nni_aio_abort(p->p_start_aio, NNG_ECLOSED);
}
bool
@@ -222,7 +233,6 @@ void
nni_pipe_stop(nni_pipe *p)
{
// Guard against recursive calls.
- nni_pipe_close(p);
nni_mtx_lock(&p->p_mtx);
if (p->p_stop) {
nni_mtx_unlock(&p->p_mtx);
@@ -231,6 +241,8 @@ nni_pipe_stop(nni_pipe *p)
p->p_stop = true;
nni_mtx_unlock(&p->p_mtx);
+ nni_pipe_close(p);
+
// Put it on the reaplist for async cleanup
nni_mtx_lock(&nni_pipe_reap_lk);
nni_list_append(&nni_pipe_reap_list, p);
@@ -250,12 +262,9 @@ nni_pipe_start_cb(void *arg)
nni_pipe *p = arg;
nni_aio * aio = p->p_start_aio;
- if (nni_aio_result(aio) != 0) {
- nni_pipe_stop(p);
- return;
- }
-
- if (nni_sock_pipe_start(p->p_sock, p) != 0) {
+ if ((nni_aio_result(aio) != 0) ||
+ (nni_sock_pipe_start(p->p_sock, p) != 0) ||
+ (p->p_proto_ops.pipe_start(p->p_proto_data) != 0)) {
nni_pipe_stop(p);
}
}
@@ -263,10 +272,12 @@ nni_pipe_start_cb(void *arg)
int
nni_pipe_create(nni_ep *ep, void *tdata)
{
- nni_pipe *p;
- int rv;
- nni_tran *tran = nni_ep_tran(ep);
- nni_sock *sock = nni_ep_sock(ep);
+ nni_pipe * p;
+ int rv;
+ nni_tran * tran = nni_ep_tran(ep);
+ nni_sock * sock = nni_ep_sock(ep);
+ void * sdata = nni_sock_proto_data(sock);
+ nni_proto_pipe_ops *pops = nni_sock_proto_pipe_ops(sock);
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
// In this case we just toss the pipe...
@@ -277,6 +288,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
// Make a private copy of the transport ops.
p->p_tran_ops = *tran->tran_pipe;
p->p_tran_data = tdata;
+ p->p_proto_ops = *pops;
p->p_proto_data = NULL;
p->p_ep = ep;
p->p_sock = sock;
@@ -290,6 +302,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
nni_mtx_init(&p->p_mtx);
nni_cv_init(&p->p_cv, &nni_pipe_lk);
+
if ((rv = nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p)) == 0) {
uint64_t id;
nni_mtx_lock(&nni_pipe_lk);
@@ -299,11 +312,19 @@ nni_pipe_create(nni_ep *ep, void *tdata)
nni_mtx_unlock(&nni_pipe_lk);
}
- if ((rv != 0) || ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
- ((rv = nni_sock_pipe_add(sock, p)) != 0)) {
+ if ((rv != 0) ||
+ ((rv = pops->pipe_init(&p->p_proto_data, p, sdata)) != 0) ||
+ ((rv = nni_ep_pipe_add(ep, p)) != 0)) {
nni_pipe_destroy(p);
+ return (rv);
}
+ // At this point the endpoint knows about it, and the protocol
+ // might too, so on failure we have to tear it down fully as if done
+ // after a successful result.
+ if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
+ nni_pipe_stop(p);
+ }
return (rv);
}
@@ -339,12 +360,6 @@ nni_pipe_get_proto_data(nni_pipe *p)
}
void
-nni_pipe_set_proto_data(nni_pipe *p, void *data)
-{
- p->p_proto_data = data;
-}
-
-void
nni_pipe_sock_list_init(nni_list *list)
{
NNI_LIST_INIT(list, nni_pipe, p_sock_node);
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 18a59ddb..4ccbeae2 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -65,7 +65,6 @@ extern int nni_pipe_getopt(nni_pipe *, const char *, void *, size_t *, int);
// nni_pipe_get_proto_data gets the protocol private data set with the
// nni_pipe_set_proto_data function. No locking is performed.
extern void *nni_pipe_get_proto_data(nni_pipe *);
-extern void nni_pipe_set_proto_data(nni_pipe *, void *);
// nni_pipe_sock_list_init initializes a list of pipes, to be used by
// a per-socket list.
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 964aee1a..9c3b4d33 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -34,16 +34,19 @@ struct nni_proto_pipe_ops {
// pipe_start is called to register a pipe with the protocol. The
// protocol can reject this, for example if another pipe is already
- // active on a 1:1 protocol. The protocol may not block during this,
- // as the socket lock is held.
+ // active on a 1:1 protocol. The protocol may not block during this.
int (*pipe_start)(void *);
- // pipe_stop is called to unregister a pipe from the protocol.
- // Threads may still acccess data structures, so the protocol
- // should not free anything yet. This is called with the socket
- // lock held, so the protocol may not call back into the socket, and
- // must not block. This operation must be idempotent, and may
- // be called even if pipe_start was not.
+ // pipe_close is an idempotent, non-blocking, operation, called
+ // when the pipe is being closed. Any operations pending on the
+ // pipe should be canceled with NNG_ECLOSED. (Best option is to
+ // use nng_aio_close() on them)
+ void (*pipe_close)(void *);
+
+ // pipe_stop is called during finalization, to ensure that
+ // the protocol is absolutely finished with the pipe. It should
+ // wait if necessary to ensure that the pipe is not referenced
+ // anymore by the protocol. It should not destroy resources.
void (*pipe_stop)(void *);
};
diff --git a/src/core/socket.c b/src/core/socket.c
index 66580300..f14c15fd 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -253,13 +253,13 @@ nni_sock_setopt_sockname(nni_sock *s, const void *buf, size_t sz, int typ)
static int
nni_sock_getopt_proto(nni_sock *s, void *buf, size_t *szp, int typ)
{
- return (nni_copyout_int(nni_sock_proto(s), buf, szp, typ));
+ return (nni_copyout_int(nni_sock_proto_id(s), buf, szp, typ));
}
static int
nni_sock_getopt_peer(nni_sock *s, void *buf, size_t *szp, int typ)
{
- return (nni_copyout_int(nni_sock_peer(s), buf, szp, typ));
+ return (nni_copyout_int(nni_sock_peer_id(s), buf, szp, typ));
}
static int
@@ -434,9 +434,7 @@ nni_sock_rele(nni_sock *s)
int
nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe)
{
- void * pdata = nni_pipe_get_proto_data(pipe);
nng_pipe_cb cb;
- int rv;
NNI_ASSERT(s != NULL);
nni_mtx_lock(&s->s_mx);
@@ -464,26 +462,13 @@ nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe)
return (NNG_ECLOSED);
}
- // Protocol can reject for other reasons.
- // This must be the last operation, until this point
- // the protocol has not actually "seen" the pipe.
- rv = s->s_pipe_ops.pipe_start(pdata);
-
nni_mtx_unlock(&s->s_mx);
- return (rv);
+ return (0);
}
int
nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
{
- int rv;
- void *pdata;
-
- if ((rv = s->s_pipe_ops.pipe_init(&pdata, p, s->s_data)) != 0) {
- return (rv);
- }
- nni_pipe_set_proto_data(p, pdata);
-
// Initialize protocol pipe data.
nni_mtx_lock(&s->s_mx);
if (s->s_closing) {
@@ -503,7 +488,6 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
void
nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
{
- void * pdata;
nng_pipe_cb cb;
nni_mtx_lock(&sock->s_mx);
@@ -515,14 +499,8 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
cb(p, NNG_PIPE_REM, arg);
nni_mtx_lock(&sock->s_mx);
}
- pdata = nni_pipe_get_proto_data(pipe);
- if (pdata != NULL) {
- sock->s_pipe_ops.pipe_stop(pdata);
- nni_pipe_set_proto_data(pipe, NULL);
- if (nni_list_active(&sock->s_pipes, pipe)) {
- nni_list_remove(&sock->s_pipes, pipe);
- }
- sock->s_pipe_ops.pipe_fini(pdata);
+ if (nni_list_active(&sock->s_pipes, pipe)) {
+ nni_list_remove(&sock->s_pipes, pipe);
}
if (sock->s_closing && nni_list_empty(&sock->s_pipes)) {
nni_cv_wake(&sock->s_cv);
@@ -589,9 +567,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_ASSERT(s->s_sock_ops.sock_open != NULL);
NNI_ASSERT(s->s_sock_ops.sock_close != NULL);
- NNI_ASSERT(s->s_pipe_ops.pipe_start != NULL);
- NNI_ASSERT(s->s_pipe_ops.pipe_stop != NULL);
-
NNI_LIST_NODE_INIT(&s->s_node);
NNI_LIST_INIT(&s->s_options, nni_sockopt, node);
NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
@@ -883,15 +858,16 @@ nni_sock_recv(nni_sock *sock, nni_aio *aio)
sock->s_sock_ops.sock_recv(sock->s_data, aio);
}
-// nni_sock_protocol returns the socket's 16-bit protocol number.
+// nni_sock_proto_id returns the socket's 16-bit protocol number.
uint16_t
-nni_sock_proto(nni_sock *sock)
+nni_sock_proto_id(nni_sock *sock)
{
return (sock->s_self_id.p_id);
}
+// nni_sock_peer_id returns the socket peer's 16-bit protocol number.
uint16_t
-nni_sock_peer(nni_sock *sock)
+nni_sock_peer_id(nni_sock *sock)
{
return (sock->s_peer_id.p_id);
}
@@ -908,6 +884,18 @@ nni_sock_peer_name(nni_sock *sock)
return (sock->s_peer_id.p_name);
}
+struct nni_proto_pipe_ops *
+nni_sock_proto_pipe_ops(nni_sock *sock)
+{
+ return (&sock->s_pipe_ops);
+}
+
+void *
+nni_sock_proto_data(nni_sock *sock)
+{
+ return (sock->s_data);
+}
+
void
nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
{
diff --git a/src/core/socket.h b/src/core/socket.h
index bccda5b3..22a13ef7 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -20,10 +20,13 @@ extern int nni_sock_open(nni_sock **, const nni_proto *);
extern void nni_sock_close(nni_sock *);
extern void nni_sock_closeall(void);
extern int nni_sock_shutdown(nni_sock *);
-extern uint16_t nni_sock_proto(nni_sock *);
-extern uint16_t nni_sock_peer(nni_sock *);
+extern uint16_t nni_sock_proto_id(nni_sock *);
+extern uint16_t nni_sock_peer_id(nni_sock *);
extern const char *nni_sock_proto_name(nni_sock *);
extern const char *nni_sock_peer_name(nni_sock *);
+extern void * nni_sock_proto_data(nni_sock *);
+
+extern struct nni_proto_pipe_ops *nni_sock_proto_pipe_ops(nni_sock *);
extern int nni_sock_setopt(
nni_sock *, const char *, const void *, size_t, int);
diff --git a/src/core/transport.c b/src/core/transport.c
index 94e5caa1..0cd84461 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -118,7 +118,7 @@ nni_tran_chkopt(const char *name, const void *v, size_t sz, int typ)
nni_mtx_lock(&nni_tran_lk);
NNI_LIST_FOREACH (&nni_tran_list, t) {
- const nni_tran_ep * ep;
+ const nni_tran_ep_ops * ep;
const nni_tran_ep_option *eo;
// Generally we look for endpoint options.
diff --git a/src/core/transport.h b/src/core/transport.h
index b1fecaa2..96976efd 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -23,10 +23,10 @@ struct nni_tran {
const char *tran_scheme;
// tran_ep links our endpoint-specific operations.
- const nni_tran_ep *tran_ep;
+ const nni_tran_ep_ops *tran_ep;
// tran_pipe links our pipe-specific operations.
- const nni_tran_pipe *tran_pipe;
+ const nni_tran_pipe_ops *tran_pipe;
// tran_init, if not NULL, is called once during library
// initialization.
@@ -77,7 +77,7 @@ struct nni_tran_ep_option {
// For a given endpoint, the framework holds a lock so that each entry
// point is run exclusively of the others. (Transports must still guard
// against any asynchronous operations they manage themselves, though.)
-struct nni_tran_ep {
+struct nni_tran_ep_ops {
// ep_init creates a vanilla endpoint. The value created is
// used for the first argument for all other endpoint functions.
int (*ep_init)(void **, nni_url *, nni_sock *, int);
@@ -128,7 +128,7 @@ struct nni_tran_pipe_option {
// with socket locks held, so it is forbidden for the transport to call
// back into the socket at this point. (Which is one reason pointers back
// to socket or even enclosing pipe state, are not provided.)
-struct nni_tran_pipe {
+struct nni_tran_pipe_ops {
// p_fini destroys the pipe. This should clean up all local
// resources, including closing files and freeing memory, used by
// the pipe. After this call returns, the system will not make