diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-09-20 12:11:53 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-09-22 12:33:50 -0700 |
| commit | e236dc8141f4d00dc926fbfba7739dabf96ebcdd (patch) | |
| tree | 3c88190966eac4d888008d5076e7edd1817f64a2 /src | |
| parent | f04cfd27e2d67b0fc89b079410fc11b55b6d1979 (diff) | |
| download | nng-e236dc8141f4d00dc926fbfba7739dabf96ebcdd.tar.gz nng-e236dc8141f4d00dc926fbfba7739dabf96ebcdd.tar.bz2 nng-e236dc8141f4d00dc926fbfba7739dabf96ebcdd.zip | |
More pipe option handling, pipe API support. Url option.
This fleshes most of the pipe API out, making it available to end user
code. It also adds a URL option that is independent of the address
options (which would be sockaddrs.)
Also, we are now setting the pipe for req/rep. The other protocols need
to have the same logic added to set the receive pipe on the message. (Pair
is already done.)
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/endpt.c | 29 | ||||
| -rw-r--r-- | src/core/endpt.h | 41 | ||||
| -rw-r--r-- | src/core/options.c | 14 | ||||
| -rw-r--r-- | src/core/options.h | 3 | ||||
| -rw-r--r-- | src/core/pipe.c | 64 | ||||
| -rw-r--r-- | src/core/pipe.h | 9 | ||||
| -rw-r--r-- | src/nng.c | 30 | ||||
| -rw-r--r-- | src/nng.h | 2 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 2 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 1 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 42 |
11 files changed, 162 insertions, 75 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index 9122bb58..a99041ab 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -21,7 +21,7 @@ struct nni_ep { uint64_t ep_id; // endpoint id nni_list_node ep_node; // per socket list nni_sock * ep_sock; - char ep_addr[NNG_MAXADDRLEN]; + char ep_url[NNG_MAXADDRLEN]; int ep_mode; int ep_started; int ep_closed; // full shutdown @@ -82,6 +82,12 @@ nni_ep_id(nni_ep *ep) return ((uint32_t) ep->ep_id); } +const char * +nni_ep_url(nni_ep *ep) +{ + return (ep->ep_url); +} + static void nni_ep_destroy(nni_ep *ep) { @@ -117,16 +123,16 @@ nni_ep_destroy(nni_ep *ep) } static int -nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode) +nni_ep_create(nni_ep **epp, nni_sock *s, const char *url, int mode) { nni_tran *tran; nni_ep * ep; int rv; - if ((tran = nni_tran_find(addr)) == NULL) { + if ((tran = nni_tran_find(url)) == NULL) { return (NNG_ENOTSUP); } - if (strlen(addr) >= NNG_MAXADDRLEN) { + if (strlen(url) >= NNG_MAXADDRLEN) { return (NNG_EINVAL); } @@ -146,7 +152,7 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode) // dereference on hot paths. ep->ep_ops = *tran->tran_ep; - (void) nni_strlcpy(ep->ep_addr, addr, sizeof(ep->ep_addr)); + (void) nni_strlcpy(ep->ep_url, url, sizeof(ep->ep_url)); NNI_LIST_NODE_INIT(&ep->ep_node); @@ -159,7 +165,7 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode) ((rv = nni_aio_init(&ep->ep_con_aio, nni_ep_con_cb, ep)) != 0) || ((rv = nni_aio_init(&ep->ep_tmo_aio, nni_ep_tmo_cb, ep)) != 0) || ((rv = nni_aio_init(&ep->ep_con_syn, NULL, NULL)) != 0) || - ((rv = ep->ep_ops.ep_init(&ep->ep_data, addr, s, mode)) != 0) || + ((rv = ep->ep_ops.ep_init(&ep->ep_data, url, s, mode)) != 0) || ((rv = nni_idhash_alloc(nni_eps, &ep->ep_id, ep)) != 0) || ((rv = nni_sock_ep_add(s, ep)) != 0)) { nni_ep_destroy(ep); @@ -171,15 +177,15 @@ nni_ep_create(nni_ep **epp, nni_sock *s, const char *addr, int mode) } int -nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *addr) +nni_ep_create_dialer(nni_ep **epp, nni_sock *s, const char *url) { - return (nni_ep_create(epp, s, addr, NNI_EP_MODE_DIAL)); + return (nni_ep_create(epp, s, url, NNI_EP_MODE_DIAL)); } int -nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *addr) +nni_ep_create_listener(nni_ep **epp, nni_sock *s, const char *url) { - return (nni_ep_create(epp, s, addr, NNI_EP_MODE_LISTEN)); + return (nni_ep_create(epp, s, url, NNI_EP_MODE_LISTEN)); } int @@ -603,6 +609,9 @@ nni_ep_getopt(nni_ep *ep, int opt, void *valp, size_t *szp) { int rv; + if (opt == nng_optid_url) { + return (nni_getopt_str(ep->ep_url, valp, szp)); + } if (ep->ep_ops.ep_getopt == NULL) { return (NNG_ENOTSUP); } diff --git a/src/core/endpt.h b/src/core/endpt.h index de058d4b..161c030f 100644 --- a/src/core/endpt.h +++ b/src/core/endpt.h @@ -11,26 +11,27 @@ #ifndef CORE_ENDPT_H #define CORE_ENDPT_H -extern int nni_ep_sys_init(void); -extern void nni_ep_sys_fini(void); -extern nni_tran *nni_ep_tran(nni_ep *); -extern nni_sock *nni_ep_sock(nni_ep *); -extern int nni_ep_find(nni_ep **, uint32_t); -extern int nni_ep_hold(nni_ep *); -extern void nni_ep_rele(nni_ep *); -extern uint32_t nni_ep_id(nni_ep *); -extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *); -extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *); -extern void nni_ep_stop(nni_ep *); -extern int nni_ep_shutdown(nni_ep *); -extern void nni_ep_close(nni_ep *); -extern int nni_ep_dial(nni_ep *, int); -extern int nni_ep_listen(nni_ep *, int); -extern void nni_ep_list_init(nni_list *); -extern int nni_ep_setopt(nni_ep *, int, const void *, size_t, int); -extern int nni_ep_getopt(nni_ep *, int, void *, size_t *); -extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *); -extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *); +extern int nni_ep_sys_init(void); +extern void nni_ep_sys_fini(void); +extern nni_tran * nni_ep_tran(nni_ep *); +extern nni_sock * nni_ep_sock(nni_ep *); +extern int nni_ep_find(nni_ep **, uint32_t); +extern int nni_ep_hold(nni_ep *); +extern void nni_ep_rele(nni_ep *); +extern uint32_t nni_ep_id(nni_ep *); +extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *); +extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *); +extern void nni_ep_stop(nni_ep *); +extern int nni_ep_shutdown(nni_ep *); +extern void nni_ep_close(nni_ep *); +extern int nni_ep_dial(nni_ep *, int); +extern int nni_ep_listen(nni_ep *, int); +extern void nni_ep_list_init(nni_list *); +extern int nni_ep_setopt(nni_ep *, int, const void *, size_t, int); +extern int nni_ep_getopt(nni_ep *, int, void *, size_t *); +extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *); +extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *); +extern const char *nni_ep_url(nni_ep *); // Endpoint modes. Currently used by transports. Remove this when we make // transport dialers and listeners explicit. diff --git a/src/core/options.c b/src/core/options.c index 76025710..b7934a06 100644 --- a/src/core/options.c +++ b/src/core/options.c @@ -136,6 +136,19 @@ nni_getopt_usec(nni_duration u, void *val, size_t *sizep) } int +nni_getopt_sockaddr(const nng_sockaddr *sa, void *val, size_t *sizep) +{ + size_t sz = sizeof(*sa); + + if (sz > *sizep) { + sz = *sizep; + } + *sizep = sizeof(*sa); + memcpy(val, sa, sz); + return (0); +} + +int nni_getopt_int(int i, void *val, size_t *sizep) { size_t sz = sizeof(i); @@ -403,6 +416,7 @@ nni_option_sys_init(void) ((rv = OPT_REGISTER(remaddr)) != 0) || ((rv = OPT_REGISTER(recvfd)) != 0) || ((rv = OPT_REGISTER(sendfd)) != 0) || + ((rv = OPT_REGISTER(url)) != 0) || ((rv = OPT_REGISTER(req_resendtime)) != 0) || ((rv = OPT_REGISTER(sub_subscribe)) != 0) || ((rv = OPT_REGISTER(sub_unsubscribe)) != 0) || diff --git a/src/core/options.h b/src/core/options.h index ed091703..64036db1 100644 --- a/src/core/options.h +++ b/src/core/options.h @@ -44,6 +44,9 @@ extern int nni_getopt_u64(uint64_t, void *, size_t *); // nni_getopt_str gets a C style string. extern int nni_getopt_str(const char *, void *, size_t *); +// nni_getopt_sockaddr gets an nng_sockaddr. +extern int nni_getopt_sockaddr(const nng_sockaddr *, void *, size_t *); + // nni_setopt_size sets a size_t option. extern int nni_setopt_size(size_t *, const void *, size_t, size_t, size_t); diff --git a/src/core/pipe.c b/src/core/pipe.c index 0670ed01..edc8c15d 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -27,6 +27,7 @@ struct nni_pipe { int p_reap; int p_stop; int p_refcnt; + const char * p_url; nni_mtx p_mtx; nni_cv p_cv; nni_list_node p_reap_node; @@ -34,6 +35,7 @@ struct nni_pipe { }; static nni_idhash *nni_pipes; +static nni_mtx nni_pipe_lk; static nni_list nni_pipe_reap_list; static nni_mtx nni_pipe_reap_lk; @@ -49,6 +51,7 @@ nni_pipe_sys_init(void) int rv; NNI_LIST_INIT(&nni_pipe_reap_list, nni_pipe, p_reap_node); + nni_mtx_init(&nni_pipe_lk); nni_mtx_init(&nni_pipe_reap_lk); nni_cv_init(&nni_pipe_reap_cv, &nni_pipe_reap_lk); @@ -85,6 +88,7 @@ nni_pipe_sys_fini(void) nni_thr_fini(&nni_pipe_reap_thr); nni_cv_fini(&nni_pipe_reap_cv); nni_mtx_fini(&nni_pipe_reap_lk); + nni_mtx_fini(&nni_pipe_lk); if (nni_pipes != NULL) { nni_idhash_fini(nni_pipes); nni_pipes = NULL; @@ -103,11 +107,14 @@ nni_pipe_destroy(nni_pipe *p) // Make sure any unlocked holders are done with this. // This happens during initialization for example. - nni_mtx_lock(&p->p_mtx); + nni_mtx_lock(&nni_pipe_lk); + if (p->p_id != 0) { + nni_idhash_remove(nni_pipes, p->p_id); + } while (p->p_refcnt != 0) { nni_cv_wait(&p->p_cv); } - nni_mtx_unlock(&p->p_mtx); + nni_mtx_unlock(&nni_pipe_lk); // We have exclusive access at this point, so we can check if // we are still on any lists. @@ -124,13 +131,35 @@ nni_pipe_destroy(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.p_fini(p->p_tran_data); } - if (p->p_id != 0) { - nni_idhash_remove(nni_pipes, p->p_id); - } nni_mtx_fini(&p->p_mtx); NNI_FREE_STRUCT(p); } +int +nni_pipe_find(nni_pipe **pp, uint32_t id) +{ + int rv; + nni_pipe *p; + nni_mtx_lock(&nni_pipe_lk); + if ((rv = nni_idhash_find(nni_pipes, id, (void **) &p)) == 0) { + p->p_refcnt++; + *pp = p; + } + nni_mtx_unlock(&nni_pipe_lk); + return (rv); +} + +void +nni_pipe_rele(nni_pipe *p) +{ + nni_mtx_lock(&nni_pipe_lk); + p->p_refcnt--; + if (p->p_refcnt == 0) { + nni_cv_wake(&p->p_cv); + } + nni_mtx_unlock(&nni_pipe_lk); +} + // nni_pipe_id returns the 32-bit pipe id, which can be used in backtraces. uint32_t nni_pipe_id(nni_pipe *p) @@ -238,16 +267,21 @@ nni_pipe_create(nni_ep *ep, void *tdata) p->p_proto_data = NULL; p->p_ep = ep; p->p_sock = sock; + p->p_url = nni_ep_url(ep); NNI_LIST_NODE_INIT(&p->p_reap_node); NNI_LIST_NODE_INIT(&p->p_sock_node); NNI_LIST_NODE_INIT(&p->p_ep_node); nni_mtx_init(&p->p_mtx); - nni_cv_init(&p->p_cv, &p->p_mtx); + nni_cv_init(&p->p_cv, &nni_pipe_lk); nni_aio_init(&p->p_start_aio, nni_pipe_start_cb, p); - if (((rv = nni_idhash_alloc(nni_pipes, &p->p_id, p)) != 0) || + nni_mtx_lock(&nni_pipe_lk); + rv = nni_idhash_alloc(nni_pipes, &p->p_id, p); + nni_mtx_unlock(&nni_pipe_lk); + + if ((rv != 0) || ((rv = nni_ep_pipe_add(ep, p)) != 0) || ((rv = nni_sock_pipe_add(sock, p)) != 0)) { nni_pipe_destroy(p); @@ -259,11 +293,19 @@ nni_pipe_create(nni_ep *ep, void *tdata) int nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp) { - /* This should only be called with the mutex held... */ - if (p->p_tran_ops.p_getopt == NULL) { - return (NNG_ENOTSUP); + int rv = NNG_ENOTSUP; + + if (opt == nng_optid_url) { + return (nni_getopt_str(p->p_url, val, szp)); } - return (p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp)); + if (p->p_tran_ops.p_getopt != NULL) { + rv = p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp); + } + if (rv == NNG_ENOTSUP) { + // Maybe its a generic socket option? + rv = nni_sock_getopt(p->p_sock, opt, val, szp); + } + return (rv); } void diff --git a/src/core/pipe.h b/src/core/pipe.h index 9436d650..bb55a8cd 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -57,7 +57,7 @@ extern void nni_pipe_start(nni_pipe *); extern uint16_t nni_pipe_proto(nni_pipe *); extern uint16_t nni_pipe_peer(nni_pipe *); -extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep); +extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep); // nni_pipe_get_proto_data gets the protocol private data set with the // nni_pipe_set_proto_data function. No locking is performed. @@ -72,4 +72,11 @@ extern void nni_pipe_sock_list_init(nni_list *); // a per-endpoint list. extern void nni_pipe_ep_list_init(nni_list *); +// nni_pipe_find finds a pipe given its ID. It places a hold on the +// pipe, which must be released by the caller when it is done. +extern int nni_pipe_find(nni_pipe **, uint32_t); + +// nni_pipe_rele releases the hold on the pipe placed by nni_pipe_find. +extern void nni_pipe_rele(nni_pipe *); + #endif // CORE_PIPE_H @@ -663,30 +663,34 @@ nng_strerror(int num) return ("Unknown error"); } -#if 0 int -nng_pipe_getopt(nng_pipe *pipe, int opt, void *val, size_t *sizep) +nng_pipe_getopt(nng_pipe id, int opt, void *val, size_t *sizep) { - int rv; + int rv; + nni_pipe *p; - rv = nni_pipe_getopt(pipe, opt, val, sizep); - if (rv == ENOTSUP) { - // Maybe its a generic socket option. - rv = nni_sock_getopt(pipe->p_sock, opt, val, sizep); + if ((rv = nni_pipe_find(&p, id)) != 0) { + return (rv); } + rv = nni_pipe_getopt(p, opt, val, sizep); + nni_pipe_rele(p); return (rv); } - int -nng_pipe_close(nng_pipe *pipe) +nng_pipe_close(nng_pipe id) { - nni_pipe_close(pipe); + int rv; + nni_pipe *p; + + if ((rv = nni_pipe_find(&p, id)) != 0) { + return (rv); + } + nni_pipe_close(p); + nni_pipe_rele(p); return (0); } -#endif - // Message handling. int nng_msg_alloc(nng_msg **msgp, size_t size) @@ -1012,6 +1016,7 @@ const char *nng_opt_recvfd = "recv-fd"; const char *nng_opt_sendfd = "send-fd"; const char *nng_opt_locaddr = "local-address"; const char *nng_opt_remaddr = "remote-address"; +const char *nng_opt_url = "url"; // Well known protocol options. const char *nng_opt_req_resendtime = "req:resend-time"; const char *nng_opt_sub_subscribe = "sub:subscribe"; @@ -1034,6 +1039,7 @@ int nng_optid_recvfd; int nng_optid_sendfd; int nng_optid_locaddr; int nng_optid_remaddr; +int nng_optid_url; int nng_optid_req_resendtime; int nng_optid_sub_subscribe; int nng_optid_sub_unsubscribe; @@ -419,6 +419,7 @@ NNG_DECL const char *nng_opt_recvfd; NNG_DECL const char *nng_opt_sendfd; NNG_DECL const char *nng_opt_locaddr; NNG_DECL const char *nng_opt_remaddr; +NNG_DECL const char *nng_opt_url; NNG_DECL const char *nng_opt_req_resendtime; NNG_DECL const char *nng_opt_sub_subscribe; NNG_DECL const char *nng_opt_sub_unsubscribe; @@ -440,6 +441,7 @@ NNG_DECL int nng_optid_recvfd; NNG_DECL int nng_optid_sendfd; NNG_DECL int nng_optid_locaddr; NNG_DECL int nng_optid_remaddr; +NNG_DECL int nng_optid_url; // These protocol specific options may not be valid until a socket of // the given protocol is opened! diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index c26be0b0..6641c58f 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -275,6 +275,8 @@ rep_pipe_recv_cb(void *arg) msg = nni_aio_get_msg(p->aio_recv); nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + // Store the pipe id in the header, first thing. rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); if (rv != 0) { diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 1b68c6dd..c2008a9a 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -405,6 +405,7 @@ req_recv_cb(void *arg) msg = nni_aio_get_msg(p->aio_recv); nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); // We yank 4 bytes of body, and move them to the header. if (nni_msg_len(msg) < 4) { diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 5abd9e3a..b65adfee 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -28,6 +28,7 @@ struct nni_ipc_pipe { uint16_t peer; uint16_t proto; size_t rcvmax; + nng_sockaddr sa; uint8_t txhead[1 + sizeof(uint64_t)]; uint8_t rxhead[1 + sizeof(uint64_t)]; @@ -128,10 +129,13 @@ nni_ipc_pipe_init(nni_ipc_pipe **pipep, nni_ipc_ep *ep, void *ipp) return (rv); } - p->proto = ep->proto; - p->rcvmax = ep->rcvmax; - p->ipp = ipp; - p->addr = ep->addr; + p->proto = ep->proto; + p->rcvmax = ep->rcvmax; + p->ipp = ipp; + p->addr = ep->addr; + p->sa.s_un.s_path.sa_family = NNG_AF_IPC; + nni_strlcpy(p->sa.s_un.s_path.sa_path, p->addr + strlen("ipc://"), + sizeof(p->sa.s_un.s_path.sa_path)); *pipep = p; return (0); @@ -461,24 +465,20 @@ nni_ipc_pipe_peer(void *arg) static int nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) { -#if 0 - nni_inproc_pipe *pipe = arg; - size_t len; - - switch (option) { - case NNG_OPT_LOCALADDR: - case NNG_OPT_REMOTEADDR: - len = strlen(pipe->addr) + 1; - if (len > *szp) { - (void) memcpy(buf, pipe->addr, *szp); - } else { - (void) memcpy(buf, pipe->addr, len); - } - *szp = len; - return (0); + + nni_ipc_pipe *pipe = arg; + size_t len; + int rv; + + if ((option == nng_optid_locaddr) || (option == nng_optid_remaddr)) { + rv = nni_getopt_sockaddr(&pipe->sa, buf, szp); + } else if (option == nng_optid_recvmaxsz) { + rv = nni_getopt_size(pipe->rcvmax, &buf, szp); + + } else { + rv = NNG_ENOTSUP; } -#endif - return (NNG_ENOTSUP); + return (rv); } static void |
