aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c31
-rw-r--r--src/core/endpt.h2
-rw-r--r--src/core/msgqueue.c46
-rw-r--r--src/core/msgqueue.h10
-rw-r--r--src/core/pipe.c39
-rw-r--r--src/core/pipe.h5
-rw-r--r--src/core/platform.h11
-rw-r--r--src/core/protocol.h7
-rw-r--r--src/core/socket.c130
-rw-r--r--src/core/socket.h2
-rw-r--r--src/core/transport.h33
11 files changed, 229 insertions, 87 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index b7181ce8..bc947dfd 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -109,15 +109,42 @@ nni_endpt_close(nni_endpt *ep)
return;
}
ep->ep_close = 1;
+ ep->ep_ops.ep_close(ep->ep_data);
nni_cond_broadcast(&ep->ep_cv);
nni_mutex_exit(&ep->ep_mx);
- ep->ep_ops.ep_close(ep->ep_data);
}
+int
+nni_endpt_listen(nni_endpt *ep)
+{
+ if (ep->ep_close) {
+ return (NNG_ECLOSED);
+ }
+ return (ep->ep_ops.ep_listen(ep->ep_data));
+}
+
+int
+nni_endpt_dial(nni_endpt *ep, nni_pipe **pp)
+{
+ nni_pipe *pipe;
+ int rv;
+
+ if (ep->ep_close) {
+ return (NNG_ECLOSED);
+ }
+ if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
+ return (rv);
+ }
+ if ((rv = ep->ep_ops.ep_dial(ep->ep_data, &pipe->p_data)) != 0) {
+ nni_pipe_destroy(pipe);
+ return (rv);
+ }
+ *pp = pipe;
+ return (0);
+}
#if 0
int nni_endpt_dial(nni_endpt *, nni_pipe **);
int nni_endpt_listen(nni_endpt *);
int nni_endpt_accept(nni_endpt *, nni_pipe **);
-int nni_endpt_close(nni_endpt *);
#endif
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 4054a997..7512daa8 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -16,7 +16,7 @@
// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS
// OR TRANSPORTS.
struct nng_endpt {
- struct nni_endpt_ops ep_ops;
+ nni_endpt_ops ep_ops;
void * ep_data; // Transport private
nni_list_node ep_sock_node; // Per socket list
nni_socket * ep_sock;
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index b8d11e8c..266a8e26 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -18,6 +18,7 @@ struct nni_msgqueue {
nni_mutex mq_lock;
nni_cond mq_readable;
nni_cond mq_writeable;
+ nni_cond mq_drained;
int mq_cap;
int mq_len;
int mq_get;
@@ -52,7 +53,14 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap)
nni_mutex_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
+ if ((rv = nni_cond_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
+ nni_cond_fini(&mq->mq_writeable);
+ nni_cond_fini(&mq->mq_readable);
+ nni_mutex_fini(&mq->mq_lock);
+ return (NNG_ENOMEM);
+ }
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) {
+ nni_cond_fini(&mq->mq_drained);
nni_cond_fini(&mq->mq_writeable);
nni_cond_fini(&mq->mq_readable);
nni_mutex_fini(&mq->mq_lock);
@@ -75,6 +83,7 @@ nni_msgqueue_destroy(nni_msgqueue *mq)
{
nni_msg *msg;
+ nni_cond_fini(&mq->mq_drained);
nni_cond_fini(&mq->mq_writeable);
nni_cond_fini(&mq->mq_readable);
nni_mutex_fini(&mq->mq_lock);
@@ -153,7 +162,7 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
}
mq->mq_len++;
if (mq->mq_len == 1) {
- (void) nni_cond_signal(&mq->mq_readable);
+ nni_cond_signal(&mq->mq_readable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);
@@ -181,6 +190,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
// provide it, so that the reader can drain.
if (mq->mq_len == 0) {
if (mq->mq_closed) {
+ nni_cond_signal(&mq->mq_drained);
nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
}
@@ -198,9 +208,8 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
if (mq->mq_get == mq->mq_cap) {
mq->mq_get = 0;
}
- mq->mq_len++;
if (mq->mq_len == (mq->mq_cap - 1)) {
- (void) nni_cond_signal(&mq->mq_writeable);
+ nni_cond_signal(&mq->mq_writeable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);
@@ -258,10 +267,35 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire)
void
-nni_msgqueue_close(nni_msgqueue *mq)
+nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire)
{
- nni_msg *msg;
+ nni_mutex_enter(&mq->mq_lock);
+ mq->mq_closed = 1;
+ nni_cond_broadcast(&mq->mq_writeable);
+ nni_cond_broadcast(&mq->mq_readable);
+ while (mq->mq_len > 0) {
+ if (nni_cond_waituntil(&mq->mq_drained, expire) ==
+ NNG_ETIMEDOUT) {
+ break;
+ }
+ }
+ // If we timedout, free any remaining messages in the queue.
+ while (mq->mq_len > 0) {
+ nni_msg *msg = mq->mq_msgs[mq->mq_get];
+ mq->mq_get++;
+ if (mq->mq_get > mq->mq_cap) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ nni_msg_free(msg);
+ }
+ nni_mutex_exit(&mq->mq_lock);
+}
+
+void
+nni_msgqueue_close(nni_msgqueue *mq)
+{
nni_mutex_enter(&mq->mq_lock);
mq->mq_closed = 1;
nni_cond_broadcast(&mq->mq_writeable);
@@ -269,7 +303,7 @@ nni_msgqueue_close(nni_msgqueue *mq)
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
- msg = mq->mq_msgs[mq->mq_get];
+ nni_msg *msg = mq->mq_msgs[mq->mq_get];
mq->mq_get++;
if (mq->mq_get > mq->mq_cap) {
mq->mq_get = 0;
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index adea2dfa..2684d42d 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -17,8 +17,8 @@
// do have additional capabilities though.
//
// A closed message queue cannot be written to, but if there are messages
-// still in it, it can be read from. (This allows them to play a role in
-// draining/lingering.)
+// still in it and it is draining, it can be read from. This permits
+// linger operations to work.
//
// Message queues can be closed many times safely.
//
@@ -84,4 +84,10 @@ extern void nni_msgqueue_signal(nni_msgqueue *, nni_signal *);
// are freed. Unlike closing a go channel, this operation is idempotent.
extern void nni_msgqueue_close(nni_msgqueue *);
+// nni_msgqueue_drain is like nng_msgqueue_close, except that reads
+// against the queue are permitted for up to the time limit. The
+// operation blocks until either the queue is empty, or the timeout
+// has expired. Any messages still in the queue at the timeout are freed.
+extern void nni_msgqueue_drain(nni_msgqueue *, nni_time);
+
#endif // CORE_MSQUEUE_H
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 58a31d7f..bf2e64fe 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -16,23 +16,23 @@
// nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces.
uint32_t
-nni_pipe_id(nni_pipe * p)
+nni_pipe_id(nni_pipe *p)
{
return (p->p_id);
}
int
-nni_pipe_send(nni_pipe * p, nng_msg *msg)
+nni_pipe_send(nni_pipe *p, nng_msg *msg)
{
- return (p->p_ops.p_send(p->p_tran, msg));
+ return (p->p_ops.p_send(p->p_data, msg));
}
int
-nni_pipe_recv(nni_pipe * p, nng_msg **msgp)
+nni_pipe_recv(nni_pipe *p, nng_msg **msgp)
{
- return (p->p_ops.p_recv(p->p_tran, msgp));
+ return (p->p_ops.p_recv(p->p_data, msgp));
}
@@ -40,22 +40,39 @@ nni_pipe_recv(nni_pipe * p, nng_msg **msgp)
// subsequent attempts receive or send (including any waiting receive) will
// simply return NNG_ECLOSED.
void
-nni_pipe_close(nni_pipe * p)
+nni_pipe_close(nni_pipe *p)
{
- p->p_ops.p_close(p->p_tran);
+ p->p_ops.p_close(p->p_data);
}
uint16_t
-nni_pipe_peer(nni_pipe * p)
+nni_pipe_peer(nni_pipe *p)
{
- return (p->p_ops.p_peer(p->p_tran));
+ return (p->p_ops.p_peer(p->p_data));
}
void
-nni_pipe_destroy(nni_pipe * p)
+nni_pipe_destroy(nni_pipe *p)
{
- p->p_ops.p_destroy(p->p_tran);
+ if (p->p_data != NULL) {
+ p->p_ops.p_destroy(p->p_data);
+ }
nni_free(p, sizeof (*p));
}
+
+
+int
+nni_pipe_create(nni_pipe **pp, const nni_pipe_ops *ops)
+{
+ nni_pipe *p;
+
+ if ((p = nni_alloc(sizeof (*p))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ p->p_data = NULL;
+ p->p_ops = *ops;
+ p->p_id = nni_plat_nextid();
+ return (0);
+}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index b78eaa2a..2ce4f1ec 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -21,14 +21,13 @@
struct nng_pipe {
uint32_t p_id;
struct nni_pipe_ops p_ops;
- void * p_tran;
+ void * p_data;
nni_list_node p_sock_node;
nni_socket * p_sock;
nni_list_node p_ep_node;
nni_endpt * p_ep;
};
-
// Pipe operations that protocols use.
extern int nni_pipe_recv(nni_pipe *, nng_msg **);
extern int nni_pipe_send(nni_pipe *, nng_msg *);
@@ -37,7 +36,7 @@ extern void nni_pipe_close(nni_pipe *);
// Used only by the socket core - as we don't wish to expose the details
// of the pipe structure outside of pipe.c.
-extern int nni_pipe_create(nni_pipe **, struct nni_transport *);
+extern int nni_pipe_create(nni_pipe **, const nni_pipe_ops *);
extern void nni_pipe_destroy(nni_pipe *);
diff --git a/src/core/platform.h b/src/core/platform.h
index a674aac5..84477853 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -135,7 +135,7 @@ extern nni_time nni_clock(void);
// nni_usleep sleeps for the specified number of microseconds (at least).
extern void nni_usleep(nni_duration);
-// nni_platform_init is called to allow the platform the chance to
+// nni_plat_init is called to allow the platform the chance to
// do any necessary initialization. This routine MUST be idempotent,
// and threadsafe, and will be called before any other API calls, and
// may be called at any point thereafter. It is permitted to return
@@ -149,11 +149,18 @@ extern void nni_usleep(nni_duration);
// initialize the platform again at a later date.
extern int nni_plat_init(int (*)(void));
-// nni_platform_fini is called to clean up resources. It is intended to
+// nni_plat_fini is called to clean up resources. It is intended to
// be called as the last thing executed in the library, and no other functions
// will be called until nni_platform_init is called.
extern void nni_plat_fini(void);
+// nni_plat_nextid is used to generate a new pipe ID. This should be an
+// increasing value, taken from a random starting point. (The randomness
+// helps ensure we don't confuse pipe IDs from other connections.) The
+// value must be obtained in a threadsafe way (e.g. via an atomic counter
+// or under a lock protection.)
+extern uint32_t nni_plat_nextid(void);
+
// Actual platforms we support. This is included up front so that we can
// get the specific types that are supplied by the platform.
#if defined(PLATFORM_POSIX)
diff --git a/src/core/protocol.h b/src/core/protocol.h
index a4b24177..f73825d7 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -30,13 +30,6 @@ struct nni_protocol {
// Destroy the protocol instance.
void (*proto_destroy)(void *);
- // Shutdown the protocol instance, including giving time to
- // drain any outbound frames (linger). The protocol is not
- // required to honor the linger.
- // XXX: This is probably redundant -- protocol should notice
- // drain by getting NNG_ECLOSED on the upper write queue.
- void (*proto_shutdown)(void *);
-
// Add and remove pipes. These are called as connections are
// created or destroyed.
int (*proto_add_pipe)(void *, nni_pipe *);
diff --git a/src/core/socket.c b/src/core/socket.c
index 1331bb4c..70029278 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -73,7 +73,7 @@ nni_socket_close(nni_socket *sock)
{
nni_pipe *pipe;
nni_endpt *ep;
- uint64_t linger;
+ nni_time linger;
nni_mutex_enter(&sock->s_mx);
@@ -87,23 +87,20 @@ nni_socket_close(nni_socket *sock)
}
nni_mutex_exit(&sock->s_mx);
- // XXX: TODO. This is a place where we should drain the write side
- // msgqueue, effectively getting a linger on the socket. The
- // protocols will drain this queue, and should continue to run
- // handling both transmit and receive until that's done.
- // Note that *all* protocols need to monitor for this linger, even
- // those that do not transmit. This way they can notice and go ahead
- // and quickly shutdown their pipes; this keeps us from having to wait
- // for *our* pipelist to become empty. This helps us ensure that
- // they effectively get the chance to linger on their side, without
- // requring us to do anything magical for them.
-
- // nni_msgqueue_drain(sock->s_uwq, sock->s_linger);
-
- // Now we should attempt to wait for the list of pipes to drop to
- // zero -- indicating that the protocol has shut things down
- // cleanly, voluntarily. (I.e. it finished its drain.)
+ // XXX: TODO: add socket linger timeout to this, from socket option.
linger = nni_clock();
+
+ // We drain the upper write queue. This is just like closing it,
+ // except that the protocol gets a chance to get the messages and
+ // push them down to the transport. This operation can *block*
+ // until the linger time has expired.
+ nni_msgqueue_drain(sock->s_uwq, linger);
+
+ // Generally, unless the protocol is blocked trying to perform
+ // writes (e.g. a slow reader on the other side), it should be
+ // trying to shut things down -- the normal flow is for it to
+ // close pipes and call nni_sock_rem_pipe(). We wait to give it
+ // a chance to do so gracefully.
nni_mutex_enter(&sock->s_mx);
while (nni_list_first(&sock->s_pipes) != NULL) {
if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
@@ -111,36 +108,44 @@ nni_socket_close(nni_socket *sock)
}
}
- // Time's up! Shut it down the hard way.
+ // At this point, we've done everything we politely can to give
+ // the protocol a chance to flush its write side. Now its time
+ // to be a little more insistent.
+
+ // Close the upper read queue immediately. This can happen
+ // safely while we hold the lock.
nni_msgqueue_close(sock->s_urq);
- nni_msgqueue_close(sock->s_uwq);
- // This gives the protocol notice, in case it didn't get the hint.
- // XXX: In retrospect, this entry point seems redundant.
- sock->s_ops.proto_shutdown(sock->s_data);
-
- // The protocol *MUST* give us back our pipes at this point, and
- // quickly too! If this blocks for any non-trivial amount of time
- // here, it indicates a protocol implementation bug.
+
+ // Go through and close all the pipes.
+ nni_mutex_enter(&sock->s_mx);
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+
+ // At this point, the protocols should have all their operations
+ // failing, if they have any remaining, and they should be returning
+ // any pipes back to us very quickly. We'll wait for them to finish,
+ // as it MUST occur shortly.
while (nni_list_first(&sock->s_pipes) != NULL) {
nni_cond_wait(&sock->s_cv);
}
- // Wait to make sure endpoint listeners have shutdown and exited
- // as well. They should have done so *long* ago. Because this
- // requires waiting for threads to finish, which *could* in theory
- // overlap with this, we must drop the socket lock.
+ // We signaled the endpoints to shutdown and cleanup. We just
+ // need to wait for them to finish.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ nni_cond_wait(&sock->s_cv);
+ }
+ nni_mutex_exit(&sock->s_mx);
- // TODO: This looks like it should happen as an endpt_remove
- // operation?
- nni_list_remove(&sock->s_eps, ep);
- nni_mutex_exit(&sock->s_mx);
+ // At this point nothing else should be referencing us.
- nni_endpt_destroy(ep);
- nni_mutex_enter(&sock->s_mx);
- }
+ // The protocol needs to clean up its state.
+ sock->s_ops.proto_destroy(&sock->s_data);
- nni_mutex_exit(&sock->s_mx);
+ // And we need to clean up *our* state.
+ nni_cond_fini(&sock->s_cv);
+ nni_mutex_fini(&sock->s_mx);
+ nni_free(sock, sizeof (*sock));
return (0);
}
@@ -197,6 +202,7 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout)
return (rv);
}
+
int
nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout)
{
@@ -239,6 +245,7 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout)
return (0);
}
+
// nni_socket_protocol returns the socket's 16-bit protocol number.
uint16_t
nni_socket_proto(nni_socket *sock)
@@ -246,6 +253,7 @@ nni_socket_proto(nni_socket *sock)
return (sock->s_ops.proto_self);
}
+
// nni_socket_rem_pipe removes the pipe from the socket. This is often
// called by the protocol when a pipe is removed due to close.
void
@@ -294,8 +302,54 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
return (rv);
}
nni_list_append(&sock->s_pipes, pipe);
+
+ // Add the pipe to its endpoint list.
+ nni_mutex_enter(&pipe->p_ep->ep_mx);
+ nni_list_append(&pipe->p_ep->ep_pipes, pipe);
+ nni_mutex_exit(&pipe->p_ep->ep_mx);
+
pipe->p_sock = sock;
// XXX: Publish event
nni_mutex_exit(&sock->s_mx);
return (0);
}
+
+
+void
+nni_sock_dialer(void *arg)
+{
+ nni_endpt *ep = arg;
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ for (;;) {
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_close) &&
+ (nni_list_first(&ep->ep_pipes) != NULL)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ pipe = NULL;
+
+ if (((rv = nni_endpt_dial(ep, pipe)) != 0) ||
+ ((rv = nni_socket_add_pipe(sock, pipe)) != 0)) {
+ if (rv == NNG_ECLOSED) {
+ return;
+ }
+ if (pipe != NULL) {
+ nni_pipe_destroy(pipe);
+ }
+ // XXX: Inject a wait for reconnect...
+ continue;
+ }
+
+ }
+
+ // XXX: move the endpoint to the sockets reap list
+}
diff --git a/src/core/socket.h b/src/core/socket.h
index ec4acfdb..937f221a 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -33,6 +33,8 @@ struct nng_socket {
int s_besteffort; // Best effort mode delivery
int s_senderr; // Protocol state machine use
int s_recverr; // Protocol state machine use
+
+ uint32_t s_nextid; // Next Pipe ID.
};
extern int nni_socket_create(nni_socket **, uint16_t);
diff --git a/src/core/transport.h b/src/core/transport.h
index 4e39adaf..e4f07002 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -15,21 +15,18 @@
struct nni_transport {
// tran_scheme is the transport scheme, such as "tcp" or "inproc".
- const char * tran_scheme;
+ const char * tran_scheme;
// tran_ep_ops links our endpoint operations.
- const struct nni_endpt_ops * tran_ep_ops;
-
- // tran_pipe_ops links our pipe operations.
- const struct nni_pipe_ops * tran_pipe_ops;
+ const nni_endpt_ops * tran_ep_ops;
// tran_init, if not NULL, is called once during library
// initialization.
- int (*tran_init)(void);
+ int (*tran_init)(void);
// tran_fini, if not NULL, is called during library deinitialization.
// It should release any global resources, close any open files, etc.
- void (*tran_fini)(void);
+ void (*tran_fini)(void);
};
@@ -40,38 +37,44 @@ struct nni_transport {
struct nni_endpt_ops {
// ep_create creates a vanilla endpoint. The value created is
// used for the first argument for all other endpoint functions.
- int (*ep_create)(void **, const char *, uint16_t);
+ int (*ep_create)(void **, const char *,
+ uint16_t);
// ep_destroy frees the resources associated with the endpoint.
// The endpoint will already have been closed.
- void (*ep_destroy)(void *);
+ void (*ep_destroy)(void *);
// ep_dial starts dialing, and creates a new pipe,
// which is returned in the final argument. It can return errors
// NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED,
// NNG_ETIMEDOUT, and NNG_EPROTO.
- int (*ep_dial)(void *, void **);
+ int (*ep_dial)(void *, void **);
// ep_listen just does the bind() and listen() work,
// reserving the address but not creating any connections.
// It should return NNG_EADDRINUSE if the address is already
// taken. It can also return NNG_EBADADDR for an unsuitable
// address, or NNG_EACCESS for permission problems.
- int (*ep_listen)(void *);
+ int (*ep_listen)(void *);
// ep_accept accepts an inbound connection, and creates
// a transport pipe, which is returned in the final argument.
- int (*ep_accept)(void *, void **);
+ int (*ep_accept)(void *, void **);
// ep_close stops the endpoint from operating altogether. It does
// not affect pipes that have already been created.
- void (*ep_close)(void *);
+ void (*ep_close)(void *);
// ep_setopt sets an endpoint (transport-specific) option.
- int (*ep_setopt)(void *, int, const void *, size_t);
+ int (*ep_setopt)(void *, int, const void *,
+ size_t);
// ep_getopt gets an endpoint (transport-specific) option.
- int (*ep_getopt)(void *, int, void *, size_t *);
+ int (*ep_getopt)(void *, int, void *,
+ size_t *);
+
+ // ep_pipe_ops links our pipe operations.
+ const nni_pipe_ops * ep_pipe_ops;
};
// Pipe operations are entry points called by the socket. These may be called