aboutsummaryrefslogtreecommitdiff
path: root/src/core/pipe.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-09-20 12:11:53 -0700
committerGarrett D'Amore <garrett@damore.org>2017-09-22 12:33:50 -0700
commite236dc8141f4d00dc926fbfba7739dabf96ebcdd (patch)
tree3c88190966eac4d888008d5076e7edd1817f64a2 /src/core/pipe.c
parentf04cfd27e2d67b0fc89b079410fc11b55b6d1979 (diff)
downloadnng-e236dc8141f4d00dc926fbfba7739dabf96ebcdd.tar.gz
nng-e236dc8141f4d00dc926fbfba7739dabf96ebcdd.tar.bz2
nng-e236dc8141f4d00dc926fbfba7739dabf96ebcdd.zip
More pipe option handling, pipe API support. Url option.
This fleshes most of the pipe API out, making it available to end user code. It also adds a URL option that is independent of the address options (which would be sockaddrs.) Also, we are now setting the pipe for req/rep. The other protocols need to have the same logic added to set the receive pipe on the message. (Pair is already done.)
Diffstat (limited to 'src/core/pipe.c')
-rw-r--r--src/core/pipe.c64
1 files changed, 53 insertions, 11 deletions
diff --git a/src/core/pipe.c b/src/core/pipe.c
index 0670ed01..edc8c15d 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -27,6 +27,7 @@ struct nni_pipe {
int p_reap;
int p_stop;
int p_refcnt;
+ const char * p_url;
nni_mtx p_mtx;
nni_cv p_cv;
nni_list_node p_reap_node;
@@ -34,6 +35,7 @@ struct nni_pipe {
};
static nni_idhash *nni_pipes;
+static nni_mtx nni_pipe_lk;
static nni_list nni_pipe_reap_list;
static nni_mtx nni_pipe_reap_lk;
@@ -49,6 +51,7 @@ nni_pipe_sys_init(void)
int rv;
NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node);
+ nni_mtx_init(&nni_pipe_lk);
nni_mtx_init(&nni_pipe_reap_lk);
nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk);
@@ -85,6 +88,7 @@ nni_pipe_sys_fini(void)
nni_thr_fini(&nni_pipe_reap_thr);
nni_cv_fini(&nni_pipe_reap_cv);
nni_mtx_fini(&nni_pipe_reap_lk);
+ nni_mtx_fini(&nni_pipe_lk);
if (nni_pipes != NULL) {
nni_idhash_fini(nni_pipes);
nni_pipes = NULL;
@@ -103,11 +107,14 @@ nni_pipe_destroy(nni_pipe *p)
// Make sure any unlocked holders are done with this.
// This happens during initialization for example.
- nni_mtx_lock(&p->p_mtx);
+ nni_mtx_lock(&nni_pipe_lk);
+ if (p->p_id != 0) {
+ nni_idhash_remove(nni_pipes, p->p_id);
+ }
while (p->p_refcnt != 0) {
nni_cv_wait(&p->p_cv);
}
- nni_mtx_unlock(&p->p_mtx);
+ nni_mtx_unlock(&nni_pipe_lk);
// We have exclusive access at this point, so we can check if
// we are still on any lists.
@@ -124,13 +131,35 @@ nni_pipe_destroy(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.p_fini(p->p_tran_data);
}
- if (p->p_id != 0) {
- nni_idhash_remove(nni_pipes, p->p_id);
- }
nni_mtx_fini(&p->p_mtx);
NNI_FREE_STRUCT(p);
}
+int
+nni_pipe_find(nni_pipe **pp, uint32_t id)
+{
+ int rv;
+ nni_pipe *p;
+ nni_mtx_lock(&nni_pipe_lk);
+ if ((rv = nni_idhash_find(nni_pipes, id, (void **) &p)) == 0) {
+ p->p_refcnt++;
+ *pp = p;
+ }
+ nni_mtx_unlock(&nni_pipe_lk);
+ return (rv);
+}
+
+void
+nni_pipe_rele(nni_pipe *p)
+{
+ nni_mtx_lock(&nni_pipe_lk);
+ p->p_refcnt--;
+ if (p->p_refcnt == 0) {
+ nni_cv_wake(&p->p_cv);
+ }
+ nni_mtx_unlock(&nni_pipe_lk);
+}
+
// nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces.
uint32_t
nni_pipe_id(nni_pipe *p)
@@ -238,16 +267,21 @@ nni_pipe_create(nni_ep *ep, void *tdata)
p->p_proto_data = NULL;
p->p_ep = ep;
p->p_sock = sock;
+ p->p_url = nni_ep_url(ep);
NNI_LIST_NODE_INIT(&p->p_reap_node);
NNI_LIST_NODE_INIT(&p->p_sock_node);
NNI_LIST_NODE_INIT(&p->p_ep_node);
nni_mtx_init(&p->p_mtx);
- nni_cv_init(&p->p_cv, &p->p_mtx);
+ nni_cv_init(&p->p_cv, &nni_pipe_lk);
nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p);
- if (((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) ||
+ nni_mtx_lock(&nni_pipe_lk);
+ rv = nni_idhash_alloc(nni_pipes, &p->p_id, p);
+ nni_mtx_unlock(&nni_pipe_lk);
+
+ if ((rv != 0) ||
((rv = nni_ep_pipe_add(ep, p)) != 0) ||
((rv = nni_sock_pipe_add(sock, p)) != 0)) {
nni_pipe_destroy(p);
@@ -259,11 +293,19 @@ nni_pipe_create(nni_ep *ep, void *tdata)
int
nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
{
- /* This should only be called with the mutex held... */
- if (p->p_tran_ops.p_getopt == NULL) {
- return (NNG_ENOTSUP);
+ int rv = NNG_ENOTSUP;
+
+ if (opt == nng_optid_url) {
+ return (nni_getopt_str(p->p_url, val, szp));
}
- return (p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp));
+ if (p->p_tran_ops.p_getopt != NULL) {
+ rv = p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp);
+ }
+ if (rv == NNG_ENOTSUP) {
+ // Maybe its a generic socket option?
+ rv = nni_sock_getopt(p->p_sock, opt, val, szp);
+ }
+ return (rv);
}
void