aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/core/endpt.c31
-rw-r--r--src/core/endpt.h2
-rw-r--r--src/core/msgqueue.c46
-rw-r--r--src/core/msgqueue.h10
-rw-r--r--src/core/pipe.c39
-rw-r--r--src/core/pipe.h5
-rw-r--r--src/core/platform.h11
-rw-r--r--src/core/protocol.h7
-rw-r--r--src/core/socket.c130
-rw-r--r--src/core/socket.h2
-rw-r--r--src/core/transport.h33
-rw-r--r--src/platform/posix/posix_config.h70
-rw-r--r--src/platform/posix/posix_impl.h1
-rw-r--r--src/platform/posix/posix_thread.c27
-rw-r--r--src/protocol/pair/pair.c24
-rw-r--r--src/transport/inproc/inproc.c2
16 files changed, 293 insertions, 147 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index b7181ce8..bc947dfd 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -109,15 +109,42 @@ nni_endpt_close(nni_endpt *ep)
return;
}
ep->ep_close = 1;
+ ep->ep_ops.ep_close(ep->ep_data);
nni_cond_broadcast(&ep->ep_cv);
nni_mutex_exit(&ep->ep_mx);
- ep->ep_ops.ep_close(ep->ep_data);
}
+int
+nni_endpt_listen(nni_endpt *ep)
+{
+ if (ep->ep_close) {
+ return (NNG_ECLOSED);
+ }
+ return (ep->ep_ops.ep_listen(ep->ep_data));
+}
+
+int
+nni_endpt_dial(nni_endpt *ep, nni_pipe **pp)
+{
+ nni_pipe *pipe;
+ int rv;
+
+ if (ep->ep_close) {
+ return (NNG_ECLOSED);
+ }
+ if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) {
+ return (rv);
+ }
+ if ((rv = ep->ep_ops.ep_dial(ep->ep_data, &pipe->p_data)) != 0) {
+ nni_pipe_destroy(pipe);
+ return (rv);
+ }
+ *pp = pipe;
+ return (0);
+}
#if 0
int nni_endpt_dial(nni_endpt *, nni_pipe **);
int nni_endpt_listen(nni_endpt *);
int nni_endpt_accept(nni_endpt *, nni_pipe **);
-int nni_endpt_close(nni_endpt *);
#endif
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 4054a997..7512daa8 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -16,7 +16,7 @@
// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS
// OR TRANSPORTS.
struct nng_endpt {
- struct nni_endpt_ops ep_ops;
+ nni_endpt_ops ep_ops;
void * ep_data; // Transport private
nni_list_node ep_sock_node; // Per socket list
nni_socket * ep_sock;
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index b8d11e8c..266a8e26 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -18,6 +18,7 @@ struct nni_msgqueue {
nni_mutex mq_lock;
nni_cond mq_readable;
nni_cond mq_writeable;
+ nni_cond mq_drained;
int mq_cap;
int mq_len;
int mq_get;
@@ -52,7 +53,14 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap)
nni_mutex_fini(&mq->mq_lock);
return (NNG_ENOMEM);
}
+ if ((rv = nni_cond_init(&mq->mq_drained, &mq->mq_lock)) != 0) {
+ nni_cond_fini(&mq->mq_writeable);
+ nni_cond_fini(&mq->mq_readable);
+ nni_mutex_fini(&mq->mq_lock);
+ return (NNG_ENOMEM);
+ }
if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) {
+ nni_cond_fini(&mq->mq_drained);
nni_cond_fini(&mq->mq_writeable);
nni_cond_fini(&mq->mq_readable);
nni_mutex_fini(&mq->mq_lock);
@@ -75,6 +83,7 @@ nni_msgqueue_destroy(nni_msgqueue *mq)
{
nni_msg *msg;
+ nni_cond_fini(&mq->mq_drained);
nni_cond_fini(&mq->mq_writeable);
nni_cond_fini(&mq->mq_readable);
nni_mutex_fini(&mq->mq_lock);
@@ -153,7 +162,7 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg,
}
mq->mq_len++;
if (mq->mq_len == 1) {
- (void) nni_cond_signal(&mq->mq_readable);
+ nni_cond_signal(&mq->mq_readable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);
@@ -181,6 +190,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
// provide it, so that the reader can drain.
if (mq->mq_len == 0) {
if (mq->mq_closed) {
+ nni_cond_signal(&mq->mq_drained);
nni_mutex_exit(&mq->mq_lock);
return (NNG_ECLOSED);
}
@@ -198,9 +208,8 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp,
if (mq->mq_get == mq->mq_cap) {
mq->mq_get = 0;
}
- mq->mq_len++;
if (mq->mq_len == (mq->mq_cap - 1)) {
- (void) nni_cond_signal(&mq->mq_writeable);
+ nni_cond_signal(&mq->mq_writeable);
}
nni_mutex_exit(&mq->mq_lock);
return (0);
@@ -258,10 +267,35 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire)
void
-nni_msgqueue_close(nni_msgqueue *mq)
+nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire)
{
- nni_msg *msg;
+ nni_mutex_enter(&mq->mq_lock);
+ mq->mq_closed = 1;
+ nni_cond_broadcast(&mq->mq_writeable);
+ nni_cond_broadcast(&mq->mq_readable);
+ while (mq->mq_len > 0) {
+ if (nni_cond_waituntil(&mq->mq_drained, expire) ==
+ NNG_ETIMEDOUT) {
+ break;
+ }
+ }
+ // If we timedout, free any remaining messages in the queue.
+ while (mq->mq_len > 0) {
+ nni_msg *msg = mq->mq_msgs[mq->mq_get];
+ mq->mq_get++;
+ if (mq->mq_get > mq->mq_cap) {
+ mq->mq_get = 0;
+ }
+ mq->mq_len--;
+ nni_msg_free(msg);
+ }
+ nni_mutex_exit(&mq->mq_lock);
+}
+
+void
+nni_msgqueue_close(nni_msgqueue *mq)
+{
nni_mutex_enter(&mq->mq_lock);
mq->mq_closed = 1;
nni_cond_broadcast(&mq->mq_writeable);
@@ -269,7 +303,7 @@ nni_msgqueue_close(nni_msgqueue *mq)
// Free the messages orphaned in the queue.
while (mq->mq_len > 0) {
- msg = mq->mq_msgs[mq->mq_get];
+ nni_msg *msg = mq->mq_msgs[mq->mq_get];
mq->mq_get++;
if (mq->mq_get > mq->mq_cap) {
mq->mq_get = 0;
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index adea2dfa..2684d42d 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -17,8 +17,8 @@
// do have additional capabilities though.
//
// A closed message queue cannot be written to, but if there are messages
-// still in it, it can be read from. (This allows them to play a role in
-// draining/lingering.)
+// still in it and it is draining, it can be read from. This permits
+// linger operations to work.
//
// Message queues can be closed many times safely.
//
@@ -84,4 +84,10 @@ extern void nni_msgqueue_signal(nni_msgqueue *, nni_signal *);
// are freed. Unlike closing a go channel, this operation is idempotent.
extern void nni_msgqueue_close(nni_msgqueue *);
+// nni_msgqueue_drain is like nng_msgqueue_close, except that reads
+// against the queue are permitted for up to the time limit. The
+// operation blocks until either the queue is empty, or the timeout
+// has expired. Any messages still in the queue at the timeout are freed.
+extern void nni_msgqueue_drain(nni_msgqueue *, nni_time);
+
#endif // CORE_MSQUEUE_H
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 58a31d7f..bf2e64fe 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -16,23 +16,23 @@
// nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces.
uint32_t
-nni_pipe_id(nni_pipe * p)
+nni_pipe_id(nni_pipe *p)
{
return (p->p_id);
}
int
-nni_pipe_send(nni_pipe * p, nng_msg *msg)
+nni_pipe_send(nni_pipe *p, nng_msg *msg)
{
- return (p->p_ops.p_send(p->p_tran, msg));
+ return (p->p_ops.p_send(p->p_data, msg));
}
int
-nni_pipe_recv(nni_pipe * p, nng_msg **msgp)
+nni_pipe_recv(nni_pipe *p, nng_msg **msgp)
{
- return (p->p_ops.p_recv(p->p_tran, msgp));
+ return (p->p_ops.p_recv(p->p_data, msgp));
}
@@ -40,22 +40,39 @@ nni_pipe_recv(nni_pipe * p, nng_msg **msgp)
// subsequent attempts receive or send (including any waiting receive) will
// simply return NNG_ECLOSED.
void
-nni_pipe_close(nni_pipe * p)
+nni_pipe_close(nni_pipe *p)
{
- p->p_ops.p_close(p->p_tran);
+ p->p_ops.p_close(p->p_data);
}
uint16_t
-nni_pipe_peer(nni_pipe * p)
+nni_pipe_peer(nni_pipe *p)
{
- return (p->p_ops.p_peer(p->p_tran));
+ return (p->p_ops.p_peer(p->p_data));
}
void
-nni_pipe_destroy(nni_pipe * p)
+nni_pipe_destroy(nni_pipe *p)
{
- p->p_ops.p_destroy(p->p_tran);
+ if (p->p_data != NULL) {
+ p->p_ops.p_destroy(p->p_data);
+ }
nni_free(p, sizeof (*p));
}
+
+
+int
+nni_pipe_create(nni_pipe **pp, const nni_pipe_ops *ops)
+{
+ nni_pipe *p;
+
+ if ((p = nni_alloc(sizeof (*p))) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ p->p_data = NULL;
+ p->p_ops = *ops;
+ p->p_id = nni_plat_nextid();
+ return (0);
+}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index b78eaa2a..2ce4f1ec 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -21,14 +21,13 @@
struct nng_pipe {
uint32_t p_id;
struct nni_pipe_ops p_ops;
- void * p_tran;
+ void * p_data;
nni_list_node p_sock_node;
nni_socket * p_sock;
nni_list_node p_ep_node;
nni_endpt * p_ep;
};
-
// Pipe operations that protocols use.
extern int nni_pipe_recv(nni_pipe *, nng_msg **);
extern int nni_pipe_send(nni_pipe *, nng_msg *);
@@ -37,7 +36,7 @@ extern void nni_pipe_close(nni_pipe *);
// Used only by the socket core - as we don't wish to expose the details
// of the pipe structure outside of pipe.c.
-extern int nni_pipe_create(nni_pipe **, struct nni_transport *);
+extern int nni_pipe_create(nni_pipe **, const nni_pipe_ops *);
extern void nni_pipe_destroy(nni_pipe *);
diff --git a/src/core/platform.h b/src/core/platform.h
index a674aac5..84477853 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -135,7 +135,7 @@ extern nni_time nni_clock(void);
// nni_usleep sleeps for the specified number of microseconds (at least).
extern void nni_usleep(nni_duration);
-// nni_platform_init is called to allow the platform the chance to
+// nni_plat_init is called to allow the platform the chance to
// do any necessary initialization. This routine MUST be idempotent,
// and threadsafe, and will be called before any other API calls, and
// may be called at any point thereafter. It is permitted to return
@@ -149,11 +149,18 @@ extern void nni_usleep(nni_duration);
// initialize the platform again at a later date.
extern int nni_plat_init(int (*)(void));
-// nni_platform_fini is called to clean up resources. It is intended to
+// nni_plat_fini is called to clean up resources. It is intended to
// be called as the last thing executed in the library, and no other functions
// will be called until nni_platform_init is called.
extern void nni_plat_fini(void);
+// nni_plat_nextid is used to generate a new pipe ID. This should be an
+// increasing value, taken from a random starting point. (The randomness
+// helps ensure we don't confuse pipe IDs from other connections.) The
+// value must be obtained in a threadsafe way (e.g. via an atomic counter
+// or under a lock protection.)
+extern uint32_t nni_plat_nextid(void);
+
// Actual platforms we support. This is included up front so that we can
// get the specific types that are supplied by the platform.
#if defined(PLATFORM_POSIX)
diff --git a/src/core/protocol.h b/src/core/protocol.h
index a4b24177..f73825d7 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -30,13 +30,6 @@ struct nni_protocol {
// Destroy the protocol instance.
void (*proto_destroy)(void *);
- // Shutdown the protocol instance, including giving time to
- // drain any outbound frames (linger). The protocol is not
- // required to honor the linger.
- // XXX: This is probably redundant -- protocol should notice
- // drain by getting NNG_ECLOSED on the upper write queue.
- void (*proto_shutdown)(void *);
-
// Add and remove pipes. These are called as connections are
// created or destroyed.
int (*proto_add_pipe)(void *, nni_pipe *);
diff --git a/src/core/socket.c b/src/core/socket.c
index 1331bb4c..70029278 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -73,7 +73,7 @@ nni_socket_close(nni_socket *sock)
{
nni_pipe *pipe;
nni_endpt *ep;
- uint64_t linger;
+ nni_time linger;
nni_mutex_enter(&sock->s_mx);
@@ -87,23 +87,20 @@ nni_socket_close(nni_socket *sock)
}
nni_mutex_exit(&sock->s_mx);
- // XXX: TODO. This is a place where we should drain the write side
- // msgqueue, effectively getting a linger on the socket. The
- // protocols will drain this queue, and should continue to run
- // handling both transmit and receive until that's done.
- // Note that *all* protocols need to monitor for this linger, even
- // those that do not transmit. This way they can notice and go ahead
- // and quickly shutdown their pipes; this keeps us from having to wait
- // for *our* pipelist to become empty. This helps us ensure that
- // they effectively get the chance to linger on their side, without
- // requring us to do anything magical for them.
-
- // nni_msgqueue_drain(sock->s_uwq, sock->s_linger);
-
- // Now we should attempt to wait for the list of pipes to drop to
- // zero -- indicating that the protocol has shut things down
- // cleanly, voluntarily. (I.e. it finished its drain.)
+ // XXX: TODO: add socket linger timeout to this, from socket option.
linger = nni_clock();
+
+ // We drain the upper write queue. This is just like closing it,
+ // except that the protocol gets a chance to get the messages and
+ // push them down to the transport. This operation can *block*
+ // until the linger time has expired.
+ nni_msgqueue_drain(sock->s_uwq, linger);
+
+ // Generally, unless the protocol is blocked trying to perform
+ // writes (e.g. a slow reader on the other side), it should be
+ // trying to shut things down -- the normal flow is for it to
+ // close pipes and call nni_sock_rem_pipe(). We wait to give it
+ // a chance to do so gracefully.
nni_mutex_enter(&sock->s_mx);
while (nni_list_first(&sock->s_pipes) != NULL) {
if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
@@ -111,36 +108,44 @@ nni_socket_close(nni_socket *sock)
}
}
- // Time's up! Shut it down the hard way.
+ // At this point, we've done everything we politely can to give
+ // the protocol a chance to flush its write side. Now its time
+ // to be a little more insistent.
+
+ // Close the upper read queue immediately. This can happen
+ // safely while we hold the lock.
nni_msgqueue_close(sock->s_urq);
- nni_msgqueue_close(sock->s_uwq);
- // This gives the protocol notice, in case it didn't get the hint.
- // XXX: In retrospect, this entry point seems redundant.
- sock->s_ops.proto_shutdown(sock->s_data);
-
- // The protocol *MUST* give us back our pipes at this point, and
- // quickly too! If this blocks for any non-trivial amount of time
- // here, it indicates a protocol implementation bug.
+
+ // Go through and close all the pipes.
+ nni_mutex_enter(&sock->s_mx);
+ NNI_LIST_FOREACH (&sock->s_pipes, pipe) {
+ nni_pipe_close(pipe);
+ }
+
+ // At this point, the protocols should have all their operations
+ // failing, if they have any remaining, and they should be returning
+ // any pipes back to us very quickly. We'll wait for them to finish,
+ // as it MUST occur shortly.
while (nni_list_first(&sock->s_pipes) != NULL) {
nni_cond_wait(&sock->s_cv);
}
- // Wait to make sure endpoint listeners have shutdown and exited
- // as well. They should have done so *long* ago. Because this
- // requires waiting for threads to finish, which *could* in theory
- // overlap with this, we must drop the socket lock.
+ // We signaled the endpoints to shutdown and cleanup. We just
+ // need to wait for them to finish.
while ((ep = nni_list_first(&sock->s_eps)) != NULL) {
+ nni_cond_wait(&sock->s_cv);
+ }
+ nni_mutex_exit(&sock->s_mx);
- // TODO: This looks like it should happen as an endpt_remove
- // operation?
- nni_list_remove(&sock->s_eps, ep);
- nni_mutex_exit(&sock->s_mx);
+ // At this point nothing else should be referencing us.
- nni_endpt_destroy(ep);
- nni_mutex_enter(&sock->s_mx);
- }
+ // The protocol needs to clean up its state.
+ sock->s_ops.proto_destroy(&sock->s_data);
- nni_mutex_exit(&sock->s_mx);
+ // And we need to clean up *our* state.
+ nni_cond_fini(&sock->s_cv);
+ nni_mutex_fini(&sock->s_mx);
+ nni_free(sock, sizeof (*sock));
return (0);
}
@@ -197,6 +202,7 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout)
return (rv);
}
+
int
nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout)
{
@@ -239,6 +245,7 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout)
return (0);
}
+
// nni_socket_protocol returns the socket's 16-bit protocol number.
uint16_t
nni_socket_proto(nni_socket *sock)
@@ -246,6 +253,7 @@ nni_socket_proto(nni_socket *sock)
return (sock->s_ops.proto_self);
}
+
// nni_socket_rem_pipe removes the pipe from the socket. This is often
// called by the protocol when a pipe is removed due to close.
void
@@ -294,8 +302,54 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
return (rv);
}
nni_list_append(&sock->s_pipes, pipe);
+
+ // Add the pipe to its endpoint list.
+ nni_mutex_enter(&pipe->p_ep->ep_mx);
+ nni_list_append(&pipe->p_ep->ep_pipes, pipe);
+ nni_mutex_exit(&pipe->p_ep->ep_mx);
+
pipe->p_sock = sock;
// XXX: Publish event
nni_mutex_exit(&sock->s_mx);
return (0);
}
+
+
+void
+nni_sock_dialer(void *arg)
+{
+ nni_endpt *ep = arg;
+ nni_socket *sock = ep->ep_sock;
+ nni_pipe *pipe;
+ int rv;
+
+ for (;;) {
+ nni_mutex_enter(&ep->ep_mx);
+ while ((!ep->ep_close) &&
+ (nni_list_first(&ep->ep_pipes) != NULL)) {
+ nni_cond_wait(&ep->ep_cv);
+ }
+ if (ep->ep_close) {
+ nni_mutex_exit(&ep->ep_mx);
+ break;
+ }
+ nni_mutex_exit(&ep->ep_mx);
+
+ pipe = NULL;
+
+ if (((rv = nni_endpt_dial(ep, pipe)) != 0) ||
+ ((rv = nni_socket_add_pipe(sock, pipe)) != 0)) {
+ if (rv == NNG_ECLOSED) {
+ return;
+ }
+ if (pipe != NULL) {
+ nni_pipe_destroy(pipe);
+ }
+ // XXX: Inject a wait for reconnect...
+ continue;
+ }
+
+ }
+
+ // XXX: move the endpoint to the sockets reap list
+}
diff --git a/src/core/socket.h b/src/core/socket.h
index ec4acfdb..937f221a 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -33,6 +33,8 @@ struct nng_socket {
int s_besteffort; // Best effort mode delivery
int s_senderr; // Protocol state machine use
int s_recverr; // Protocol state machine use
+
+ uint32_t s_nextid; // Next Pipe ID.
};
extern int nni_socket_create(nni_socket **, uint16_t);
diff --git a/src/core/transport.h b/src/core/transport.h
index 4e39adaf..e4f07002 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -15,21 +15,18 @@
struct nni_transport {
// tran_scheme is the transport scheme, such as "tcp" or "inproc".
- const char * tran_scheme;
+ const char * tran_scheme;
// tran_ep_ops links our endpoint operations.
- const struct nni_endpt_ops * tran_ep_ops;
-
- // tran_pipe_ops links our pipe operations.
- const struct nni_pipe_ops * tran_pipe_ops;
+ const nni_endpt_ops * tran_ep_ops;
// tran_init, if not NULL, is called once during library
// initialization.
- int (*tran_init)(void);
+ int (*tran_init)(void);
// tran_fini, if not NULL, is called during library deinitialization.
// It should release any global resources, close any open files, etc.
- void (*tran_fini)(void);
+ void (*tran_fini)(void);
};
@@ -40,38 +37,44 @@ struct nni_transport {
struct nni_endpt_ops {
// ep_create creates a vanilla endpoint. The value created is
// used for the first argument for all other endpoint functions.
- int (*ep_create)(void **, const char *, uint16_t);
+ int (*ep_create)(void **, const char *,
+ uint16_t);
// ep_destroy frees the resources associated with the endpoint.
// The endpoint will already have been closed.
- void (*ep_destroy)(void *);
+ void (*ep_destroy)(void *);
// ep_dial starts dialing, and creates a new pipe,
// which is returned in the final argument. It can return errors
// NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED,
// NNG_ETIMEDOUT, and NNG_EPROTO.
- int (*ep_dial)(void *, void **);
+ int (*ep_dial)(void *, void **);
// ep_listen just does the bind() and listen() work,
// reserving the address but not creating any connections.
// It should return NNG_EADDRINUSE if the address is already
// taken. It can also return NNG_EBADADDR for an unsuitable
// address, or NNG_EACCESS for permission problems.
- int (*ep_listen)(void *);
+ int (*ep_listen)(void *);
// ep_accept accepts an inbound connection, and creates
// a transport pipe, which is returned in the final argument.
- int (*ep_accept)(void *, void **);
+ int (*ep_accept)(void *, void **);
// ep_close stops the endpoint from operating altogether. It does
// not affect pipes that have already been created.
- void (*ep_close)(void *);
+ void (*ep_close)(void *);
// ep_setopt sets an endpoint (transport-specific) option.
- int (*ep_setopt)(void *, int, const void *, size_t);
+ int (*ep_setopt)(void *, int, const void *,
+ size_t);
// ep_getopt gets an endpoint (transport-specific) option.
- int (*ep_getopt)(void *, int, void *, size_t *);
+ int (*ep_getopt)(void *, int, void *,
+ size_t *);
+
+ // ep_pipe_ops links our pipe operations.
+ const nni_pipe_ops * ep_pipe_ops;
};
// Pipe operations are entry points called by the socket. These may be called
diff --git a/src/platform/posix/posix_config.h b/src/platform/posix/posix_config.h
index dd2167ab..1510d739 100644
--- a/src/platform/posix/posix_config.h
+++ b/src/platform/posix/posix_config.h
@@ -1,38 +1,40 @@
-/*
- * Copyright 2016 Garrett D'Amore <garrett@damore.org>
- *
- * This software is supplied under the terms of the MIT License, a
- * copy of which should be located in the distribution where this
- * file was obtained (LICENSE.txt). A copy of the license may also be
- * found online at https://opensource.org/licenses/MIT.
- */
+//
+// Copyright 2016 Garrett D'Amore <garrett@damore.org>
+//
+// This software is supplied under the terms of the MIT License, a
+// copy of which should be located in the distribution where this
+// file was obtained (LICENSE.txt). A copy of the license may also be
+// found online at https://opensource.org/licenses/MIT.
+//
-/*
- * The following adjustments to the platform may be defined. These can
- * be defined in either platform/config.h or loaded in via external
- * defines using cmake.
- *
- * #define NNG_USE_GETTIMEOFDAY
- * This macro is defined if you lack a working clock_gettime,
- * nanosleep, or pthread_condattr_setclock. In this case the
- * library uses the system clock for relative sleeps, timers, etc.
- * This can be dangerous if the system clock is changed, so only
- * use this if you have no other choice. If it appears that
- * the system lacks clock_gettime, then it will choose this automatically.
- * This value may be ignored on platforms that don't use POSIX clocks.
- *
- * #define NNG_USE_CLOCKID
- * This macro may be defined to a different clock id (see
- * clock_gettime()). By default we use CLOCK_MONOTONIC if it exists,
- * or CLOCK_REALTIME otherwise. This is ignored if NNG_USE_GETTIMEOFDAY
- * is defined. Platforms that don't use POSIX clocks will probably
- * ignore any setting here.
- *
- * #define NNG_HAVE_BACKTRACE
- * If your system has a working backtrace(), and backtrace_symbols(),
- * along with <execinfo.h>, you can define this to get richer backtrace
- * information for debugging.
- */
+// The following adjustments to the platform may be defined. These can
+// be defined in either platform/config.h or loaded in via external
+// defines using cmake.
+//
+// #define NNG_USE_GETTIMEOFDAY
+// This macro is defined if you lack a working clock_gettime,
+// nanosleep, or pthread_condattr_setclock. In this case the
+// library uses the system clock for relative sleeps, timers, etc.
+// This can be dangerous if the system clock is changed, so only
+// use this if you have no other choice. If it appears that
+// the system lacks clock_gettime, then it will choose this automatically.
+// This value may be ignored on platforms that don't use POSIX clocks.
+//
+// #define NNG_USE_CLOCKID
+// This macro may be defined to a different clock id (see
+// clock_gettime()). By default we use CLOCK_MONOTONIC if it exists,
+// or CLOCK_REALTIME otherwise. This is ignored if NNG_USE_GETTIMEOFDAY
+// is defined. Platforms that don't use POSIX clocks will probably
+// ignore any setting here.
+//
+// #define NNG_HAVE_ARC4RANDOM
+// This indicates that the platform has the superior arc4random function
+// for getting entropy.
+//
+// #define NNG_HAVE_BACKTRACE
+// If your system has a working backtrace(), and backtrace_symbols(),
+// along with <execinfo.h>, you can define this to get richer backtrace
+// information for debugging.
#include <time.h>
diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h
index 0fa15b43..a07b5c92 100644
--- a/src/platform/posix/posix_impl.h
+++ b/src/platform/posix/posix_impl.h
@@ -21,6 +21,7 @@
#define PLATFORM_POSIX_ALLOC
#define PLATFORM_POSIX_DEBUG
#define PLATFORM_POSIX_CLOCK
+#define PLATFORM_POSIX_RANDOM
#define PLATFORM_POSIX_SYNCH
#define PLATFORM_POSIX_THREAD
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index 80f3b96f..24f97b81 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -16,6 +16,7 @@
#include <pthread.h>
#include <time.h>
#include <string.h>
+#include <stdlib.h>
struct nni_thread {
pthread_t tid;
@@ -26,6 +27,17 @@ struct nni_thread {
static pthread_mutex_t nni_plat_lock = PTHREAD_MUTEX_INITIALIZER;
static int nni_plat_inited = 0;
static int nni_plat_forked = 0;
+static int nni_plat_next = 0;
+
+uint32_t
+nni_plat_nextid(void)
+{
+ uint32_t id;
+ pthread_mutex_lock(&nni_plat_lock);
+ id = nni_plat_next++;
+ pthread_mutex_unlock(&nni_plat_lock);
+ return (id);
+}
static void *
nni_thrfunc(void *arg)
@@ -118,6 +130,20 @@ nni_plat_init(int (*helper)(void))
return (NNG_ENOMEM);
}
+ // Generate a starting ID (used for Pipe IDs)
+#ifdef NNG_HAVE_ARC4RANDOM
+ nni_plat_next = arc4random();
+#else
+ while (nni_plat_next == 0) {
+ uint16_t xsub[3];
+ nni_time now = nni_clock();
+
+ xsub[0] = (uint16_t)now;
+ xsub[1] = (uint16_t)(now >> 16);
+ xsub[2] = (uint16_t)(now >> 24);
+ nni_plat_next = nrand48(xsub);
+ }
+#endif
if (pthread_atfork(NULL, NULL, nni_atfork_child) != 0) {
pthread_mutex_unlock(&nni_plat_lock);
@@ -144,5 +170,4 @@ nni_plat_fini(void)
pthread_mutex_unlock(&nni_plat_lock);
}
-
#endif
diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c
index cabd3f06..692f4b0e 100644
--- a/src/protocol/pair/pair.c
+++ b/src/protocol/pair/pair.c
@@ -76,29 +76,6 @@ nni_pair_destroy(void *arg)
}
-static void
-nni_pair_shutdown(void *arg)
-{
- nni_pair_sock *pair = arg;
- nni_pipe_t pipe;
-
- // This just causes the protocol to close its various pipes.
- // The draining logic, if any, will have been performed in the
- // upper layer socket.
- //
- // Closing the pipes is intended to cause the receiver on them
- // to notice the failure, and ultimately call back into the socket
- // to unregister them. The socket can use this to wait for a clean
- // shutdown of all pipe workers.
- nni_mutex_enter(&pair->mx);
- pipe = pair->pipe;
- pair->pipe = NULL;
- nni_mutex_exit(&pair->mx);
-
- nni_pipe_close(pipe);
-}
-
-
static int
nni_pair_add_pipe(void *arg, nni_pipe *pipe)
{
@@ -252,7 +229,6 @@ struct nni_protocol nni_pair_protocol = {
.proto_name = "pair",
.proto_create = nni_pair_create,
.proto_destroy = nni_pair_destroy,
- .proto_shutdown = nni_pair_shutdown,
.proto_add_pipe = nni_pair_add_pipe,
.proto_rem_pipe = nni_pair_rem_pipe,
.proto_setopt = nni_pair_setopt,
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index a72d6b1e..a4da2b14 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -393,6 +393,7 @@ static struct nni_endpt_ops nni_inproc_ep_ops = {
.ep_listen = nni_inproc_ep_listen,
.ep_accept = nni_inproc_ep_accept,
.ep_close = nni_inproc_ep_close,
+ .ep_pipe_ops = &nni_inproc_pipe_ops,
.ep_setopt = NULL,
.ep_getopt = NULL,
};
@@ -402,7 +403,6 @@ static struct nni_endpt_ops nni_inproc_ep_ops = {
struct nni_transport nni_inproc_transport = {
.tran_scheme = "inproc",
.tran_ep_ops = &nni_inproc_ep_ops,
- .tran_pipe_ops = &nni_inproc_pipe_ops,
.tran_init = nni_inproc_init,
.tran_fini = nni_inproc_fini,
};