aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c139
1 files changed, 105 insertions, 34 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 4cf624e2..894e4fee 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -82,9 +82,10 @@ struct nni_socket {
nni_list s_options; // opts not handled by sock/proto
char s_name[64]; // socket name (legacy compat)
- nni_list s_eps; // active endpoints
- nni_list s_pipes; // active pipes
- nni_list s_ctxs; // active contexts (protected by global sock_lk)
+ nni_list s_listeners; // active listeners
+ nni_list s_dialers; // active dialers
+ nni_list s_pipes; // active pipes
+ nni_list s_ctxs; // active contexts (protected by global sock_lk)
bool s_closing; // Socket is closing
bool s_closed; // Socket closed, protected by global lock
@@ -558,7 +559,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
nni_pipe_sock_list_init(&s->s_pipes);
- nni_ep_list_init(&s->s_eps);
+ nni_listener_list_init(&s->s_listeners);
+ nni_dialer_list_init(&s->s_dialers);
nni_mtx_init(&s->s_mx);
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
@@ -672,11 +674,13 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
int
nni_sock_shutdown(nni_sock *sock)
{
- nni_pipe *pipe;
- nni_ep * ep;
- nni_ep * nep;
- nni_ctx * ctx;
- nni_ctx * nctx;
+ nni_pipe * pipe;
+ nni_dialer * d;
+ nni_dialer * nd;
+ nni_listener *l;
+ nni_listener *nl;
+ nni_ctx * ctx;
+ nni_ctx * nctx;
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
@@ -688,9 +692,13 @@ nni_sock_shutdown(nni_sock *sock)
// Close the EPs. This prevents new connections from forming
// but but allows existing ones to drain.
- NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_shutdown(ep);
+ NNI_LIST_FOREACH (&sock->s_listeners, l) {
+ nni_listener_shutdown(l);
}
+ NNI_LIST_FOREACH (&sock->s_dialers, d) {
+ nni_dialer_shutdown(d);
+ }
+
nni_mtx_unlock(&sock->s_mx);
// We now mark any owned contexts as closing.
@@ -734,16 +742,26 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // Go through the endpoint list, attempting to close them.
+ // Go through the dialers and listeners, attempting to close them.
// We might already have a close in progress, in which case
// we skip past it; it will be removed from another thread.
- nep = nni_list_first(&sock->s_eps);
- while ((ep = nep) != NULL) {
- nep = nni_list_next(&sock->s_eps, nep);
+ nl = nni_list_first(&sock->s_listeners);
+ while ((l = nl) != NULL) {
+ nl = nni_list_next(&sock->s_listeners, nl);
+
+ if (nni_listener_hold(l) == 0) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_listener_close(l);
+ nni_mtx_lock(&sock->s_mx);
+ }
+ }
+ nd = nni_list_first(&sock->s_dialers);
+ while ((d = nd) != NULL) {
+ nd = nni_list_next(&sock->s_dialers, nd);
- if (nni_ep_hold(ep) == 0) {
+ if (nni_dialer_hold(d) == 0) {
nni_mtx_unlock(&sock->s_mx);
- nni_ep_close(ep);
+ nni_dialer_close(d);
nni_mtx_lock(&sock->s_mx);
}
}
@@ -756,7 +774,8 @@ nni_sock_shutdown(nni_sock *sock)
// We have to wait for *both* endpoints and pipes to be
// removed.
while ((!nni_list_empty(&sock->s_pipes)) ||
- (!nni_list_empty(&sock->s_eps))) {
+ (!nni_list_empty(&sock->s_listeners)) ||
+ (!nni_list_empty(&sock->s_dialers))) {
nni_cv_wait(&sock->s_cv);
}
@@ -810,8 +829,9 @@ nni_sock_close(nni_sock *s)
// Wait for pipes, eps, and contexts to finish closing.
nni_mtx_lock(&s->s_mx);
- while (
- (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) {
+ while ((!nni_list_empty(&s->s_pipes)) ||
+ (!nni_list_empty(&s->s_dialers)) ||
+ (!nni_list_empty(&s->s_listeners))) {
nni_cv_wait(&s->s_cv);
}
nni_mtx_unlock(&s->s_mx);
@@ -905,7 +925,33 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
}
int
-nni_sock_ep_add(nni_sock *s, nni_ep *ep)
+nni_sock_add_listener(nni_sock *s, nni_listener *l)
+{
+ nni_sockopt *sopt;
+
+ nni_mtx_lock(&s->s_mx);
+ if (s->s_closing) {
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_ECLOSED);
+ }
+
+ NNI_LIST_FOREACH (&s->s_options, sopt) {
+ int rv;
+ rv = nni_listener_setopt(
+ l, sopt->name, sopt->data, sopt->sz, sopt->typ);
+ if ((rv != 0) && (rv != NNG_ENOTSUP)) {
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
+ }
+ }
+
+ nni_list_append(&s->s_listeners, l);
+ nni_mtx_unlock(&s->s_mx);
+ return (0);
+}
+
+int
+nni_sock_add_dialer(nni_sock *s, nni_dialer *d)
{
nni_sockopt *sopt;
@@ -917,30 +963,43 @@ nni_sock_ep_add(nni_sock *s, nni_ep *ep)
NNI_LIST_FOREACH (&s->s_options, sopt) {
int rv;
- rv = nni_ep_setopt(
- ep, sopt->name, sopt->data, sopt->sz, sopt->typ);
+ rv = nni_dialer_setopt(
+ d, sopt->name, sopt->data, sopt->sz, sopt->typ);
if ((rv != 0) && (rv != NNG_ENOTSUP)) {
nni_mtx_unlock(&s->s_mx);
return (rv);
}
}
- nni_list_append(&s->s_eps, ep);
+ nni_list_append(&s->s_dialers, d);
nni_mtx_unlock(&s->s_mx);
return (0);
}
void
-nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
+nni_sock_remove_listener(nni_sock *s, nni_listener *l)
{
- nni_mtx_lock(&sock->s_mx);
- if (nni_list_active(&sock->s_eps, ep)) {
- nni_list_remove(&sock->s_eps, ep);
- if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) {
- nni_cv_wake(&sock->s_cv);
+ nni_mtx_lock(&s->s_mx);
+ if (nni_list_active(&s->s_listeners, l)) {
+ nni_list_remove(&s->s_listeners, l);
+ if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
+ nni_cv_wake(&s->s_cv);
}
}
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_sock_remove_dialer(nni_sock *s, nni_dialer *d)
+{
+ nni_mtx_lock(&s->s_mx);
+ if (nni_list_active(&s->s_dialers, d)) {
+ nni_list_remove(&s->s_dialers, d);
+ if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
+ nni_cv_wake(&s->s_cv);
+ }
+ }
+ nni_mtx_unlock(&s->s_mx);
}
int
@@ -948,7 +1007,8 @@ nni_sock_setopt(
nni_sock *s, const char *name, const void *v, size_t sz, nni_opt_type t)
{
int rv = NNG_ENOTSUP;
- nni_ep * ep;
+ nni_dialer * d;
+ nni_listener * l;
nni_sockopt * optv;
nni_sockopt * oldv = NULL;
const sock_option * sso;
@@ -1042,9 +1102,20 @@ nni_sock_setopt(
// transport (other than ENOTSUP) stops the operation
// altogether. Its important that transport wide checks
// properly pre-validate.
- NNI_LIST_FOREACH (&s->s_eps, ep) {
+ NNI_LIST_FOREACH (&s->s_listeners, l) {
+ int x;
+ x = nni_listener_setopt(l, optv->name, optv->data, sz, t);
+ if (x != NNG_ENOTSUP) {
+ if ((rv = x) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_free_opt(optv);
+ return (rv);
+ }
+ }
+ }
+ NNI_LIST_FOREACH (&s->s_dialers, d) {
int x;
- x = nni_ep_setopt(ep, optv->name, optv->data, sz, t);
+ x = nni_dialer_setopt(d, optv->name, optv->data, sz, t);
if (x != NNG_ENOTSUP) {
if ((rv = x) != 0) {
nni_mtx_unlock(&s->s_mx);