aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-08 00:33:19 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-08 00:33:19 -0700
commit50532054c0bee3a1ff3324db10f3cdf7b44041e4 (patch)
tree6e924d21e7eb188865d98e9daa203a045cc928c5 /src
parent702fe1d0af4b08a8b53172aaca57394b181d58b2 (diff)
downloadnng-50532054c0bee3a1ff3324db10f3cdf7b44041e4.tar.gz
nng-50532054c0bee3a1ff3324db10f3cdf7b44041e4.tar.bz2
nng-50532054c0bee3a1ff3324db10f3cdf7b44041e4.zip
Added nn_compat code for option handling, fixed other bugs.
Hop counts for REQ were busted (bad TTL), and imported the compat_reqtll test. At the same time, added code to nn_term to shut down completely, discarding sockets. (Note that some things, such as globals, may still be left around; that's ok.)
Diffstat (limited to 'src')
-rw-r--r--src/core/socket.c42
-rw-r--r--src/core/socket.h3
-rw-r--r--src/nng.c6
-rw-r--r--src/nng.h4
-rw-r--r--src/nng_compat.c139
-rw-r--r--src/protocol/reqrep/rep.c2
-rw-r--r--src/protocol/reqrep/req.c11
7 files changed, 198 insertions, 9 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index e59f6042..f2dbb573 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -15,6 +15,8 @@
// Socket implementation.
static nni_objhash *nni_socks = NULL;
+static nni_list nni_sock_list;
+static nni_mtx nni_sock_lk;
uint32_t
nni_sock_id(nni_sock *s)
@@ -362,6 +364,7 @@ nni_sock_ctor(uint32_t id)
sock->s_reconnmax = 0;
sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
sock->s_id = id;
+ NNI_LIST_NODE_INIT(&sock->s_node);
nni_pipe_sock_list_init(&sock->s_pipes);
@@ -436,8 +439,12 @@ nni_sock_sys_init(void)
{
int rv;
- rv = nni_objhash_init(&nni_socks, nni_sock_ctor, nni_sock_dtor);
-
+ NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node);
+ if (((rv = nni_objhash_init(
+ &nni_socks, nni_sock_ctor, nni_sock_dtor)) != 0) ||
+ ((rv = nni_mtx_init(&nni_sock_lk)) != 0)) {
+ nni_sock_sys_fini();
+ }
return (rv);
}
@@ -446,6 +453,7 @@ nni_sock_sys_fini(void)
{
nni_objhash_fini(nni_socks);
nni_socks = NULL;
+ nni_mtx_fini(&nni_sock_lk);
}
// nn_sock_open creates the underlying socket.
@@ -512,6 +520,10 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
sops->sock_open(sock->s_data);
+ nni_mtx_lock(&nni_sock_lk);
+ nni_list_append(&nni_sock_list, sock);
+ nni_mtx_unlock(&nni_sock_lk);
+
*sockp = sock;
return (0);
}
@@ -657,6 +669,10 @@ nni_sock_close(nni_sock *sock)
sock->s_closed = 1;
nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_lock(&nni_sock_lk);
+ nni_list_node_remove(&sock->s_node);
+ nni_mtx_unlock(&nni_sock_lk);
+
// At this point nothing else should be referencing us.
// As with UNIX close, it is a gross error for the caller
// to have concurrent threads using this. We've taken care to
@@ -672,6 +688,28 @@ nni_sock_close(nni_sock *sock)
nni_objhash_unref_wait(nni_socks, sock->s_id);
}
+void
+nni_sock_closeall(void)
+{
+ nni_sock *s;
+ uint32_t id;
+
+ for (;;) {
+ nni_mtx_lock(&nni_sock_lk);
+ if ((s = nni_list_first(&nni_sock_list)) == NULL) {
+ nni_mtx_unlock(&nni_sock_lk);
+ return;
+ }
+ id = s->s_id;
+ nni_list_node_remove(&s->s_node);
+ nni_mtx_unlock(&nni_sock_lk);
+
+ if (nni_sock_find(&s, id) == 0) {
+ nni_sock_close(s);
+ }
+ }
+}
+
int
nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire)
{
diff --git a/src/core/socket.h b/src/core/socket.h
index 41dfbc33..54c1b4da 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -23,6 +23,8 @@ struct nni_socket {
nni_msgq *s_uwq; // Upper write queue
nni_msgq *s_urq; // Upper read queue
+ nni_list_node s_node;
+
uint16_t s_protocol;
uint16_t s_peer;
uint32_t s_flags;
@@ -67,6 +69,7 @@ extern int nni_sock_find(nni_sock **, uint32_t);
extern void nni_sock_rele(nni_sock *);
extern int nni_sock_open(nni_sock **, uint16_t);
extern void nni_sock_close(nni_sock *);
+extern void nni_sock_closeall(void);
extern int nni_sock_shutdown(nni_sock *);
extern uint16_t nni_sock_proto(nni_sock *);
extern uint16_t nni_sock_peer(nni_sock *);
diff --git a/src/nng.c b/src/nng.c
index ac84012a..d5cc4f2b 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -73,6 +73,12 @@ nng_close(nng_socket sid)
return (rv);
}
+void
+nng_closeall(void)
+{
+ nni_sock_closeall();
+}
+
uint16_t
nng_protocol(nng_socket sid)
{
diff --git a/src/nng.h b/src/nng.h
index 48a04ae1..aac7ec89 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -75,6 +75,10 @@ NNG_DECL void nng_fini(void);
// pipes associated with the socket.
NNG_DECL int nng_close(nng_socket);
+// nng_closeall closes all open sockets. Do not call this from
+// a library; it will affect all sockets.
+NNG_DECL void nng_closeall(void);
+
// nng_shutdown shuts down the socket. This causes any threads doing
// work for the socket or blocked in socket functions to be woken (and
// return NNG_ECLOSED). The socket resources are still present, so it
diff --git a/src/nng_compat.c b/src/nng_compat.c
index 160fd33d..88e2f69c 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -60,7 +60,7 @@ nn_strerror(int err)
return ("Unknown I/O error");
}
- // Arguablye we could use strerror() here, but we should only
+ // Arguably we could use strerror() here, but we should only
// be getting errnos we understand at this point.
(void) snprintf(msgbuf, sizeof(msgbuf), "Unknown error %d", err);
return (msgbuf);
@@ -536,6 +536,133 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
}
int
+nn_getsockopt(int s, int nnlevel, int nnopt, void *valp, size_t *szp)
+{
+ int opt = 0;
+ int mscvt = 0;
+ uint64_t usec;
+ int * msecp;
+ int rv;
+
+ switch (nnlevel) {
+ case NN_SOL_SOCKET:
+ switch (nnopt) {
+ case NN_LINGER:
+ opt = NNG_OPT_LINGER;
+ break;
+ case NN_SNDBUF:
+ opt = NNG_OPT_SNDBUF;
+ break;
+ case NN_RCVBUF:
+ opt = NNG_OPT_RCVBUF;
+ break;
+ case NN_RECONNECT_IVL:
+ opt = NNG_OPT_RECONN_TIME;
+ mscvt = 1;
+ break;
+ case NN_RECONNECT_IVL_MAX:
+ opt = NNG_OPT_RECONN_MAXTIME;
+ mscvt = 1;
+ break;
+ case NN_SNDFD:
+ opt = NNG_OPT_SNDFD;
+ break;
+ case NN_RCVFD:
+ opt = NNG_OPT_RCVFD;
+ break;
+ case NN_RCVMAXSIZE:
+ opt = NNG_OPT_RCVMAXSZ;
+ break;
+ case NN_MAXTTL:
+ opt = NNG_OPT_MAXTTL;
+ break;
+ case NN_RCVTIMEO:
+ opt = NNG_OPT_RCVTIMEO;
+ mscvt = 1;
+ break;
+ case NN_SNDTIMEO:
+ opt = NNG_OPT_SNDTIMEO;
+ mscvt = 1;
+ break;
+ case NN_DOMAIN:
+ case NN_PROTOCOL:
+ case NN_IPV4ONLY:
+ case NN_SOCKET_NAME:
+ case NN_SNDPRIO:
+ case NN_RCVPRIO:
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+
+ break;
+ }
+ break;
+ case NN_REQ:
+ switch (nnopt) {
+ case NN_REQ_RESEND_IVL:
+ opt = NNG_OPT_RESENDTIME;
+ mscvt = 1;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ break;
+ case NN_SUB:
+ switch (nnopt) {
+ case NN_SUB_SUBSCRIBE:
+ opt = NNG_OPT_SUBSCRIBE;
+ break;
+ case NN_SUB_UNSUBSCRIBE:
+ opt = NNG_OPT_UNSUBSCRIBE;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ break;
+ case NN_SURVEYOR:
+ switch (nnopt) {
+ case NN_SURVEYOR_DEADLINE:
+ opt = NNG_OPT_SURVEYTIME;
+ mscvt = 1;
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+ break;
+ default:
+ errno = ENOPROTOOPT;
+ return (-1);
+ }
+
+ if (mscvt) {
+ if (*szp != sizeof(int)) {
+ errno = EINVAL;
+ return (-1);
+ }
+
+ msecp = valp;
+ valp = &usec;
+ *szp = sizeof(uint64_t);
+ }
+
+ if ((rv = nng_getopt((nng_socket) s, opt, valp, szp)) != 0) {
+ nn_seterror(rv);
+ return (-1);
+ }
+
+ if (mscvt) {
+ // We have to convert value to ms...
+ *msecp = (usec / 1000);
+ *szp = sizeof(int);
+ }
+
+ return (0);
+}
+
+int
nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz)
{
int opt = 0;
@@ -703,11 +830,11 @@ nn_device(int s1, int s2)
void
nn_term(void)
{
- // XXX: Implement something to do something. Probably we
- // should go through the nni_sockets idhash and clobber all
- // of the sockets. This function is relatively toxic, since
- // it can affect all sockets in the process, including those
- // in use by libraries, etc.
+ // This function is relatively toxic, since it can affect
+ // all sockets in the process, including those
+ // in use by libraries, etc. Accordingly, do not use this
+ // in a library -- only e.g. atexit() and similar.
+ nng_closeall();
}
// Internal test support routines.
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 049b1422..c7546182 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -312,7 +312,7 @@ nni_rep_pipe_recv_cb(void *arg)
}
// Move backtrace from body to header
- hops = 0;
+ hops = 1;
for (;;) {
int end = 0;
if (hops >= rep->ttl) {
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 8e7056f5..7ec53c90 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -35,6 +35,7 @@ struct nni_req_sock {
int raw;
int wantw;
int closed;
+ int ttl;
nni_msg * reqmsg;
nni_req_pipe *pendpipe;
@@ -101,6 +102,7 @@ nni_req_sock_init(void **reqp, nni_sock *sock)
req->raw = 0;
req->wantw = 0;
req->resend = NNI_TIME_ZERO;
+ req->ttl = 8;
req->uwq = nni_sock_sendq(sock);
req->urq = nni_sock_recvq(sock);
@@ -269,6 +271,12 @@ nni_req_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
break;
case NNG_OPT_RAW:
rv = nni_setopt_int(&req->raw, buf, sz, 0, 1);
+ if (rv == 0) {
+ nni_sock_recverr(req->sock, req->raw ? 0 : NNG_ESTATE);
+ }
+ break;
+ case NNG_OPT_MAXTTL:
+ rv = nni_setopt_int(&req->ttl, buf, sz, 1, 255);
break;
default:
rv = NNG_ENOTSUP;
@@ -289,6 +297,9 @@ nni_req_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
case NNG_OPT_RAW:
rv = nni_getopt_int(&req->raw, buf, szp);
break;
+ case NNG_OPT_MAXTTL:
+ rv = nni_getopt_int(&req->ttl, buf, szp);
+ break;
default:
rv = NNG_ENOTSUP;
}