aboutsummaryrefslogtreecommitdiff
path: root/src/sp/protocol/pubsub0
diff options
context:
space:
mode:
Diffstat (limited to 'src/sp/protocol/pubsub0')
-rw-r--r--src/sp/protocol/pubsub0/pub.c35
-rw-r--r--src/sp/protocol/pubsub0/pub_test.c5
-rw-r--r--src/sp/protocol/pubsub0/sub.c30
-rw-r--r--src/sp/protocol/pubsub0/sub_test.c23
-rw-r--r--src/sp/protocol/pubsub0/xsub_test.c7
5 files changed, 34 insertions, 66 deletions
diff --git a/src/sp/protocol/pubsub0/pub.c b/src/sp/protocol/pubsub0/pub.c
index 9539b851..c40725f7 100644
--- a/src/sp/protocol/pubsub0/pub.c
+++ b/src/sp/protocol/pubsub0/pub.c
@@ -253,21 +253,13 @@ pub0_sock_send(void *arg, nni_aio *aio)
}
static int
-pub0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_type t)
+pub0_sock_get_sendfd(void *arg, int *fdp)
{
pub0_sock *sock = arg;
- int fd;
- int rv;
- nni_mtx_lock(&sock->mtx);
+
// PUB sockets are *always* writable.
nni_pollable_raise(&sock->sendable);
- rv = nni_pollable_getfd(&sock->sendable, &fd);
- nni_mtx_unlock(&sock->mtx);
-
- if (rv == 0) {
- rv = nni_copyout_int(fd, buf, szp, t);
- }
- return (rv);
+ return (nni_pollable_getfd(&sock->sendable, fdp));
}
static int
@@ -321,10 +313,6 @@ static nni_proto_pipe_ops pub0_pipe_ops = {
static nni_option pub0_sock_options[] = {
// terminate list
{
- .o_name = NNG_OPT_SENDFD,
- .o_get = pub0_sock_get_sendfd,
- },
- {
.o_name = NNG_OPT_SENDBUF,
.o_get = pub0_sock_get_sendbuf,
.o_set = pub0_sock_set_sendbuf,
@@ -335,14 +323,15 @@ static nni_option pub0_sock_options[] = {
};
static nni_proto_sock_ops pub0_sock_ops = {
- .sock_size = sizeof(pub0_sock),
- .sock_init = pub0_sock_init,
- .sock_fini = pub0_sock_fini,
- .sock_open = pub0_sock_open,
- .sock_close = pub0_sock_close,
- .sock_send = pub0_sock_send,
- .sock_recv = pub0_sock_recv,
- .sock_options = pub0_sock_options,
+ .sock_size = sizeof(pub0_sock),
+ .sock_init = pub0_sock_init,
+ .sock_fini = pub0_sock_fini,
+ .sock_open = pub0_sock_open,
+ .sock_close = pub0_sock_close,
+ .sock_send = pub0_sock_send,
+ .sock_recv = pub0_sock_recv,
+ .sock_send_poll_fd = pub0_sock_get_sendfd,
+ .sock_options = pub0_sock_options,
};
static nni_proto pub0_proto = {
diff --git a/src/sp/protocol/pubsub0/pub_test.c b/src/sp/protocol/pubsub0/pub_test.c
index 462decd2..4be4b06f 100644
--- a/src/sp/protocol/pubsub0/pub_test.c
+++ b/src/sp/protocol/pubsub0/pub_test.c
@@ -7,6 +7,7 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include "nng/nng.h"
#include <nuts.h>
static void
@@ -58,7 +59,7 @@ test_pub_not_readable(void)
nng_socket pub;
NUTS_PASS(nng_pub0_open(&pub));
- NUTS_FAIL(nng_socket_get_int(pub, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP);
+ NUTS_FAIL(nng_socket_get_recv_poll_fd(pub, &fd), NNG_ENOTSUP);
NUTS_CLOSE(pub);
}
@@ -71,7 +72,7 @@ test_pub_poll_writeable(void)
NUTS_PASS(nng_sub0_open(&sub));
NUTS_PASS(nng_pub0_open(&pub));
- NUTS_PASS(nng_socket_get_int(pub, NNG_OPT_SENDFD, &fd));
+ NUTS_PASS(nng_socket_get_send_poll_fd(pub, &fd));
NUTS_TRUE(fd >= 0);
// Pub is *always* writeable
diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c
index ee911153..5d6a2a05 100644
--- a/src/sp/protocol/pubsub0/sub.c
+++ b/src/sp/protocol/pubsub0/sub.c
@@ -615,16 +615,11 @@ sub0_sock_recv(void *arg, nni_aio *aio)
}
static int
-sub0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
+sub0_sock_get_recv_fd(void *arg, int *fdp)
{
sub0_sock *sock = arg;
- int rv;
- int fd;
- if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) {
- return (rv);
- }
- return (nni_copyout_int(fd, buf, szp, t));
+ return (nni_pollable_getfd(&sock->readable, fdp));
}
static int
@@ -699,10 +694,6 @@ static nni_option sub0_sock_options[] = {
.o_set = sub0_sock_unsubscribe,
},
{
- .o_name = NNG_OPT_RECVFD,
- .o_get = sub0_sock_get_recv_fd,
- },
- {
.o_name = NNG_OPT_RECVBUF,
.o_get = sub0_sock_get_recv_buf_len,
.o_set = sub0_sock_set_recv_buf_len,
@@ -719,14 +710,15 @@ static nni_option sub0_sock_options[] = {
};
static nni_proto_sock_ops sub0_sock_ops = {
- .sock_size = sizeof(sub0_sock),
- .sock_init = sub0_sock_init,
- .sock_fini = sub0_sock_fini,
- .sock_open = sub0_sock_open,
- .sock_close = sub0_sock_close,
- .sock_send = sub0_sock_send,
- .sock_recv = sub0_sock_recv,
- .sock_options = sub0_sock_options,
+ .sock_size = sizeof(sub0_sock),
+ .sock_init = sub0_sock_init,
+ .sock_fini = sub0_sock_fini,
+ .sock_open = sub0_sock_open,
+ .sock_close = sub0_sock_close,
+ .sock_send = sub0_sock_send,
+ .sock_recv = sub0_sock_recv,
+ .sock_recv_poll_fd = sub0_sock_get_recv_fd,
+ .sock_options = sub0_sock_options,
};
static nni_proto sub0_proto = {
diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c
index a332a0a7..ef38042a 100644
--- a/src/sp/protocol/pubsub0/sub_test.c
+++ b/src/sp/protocol/pubsub0/sub_test.c
@@ -7,6 +7,7 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include "nng/nng.h"
#include <nuts.h>
static void
@@ -70,7 +71,7 @@ test_sub_not_writeable(void)
nng_socket sub;
NUTS_PASS(nng_sub0_open(&sub));
- NUTS_FAIL(nng_socket_get_int(sub, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP);
+ NUTS_FAIL(nng_socket_get_send_poll_fd(sub, &fd), NNG_ENOTSUP);
NUTS_CLOSE(sub);
}
@@ -86,7 +87,7 @@ test_sub_poll_readable(void)
NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "a", 1));
NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000));
NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000));
- NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd));
+ NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd));
NUTS_TRUE(fd >= 0);
// Not readable if not connected!
@@ -130,7 +131,7 @@ test_sub_recv_late(void)
NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0));
NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000));
NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000));
- NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd));
+ NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd));
NUTS_TRUE(fd >= 0);
// Not readable if not connected!
@@ -162,21 +163,6 @@ test_sub_recv_late(void)
}
void
-test_sub_context_no_poll(void)
-{
- int fd;
- nng_socket sub;
- nng_ctx ctx;
-
- NUTS_PASS(nng_sub0_open(&sub));
- NUTS_PASS(nng_ctx_open(&ctx, sub));
- NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP);
- NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP);
- NUTS_PASS(nng_ctx_close(ctx));
- NUTS_CLOSE(sub);
-}
-
-void
test_sub_validate_peer(void)
{
nng_socket s1, s2;
@@ -603,7 +589,6 @@ TEST_LIST = {
{ "sub context cannot send", test_sub_context_cannot_send },
{ "sub not writeable", test_sub_not_writeable },
{ "sub poll readable", test_sub_poll_readable },
- { "sub context does not poll", test_sub_context_no_poll },
{ "sub validate peer", test_sub_validate_peer },
{ "sub recv late", test_sub_recv_late },
{ "sub recv ctx closed", test_sub_recv_ctx_closed },
diff --git a/src/sp/protocol/pubsub0/xsub_test.c b/src/sp/protocol/pubsub0/xsub_test.c
index 9f2be2be..b8f303f3 100644
--- a/src/sp/protocol/pubsub0/xsub_test.c
+++ b/src/sp/protocol/pubsub0/xsub_test.c
@@ -7,6 +7,7 @@
// found online at https://opensource.org/licenses/MIT.
//
+#include "nng/nng.h"
#include <nuts.h>
static void
@@ -47,7 +48,7 @@ test_xsub_not_writeable(void)
nng_socket sub;
NUTS_PASS(nng_sub0_open_raw(&sub));
- NUTS_FAIL(nng_socket_get_int(sub, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP);
+ NUTS_FAIL(nng_socket_get_send_poll_fd(sub, &fd), NNG_ENOTSUP);
NUTS_CLOSE(sub);
}
@@ -62,7 +63,7 @@ test_xsub_poll_readable(void)
NUTS_PASS(nng_pub0_open(&pub));
NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000));
NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000));
- NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd));
+ NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd));
NUTS_TRUE(fd >= 0);
// Not readable if not connected!
@@ -101,7 +102,7 @@ test_xsub_recv_late(void)
NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL));
NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000));
NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000));
- NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd));
+ NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd));
NUTS_TRUE(fd >= 0);
// Not readable if not connected!