From fc553e0689a9be70b90663db1bcd020706ba9ae6 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Mon, 23 Jan 2017 14:00:05 -0800 Subject: nn_recvmsg, and enhancements to support NN_MSG allocated messages. --- src/nng_compat.c | 210 ++++++++++++++++++++++++++++++++++++++++++------------- src/nng_compat.h | 15 ++-- 2 files changed, 172 insertions(+), 53 deletions(-) (limited to 'src') diff --git a/src/nng_compat.c b/src/nng_compat.c index bed171ac..e5859cdc 100644 --- a/src/nng_compat.c +++ b/src/nng_compat.c @@ -13,14 +13,8 @@ #include #include -// This file provides the "public" API. This is a thin wrapper around -// internal API functions. We use the public prefix instead of internal, -// to indicate that these interfaces are intended for applications to use -// directly. -// -// Anything not defined in this file, applications have no business using. -// Pretty much every function calls the nni_platform_init to check against -// fork related activity. +// This file supplies the legacy compatibility API. Applications should +// avoid using these if at all possible, and instead use the new style APIs. static struct { int perr; @@ -204,18 +198,19 @@ nn_allocmsg(size_t size, int type) } -void -nni_freemsg(void *ptr) +int +nn_freemsg(void *ptr) { nng_msg *msg; memcpy(&msg, ((char *) ptr) - sizeof (msg), sizeof (msg)); nng_msg_free(msg); + return (0); } void * -nni_reallocmsg(void *ptr, size_t len) +nn_reallocmsg(void *ptr, size_t len) { nng_msg *msg; int rv; @@ -242,21 +237,31 @@ nni_reallocmsg(void *ptr, size_t len) } -int -nn_send(int s, const void *buf, size_t len, int flags) +static int +nn_flags(int flags) { - int rv; - switch (flags) { - case NN_DONTWAIT: - flags = NNG_FLAG_NONBLOCK; - break; case 0: - break; + return (0); + + case NN_DONTWAIT: + return (NNG_FLAG_NONBLOCK); + default: nn_seterror(NNG_EINVAL); return (-1); } +} + + +int +nn_send(int s, const void *buf, size_t len, int flags) +{ + int rv; + + if ((flags = nn_flags(flags)) == -1) { + return (-1); + } if (len == NN_MSG) { nng_msg *msg; memcpy(&msg, ((char *) buf) - sizeof (msg), sizeof (msg)); @@ -278,41 +283,38 @@ nn_recv(int s, void *buf, size_t len, int flags) { int rv; - switch (flags) { - case NN_DONTWAIT: - flags = NNG_FLAG_NONBLOCK; - break; - case 0: - break; - default: - nn_seterror(NNG_EINVAL); + if ((flags = nn_flags(flags)) == -1) { return (-1); } + if (len == NN_MSG) { nng_msg *msg; - rv = nng_recvmsg((nng_socket) s, &msg, flags); - if (rv == 0) { - void *body; - // prepend our header to the body... - // Note that this *can* alter the message, - // although for performance reasons it ought not. - // (There should be sufficient headroom.) - nng_msg_prepend(msg, &msg, sizeof (msg)); - // then trim it off :-) - nng_msg_trim(msg, sizeof (msg)); - // store the pointer to the revised body - body = nng_msg_body(msg); - - // arguably we could do this with a pointer store, - // but memcpy gives us extra paranoia in case the - // the receiver is misaligned. - memcpy(buf, &body, sizeof (body)); - len = nng_msg_len(msg); + void *body; + if ((rv = nng_recvmsg((nng_socket) s, &msg, flags)) != 0) { + nn_seterror(rv); + return (-1); } - } else { - rv = nng_recv((nng_socket) s, buf, &len, flags); + + // prepend our header to the body... + // Note that this *can* alter the message, + // although for performance reasons it ought not. + // (There should be sufficient headroom.) + if ((rv = nng_msg_prepend(msg, &msg, sizeof (msg))) != 0) { + nng_msg_free(msg); + nn_seterror(rv); + return (-1); + } + + // now "trim" it off... the value is still there, but the + // contents are unreferenced. We rely on legacy nanomsg's + // ignorance of nng msgs to preserve this. + nng_msg_trim(msg, sizeof (msg)); + + *(void **) buf = nng_msg_body(msg); + return ((int) nng_msg_len(msg)); } - if (rv != 0) { + + if ((rv = nng_recv((nng_socket) s, buf, &len, flags)) != 0) { nn_seterror(rv); return (-1); } @@ -320,6 +322,118 @@ nn_recv(int s, void *buf, size_t len, int flags) } +int +nn_recvmsg(int s, struct nn_msghdr *mh, int flags) +{ + int rv; + nng_msg *msg; + size_t len; + int keep = 0; + + if ((flags = nn_flags(flags)) == -1) { + return (-1); + } + if (mh == NULL) { + nn_seterror(NNG_EINVAL); + return (-1); + } + + if ((rv = nng_recvmsg((nng_socket) s, &msg, flags)) != 0) { + nn_seterror(rv); + return (-1); + } + if ((mh->msg_iovlen == 1) && (mh->msg_iov[0].iov_len == NN_MSG)) { + // Receiver wants to have a dynamically allocated message. + // There can only be one of these. + if ((rv = nng_msg_prepend(msg, &msg, sizeof (msg))) != 0) { + nng_msg_free(msg); + nn_seterror(rv); + return (-1); + } + nng_msg_trim(msg, sizeof (msg)); + *(void **) (mh->msg_iov[0].iov_base) = nng_msg_body(msg); + len = nng_msg_len(msg); + keep = 1; // Do not discard message! + } else { + // copyout to multiple iovecs. + char *ptr = nng_msg_body(msg); + int i; + size_t n; + len = nng_msg_len(msg); + + for (i = 0; i < mh->msg_iovlen; i++) { + if ((n = mh->msg_iov[i].iov_len) == NN_MSG) { + // This is forbidden! + nn_seterror(NNG_EINVAL); + nng_msg_free(msg); + return (-1); + } + if (n > len) { + n = len; + } + memcpy(mh->msg_iov[i].iov_base, ptr, n); + len -= n; + ptr += n; + } + + // If we copied everything, len will be zero, otherwise, + // it represents the amount of data that we were unable to + // copyout. The caller is responsible for noticing this, + // as there is no API to pass this information out. + len = nng_msg_len(msg); + } + + // If the caller has requested control information (header details), + // we grab it. + if (mh->msg_control != NULL) { + char *cdata; + size_t clen; + size_t tlen; + struct nn_cmsghdr *hdr; + + clen = NN_CMSG_SPACE(nng_msg_header_len(msg)); + + if ((tlen = mh->msg_controllen) == NN_MSG) { + // Ideally we'd use the same msg, but we would need + // to set up reference counts on the message, so + // instead we just make a new message. + nng_msg *nmsg; + + rv = nng_msg_alloc(&nmsg, clen + sizeof (nmsg)); + if (rv != 0) { + nng_msg_free(msg); + nn_seterror(rv); + return (-1); + } + memcpy(nng_msg_body(nmsg), &nmsg, sizeof (nmsg)); + nng_msg_trim(nmsg, sizeof (nmsg)); + cdata = nng_msg_body(nmsg); + *(void **) mh->msg_control = cdata; + tlen = clen; + } else { + cdata = mh->msg_control; + memset(cdata, 0, + tlen > sizeof (*hdr) ? sizeof (*hdr) : tlen); + } + + if (clen <= tlen) { + hdr = (void *) cdata; + hdr->cmsg_len = nng_msg_header_len(msg); + hdr->cmsg_level = PROTO_SP; + hdr->cmsg_type = SP_HDR; + + memcpy(NN_CMSG_DATA(cdata), nng_msg_header(msg), + nng_msg_header_len(msg)); + } + } + + if (!keep) { + nng_msg_free(msg); + } + return (len); +} + + #if 0 int nn_sendmsg(int s, const struct nn_msghdr *mh, int flags) diff --git a/src/nng_compat.h b/src/nng_compat.h index 020a342a..0a147196 100644 --- a/src/nng_compat.h +++ b/src/nng_compat.h @@ -245,19 +245,24 @@ struct nn_cmsghdr { #define NN_ALIGN(len) \ (((len) + sizeof (size_t) - 1) & (size_t) ~(sizeof (size_t) - 1)) + +// Unlike old nanomsg, we explicitly only support the SP header as attached +// cmsg data. It turns out that old nanomsg didn't really store anything +// useful otherwise anyway. (One specific exception was that it stored the +// message type of text or binary for the websocket transport. We don't think +// anyone used that in practice though.) #define NN_CMSG_FIRSTHDR(mh) \ - nn_cmsg_nexthdr((struct nn_msghdr *) (mh), NULL) + nn_cmsg_next((struct nn_msghdr *) (mh), NULL) #define NN_CMSG_NEXTHDR(mh, ch) \ - nn_cmsg_nexthdr((struct nn_msghdr *) (mh), (struct nn_cmsghdr *) ch) + nn_cmsg_next((struct nn_msghdr *) (mh), (struct nn_cmsghdr *) ch) #define NN_CMSG_DATA(ch) \ - ((unsigned char *) (((struct cmsghdr *) (ch)) + 1)) + ((unsigned char *) (((struct nn_cmsghdr *) (ch)) + 1)) #define NN_CMSG_SPACE(len) \ (NN_ALIGN(len) + NN_ALIGN(sizeof (struct nn_cmsghdr))) #define NN_CMSG_LEN(len) \ (NN_ALIGN(sizeof (nn_cmsghdr)) + (len)) -NN_DECL struct cmsg_hdr *nn_cmsg_nexthdr(const struct nn_msghdr *, - const struct nn_cmsghdr *); +NN_DECL struct cmsg_hdr *nn_cmsg_next(struct nn_msghdr *, struct nn_cmsghdr *); NN_DECL int nn_socket(int, int); NN_DECL int nn_setsockopt(int, int, int, const void *, size_t); NN_DECL int nn_getsockopt(int, int, int, void *, size_t *); -- cgit v1.2.3-70-g09d2