diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 95 |
1 files changed, 65 insertions, 30 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 6aa40bfc..cad4b10a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -50,16 +50,15 @@ nni_reaper(void *arg) ep->ep_pipe = NULL; nni_cv_wake(&ep->ep_cv); } - nni_mtx_unlock(&sock->s_mx); // Remove the pipe from the protocol. Protocols may // keep lists of pipes for managing their topologies. // Note that if a protocol has rejected the pipe, it // won't have any data. if (pipe->p_active) { - sock->s_ops.proto_rem_pipe(sock->s_data, - pipe->p_pdata); + pipe->p_proto_ops.pipe_rem(pipe->p_proto_data); } + nni_mtx_unlock(&sock->s_mx); // XXX: also publish event... nni_pipe_destroy(pipe); @@ -79,21 +78,53 @@ nni_reaper(void *arg) } +static nni_msg * +nni_sock_nullfilter(void *arg, nni_msg *mp) +{ + NNI_ARG_UNUSED(arg); + return (mp); +} + + +static int +nni_sock_nullgetopt(void *arg, int num, void *data, size_t *szp) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(num); + NNI_ARG_UNUSED(data); + NNI_ARG_UNUSED(szp); + return (NNG_ENOTSUP); +} + + +static int +nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(num); + NNI_ARG_UNUSED(data); + NNI_ARG_UNUSED(sz); + return (NNG_ENOTSUP); +} + + // nn_sock_open creates the underlying socket. int -nni_sock_open(nni_sock **sockp, uint16_t proto) +nni_sock_open(nni_sock **sockp, uint16_t pnum) { nni_sock *sock; - nni_protocol *ops; + nni_proto *proto; int rv; - if ((ops = nni_protocol_find(proto)) == NULL) { + if ((proto = nni_proto_find(pnum)) == NULL) { return (NNG_ENOTSUP); } if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) { return (NNG_ENOMEM); } - sock->s_ops = *ops; + + // We make a copy of the protocol operations. + sock->s_proto = *proto; sock->s_linger = 0; sock->s_sndtimeo = -1; sock->s_rcvtimeo = -1; @@ -137,7 +168,7 @@ nni_sock_open(nni_sock **sockp, uint16_t proto) return (rv); } - if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { + if ((rv = sock->s_proto.proto_init(&sock->s_data, sock)) != 0) { nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_thr_fini(&sock->s_reaper); @@ -146,6 +177,18 @@ nni_sock_open(nni_sock **sockp, uint16_t proto) NNI_FREE_STRUCT(sock); return (rv); } + if (sock->s_proto.proto_send_filter == NULL) { + sock->s_proto.proto_send_filter = nni_sock_nullfilter; + } + if (sock->s_proto.proto_recv_filter == NULL) { + sock->s_proto.proto_recv_filter = nni_sock_nullfilter; + } + if (sock->s_proto.proto_getopt == NULL) { + sock->s_proto.proto_getopt = nni_sock_nullgetopt; + } + if (sock->s_proto.proto_setopt == NULL) { + sock->s_proto.proto_setopt = nni_sock_nullsetopt; + } nni_thr_run(&sock->s_reaper); *sockp = sock; return (0); @@ -223,7 +266,7 @@ nni_sock_close(nni_sock *sock) // At this point nothing else should be referencing us. // The protocol needs to clean up its state. - sock->s_ops.proto_destroy(sock->s_data); + sock->s_proto.proto_fini(sock->s_data); // And we need to clean up *our* state. nni_msgq_fini(sock->s_urq); @@ -256,11 +299,9 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) besteffort = sock->s_besteffort; nni_mtx_unlock(&sock->s_mx); - if (sock->s_ops.proto_send_filter != NULL) { - msg = sock->s_ops.proto_send_filter(sock->s_data, msg); - if (msg == NULL) { - return (0); - } + msg = sock->s_proto.proto_send_filter(sock->s_data, msg); + if (msg == NULL) { + return (0); } if (besteffort) { @@ -300,9 +341,7 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) if (rv != 0) { return (rv); } - if (sock->s_ops.proto_recv_filter != NULL) { - msg = sock->s_ops.proto_recv_filter(sock->s_data, msg); - } + msg = sock->s_proto.proto_recv_filter(sock->s_data, msg); if (msg != NULL) { break; } @@ -318,7 +357,7 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) uint16_t nni_sock_proto(nni_sock *sock) { - return (sock->s_ops.proto_self); + return (sock->s_proto.proto_self); } @@ -386,12 +425,10 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size) int rv = ENOTSUP; nni_mtx_lock(&sock->s_mx); - if (sock->s_ops.proto_setopt != NULL) { - rv = sock->s_ops.proto_setopt(sock->s_data, opt, val, size); - if (rv != NNG_ENOTSUP) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } + rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size); + if (rv != NNG_ENOTSUP) { + nni_mtx_unlock(&sock->s_mx); + return (rv); } switch (opt) { case NNG_OPT_LINGER: @@ -429,12 +466,10 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) int rv = ENOTSUP; nni_mtx_lock(&sock->s_mx); - if (sock->s_ops.proto_getopt != NULL) { - rv = sock->s_ops.proto_getopt(sock->s_data, opt, val, sizep); - if (rv != NNG_ENOTSUP) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } + rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep); + if (rv != NNG_ENOTSUP) { + nni_mtx_unlock(&sock->s_mx); + return (rv); } switch (opt) { case NNG_OPT_LINGER: |
