aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/pipe.c38
-rw-r--r--src/core/pipe.h20
-rw-r--r--src/core/protocol.h8
-rw-r--r--src/core/socket.c28
-rw-r--r--src/core/socket.h6
5 files changed, 81 insertions, 19 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 18c47c60..dd3b2984 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -140,19 +140,18 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
p->p_tran_data = NULL;
p->p_proto_data = NULL;
p->p_id = 0;
- NNI_LIST_NODE_INIT(&p->p_node);
+ NNI_LIST_NODE_INIT(&p->p_sock_node);
+ NNI_LIST_NODE_INIT(&p->p_ep_node);
// Make a copy of the transport ops. We can override entry points
// and we avoid an extra dereference on hot code paths.
p->p_tran_ops = *ep->ep_tran->tran_pipe;
- if ((rv = ops->pipe_init(&pdata, p, sock->s_data)) != 0) {
+ if ((rv = nni_sock_pipe_add(sock, p)) != 0) {
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
return (rv);
}
- p->p_proto_data = pdata;
- nni_sock_pipe_add(sock, p);
*pp = p;
return (0);
@@ -169,9 +168,6 @@ nni_pipe_destroy(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.pipe_destroy(p->p_tran_data);
}
- if (p->p_proto_data != NULL) {
- p->p_sock->s_pipe_ops.pipe_fini(p->p_proto_data);
- }
nni_sock_pipe_rem(p->p_sock, p);
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
@@ -214,3 +210,31 @@ nni_pipe_start(nni_pipe *p)
return (0);
}
+
+
+void
+nni_pipe_set_proto_data(nni_pipe *p, void *data)
+{
+ p->p_proto_data = data;
+}
+
+
+void *
+nni_pipe_get_proto_data(nni_pipe *p)
+{
+ return (p->p_proto_data);
+}
+
+
+void
+nni_pipe_sock_list_init(nni_list *list)
+{
+ NNI_LIST_INIT(list, nni_pipe, p_sock_node);
+}
+
+
+void
+nni_pipe_ep_list_init(nni_list *list)
+{
+ NNI_LIST_INIT(list, nni_pipe, p_ep_node);
+}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 6cabf4e7..e3539e1f 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -23,7 +23,8 @@ struct nni_pipe {
nni_tran_pipe p_tran_ops;
void * p_tran_data;
void * p_proto_data;
- nni_list_node p_node;
+ nni_list_node p_sock_node;
+ nni_list_node p_ep_node;
nni_sock * p_sock;
nni_ep * p_ep;
int p_reap;
@@ -55,4 +56,21 @@ extern uint16_t nni_pipe_peer(nni_pipe *);
extern int nni_pipe_start(nni_pipe *);
extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
+// nni_pipe_set_proto_data sets the protocol private data. No locking is
+// performed, and this routine should only be called once per pipe at
+// initialization.
+extern void nni_pipe_set_proto_data(nni_pipe *, void *);
+
+// nni_pipe_get_proto_data gets the protocol private data set with the
+// nni_pipe_set_proto_data function. No locking is performed.
+extern void *nni_pipe_get_proto_data(nni_pipe *);
+
+// nni_pipe_sock_list_init initializes a list of pipes, to be used by
+// a per-socket list.
+extern void nni_pipe_sock_list_init(nni_list *);
+
+// nni_pipe_ep_list_init initializes a list of pipes, to be used by
+// a per-endpoint list.
+extern void nni_pipe_ep_list_init(nni_list *);
+
#endif // CORE_PIPE_H
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 29befab2..5f7fc4c8 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -24,25 +24,25 @@
struct nni_proto_pipe_ops {
// pipe_init creates the protocol-specific per pipe data structure.
// The last argument is the per-socket protocol private data.
- int (*pipe_init)(void **, nni_pipe *, void *);
+ int (*pipe_init)(void **, nni_pipe *, void *);
// pipe_fini releases any pipe data structures. This is called after
// the pipe has been removed from the protocol, and the generic
// pipe threads have been stopped.
- void (*pipe_fini)(void *);
+ void (*pipe_fini)(void *);
// pipe_add is called to register a pipe with the protocol. The
// protocol can reject this, for example if another pipe is already
// active on a 1:1 protocol. The protocol may not block during this,
// as the socket lock is held.
- int (*pipe_add)(void *);
+ int (*pipe_add)(void *);
// pipe_rem is called to unregister a pipe from the protocol.
// Threads may still acccess data structures, so the protocol
// should not free anything yet. This is called with the socket
// lock held, so the protocol may not call back into the socket, and
// must not block.
- void (*pipe_rem)(void *);
+ void (*pipe_rem)(void *);
};
struct nni_proto_sock_ops {
diff --git a/src/core/socket.c b/src/core/socket.c
index abbd2c2c..42de8fee 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -128,12 +128,21 @@ nni_sock_held_close(nni_sock *sock)
}
-void
+int
nni_sock_pipe_add(nni_sock *sock, nni_pipe *pipe)
{
+ int rv;
+ void *pdata;
+
+ rv = sock->s_pipe_ops.pipe_init(&pdata, pipe, sock->s_data);
+ if (rv != 0) {
+ return (rv);
+ }
nni_mtx_lock(&sock->s_mx);
+ nni_pipe_set_proto_data(pipe, pdata);
nni_list_append(&sock->s_pipes, pipe);
nni_mtx_unlock(&sock->s_mx);
+ return (0);
}
@@ -141,6 +150,7 @@ int
nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
{
int rv;
+ void *pdata = nni_pipe_get_proto_data(pipe);
nni_mtx_lock(&sock->s_mx);
@@ -153,7 +163,7 @@ nni_sock_pipe_ready(nni_sock *sock, nni_pipe *pipe)
return (NNG_EPROTO);
}
- if ((rv = sock->s_pipe_ops.pipe_add(pipe->p_proto_data)) != 0) {
+ if ((rv = sock->s_pipe_ops.pipe_add(pdata)) != 0) {
nni_mtx_unlock(&sock->s_mx);
return (rv);
}
@@ -173,6 +183,7 @@ void
nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe)
{
nni_ep *ep;
+ void *pdata = nni_pipe_get_proto_data(pipe);
nni_mtx_lock(&sock->s_mx);
@@ -184,7 +195,7 @@ nni_sock_pipe_closed(nni_sock *sock, nni_pipe *pipe)
if (pipe->p_active) {
pipe->p_active = 0;
- sock->s_pipe_ops.pipe_rem(pipe->p_proto_data);
+ sock->s_pipe_ops.pipe_rem(pdata);
}
// Notify the endpoint that the pipe has closed.
@@ -200,10 +211,15 @@ void
nni_sock_pipe_rem(nni_sock *sock, nni_pipe *pipe)
{
nni_ep *ep;
+ void *pdata = nni_pipe_get_proto_data(pipe);
nni_mtx_lock(&sock->s_mx);
nni_list_remove(&sock->s_idles, pipe);
+ if (pdata != NULL) {
+ sock->s_pipe_ops.pipe_fini(pdata);
+ }
+
// Notify the endpoint that the pipe has closed - if not already done.
if (((ep = pipe->p_ep) != NULL) && ((ep->ep_pipe == pipe))) {
ep->ep_pipe = NULL;
@@ -404,8 +420,10 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
sock->s_reconn = NNI_SECOND;
sock->s_reconnmax = 0;
sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
- NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
- NNI_LIST_INIT(&sock->s_idles, nni_pipe, p_node);
+
+ nni_pipe_sock_list_init(&sock->s_pipes);
+ nni_pipe_sock_list_init(&sock->s_idles);
+
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
sock->s_send_fd.sn_init = 0;
sock->s_recv_fd.sn_init = 0;
diff --git a/src/core/socket.h b/src/core/socket.h
index a746a170..21449dc9 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -87,11 +87,13 @@ extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *);
extern void nni_sock_unnotify(nni_sock *, nni_notify *);
// nni_sock_pipe_add is called by the pipe to register the pipe with
-// with the socket. The pipe is added to the idle list.
-extern void nni_sock_pipe_add(nni_sock *, nni_pipe *);
+// with the socket. The pipe is added to the idle list. The protocol
+// private pipe data is initialized as well.
+extern int nni_sock_pipe_add(nni_sock *, nni_pipe *);
// nni_sock_pipe_rem deregisters the pipe from the socket. The socket
// will block during close if there are registered pipes outstanding.
+// This also frees any protocol private pipe data.
extern void nni_sock_pipe_rem(nni_sock *, nni_pipe *);
// nni_sock_pipe_ready lets the socket know the pipe is ready for