aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c29
-rw-r--r--src/core/endpt.h41
-rw-r--r--src/core/options.c14
-rw-r--r--src/core/options.h3
-rw-r--r--src/core/pipe.c64
-rw-r--r--src/core/pipe.h9
6 files changed, 118 insertions, 42 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index 9122bb58..a99041ab 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -21,7 +21,7 @@ struct nni_ep {
uint64_t ep_id; // endpoint id
nni_list_node ep_node; // per socket list
nni_sock * ep_sock;
- char ep_addr[NNG_MAXADDRLEN];
+ char ep_url[NNG_MAXADDRLEN];
int ep_mode;
int ep_started;
int ep_closed; // full shutdown
@@ -82,6 +82,12 @@ nni_ep_id(nni_ep *ep)
return ((uint32_t) ep->ep_id);
}
+const char *
+nni_ep_url(nni_ep *ep)
+{
+ return (ep->ep_url);
+}
+
static void
nni_ep_destroy(nni_ep *ep)
{
@@ -117,16 +123,16 @@ nni_ep_destroy(nni_ep *ep)
}
static int
-nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
+nni_ep_create(nni_ep **epp, nni_sock *s, const char *url, int mode)
{
nni_tran *tran;
nni_ep * ep;
int rv;
- if ((tran = nni_tran_find(addr)) == NULL) {
+ if ((tran = nni_tran_find(url)) == NULL) {
return (NNG_ENOTSUP);
}
- if (strlen(addr) >= NNG_MAXADDRLEN) {
+ if (strlen(url) >= NNG_MAXADDRLEN) {
return (NNG_EINVAL);
}
@@ -146,7 +152,7 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
// dereference on hot paths.
ep->ep_ops = *tran->tran_ep;
- (void) nni_strlcpy(ep->ep_addr, addr, sizeof(ep->ep_addr));
+ (void) nni_strlcpy(ep->ep_url, url, sizeof(ep->ep_url));
NNI_LIST_NODE_INIT(&ep->ep_node);
@@ -159,7 +165,7 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) ||
((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) ||
((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) ||
- ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) ||
+ ((rv = ep->ep_ops.ep_init(&ep->ep_data, url, s, mode)) != 0) ||
((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) ||
((rv = nni_sock_ep_add(s, ep)) != 0)) {
nni_ep_destroy(ep);
@@ -171,15 +177,15 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode)
}
int
-nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *addr)
+nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *url)
{
- return (nni_ep_create(epp, s, addr, NNI_EP_MODE_DIAL));
+ return (nni_ep_create(epp, s, url, NNI_EP_MODE_DIAL));
}
int
-nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *addr)
+nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *url)
{
- return (nni_ep_create(epp, s, addr, NNI_EP_MODE_LISTEN));
+ return (nni_ep_create(epp, s, url, NNI_EP_MODE_LISTEN));
}
int
@@ -603,6 +609,9 @@ nni_ep_getopt(nni_ep *ep, int opt, void *valp, size_t *szp)
{
int rv;
+ if (opt == nng_optid_url) {
+ return (nni_getopt_str(ep->ep_url, valp, szp));
+ }
if (ep->ep_ops.ep_getopt == NULL) {
return (NNG_ENOTSUP);
}
diff --git a/src/core/endpt.h b/src/core/endpt.h
index de058d4b..161c030f 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -11,26 +11,27 @@
#ifndef CORE_ENDPT_H
#define CORE_ENDPT_H
-extern int nni_ep_sys_init(void);
-extern void nni_ep_sys_fini(void);
-extern nni_tran *nni_ep_tran(nni_ep *);
-extern nni_sock *nni_ep_sock(nni_ep *);
-extern int nni_ep_find(nni_ep **, uint32_t);
-extern int nni_ep_hold(nni_ep *);
-extern void nni_ep_rele(nni_ep *);
-extern uint32_t nni_ep_id(nni_ep *);
-extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *);
-extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *);
-extern void nni_ep_stop(nni_ep *);
-extern int nni_ep_shutdown(nni_ep *);
-extern void nni_ep_close(nni_ep *);
-extern int nni_ep_dial(nni_ep *, int);
-extern int nni_ep_listen(nni_ep *, int);
-extern void nni_ep_list_init(nni_list *);
-extern int nni_ep_setopt(nni_ep *, int, const void *, size_t, int);
-extern int nni_ep_getopt(nni_ep *, int, void *, size_t *);
-extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *);
-extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *);
+extern int nni_ep_sys_init(void);
+extern void nni_ep_sys_fini(void);
+extern nni_tran * nni_ep_tran(nni_ep *);
+extern nni_sock * nni_ep_sock(nni_ep *);
+extern int nni_ep_find(nni_ep **, uint32_t);
+extern int nni_ep_hold(nni_ep *);
+extern void nni_ep_rele(nni_ep *);
+extern uint32_t nni_ep_id(nni_ep *);
+extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *);
+extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *);
+extern void nni_ep_stop(nni_ep *);
+extern int nni_ep_shutdown(nni_ep *);
+extern void nni_ep_close(nni_ep *);
+extern int nni_ep_dial(nni_ep *, int);
+extern int nni_ep_listen(nni_ep *, int);
+extern void nni_ep_list_init(nni_list *);
+extern int nni_ep_setopt(nni_ep *, int, const void *, size_t, int);
+extern int nni_ep_getopt(nni_ep *, int, void *, size_t *);
+extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *);
+extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *);
+extern const char *nni_ep_url(nni_ep *);
// Endpoint modes. Currently used by transports. Remove this when we make
// transport dialers and listeners explicit.
diff --git a/src/core/options.c b/src/core/options.c
index 76025710..b7934a06 100644
--- a/src/core/options.c
+++ b/src/core/options.c
@@ -136,6 +136,19 @@ nni_getopt_usec(nni_duration u, void *val, size_t *sizep)
}
int
+nni_getopt_sockaddr(const nng_sockaddr *sa, void *val, size_t *sizep)
+{
+ size_t sz = sizeof(*sa);
+
+ if (sz > *sizep) {
+ sz = *sizep;
+ }
+ *sizep = sizeof(*sa);
+ memcpy(val, sa, sz);
+ return (0);
+}
+
+int
nni_getopt_int(int i, void *val, size_t *sizep)
{
size_t sz = sizeof(i);
@@ -403,6 +416,7 @@ nni_option_sys_init(void)
((rv = OPT_REGISTER(remaddr)) != 0) ||
((rv = OPT_REGISTER(recvfd)) != 0) ||
((rv = OPT_REGISTER(sendfd)) != 0) ||
+ ((rv = OPT_REGISTER(url)) != 0) ||
((rv = OPT_REGISTER(req_resendtime)) != 0) ||
((rv = OPT_REGISTER(sub_subscribe)) != 0) ||
((rv = OPT_REGISTER(sub_unsubscribe)) != 0) ||
diff --git a/src/core/options.h b/src/core/options.h
index ed091703..64036db1 100644
--- a/src/core/options.h
+++ b/src/core/options.h
@@ -44,6 +44,9 @@ extern int nni_getopt_u64(uint64_t, void *, size_t *);
// nni_getopt_str gets a C style string.
extern int nni_getopt_str(const char *, void *, size_t *);
+// nni_getopt_sockaddr gets an nng_sockaddr.
+extern int nni_getopt_sockaddr(const nng_sockaddr *, void *, size_t *);
+
// nni_setopt_size sets a size_t option.
extern int nni_setopt_size(size_t *, const void *, size_t, size_t, size_t);
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 0670ed01..edc8c15d 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -27,6 +27,7 @@ struct nni_pipe {
int p_reap;
int p_stop;
int p_refcnt;
+ const char * p_url;
nni_mtx p_mtx;
nni_cv p_cv;
nni_list_node p_reap_node;
@@ -34,6 +35,7 @@ struct nni_pipe {
};
static nni_idhash *nni_pipes;
+static nni_mtx nni_pipe_lk;
static nni_list nni_pipe_reap_list;
static nni_mtx nni_pipe_reap_lk;
@@ -49,6 +51,7 @@ nni_pipe_sys_init(void)
int rv;
NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node);
+ nni_mtx_init(&nni_pipe_lk);
nni_mtx_init(&nni_pipe_reap_lk);
nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk);
@@ -85,6 +88,7 @@ nni_pipe_sys_fini(void)
nni_thr_fini(&nni_pipe_reap_thr);
nni_cv_fini(&nni_pipe_reap_cv);
nni_mtx_fini(&nni_pipe_reap_lk);
+ nni_mtx_fini(&nni_pipe_lk);
if (nni_pipes != NULL) {
nni_idhash_fini(nni_pipes);
nni_pipes = NULL;
@@ -103,11 +107,14 @@ nni_pipe_destroy(nni_pipe *p)
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
- nni_mtx_lock(&p->p_mtx);
+ nni_mtx_lock(&nni_pipe_lk);
+ if (p->p_id != 0) {
+ nni_idhash_remove(nni_pipes, p->p_id);
+ }
while (p->p_refcnt != 0) {
nni_cv_wait(&p->p_cv);
}
- nni_mtx_unlock(&p->p_mtx);
+ nni_mtx_unlock(&nni_pipe_lk);
// We have exclusive access at this point, so we can check if
// we are still on any lists.
@@ -124,13 +131,35 @@ nni_pipe_destroy(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
- if (p->p_id != 0) {
- nni_idhash_remove(nni_pipes, p->p_id);
- }
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
}
+int
+nni_pipe_find(nni_pipe **pp, uint32_t id)
+{
+ int rv;
+ nni_pipe *p;
+ nni_mtx_lock(&nni_pipe_lk);
+ if ((rv = nni_idhash_find(nni_pipes, id, (void **) &p)) == 0) {
+ p->p_refcnt++;
+ *pp = p;
+ }
+ nni_mtx_unlock(&nni_pipe_lk);
+ return (rv);
+}
+
+void
+nni_pipe_rele(nni_pipe *p)
+{
+ nni_mtx_lock(&nni_pipe_lk);
+ p->p_refcnt--;
+ if (p->p_refcnt == 0) {
+ nni_cv_wake(&p->p_cv);
+ }
+ nni_mtx_unlock(&nni_pipe_lk);
+}
+
// nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces.
uint32_t
nni_pipe_id(nni_pipe *p)
@@ -238,16 +267,21 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_proto_data = NULL;
p->p_ep = ep;
p->p_sock = sock;
+ p->p_url = nni_ep_url(ep);
NNI_LIST_NODE_INIT(&p->p_reap_node);
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
nni_mtx_init(&p->p_mtx);
- nni_cv_init(&p->p_cv, &p->p_mtx);
+ nni_cv_init(&p->p_cv, &nni_pipe_lk);
nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p);
- if (((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) ||
+ nni_mtx_lock(&nni_pipe_lk);
+ rv = nni_idhash_alloc(nni_pipes, &p->p_id, p);
+ nni_mtx_unlock(&nni_pipe_lk);
+
+ if ((rv != 0) ||
((rv = nni_ep_pipe_add(ep, p)) != 0) ||
((rv = nni_sock_pipe_add(sock, p)) != 0)) {
nni_pipe_destroy(p);
@@ -259,11 +293,19 @@ nni_pipe_create(nni_ep *ep, void *tdata)
int
nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
{
- /* This should only be called with the mutex held... */
- if (p->p_tran_ops.p_getopt == NULL) {
- return (NNG_ENOTSUP);
+ int rv = NNG_ENOTSUP;
+
+ if (opt == nng_optid_url) {
+ return (nni_getopt_str(p->p_url, val, szp));
}
- return (p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp));
+ if (p->p_tran_ops.p_getopt != NULL) {
+ rv = p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp);
+ }
+ if (rv == NNG_ENOTSUP) {
+ // Maybe its a generic socket option?
+ rv = nni_sock_getopt(p->p_sock, opt, val, szp);
+ }
+ return (rv);
}
void
diff --git a/src/core/pipe.h b/src/core/pipe.h
index 9436d650..bb55a8cd 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -57,7 +57,7 @@ extern void nni_pipe_start(nni_pipe *);
extern uint16_t nni_pipe_proto(nni_pipe *);
extern uint16_t nni_pipe_peer(nni_pipe *);
-extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
+extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
// nni_pipe_get_proto_data gets the protocol private data set with the
// nni_pipe_set_proto_data function. No locking is performed.
@@ -72,4 +72,11 @@ extern void nni_pipe_sock_list_init(nni_list *);
// a per-endpoint list.
extern void nni_pipe_ep_list_init(nni_list *);
+// nni_pipe_find finds a pipe given its ID. It places a hold on the
+// pipe, which must be released by the caller when it is done.
+extern int nni_pipe_find(nni_pipe **, uint32_t);
+
+// nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find.
+extern void nni_pipe_rele(nni_pipe *);
+
#endif // CORE_PIPE_H