aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-08 19:07:12 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-08 19:07:12 -0700
commit5f0398de8edd1ed4ddbf6455c66273a6608aad9a (patch)
tree80ad67ddb14f3e3e329b62d076b6ed2888e0bb53
parentfec1e51b8c193152120d22c1898d71a2a3bbc934 (diff)
downloadnng-5f0398de8edd1ed4ddbf6455c66273a6608aad9a.tar.gz
nng-5f0398de8edd1ed4ddbf6455c66273a6608aad9a.tar.bz2
nng-5f0398de8edd1ed4ddbf6455c66273a6608aad9a.zip
fixes #37 Make transports pluggable
We automatically register inproc, TCP, and IPC. We can add more now by just calling nni_tran_register(). (There is no unregister support.) This requires transports to have access to the AIO framework (so that needs to be something we consider), and a few nni_sock calls to get socket options. Going forward we should version the ops vectors, and move to pushing down transport options from the framework via setopt calls -- there is no reason really that transports need to know all these.
-rw-r--r--src/core/transport.c93
-rw-r--r--src/core/transport.h1
-rw-r--r--tests/trantest.h44
3 files changed, 87 insertions, 51 deletions
diff --git a/src/core/transport.c b/src/core/transport.c
index ff72c754..278c7d1e 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -10,6 +10,7 @@
#include "core/nng_impl.h"
+#include <stdio.h>
#include <string.h>
// For now the list of transports is hard-wired. Adding new transports
@@ -18,34 +19,60 @@ extern nni_tran nni_inproc_tran;
extern nni_tran nni_tcp_tran;
extern nni_tran nni_ipc_tran;
-static nni_tran *transports[] = {
- // clang-format off
- &nni_inproc_tran,
- &nni_tcp_tran,
- &nni_ipc_tran,
- NULL
- // clang-format on
-};
+typedef struct nni_transport {
+ nni_tran t_tran;
+ char t_prefix[16]; // e.g. "tcp://" or "tls+tcp://"
+ nni_list_node t_node;
+} nni_transport;
+
+static nni_list nni_tran_list;
+static nni_mtx nni_tran_lk;
+
+int
+nni_tran_register(const nni_tran *tran)
+{
+ nni_transport *t;
+ int rv;
+
+ nni_mtx_lock(&nni_tran_lk);
+ // Check to see if the transport is already registered...
+ NNI_LIST_FOREACH (&nni_tran_list, t) {
+ if (strcmp(tran->tran_scheme, t->t_tran.tran_scheme) == 0) {
+ nni_mtx_unlock(&nni_tran_lk);
+ return (NNG_ESTATE);
+ }
+ }
+ if ((t = NNI_ALLOC_STRUCT(t)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+
+ t->t_tran = *tran;
+ (void) snprintf(
+ t->t_prefix, sizeof(t->t_prefix), "%s://", tran->tran_scheme);
+ if ((rv = t->t_tran.tran_init()) != 0) {
+ nni_mtx_unlock(&nni_tran_lk);
+ NNI_FREE_STRUCT(t);
+ return (rv);
+ }
+ nni_list_append(&nni_tran_list, t);
+ nni_mtx_unlock(&nni_tran_lk);
+ return (0);
+}
nni_tran *
nni_tran_find(const char *addr)
{
// address is of the form "<scheme>://blah..."
- const char *end;
- int len;
- int i;
- nni_tran * tran;
+ nni_transport *t;
- if ((end = strstr(addr, "://")) == NULL) {
- return (NULL);
- }
- len = (int) (end - addr);
- for (i = 0; (tran = transports[i]) != NULL; i++) {
- if ((strncmp(addr, tran->tran_scheme, len) == 0) &&
- (tran->tran_scheme[len] == '\0')) {
- return (tran);
+ nni_mtx_lock(&nni_tran_lk);
+ NNI_LIST_FOREACH (&nni_tran_list, t) {
+ if (strncmp(addr, t->t_prefix, strlen(t->t_prefix)) == 0) {
+ nni_mtx_unlock(&nni_tran_lk);
+ return (&t->t_tran);
}
}
+ nni_mtx_unlock(&nni_tran_lk);
return (NULL);
}
@@ -54,14 +81,15 @@ nni_tran_find(const char *addr)
int
nni_tran_sys_init(void)
{
- nni_tran *tran;
+ int rv;
- for (int i = 0; (tran = transports[i]) != NULL; i++) {
- int rv;
- if ((rv = tran->tran_init()) != 0) {
- nni_tran_sys_fini();
- return (rv);
- }
+ NNI_LIST_INIT(&nni_tran_list, nni_transport, t_node);
+ if (((rv = nni_mtx_init(&nni_tran_lk)) != 0) ||
+ ((rv = nni_tran_register(&nni_inproc_tran)) != 0) ||
+ ((rv = nni_tran_register(&nni_ipc_tran)) != 0) ||
+ ((rv = nni_tran_register(&nni_tcp_tran)) != 0)) {
+ nni_tran_sys_fini();
+ return (rv);
}
return (0);
}
@@ -71,11 +99,12 @@ nni_tran_sys_init(void)
void
nni_tran_sys_fini(void)
{
- nni_tran *tran;
+ nni_transport *t;
- for (int i = 0; (tran = transports[i]) != NULL; i++) {
- if (tran->tran_fini != NULL) {
- tran->tran_fini();
- }
+ while ((t = nni_list_first(&nni_tran_list)) != NULL) {
+ nni_list_remove(&nni_tran_list, t);
+ t->t_tran.tran_fini();
+ NNI_FREE_STRUCT(t);
}
+ nni_mtx_fini(&nni_tran_lk);
}
diff --git a/src/core/transport.h b/src/core/transport.h
index a739e7d2..11cdcf99 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -120,5 +120,6 @@ struct nni_tran_pipe {
extern nni_tran *nni_tran_find(const char *);
extern int nni_tran_sys_init(void);
extern void nni_tran_sys_fini(void);
+extern int nni_tran_register(const nni_tran *);
#endif // CORE_TRANSPORT_H
diff --git a/tests/trantest.h b/tests/trantest.h
index fab3371d..fe6e5431 100644
--- a/tests/trantest.h
+++ b/tests/trantest.h
@@ -1,5 +1,6 @@
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
+// Copyright 2017 Capitar IT Group BV <info@capitar.com>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -8,8 +9,8 @@
//
#include "convey.h"
-#include "nng.h"
#include "core/nng_impl.h"
+#include "nng.h"
#include <string.h>
// Transport common tests. By making a common test framework for transports,
@@ -18,20 +19,21 @@
// for comms.
typedef struct {
- char addr[NNG_MAXADDRLEN+1];
+ char addr[NNG_MAXADDRLEN + 1];
nng_socket reqsock;
nng_socket repsock;
- nni_tran *tran;
+ nni_tran * tran;
} trantest;
void
trantest_init(trantest *tt, const char *addr)
{
- snprintf(tt->addr, sizeof (tt->addr), "%s", addr);
- tt->tran = nni_tran_find(addr);
- So(tt->tran != NULL);
+ (void) snprintf(tt->addr, sizeof(tt->addr), "%s", addr);
So(nng_open(&tt->reqsock, NNG_PROTO_REQ) == 0);
So(nng_open(&tt->repsock, NNG_PROTO_REP) == 0);
+
+ tt->tran = nni_tran_find(addr);
+ So(tt->tran != NULL);
}
void
@@ -57,9 +59,11 @@ trantest_conn_refused(trantest *tt)
Convey("Connection refused works", {
nng_endpoint ep = 0;
- So(nng_dial(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) == NNG_ECONNREFUSED);
+ So(nng_dial(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
+ NNG_ECONNREFUSED);
So(ep == 0);
- So(nng_dial(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == NNG_ECONNREFUSED);
+ So(nng_dial(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
+ NNG_ECONNREFUSED);
So(ep == 0);
})
}
@@ -69,10 +73,12 @@ trantest_duplicate_listen(trantest *tt)
{
Convey("Duplicate listen rejected", {
nng_endpoint ep;
- So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0);
+ So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
+ 0);
So(ep != 0);
ep = 0;
- So(nng_listen(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) == NNG_EADDRINUSE);
+ So(nng_listen(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
+ NNG_EADDRINUSE);
So(ep == 0);
})
}
@@ -80,10 +86,11 @@ trantest_duplicate_listen(trantest *tt)
void
trantest_listen_accept(trantest *tt)
{
- Convey("Listen and accept" ,{
+ Convey("Listen and accept", {
nng_endpoint ep;
ep = 0;
- So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0);
+ So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
+ 0);
So(ep != 0);
ep = 0;
@@ -97,12 +104,13 @@ trantest_send_recv(trantest *tt)
{
Convey("Send and recv", {
nng_endpoint ep = 0;
- nng_msg *send;
- nng_msg *recv;
- size_t len;
+ nng_msg * send;
+ nng_msg * recv;
+ size_t len;
ep = 0;
- So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0);
+ So(nng_listen(tt->repsock, tt->addr, &ep, NNG_FLAG_SYNCH) ==
+ 0);
So(ep != 0);
ep = 0;
So(nng_dial(tt->reqsock, tt->addr, &ep, NNG_FLAG_SYNCH) == 0);
@@ -140,9 +148,7 @@ trantest_test_all(const char *addr)
Convey("Given transport", {
trantest_init(&tt, addr);
- Reset({
- trantest_fini(&tt);
- })
+ Reset({ trantest_fini(&tt); });
trantest_scheme(&tt);
trantest_conn_refused(&tt);