diff options
| -rw-r--r-- | src/core/device.c | 30 | ||||
| -rw-r--r-- | src/core/options.c | 9 | ||||
| -rw-r--r-- | src/core/socket.c | 81 | ||||
| -rw-r--r-- | src/core/socket.h | 55 | ||||
| -rw-r--r-- | src/nng.c | 27 |
5 files changed, 114 insertions, 88 deletions
diff --git a/src/core/device.c b/src/core/device.c index 81aafc3d..bdfcf0a6 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -28,10 +28,10 @@ nni_device_loop(nni_sock *from, nni_sock *to) for (;;) { // Take messages sock[0], and send to sock[1]. // If an error occurs, we close both sockets. - if ((rv = nni_sock_recvmsg(from, &msg, NNI_TIME_NEVER)) != 0) { + if ((rv = nni_sock_recvmsg(from, &msg, 0)) != 0) { break; } - if ((rv = nni_sock_sendmsg(to, msg, NNI_TIME_NEVER)) != 0) { + if ((rv = nni_sock_sendmsg(to, msg, 0)) != 0) { nni_msg_free(msg); break; } @@ -65,6 +65,8 @@ nni_device(nni_sock *sock1, nni_sock *sock2) { nni_device_pair pair; int rv; + nni_time never = NNI_TIME_NEVER; + size_t sz; memset(&pair, 0, sizeof(pair)); pair.socks[0] = sock1; @@ -80,8 +82,19 @@ nni_device(nni_sock *sock1, nni_sock *sock2) rv = NNG_EINVAL; goto out; } - if ((sock1->s_peer_id.p_id != sock2->s_self_id.p_id) || - (sock2->s_peer_id.p_id != sock1->s_self_id.p_id)) { + if ((nni_sock_peer(sock1) != nni_sock_proto(sock2)) || + (nni_sock_peer(sock2) != nni_sock_proto(sock1))) { + rv = NNG_EINVAL; + goto out; + } + + // No timeouts. + sz = sizeof(never); + if ((nni_sock_setopt(sock1, NNG_OPT_RCVTIMEO, &never, sz) != 0) || + (nni_sock_setopt(sock2, NNG_OPT_RCVTIMEO, &never, sz) != 0) || + (nni_sock_setopt(sock1, NNG_OPT_SNDTIMEO, &never, sz) != 0) || + (nni_sock_setopt(sock2, NNG_OPT_SNDTIMEO, &never, sz) != 0)) { + // This should never happen. rv = NNG_EINVAL; goto out; } @@ -96,14 +109,15 @@ nni_device(nni_sock *sock1, nni_sock *sock2) nni_thr_fini(&pair.thrs[0]); goto out; } - if (((sock1->s_flags & NNI_PROTO_FLAG_RCV) != 0) && - ((sock2->s_flags & NNI_PROTO_FLAG_SND) != 0)) { + if (((nni_sock_flags(sock1) & NNI_PROTO_FLAG_RCV) != 0) && + ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_SND) != 0)) { nni_thr_run(&pair.thrs[0]); } // If the sockets are the same, then its a simple one way forwarder, // and we don't need two workers (but would be harmless if we did it). - if ((sock1 != sock2) && ((sock2->s_flags & NNI_PROTO_FLAG_RCV) != 0) && - ((sock1->s_flags & NNI_PROTO_FLAG_SND) != 0)) { + if ((sock1 != sock2) && + ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_RCV) != 0) && + ((nni_sock_flags(sock1) & NNI_PROTO_FLAG_SND) != 0)) { nni_thr_run(&pair.thrs[1]); } diff --git a/src/core/options.c b/src/core/options.c index 403630a3..b243b262 100644 --- a/src/core/options.c +++ b/src/core/options.c @@ -155,20 +155,23 @@ nni_notifyfd_push(struct nng_event *ev, void *arg) int nni_getopt_fd(nni_sock *s, nni_notifyfd *fd, int mask, void *val, size_t *szp) { - int rv; + int rv; + uint32_t flags; if ((*szp < sizeof(int))) { return (NNG_EINVAL); } + flags = nni_sock_flags(s); + switch (mask) { case NNG_EV_CAN_SND: - if ((s->s_flags & NNI_PROTO_FLAG_SND) == 0) { + if ((flags & NNI_PROTO_FLAG_SND) == 0) { return (NNG_ENOTSUP); } break; case NNG_EV_CAN_RCV: - if ((s->s_flags & NNI_PROTO_FLAG_RCV) == 0) { + if ((flags & NNI_PROTO_FLAG_RCV) == 0) { return (NNG_ENOTSUP); } break; diff --git a/src/core/socket.c b/src/core/socket.c index 541c2383..6a243650 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -18,6 +18,51 @@ static nni_list nni_sock_list; static nni_idhash *nni_sock_hash; static nni_mtx nni_sock_lk; +struct nni_socket { + nni_list_node s_node; + nni_mtx s_mx; + nni_cv s_cv; + nni_cv s_close_cv; + + uint32_t s_id; + uint32_t s_flags; + unsigned s_refcnt; // protected by global lock + void * s_data; // Protocol private + + nni_msgq *s_uwq; // Upper write queue + nni_msgq *s_urq; // Upper read queue + + nni_proto_id s_self_id; + nni_proto_id s_peer_id; + + nni_proto_pipe_ops s_pipe_ops; + nni_proto_sock_ops s_sock_ops; + + // XXX: options + nni_duration s_linger; // linger time + nni_duration s_sndtimeo; // send timeout + nni_duration s_rcvtimeo; // receive timeout + nni_duration s_reconn; // reconnect time + nni_duration s_reconnmax; // max reconnect time + size_t s_rcvmaxsz; // maximum receive size + + nni_list s_eps; // active endpoints + nni_list s_pipes; // active pipes + + int s_ep_pend; // EP dial/listen in progress + int s_closing; // Socket is closing + int s_closed; // Socket closed, protected by global lock + int s_besteffort; // Best effort mode delivery + int s_senderr; // Protocol state machine use + int s_recverr; // Protocol state machine use + + nni_event s_recv_ev; // Event for readability + nni_event s_send_ev; // Event for sendability + + nni_notifyfd s_send_fd; + nni_notifyfd s_recv_fd; +}; + uint32_t nni_sock_id(nni_sock *s) { @@ -533,10 +578,21 @@ nni_sock_closeall(void) } int -nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) +nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags) { - int rv; - int besteffort; + int rv; + int besteffort; + nni_time expire; + nni_time timeo = sock->s_sndtimeo; + + if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) { + expire = NNI_TIME_ZERO; + } else if (timeo == NNI_TIME_NEVER) { + expire = NNI_TIME_NEVER; + } else { + expire = nni_clock(); + expire += timeo; + } // Senderr is typically set by protocols when the state machine // indicates that it is no longer valid to send a message. E.g. @@ -579,10 +635,21 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) } int -nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) +nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) { int rv; nni_msg *msg; + nni_time expire; + nni_time timeo = sock->s_rcvtimeo; + + if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) { + expire = NNI_TIME_ZERO; + } else if (timeo == NNI_TIME_NEVER) { + expire = NNI_TIME_NEVER; + } else { + expire = nni_clock(); + expire += timeo; + } nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -791,3 +858,9 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) nni_mtx_unlock(&sock->s_mx); return (rv); } + +uint32_t +nni_sock_flags(nni_sock *sock) +{ + return (sock->s_flags); +} diff --git a/src/core/socket.h b/src/core/socket.h index 9b7ac1f9..9fa6c0fa 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -11,54 +11,6 @@ #ifndef CORE_SOCKET_H #define CORE_SOCKET_H -// NB: This structure is supplied here for use by the CORE. Use of this library -// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR -// TRANSPORTS. -struct nni_socket { - nni_list_node s_node; - nni_mtx s_mx; - nni_cv s_cv; - nni_cv s_close_cv; - - uint32_t s_id; - uint32_t s_flags; - unsigned s_refcnt; // protected by global lock - void * s_data; // Protocol private - - nni_msgq *s_uwq; // Upper write queue - nni_msgq *s_urq; // Upper read queue - - nni_proto_id s_self_id; - nni_proto_id s_peer_id; - - nni_proto_pipe_ops s_pipe_ops; - nni_proto_sock_ops s_sock_ops; - - // XXX: options - nni_duration s_linger; // linger time - nni_duration s_sndtimeo; // send timeout - nni_duration s_rcvtimeo; // receive timeout - nni_duration s_reconn; // reconnect time - nni_duration s_reconnmax; // max reconnect time - size_t s_rcvmaxsz; // maximum receive size - - nni_list s_eps; // active endpoints - nni_list s_pipes; // active pipes - - int s_ep_pend; // EP dial/listen in progress - int s_closing; // Socket is closing - int s_closed; // Socket closed, protected by global lock - int s_besteffort; // Best effort mode delivery - int s_senderr; // Protocol state machine use - int s_recverr; // Protocol state machine use - - nni_event s_recv_ev; // Event for readability - nni_event s_send_ev; // Event for sendability - - nni_notifyfd s_send_fd; - nni_notifyfd s_recv_fd; -}; - extern int nni_sock_sys_init(void); extern void nni_sock_sys_fini(void); @@ -72,8 +24,8 @@ extern uint16_t nni_sock_proto(nni_sock *); extern uint16_t nni_sock_peer(nni_sock *); extern int nni_sock_setopt(nni_sock *, int, const void *, size_t); extern int nni_sock_getopt(nni_sock *, int, void *, size_t *); -extern int nni_sock_recvmsg(nni_sock *, nni_msg **, nni_time); -extern int nni_sock_sendmsg(nni_sock *, nni_msg *, nni_time); +extern int nni_sock_recvmsg(nni_sock *, nni_msg **, int); +extern int nni_sock_sendmsg(nni_sock *, nni_msg *, int); extern uint32_t nni_sock_id(nni_sock *); extern void nni_sock_lock(nni_sock *); @@ -115,4 +67,7 @@ extern nni_msgq *nni_sock_recvq(nni_sock *); extern size_t nni_sock_rcvmaxsz(nni_sock *); extern void nni_sock_reconntimes(nni_sock *, nni_duration *, nni_duration *); +// nni_sock_flags returns the socket flags, used to indicate whether read +// and or write are appropriate for the protocol. +extern uint32_t nni_sock_flags(nni_sock *); #endif // CORE_SOCKET_H @@ -132,28 +132,18 @@ nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) int nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags) { - nni_time expire; int rv; nni_sock *sock; if ((rv = nni_sock_find(&sock, sid)) != 0) { return (rv); } - if ((flags == NNG_FLAG_NONBLOCK) || (sock->s_rcvtimeo == 0)) { - expire = NNI_TIME_ZERO; - } else if (sock->s_rcvtimeo < 0) { - expire = NNI_TIME_NEVER; - } else { - expire = nni_clock(); - expire += sock->s_rcvtimeo; - } - - rv = nni_sock_recvmsg(sock, msgp, expire); + rv = nni_sock_recvmsg(sock, msgp, flags); nni_sock_rele(sock); // Possibly massage nonblocking attempt. Note that nonblocking is // still done asynchronously, and the calling thread loses context. - if ((rv == NNG_ETIMEDOUT) && (expire == NNI_TIME_ZERO)) { + if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { rv = NNG_EAGAIN; } @@ -201,21 +191,12 @@ nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) if ((rv = nni_sock_find(&sock, sid)) != 0) { return (rv); } - if ((flags == NNG_FLAG_NONBLOCK) || (sock->s_sndtimeo == 0)) { - expire = NNI_TIME_ZERO; - } else if (sock->s_sndtimeo < 0) { - expire = NNI_TIME_NEVER; - } else { - expire = nni_clock(); - expire += sock->s_sndtimeo; - } - - rv = nni_sock_sendmsg(sock, msg, expire); + rv = nni_sock_sendmsg(sock, msg, flags); nni_sock_rele(sock); // Possibly massage nonblocking attempt. Note that nonblocking is // still done asynchronously, and the calling thread loses context. - if ((rv == NNG_ETIMEDOUT) && (expire == NNI_TIME_ZERO)) { + if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) { rv = NNG_EAGAIN; } |
