aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/defs.h23
-rw-r--r--src/core/device.c8
-rw-r--r--src/core/endpt.c64
-rw-r--r--src/core/endpt.h38
-rw-r--r--src/core/options.c41
-rw-r--r--src/core/options.h10
-rw-r--r--src/core/pipe.c26
-rw-r--r--src/core/pipe.h2
-rw-r--r--src/core/protocol.h13
-rw-r--r--src/core/socket.c337
-rw-r--r--src/core/socket.h8
-rw-r--r--src/core/transport.c26
-rw-r--r--src/core/transport.h56
13 files changed, 439 insertions, 213 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index 4d8e6ffb..ff02b28b 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -32,16 +32,19 @@ typedef struct nng_event nni_event;
typedef struct nng_notify nni_notify;
// These are our own names.
-typedef struct nni_socket nni_sock;
-typedef struct nni_ep nni_ep;
-typedef struct nni_pipe nni_pipe;
-typedef struct nni_tran nni_tran;
-typedef struct nni_tran_ep nni_tran_ep;
-typedef struct nni_tran_pipe nni_tran_pipe;
-
-typedef struct nni_proto_sock_ops nni_proto_sock_ops;
-typedef struct nni_proto_pipe_ops nni_proto_pipe_ops;
-typedef struct nni_proto nni_proto;
+typedef struct nni_socket nni_sock;
+typedef struct nni_ep nni_ep;
+typedef struct nni_pipe nni_pipe;
+typedef struct nni_tran nni_tran;
+typedef struct nni_tran_ep nni_tran_ep;
+typedef struct nni_tran_ep_option nni_tran_ep_option;
+typedef struct nni_tran_pipe nni_tran_pipe;
+typedef struct nni_tran_pipe_option nni_tran_pipe_option;
+
+typedef struct nni_proto_sock_ops nni_proto_sock_ops;
+typedef struct nni_proto_pipe_ops nni_proto_pipe_ops;
+typedef struct nni_proto_sock_option nni_proto_sock_option;
+typedef struct nni_proto nni_proto;
typedef struct nni_plat_mtx nni_mtx;
typedef struct nni_plat_cv nni_cv;
diff --git a/src/core/device.c b/src/core/device.c
index e7140664..9161e2f0 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -91,10 +91,10 @@ nni_device(nni_sock *sock1, nni_sock *sock2)
// No timeouts.
sz = sizeof(never);
- if ((nni_sock_setopt(sock1, nng_optid_recvtimeo, &never, sz) != 0) ||
- (nni_sock_setopt(sock2, nng_optid_recvtimeo, &never, sz) != 0) ||
- (nni_sock_setopt(sock1, nng_optid_sendtimeo, &never, sz) != 0) ||
- (nni_sock_setopt(sock2, nng_optid_sendtimeo, &never, sz) != 0)) {
+ if ((nni_sock_setopt(sock1, NNG_OPT_RECVTIMEO, &never, sz) != 0) ||
+ (nni_sock_setopt(sock2, NNG_OPT_RECVTIMEO, &never, sz) != 0) ||
+ (nni_sock_setopt(sock1, NNG_OPT_SENDTIMEO, &never, sz) != 0) ||
+ (nni_sock_setopt(sock2, NNG_OPT_SENDTIMEO, &never, sz) != 0)) {
// This should never happen.
rv = NNG_EINVAL;
goto out;
diff --git a/src/core/endpt.c b/src/core/endpt.c
index a99041ab..e6216ba3 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -480,7 +480,7 @@ nni_ep_acc_cb(void *arg)
break;
case NNG_ECLOSED:
case NNG_ECANCELED:
- // Canceled or closed, no furhter action.
+ // Canceled or closed, no further action.
break;
case NNG_ECONNABORTED:
case NNG_ECONNRESET:
@@ -587,38 +587,62 @@ nni_ep_pipe_remove(nni_ep *ep, nni_pipe *pipe)
}
int
-nni_ep_setopt(nni_ep *ep, int opt, const void *val, size_t sz, int check)
+nni_ep_setopt(nni_ep *ep, const char *name, const void *val, size_t sz)
{
- int rv;
+ nni_tran_ep_option *eo;
- if (ep->ep_ops.ep_setopt == NULL) {
- return (NNG_ENOTSUP);
+ if (strcmp(name, NNG_OPT_URL) == 0) {
+ return (NNG_EREADONLY);
}
- nni_mtx_lock(&ep->ep_mtx);
- if (check && ep->ep_started) {
+
+ for (eo = ep->ep_ops.ep_options; eo && eo->eo_name; eo++) {
+ int rv;
+
+ if (strcmp(eo->eo_name, name) != 0) {
+ continue;
+ }
+ if (eo->eo_setopt == NULL) {
+ return (NNG_EREADONLY);
+ }
+ nni_mtx_lock(&ep->ep_mtx);
+ // XXX: Consider removing this test.
+ if (ep->ep_started) {
+ nni_mtx_unlock(&ep->ep_mtx);
+ return (NNG_ESTATE);
+ }
+ rv = eo->eo_setopt(ep->ep_data, val, sz);
nni_mtx_unlock(&ep->ep_mtx);
- return (NNG_ESTATE);
+ return (rv);
}
- rv = ep->ep_ops.ep_setopt(ep->ep_data, opt, val, sz);
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
+
+ // XXX: socket fallback
+ return (NNG_ENOTSUP);
}
int
-nni_ep_getopt(nni_ep *ep, int opt, void *valp, size_t *szp)
+nni_ep_getopt(nni_ep *ep, const char *name, void *valp, size_t *szp)
{
- int rv;
+ nni_tran_ep_option *eo;
- if (opt == nng_optid_url) {
+ if (strcmp(name, NNG_OPT_URL) == 0) {
return (nni_getopt_str(ep->ep_url, valp, szp));
}
- if (ep->ep_ops.ep_getopt == NULL) {
- return (NNG_ENOTSUP);
+
+ for (eo = ep->ep_ops.ep_options; eo && eo->eo_name; eo++) {
+ int rv;
+ if (strcmp(eo->eo_name, name) != 0) {
+ continue;
+ }
+ if (eo->eo_getopt == NULL) {
+ return (NNG_EWRITEONLY);
+ }
+ nni_mtx_lock(&ep->ep_mtx);
+ rv = eo->eo_getopt(ep->ep_data, valp, szp);
+ nni_mtx_unlock(&ep->ep_mtx);
+ return (rv);
}
- nni_mtx_lock(&ep->ep_mtx);
- rv = ep->ep_ops.ep_getopt(ep->ep_data, opt, valp, szp);
- nni_mtx_unlock(&ep->ep_mtx);
- return (rv);
+
+ return (nni_sock_getopt(ep->ep_sock, name, valp, szp));
}
void
diff --git a/src/core/endpt.h b/src/core/endpt.h
index 161c030f..d12d661f 100644
--- a/src/core/endpt.h
+++ b/src/core/endpt.h
@@ -11,25 +11,25 @@
#ifndef CORE_ENDPT_H
#define CORE_ENDPT_H
-extern int nni_ep_sys_init(void);
-extern void nni_ep_sys_fini(void);
-extern nni_tran * nni_ep_tran(nni_ep *);
-extern nni_sock * nni_ep_sock(nni_ep *);
-extern int nni_ep_find(nni_ep **, uint32_t);
-extern int nni_ep_hold(nni_ep *);
-extern void nni_ep_rele(nni_ep *);
-extern uint32_t nni_ep_id(nni_ep *);
-extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *);
-extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *);
-extern void nni_ep_stop(nni_ep *);
-extern int nni_ep_shutdown(nni_ep *);
-extern void nni_ep_close(nni_ep *);
-extern int nni_ep_dial(nni_ep *, int);
-extern int nni_ep_listen(nni_ep *, int);
-extern void nni_ep_list_init(nni_list *);
-extern int nni_ep_setopt(nni_ep *, int, const void *, size_t, int);
-extern int nni_ep_getopt(nni_ep *, int, void *, size_t *);
-extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *);
+extern int nni_ep_sys_init(void);
+extern void nni_ep_sys_fini(void);
+extern nni_tran *nni_ep_tran(nni_ep *);
+extern nni_sock *nni_ep_sock(nni_ep *);
+extern int nni_ep_find(nni_ep **, uint32_t);
+extern int nni_ep_hold(nni_ep *);
+extern void nni_ep_rele(nni_ep *);
+extern uint32_t nni_ep_id(nni_ep *);
+extern int nni_ep_create_dialer(nni_ep **, nni_sock *, const char *);
+extern int nni_ep_create_listener(nni_ep **, nni_sock *, const char *);
+extern void nni_ep_stop(nni_ep *);
+extern int nni_ep_shutdown(nni_ep *);
+extern void nni_ep_close(nni_ep *);
+extern int nni_ep_dial(nni_ep *, int);
+extern int nni_ep_listen(nni_ep *, int);
+extern void nni_ep_list_init(nni_list *);
+extern int nni_ep_setopt(nni_ep *, const char *, const void *, size_t);
+extern int nni_ep_getopt(nni_ep *, const char *, void *, size_t *);
+extern int nni_ep_pipe_add(nni_ep *ep, nni_pipe *);
extern void nni_ep_pipe_remove(nni_ep *, nni_pipe *);
extern const char *nni_ep_url(nni_ep *);
diff --git a/src/core/options.c b/src/core/options.c
index b7934a06..e9a79f35 100644
--- a/src/core/options.c
+++ b/src/core/options.c
@@ -344,23 +344,6 @@ nni_option_lookup(const char *name)
return (id);
}
-const char *
-nni_option_name(int id)
-{
- nni_option *opt;
- const char *name = NULL;
-
- nni_mtx_lock(&nni_option_lk);
- NNI_LIST_FOREACH (&nni_options, opt) {
- if (id == opt->o_id) {
- name = opt->o_name;
- break;
- }
- }
- nni_mtx_unlock(&nni_option_lk);
- return (name);
-}
-
int
nni_option_register(const char *name, int *idp)
{
@@ -390,6 +373,15 @@ nni_option_sys_fini(void)
nni_option_nextid = 0;
}
+int nni_optid_raw;
+int nni_optid_recvmaxsz;
+int nni_optid_maxttl;
+int nni_optid_protocol;
+int nni_optid_transport;
+int nni_optid_locaddr;
+int nni_optid_remaddr;
+int nni_optid_surveyor_surveytime;
+
int
nni_option_sys_init(void)
{
@@ -398,28 +390,15 @@ nni_option_sys_init(void)
nni_option_nextid = 0x10000;
int rv;
-#define OPT_REGISTER(o) nni_option_register(nng_opt_##o, &nng_optid_##o)
+#define OPT_REGISTER(o) nni_option_register(nng_opt_##o, &nni_optid_##o)
// Register our well-known options.
if (((rv = OPT_REGISTER(raw)) != 0) ||
- ((rv = OPT_REGISTER(linger)) != 0) ||
- ((rv = OPT_REGISTER(recvbuf)) != 0) ||
- ((rv = OPT_REGISTER(sendbuf)) != 0) ||
- ((rv = OPT_REGISTER(recvtimeo)) != 0) ||
- ((rv = OPT_REGISTER(sendtimeo)) != 0) ||
- ((rv = OPT_REGISTER(reconnmint)) != 0) ||
- ((rv = OPT_REGISTER(reconnmaxt)) != 0) ||
((rv = OPT_REGISTER(recvmaxsz)) != 0) ||
((rv = OPT_REGISTER(maxttl)) != 0) ||
((rv = OPT_REGISTER(protocol)) != 0) ||
((rv = OPT_REGISTER(transport)) != 0) ||
((rv = OPT_REGISTER(locaddr)) != 0) ||
((rv = OPT_REGISTER(remaddr)) != 0) ||
- ((rv = OPT_REGISTER(recvfd)) != 0) ||
- ((rv = OPT_REGISTER(sendfd)) != 0) ||
- ((rv = OPT_REGISTER(url)) != 0) ||
- ((rv = OPT_REGISTER(req_resendtime)) != 0) ||
- ((rv = OPT_REGISTER(sub_subscribe)) != 0) ||
- ((rv = OPT_REGISTER(sub_unsubscribe)) != 0) ||
((rv = OPT_REGISTER(surveyor_surveytime)) != 0)) {
nni_option_sys_fini();
return (rv);
diff --git a/src/core/options.h b/src/core/options.h
index 64036db1..418a5d00 100644
--- a/src/core/options.h
+++ b/src/core/options.h
@@ -72,4 +72,14 @@ extern const char *nni_option_name(int);
extern int nni_option_sys_init(void);
extern void nni_option_sys_fini(void);
+extern int nni_optid_raw;
+extern int nni_optid_recvmaxsz;
+extern int nni_optid_maxttl;
+extern int nni_optid_protocol;
+extern int nni_optid_transport;
+extern int nni_optid_locaddr;
+extern int nni_optid_remaddr;
+extern int nni_optid_req_resendtime;
+extern int nni_optid_surveyor_surveytime;
+
#endif // CORE_OPTIONS_H
diff --git a/src/core/pipe.c b/src/core/pipe.c
index edc8c15d..7351997a 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -10,6 +10,8 @@
#include "core/nng_impl.h"
+#include <string.h>
+
// This file contains functions relating to pipes.
//
// Operations on pipes (to the transport) are generally blocking operations,
@@ -281,8 +283,7 @@ nni_pipe_create(nni_ep *ep, void *tdata)
rv = nni_idhash_alloc(nni_pipes, &p->p_id, p);
nni_mtx_unlock(&nni_pipe_lk);
- if ((rv != 0) ||
- ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
+ if ((rv != 0) || ((rv = nni_ep_pipe_add(ep, p)) != 0) ||
((rv = nni_sock_pipe_add(sock, p)) != 0)) {
nni_pipe_destroy(p);
}
@@ -291,21 +292,18 @@ nni_pipe_create(nni_ep *ep, void *tdata)
}
int
-nni_pipe_getopt(nni_pipe *p, int opt, void *val, size_t *szp)
+nni_pipe_getopt(nni_pipe *p, const char *name, void *val, size_t *szp)
{
- int rv = NNG_ENOTSUP;
+ nni_tran_pipe_option *po;
- if (opt == nng_optid_url) {
- return (nni_getopt_str(p->p_url, val, szp));
- }
- if (p->p_tran_ops.p_getopt != NULL) {
- rv = p->p_tran_ops.p_getopt(p->p_tran_data, opt, val, szp);
- }
- if (rv == NNG_ENOTSUP) {
- // Maybe its a generic socket option?
- rv = nni_sock_getopt(p->p_sock, opt, val, szp);
+ for (po = p->p_tran_ops.p_options; po && po->po_name; po++) {
+ if (strcmp(po->po_name, name) != 0) {
+ continue;
+ }
+ return (po->po_getopt(p->p_tran_data, val, szp));
}
- return (rv);
+ // Maybe the endpoint knows?
+ return (nni_ep_getopt(p->p_ep, name, val, szp));
}
void
diff --git a/src/core/pipe.h b/src/core/pipe.h
index bb55a8cd..54629810 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -57,7 +57,7 @@ extern void nni_pipe_start(nni_pipe *);
extern uint16_t nni_pipe_proto(nni_pipe *);
extern uint16_t nni_pipe_peer(nni_pipe *);
-extern int nni_pipe_getopt(nni_pipe *, int, void *, size_t *sizep);
+extern int nni_pipe_getopt(nni_pipe *, const char *, void *, size_t *);
// nni_pipe_get_proto_data gets the protocol private data set with the
// nni_pipe_set_proto_data function. No locking is performed.
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 9416b2bf..0c0d93ce 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -47,6 +47,12 @@ struct nni_proto_pipe_ops {
void (*pipe_stop)(void *);
};
+struct nni_proto_sock_option {
+ const char *pso_name;
+ int (*pso_getopt)(void *, void *, size_t *);
+ int (*pso_setopt)(void *, const void *, size_t);
+};
+
struct nni_proto_sock_ops {
// sock_init creates the protocol instance, which will be stored on
// the socket. This is run without the sock lock held, and allocates
@@ -68,10 +74,6 @@ struct nni_proto_sock_ops {
// it can signal the socket worker threads to exit.
void (*sock_close)(void *);
- // Option manipulation. These may be NULL.
- int (*sock_setopt)(void *, int, const void *, size_t);
- int (*sock_getopt)(void *, int, void *, size_t *);
-
// Receive filter. This may be NULL, but if it isn't, then
// messages coming into the system are routed here just before being
// delivered to the application. To drop the message, the prtocol
@@ -81,6 +83,9 @@ struct nni_proto_sock_ops {
// Send filter. This may be NULL, but if it isn't, then messages
// here are filtered just after they come from the application.
nni_msg *(*sock_sfilter)(void *, nni_msg *);
+
+ // Options. Must not be NULL. Final entry should have NULL name.
+ nni_proto_sock_option *sock_options;
};
typedef struct nni_proto_id {
diff --git a/src/core/socket.c b/src/core/socket.c
index 03ae5a9d..dc305b48 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -18,9 +18,15 @@ static nni_list nni_sock_list;
static nni_idhash *nni_sock_hash;
static nni_mtx nni_sock_lk;
+typedef struct nni_socket_option {
+ const char *so_name;
+ int (*so_getopt)(nni_sock *, void *, size_t *);
+ int (*so_setopt)(nni_sock *, const void *, size_t);
+} nni_socket_option;
+
typedef struct nni_sockopt {
nni_list_node node;
- int opt;
+ char * name;
size_t sz;
void * data;
} nni_sockopt;
@@ -71,9 +77,158 @@ struct nni_socket {
nni_notifyfd s_recv_fd;
};
+#if 0
+if (opt == nni_optid_reconnmint) {
+ rv = nni_setopt_usec(&s->s_reconn, val, size);
+} else if (opt == nni_optid_reconnmaxt) {
+ rv = nni_setopt_usec(&s->s_reconnmax, val, size);
+} else if (opt == nni_optid_recvtimeo) {
+ rv = nni_setopt_usec(&s->s_rcvtimeo, val, size);
+} else if (opt == nni_optid_sendtimeo) {
+ rv = nni_setopt_usec(&s->s_sndtimeo, val, size);
+} else if (opt == nni_optid_sendbuf) {
+ rv = nni_setopt_buf(s->s_uwq, val, size);
+} else if (opt == nni_optid_recvbuf) {
+ rv = nni_setopt_buf(s->s_urq, val, size);
+} else if ((opt == nni_optid_sendfd) || (opt == nni_optid_recvfd) ||
+ (opt == nni_optid_locaddr) || (opt == nni_optid_remaddr)) {
+ // these options can be read, but cannot be set
+ rv = NNG_EINVAL;
+#endif
+
+static int
+nni_sock_getopt_sendfd(nni_sock *s, void *buf, size_t *szp)
+{
+ return (nni_getopt_fd(s, &s->s_send_fd, NNG_EV_CAN_SND, buf, szp));
+}
+
+static int
+nni_sock_getopt_recvfd(nni_sock *s, void *buf, size_t *szp)
+{
+ return (nni_getopt_fd(s, &s->s_recv_fd, NNG_EV_CAN_RCV, buf, szp));
+}
+
+static int
+nni_sock_setopt_recvtimeo(nni_sock *s, const void *buf, size_t sz)
+{
+ return (nni_setopt_usec(&s->s_rcvtimeo, buf, sz));
+}
+
+static int
+nni_sock_getopt_recvtimeo(nni_sock *s, void *buf, size_t *szp)
+{
+ return (nni_getopt_usec(s->s_rcvtimeo, buf, szp));
+}
+
+static int
+nni_sock_setopt_sendtimeo(nni_sock *s, const void *buf, size_t sz)
+{
+ return (nni_setopt_usec(&s->s_sndtimeo, buf, sz));
+}
+
+static int
+nni_sock_getopt_sendtimeo(nni_sock *s, void *buf, size_t *szp)
+{
+ return (nni_getopt_usec(s->s_sndtimeo, buf, szp));
+}
+
+static int
+nni_sock_setopt_reconnmint(nni_sock *s, const void *buf, size_t sz)
+{
+ return (nni_setopt_usec(&s->s_reconn, buf, sz));
+}
+
+static int
+nni_sock_getopt_reconnmint(nni_sock *s, void *buf, size_t *szp)
+{
+ return (nni_getopt_usec(s->s_reconn, buf, szp));
+}
+
+static int
+nni_sock_setopt_reconnmaxt(nni_sock *s, const void *buf, size_t sz)
+{
+ return (nni_setopt_usec(&s->s_reconnmax, buf, sz));
+}
+
+static int
+nni_sock_getopt_reconnmaxt(nni_sock *s, void *buf, size_t *szp)
+{
+ return (nni_getopt_usec(s->s_reconnmax, buf, szp));
+}
+
+static int
+nni_sock_setopt_recvbuf(nni_sock *s, const void *buf, size_t sz)
+{
+ return (nni_setopt_buf(s->s_urq, buf, sz));
+}
+
+static int
+nni_sock_getopt_recvbuf(nni_sock *s, void *buf, size_t *szp)
+{
+ return (nni_getopt_buf(s->s_urq, buf, szp));
+}
+
+static int
+nni_sock_setopt_sendbuf(nni_sock *s, const void *buf, size_t sz)
+{
+ return (nni_setopt_buf(s->s_uwq, buf, sz));
+}
+
+static int
+nni_sock_getopt_sendbuf(nni_sock *s, void *buf, size_t *szp)
+{
+ return (nni_getopt_buf(s->s_uwq, buf, szp));
+}
+
+static const nni_socket_option nni_sock_options[] = {
+ {
+ .so_name = NNG_OPT_RECVTIMEO,
+ .so_getopt = nni_sock_getopt_recvtimeo,
+ .so_setopt = nni_sock_setopt_recvtimeo,
+ },
+ {
+ .so_name = NNG_OPT_SENDTIMEO,
+ .so_getopt = nni_sock_getopt_sendtimeo,
+ .so_setopt = nni_sock_setopt_sendtimeo,
+ },
+ {
+ .so_name = NNG_OPT_RECVFD,
+ .so_getopt = nni_sock_getopt_recvfd,
+ .so_setopt = NULL,
+ },
+ {
+ .so_name = NNG_OPT_SENDFD,
+ .so_getopt = nni_sock_getopt_sendfd,
+ .so_setopt = NULL,
+ },
+ {
+ .so_name = NNG_OPT_RECVBUF,
+ .so_getopt = nni_sock_getopt_recvbuf,
+ .so_setopt = nni_sock_setopt_recvbuf,
+ },
+ {
+ .so_name = NNG_OPT_SENDBUF,
+ .so_getopt = nni_sock_getopt_sendbuf,
+ .so_setopt = nni_sock_setopt_sendbuf,
+ },
+ {
+ .so_name = NNG_OPT_RECONNMINT,
+ .so_getopt = nni_sock_getopt_reconnmint,
+ .so_setopt = nni_sock_setopt_reconnmint,
+ },
+ {
+ .so_name = NNG_OPT_RECONNMAXT,
+ .so_getopt = nni_sock_getopt_reconnmaxt,
+ .so_setopt = nni_sock_setopt_reconnmaxt,
+ },
+ // terminate list
+ { NULL, NULL, NULL },
+};
+
static void
nni_free_opt(nni_sockopt *opt)
{
+ nni_strfree(opt->name);
nni_free(opt->data, opt->sz);
NNI_FREE_STRUCT(opt);
}
@@ -366,17 +521,17 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 0)) != 0) ||
((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0) ||
- ((rv = nni_sock_setopt(s, nng_optid_linger, &s->s_linger,
+ ((rv = nni_sock_setopt(s, NNG_OPT_LINGER, &s->s_linger,
sizeof(nni_duration))) != 0) ||
- ((rv = nni_sock_setopt(s, nng_optid_sendtimeo, &s->s_sndtimeo,
+ ((rv = nni_sock_setopt(s, NNG_OPT_SENDTIMEO, &s->s_sndtimeo,
sizeof(nni_duration))) != 0) ||
- ((rv = nni_sock_setopt(s, nng_optid_recvtimeo, &s->s_rcvtimeo,
+ ((rv = nni_sock_setopt(s, NNG_OPT_RECVTIMEO, &s->s_rcvtimeo,
sizeof(nni_duration))) != 0) ||
- ((rv = nni_sock_setopt(s, nng_optid_reconnmint, &s->s_reconn,
+ ((rv = nni_sock_setopt(s, NNG_OPT_RECONNMINT, &s->s_reconn,
sizeof(nni_duration))) != 0) ||
- ((rv = nni_sock_setopt(s, nng_optid_reconnmaxt, &s->s_reconnmax,
+ ((rv = nni_sock_setopt(s, NNG_OPT_RECONNMAXT, &s->s_reconnmax,
sizeof(nni_duration))) != 0) ||
- ((rv = nni_sock_setopt(s, nng_optid_recvmaxsz, &s->s_rcvmaxsz,
+ ((rv = nni_sock_setopt(s, NNG_OPT_RECVMAXSZ, &s->s_rcvmaxsz,
sizeof(size_t))) != 0)) {
nni_sock_destroy(s);
return (rv);
@@ -749,14 +904,16 @@ nni_sock_ep_add(nni_sock *s, nni_ep *ep)
nni_mtx_unlock(&s->s_mx);
return (NNG_ECLOSED);
}
+
NNI_LIST_FOREACH (&s->s_options, sopt) {
int rv;
- rv = nni_ep_setopt(ep, sopt->opt, sopt->data, sopt->sz, 0);
+ rv = nni_ep_setopt(ep, sopt->name, sopt->data, sopt->sz);
if ((rv != 0) && (rv != NNG_ENOTSUP)) {
nni_mtx_unlock(&s->s_mx);
return (rv);
}
}
+
nni_list_append(&s->s_eps, ep);
nni_mtx_unlock(&s->s_mx);
return (0);
@@ -788,41 +945,48 @@ nni_sock_senderr(nni_sock *sock, int err)
}
int
-nni_sock_setopt(nni_sock *s, int opt, const void *val, size_t size)
+nni_sock_setopt(nni_sock *s, const char *name, const void *val, size_t size)
{
- int rv = NNG_ENOTSUP;
- nni_ep * ep;
- int commits = 0;
- nni_sockopt *optv;
- nni_sockopt *oldv = NULL;
+ int rv = NNG_ENOTSUP;
+ nni_ep * ep;
+ int commits = 0;
+ nni_sockopt * optv;
+ nni_sockopt * oldv = NULL;
+ const nni_socket_option * sso;
+ const nni_proto_sock_option *pso;
nni_mtx_lock(&s->s_mx);
if (s->s_closing) {
nni_mtx_unlock(&s->s_mx);
return (NNG_ECLOSED);
}
- if (s->s_sock_ops.sock_setopt != NULL) {
- rv = s->s_sock_ops.sock_setopt(s->s_data, opt, val, size);
- if (rv != NNG_ENOTSUP) {
+
+ // Protocol options.
+ for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) {
+ if (strcmp(pso->pso_name, name) != 0) {
+ continue;
+ }
+ if (pso->pso_setopt == NULL) {
nni_mtx_unlock(&s->s_mx);
- return (rv);
+ return (NNG_EREADONLY);
}
+ rv = pso->pso_setopt(s->s_data, val, size);
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
}
- // Some options do not go down to transports. Handle them
- // directly.
- if (opt == nng_optid_reconnmint) {
- rv = nni_setopt_usec(&s->s_reconn, val, size);
- } else if (opt == nng_optid_reconnmaxt) {
- rv = nni_setopt_usec(&s->s_reconnmax, val, size);
- } else if (opt == nng_optid_sendbuf) {
- rv = nni_setopt_buf(s->s_uwq, val, size);
- } else if (opt == nng_optid_recvbuf) {
- rv = nni_setopt_buf(s->s_urq, val, size);
- } else if ((opt == nng_optid_sendfd) || (opt == nng_optid_recvfd) ||
- (opt == nng_optid_locaddr) || (opt == nng_optid_remaddr)) {
- // these options can be read, but cannot be set
- rv = NNG_EINVAL;
+ // Some options do not go down to transports. Handle them directly.
+ for (sso = nni_sock_options; sso->so_name != NULL; sso++) {
+ if (strcmp(sso->so_name, name) != 0) {
+ continue;
+ }
+ if (sso->so_setopt == NULL) {
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_EREADONLY);
+ }
+ rv = sso->so_setopt(s, val, size);
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
}
nni_mtx_unlock(&s->s_mx);
@@ -832,20 +996,16 @@ nni_sock_setopt(nni_sock *s, int opt, const void *val, size_t size)
return (rv);
}
- // Validation of transport options. This is stateless, so
- // transports should not fail to set an option later if they
- // passed it here.
- rv = nni_tran_chkopt(opt, val, size);
+ // Validation of transport options. This is stateless, so transports
+ // should not fail to set an option later if they passed it here.
+ rv = nni_tran_chkopt(name, val, size);
// Also check a few generic things. We do this if no transport
- // check was found, or even if a transport rejected one of the
- // settings.
+ // was found, or even if a transport rejected one of the settings.
if ((rv == NNG_ENOTSUP) || (rv == 0)) {
- if ((opt == nng_optid_linger) ||
- (opt == nng_optid_sendtimeo) ||
- (opt == nng_optid_recvtimeo)) {
+ if ((strcmp(name, NNG_OPT_LINGER) == 0)) {
rv = nni_chkopt_usec(val, size);
- } else if (opt == nng_optid_recvmaxsz) {
+ } else if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) {
// just a sanity test on the size; it also ensures that
// a size can be set even with no transport configured.
rv = nni_chkopt_size(val, size, 0, NNI_MAXSZ);
@@ -864,14 +1024,18 @@ nni_sock_setopt(nni_sock *s, int opt, const void *val, size_t size)
NNI_FREE_STRUCT(optv);
return (NNG_ENOMEM);
}
+ if ((optv->name = nni_strdup(name)) == NULL) {
+ nni_free(optv->data, size);
+ NNI_FREE_STRUCT(optv);
+ return (NNG_ENOMEM);
+ }
memcpy(optv->data, val, size);
- optv->opt = opt;
- optv->sz = size;
+ optv->sz = size;
NNI_LIST_NODE_INIT(&optv->node);
nni_mtx_lock(&s->s_mx);
NNI_LIST_FOREACH (&s->s_options, oldv) {
- if (oldv->opt == opt) {
+ if (strcmp(oldv->name, name) == 0) {
if ((oldv->sz != size) ||
(memcmp(oldv->data, val, size) != 0)) {
break;
@@ -889,7 +1053,7 @@ nni_sock_setopt(nni_sock *s, int opt, const void *val, size_t size)
// important that transport wide checks properly pre-validate.
NNI_LIST_FOREACH (&s->s_eps, ep) {
int x;
- x = nni_ep_setopt(ep, opt, optv->data, size, 0);
+ x = nni_ep_setopt(ep, optv->name, optv->data, size);
if (x != NNG_ENOTSUP) {
if ((rv = x) != 0) {
nni_mtx_unlock(&s->s_mx);
@@ -903,12 +1067,8 @@ nni_sock_setopt(nni_sock *s, int opt, const void *val, size_t size)
// behavior, we save a local value. Note that the transport
// will already have had a chance to veto this.
- if (opt == nng_optid_linger) {
+ if (strcmp(name, NNG_OPT_LINGER) == 0) {
rv = nni_setopt_usec(&s->s_linger, val, size);
- } else if (opt == nng_optid_sendtimeo) {
- rv = nni_setopt_usec(&s->s_sndtimeo, val, size);
- } else if (opt == nng_optid_recvtimeo) {
- rv = nni_setopt_usec(&s->s_rcvtimeo, val, size);
}
if (rv == 0) {
@@ -931,52 +1091,63 @@ nni_sock_setopt(nni_sock *s, int opt, const void *val, size_t size)
}
int
-nni_sock_getopt(nni_sock *s, int opt, void *val, size_t *szp)
+nni_sock_getopt(nni_sock *s, const char *name, void *val, size_t *szp)
{
- int rv = NNG_ENOTSUP;
- nni_sockopt *sopt;
+ int rv = NNG_ENOTSUP;
+ nni_sockopt * sopt;
+ int opt;
+ const nni_socket_option * sso;
+ const nni_proto_sock_option *pso;
+
+ opt = nni_option_lookup(name);
nni_mtx_lock(&s->s_mx);
if (s->s_closing) {
nni_mtx_unlock(&s->s_mx);
return (NNG_ECLOSED);
}
- if (s->s_sock_ops.sock_getopt != NULL) {
- rv = s->s_sock_ops.sock_getopt(s->s_data, opt, val, szp);
- if (rv != NNG_ENOTSUP) {
+
+ // Protocol specific options.
+ for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) {
+ if (strcmp(name, pso->pso_name) != 0) {
+ continue;
+ }
+ if (pso->pso_getopt == NULL) {
nni_mtx_unlock(&s->s_mx);
- return (rv);
+ return (NNG_EWRITEONLY);
}
+ rv = pso->pso_getopt(s->s_data, val, szp);
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
}
- // Options that are handled by socket core, and never
- // passed down.
- if (opt == nng_optid_sendbuf) {
- rv = nni_getopt_buf(s->s_uwq, val, szp);
- } else if (opt == nng_optid_recvbuf) {
- rv = nni_getopt_buf(s->s_urq, val, szp);
- } else if (opt == nng_optid_sendfd) {
- rv = nni_getopt_fd(s, &s->s_send_fd, NNG_EV_CAN_SND, val, szp);
- } else if (opt == nng_optid_recvfd) {
- rv = nni_getopt_fd(s, &s->s_recv_fd, NNG_EV_CAN_RCV, val, szp);
- } else if (opt == nng_optid_reconnmint) {
- rv = nni_getopt_usec(s->s_reconn, val, szp);
- } else if (opt == nng_optid_reconnmaxt) {
- rv = nni_getopt_usec(s->s_reconnmax, val, szp);
- } else {
- NNI_LIST_FOREACH (&s->s_options, sopt) {
- if (sopt->opt == opt) {
- size_t sz = sopt->sz;
- if (sopt->sz > *szp) {
- sz = *szp;
- }
- *szp = sopt->sz;
- memcpy(val, sopt->data, sz);
- rv = 0;
- break;
+ // Options that are handled by socket core, and never passed down.
+ for (sso = nni_sock_options; sso->so_name != NULL; sso++) {
+ if (strcmp(name, sso->so_name) != 0) {
+ continue;
+ }
+ if (sso->so_getopt == NULL) {
+ nni_mtx_unlock(&s->s_mx);
+ return (NNG_EWRITEONLY);
+ }
+ rv = sso->so_getopt(s, val, szp);
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
+ }
+
+ NNI_LIST_FOREACH (&s->s_options, sopt) {
+ if (strcmp(sopt->name, name) == 0) {
+ size_t sz = sopt->sz;
+ if (sopt->sz > *szp) {
+ sz = *szp;
}
+ *szp = sopt->sz;
+ memcpy(val, sopt->data, sz);
+ rv = 0;
+ break;
}
}
+
nni_mtx_unlock(&s->s_mx);
return (rv);
}
diff --git a/src/core/socket.h b/src/core/socket.h
index 931fefac..850c4641 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -22,10 +22,10 @@ extern void nni_sock_closeall(void);
extern int nni_sock_shutdown(nni_sock *);
extern uint16_t nni_sock_proto(nni_sock *);
extern uint16_t nni_sock_peer(nni_sock *);
-extern int nni_sock_setopt(nni_sock *, int, const void *, size_t);
-extern int nni_sock_getopt(nni_sock *, int, void *, size_t *);
-extern int nni_sock_recvmsg(nni_sock *, nni_msg **, int);
-extern int nni_sock_sendmsg(nni_sock *, nni_msg *, int);
+extern int nni_sock_setopt(nni_sock *, const char *, const void *, size_t);
+extern int nni_sock_getopt(nni_sock *, const char *, void *, size_t *);
+extern int nni_sock_recvmsg(nni_sock *, nni_msg **, int);
+extern int nni_sock_sendmsg(nni_sock *, nni_msg *, int);
extern uint32_t nni_sock_id(nni_sock *);
extern void nni_sock_lock(nni_sock *);
diff --git a/src/core/transport.c b/src/core/transport.c
index eead861b..2697ce74 100644
--- a/src/core/transport.c
+++ b/src/core/transport.c
@@ -97,19 +97,29 @@ nni_tran_find(const char *addr)
}
int
-nni_tran_chkopt(int o, const void *v, size_t sz)
+nni_tran_chkopt(const char *name, const void *v, size_t sz)
{
nni_transport *t;
int rv = NNG_ENOTSUP;
+
nni_mtx_lock(&nni_tran_lk);
NNI_LIST_FOREACH (&nni_tran_list, t) {
- int x;
- if (t->t_tran.tran_chkopt == NULL) {
- continue;
- }
- if ((x = t->t_tran.tran_chkopt(o, v, sz)) != NNG_ENOTSUP) {
- if ((rv = x) != 0) {
- break;
+ const nni_tran_ep * ep;
+ const nni_tran_ep_option *eo;
+
+ // Generally we look for endpoint options.
+ ep = t->t_tran.tran_ep;
+ for (eo = ep->ep_options; eo && eo->eo_name != NULL; eo++) {
+ if (strcmp(name, eo->eo_name) != 0) {
+ continue;
+ }
+ if (eo->eo_setopt == NULL) {
+ nni_mtx_unlock(&nni_tran_lk);
+ return (NNG_EREADONLY);
+ }
+ if ((rv = eo->eo_setopt(NULL, v, sz)) != 0) {
+ nni_mtx_unlock(&nni_tran_lk);
+ return (rv);
}
}
}
diff --git a/src/core/transport.h b/src/core/transport.h
index 2891d8a4..b82e2c92 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -29,11 +29,6 @@ struct nni_tran {
// tran_pipe links our pipe-specific operations.
const nni_tran_pipe *tran_pipe;
- // tran_chkopt, if not NULL, is used to validate that the
- // option data presented is valid. This allows an option to
- // be set on a socket, even if no endpoints are configured.
- int (*tran_chkopt)(int, const void *, size_t);
-
// tran_init, if not NULL, is called once during library
// initialization.
int (*tran_init)(void);
@@ -54,10 +49,31 @@ struct nni_tran {
#define NNI_TRANSPORT_V0 0x54520000
#define NNI_TRANSPORT_VERSION NNI_TRANSPORT_V0
+// Endpoint option handlers.
+struct nni_tran_ep_option {
+ // eo_name is the name of the option.
+ const char *eo_name;
+
+ // eo_getopt retrieves the value of the option.
+ int (*eo_getopt)(void *, void *, size_t *);
+
+ // eo_set sets the value of the option. If the first argument
+ // (the endpoint) is NULL, then no actual set operation should be
+ // performed, but the option should be sanity tested for presence
+ // and size. (This permits the core to validate that an option
+ // is reasonable and be set even before endpoints are created.)
+ int (*eo_setopt)(void *, const void *, size_t);
+};
+
// Endpoint operations are called by the socket in a protocol-independent
// fashion. The socket makes individual calls, which are expected to block
-// if appropriate (except for destroy). Endpoints are unable to call back
-// into the socket, to prevent recusive entry and deadlock.
+// if appropriate (except for destroy), or run asynchronously if an aio
+// is provided. Endpoints are unable to call back into the socket, to prevent
+// recusive entry and deadlock.
+//
+// For a given endpoint, the framework holds a lock so that each entry
+// point is run exclusively of the others. (Transports must still guard
+// against any asynchronous operations they manage themselves, though.)
struct nni_tran_ep {
// ep_init creates a vanilla endpoint. The value created is
// used for the first argument for all other endpoint functions.
@@ -86,11 +102,20 @@ struct nni_tran_ep {
// not affect pipes that have already been created.
void (*ep_close)(void *);
- // ep_setopt sets an endpoint (transport-specific) option.
- int (*ep_setopt)(void *, int, const void *, size_t);
+ // ep_options is an array of endpoint options. The final element must
+ // have a NULL name. If this member is NULL, then no transport specific
+ // options are available.
+ nni_tran_ep_option *ep_options;
+};
+
+// Pipe option handlers. We only have get for pipes; once a pipe is created
+// no options may be set on it.
+struct nni_tran_pipe_option {
+ // po_name is the name of the option.
+ const char *po_name;
- // ep_getopt gets an endpoint (transport-specific) option.
- int (*ep_getopt)(void *, int, void *, size_t *);
+ // po_getopt retrieves the value of the option.
+ int (*po_getopt)(void *, void *, size_t *);
};
// Pipe operations are entry points called by the socket. These may be called
@@ -132,15 +157,16 @@ struct nni_tran_pipe {
// transport specific manner is appropriate.
uint16_t (*p_peer)(void *);
- // p_getopt gets an pipe (transport-specific) property. These values
- // may not be changed once the pipe is created.
- int (*p_getopt)(void *, int, void *, size_t *);
+ // p_options is an array of pipe options. The final element must have
+ // a NULL name. If this member is NULL, then no transport specific
+ // options are available.
+ nni_tran_pipe_option *p_options;
};
// These APIs are used by the framework internally, and not for use by
// transport implementations.
extern nni_tran *nni_tran_find(const char *);
-extern int nni_tran_chkopt(int, const void *, size_t);
+extern int nni_tran_chkopt(const char *, const void *, size_t);
extern int nni_tran_sys_init(void);
extern void nni_tran_sys_fini(void);
extern int nni_tran_register(const nni_tran *);