diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-08-08 19:07:12 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-08-08 19:07:12 -0700 |
| commit | 5f0398de8edd1ed4ddbf6455c66273a6608aad9a (patch) | |
| tree | 80ad67ddb14f3e3e329b62d076b6ed2888e0bb53 | |
| parent | fec1e51b8c193152120d22c1898d71a2a3bbc934 (diff) | |
| download | nng-5f0398de8edd1ed4ddbf6455c66273a6608aad9a.tar.gz nng-5f0398de8edd1ed4ddbf6455c66273a6608aad9a.tar.bz2 nng-5f0398de8edd1ed4ddbf6455c66273a6608aad9a.zip | |
fixes #37 Make transports pluggable
We automatically register inproc, TCP, and IPC. We can add more now
by just calling nni_tran_register(). (There is no unregister support.)
This requires transports to have access to the AIO framework (so that needs
to be something we consider), and a few nni_sock calls to get socket options.
Going forward we should version the ops vectors, and move to pushing down
transport options from the framework via setopt calls -- there is no reason
really that transports need to know all these.
| -rw-r--r-- | src/core/transport.c | 93 | ||||
| -rw-r--r-- | src/core/transport.h | 1 | ||||
| -rw-r--r-- | tests/trantest.h | 44 |
3 files changed, 87 insertions, 51 deletions
diff --git a/src/core/transport.c b/src/core/transport.c index ff72c754..278c7d1e 100644 --- a/src/core/transport.c +++ b/src/core/transport.c @@ -10,6 +10,7 @@ #include "core/nng_impl.h" +#include <stdio.h> #include <string.h> // For now the list of transports is hard-wired. Adding new transports @@ -18,34 +19,60 @@ extern nni_tran nni_inproc_tran; extern nni_tran nni_tcp_tran; extern nni_tran nni_ipc_tran; -static nni_tran *transports[] = { - // clang-format off - &nni_inproc_tran, - &nni_tcp_tran, - &nni_ipc_tran, - NULL - // clang-format on -}; +typedef struct nni_transport { + nni_tran t_tran; + char t_prefix[16]; // e.g. "tcp://" or "tls+tcp://" + nni_list_node t_node; +} nni_transport; + +static nni_list nni_tran_list; +static nni_mtx nni_tran_lk; + +int +nni_tran_register(const nni_tran *tran) +{ + nni_transport *t; + int rv; + + nni_mtx_lock(&nni_tran_lk); + // Check to see if the transport is already registered... + NNI_LIST_FOREACH (&nni_tran_list, t) { + if (strcmp(tran->tran_scheme, t->t_tran.tran_scheme) == 0) { + nni_mtx_unlock(&nni_tran_lk); + return (NNG_ESTATE); + } + } + if ((t = NNI_ALLOC_STRUCT(t)) == NULL) { + return (NNG_ENOMEM); + } + + t->t_tran = *tran; + (void) snprintf( + t->t_prefix, sizeof(t->t_prefix), "%s://", tran->tran_scheme); + if ((rv = t->t_tran.tran_init()) != 0) { + nni_mtx_unlock(&nni_tran_lk); + NNI_FREE_STRUCT(t); + return (rv); + } + nni_list_append(&nni_tran_list, t); + nni_mtx_unlock(&nni_tran_lk); + return (0); +} nni_tran * nni_tran_find(const char *addr) { // address is of the form "<scheme>://blah..." - const char *end; - int len; - int i; - nni_tran * tran; + nni_transport *t; - if ((end = strstr(addr, "://")) == NULL) { - return (NULL); - } - len = (int) (end - addr); - for (i = 0; (tran = transports[i]) != NULL; i++) { - if ((strncmp(addr, tran->tran_scheme, len) == 0) && - (tran->tran_scheme[len] == '\0')) { - return (tran); + nni_mtx_lock(&nni_tran_lk); + NNI_LIST_FOREACH (&nni_tran_list, t) { + if (strncmp(addr, t->t_prefix, strlen(t->t_prefix)) == 0) { + nni_mtx_unlock(&nni_tran_lk); + return (&t->t_tran); } } + nni_mtx_unlock(&nni_tran_lk); return (NULL); } @@ -54,14 +81,15 @@ nni_tran_find(const char *addr) int nni_tran_sys_init(void) { - nni_tran *tran; + int rv; - for (int i = 0; (tran = transports[i]) != NULL; i++) { - int rv; - if ((rv = tran->tran_init()) != 0) { - nni_tran_sys_fini(); - return (rv); - } + NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node); + if (((rv = nni_mtx_init(&nni_tran_lk)) != 0) || + ((rv = nni_tran_register(&nni_inproc_tran)) != 0) || + ((rv = nni_tran_register(&nni_ipc_tran)) != 0) || + ((rv = nni_tran_register(&nni_tcp_tran)) != 0)) { + nni_tran_sys_fini(); + return (rv); } return (0); } @@ -71,11 +99,12 @@ nni_tran_sys_init(void) void nni_tran_sys_fini(void) { - nni_tran *tran; + nni_transport *t; - for (int i = 0; (tran = transports[i]) != NULL; i++) { - if (tran->tran_fini != NULL) { - tran->tran_fini(); - } + while ((t = nni_list_first(&nni_tran_list)) != NULL) { + nni_list_remove(&nni_tran_list, t); + t->t_tran.tran_fini(); + NNI_FREE_STRUCT(t); } + nni_mtx_fini(&nni_tran_lk); } diff --git a/src/core/transport.h b/src/core/transport.h index a739e7d2..11cdcf99 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -120,5 +120,6 @@ struct nni_tran_pipe { extern nni_tran *nni_tran_find(const char *); extern int nni_tran_sys_init(void); extern void nni_tran_sys_fini(void); +extern int nni_tran_register(const nni_tran *); #endif // CORE_TRANSPORT_H diff --git a/tests/trantest.h b/tests/trantest.h index fab3371d..fe6e5431 100644 --- a/tests/trantest.h +++ b/tests/trantest.h @@ -1,5 +1,6 @@ // // Copyright 2017 Garrett D'Amore <garrett@damore.org> +// Copyright 2017 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -8,8 +9,8 @@ // #include "convey.h" -#include "nng.h" #include "core/nng_impl.h" +#include "nng.h" #include <string.h> // Transport common tests. By making a common test framework for transports, @@ -18,20 +19,21 @@ // for comms. typedef struct { - char addr[NNG_MAXADDRLEN+1]; + char addr[NNG_MAXADDRLEN + 1]; nng_socket reqsock; nng_socket repsock; - nni_tran *tran; + nni_tran * tran; } trantest; void trantest_init(trantest *tt, const char *addr) { - snprintf(tt->addr, sizeof (tt->addr), "%s", addr); - tt->tran = nni_tran_find(addr); - So(tt->tran != NULL); + (void) snprintf(tt->addr, sizeof(tt->addr), "%s", addr); So(nng_open(&tt->reqsock, NNG_PROTO_REQ) == 0); So(nng_open(&tt->repsock, NNG_PROTO_REP) == 0); + + tt->tran = nni_tran_find(addr); + So(tt->tran != NULL); } void @@ -57,9 +59,11 @@ trantest_conn_refused(trantest *tt) Convey("Connection refused works", { nng_endpoint ep = 0; - So(nng_dial(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) == NNG_ECONNREFUSED); + So(nng_dial(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) == + NNG_ECONNREFUSED); So(ep == 0); - So(nng_dial(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == NNG_ECONNREFUSED); + So(nng_dial(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == + NNG_ECONNREFUSED); So(ep == 0); }) } @@ -69,10 +73,12 @@ trantest_duplicate_listen(trantest *tt) { Convey("Duplicate listen rejected", { nng_endpoint ep; - So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0); + So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == + 0); So(ep != 0); ep = 0; - So(nng_listen(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) == NNG_EADDRINUSE); + So(nng_listen(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) == + NNG_EADDRINUSE); So(ep == 0); }) } @@ -80,10 +86,11 @@ trantest_duplicate_listen(trantest *tt) void trantest_listen_accept(trantest *tt) { - Convey("Listen and accept" ,{ + Convey("Listen and accept", { nng_endpoint ep; ep = 0; - So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0); + So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == + 0); So(ep != 0); ep = 0; @@ -97,12 +104,13 @@ trantest_send_recv(trantest *tt) { Convey("Send and recv", { nng_endpoint ep = 0; - nng_msg *send; - nng_msg *recv; - size_t len; + nng_msg * send; + nng_msg * recv; + size_t len; ep = 0; - So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0); + So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == + 0); So(ep != 0); ep = 0; So(nng_dial(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0); @@ -140,9 +148,7 @@ trantest_test_all(const char *addr) Convey("Given transport", { trantest_init(&tt, addr); - Reset({ - trantest_fini(&tt); - }) + Reset({ trantest_fini(&tt); }); trantest_scheme(&tt); trantest_conn_refused(&tt); |
