aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-07 21:49:48 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-07 21:52:30 -0800
commitbc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1 (patch)
tree55ca7c800e9dfa54bb58b3f2323b1cb5996fab09 /src/core/socket.c
parentffdceebc19214f384f1b1b6b358f1b2301384135 (diff)
downloadnng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.gz
nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.tar.bz2
nng-bc7a6f22f23e95aad3ecd42adf9ac2b7b75a47e1.zip
Simplify locking for protocols.
In an attempt to simplify the protocol implementation, and hopefully track down a close related race, we've made it so that most protocols need not worry about locks, and can access the socket lock if they do need a lock. They also let the socket manage their workers, for the most part. (The req protocol is special, since it needs a top level work distributor, *and* a resender.)
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c120
1 files changed, 98 insertions, 22 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 4e8eccbf..2ef35d7a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -56,7 +56,7 @@ nni_reaper(void *arg)
// Note that if a protocol has rejected the pipe, it
// won't have any data.
if (pipe->p_active) {
- pipe->p_proto_ops.pipe_rem(pipe->p_proto_data);
+ sock->s_pipe_ops.pipe_rem(pipe->p_proto_data);
}
nni_mtx_unlock(&sock->s_mx);
@@ -78,6 +78,13 @@ nni_reaper(void *arg)
}
+nni_mtx *
+nni_sock_mtx(nni_sock *sock)
+{
+ return (&sock->s_mx);
+}
+
+
static nni_msg *
nni_sock_nullfilter(void *arg, nni_msg *mp)
{
@@ -108,6 +115,22 @@ nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz)
}
+static void
+nni_sock_nullop(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+}
+
+
+static int
+nni_sock_nulladdpipe(void *arg)
+{
+ NNI_ARG_UNUSED(arg);
+
+ return (0);
+}
+
+
// nn_sock_open creates the underlying socket.
int
nni_sock_open(nni_sock **sockp, uint16_t pnum)
@@ -124,7 +147,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
// We make a copy of the protocol operations.
- sock->s_proto = *proto;
+ sock->s_protocol = proto->proto_self;
+ sock->s_peer = proto->proto_peer;
sock->s_linger = 0;
sock->s_sndtimeo = -1;
sock->s_rcvtimeo = -1;
@@ -135,6 +159,30 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
+ sock->s_sock_ops = *proto->proto_sock_ops;
+ if (sock->s_sock_ops.sock_sfilter == NULL) {
+ sock->s_sock_ops.sock_sfilter = nni_sock_nullfilter;
+ }
+ if (sock->s_sock_ops.sock_rfilter == NULL) {
+ sock->s_sock_ops.sock_rfilter = nni_sock_nullfilter;
+ }
+ if (sock->s_sock_ops.sock_getopt == NULL) {
+ sock->s_sock_ops.sock_getopt = nni_sock_nullgetopt;
+ }
+ if (sock->s_sock_ops.sock_setopt == NULL) {
+ sock->s_sock_ops.sock_setopt = nni_sock_nullsetopt;
+ }
+ if (sock->s_sock_ops.sock_close == NULL) {
+ sock->s_sock_ops.sock_close = nni_sock_nullop;
+ }
+ sock->s_pipe_ops = *proto->proto_pipe_ops;
+ if (sock->s_pipe_ops.pipe_add == NULL) {
+ sock->s_pipe_ops.pipe_add = nni_sock_nulladdpipe;
+ }
+ if (sock->s_pipe_ops.pipe_rem == NULL) {
+ sock->s_pipe_ops.pipe_rem = nni_sock_nullop;
+ }
+
if ((rv = nni_mtx_init(&sock->s_mx)) != 0) {
NNI_FREE_STRUCT(sock);
return (rv);
@@ -145,6 +193,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
return (rv);
}
+
if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) {
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
@@ -161,14 +210,14 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
}
if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) {
nni_msgq_fini(sock->s_uwq);
- nni_thr_fini(&sock->s_reaper);
+ nni_thr_fini(&sock->s_recver);
nni_cv_fini(&sock->s_cv);
nni_mtx_fini(&sock->s_mx);
NNI_FREE_STRUCT(sock);
return (rv);
}
- if ((rv = sock->s_proto.proto_init(&sock->s_data, sock)) != 0) {
+ if ((rv = sock->s_sock_ops.sock_init(&sock->s_data, sock)) != 0) {
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_thr_fini(&sock->s_reaper);
@@ -177,19 +226,39 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
NNI_FREE_STRUCT(sock);
return (rv);
}
- if (sock->s_proto.proto_send_filter == NULL) {
- sock->s_proto.proto_send_filter = nni_sock_nullfilter;
- }
- if (sock->s_proto.proto_recv_filter == NULL) {
- sock->s_proto.proto_recv_filter = nni_sock_nullfilter;
- }
- if (sock->s_proto.proto_getopt == NULL) {
- sock->s_proto.proto_getopt = nni_sock_nullgetopt;
+
+ // NB: If these functions are NULL, the thread initialization is
+ // largely a NO-OP. The system won't actually create the threads.
+ rv = nni_thr_init(&sock->s_sender, sock->s_sock_ops.sock_send,
+ sock->s_data);
+ if (rv != 0) {
+ nni_thr_wait(&sock->s_reaper);
+ sock->s_sock_ops.sock_fini(&sock->s_data);
+ nni_msgq_fini(sock->s_urq);
+ nni_msgq_fini(sock->s_uwq);
+ nni_thr_fini(&sock->s_reaper);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
+ NNI_FREE_STRUCT(sock);
}
- if (sock->s_proto.proto_setopt == NULL) {
- sock->s_proto.proto_setopt = nni_sock_nullsetopt;
+ rv = nni_thr_init(&sock->s_recver, sock->s_sock_ops.sock_recv,
+ sock->s_data);
+ if (rv != 0) {
+ nni_thr_wait(&sock->s_sender);
+ sock->s_sock_ops.sock_fini(&sock->s_data);
+ nni_msgq_fini(sock->s_urq);
+ nni_msgq_fini(sock->s_uwq);
+ nni_thr_fini(&sock->s_sender);
+ nni_thr_fini(&sock->s_reaper);
+ nni_cv_fini(&sock->s_cv);
+ nni_mtx_fini(&sock->s_mx);
+ NNI_FREE_STRUCT(sock);
}
+
+
nni_thr_run(&sock->s_reaper);
+ nni_thr_run(&sock->s_recver);
+ nni_thr_run(&sock->s_sender);
*sockp = sock;
return (0);
}
@@ -265,10 +334,14 @@ nni_sock_shutdown(nni_sock *sock)
nni_mtx_lock(&sock->s_mx);
}
+ sock->s_sock_ops.sock_close(sock->s_data);
+
nni_cv_wake(&sock->s_cv);
nni_mtx_unlock(&sock->s_mx);
- // Wait for the reaper to exit.
+ // Wait for the threads to exit.
+ nni_thr_wait(&sock->s_sender);
+ nni_thr_wait(&sock->s_recver);
nni_thr_wait(&sock->s_reaper);
// At this point, there are no threads blocked inside of us
@@ -297,7 +370,7 @@ nni_sock_close(nni_sock *sock)
// the results may be tragic.
// The protocol needs to clean up its state.
- sock->s_proto.proto_fini(sock->s_data);
+ sock->s_sock_ops.sock_fini(sock->s_data);
// And we need to clean up *our* state.
nni_thr_fini(&sock->s_reaper);
@@ -328,9 +401,10 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire)
return (rv);
}
besteffort = sock->s_besteffort;
+
+ msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg);
nni_mtx_unlock(&sock->s_mx);
- msg = sock->s_proto.proto_send_filter(sock->s_data, msg);
if (msg == NULL) {
return (0);
}
@@ -372,7 +446,9 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
if (rv != 0) {
return (rv);
}
- msg = sock->s_proto.proto_recv_filter(sock->s_data, msg);
+ nni_mtx_lock(&sock->s_mx);
+ msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg);
+ nni_mtx_unlock(&sock->s_mx);
if (msg != NULL) {
break;
}
@@ -388,14 +464,14 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
uint16_t
nni_sock_proto(nni_sock *sock)
{
- return (sock->s_proto.proto_self);
+ return (sock->s_protocol);
}
uint16_t
nni_sock_peer(nni_sock *sock)
{
- return (sock->s_proto.proto_peer);
+ return (sock->s_peer);
}
@@ -488,7 +564,7 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size)
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
- rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size);
+ rv = sock->s_sock_ops.sock_setopt(sock->s_data, opt, val, size);
if (rv != NNG_ENOTSUP) {
nni_mtx_unlock(&sock->s_mx);
return (rv);
@@ -533,7 +609,7 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
nni_mtx_unlock(&sock->s_mx);
return (NNG_ECLOSED);
}
- rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep);
+ rv = sock->s_sock_ops.sock_getopt(sock->s_data, opt, val, sizep);
if (rv != NNG_ENOTSUP) {
nni_mtx_unlock(&sock->s_mx);
return (rv);