diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 120 |
1 files changed, 98 insertions, 22 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 4e8eccbf..2ef35d7a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -56,7 +56,7 @@ nni_reaper(void *arg) // Note that if a protocol has rejected the pipe, it // won't have any data. if (pipe->p_active) { - pipe->p_proto_ops.pipe_rem(pipe->p_proto_data); + sock->s_pipe_ops.pipe_rem(pipe->p_proto_data); } nni_mtx_unlock(&sock->s_mx); @@ -78,6 +78,13 @@ nni_reaper(void *arg) } +nni_mtx * +nni_sock_mtx(nni_sock *sock) +{ + return (&sock->s_mx); +} + + static nni_msg * nni_sock_nullfilter(void *arg, nni_msg *mp) { @@ -108,6 +115,22 @@ nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz) } +static void +nni_sock_nullop(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + + +static int +nni_sock_nulladdpipe(void *arg) +{ + NNI_ARG_UNUSED(arg); + + return (0); +} + + // nn_sock_open creates the underlying socket. int nni_sock_open(nni_sock **sockp, uint16_t pnum) @@ -124,7 +147,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } // We make a copy of the protocol operations. - sock->s_proto = *proto; + sock->s_protocol = proto->proto_self; + sock->s_peer = proto->proto_peer; sock->s_linger = 0; sock->s_sndtimeo = -1; sock->s_rcvtimeo = -1; @@ -135,6 +159,30 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node); + sock->s_sock_ops = *proto->proto_sock_ops; + if (sock->s_sock_ops.sock_sfilter == NULL) { + sock->s_sock_ops.sock_sfilter = nni_sock_nullfilter; + } + if (sock->s_sock_ops.sock_rfilter == NULL) { + sock->s_sock_ops.sock_rfilter = nni_sock_nullfilter; + } + if (sock->s_sock_ops.sock_getopt == NULL) { + sock->s_sock_ops.sock_getopt = nni_sock_nullgetopt; + } + if (sock->s_sock_ops.sock_setopt == NULL) { + sock->s_sock_ops.sock_setopt = nni_sock_nullsetopt; + } + if (sock->s_sock_ops.sock_close == NULL) { + sock->s_sock_ops.sock_close = nni_sock_nullop; + } + sock->s_pipe_ops = *proto->proto_pipe_ops; + if (sock->s_pipe_ops.pipe_add == NULL) { + sock->s_pipe_ops.pipe_add = nni_sock_nulladdpipe; + } + if (sock->s_pipe_ops.pipe_rem == NULL) { + sock->s_pipe_ops.pipe_rem = nni_sock_nullop; + } + if ((rv = nni_mtx_init(&sock->s_mx)) != 0) { NNI_FREE_STRUCT(sock); return (rv); @@ -145,6 +193,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) return (rv); } + if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); @@ -161,14 +210,14 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) } if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) { nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_reaper); + nni_thr_fini(&sock->s_recver); nni_cv_fini(&sock->s_cv); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); return (rv); } - if ((rv = sock->s_proto.proto_init(&sock->s_data, sock)) != 0) { + if ((rv = sock->s_sock_ops.sock_init(&sock->s_data, sock)) != 0) { nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_thr_fini(&sock->s_reaper); @@ -177,19 +226,39 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) NNI_FREE_STRUCT(sock); return (rv); } - if (sock->s_proto.proto_send_filter == NULL) { - sock->s_proto.proto_send_filter = nni_sock_nullfilter; - } - if (sock->s_proto.proto_recv_filter == NULL) { - sock->s_proto.proto_recv_filter = nni_sock_nullfilter; - } - if (sock->s_proto.proto_getopt == NULL) { - sock->s_proto.proto_getopt = nni_sock_nullgetopt; + + // NB: If these functions are NULL, the thread initialization is + // largely a NO-OP. The system won't actually create the threads. + rv = nni_thr_init(&sock->s_sender, sock->s_sock_ops.sock_send, + sock->s_data); + if (rv != 0) { + nni_thr_wait(&sock->s_reaper); + sock->s_sock_ops.sock_fini(&sock->s_data); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_thr_fini(&sock->s_reaper); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); } - if (sock->s_proto.proto_setopt == NULL) { - sock->s_proto.proto_setopt = nni_sock_nullsetopt; + rv = nni_thr_init(&sock->s_recver, sock->s_sock_ops.sock_recv, + sock->s_data); + if (rv != 0) { + nni_thr_wait(&sock->s_sender); + sock->s_sock_ops.sock_fini(&sock->s_data); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_thr_fini(&sock->s_sender); + nni_thr_fini(&sock->s_reaper); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); } + + nni_thr_run(&sock->s_reaper); + nni_thr_run(&sock->s_recver); + nni_thr_run(&sock->s_sender); *sockp = sock; return (0); } @@ -265,10 +334,14 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_lock(&sock->s_mx); } + sock->s_sock_ops.sock_close(sock->s_data); + nni_cv_wake(&sock->s_cv); nni_mtx_unlock(&sock->s_mx); - // Wait for the reaper to exit. + // Wait for the threads to exit. + nni_thr_wait(&sock->s_sender); + nni_thr_wait(&sock->s_recver); nni_thr_wait(&sock->s_reaper); // At this point, there are no threads blocked inside of us @@ -297,7 +370,7 @@ nni_sock_close(nni_sock *sock) // the results may be tragic. // The protocol needs to clean up its state. - sock->s_proto.proto_fini(sock->s_data); + sock->s_sock_ops.sock_fini(sock->s_data); // And we need to clean up *our* state. nni_thr_fini(&sock->s_reaper); @@ -328,9 +401,10 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) return (rv); } besteffort = sock->s_besteffort; + + msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg); nni_mtx_unlock(&sock->s_mx); - msg = sock->s_proto.proto_send_filter(sock->s_data, msg); if (msg == NULL) { return (0); } @@ -372,7 +446,9 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) if (rv != 0) { return (rv); } - msg = sock->s_proto.proto_recv_filter(sock->s_data, msg); + nni_mtx_lock(&sock->s_mx); + msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg); + nni_mtx_unlock(&sock->s_mx); if (msg != NULL) { break; } @@ -388,14 +464,14 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) uint16_t nni_sock_proto(nni_sock *sock) { - return (sock->s_proto.proto_self); + return (sock->s_protocol); } uint16_t nni_sock_peer(nni_sock *sock) { - return (sock->s_proto.proto_peer); + return (sock->s_peer); } @@ -488,7 +564,7 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size) nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } - rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size); + rv = sock->s_sock_ops.sock_setopt(sock->s_data, opt, val, size); if (rv != NNG_ENOTSUP) { nni_mtx_unlock(&sock->s_mx); return (rv); @@ -533,7 +609,7 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) nni_mtx_unlock(&sock->s_mx); return (NNG_ECLOSED); } - rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep); + rv = sock->s_sock_ops.sock_getopt(sock->s_data, opt, val, sizep); if (rv != NNG_ENOTSUP) { nni_mtx_unlock(&sock->s_mx); return (rv); |
