diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 81 |
1 files changed, 77 insertions, 4 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 541c2383..6a243650 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -18,6 +18,51 @@ static nni_list nni_sock_list; static nni_idhash *nni_sock_hash; static nni_mtx nni_sock_lk; +struct nni_socket { + nni_list_node s_node; + nni_mtx s_mx; + nni_cv s_cv; + nni_cv s_close_cv; + + uint32_t s_id; + uint32_t s_flags; + unsigned s_refcnt; // protected by global lock + void * s_data; // Protocol private + + nni_msgq *s_uwq; // Upper write queue + nni_msgq *s_urq; // Upper read queue + + nni_proto_id s_self_id; + nni_proto_id s_peer_id; + + nni_proto_pipe_ops s_pipe_ops; + nni_proto_sock_ops s_sock_ops; + + // XXX: options + nni_duration s_linger; // linger time + nni_duration s_sndtimeo; // send timeout + nni_duration s_rcvtimeo; // receive timeout + nni_duration s_reconn; // reconnect time + nni_duration s_reconnmax; // max reconnect time + size_t s_rcvmaxsz; // maximum receive size + + nni_list s_eps; // active endpoints + nni_list s_pipes; // active pipes + + int s_ep_pend; // EP dial/listen in progress + int s_closing; // Socket is closing + int s_closed; // Socket closed, protected by global lock + int s_besteffort; // Best effort mode delivery + int s_senderr; // Protocol state machine use + int s_recverr; // Protocol state machine use + + nni_event s_recv_ev; // Event for readability + nni_event s_send_ev; // Event for sendability + + nni_notifyfd s_send_fd; + nni_notifyfd s_recv_fd; +}; + uint32_t nni_sock_id(nni_sock *s) { @@ -533,10 +578,21 @@ nni_sock_closeall(void) } int -nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) +nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags) { - int rv; - int besteffort; + int rv; + int besteffort; + nni_time expire; + nni_time timeo = sock->s_sndtimeo; + + if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) { + expire = NNI_TIME_ZERO; + } else if (timeo == NNI_TIME_NEVER) { + expire = NNI_TIME_NEVER; + } else { + expire = nni_clock(); + expire += timeo; + } // Senderr is typically set by protocols when the state machine // indicates that it is no longer valid to send a message. E.g. @@ -579,10 +635,21 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) } int -nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) +nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) { int rv; nni_msg *msg; + nni_time expire; + nni_time timeo = sock->s_rcvtimeo; + + if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) { + expire = NNI_TIME_ZERO; + } else if (timeo == NNI_TIME_NEVER) { + expire = NNI_TIME_NEVER; + } else { + expire = nni_clock(); + expire += timeo; + } nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -791,3 +858,9 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) nni_mtx_unlock(&sock->s_mx); return (rv); } + +uint32_t +nni_sock_flags(nni_sock *sock) +{ + return (sock->s_flags); +} |
