aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c299
1 files changed, 226 insertions, 73 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 9aa89a2d..79c1602b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -18,6 +18,13 @@ static nni_list nni_sock_list;
static nni_idhash *nni_sock_hash;
static nni_mtx nni_sock_lk;
+typedef struct nni_sockopt {
+ nni_list_node node;
+ int opt;
+ size_t sz;
+ void * data;
+} nni_sockopt;
+
struct nni_socket {
nni_list_node s_node;
nni_mtx s_mx;
@@ -44,7 +51,8 @@ struct nni_socket {
nni_duration s_rcvtimeo; // receive timeout
nni_duration s_reconn; // reconnect time
nni_duration s_reconnmax; // max reconnect time
- size_t s_rcvmaxsz; // maximum receive size
+ size_t s_rcvmaxsz; // max receive size
+ nni_list s_options; // opts not handled by sock/proto
nni_list s_eps; // active endpoints
nni_list s_pipes; // active pipes
@@ -63,6 +71,13 @@ struct nni_socket {
nni_notifyfd s_recv_fd;
};
+static void
+nni_free_opt(nni_sockopt *opt)
+{
+ nni_free(opt->data, opt->sz);
+ NNI_FREE_STRUCT(opt);
+}
+
uint32_t
nni_sock_id(nni_sock *s)
{
@@ -268,6 +283,8 @@ nni_sock_unnotify(nni_sock *sock, nni_notify *notify)
static void
nni_sock_destroy(nni_sock *s)
{
+ nni_sockopt *sopt;
+
if (s == NULL) {
return;
}
@@ -285,6 +302,11 @@ nni_sock_destroy(nni_sock *s)
s->s_sock_ops.sock_fini(s->s_data);
}
+ while ((sopt = nni_list_first(&s->s_options)) != NULL) {
+ nni_list_remove(&s->s_options, sopt);
+ nni_free_opt(sopt);
+ }
+
nni_ev_fini(&s->s_send_ev);
nni_ev_fini(&s->s_recv_ev);
nni_msgq_fini(s->s_urq);
@@ -305,8 +327,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
return (NNG_ENOMEM);
}
s->s_linger = 0;
- s->s_sndtimeo = -1;
- s->s_rcvtimeo = -1;
+ s->s_sndtimeo = NNI_TIME_NEVER;
+ s->s_rcvtimeo = NNI_TIME_NEVER;
s->s_closing = 0;
s->s_reconn = NNI_SECOND;
s->s_reconnmax = 0;
@@ -328,6 +350,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_ASSERT(s->s_pipe_ops.pipe_stop != NULL);
NNI_LIST_NODE_INIT(&s->s_node);
+ NNI_LIST_INIT(&s->s_options, nni_sockopt, node);
nni_pipe_sock_list_init(&s->s_pipes);
nni_ep_list_init(&s->s_eps);
nni_mtx_init(&s->s_mx);
@@ -338,10 +361,23 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 0)) != 0) ||
- ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0)) {
+ ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0) ||
+ ((rv = nni_sock_setopt(s, NNG_OPT_LINGER, &s->s_linger,
+ sizeof(nni_duration))) != 0) ||
+ ((rv = nni_sock_setopt(s, NNG_OPT_SNDTIMEO, &s->s_sndtimeo,
+ sizeof(nni_duration))) != 0) ||
+ ((rv = nni_sock_setopt(s, NNG_OPT_RCVTIMEO, &s->s_rcvtimeo,
+ sizeof(nni_duration))) != 0) ||
+ ((rv = nni_sock_setopt(s, NNG_OPT_RECONN_TIME, &s->s_reconn,
+ sizeof(nni_duration))) != 0) ||
+ ((rv = nni_sock_setopt(s, NNG_OPT_RECONN_MAXTIME, &s->s_reconnmax,
+ sizeof(nni_duration))) != 0) ||
+ ((rv = nni_sock_setopt(s, NNG_OPT_RCVMAXSZ, &s->s_rcvmaxsz,
+ sizeof(size_t))) != 0)) {
nni_sock_destroy(s);
return (rv);
}
+
*sp = s;
return (rv);
}
@@ -383,7 +419,6 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
if (((rv = nni_init()) != 0) ||
((rv = nni_sock_create(&s, proto)) != 0)) {
- nni_sock_destroy(s);
return (rv);
}
@@ -690,12 +725,6 @@ nni_sock_peer(nni_sock *sock)
return (sock->s_peer_id.p_id);
}
-size_t
-nni_sock_rcvmaxsz(nni_sock *sock)
-{
- return (sock->s_rcvmaxsz);
-}
-
void
nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
{
@@ -707,15 +736,25 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
}
int
-nni_sock_ep_add(nni_sock *sock, nni_ep *ep)
+nni_sock_ep_add(nni_sock *s, nni_ep *ep)
{
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
+ nni_sockopt *sopt;
+
+ nni_mtx_lock(&s->s_mx);
+ if (s->s_closing) {
+ nni_mtx_unlock(&s->s_mx);
return (NNG_ECLOSED);
}
- nni_list_append(&sock->s_eps, ep);
- nni_mtx_unlock(&sock->s_mx);
+ NNI_LIST_FOREACH (&s->s_options, sopt) {
+ int rv;
+ rv = nni_ep_setopt(ep, sopt->opt, sopt->data, sopt->sz, 0);
+ if ((rv != 0) && (rv != NNG_ENOTSUP)) {
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
+ }
+ }
+ nni_list_append(&s->s_eps, ep);
+ nni_mtx_unlock(&s->s_mx);
return (0);
}
@@ -745,108 +784,222 @@ nni_sock_senderr(nni_sock *sock, int err)
}
int
-nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size)
+nni_sock_setopt(nni_sock *s, int opt, const void *val, size_t size)
{
- int rv = NNG_ENOTSUP;
+ int rv = NNG_ENOTSUP;
+ nni_ep * ep;
+ int commits = 0;
+ nni_sockopt *optv;
+ nni_sockopt *oldv = NULL;
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_lock(&s->s_mx);
+ if (s->s_closing) {
+ nni_mtx_unlock(&s->s_mx);
return (NNG_ECLOSED);
}
- if (sock->s_sock_ops.sock_setopt != NULL) {
- rv =
- sock->s_sock_ops.sock_setopt(sock->s_data, opt, val, size);
+ if (s->s_sock_ops.sock_setopt != NULL) {
+ rv = s->s_sock_ops.sock_setopt(s->s_data, opt, val, size);
if (rv != NNG_ENOTSUP) {
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
return (rv);
}
}
+
+ // Some options do not go down to transports. Handle them
+ // directly.
switch (opt) {
- case NNG_OPT_LINGER:
- rv = nni_setopt_usec(&sock->s_linger, val, size);
- break;
- case NNG_OPT_SNDTIMEO:
- rv = nni_setopt_usec(&sock->s_sndtimeo, val, size);
- break;
- case NNG_OPT_RCVTIMEO:
- rv = nni_setopt_usec(&sock->s_rcvtimeo, val, size);
- break;
case NNG_OPT_RECONN_TIME:
- rv = nni_setopt_usec(&sock->s_reconn, val, size);
+ rv = nni_setopt_usec(&s->s_reconn, val, size);
break;
case NNG_OPT_RECONN_MAXTIME:
- rv = nni_setopt_usec(&sock->s_reconnmax, val, size);
+ rv = nni_setopt_usec(&s->s_reconnmax, val, size);
break;
case NNG_OPT_SNDBUF:
- rv = nni_setopt_buf(sock->s_uwq, val, size);
+ rv = nni_setopt_buf(s->s_uwq, val, size);
break;
case NNG_OPT_RCVBUF:
- rv = nni_setopt_buf(sock->s_urq, val, size);
+ rv = nni_setopt_buf(s->s_urq, val, size);
break;
- case NNG_OPT_RCVMAXSZ:
- rv = nni_setopt_size(
- &sock->s_rcvmaxsz, val, size, 0, NNI_MAXSZ);
+ case NNG_OPT_SNDFD:
+ case NNG_OPT_RCVFD:
+ case NNG_OPT_LOCALADDR:
+ case NNG_OPT_REMOTEADDR:
+ // these options can be read, but cannot be set
+ rv = NNG_EINVAL;
break;
}
- nni_mtx_unlock(&sock->s_mx);
+
+ nni_mtx_unlock(&s->s_mx);
+
+ // If the option was already handled one way or the other,
+ if (rv != NNG_ENOTSUP) {
+ return (rv);
+ }
+
+ // Validation of transport options. This is stateless, so
+ // transports should not fail to set an option later if they
+ // passed it here.
+ rv = nni_tran_chkopt(opt, val, size);
+
+ // Also check a few generic things. We do this if no transport
+ // check was found, or even if a transport rejected one of the
+ // settings.
+ if ((rv == NNG_ENOTSUP) || (rv == 0)) {
+ switch (opt) {
+ case NNG_OPT_LINGER:
+ rv = nni_chkopt_usec(val, size);
+ break;
+ case NNG_OPT_SNDTIMEO:
+ rv = nni_chkopt_usec(val, size);
+ break;
+ case NNG_OPT_RCVTIMEO:
+ rv = nni_chkopt_usec(val, size);
+ break;
+ case NNG_OPT_RCVMAXSZ:
+ // just a sanity test on the size; it also ensures that
+ // a size can be set even with no transport configured.
+ rv = nni_chkopt_size(val, size, 0, NNI_MAXSZ);
+ break;
+ }
+ }
+
+ if (rv != 0) {
+ return (rv);
+ }
+
+ // Prepare a copy of the sockoption.
+ if ((optv = NNI_ALLOC_STRUCT(optv)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ if ((optv->data = nni_alloc(size)) == NULL) {
+ NNI_FREE_STRUCT(optv);
+ return (NNG_ENOMEM);
+ }
+ memcpy(optv->data, val, size);
+ optv->opt = opt;
+ optv->sz = size;
+ NNI_LIST_NODE_INIT(&optv->node);
+
+ nni_mtx_lock(&s->s_mx);
+ NNI_LIST_FOREACH (&s->s_options, oldv) {
+ if (oldv->opt == opt) {
+ if ((oldv->sz != size) ||
+ (memcmp(oldv->data, val, size) != 0)) {
+ break;
+ }
+
+ // The values are the same. This is a no-op.
+ nni_mtx_unlock(&s->s_mx);
+ nni_free_opt(optv);
+ return (0);
+ }
+ }
+
+ // Apply the options. Failure to set any option on any transport
+ // (other than ENOTSUP) stops the operation altogether. Its
+ // important that transport wide checks properly pre-validate.
+ NNI_LIST_FOREACH (&s->s_eps, ep) {
+ int x;
+ x = nni_ep_setopt(ep, opt, optv->data, size, 0);
+ if (x != NNG_ENOTSUP) {
+ if ((rv = x) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_free_opt(optv);
+ return (rv);
+ }
+ }
+ }
+
+ // For some options, which also have an impact on the socket
+ // behavior, we save a local value. Note that the transport
+ // will already have had a chance to veto this.
+ switch (opt) {
+ case NNG_OPT_LINGER:
+ rv = nni_setopt_usec(&s->s_linger, val, size);
+ break;
+ case NNG_OPT_SNDTIMEO:
+ rv = nni_setopt_usec(&s->s_sndtimeo, val, size);
+ break;
+ case NNG_OPT_RCVTIMEO:
+ rv = nni_setopt_usec(&s->s_rcvtimeo, val, size);
+ break;
+ }
+
+ if (rv == 0) {
+ // Remove and toss the old value, we are using a new one.
+ if (oldv != NULL) {
+ nni_list_remove(&s->s_options, oldv);
+ nni_free_opt(oldv);
+ }
+
+ // Insert our new value. This permits it to be compared
+ // against later, and for new endpoints to automatically
+ // receive these values,
+ nni_list_append(&s->s_options, optv);
+ } else {
+ nni_free_opt(optv);
+ }
+
+ nni_mtx_unlock(&s->s_mx);
return (rv);
}
int
-nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
+nni_sock_getopt(nni_sock *s, int opt, void *val, size_t *szp)
{
- int rv = NNG_ENOTSUP;
+ int rv = NNG_ENOTSUP;
+ nni_sockopt *sopt;
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_lock(&s->s_mx);
+ if (s->s_closing) {
+ nni_mtx_unlock(&s->s_mx);
return (NNG_ECLOSED);
}
- if (sock->s_sock_ops.sock_getopt != NULL) {
- rv = sock->s_sock_ops.sock_getopt(
- sock->s_data, opt, val, sizep);
+ if (s->s_sock_ops.sock_getopt != NULL) {
+ rv = s->s_sock_ops.sock_getopt(s->s_data, opt, val, szp);
if (rv != NNG_ENOTSUP) {
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
return (rv);
}
}
+ // Options that are handled by socket core, and never
+ // passed down.
switch (opt) {
- case NNG_OPT_LINGER:
- rv = nni_getopt_usec(&sock->s_linger, val, sizep);
- break;
- case NNG_OPT_SNDTIMEO:
- rv = nni_getopt_usec(&sock->s_sndtimeo, val, sizep);
- break;
- case NNG_OPT_RCVTIMEO:
- rv = nni_getopt_usec(&sock->s_rcvtimeo, val, sizep);
- break;
case NNG_OPT_RECONN_TIME:
- rv = nni_getopt_usec(&sock->s_reconn, val, sizep);
+ rv = nni_getopt_usec(&s->s_reconn, val, szp);
break;
case NNG_OPT_RECONN_MAXTIME:
- rv = nni_getopt_usec(&sock->s_reconnmax, val, sizep);
+ rv = nni_getopt_usec(&s->s_reconnmax, val, szp);
break;
case NNG_OPT_SNDBUF:
- rv = nni_getopt_buf(sock->s_uwq, val, sizep);
+ rv = nni_getopt_buf(s->s_uwq, val, szp);
break;
case NNG_OPT_RCVBUF:
- rv = nni_getopt_buf(sock->s_urq, val, sizep);
- break;
- case NNG_OPT_RCVMAXSZ:
- rv = nni_getopt_size(&sock->s_rcvmaxsz, val, sizep);
+ rv = nni_getopt_buf(s->s_urq, val, szp);
break;
case NNG_OPT_SNDFD:
- rv = nni_getopt_fd(
- sock, &sock->s_send_fd, NNG_EV_CAN_SND, val, sizep);
+ rv = nni_getopt_fd(s, &s->s_send_fd, NNG_EV_CAN_SND, val, szp);
break;
case NNG_OPT_RCVFD:
- rv = nni_getopt_fd(
- sock, &sock->s_recv_fd, NNG_EV_CAN_RCV, val, sizep);
+ rv = nni_getopt_fd(s, &s->s_recv_fd, NNG_EV_CAN_RCV, val, szp);
+ break;
+ default:
+ NNI_LIST_FOREACH (&s->s_options, sopt) {
+ if (sopt->opt == opt) {
+ size_t sz = sopt->sz;
+ if (sopt->sz > *szp) {
+ sz = *szp;
+ }
+ *szp = sopt->sz;
+ memcpy(val, sopt->data, sz);
+ rv = 0;
+ break;
+ }
+ }
break;
}
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
return (rv);
}