diff options
Diffstat (limited to 'src/core/socket.c')
| -rw-r--r-- | src/core/socket.c | 299 |
1 files changed, 226 insertions, 73 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 9aa89a2d..79c1602b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -18,6 +18,13 @@ static nni_list nni_sock_list; static nni_idhash *nni_sock_hash; static nni_mtx nni_sock_lk; +typedef struct nni_sockopt { + nni_list_node node; + int opt; + size_t sz; + void * data; +} nni_sockopt; + struct nni_socket { nni_list_node s_node; nni_mtx s_mx; @@ -44,7 +51,8 @@ struct nni_socket { nni_duration s_rcvtimeo; // receive timeout nni_duration s_reconn; // reconnect time nni_duration s_reconnmax; // max reconnect time - size_t s_rcvmaxsz; // maximum receive size + size_t s_rcvmaxsz; // max receive size + nni_list s_options; // opts not handled by sock/proto nni_list s_eps; // active endpoints nni_list s_pipes; // active pipes @@ -63,6 +71,13 @@ struct nni_socket { nni_notifyfd s_recv_fd; }; +static void +nni_free_opt(nni_sockopt *opt) +{ + nni_free(opt->data, opt->sz); + NNI_FREE_STRUCT(opt); +} + uint32_t nni_sock_id(nni_sock *s) { @@ -268,6 +283,8 @@ nni_sock_unnotify(nni_sock *sock, nni_notify *notify) static void nni_sock_destroy(nni_sock *s) { + nni_sockopt *sopt; + if (s == NULL) { return; } @@ -285,6 +302,11 @@ nni_sock_destroy(nni_sock *s) s->s_sock_ops.sock_fini(s->s_data); } + while ((sopt = nni_list_first(&s->s_options)) != NULL) { + nni_list_remove(&s->s_options, sopt); + nni_free_opt(sopt); + } + nni_ev_fini(&s->s_send_ev); nni_ev_fini(&s->s_recv_ev); nni_msgq_fini(s->s_urq); @@ -305,8 +327,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) return (NNG_ENOMEM); } s->s_linger = 0; - s->s_sndtimeo = -1; - s->s_rcvtimeo = -1; + s->s_sndtimeo = NNI_TIME_NEVER; + s->s_rcvtimeo = NNI_TIME_NEVER; s->s_closing = 0; s->s_reconn = NNI_SECOND; s->s_reconnmax = 0; @@ -328,6 +350,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_ASSERT(s->s_pipe_ops.pipe_stop != NULL); NNI_LIST_NODE_INIT(&s->s_node); + NNI_LIST_INIT(&s->s_options, nni_sockopt, node); nni_pipe_sock_list_init(&s->s_pipes); nni_ep_list_init(&s->s_eps); nni_mtx_init(&s->s_mx); @@ -338,10 +361,23 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) || - ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0)) { + ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0) || + ((rv = nni_sock_setopt(s, NNG_OPT_LINGER, &s->s_linger, + sizeof(nni_duration))) != 0) || + ((rv = nni_sock_setopt(s, NNG_OPT_SNDTIMEO, &s->s_sndtimeo, + sizeof(nni_duration))) != 0) || + ((rv = nni_sock_setopt(s, NNG_OPT_RCVTIMEO, &s->s_rcvtimeo, + sizeof(nni_duration))) != 0) || + ((rv = nni_sock_setopt(s, NNG_OPT_RECONN_TIME, &s->s_reconn, + sizeof(nni_duration))) != 0) || + ((rv = nni_sock_setopt(s, NNG_OPT_RECONN_MAXTIME, &s->s_reconnmax, + sizeof(nni_duration))) != 0) || + ((rv = nni_sock_setopt(s, NNG_OPT_RCVMAXSZ, &s->s_rcvmaxsz, + sizeof(size_t))) != 0)) { nni_sock_destroy(s); return (rv); } + *sp = s; return (rv); } @@ -383,7 +419,6 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) if (((rv = nni_init()) != 0) || ((rv = nni_sock_create(&s, proto)) != 0)) { - nni_sock_destroy(s); return (rv); } @@ -690,12 +725,6 @@ nni_sock_peer(nni_sock *sock) return (sock->s_peer_id.p_id); } -size_t -nni_sock_rcvmaxsz(nni_sock *sock) -{ - return (sock->s_rcvmaxsz); -} - void nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) { @@ -707,15 +736,25 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax) } int -nni_sock_ep_add(nni_sock *sock, nni_ep *ep) +nni_sock_ep_add(nni_sock *s, nni_ep *ep) { - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); + nni_sockopt *sopt; + + nni_mtx_lock(&s->s_mx); + if (s->s_closing) { + nni_mtx_unlock(&s->s_mx); return (NNG_ECLOSED); } - nni_list_append(&sock->s_eps, ep); - nni_mtx_unlock(&sock->s_mx); + NNI_LIST_FOREACH (&s->s_options, sopt) { + int rv; + rv = nni_ep_setopt(ep, sopt->opt, sopt->data, sopt->sz, 0); + if ((rv != 0) && (rv != NNG_ENOTSUP)) { + nni_mtx_unlock(&s->s_mx); + return (rv); + } + } + nni_list_append(&s->s_eps, ep); + nni_mtx_unlock(&s->s_mx); return (0); } @@ -745,108 +784,222 @@ nni_sock_senderr(nni_sock *sock, int err) } int -nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size) +nni_sock_setopt(nni_sock *s, int opt, const void *val, size_t size) { - int rv = NNG_ENOTSUP; + int rv = NNG_ENOTSUP; + nni_ep * ep; + int commits = 0; + nni_sockopt *optv; + nni_sockopt *oldv = NULL; - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); + nni_mtx_lock(&s->s_mx); + if (s->s_closing) { + nni_mtx_unlock(&s->s_mx); return (NNG_ECLOSED); } - if (sock->s_sock_ops.sock_setopt != NULL) { - rv = - sock->s_sock_ops.sock_setopt(sock->s_data, opt, val, size); + if (s->s_sock_ops.sock_setopt != NULL) { + rv = s->s_sock_ops.sock_setopt(s->s_data, opt, val, size); if (rv != NNG_ENOTSUP) { - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); return (rv); } } + + // Some options do not go down to transports. Handle them + // directly. switch (opt) { - case NNG_OPT_LINGER: - rv = nni_setopt_usec(&sock->s_linger, val, size); - break; - case NNG_OPT_SNDTIMEO: - rv = nni_setopt_usec(&sock->s_sndtimeo, val, size); - break; - case NNG_OPT_RCVTIMEO: - rv = nni_setopt_usec(&sock->s_rcvtimeo, val, size); - break; case NNG_OPT_RECONN_TIME: - rv = nni_setopt_usec(&sock->s_reconn, val, size); + rv = nni_setopt_usec(&s->s_reconn, val, size); break; case NNG_OPT_RECONN_MAXTIME: - rv = nni_setopt_usec(&sock->s_reconnmax, val, size); + rv = nni_setopt_usec(&s->s_reconnmax, val, size); break; case NNG_OPT_SNDBUF: - rv = nni_setopt_buf(sock->s_uwq, val, size); + rv = nni_setopt_buf(s->s_uwq, val, size); break; case NNG_OPT_RCVBUF: - rv = nni_setopt_buf(sock->s_urq, val, size); + rv = nni_setopt_buf(s->s_urq, val, size); break; - case NNG_OPT_RCVMAXSZ: - rv = nni_setopt_size( - &sock->s_rcvmaxsz, val, size, 0, NNI_MAXSZ); + case NNG_OPT_SNDFD: + case NNG_OPT_RCVFD: + case NNG_OPT_LOCALADDR: + case NNG_OPT_REMOTEADDR: + // these options can be read, but cannot be set + rv = NNG_EINVAL; break; } - nni_mtx_unlock(&sock->s_mx); + + nni_mtx_unlock(&s->s_mx); + + // If the option was already handled one way or the other, + if (rv != NNG_ENOTSUP) { + return (rv); + } + + // Validation of transport options. This is stateless, so + // transports should not fail to set an option later if they + // passed it here. + rv = nni_tran_chkopt(opt, val, size); + + // Also check a few generic things. We do this if no transport + // check was found, or even if a transport rejected one of the + // settings. + if ((rv == NNG_ENOTSUP) || (rv == 0)) { + switch (opt) { + case NNG_OPT_LINGER: + rv = nni_chkopt_usec(val, size); + break; + case NNG_OPT_SNDTIMEO: + rv = nni_chkopt_usec(val, size); + break; + case NNG_OPT_RCVTIMEO: + rv = nni_chkopt_usec(val, size); + break; + case NNG_OPT_RCVMAXSZ: + // just a sanity test on the size; it also ensures that + // a size can be set even with no transport configured. + rv = nni_chkopt_size(val, size, 0, NNI_MAXSZ); + break; + } + } + + if (rv != 0) { + return (rv); + } + + // Prepare a copy of the sockoption. + if ((optv = NNI_ALLOC_STRUCT(optv)) == NULL) { + return (NNG_ENOMEM); + } + if ((optv->data = nni_alloc(size)) == NULL) { + NNI_FREE_STRUCT(optv); + return (NNG_ENOMEM); + } + memcpy(optv->data, val, size); + optv->opt = opt; + optv->sz = size; + NNI_LIST_NODE_INIT(&optv->node); + + nni_mtx_lock(&s->s_mx); + NNI_LIST_FOREACH (&s->s_options, oldv) { + if (oldv->opt == opt) { + if ((oldv->sz != size) || + (memcmp(oldv->data, val, size) != 0)) { + break; + } + + // The values are the same. This is a no-op. + nni_mtx_unlock(&s->s_mx); + nni_free_opt(optv); + return (0); + } + } + + // Apply the options. Failure to set any option on any transport + // (other than ENOTSUP) stops the operation altogether. Its + // important that transport wide checks properly pre-validate. + NNI_LIST_FOREACH (&s->s_eps, ep) { + int x; + x = nni_ep_setopt(ep, opt, optv->data, size, 0); + if (x != NNG_ENOTSUP) { + if ((rv = x) != 0) { + nni_mtx_unlock(&s->s_mx); + nni_free_opt(optv); + return (rv); + } + } + } + + // For some options, which also have an impact on the socket + // behavior, we save a local value. Note that the transport + // will already have had a chance to veto this. + switch (opt) { + case NNG_OPT_LINGER: + rv = nni_setopt_usec(&s->s_linger, val, size); + break; + case NNG_OPT_SNDTIMEO: + rv = nni_setopt_usec(&s->s_sndtimeo, val, size); + break; + case NNG_OPT_RCVTIMEO: + rv = nni_setopt_usec(&s->s_rcvtimeo, val, size); + break; + } + + if (rv == 0) { + // Remove and toss the old value, we are using a new one. + if (oldv != NULL) { + nni_list_remove(&s->s_options, oldv); + nni_free_opt(oldv); + } + + // Insert our new value. This permits it to be compared + // against later, and for new endpoints to automatically + // receive these values, + nni_list_append(&s->s_options, optv); + } else { + nni_free_opt(optv); + } + + nni_mtx_unlock(&s->s_mx); return (rv); } int -nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) +nni_sock_getopt(nni_sock *s, int opt, void *val, size_t *szp) { - int rv = NNG_ENOTSUP; + int rv = NNG_ENOTSUP; + nni_sockopt *sopt; - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); + nni_mtx_lock(&s->s_mx); + if (s->s_closing) { + nni_mtx_unlock(&s->s_mx); return (NNG_ECLOSED); } - if (sock->s_sock_ops.sock_getopt != NULL) { - rv = sock->s_sock_ops.sock_getopt( - sock->s_data, opt, val, sizep); + if (s->s_sock_ops.sock_getopt != NULL) { + rv = s->s_sock_ops.sock_getopt(s->s_data, opt, val, szp); if (rv != NNG_ENOTSUP) { - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); return (rv); } } + // Options that are handled by socket core, and never + // passed down. switch (opt) { - case NNG_OPT_LINGER: - rv = nni_getopt_usec(&sock->s_linger, val, sizep); - break; - case NNG_OPT_SNDTIMEO: - rv = nni_getopt_usec(&sock->s_sndtimeo, val, sizep); - break; - case NNG_OPT_RCVTIMEO: - rv = nni_getopt_usec(&sock->s_rcvtimeo, val, sizep); - break; case NNG_OPT_RECONN_TIME: - rv = nni_getopt_usec(&sock->s_reconn, val, sizep); + rv = nni_getopt_usec(&s->s_reconn, val, szp); break; case NNG_OPT_RECONN_MAXTIME: - rv = nni_getopt_usec(&sock->s_reconnmax, val, sizep); + rv = nni_getopt_usec(&s->s_reconnmax, val, szp); break; case NNG_OPT_SNDBUF: - rv = nni_getopt_buf(sock->s_uwq, val, sizep); + rv = nni_getopt_buf(s->s_uwq, val, szp); break; case NNG_OPT_RCVBUF: - rv = nni_getopt_buf(sock->s_urq, val, sizep); - break; - case NNG_OPT_RCVMAXSZ: - rv = nni_getopt_size(&sock->s_rcvmaxsz, val, sizep); + rv = nni_getopt_buf(s->s_urq, val, szp); break; case NNG_OPT_SNDFD: - rv = nni_getopt_fd( - sock, &sock->s_send_fd, NNG_EV_CAN_SND, val, sizep); + rv = nni_getopt_fd(s, &s->s_send_fd, NNG_EV_CAN_SND, val, szp); break; case NNG_OPT_RCVFD: - rv = nni_getopt_fd( - sock, &sock->s_recv_fd, NNG_EV_CAN_RCV, val, sizep); + rv = nni_getopt_fd(s, &s->s_recv_fd, NNG_EV_CAN_RCV, val, szp); + break; + default: + NNI_LIST_FOREACH (&s->s_options, sopt) { + if (sopt->opt == opt) { + size_t sz = sopt->sz; + if (sopt->sz > *szp) { + sz = *szp; + } + *szp = sopt->sz; + memcpy(val, sopt->data, sz); + rv = 0; + break; + } + } break; } - nni_mtx_unlock(&sock->s_mx); + nni_mtx_unlock(&s->s_mx); return (rv); } |
