aboutsummaryrefslogtreecommitdiff
path: root/src/nng.c
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2021-11-28 09:05:30 -0800
committerGarrett D'Amore <garrett@damore.org>2021-11-28 18:13:21 -0800
commitefa702387fcfa80ddd02e1a85f73c5b6f6ba1955 (patch)
tree853d84a55f7b4f9c1c100a82443520a258cec9d8 /src/nng.c
parent774e9375153d8a48bd1cbf654ca323656551b054 (diff)
downloadnng-efa702387fcfa80ddd02e1a85f73c5b6f6ba1955.tar.gz
nng-efa702387fcfa80ddd02e1a85f73c5b6f6ba1955.tar.bz2
nng-efa702387fcfa80ddd02e1a85f73c5b6f6ba1955.zip
fixes #1536 nng_sendmsg and nng_recvmsg could be faster
fixes #1535 Desire nng_ctx_sendmsg and nng_ctx_recvmsg
Diffstat (limited to 'src/nng.c')
-rw-r--r--src/nng.c140
1 files changed, 112 insertions, 28 deletions
diff --git a/src/nng.c b/src/nng.c
index b1ebbd11..57f24a70 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -120,29 +120,33 @@ nng_recv(nng_socket s, void *buf, size_t *szp, int flags)
int
nng_recvmsg(nng_socket s, nng_msg **msgp, int flags)
{
- int rv;
- nng_aio *ap;
+ int rv;
+ nni_sock *sock;
+ nni_aio aio;
- if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) {
+ if ((rv = nni_sock_find(&sock, s.id)) != 0) {
return (rv);
}
+
+ nni_aio_init(&aio, NULL, NULL);
if (flags & NNG_FLAG_NONBLOCK) {
- nng_aio_set_timeout(ap, NNG_DURATION_ZERO);
+ nng_aio_set_timeout(&aio, NNG_DURATION_ZERO);
} else {
- nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT);
+ nng_aio_set_timeout(&aio, NNG_DURATION_DEFAULT);
}
+ nni_sock_recv(sock, &aio);
+ nni_sock_rele(sock);
- nng_recv_aio(s, ap);
- nng_aio_wait(ap);
+ nni_aio_wait(&aio);
- if ((rv = nng_aio_result(ap)) == 0) {
- *msgp = nng_aio_get_msg(ap);
+ if ((rv = nni_aio_result(&aio)) == 0) {
+ *msgp = nng_aio_get_msg(&aio);
} else if ((rv == NNG_ETIMEDOUT) &&
((flags & NNG_FLAG_NONBLOCK) == NNG_FLAG_NONBLOCK)) {
rv = NNG_EAGAIN;
}
- nng_aio_free(ap);
+ nni_aio_fini(&aio);
return (rv);
}
@@ -171,24 +175,31 @@ nng_send(nng_socket s, void *buf, size_t len, int flags)
int
nng_sendmsg(nng_socket s, nng_msg *msg, int flags)
{
- int rv;
- nng_aio *ap;
+ int rv;
+ nni_aio aio;
+ nni_sock *sock;
- if ((rv = nng_aio_alloc(&ap, NULL, NULL)) != 0) {
+ if (msg == NULL) {
+ return (NNG_EINVAL);
+ }
+ if ((rv = nni_sock_find(&sock, s.id)) != 0) {
return (rv);
}
+
+ nni_aio_init(&aio, NULL, NULL);
if ((flags & NNG_FLAG_NONBLOCK) == NNG_FLAG_NONBLOCK) {
- nng_aio_set_timeout(ap, NNG_DURATION_ZERO);
+ nni_aio_set_timeout(&aio, NNG_DURATION_ZERO);
} else {
- nng_aio_set_timeout(ap, NNG_DURATION_DEFAULT);
+ nni_aio_set_timeout(&aio, NNG_DURATION_DEFAULT);
}
- nng_aio_set_msg(ap, msg);
- nng_send_aio(s, ap);
- nng_aio_wait(ap);
+ nng_aio_set_msg(&aio, msg);
+ nni_sock_send(sock, &aio);
+ nni_sock_rele(sock);
- rv = nng_aio_result(ap);
- nng_aio_free(ap);
+ nni_aio_wait(&aio);
+ rv = nni_aio_result(&aio);
+ nni_aio_fini(&aio);
// Possibly massage nonblocking attempt. Note that nonblocking is
// still done asynchronously, and the calling thread loses context.
@@ -242,7 +253,7 @@ int
nng_ctx_open(nng_ctx *cp, nng_socket s)
{
nni_sock *sock;
- nni_ctx * ctx;
+ nni_ctx *ctx;
int rv;
nng_ctx c;
@@ -280,6 +291,40 @@ nng_ctx_id(nng_ctx c)
return (((int) c.id > 0) ? (int) c.id : -1);
}
+int
+nng_ctx_recvmsg(nng_ctx cid, nng_msg **msgp, int flags)
+{
+ int rv;
+ nni_aio aio;
+ nni_ctx *ctx;
+
+ if ((rv = nni_ctx_find(&ctx, cid.id, false)) != 0) {
+ return (rv);
+ }
+
+ nni_aio_init(&aio, NULL, NULL);
+ if (flags & NNG_FLAG_NONBLOCK) {
+ nng_aio_set_timeout(&aio, NNG_DURATION_ZERO);
+ } else {
+ nng_aio_set_timeout(&aio, NNG_DURATION_DEFAULT);
+ }
+ nni_ctx_recv(ctx, &aio);
+ nni_ctx_rele(ctx);
+
+ nni_aio_wait(&aio);
+
+ if ((rv = nni_aio_result(&aio)) == 0) {
+ *msgp = nng_aio_get_msg(&aio);
+
+ } else if ((rv == NNG_ETIMEDOUT) &&
+ ((flags & NNG_FLAG_NONBLOCK) == NNG_FLAG_NONBLOCK)) {
+ rv = NNG_EAGAIN;
+ }
+ nni_aio_fini(&aio);
+
+ return (rv);
+}
+
void
nng_ctx_recv(nng_ctx cid, nng_aio *aio)
{
@@ -318,6 +363,45 @@ nng_ctx_send(nng_ctx cid, nng_aio *aio)
nni_ctx_rele(ctx);
}
+int
+nng_ctx_sendmsg(nng_ctx cid, nng_msg *msg, int flags)
+{
+ int rv;
+ nni_aio aio;
+ nni_ctx *ctx;
+
+ if (msg == NULL) {
+ return (NNG_EINVAL);
+ }
+ if ((rv = nni_ctx_find(&ctx, cid.id, false)) != 0) {
+ return (rv);
+ }
+
+ nni_aio_init(&aio, NULL, NULL);
+ if ((flags & NNG_FLAG_NONBLOCK) == NNG_FLAG_NONBLOCK) {
+ nni_aio_set_timeout(&aio, NNG_DURATION_ZERO);
+ } else {
+ nni_aio_set_timeout(&aio, NNG_DURATION_DEFAULT);
+ }
+
+ nng_aio_set_msg(&aio, msg);
+ nni_ctx_send(ctx, &aio);
+ nni_ctx_rele(ctx);
+
+ nni_aio_wait(&aio);
+ rv = nni_aio_result(&aio);
+ nni_aio_fini(&aio);
+
+ // Possibly massage nonblocking attempt. Note that nonblocking is
+ // still done asynchronously, and the calling thread loses context.
+ if ((rv == NNG_ETIMEDOUT) &&
+ ((flags & NNG_FLAG_NONBLOCK) == NNG_FLAG_NONBLOCK)) {
+ rv = NNG_EAGAIN;
+ }
+
+ return (rv);
+}
+
static int
ctx_get(nng_ctx id, const char *n, void *v, size_t *szp, nni_type t)
{
@@ -466,7 +550,7 @@ nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags)
{
nni_dialer *d;
int rv;
- nni_sock * s;
+ nni_sock *s;
if ((rv = nni_sock_find(&s, sid.id)) != 0) {
return (rv);
@@ -492,7 +576,7 @@ int
nng_listen(nng_socket sid, const char *addr, nng_listener *lp, int flags)
{
int rv;
- nni_sock * s;
+ nni_sock *s;
nni_listener *l;
if ((rv = nni_sock_find(&s, sid.id)) != 0) {
@@ -519,7 +603,7 @@ nng_listen(nng_socket sid, const char *addr, nng_listener *lp, int flags)
int
nng_listener_create(nng_listener *lp, nng_socket sid, const char *addr)
{
- nni_sock * s;
+ nni_sock *s;
int rv;
nni_listener *l;
nng_listener lid;
@@ -560,7 +644,7 @@ nng_listener_id(nng_listener l)
int
nng_dialer_create(nng_dialer *dp, nng_socket sid, const char *addr)
{
- nni_sock * s;
+ nni_sock *s;
nni_dialer *d;
int rv;
nng_dialer did;
@@ -1247,7 +1331,7 @@ nng_socket
nng_pipe_socket(nng_pipe p)
{
nng_socket s = NNG_SOCKET_INITIALIZER;
- nni_pipe * pipe;
+ nni_pipe *pipe;
if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) {
s.id = nni_pipe_sock_id(pipe);
@@ -1260,7 +1344,7 @@ nng_dialer
nng_pipe_dialer(nng_pipe p)
{
nng_dialer d = NNG_DIALER_INITIALIZER;
- nni_pipe * pipe;
+ nni_pipe *pipe;
if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) {
d.id = nni_pipe_dialer_id(pipe);
nni_pipe_rele(pipe);
@@ -1272,7 +1356,7 @@ nng_listener
nng_pipe_listener(nng_pipe p)
{
nng_listener l = NNG_LISTENER_INITIALIZER;
- nni_pipe * pipe;
+ nni_pipe *pipe;
if ((nni_init() == 0) && (nni_pipe_find(&pipe, p.id) == 0)) {
l.id = nni_pipe_listener_id(pipe);
nni_pipe_rele(pipe);