diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-22 20:52:45 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-22 20:52:45 -0800 |
| commit | ee45cbf4498a3c1d1868469bdb0c767d66c278e4 (patch) | |
| tree | b9116256f12a54c90f92bf5cf215f3d4c8152126 /src/core | |
| parent | 718de1828cc5b5256511c5b723360d499ae21c8f (diff) | |
| download | nng-ee45cbf4498a3c1d1868469bdb0c767d66c278e4.tar.gz nng-ee45cbf4498a3c1d1868469bdb0c767d66c278e4.tar.bz2 nng-ee45cbf4498a3c1d1868469bdb0c767d66c278e4.zip | |
Endpoint dialer implemented.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/endpt.c | 31 | ||||
| -rw-r--r-- | src/core/endpt.h | 2 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 46 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 10 | ||||
| -rw-r--r-- | src/core/pipe.c | 39 | ||||
| -rw-r--r-- | src/core/pipe.h | 5 | ||||
| -rw-r--r-- | src/core/platform.h | 11 | ||||
| -rw-r--r-- | src/core/protocol.h | 7 | ||||
| -rw-r--r-- | src/core/socket.c | 130 | ||||
| -rw-r--r-- | src/core/socket.h | 2 | ||||
| -rw-r--r-- | src/core/transport.h | 33 |
11 files changed, 229 insertions, 87 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index b7181ce8..bc947dfd 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -109,15 +109,42 @@ nni_endpt_close(nni_endpt *ep) return; } ep->ep_close = 1; + ep->ep_ops.ep_close(ep->ep_data); nni_cond_broadcast(&ep->ep_cv); nni_mutex_exit(&ep->ep_mx); - ep->ep_ops.ep_close(ep->ep_data); } +int +nni_endpt_listen(nni_endpt *ep) +{ + if (ep->ep_close) { + return (NNG_ECLOSED); + } + return (ep->ep_ops.ep_listen(ep->ep_data)); +} + +int +nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) +{ + nni_pipe *pipe; + int rv; + + if (ep->ep_close) { + return (NNG_ECLOSED); + } + if ((rv = nni_pipe_create(&pipe, ep->ep_ops.ep_pipe_ops)) != 0) { + return (rv); + } + if ((rv = ep->ep_ops.ep_dial(ep->ep_data, &pipe->p_data)) != 0) { + nni_pipe_destroy(pipe); + return (rv); + } + *pp = pipe; + return (0); +} #if 0 int nni_endpt_dial(nni_endpt *, nni_pipe **); int nni_endpt_listen(nni_endpt *); int nni_endpt_accept(nni_endpt *, nni_pipe **); -int nni_endpt_close(nni_endpt *); #endif diff --git a/src/core/endpt.h b/src/core/endpt.h index 4054a997..7512daa8 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -16,7 +16,7 @@ // OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS // OR TRANSPORTS. struct nng_endpt { - struct nni_endpt_ops ep_ops; + nni_endpt_ops ep_ops; void * ep_data; // Transport private nni_list_node ep_sock_node; // Per socket list nni_socket * ep_sock; diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index b8d11e8c..266a8e26 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -18,6 +18,7 @@ struct nni_msgqueue { nni_mutex mq_lock; nni_cond mq_readable; nni_cond mq_writeable; + nni_cond mq_drained; int mq_cap; int mq_len; int mq_get; @@ -52,7 +53,14 @@ nni_msgqueue_create(nni_msgqueue **mqp, int cap) nni_mutex_fini(&mq->mq_lock); return (NNG_ENOMEM); } + if ((rv = nni_cond_init(&mq->mq_drained, &mq->mq_lock)) != 0) { + nni_cond_fini(&mq->mq_writeable); + nni_cond_fini(&mq->mq_readable); + nni_mutex_fini(&mq->mq_lock); + return (NNG_ENOMEM); + } if ((mq->mq_msgs = nni_alloc(sizeof (nng_msg *) * cap)) == NULL) { + nni_cond_fini(&mq->mq_drained); nni_cond_fini(&mq->mq_writeable); nni_cond_fini(&mq->mq_readable); nni_mutex_fini(&mq->mq_lock); @@ -75,6 +83,7 @@ nni_msgqueue_destroy(nni_msgqueue *mq) { nni_msg *msg; + nni_cond_fini(&mq->mq_drained); nni_cond_fini(&mq->mq_writeable); nni_cond_fini(&mq->mq_readable); nni_mutex_fini(&mq->mq_lock); @@ -153,7 +162,7 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, } mq->mq_len++; if (mq->mq_len == 1) { - (void) nni_cond_signal(&mq->mq_readable); + nni_cond_signal(&mq->mq_readable); } nni_mutex_exit(&mq->mq_lock); return (0); @@ -181,6 +190,7 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, // provide it, so that the reader can drain. if (mq->mq_len == 0) { if (mq->mq_closed) { + nni_cond_signal(&mq->mq_drained); nni_mutex_exit(&mq->mq_lock); return (NNG_ECLOSED); } @@ -198,9 +208,8 @@ nni_msgqueue_get_impl(nni_msgqueue *mq, nni_msg **msgp, if (mq->mq_get == mq->mq_cap) { mq->mq_get = 0; } - mq->mq_len++; if (mq->mq_len == (mq->mq_cap - 1)) { - (void) nni_cond_signal(&mq->mq_writeable); + nni_cond_signal(&mq->mq_writeable); } nni_mutex_exit(&mq->mq_lock); return (0); @@ -258,10 +267,35 @@ nni_msgqueue_put_until(nni_msgqueue *mq, nni_msg *msg, nni_time expire) void -nni_msgqueue_close(nni_msgqueue *mq) +nni_msgqueue_drain(nni_msgqueue *mq, nni_time expire) { - nni_msg *msg; + nni_mutex_enter(&mq->mq_lock); + mq->mq_closed = 1; + nni_cond_broadcast(&mq->mq_writeable); + nni_cond_broadcast(&mq->mq_readable); + while (mq->mq_len > 0) { + if (nni_cond_waituntil(&mq->mq_drained, expire) == + NNG_ETIMEDOUT) { + break; + } + } + // If we timedout, free any remaining messages in the queue. + while (mq->mq_len > 0) { + nni_msg *msg = mq->mq_msgs[mq->mq_get]; + mq->mq_get++; + if (mq->mq_get > mq->mq_cap) { + mq->mq_get = 0; + } + mq->mq_len--; + nni_msg_free(msg); + } + nni_mutex_exit(&mq->mq_lock); +} + +void +nni_msgqueue_close(nni_msgqueue *mq) +{ nni_mutex_enter(&mq->mq_lock); mq->mq_closed = 1; nni_cond_broadcast(&mq->mq_writeable); @@ -269,7 +303,7 @@ nni_msgqueue_close(nni_msgqueue *mq) // Free the messages orphaned in the queue. while (mq->mq_len > 0) { - msg = mq->mq_msgs[mq->mq_get]; + nni_msg *msg = mq->mq_msgs[mq->mq_get]; mq->mq_get++; if (mq->mq_get > mq->mq_cap) { mq->mq_get = 0; diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index adea2dfa..2684d42d 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -17,8 +17,8 @@ // do have additional capabilities though. // // A closed message queue cannot be written to, but if there are messages -// still in it, it can be read from. (This allows them to play a role in -// draining/lingering.) +// still in it and it is draining, it can be read from. This permits +// linger operations to work. // // Message queues can be closed many times safely. // @@ -84,4 +84,10 @@ extern void nni_msgqueue_signal(nni_msgqueue *, nni_signal *); // are freed. Unlike closing a go channel, this operation is idempotent. extern void nni_msgqueue_close(nni_msgqueue *); +// nni_msgqueue_drain is like nng_msgqueue_close, except that reads +// against the queue are permitted for up to the time limit. The +// operation blocks until either the queue is empty, or the timeout +// has expired. Any messages still in the queue at the timeout are freed. +extern void nni_msgqueue_drain(nni_msgqueue *, nni_time); + #endif // CORE_MSQUEUE_H diff --git a/src/core/pipe.c b/src/core/pipe.c index 58a31d7f..bf2e64fe 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -16,23 +16,23 @@ // nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. uint32_t -nni_pipe_id(nni_pipe * p) +nni_pipe_id(nni_pipe *p) { return (p->p_id); } int -nni_pipe_send(nni_pipe * p, nng_msg *msg) +nni_pipe_send(nni_pipe *p, nng_msg *msg) { - return (p->p_ops.p_send(p->p_tran, msg)); + return (p->p_ops.p_send(p->p_data, msg)); } int -nni_pipe_recv(nni_pipe * p, nng_msg **msgp) +nni_pipe_recv(nni_pipe *p, nng_msg **msgp) { - return (p->p_ops.p_recv(p->p_tran, msgp)); + return (p->p_ops.p_recv(p->p_data, msgp)); } @@ -40,22 +40,39 @@ nni_pipe_recv(nni_pipe * p, nng_msg **msgp) // subsequent attempts receive or send (including any waiting receive) will // simply return NNG_ECLOSED. void -nni_pipe_close(nni_pipe * p) +nni_pipe_close(nni_pipe *p) { - p->p_ops.p_close(p->p_tran); + p->p_ops.p_close(p->p_data); } uint16_t -nni_pipe_peer(nni_pipe * p) +nni_pipe_peer(nni_pipe *p) { - return (p->p_ops.p_peer(p->p_tran)); + return (p->p_ops.p_peer(p->p_data)); } void -nni_pipe_destroy(nni_pipe * p) +nni_pipe_destroy(nni_pipe *p) { - p->p_ops.p_destroy(p->p_tran); + if (p->p_data != NULL) { + p->p_ops.p_destroy(p->p_data); + } nni_free(p, sizeof (*p)); } + + +int +nni_pipe_create(nni_pipe **pp, const nni_pipe_ops *ops) +{ + nni_pipe *p; + + if ((p = nni_alloc(sizeof (*p))) == NULL) { + return (NNG_ENOMEM); + } + p->p_data = NULL; + p->p_ops = *ops; + p->p_id = nni_plat_nextid(); + return (0); +} diff --git a/src/core/pipe.h b/src/core/pipe.h index b78eaa2a..2ce4f1ec 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -21,14 +21,13 @@ struct nng_pipe { uint32_t p_id; struct nni_pipe_ops p_ops; - void * p_tran; + void * p_data; nni_list_node p_sock_node; nni_socket * p_sock; nni_list_node p_ep_node; nni_endpt * p_ep; }; - // Pipe operations that protocols use. extern int nni_pipe_recv(nni_pipe *, nng_msg **); extern int nni_pipe_send(nni_pipe *, nng_msg *); @@ -37,7 +36,7 @@ extern void nni_pipe_close(nni_pipe *); // Used only by the socket core - as we don't wish to expose the details // of the pipe structure outside of pipe.c. -extern int nni_pipe_create(nni_pipe **, struct nni_transport *); +extern int nni_pipe_create(nni_pipe **, const nni_pipe_ops *); extern void nni_pipe_destroy(nni_pipe *); diff --git a/src/core/platform.h b/src/core/platform.h index a674aac5..84477853 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -135,7 +135,7 @@ extern nni_time nni_clock(void); // nni_usleep sleeps for the specified number of microseconds (at least). extern void nni_usleep(nni_duration); -// nni_platform_init is called to allow the platform the chance to +// nni_plat_init is called to allow the platform the chance to // do any necessary initialization. This routine MUST be idempotent, // and threadsafe, and will be called before any other API calls, and // may be called at any point thereafter. It is permitted to return @@ -149,11 +149,18 @@ extern void nni_usleep(nni_duration); // initialize the platform again at a later date. extern int nni_plat_init(int (*)(void)); -// nni_platform_fini is called to clean up resources. It is intended to +// nni_plat_fini is called to clean up resources. It is intended to // be called as the last thing executed in the library, and no other functions // will be called until nni_platform_init is called. extern void nni_plat_fini(void); +// nni_plat_nextid is used to generate a new pipe ID. This should be an +// increasing value, taken from a random starting point. (The randomness +// helps ensure we don't confuse pipe IDs from other connections.) The +// value must be obtained in a threadsafe way (e.g. via an atomic counter +// or under a lock protection.) +extern uint32_t nni_plat_nextid(void); + // Actual platforms we support. This is included up front so that we can // get the specific types that are supplied by the platform. #if defined(PLATFORM_POSIX) diff --git a/src/core/protocol.h b/src/core/protocol.h index a4b24177..f73825d7 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -30,13 +30,6 @@ struct nni_protocol { // Destroy the protocol instance. void (*proto_destroy)(void *); - // Shutdown the protocol instance, including giving time to - // drain any outbound frames (linger). The protocol is not - // required to honor the linger. - // XXX: This is probably redundant -- protocol should notice - // drain by getting NNG_ECLOSED on the upper write queue. - void (*proto_shutdown)(void *); - // Add and remove pipes. These are called as connections are // created or destroyed. int (*proto_add_pipe)(void *, nni_pipe *); diff --git a/src/core/socket.c b/src/core/socket.c index 1331bb4c..70029278 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -73,7 +73,7 @@ nni_socket_close(nni_socket *sock) { nni_pipe *pipe; nni_endpt *ep; - uint64_t linger; + nni_time linger; nni_mutex_enter(&sock->s_mx); @@ -87,23 +87,20 @@ nni_socket_close(nni_socket *sock) } nni_mutex_exit(&sock->s_mx); - // XXX: TODO. This is a place where we should drain the write side - // msgqueue, effectively getting a linger on the socket. The - // protocols will drain this queue, and should continue to run - // handling both transmit and receive until that's done. - // Note that *all* protocols need to monitor for this linger, even - // those that do not transmit. This way they can notice and go ahead - // and quickly shutdown their pipes; this keeps us from having to wait - // for *our* pipelist to become empty. This helps us ensure that - // they effectively get the chance to linger on their side, without - // requring us to do anything magical for them. - - // nni_msgqueue_drain(sock->s_uwq, sock->s_linger); - - // Now we should attempt to wait for the list of pipes to drop to - // zero -- indicating that the protocol has shut things down - // cleanly, voluntarily. (I.e. it finished its drain.) + // XXX: TODO: add socket linger timeout to this, from socket option. linger = nni_clock(); + + // We drain the upper write queue. This is just like closing it, + // except that the protocol gets a chance to get the messages and + // push them down to the transport. This operation can *block* + // until the linger time has expired. + nni_msgqueue_drain(sock->s_uwq, linger); + + // Generally, unless the protocol is blocked trying to perform + // writes (e.g. a slow reader on the other side), it should be + // trying to shut things down -- the normal flow is for it to + // close pipes and call nni_sock_rem_pipe(). We wait to give it + // a chance to do so gracefully. nni_mutex_enter(&sock->s_mx); while (nni_list_first(&sock->s_pipes) != NULL) { if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) { @@ -111,36 +108,44 @@ nni_socket_close(nni_socket *sock) } } - // Time's up! Shut it down the hard way. + // At this point, we've done everything we politely can to give + // the protocol a chance to flush its write side. Now its time + // to be a little more insistent. + + // Close the upper read queue immediately. This can happen + // safely while we hold the lock. nni_msgqueue_close(sock->s_urq); - nni_msgqueue_close(sock->s_uwq); - // This gives the protocol notice, in case it didn't get the hint. - // XXX: In retrospect, this entry point seems redundant. - sock->s_ops.proto_shutdown(sock->s_data); - - // The protocol *MUST* give us back our pipes at this point, and - // quickly too! If this blocks for any non-trivial amount of time - // here, it indicates a protocol implementation bug. + + // Go through and close all the pipes. + nni_mutex_enter(&sock->s_mx); + NNI_LIST_FOREACH (&sock->s_pipes, pipe) { + nni_pipe_close(pipe); + } + + // At this point, the protocols should have all their operations + // failing, if they have any remaining, and they should be returning + // any pipes back to us very quickly. We'll wait for them to finish, + // as it MUST occur shortly. while (nni_list_first(&sock->s_pipes) != NULL) { nni_cond_wait(&sock->s_cv); } - // Wait to make sure endpoint listeners have shutdown and exited - // as well. They should have done so *long* ago. Because this - // requires waiting for threads to finish, which *could* in theory - // overlap with this, we must drop the socket lock. + // We signaled the endpoints to shutdown and cleanup. We just + // need to wait for them to finish. while ((ep = nni_list_first(&sock->s_eps)) != NULL) { + nni_cond_wait(&sock->s_cv); + } + nni_mutex_exit(&sock->s_mx); - // TODO: This looks like it should happen as an endpt_remove - // operation? - nni_list_remove(&sock->s_eps, ep); - nni_mutex_exit(&sock->s_mx); + // At this point nothing else should be referencing us. - nni_endpt_destroy(ep); - nni_mutex_enter(&sock->s_mx); - } + // The protocol needs to clean up its state. + sock->s_ops.proto_destroy(&sock->s_data); - nni_mutex_exit(&sock->s_mx); + // And we need to clean up *our* state. + nni_cond_fini(&sock->s_cv); + nni_mutex_fini(&sock->s_mx); + nni_free(sock, sizeof (*sock)); return (0); } @@ -197,6 +202,7 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout) return (rv); } + int nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout) { @@ -239,6 +245,7 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout) return (0); } + // nni_socket_protocol returns the socket's 16-bit protocol number. uint16_t nni_socket_proto(nni_socket *sock) @@ -246,6 +253,7 @@ nni_socket_proto(nni_socket *sock) return (sock->s_ops.proto_self); } + // nni_socket_rem_pipe removes the pipe from the socket. This is often // called by the protocol when a pipe is removed due to close. void @@ -294,8 +302,54 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) return (rv); } nni_list_append(&sock->s_pipes, pipe); + + // Add the pipe to its endpoint list. + nni_mutex_enter(&pipe->p_ep->ep_mx); + nni_list_append(&pipe->p_ep->ep_pipes, pipe); + nni_mutex_exit(&pipe->p_ep->ep_mx); + pipe->p_sock = sock; // XXX: Publish event nni_mutex_exit(&sock->s_mx); return (0); } + + +void +nni_sock_dialer(void *arg) +{ + nni_endpt *ep = arg; + nni_socket *sock = ep->ep_sock; + nni_pipe *pipe; + int rv; + + for (;;) { + nni_mutex_enter(&ep->ep_mx); + while ((!ep->ep_close) && + (nni_list_first(&ep->ep_pipes) != NULL)) { + nni_cond_wait(&ep->ep_cv); + } + if (ep->ep_close) { + nni_mutex_exit(&ep->ep_mx); + break; + } + nni_mutex_exit(&ep->ep_mx); + + pipe = NULL; + + if (((rv = nni_endpt_dial(ep, pipe)) != 0) || + ((rv = nni_socket_add_pipe(sock, pipe)) != 0)) { + if (rv == NNG_ECLOSED) { + return; + } + if (pipe != NULL) { + nni_pipe_destroy(pipe); + } + // XXX: Inject a wait for reconnect... + continue; + } + + } + + // XXX: move the endpoint to the sockets reap list +} diff --git a/src/core/socket.h b/src/core/socket.h index ec4acfdb..937f221a 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -33,6 +33,8 @@ struct nng_socket { int s_besteffort; // Best effort mode delivery int s_senderr; // Protocol state machine use int s_recverr; // Protocol state machine use + + uint32_t s_nextid; // Next Pipe ID. }; extern int nni_socket_create(nni_socket **, uint16_t); diff --git a/src/core/transport.h b/src/core/transport.h index 4e39adaf..e4f07002 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -15,21 +15,18 @@ struct nni_transport { // tran_scheme is the transport scheme, such as "tcp" or "inproc". - const char * tran_scheme; + const char * tran_scheme; // tran_ep_ops links our endpoint operations. - const struct nni_endpt_ops * tran_ep_ops; - - // tran_pipe_ops links our pipe operations. - const struct nni_pipe_ops * tran_pipe_ops; + const nni_endpt_ops * tran_ep_ops; // tran_init, if not NULL, is called once during library // initialization. - int (*tran_init)(void); + int (*tran_init)(void); // tran_fini, if not NULL, is called during library deinitialization. // It should release any global resources, close any open files, etc. - void (*tran_fini)(void); + void (*tran_fini)(void); }; @@ -40,38 +37,44 @@ struct nni_transport { struct nni_endpt_ops { // ep_create creates a vanilla endpoint. The value created is // used for the first argument for all other endpoint functions. - int (*ep_create)(void **, const char *, uint16_t); + int (*ep_create)(void **, const char *, + uint16_t); // ep_destroy frees the resources associated with the endpoint. // The endpoint will already have been closed. - void (*ep_destroy)(void *); + void (*ep_destroy)(void *); // ep_dial starts dialing, and creates a new pipe, // which is returned in the final argument. It can return errors // NNG_EACCESS, NNG_ECONNREFUSED, NNG_EBADADDR, NNG_ECONNFAILED, // NNG_ETIMEDOUT, and NNG_EPROTO. - int (*ep_dial)(void *, void **); + int (*ep_dial)(void *, void **); // ep_listen just does the bind() and listen() work, // reserving the address but not creating any connections. // It should return NNG_EADDRINUSE if the address is already // taken. It can also return NNG_EBADADDR for an unsuitable // address, or NNG_EACCESS for permission problems. - int (*ep_listen)(void *); + int (*ep_listen)(void *); // ep_accept accepts an inbound connection, and creates // a transport pipe, which is returned in the final argument. - int (*ep_accept)(void *, void **); + int (*ep_accept)(void *, void **); // ep_close stops the endpoint from operating altogether. It does // not affect pipes that have already been created. - void (*ep_close)(void *); + void (*ep_close)(void *); // ep_setopt sets an endpoint (transport-specific) option. - int (*ep_setopt)(void *, int, const void *, size_t); + int (*ep_setopt)(void *, int, const void *, + size_t); // ep_getopt gets an endpoint (transport-specific) option. - int (*ep_getopt)(void *, int, void *, size_t *); + int (*ep_getopt)(void *, int, void *, + size_t *); + + // ep_pipe_ops links our pipe operations. + const nni_pipe_ops * ep_pipe_ops; }; // Pipe operations are entry points called by the socket. These may be called |
