aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-24 19:26:31 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-24 19:42:43 -0800
commite88f07b434dbcfdb57435a14e1beadcdae3cef0d (patch)
tree79433ae3c4ed6d2501e4d63ad9ada5a621df3bd9 /src
parent907a1eb392ca4b29c62b9cc3d2df1ad337695abf (diff)
downloadnng-e88f07b434dbcfdb57435a14e1beadcdae3cef0d.tar.gz
nng-e88f07b434dbcfdb57435a14e1beadcdae3cef0d.tar.bz2
nng-e88f07b434dbcfdb57435a14e1beadcdae3cef0d.zip
Add endpoint tuning of maxrcv size. Fix cmsg API.
The CMSG handling was completely borked. This is fixed now, and we stash the SP header size (ugh) in the CMSG contents to match what nanomsg does. We now pass the cmsg validation test. We also fixed handling of certain endpoint-related options, so that endpoints can get options from the socket at initialization time. This required a minor change to the transport API for endpoints. Finally, we fixed a critical fault in the REP handling of RAW sockets, which caused them to always return NNG_ESTATE in all cases. It should now honor the actual socket option.
Diffstat (limited to 'src')
-rw-r--r--src/core/endpt.c2
-rw-r--r--src/core/options.c35
-rw-r--r--src/core/options.h11
-rw-r--r--src/core/socket.c25
-rw-r--r--src/core/socket.h5
-rw-r--r--src/core/transport.h2
-rw-r--r--src/nng.c2
-rw-r--r--src/nng_compat.c93
-rw-r--r--src/nng_compat.h13
-rw-r--r--src/protocol/reqrep/rep.c1
-rw-r--r--src/transport/inproc/inproc.c6
-rw-r--r--src/transport/ipc/ipc.c12
-rw-r--r--src/transport/tcp/tcp.c14
13 files changed, 182 insertions, 39 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index c0890cd3..9411c220 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -131,7 +131,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
return (NNG_ECLOSED);
}
- rv = ep->ep_ops.ep_init(&ep->ep_data, addr, nni_sock_proto(sock));
+ rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock);
if (rv != 0) {
nni_mtx_unlock(&sock->s_mx);
nni_cv_fini(&ep->ep_cv);
diff --git a/src/core/options.c b/src/core/options.c
index a0b74014..cd67e0d0 100644
--- a/src/core/options.c
+++ b/src/core/options.c
@@ -49,6 +49,27 @@ nni_setopt_int(int *ptr, const void *val, size_t size, int minval, int maxval)
int
+nni_setopt_size(size_t *ptr, const void *val, size_t size, size_t minval,
+ size_t maxval)
+{
+ int v;
+
+ if (size != sizeof (v)) {
+ return (NNG_EINVAL);
+ }
+ memcpy(&v, val, sizeof (v));
+ if (v > maxval) {
+ return (NNG_EINVAL);
+ }
+ if (v < minval) {
+ return (NNG_EINVAL);
+ }
+ *ptr = v;
+ return (0);
+}
+
+
+int
nni_getopt_duration(nni_duration *ptr, void *val, size_t *sizep)
{
size_t sz = sizeof (*ptr);
@@ -77,6 +98,20 @@ nni_getopt_int(int *ptr, void *val, size_t *sizep)
int
+nni_getopt_size(size_t *ptr, void *val, size_t *sizep)
+{
+ size_t sz = sizeof (*ptr);
+
+ if (sz > *sizep) {
+ sz = *sizep;
+ }
+ *sizep = sizeof (*ptr);
+ memcpy(val, ptr, sz);
+ return (0);
+}
+
+
+int
nni_setopt_buf(nni_msgq *mq, const void *val, size_t sz)
{
int len;
diff --git a/src/core/options.h b/src/core/options.h
index 2d843a4c..ec5cce90 100644
--- a/src/core/options.h
+++ b/src/core/options.h
@@ -37,6 +37,17 @@ extern int nni_setopt_int(int *, const void *, size_t, int, int);
// nni_getopt_int gets an integer.
extern int nni_getopt_int(int *, void *, size_t *);
+// nni_setopt_size sets a size_t option.
+extern int nni_setopt_size(size_t *, const void *, size_t, size_t, size_t);
+
+// We limit the maximum size to 4GB. That's intentional because some of the
+// underlying protocols cannot cope with anything bigger than 32-bits.
+#define NNI_MINSZ (0)
+#define NNI_MAXSZ ((size_t) 0xffffffff)
+
+// nni_getopt_size obtains a size_t option.
+extern int nni_getopt_size(size_t *, void *, size_t *);
+
// nni_getopt_fd obtains a notification file descriptor.
extern int nni_getopt_fd(nni_sock *, nni_notifyfd *, int, void *, size_t *);
diff --git a/src/core/socket.c b/src/core/socket.c
index b0ce172f..6ccf3025 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -102,9 +102,6 @@ nni_sock_hold_close(nni_sock **sockp, uint32_t id)
}
-// XXX: don't expose the upper queues to protocols, because we need to
-// trap on activity in those queues!
-
// Because we have to call back into the socket, and possibly also the proto,
// and wait for threads to terminate, we do this in a special thread. The
// assumption is that closing is always a "fast" operation.
@@ -276,6 +273,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
sock->s_reconn = NNI_SECOND;
sock->s_reconnmax = 0;
sock->s_reapexit = 0;
+ sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
@@ -684,6 +682,20 @@ nni_sock_peer(nni_sock *sock)
}
+nni_duration
+nni_sock_linger(nni_sock *sock)
+{
+ return (sock->s_linger);
+}
+
+
+size_t
+nni_sock_rcvmaxsz(nni_sock *sock)
+{
+ return (sock->s_rcvmaxsz);
+}
+
+
int
nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
{
@@ -775,6 +787,10 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size)
case NNG_OPT_RCVBUF:
rv = nni_setopt_buf(sock->s_urq, val, size);
break;
+ case NNG_OPT_RCVMAXSZ:
+ rv = nni_setopt_size(&sock->s_rcvmaxsz, val, size, 0,
+ NNI_MAXSZ);
+ break;
}
nni_mtx_unlock(&sock->s_mx);
return (rv);
@@ -818,6 +834,9 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
case NNG_OPT_RCVBUF:
rv = nni_getopt_buf(sock->s_urq, val, sizep);
break;
+ case NNG_OPT_RCVMAXSZ:
+ rv = nni_getopt_size(&sock->s_rcvmaxsz, val, sizep);
+ break;
case NNG_OPT_SNDFD:
rv = nni_getopt_fd(sock, &sock->s_send_fd, NNG_EV_CAN_SND,
val, sizep);
diff --git a/src/core/socket.h b/src/core/socket.h
index 42f42371..df11ded6 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -48,6 +48,8 @@ struct nni_socket {
nni_cv s_notify_cv; // wakes notify thread
nni_mtx s_notify_mx; // protects s_notify list
+ size_t s_rcvmaxsz; // maximum receive size
+
nni_list s_reaps; // pipes to reap
nni_thr s_reaper;
nni_thr s_notifier;
@@ -112,4 +114,7 @@ extern nni_msgq *nni_sock_recvq(nni_sock *);
// here so that protocols can use it to initialize condvars.
extern nni_mtx *nni_sock_mtx(nni_sock *);
+extern nni_duration nni_sock_linger(nni_sock *);
+extern size_t nni_sock_rcvmaxsz(nni_sock *);
+
#endif // CORE_SOCKET_H
diff --git a/src/core/transport.h b/src/core/transport.h
index 8544f097..c74ec497 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -40,7 +40,7 @@ struct nni_tran {
struct nni_tran_ep {
// ep_init creates a vanilla endpoint. The value created is
// used for the first argument for all other endpoint functions.
- int (*ep_init)(void **, const char *, uint16_t);
+ int (*ep_init)(void **, const char *, nni_sock *);
// ep_fini frees the resources associated with the endpoint.
// The endpoint will already have been closed.
diff --git a/src/nng.c b/src/nng.c
index a534b1bc..bb36a236 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -336,7 +336,7 @@ nng_unsetnotify(nng_socket sid, nng_notify *notify)
nng_socket
nng_event_socket(nng_event *ev)
{
- // FOR NOW.... maybe evnet should contain socket Id instead?
+ // XXX: FOR NOW.... maybe evnet should contain socket Id instead?
return (nni_sock_id(ev->e_sock));
}
diff --git a/src/nng_compat.c b/src/nng_compat.c
index fd43af29..04759e01 100644
--- a/src/nng_compat.c
+++ b/src/nng_compat.c
@@ -401,9 +401,12 @@ nn_recvmsg(int s, struct nn_msghdr *mh, int flags)
char *cdata;
size_t clen;
size_t tlen;
+ size_t spsz;
struct nn_cmsghdr *hdr;
+ unsigned char *ptr;
- clen = NN_CMSG_SPACE(nng_msg_header_len(msg));
+ spsz = nng_msg_header_len(msg);
+ clen = NN_CMSG_SPACE(sizeof (spsz) + spsz);
if ((tlen = mh->msg_controllen) == NN_MSG) {
// Ideally we'd use the same msg, but we would need
@@ -429,13 +432,15 @@ nn_recvmsg(int s, struct nn_msghdr *mh, int flags)
}
if (clen <= tlen) {
+ ptr = NN_CMSG_DATA(cdata);
hdr = (void *) cdata;
- hdr->cmsg_len = nng_msg_header_len(msg);
+ hdr->cmsg_len = clen;
hdr->cmsg_level = PROTO_SP;
hdr->cmsg_type = SP_HDR;
- memcpy(NN_CMSG_DATA(cdata), nng_msg_header(msg),
- nng_msg_header_len(msg));
+ memcpy(ptr, &spsz, sizeof (spsz));
+ ptr += sizeof (spsz);
+ memcpy(ptr, nng_msg_header(msg), spsz);
}
}
@@ -451,7 +456,7 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
{
nng_msg *msg = NULL;
nng_msg *cmsg = NULL;
- char *cdata = NULL;
+ char *cdata;
int keep = 0;
size_t sz;
int rv;
@@ -497,9 +502,16 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
}
// Now suck up the control data...
+ // This POSIX-inspired API is one of the most painful for
+ // usability we've ever seen.
cmsg = NULL;
if ((cdata = mh->msg_control) != NULL) {
size_t clen;
+ size_t offs;
+ size_t spsz;
+ struct nn_cmsghdr *chdr;
+ unsigned char *data;
+
if ((clen = mh->msg_controllen) == NN_MSG) {
// Underlying data is a message. This is awkward,
// because we have to copy the data, but we should
@@ -508,13 +520,42 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags)
cdata = *(void **) cdata;
cmsg = *(nng_msg **) (cdata - sizeof (cmsg));
clen = nng_msg_len(cmsg);
+ } else {
+ clen = mh->msg_controllen;
}
- if ((rv = nng_msg_append_header(msg, cdata, clen)) != 0) {
- if (!keep) {
- nng_msg_free(msg);
+
+ offs = 0;
+ while ((offs + sizeof (NN_CMSG_LEN(0))) < clen) {
+ chdr = (void *) (cdata + offs);
+ if ((chdr->cmsg_level != PROTO_SP) ||
+ (chdr->cmsg_type != SP_HDR)) {
+ offs += chdr->cmsg_len;
}
- nn_seterror(rv);
- return (-1);
+
+ // SP header in theory. Starts with size, then
+ // any backtrace details.
+ if (chdr->cmsg_len < sizeof (size_t)) {
+ offs += chdr->cmsg_len;
+ continue;
+ }
+ data = NN_CMSG_DATA(chdr);
+ memcpy(&spsz, data, sizeof (spsz));
+ if ((spsz + sizeof (spsz)) > chdr->cmsg_len) {
+ // Truncated header? Ignore it.
+ offs += chdr->cmsg_len;
+ continue;
+ }
+ data += sizeof (spsz);
+ rv = nng_msg_append_header(msg, data, spsz);
+ if (rv != 0) {
+ if (!keep) {
+ nng_msg_free(msg);
+ }
+ nn_seterror(rv);
+ return (-1);
+ }
+
+ break;
}
}
@@ -654,3 +695,35 @@ nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz)
}
return (0);
}
+
+
+struct nn_cmsghdr *
+nn_cmsg_next(struct nn_msghdr *mh, struct nn_cmsghdr *first)
+{
+ size_t clen;
+ char *data;
+
+ // We only support SP headers, so there can be at most one header.
+ if (first != NULL) {
+ return (NULL);
+ }
+ if ((clen = mh->msg_controllen) == NN_MSG) {
+ nng_msg *msg;
+ data = *((void **) (mh->msg_control));
+ msg = *(nng_msg **) (data - sizeof (msg));
+ clen = nng_msg_len(msg);
+ } else {
+ data = mh->msg_control;
+ }
+
+ if (first == NULL) {
+ first = (void *) data;
+ } else {
+ first = first + first->cmsg_len;
+ }
+
+ if (((char *) first + sizeof (*first)) > (data + clen)) {
+ return (NULL);
+ }
+ return (first);
+}
diff --git a/src/nng_compat.h b/src/nng_compat.h
index 64ace069..5866fe2c 100644
--- a/src/nng_compat.h
+++ b/src/nng_compat.h
@@ -258,7 +258,7 @@ struct nn_cmsghdr {
int cmsg_type;
};
-#define NN_ALIGN(len) \
+#define NN_CMSG_ALIGN(len) \
(((len) + sizeof (size_t) - 1) & (size_t) ~(sizeof (size_t) - 1))
// Unlike old nanomsg, we explicitly only support the SP header as attached
@@ -268,16 +268,17 @@ struct nn_cmsghdr {
// anyone used that in practice though.)
#define NN_CMSG_FIRSTHDR(mh) \
nn_cmsg_next((struct nn_msghdr *) (mh), NULL)
-#define NN_CMSG_NEXTHDR(mh, ch) \
+#define NN_CMSG_NXTHDR(mh, ch) \
nn_cmsg_next((struct nn_msghdr *) (mh), (struct nn_cmsghdr *) ch)
#define NN_CMSG_DATA(ch) \
((unsigned char *) (((struct nn_cmsghdr *) (ch)) + 1))
#define NN_CMSG_SPACE(len) \
- (NN_ALIGN(len) + NN_ALIGN(sizeof (struct nn_cmsghdr)))
+ (NN_CMSG_ALIGN(len) + NN_CMSG_ALIGN(sizeof (struct nn_cmsghdr)))
#define NN_CMSG_LEN(len) \
- (NN_ALIGN(sizeof (nn_cmsghdr)) + (len))
+ (NN_CMSG_ALIGN(sizeof (struct nn_cmsghdr)) + (len))
-NN_DECL struct cmsg_hdr *nn_cmsg_next(struct nn_msghdr *, struct nn_cmsghdr *);
+NN_DECL struct nn_cmsghdr *nn_cmsg_next(struct nn_msghdr *,
+ struct nn_cmsghdr *);
NN_DECL int nn_socket(int, int);
NN_DECL int nn_setsockopt(int, int, int, const void *, size_t);
NN_DECL int nn_getsockopt(int, int, int, void *, size_t *);
@@ -287,7 +288,7 @@ NN_DECL int nn_shutdown(int, int);
NN_DECL int nn_send(int, const void *, size_t, int);
NN_DECL int nn_recv(int, void *, size_t, int);
NN_DECL int nn_sendmsg(int, const struct nn_msghdr *, int);
-NN_DECL int nn_recvcmsg(int, struct nn_msghdr *, int);
+NN_DECL int nn_recvmsg(int, struct nn_msghdr *, int);
NN_DECL int nn_close(int);
NN_DECL int nn_poll(struct nn_pollfd *, int, int);
NN_DECL int nn_device(int, int);
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 40ee52fc..93bc3d13 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -294,6 +294,7 @@ nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
break;
case NNG_OPT_RAW:
rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1);
+ nni_sock_senderr(rep->sock, rep->raw ? 0 : NNG_ESTATE);
break;
default:
rv = NNG_ENOTSUP;
diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c
index d6b1ac74..66f076ea 100644
--- a/src/transport/inproc/inproc.c
+++ b/src/transport/inproc/inproc.c
@@ -189,7 +189,7 @@ nni_inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
static int
-nni_inproc_ep_init(void **epp, const char *url, uint16_t proto)
+nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock)
{
nni_inproc_ep *ep;
int rv;
@@ -207,7 +207,7 @@ nni_inproc_ep_init(void **epp, const char *url, uint16_t proto)
ep->mode = NNI_INPROC_EP_IDLE;
ep->closed = 0;
- ep->proto = proto;
+ ep->proto = nni_sock_proto(sock);
NNI_LIST_NODE_INIT(&ep->node);
NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node);
@@ -291,8 +291,6 @@ nni_inproc_ep_connect(void *arg, void **pipep)
return (NNG_ECONNREFUSED);
}
- // XXX check protocol peer validity...
-
ep->mode = NNI_INPROC_EP_DIAL;
nni_list_append(&server->clients, ep);
nni_cv_wake(&server->cv);
diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c
index 3aef5363..4673a6dc 100644
--- a/src/transport/ipc/ipc.c
+++ b/src/transport/ipc/ipc.c
@@ -26,7 +26,7 @@ struct nni_ipc_pipe {
nni_plat_ipcsock fd;
uint16_t peer;
uint16_t proto;
- uint32_t rcvmax;
+ size_t rcvmax;
};
struct nni_ipc_ep {
@@ -34,7 +34,7 @@ struct nni_ipc_ep {
nni_plat_ipcsock fd;
int closed;
uint16_t proto;
- uint32_t rcvmax;
+ size_t rcvmax;
};
static int
@@ -123,7 +123,7 @@ nni_ipc_pipe_recv(void *arg, nni_msg **msgp)
}
NNI_GET64(buf, len);
if (len > pipe->rcvmax) {
- return (NNG_EPROTO);
+ return (NNG_EMSGSIZE);
}
if ((rv = nng_msg_alloc(&msg, (size_t) len)) != 0) {
@@ -176,7 +176,7 @@ nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
static int
-nni_ipc_ep_init(void **epp, const char *url, uint16_t proto)
+nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock)
{
nni_ipc_ep *ep;
int rv;
@@ -188,8 +188,8 @@ nni_ipc_ep_init(void **epp, const char *url, uint16_t proto)
return (NNG_ENOMEM);
}
ep->closed = 0;
- ep->proto = proto;
- ep->rcvmax = 1024 * 1024; // XXX: fix this
+ ep->proto = nni_sock_proto(sock);
+ ep->rcvmax = nni_sock_rcvmaxsz(sock);
if ((rv = nni_plat_ipc_init(&ep->fd)) != 0) {
NNI_FREE_STRUCT(ep);
return (rv);
diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c
index d18fd289..ad2d398a 100644
--- a/src/transport/tcp/tcp.c
+++ b/src/transport/tcp/tcp.c
@@ -25,7 +25,7 @@ struct nni_tcp_pipe {
nni_plat_tcpsock fd;
uint16_t peer;
uint16_t proto;
- uint32_t rcvmax;
+ size_t rcvmax;
};
struct nni_tcp_ep {
@@ -33,7 +33,7 @@ struct nni_tcp_ep {
nni_plat_tcpsock fd;
int closed;
uint16_t proto;
- uint32_t rcvmax;
+ size_t rcvmax;
int ipv4only;
};
@@ -112,7 +112,7 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp)
}
NNI_GET64(buf, len);
if (len > pipe->rcvmax) {
- return (NNG_EPROTO);
+ return (NNG_EMSGSIZE);
}
if ((rv = nng_msg_alloc(&msg, (size_t) len)) != 0) {
@@ -165,7 +165,7 @@ nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp)
static int
-nni_tcp_ep_init(void **epp, const char *url, uint16_t proto)
+nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock)
{
nni_tcp_ep *ep;
int rv;
@@ -177,9 +177,9 @@ nni_tcp_ep_init(void **epp, const char *url, uint16_t proto)
return (NNG_ENOMEM);
}
ep->closed = 0;
- ep->proto = proto;
- ep->ipv4only = 0;
- ep->rcvmax = 1024 * 1024; // XXX: fix this
+ ep->proto = nni_sock_proto(sock);
+ ep->ipv4only = 0; // XXX: FIXME
+ ep->rcvmax = nni_sock_rcvmaxsz(sock);
if ((rv = nni_plat_tcp_init(&ep->fd)) != 0) {
NNI_FREE_STRUCT(ep);