diff options
| -rw-r--r-- | src/core/endpt.c | 2 | ||||
| -rw-r--r-- | src/core/options.c | 35 | ||||
| -rw-r--r-- | src/core/options.h | 11 | ||||
| -rw-r--r-- | src/core/socket.c | 25 | ||||
| -rw-r--r-- | src/core/socket.h | 5 | ||||
| -rw-r--r-- | src/core/transport.h | 2 | ||||
| -rw-r--r-- | src/nng.c | 2 | ||||
| -rw-r--r-- | src/nng_compat.c | 93 | ||||
| -rw-r--r-- | src/nng_compat.h | 13 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 1 | ||||
| -rw-r--r-- | src/transport/inproc/inproc.c | 6 | ||||
| -rw-r--r-- | src/transport/ipc/ipc.c | 12 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 14 | ||||
| -rw-r--r-- | tests/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | tests/compat_cmsg.c | 114 |
15 files changed, 297 insertions, 39 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c index c0890cd3..9411c220 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -131,7 +131,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr) return (NNG_ECLOSED); } - rv = ep->ep_ops.ep_init(&ep->ep_data, addr, nni_sock_proto(sock)); + rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock); if (rv != 0) { nni_mtx_unlock(&sock->s_mx); nni_cv_fini(&ep->ep_cv); diff --git a/src/core/options.c b/src/core/options.c index a0b74014..cd67e0d0 100644 --- a/src/core/options.c +++ b/src/core/options.c @@ -49,6 +49,27 @@ nni_setopt_int(int *ptr, const void *val, size_t size, int minval, int maxval) int +nni_setopt_size(size_t *ptr, const void *val, size_t size, size_t minval, + size_t maxval) +{ + int v; + + if (size != sizeof (v)) { + return (NNG_EINVAL); + } + memcpy(&v, val, sizeof (v)); + if (v > maxval) { + return (NNG_EINVAL); + } + if (v < minval) { + return (NNG_EINVAL); + } + *ptr = v; + return (0); +} + + +int nni_getopt_duration(nni_duration *ptr, void *val, size_t *sizep) { size_t sz = sizeof (*ptr); @@ -77,6 +98,20 @@ nni_getopt_int(int *ptr, void *val, size_t *sizep) int +nni_getopt_size(size_t *ptr, void *val, size_t *sizep) +{ + size_t sz = sizeof (*ptr); + + if (sz > *sizep) { + sz = *sizep; + } + *sizep = sizeof (*ptr); + memcpy(val, ptr, sz); + return (0); +} + + +int nni_setopt_buf(nni_msgq *mq, const void *val, size_t sz) { int len; diff --git a/src/core/options.h b/src/core/options.h index 2d843a4c..ec5cce90 100644 --- a/src/core/options.h +++ b/src/core/options.h @@ -37,6 +37,17 @@ extern int nni_setopt_int(int *, const void *, size_t, int, int); // nni_getopt_int gets an integer. extern int nni_getopt_int(int *, void *, size_t *); +// nni_setopt_size sets a size_t option. +extern int nni_setopt_size(size_t *, const void *, size_t, size_t, size_t); + +// We limit the maximum size to 4GB. That's intentional because some of the +// underlying protocols cannot cope with anything bigger than 32-bits. +#define NNI_MINSZ (0) +#define NNI_MAXSZ ((size_t) 0xffffffff) + +// nni_getopt_size obtains a size_t option. +extern int nni_getopt_size(size_t *, void *, size_t *); + // nni_getopt_fd obtains a notification file descriptor. extern int nni_getopt_fd(nni_sock *, nni_notifyfd *, int, void *, size_t *); diff --git a/src/core/socket.c b/src/core/socket.c index b0ce172f..6ccf3025 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -102,9 +102,6 @@ nni_sock_hold_close(nni_sock **sockp, uint32_t id) } -// XXX: don't expose the upper queues to protocols, because we need to -// trap on activity in those queues! - // Because we have to call back into the socket, and possibly also the proto, // and wait for threads to terminate, we do this in a special thread. The // assumption is that closing is always a "fast" operation. @@ -276,6 +273,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) sock->s_reconn = NNI_SECOND; sock->s_reconnmax = 0; sock->s_reapexit = 0; + sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node); @@ -684,6 +682,20 @@ nni_sock_peer(nni_sock *sock) } +nni_duration +nni_sock_linger(nni_sock *sock) +{ + return (sock->s_linger); +} + + +size_t +nni_sock_rcvmaxsz(nni_sock *sock) +{ + return (sock->s_rcvmaxsz); +} + + int nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags) { @@ -775,6 +787,10 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size) case NNG_OPT_RCVBUF: rv = nni_setopt_buf(sock->s_urq, val, size); break; + case NNG_OPT_RCVMAXSZ: + rv = nni_setopt_size(&sock->s_rcvmaxsz, val, size, 0, + NNI_MAXSZ); + break; } nni_mtx_unlock(&sock->s_mx); return (rv); @@ -818,6 +834,9 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) case NNG_OPT_RCVBUF: rv = nni_getopt_buf(sock->s_urq, val, sizep); break; + case NNG_OPT_RCVMAXSZ: + rv = nni_getopt_size(&sock->s_rcvmaxsz, val, sizep); + break; case NNG_OPT_SNDFD: rv = nni_getopt_fd(sock, &sock->s_send_fd, NNG_EV_CAN_SND, val, sizep); diff --git a/src/core/socket.h b/src/core/socket.h index 42f42371..df11ded6 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -48,6 +48,8 @@ struct nni_socket { nni_cv s_notify_cv; // wakes notify thread nni_mtx s_notify_mx; // protects s_notify list + size_t s_rcvmaxsz; // maximum receive size + nni_list s_reaps; // pipes to reap nni_thr s_reaper; nni_thr s_notifier; @@ -112,4 +114,7 @@ extern nni_msgq *nni_sock_recvq(nni_sock *); // here so that protocols can use it to initialize condvars. extern nni_mtx *nni_sock_mtx(nni_sock *); +extern nni_duration nni_sock_linger(nni_sock *); +extern size_t nni_sock_rcvmaxsz(nni_sock *); + #endif // CORE_SOCKET_H diff --git a/src/core/transport.h b/src/core/transport.h index 8544f097..c74ec497 100644 --- a/src/core/transport.h +++ b/src/core/transport.h @@ -40,7 +40,7 @@ struct nni_tran { struct nni_tran_ep { // ep_init creates a vanilla endpoint. The value created is // used for the first argument for all other endpoint functions. - int (*ep_init)(void **, const char *, uint16_t); + int (*ep_init)(void **, const char *, nni_sock *); // ep_fini frees the resources associated with the endpoint. // The endpoint will already have been closed. @@ -336,7 +336,7 @@ nng_unsetnotify(nng_socket sid, nng_notify *notify) nng_socket nng_event_socket(nng_event *ev) { - // FOR NOW.... maybe evnet should contain socket Id instead? + // XXX: FOR NOW.... maybe evnet should contain socket Id instead? return (nni_sock_id(ev->e_sock)); } diff --git a/src/nng_compat.c b/src/nng_compat.c index fd43af29..04759e01 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -401,9 +401,12 @@ nn_recvmsg(int s, struct nn_msghdr *mh, int flags) char *cdata; size_t clen; size_t tlen; + size_t spsz; struct nn_cmsghdr *hdr; + unsigned char *ptr; - clen = NN_CMSG_SPACE(nng_msg_header_len(msg)); + spsz = nng_msg_header_len(msg); + clen = NN_CMSG_SPACE(sizeof (spsz) + spsz); if ((tlen = mh->msg_controllen) == NN_MSG) { // Ideally we'd use the same msg, but we would need @@ -429,13 +432,15 @@ nn_recvmsg(int s, struct nn_msghdr *mh, int flags) } if (clen <= tlen) { + ptr = NN_CMSG_DATA(cdata); hdr = (void *) cdata; - hdr->cmsg_len = nng_msg_header_len(msg); + hdr->cmsg_len = clen; hdr->cmsg_level = PROTO_SP; hdr->cmsg_type = SP_HDR; - memcpy(NN_CMSG_DATA(cdata), nng_msg_header(msg), - nng_msg_header_len(msg)); + memcpy(ptr, &spsz, sizeof (spsz)); + ptr += sizeof (spsz); + memcpy(ptr, nng_msg_header(msg), spsz); } } @@ -451,7 +456,7 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags) { nng_msg *msg = NULL; nng_msg *cmsg = NULL; - char *cdata = NULL; + char *cdata; int keep = 0; size_t sz; int rv; @@ -497,9 +502,16 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags) } // Now suck up the control data... + // This POSIX-inspired API is one of the most painful for + // usability we've ever seen. cmsg = NULL; if ((cdata = mh->msg_control) != NULL) { size_t clen; + size_t offs; + size_t spsz; + struct nn_cmsghdr *chdr; + unsigned char *data; + if ((clen = mh->msg_controllen) == NN_MSG) { // Underlying data is a message. This is awkward, // because we have to copy the data, but we should @@ -508,13 +520,42 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags) cdata = *(void **) cdata; cmsg = *(nng_msg **) (cdata - sizeof (cmsg)); clen = nng_msg_len(cmsg); + } else { + clen = mh->msg_controllen; } - if ((rv = nng_msg_append_header(msg, cdata, clen)) != 0) { - if (!keep) { - nng_msg_free(msg); + + offs = 0; + while ((offs + sizeof (NN_CMSG_LEN(0))) < clen) { + chdr = (void *) (cdata + offs); + if ((chdr->cmsg_level != PROTO_SP) || + (chdr->cmsg_type != SP_HDR)) { + offs += chdr->cmsg_len; } - nn_seterror(rv); - return (-1); + + // SP header in theory. Starts with size, then + // any backtrace details. + if (chdr->cmsg_len < sizeof (size_t)) { + offs += chdr->cmsg_len; + continue; + } + data = NN_CMSG_DATA(chdr); + memcpy(&spsz, data, sizeof (spsz)); + if ((spsz + sizeof (spsz)) > chdr->cmsg_len) { + // Truncated header? Ignore it. + offs += chdr->cmsg_len; + continue; + } + data += sizeof (spsz); + rv = nng_msg_append_header(msg, data, spsz); + if (rv != 0) { + if (!keep) { + nng_msg_free(msg); + } + nn_seterror(rv); + return (-1); + } + + break; } } @@ -654,3 +695,35 @@ nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz) } return (0); } + + +struct nn_cmsghdr * +nn_cmsg_next(struct nn_msghdr *mh, struct nn_cmsghdr *first) +{ + size_t clen; + char *data; + + // We only support SP headers, so there can be at most one header. + if (first != NULL) { + return (NULL); + } + if ((clen = mh->msg_controllen) == NN_MSG) { + nng_msg *msg; + data = *((void **) (mh->msg_control)); + msg = *(nng_msg **) (data - sizeof (msg)); + clen = nng_msg_len(msg); + } else { + data = mh->msg_control; + } + + if (first == NULL) { + first = (void *) data; + } else { + first = first + first->cmsg_len; + } + + if (((char *) first + sizeof (*first)) > (data + clen)) { + return (NULL); + } + return (first); +} diff --git a/src/nng_compat.h b/src/nng_compat.h index 64ace069..5866fe2c 100644 --- a/src/nng_compat.h +++ b/src/nng_compat.h @@ -258,7 +258,7 @@ struct nn_cmsghdr { int cmsg_type; }; -#define NN_ALIGN(len) \ +#define NN_CMSG_ALIGN(len) \ (((len) + sizeof (size_t) - 1) & (size_t) ~(sizeof (size_t) - 1)) // Unlike old nanomsg, we explicitly only support the SP header as attached @@ -268,16 +268,17 @@ struct nn_cmsghdr { // anyone used that in practice though.) #define NN_CMSG_FIRSTHDR(mh) \ nn_cmsg_next((struct nn_msghdr *) (mh), NULL) -#define NN_CMSG_NEXTHDR(mh, ch) \ +#define NN_CMSG_NXTHDR(mh, ch) \ nn_cmsg_next((struct nn_msghdr *) (mh), (struct nn_cmsghdr *) ch) #define NN_CMSG_DATA(ch) \ ((unsigned char *) (((struct nn_cmsghdr *) (ch)) + 1)) #define NN_CMSG_SPACE(len) \ - (NN_ALIGN(len) + NN_ALIGN(sizeof (struct nn_cmsghdr))) + (NN_CMSG_ALIGN(len) + NN_CMSG_ALIGN(sizeof (struct nn_cmsghdr))) #define NN_CMSG_LEN(len) \ - (NN_ALIGN(sizeof (nn_cmsghdr)) + (len)) + (NN_CMSG_ALIGN(sizeof (struct nn_cmsghdr)) + (len)) -NN_DECL struct cmsg_hdr *nn_cmsg_next(struct nn_msghdr *, struct nn_cmsghdr *); +NN_DECL struct nn_cmsghdr *nn_cmsg_next(struct nn_msghdr *, + struct nn_cmsghdr *); NN_DECL int nn_socket(int, int); NN_DECL int nn_setsockopt(int, int, int, const void *, size_t); NN_DECL int nn_getsockopt(int, int, int, void *, size_t *); @@ -287,7 +288,7 @@ NN_DECL int nn_shutdown(int, int); NN_DECL int nn_send(int, const void *, size_t, int); NN_DECL int nn_recv(int, void *, size_t, int); NN_DECL int nn_sendmsg(int, const struct nn_msghdr *, int); -NN_DECL int nn_recvcmsg(int, struct nn_msghdr *, int); +NN_DECL int nn_recvmsg(int, struct nn_msghdr *, int); NN_DECL int nn_close(int); NN_DECL int nn_poll(struct nn_pollfd *, int, int); NN_DECL int nn_device(int, int); diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 40ee52fc..93bc3d13 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -294,6 +294,7 @@ nni_rep_sock_setopt(void *arg, int opt, const void *buf, size_t sz) break; case NNG_OPT_RAW: rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1); + nni_sock_senderr(rep->sock, rep->raw ? 0 : NNG_ESTATE); break; default: rv = NNG_ENOTSUP; diff --git a/src/transport/inproc/inproc.c b/src/transport/inproc/inproc.c index d6b1ac74..66f076ea 100644 --- a/src/transport/inproc/inproc.c +++ b/src/transport/inproc/inproc.c @@ -189,7 +189,7 @@ nni_inproc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) static int -nni_inproc_ep_init(void **epp, const char *url, uint16_t proto) +nni_inproc_ep_init(void **epp, const char *url, nni_sock *sock) { nni_inproc_ep *ep; int rv; @@ -207,7 +207,7 @@ nni_inproc_ep_init(void **epp, const char *url, uint16_t proto) ep->mode = NNI_INPROC_EP_IDLE; ep->closed = 0; - ep->proto = proto; + ep->proto = nni_sock_proto(sock); NNI_LIST_NODE_INIT(&ep->node); NNI_LIST_INIT(&ep->clients, nni_inproc_ep, node); @@ -291,8 +291,6 @@ nni_inproc_ep_connect(void *arg, void **pipep) return (NNG_ECONNREFUSED); } - // XXX check protocol peer validity... - ep->mode = NNI_INPROC_EP_DIAL; nni_list_append(&server->clients, ep); nni_cv_wake(&server->cv); diff --git a/src/transport/ipc/ipc.c b/src/transport/ipc/ipc.c index 3aef5363..4673a6dc 100644 --- a/src/transport/ipc/ipc.c +++ b/src/transport/ipc/ipc.c @@ -26,7 +26,7 @@ struct nni_ipc_pipe { nni_plat_ipcsock fd; uint16_t peer; uint16_t proto; - uint32_t rcvmax; + size_t rcvmax; }; struct nni_ipc_ep { @@ -34,7 +34,7 @@ struct nni_ipc_ep { nni_plat_ipcsock fd; int closed; uint16_t proto; - uint32_t rcvmax; + size_t rcvmax; }; static int @@ -123,7 +123,7 @@ nni_ipc_pipe_recv(void *arg, nni_msg **msgp) } NNI_GET64(buf, len); if (len > pipe->rcvmax) { - return (NNG_EPROTO); + return (NNG_EMSGSIZE); } if ((rv = nng_msg_alloc(&msg, (size_t) len)) != 0) { @@ -176,7 +176,7 @@ nni_ipc_pipe_getopt(void *arg, int option, void *buf, size_t *szp) static int -nni_ipc_ep_init(void **epp, const char *url, uint16_t proto) +nni_ipc_ep_init(void **epp, const char *url, nni_sock *sock) { nni_ipc_ep *ep; int rv; @@ -188,8 +188,8 @@ nni_ipc_ep_init(void **epp, const char *url, uint16_t proto) return (NNG_ENOMEM); } ep->closed = 0; - ep->proto = proto; - ep->rcvmax = 1024 * 1024; // XXX: fix this + ep->proto = nni_sock_proto(sock); + ep->rcvmax = nni_sock_rcvmaxsz(sock); if ((rv = nni_plat_ipc_init(&ep->fd)) != 0) { NNI_FREE_STRUCT(ep); return (rv); diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index d18fd289..ad2d398a 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -25,7 +25,7 @@ struct nni_tcp_pipe { nni_plat_tcpsock fd; uint16_t peer; uint16_t proto; - uint32_t rcvmax; + size_t rcvmax; }; struct nni_tcp_ep { @@ -33,7 +33,7 @@ struct nni_tcp_ep { nni_plat_tcpsock fd; int closed; uint16_t proto; - uint32_t rcvmax; + size_t rcvmax; int ipv4only; }; @@ -112,7 +112,7 @@ nni_tcp_pipe_recv(void *arg, nni_msg **msgp) } NNI_GET64(buf, len); if (len > pipe->rcvmax) { - return (NNG_EPROTO); + return (NNG_EMSGSIZE); } if ((rv = nng_msg_alloc(&msg, (size_t) len)) != 0) { @@ -165,7 +165,7 @@ nni_tcp_pipe_getopt(void *arg, int option, void *buf, size_t *szp) static int -nni_tcp_ep_init(void **epp, const char *url, uint16_t proto) +nni_tcp_ep_init(void **epp, const char *url, nni_sock *sock) { nni_tcp_ep *ep; int rv; @@ -177,9 +177,9 @@ nni_tcp_ep_init(void **epp, const char *url, uint16_t proto) return (NNG_ENOMEM); } ep->closed = 0; - ep->proto = proto; - ep->ipv4only = 0; - ep->rcvmax = 1024 * 1024; // XXX: fix this + ep->proto = nni_sock_proto(sock); + ep->ipv4only = 0; // XXX: FIXME + ep->rcvmax = nni_sock_rcvmaxsz(sock); if ((rv = nni_plat_tcp_init(&ep->fd)) != 0) { NNI_FREE_STRUCT(ep); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 52c9fdba..57230f33 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -86,4 +86,5 @@ add_nng_test(survey 5) add_nng_test(tcp 5) # compatbility tests +add_nng_compat_test(compat_cmsg 5) add_nng_compat_test(compat_reqrep 5) diff --git a/tests/compat_cmsg.c b/tests/compat_cmsg.c new file mode 100644 index 00000000..4d70dabe --- /dev/null +++ b/tests/compat_cmsg.c @@ -0,0 +1,114 @@ +/* + Copyright (c) 2014 Martin Sustrik All rights reserved. + Copyright 2015 Garrett D'Amore <garrett@damore.org> + Copyright 2016 Franklin "Snaipe" Mathieu <franklinmathieu@gmail.com> + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#include "nng_compat.h" +#include "compat_testutil.h" + +int main (int argc, const char *argv[]) +{ + int rc; + int rep; + int req; + struct nn_msghdr hdr; + struct nn_iovec iovec; + unsigned char body [3]; + unsigned char ctrl [256]; + struct nn_cmsghdr *cmsg; + unsigned char *data; + void *buf; + char socket_address[128]; + + test_addr_from(socket_address, "tcp", "127.0.0.1", + get_test_port(argc, argv)); + strcpy(socket_address, "inproc://testaddr"); + + rep = test_socket (AF_SP_RAW, NN_REP); + test_bind (rep, socket_address); + req = test_socket (AF_SP, NN_REQ); + test_connect (req, socket_address); + + /* Test ancillary data in static buffer. */ + + test_send (req, "ABC"); + + iovec.iov_base = body; + iovec.iov_len = sizeof (body); + hdr.msg_iov = &iovec; + hdr.msg_iovlen = 1; + hdr.msg_control = ctrl; + hdr.msg_controllen = sizeof (ctrl); + rc = nn_recvmsg (rep, &hdr, 0); + errno_assert (rc == 3); + + cmsg = NN_CMSG_FIRSTHDR (&hdr); + while (1) { + nn_assert (cmsg); + if (cmsg->cmsg_level == PROTO_SP && cmsg->cmsg_type == SP_HDR) + break; + cmsg = NN_CMSG_NXTHDR (&hdr, cmsg); + } + nn_assert (cmsg->cmsg_len == NN_CMSG_SPACE (8+sizeof (size_t))); + data = NN_CMSG_DATA (cmsg); + nn_assert (!(data[0+sizeof (size_t)] & 0x80)); + nn_assert (data[4+sizeof (size_t)] & 0x80); + + rc = nn_sendmsg (rep, &hdr, 0); + nn_assert (rc == 3); + test_recv (req, "ABC"); + + /* Test ancillary data in dynamically allocated buffer (NN_MSG). */ + + test_send (req, "ABC"); + + iovec.iov_base = body; + iovec.iov_len = sizeof (body); + hdr.msg_iov = &iovec; + hdr.msg_iovlen = 1; + hdr.msg_control = &buf; + hdr.msg_controllen = NN_MSG; + rc = nn_recvmsg (rep, &hdr, 0); + errno_assert (rc == 3); + + cmsg = NN_CMSG_FIRSTHDR (&hdr); + while (1) { + nn_assert (cmsg); + if (cmsg->cmsg_level == PROTO_SP && cmsg->cmsg_type == SP_HDR) + break; + cmsg = NN_CMSG_NXTHDR (&hdr, cmsg); + } + nn_assert (cmsg->cmsg_len == NN_CMSG_SPACE (8+sizeof (size_t))); + data = NN_CMSG_DATA (cmsg); + nn_assert (!(data[0+sizeof (size_t)] & 0x80)); + nn_assert (data[4+sizeof (size_t)] & 0x80); + + rc = nn_sendmsg (rep, &hdr, 0); + nn_assert (rc == 3); + test_recv (req, "ABC"); + + test_close (req); + test_close (rep); + + return 0; +} + |
