aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c113
1 files changed, 71 insertions, 42 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index f5acd941..66580300 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -81,12 +81,11 @@ struct nni_socket {
nni_list s_pipes; // active pipes
nni_list s_ctxs; // active contexts (protected by global nni_sock_lk)
- int s_closing; // Socket is closing
- int s_closed; // Socket closed, protected by global lock
- bool s_ctxwait; // Waiting for contexts to close.
-
- nni_notifyfd s_send_fd;
- nni_notifyfd s_recv_fd;
+ bool s_closing; // Socket is closing
+ bool s_closed; // Socket closed, protected by global lock
+ bool s_ctxwait; // Waiting for contexts to close.
+ nng_pipe_cb s_pipe_cb; // User callback for pipe events.
+ void * s_pipe_cbarg; // Argument for pipe events.
};
static void nni_ctx_destroy(nni_ctx *);
@@ -435,21 +434,41 @@ nni_sock_rele(nni_sock *s)
int
nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe)
{
- void *pdata = nni_pipe_get_proto_data(pipe);
- int rv;
+ void * pdata = nni_pipe_get_proto_data(pipe);
+ nng_pipe_cb cb;
+ int rv;
NNI_ASSERT(s != NULL);
nni_mtx_lock(&s->s_mx);
- if (s->s_closing) {
- // We're closing, bail out.
- rv = NNG_ECLOSED;
- } else if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
+ if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
// Peer protocol mismatch.
- rv = NNG_EPROTO;
- } else {
- // Protocol can reject for other reasons.
- rv = s->s_pipe_ops.pipe_start(pdata);
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_EPROTO);
+ }
+ if ((cb = s->s_pipe_cb) != NULL) {
+ nng_pipe p;
+ void * arg = s->s_pipe_cbarg;
+ nni_mtx_unlock(&s->s_mx);
+ p.id = nni_pipe_id(pipe);
+ cb(p, NNG_PIPE_ADD, arg);
+ if (nni_pipe_closed(pipe)) {
+ return (NNG_ECLOSED);
+ }
+ nni_mtx_lock(&s->s_mx);
}
+ if (s->s_closing) {
+ // We're closing, bail out. This has to be done after
+ // we have dropped the lock above in case the sock is closed
+ // while the user callback runs.
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_ECLOSED);
+ }
+
+ // Protocol can reject for other reasons.
+ // This must be the last operation, until this point
+ // the protocol has not actually "seen" the pipe.
+ rv = s->s_pipe_ops.pipe_start(pdata);
+
nni_mtx_unlock(&s->s_mx);
return (rv);
}
@@ -484,9 +503,18 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
void
nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
{
- void *pdata;
+ void * pdata;
+ nng_pipe_cb cb;
nni_mtx_lock(&sock->s_mx);
+ if ((cb = sock->s_pipe_cb) != NULL) {
+ void * arg = sock->s_pipe_cbarg;
+ nng_pipe p;
+ nni_mtx_unlock(&sock->s_mx);
+ p.id = nni_pipe_id(pipe);
+ cb(p, NNG_PIPE_REM, arg);
+ nni_mtx_lock(&sock->s_mx);
+ }
pdata = nni_pipe_get_proto_data(pipe);
if (pdata != NULL) {
sock->s_pipe_ops.pipe_stop(pdata);
@@ -507,14 +535,6 @@ nni_sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
- // Close any open notification pipes.
- if (s->s_recv_fd.sn_init) {
- nni_plat_pipe_close(s->s_recv_fd.sn_wfd, s->s_recv_fd.sn_rfd);
- }
- if (s->s_send_fd.sn_init) {
- nni_plat_pipe_close(s->s_send_fd.sn_wfd, s->s_send_fd.sn_rfd);
- }
-
// The protocol needs to clean up its state.
if (s->s_data != NULL) {
s->s_sock_ops.sock_fini(s->s_data);
@@ -546,21 +566,21 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->s_sndtimeo = -1;
- s->s_rcvtimeo = -1;
- s->s_closing = 0;
- s->s_reconn = NNI_SECOND;
- s->s_reconnmax = 0;
- s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
- s->s_id = 0;
- s->s_refcnt = 0;
- s->s_send_fd.sn_init = 0;
- s->s_recv_fd.sn_init = 0;
- s->s_self_id = proto->proto_self;
- s->s_peer_id = proto->proto_peer;
- s->s_flags = proto->proto_flags;
- s->s_sock_ops = *proto->proto_sock_ops;
- s->s_pipe_ops = *proto->proto_pipe_ops;
+ s->s_sndtimeo = -1;
+ s->s_rcvtimeo = -1;
+ s->s_closing = 0;
+ s->s_reconn = NNI_SECOND;
+ s->s_reconnmax = 0;
+ s->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
+ s->s_id = 0;
+ s->s_refcnt = 0;
+ s->s_self_id = proto->proto_self;
+ s->s_peer_id = proto->proto_peer;
+ s->s_flags = proto->proto_flags;
+ s->s_sock_ops = *proto->proto_sock_ops;
+ s->s_pipe_ops = *proto->proto_pipe_ops;
+ s->s_closed = false;
+ s->s_closing = false;
if (proto->proto_ctx_ops != NULL) {
s->s_ctx_ops = *proto->proto_ctx_ops;
@@ -692,7 +712,7 @@ nni_sock_shutdown(nni_sock *sock)
return (NNG_ECLOSED);
}
// Mark us closing, so no more EPs or changes can occur.
- sock->s_closing = 1;
+ sock->s_closing = true;
// Close the EPs. This prevents new connections from forming but
// but allows existing ones to drain.
@@ -800,7 +820,7 @@ nni_sock_close(nni_sock *s)
nni_sock_rele(s);
return;
}
- s->s_closed = 1;
+ s->s_closed = true;
nni_idhash_remove(nni_sock_hash, s->s_id);
// We might have been removed from the list already, e.g. by
@@ -1157,6 +1177,15 @@ nni_sock_flags(nni_sock *sock)
return (sock->s_flags);
}
+void
+nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb cb, void *arg)
+{
+ nni_mtx_lock(&sock->s_mx);
+ sock->s_pipe_cb = cb;
+ sock->s_pipe_cbarg = arg;
+ nni_mtx_unlock(&sock->s_mx);
+}
+
int
nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing)
{