diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-21 12:05:35 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-21 12:05:35 -0800 |
| commit | 0e2e1c40f4b22d940886de6e8555eeef9c076808 (patch) | |
| tree | 34eb2e705487b6333110362532181032e409b6fc | |
| parent | 4a24ebe175bc3eebf7fbf2ac581b2339a809ace1 (diff) | |
| download | nng-0e2e1c40f4b22d940886de6e8555eeef9c076808.tar.gz nng-0e2e1c40f4b22d940886de6e8555eeef9c076808.tar.bz2 nng-0e2e1c40f4b22d940886de6e8555eeef9c076808.zip | |
Implement nng_send and nng_recv convenience routines.
| -rw-r--r-- | src/nng.c | 72 | ||||
| -rw-r--r-- | src/nng.h | 22 | ||||
| -rw-r--r-- | tests/sock.c | 57 |
3 files changed, 110 insertions, 41 deletions
@@ -18,6 +18,8 @@ // Pretty much every function calls the nni_platform_init to check against // fork related activity. +#include <string.h> + int nng_open(nng_socket *sidp, uint16_t proto) { @@ -96,6 +98,43 @@ nng_peer(nng_socket sid) int +nng_recv(nng_socket sid, void *buf, size_t *szp, int flags) +{ + nng_msg *msg; + int rv; + + // Note that while it would be nice to make this a zero copy operation, + // its not normally possible if a size was specified. + if ((rv = nng_recvmsg(sid, &msg, flags & ~(NNG_FLAG_ALLOC))) != 0) { + return (rv); + } + if (!(flags & NNG_FLAG_ALLOC)) { + memcpy(buf, nng_msg_body(msg), + *szp > nng_msg_len(msg) ? nng_msg_len(msg) : *szp); + *szp = nng_msg_len(msg); + } else { + // We'd really like to avoid a separate data copy, but since + // we have allocated messages with headroom, we can't really + // make free() work on the base pointer. We'd have to have + // some other API for this. Folks that want zero copy had + // better use nng_recvmsg() instead. + void *nbuf; + + if ((nbuf = nni_alloc(nng_msg_len(msg))) == NULL) { + nng_msg_free(msg); + return (NNG_ENOMEM); + } + + *(void **) buf = nbuf; + memcpy(nbuf, nni_msg_body(msg), nni_msg_len(msg)); + *szp = nng_msg_len(msg); + } + nni_msg_free(msg); + return (0); +} + + +int nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags) { nni_time expire; @@ -121,6 +160,39 @@ nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags) int +nng_send(nng_socket sid, void *buf, size_t len, int flags) +{ + nng_msg *msg; + int rv; + + if ((rv = nng_msg_alloc(&msg, len)) != 0) { + return (rv); + } + memcpy(nng_msg_body(msg), buf, len); + if ((rv = nng_sendmsg(sid, msg, flags)) != 0) { + nng_msg_free(msg); + } else if (flags & NNG_FLAG_ALLOC) { + nni_free(buf, len); + } + return (rv); +} + + +void * +nng_alloc(size_t sz) +{ + return (nni_alloc(sz)); +} + + +void +nng_free(void *buf, size_t sz) +{ + nni_free(buf, sz); +} + + +int nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) { nni_time expire; @@ -180,8 +180,11 @@ NNG_DECL const char *nng_strerror(int); // this function may (will!) return before any receiver has actually // received the data. The return value will be zero to indicate that the // socket has accepted the entire data for send, or an errno to indicate -// failure. The flags may include NNG_FLAG_NONBLOCK. -NNG_DECL int nng_send(nng_socket, const void *, size_t, int); +// failure. The flags may include NNG_FLAG_NONBLOCK or NNG_FLAG_ALLOC. +// If the flag includes NNG_FLAG_ALLOC, then the function will call +// nng_free() on the supplied pointer & size on success. (If the call +// fails then the memory is not freed.) +NNG_DECL int nng_send(nng_socket, void *, size_t, int); // nng_recv receives message data into the socket, up to the supplied size. // The actual size of the message data will be written to the value pointed @@ -189,7 +192,7 @@ NNG_DECL int nng_send(nng_socket, const void *, size_t, int); // If NNG_FLAG_ALLOC is supplied then the library will allocate memory for // the caller. In that case the pointer to the allocated will be stored // instead of the data itself. The caller is responsible for freeing the -// associated memory with free(). +// associated memory with nng_free(). NNG_DECL int nng_recv(nng_socket, void *, size_t *, int); // nng_sendmsg is like nng_send, but offers up a message structure, which @@ -204,6 +207,19 @@ NNG_DECL int nng_sendmsg(nng_socket, nng_msg *, int); // can be passed off directly to nng_sendmsg. NNG_DECL int nng_recvmsg(nng_socket, nng_msg **, int); +// nng_alloc is used to allocate memory. It's intended purpose is for +// allocating memory suitable for message buffers with nng_send(). +// Applications that need memory for other purposes should use their platform +// specific API. +NNG_DECL void *nng_alloc(size_t); + +// nng_free is used to free memory allocated with nng_alloc, which includes +// memory allocated by nng_recv() when the NNG_FLAG_ALLOC message is supplied. +// As the application is required to keep track of the size of memory, this +// is probably less convenient for general uses than the C library malloc and +// calloc. +NNG_DECL void nng_free(void *, size_t); + // Message API. NNG_DECL int nng_msg_alloc(nng_msg **, size_t); NNG_DECL void nng_msg_free(nng_msg *); diff --git a/tests/sock.c b/tests/sock.c index a41a0093..ec923e4a 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -146,51 +146,32 @@ Main({ Convey("We can send and receive messages", { nng_socket sock2; int len = 1; - nng_msg *msg; - uint64_t second = 1000000; + size_t sz; + uint64_t second = 3000000; + char *buf; So(nng_open(&sock2, NNG_PROTO_PAIR) == 0); - rv = nng_setopt(sock, NNG_OPT_RCVBUF, &len, sizeof (len)); - So(rv == 0); - rv = nng_setopt(sock, NNG_OPT_SNDBUF, &len, sizeof (len)); - So(rv == 0); + So(nng_setopt(sock, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0); + So(nng_setopt(sock, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0); - rv = nng_setopt(sock2, NNG_OPT_RCVBUF, &len, sizeof (len)); - So(rv == 0); - rv = nng_setopt(sock2, NNG_OPT_SNDBUF, &len, sizeof (len)); - So(rv == 0); + So(nng_setopt(sock2, NNG_OPT_RCVBUF, &len, sizeof (len)) == 0); + So(nng_setopt(sock2, NNG_OPT_SNDBUF, &len, sizeof (len)) == 0); - rv = nng_setopt(sock, NNG_OPT_SNDTIMEO, &second, sizeof (second)); - So(rv == 0); - rv = nng_setopt(sock, NNG_OPT_RCVTIMEO, &second, sizeof (second)); - So(rv == 0); - rv = nng_setopt(sock2, NNG_OPT_SNDTIMEO, &second, sizeof (second)); - So(rv == 0); - rv = nng_setopt(sock2, NNG_OPT_RCVTIMEO, &second, sizeof (second)); - So(rv == 0); + So(nng_setopt(sock, NNG_OPT_SNDTIMEO, &second, sizeof (second)) == 0); + So(nng_setopt(sock, NNG_OPT_RCVTIMEO, &second, sizeof (second)) == 0); + So(nng_setopt(sock2, NNG_OPT_SNDTIMEO, &second, sizeof (second)) == 0); + So(nng_setopt(sock2, NNG_OPT_RCVTIMEO, &second, sizeof (second)) == 0); - rv = nng_listen(sock, "inproc://test1", NULL, NNG_FLAG_SYNCH); - So(rv == 0); - rv = nng_dial(sock2, "inproc://test1", NULL, 0); - So(rv == 0); - - rv = nng_msg_alloc(&msg, 3); - So(rv == 0); - So(nng_msg_len(msg) == 3); - So(nng_msg_body(msg) != NULL); - memcpy(nng_msg_body(msg), "abc", 3); + So(nng_listen(sock, "inproc://test1", NULL, NNG_FLAG_SYNCH) == 0); + So(nng_dial(sock2, "inproc://test1", NULL, 0) == 0); - rv = nng_sendmsg(sock, msg, 0); - So(rv == 0); - - msg = NULL; - rv = nng_recvmsg(sock2, &msg, 0); - So(rv == 0); - So(msg != NULL); - So(nng_msg_len(msg) == 3); - So(memcmp(nng_msg_body(msg), "abc", 3) == 0); - nng_msg_free(msg); + So(nng_send(sock, "abc", 4, 0) == 0); + So(nng_recv(sock2 , &buf, &sz, NNG_FLAG_ALLOC) == 0); + So(buf != NULL); + So(sz == 4); + So(memcmp(buf, "abc", 4) == 0); + nng_free(buf, sz); nng_close(sock2); }) }) |
