summaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c28
1 files changed, 28 insertions, 0 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 52df70a2..c0cf20b7 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -268,6 +268,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
// We make a copy of the protocol operations.
sock->s_protocol = proto->proto_self;
sock->s_peer = proto->proto_peer;
+ sock->s_flags = proto->proto_flags;
sock->s_linger = 0;
sock->s_sndtimeo = -1;
sock->s_rcvtimeo = -1;
@@ -280,6 +281,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node);
NNI_LIST_INIT(&sock->s_events, nni_event, e_node);
+ sock->s_send_fd.sn_init = 0;
+ sock->s_recv_fd.sn_init = 0;
sock->s_sock_ops = *proto->proto_sock_ops;
sops = &sock->s_sock_ops;
@@ -535,6 +538,16 @@ nni_sock_close(nni_sock *sock)
nni_mtx_unlock(nni_idlock);
+ // Close any open notification pipes.
+ if (sock->s_recv_fd.sn_init) {
+ nni_plat_pipe_close(sock->s_recv_fd.sn_wfd,
+ sock->s_recv_fd.sn_rfd);
+ }
+ if (sock->s_send_fd.sn_init) {
+ nni_plat_pipe_close(sock->s_send_fd.sn_wfd,
+ sock->s_send_fd.sn_rfd);
+ }
+
// The protocol needs to clean up its state.
sock->s_sock_ops.sock_fini(sock->s_data);
@@ -593,6 +606,9 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire)
// backpressure, we just throw it away, and don't complain.
expire = NNI_TIME_ZERO;
}
+ if (sock->s_send_fd.sn_init) {
+ nni_plat_pipe_clear(sock->s_send_fd.sn_rfd);
+ }
rv = nni_msgq_put_until(sock->s_uwq, msg, expire);
if (besteffort && (rv == NNG_EAGAIN)) {
// Pretend this worked... it didn't, but pretend.
@@ -620,6 +636,10 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
}
nni_mtx_unlock(&sock->s_mx);
+ if (sock->s_recv_fd.sn_init) {
+ nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd);
+ }
+
for (;;) {
rv = nni_msgq_get_until(sock->s_urq, &msg, expire);
if (rv != 0) {
@@ -788,6 +808,14 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
case NNG_OPT_RCVBUF:
rv = nni_getopt_buf(sock->s_urq, val, sizep);
break;
+ case NNG_OPT_SENDFD:
+ rv = nni_getopt_fd(sock, &sock->s_send_fd, NNG_EV_CAN_SEND,
+ val, sizep);
+ break;
+ case NNG_OPT_RECVFD:
+ rv = nni_getopt_fd(sock, &sock->s_recv_fd, NNG_EV_CAN_RECV,
+ val, sizep);
+ break;
}
nni_mtx_unlock(&sock->s_mx);
return (rv);