aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-06-26 17:39:17 -0700
committerGarrett D'Amore <garrett@damore.org>2018-06-27 17:28:05 -0700
commit251553b13e6bc8019914b9edd1292f97e856dd43 (patch)
tree9193b8b4d4df86253f0a469cd96d8bb304a64c82 /src/core/socket.c
parent91f9061ad9289afffb0111c03a8390d0f82d7114 (diff)
downloadnng-251553b13e6bc8019914b9edd1292f97e856dd43.tar.gz
nng-251553b13e6bc8019914b9edd1292f97e856dd43.tar.bz2
nng-251553b13e6bc8019914b9edd1292f97e856dd43.zip
fixes #522 Separate out the endpoint plumbing
This separates the plumbing for endpoints into distinct dialer and listeners. Some of the transports could benefit from further separation, but we've done some rather larger separation e.g. for the websocket transport. IPC would be a good one to update later, when we start looking at exposing a more natural underlying API.
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c139
1 files changed, 105 insertions, 34 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 4cf624e2..894e4fee 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -82,9 +82,10 @@ struct nni_socket {
nni_list s_options; // opts not handled by sock/proto
char s_name[64]; // socket name (legacy compat)
- nni_list s_eps; // active endpoints
- nni_list s_pipes; // active pipes
- nni_list s_ctxs; // active contexts (protected by global sock_lk)
+ nni_list s_listeners; // active listeners
+ nni_list s_dialers; // active dialers
+ nni_list s_pipes; // active pipes
+ nni_list s_ctxs; // active contexts (protected by global sock_lk)
bool s_closing; // Socket is closing
bool s_closed; // Socket closed, protected by global lock
@@ -558,7 +559,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node);
nni_pipe_sock_list_init(&s->s_pipes);
- nni_ep_list_init(&s->s_eps);
+ nni_listener_list_init(&s->s_listeners);
+ nni_dialer_list_init(&s->s_dialers);
nni_mtx_init(&s->s_mx);
nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
@@ -672,11 +674,13 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
int
nni_sock_shutdown(nni_sock *sock)
{
- nni_pipe *pipe;
- nni_ep * ep;
- nni_ep * nep;
- nni_ctx * ctx;
- nni_ctx * nctx;
+ nni_pipe * pipe;
+ nni_dialer * d;
+ nni_dialer * nd;
+ nni_listener *l;
+ nni_listener *nl;
+ nni_ctx * ctx;
+ nni_ctx * nctx;
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
@@ -688,9 +692,13 @@ nni_sock_shutdown(nni_sock *sock)
// Close the EPs. This prevents new connections from forming
// but but allows existing ones to drain.
- NNI_LIST_FOREACH (&sock->s_eps, ep) {
- nni_ep_shutdown(ep);
+ NNI_LIST_FOREACH (&sock->s_listeners, l) {
+ nni_listener_shutdown(l);
}
+ NNI_LIST_FOREACH (&sock->s_dialers, d) {
+ nni_dialer_shutdown(d);
+ }
+
nni_mtx_unlock(&sock->s_mx);
// We now mark any owned contexts as closing.
@@ -734,16 +742,26 @@ nni_sock_shutdown(nni_sock *sock)
nni_msgq_close(sock->s_urq);
nni_msgq_close(sock->s_uwq);
- // Go through the endpoint list, attempting to close them.
+ // Go through the dialers and listeners, attempting to close them.
// We might already have a close in progress, in which case
// we skip past it; it will be removed from another thread.
- nep = nni_list_first(&sock->s_eps);
- while ((ep = nep) != NULL) {
- nep = nni_list_next(&sock->s_eps, nep);
+ nl = nni_list_first(&sock->s_listeners);
+ while ((l = nl) != NULL) {
+ nl = nni_list_next(&sock->s_listeners, nl);
+
+ if (nni_listener_hold(l) == 0) {
+ nni_mtx_unlock(&sock->s_mx);
+ nni_listener_close(l);
+ nni_mtx_lock(&sock->s_mx);
+ }
+ }
+ nd = nni_list_first(&sock->s_dialers);
+ while ((d = nd) != NULL) {
+ nd = nni_list_next(&sock->s_dialers, nd);
- if (nni_ep_hold(ep) == 0) {
+ if (nni_dialer_hold(d) == 0) {
nni_mtx_unlock(&sock->s_mx);
- nni_ep_close(ep);
+ nni_dialer_close(d);
nni_mtx_lock(&sock->s_mx);
}
}
@@ -756,7 +774,8 @@ nni_sock_shutdown(nni_sock *sock)
// We have to wait for *both* endpoints and pipes to be
// removed.
while ((!nni_list_empty(&sock->s_pipes)) ||
- (!nni_list_empty(&sock->s_eps))) {
+ (!nni_list_empty(&sock->s_listeners)) ||
+ (!nni_list_empty(&sock->s_dialers))) {
nni_cv_wait(&sock->s_cv);
}
@@ -810,8 +829,9 @@ nni_sock_close(nni_sock *s)
// Wait for pipes, eps, and contexts to finish closing.
nni_mtx_lock(&s->s_mx);
- while (
- (!nni_list_empty(&s->s_pipes)) || (!nni_list_empty(&s->s_eps))) {
+ while ((!nni_list_empty(&s->s_pipes)) ||
+ (!nni_list_empty(&s->s_dialers)) ||
+ (!nni_list_empty(&s->s_listeners))) {
nni_cv_wait(&s->s_cv);
}
nni_mtx_unlock(&s->s_mx);
@@ -905,7 +925,33 @@ nni_sock_reconntimes(nni_sock *sock, nni_duration *rcur, nni_duration *rmax)
}
int
-nni_sock_ep_add(nni_sock *s, nni_ep *ep)
+nni_sock_add_listener(nni_sock *s, nni_listener *l)
+{
+ nni_sockopt *sopt;
+
+ nni_mtx_lock(&s->s_mx);
+ if (s->s_closing) {
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_ECLOSED);
+ }
+
+ NNI_LIST_FOREACH (&s->s_options, sopt) {
+ int rv;
+ rv = nni_listener_setopt(
+ l, sopt->name, sopt->data, sopt->sz, sopt->typ);
+ if ((rv != 0) && (rv != NNG_ENOTSUP)) {
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
+ }
+ }
+
+ nni_list_append(&s->s_listeners, l);
+ nni_mtx_unlock(&s->s_mx);
+ return (0);
+}
+
+int
+nni_sock_add_dialer(nni_sock *s, nni_dialer *d)
{
nni_sockopt *sopt;
@@ -917,30 +963,43 @@ nni_sock_ep_add(nni_sock *s, nni_ep *ep)
NNI_LIST_FOREACH (&s->s_options, sopt) {
int rv;
- rv = nni_ep_setopt(
- ep, sopt->name, sopt->data, sopt->sz, sopt->typ);
+ rv = nni_dialer_setopt(
+ d, sopt->name, sopt->data, sopt->sz, sopt->typ);
if ((rv != 0) && (rv != NNG_ENOTSUP)) {
nni_mtx_unlock(&s->s_mx);
return (rv);
}
}
- nni_list_append(&s->s_eps, ep);
+ nni_list_append(&s->s_dialers, d);
nni_mtx_unlock(&s->s_mx);
return (0);
}
void
-nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
+nni_sock_remove_listener(nni_sock *s, nni_listener *l)
{
- nni_mtx_lock(&sock->s_mx);
- if (nni_list_active(&sock->s_eps, ep)) {
- nni_list_remove(&sock->s_eps, ep);
- if ((sock->s_closing) && (nni_list_empty(&sock->s_eps))) {
- nni_cv_wake(&sock->s_cv);
+ nni_mtx_lock(&s->s_mx);
+ if (nni_list_active(&s->s_listeners, l)) {
+ nni_list_remove(&s->s_listeners, l);
+ if ((s->s_closing) && (nni_list_empty(&s->s_listeners))) {
+ nni_cv_wake(&s->s_cv);
}
}
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
+}
+
+void
+nni_sock_remove_dialer(nni_sock *s, nni_dialer *d)
+{
+ nni_mtx_lock(&s->s_mx);
+ if (nni_list_active(&s->s_dialers, d)) {
+ nni_list_remove(&s->s_dialers, d);
+ if ((s->s_closing) && (nni_list_empty(&s->s_dialers))) {
+ nni_cv_wake(&s->s_cv);
+ }
+ }
+ nni_mtx_unlock(&s->s_mx);
}
int
@@ -948,7 +1007,8 @@ nni_sock_setopt(
nni_sock *s, const char *name, const void *v, size_t sz, nni_opt_type t)
{
int rv = NNG_ENOTSUP;
- nni_ep * ep;
+ nni_dialer * d;
+ nni_listener * l;
nni_sockopt * optv;
nni_sockopt * oldv = NULL;
const sock_option * sso;
@@ -1042,9 +1102,20 @@ nni_sock_setopt(
// transport (other than ENOTSUP) stops the operation
// altogether. Its important that transport wide checks
// properly pre-validate.
- NNI_LIST_FOREACH (&s->s_eps, ep) {
+ NNI_LIST_FOREACH (&s->s_listeners, l) {
+ int x;
+ x = nni_listener_setopt(l, optv->name, optv->data, sz, t);
+ if (x != NNG_ENOTSUP) {
+ if ((rv = x) != 0) {
+ nni_mtx_unlock(&s->s_mx);
+ nni_free_opt(optv);
+ return (rv);
+ }
+ }
+ }
+ NNI_LIST_FOREACH (&s->s_dialers, d) {
int x;
- x = nni_ep_setopt(ep, optv->name, optv->data, sz, t);
+ x = nni_dialer_setopt(d, optv->name, optv->data, sz, t);
if (x != NNG_ENOTSUP) {
if ((rv = x) != 0) {
nni_mtx_unlock(&s->s_mx);