diff options
| -rw-r--r-- | src/core/endpt.c | 3 | ||||
| -rw-r--r-- | src/core/socket.c | 43 | ||||
| -rw-r--r-- | src/core/socket.h | 9 |
3 files changed, 43 insertions, 12 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index bc947dfd..fd328b0f 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -139,12 +139,11 @@ nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) nni_pipe_destroy(pipe); return (rv); } + pipe->p_ep = ep; *pp = pipe; return (0); } #if 0 -int nni_endpt_dial(nni_endpt *, nni_pipe **); -int nni_endpt_listen(nni_endpt *); int nni_endpt_accept(nni_endpt *, nni_pipe **); #endif diff --git a/src/core/socket.c b/src/core/socket.c index 70029278..fb56a0bf 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -55,6 +55,7 @@ nni_socket_create(nni_socket **sockp, uint16_t proto) NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_sock_node); NNI_LIST_INIT(&sock->s_eps, nni_endpt, ep_sock_node); + NNI_LIST_INIT(&sock->s_reap_eps, nni_endpt, ep_sock_node); if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { nni_cond_fini(&sock->s_cv); @@ -315,8 +316,8 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe) } -void -nni_sock_dialer(void *arg) +static void +nni_socket_dialer(void *arg) { nni_endpt *ep = arg; nni_socket *sock = ep->ep_sock; @@ -327,7 +328,7 @@ nni_sock_dialer(void *arg) nni_mutex_enter(&ep->ep_mx); while ((!ep->ep_close) && (nni_list_first(&ep->ep_pipes) != NULL)) { - nni_cond_wait(&ep->ep_cv); + nni_cond_wait(&ep->ep_cv); } if (ep->ep_close) { nni_mutex_exit(&ep->ep_mx); @@ -337,19 +338,49 @@ nni_sock_dialer(void *arg) pipe = NULL; - if (((rv = nni_endpt_dial(ep, pipe)) != 0) || + if (((rv = nni_endpt_dial(ep, &pipe)) != 0) || ((rv = nni_socket_add_pipe(sock, pipe)) != 0)) { if (rv == NNG_ECLOSED) { - return; + break; } if (pipe != NULL) { nni_pipe_destroy(pipe); } + // XXX: Publish connection error event... // XXX: Inject a wait for reconnect... continue; } + } +} + + +int +nni_socket_dial(nni_socket *sock, nni_endpt *ep) +{ + int rv = 0; + nni_mutex_enter(&sock->s_mx); + nni_mutex_enter(&ep->ep_mx); + if ((ep->ep_dialer != NULL) || (ep->ep_listener != NULL)) { + rv = NNG_EBUSY; + goto out; + } + if (ep->ep_sock != sock) { // Should never happen + rv = NNG_EINVAL; + goto out; + } + if (sock->s_closing) { + rv = NNG_ECLOSED; + goto out; } + if (nni_thread_create(&ep->ep_dialer, nni_socket_dialer, ep) != 0) { + rv = NNG_ENOMEM; + goto out; + } + nni_list_append(&sock->s_eps, ep); +out: + nni_mutex_exit(&ep->ep_mx); + nni_mutex_exit(&sock->s_mx); - // XXX: move the endpoint to the sockets reap list + return (rv); } diff --git a/src/core/socket.h b/src/core/socket.h index 937f221a..5b321ae7 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -26,15 +26,16 @@ struct nng_socket { // XXX: options - nni_list_t s_eps; - nni_list_t s_pipes; + nni_list_t s_eps; // active endpoints + nni_list_t s_pipes; // pipes for this socket + nni_list_t s_reap_eps; // endpoint deathrow int s_closing; // Socket is closing int s_besteffort; // Best effort mode delivery int s_senderr; // Protocol state machine use - int s_recverr; // Protocol state machine use + int s_recverr; // Protocol state machine use - uint32_t s_nextid; // Next Pipe ID. + uint32_t s_nextid; // Next Pipe ID. }; extern int nni_socket_create(nni_socket **, uint16_t); |
