diff options
Diffstat (limited to 'src/sp/protocol/pubsub0')
| -rw-r--r-- | src/sp/protocol/pubsub0/pub.c | 35 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/pub_test.c | 5 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/sub.c | 30 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/sub_test.c | 23 | ||||
| -rw-r--r-- | src/sp/protocol/pubsub0/xsub_test.c | 7 |
5 files changed, 34 insertions, 66 deletions
diff --git a/src/sp/protocol/pubsub0/pub.c b/src/sp/protocol/pubsub0/pub.c index 9539b851..c40725f7 100644 --- a/src/sp/protocol/pubsub0/pub.c +++ b/src/sp/protocol/pubsub0/pub.c @@ -253,21 +253,13 @@ pub0_sock_send(void *arg, nni_aio *aio) } static int -pub0_sock_get_sendfd(void *arg, void *buf, size_t *szp, nni_type t) +pub0_sock_get_sendfd(void *arg, int *fdp) { pub0_sock *sock = arg; - int fd; - int rv; - nni_mtx_lock(&sock->mtx); + // PUB sockets are *always* writable. nni_pollable_raise(&sock->sendable); - rv = nni_pollable_getfd(&sock->sendable, &fd); - nni_mtx_unlock(&sock->mtx); - - if (rv == 0) { - rv = nni_copyout_int(fd, buf, szp, t); - } - return (rv); + return (nni_pollable_getfd(&sock->sendable, fdp)); } static int @@ -321,10 +313,6 @@ static nni_proto_pipe_ops pub0_pipe_ops = { static nni_option pub0_sock_options[] = { // terminate list { - .o_name = NNG_OPT_SENDFD, - .o_get = pub0_sock_get_sendfd, - }, - { .o_name = NNG_OPT_SENDBUF, .o_get = pub0_sock_get_sendbuf, .o_set = pub0_sock_set_sendbuf, @@ -335,14 +323,15 @@ static nni_option pub0_sock_options[] = { }; static nni_proto_sock_ops pub0_sock_ops = { - .sock_size = sizeof(pub0_sock), - .sock_init = pub0_sock_init, - .sock_fini = pub0_sock_fini, - .sock_open = pub0_sock_open, - .sock_close = pub0_sock_close, - .sock_send = pub0_sock_send, - .sock_recv = pub0_sock_recv, - .sock_options = pub0_sock_options, + .sock_size = sizeof(pub0_sock), + .sock_init = pub0_sock_init, + .sock_fini = pub0_sock_fini, + .sock_open = pub0_sock_open, + .sock_close = pub0_sock_close, + .sock_send = pub0_sock_send, + .sock_recv = pub0_sock_recv, + .sock_send_poll_fd = pub0_sock_get_sendfd, + .sock_options = pub0_sock_options, }; static nni_proto pub0_proto = { diff --git a/src/sp/protocol/pubsub0/pub_test.c b/src/sp/protocol/pubsub0/pub_test.c index 462decd2..4be4b06f 100644 --- a/src/sp/protocol/pubsub0/pub_test.c +++ b/src/sp/protocol/pubsub0/pub_test.c @@ -7,6 +7,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "nng/nng.h" #include <nuts.h> static void @@ -58,7 +59,7 @@ test_pub_not_readable(void) nng_socket pub; NUTS_PASS(nng_pub0_open(&pub)); - NUTS_FAIL(nng_socket_get_int(pub, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP); + NUTS_FAIL(nng_socket_get_recv_poll_fd(pub, &fd), NNG_ENOTSUP); NUTS_CLOSE(pub); } @@ -71,7 +72,7 @@ test_pub_poll_writeable(void) NUTS_PASS(nng_sub0_open(&sub)); NUTS_PASS(nng_pub0_open(&pub)); - NUTS_PASS(nng_socket_get_int(pub, NNG_OPT_SENDFD, &fd)); + NUTS_PASS(nng_socket_get_send_poll_fd(pub, &fd)); NUTS_TRUE(fd >= 0); // Pub is *always* writeable diff --git a/src/sp/protocol/pubsub0/sub.c b/src/sp/protocol/pubsub0/sub.c index ee911153..5d6a2a05 100644 --- a/src/sp/protocol/pubsub0/sub.c +++ b/src/sp/protocol/pubsub0/sub.c @@ -615,16 +615,11 @@ sub0_sock_recv(void *arg, nni_aio *aio) } static int -sub0_sock_get_recv_fd(void *arg, void *buf, size_t *szp, nni_opt_type t) +sub0_sock_get_recv_fd(void *arg, int *fdp) { sub0_sock *sock = arg; - int rv; - int fd; - if ((rv = nni_pollable_getfd(&sock->readable, &fd)) != 0) { - return (rv); - } - return (nni_copyout_int(fd, buf, szp, t)); + return (nni_pollable_getfd(&sock->readable, fdp)); } static int @@ -699,10 +694,6 @@ static nni_option sub0_sock_options[] = { .o_set = sub0_sock_unsubscribe, }, { - .o_name = NNG_OPT_RECVFD, - .o_get = sub0_sock_get_recv_fd, - }, - { .o_name = NNG_OPT_RECVBUF, .o_get = sub0_sock_get_recv_buf_len, .o_set = sub0_sock_set_recv_buf_len, @@ -719,14 +710,15 @@ static nni_option sub0_sock_options[] = { }; static nni_proto_sock_ops sub0_sock_ops = { - .sock_size = sizeof(sub0_sock), - .sock_init = sub0_sock_init, - .sock_fini = sub0_sock_fini, - .sock_open = sub0_sock_open, - .sock_close = sub0_sock_close, - .sock_send = sub0_sock_send, - .sock_recv = sub0_sock_recv, - .sock_options = sub0_sock_options, + .sock_size = sizeof(sub0_sock), + .sock_init = sub0_sock_init, + .sock_fini = sub0_sock_fini, + .sock_open = sub0_sock_open, + .sock_close = sub0_sock_close, + .sock_send = sub0_sock_send, + .sock_recv = sub0_sock_recv, + .sock_recv_poll_fd = sub0_sock_get_recv_fd, + .sock_options = sub0_sock_options, }; static nni_proto sub0_proto = { diff --git a/src/sp/protocol/pubsub0/sub_test.c b/src/sp/protocol/pubsub0/sub_test.c index a332a0a7..ef38042a 100644 --- a/src/sp/protocol/pubsub0/sub_test.c +++ b/src/sp/protocol/pubsub0/sub_test.c @@ -7,6 +7,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "nng/nng.h" #include <nuts.h> static void @@ -70,7 +71,7 @@ test_sub_not_writeable(void) nng_socket sub; NUTS_PASS(nng_sub0_open(&sub)); - NUTS_FAIL(nng_socket_get_int(sub, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); + NUTS_FAIL(nng_socket_get_send_poll_fd(sub, &fd), NNG_ENOTSUP); NUTS_CLOSE(sub); } @@ -86,7 +87,7 @@ test_sub_poll_readable(void) NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "a", 1)); NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); - NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd)); + NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd)); NUTS_TRUE(fd >= 0); // Not readable if not connected! @@ -130,7 +131,7 @@ test_sub_recv_late(void) NUTS_PASS(nng_socket_set(sub, NNG_OPT_SUB_SUBSCRIBE, "", 0)); NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); - NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd)); + NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd)); NUTS_TRUE(fd >= 0); // Not readable if not connected! @@ -162,21 +163,6 @@ test_sub_recv_late(void) } void -test_sub_context_no_poll(void) -{ - int fd; - nng_socket sub; - nng_ctx ctx; - - NUTS_PASS(nng_sub0_open(&sub)); - NUTS_PASS(nng_ctx_open(&ctx, sub)); - NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); - NUTS_FAIL(nng_ctx_get_int(ctx, NNG_OPT_RECVFD, &fd), NNG_ENOTSUP); - NUTS_PASS(nng_ctx_close(ctx)); - NUTS_CLOSE(sub); -} - -void test_sub_validate_peer(void) { nng_socket s1, s2; @@ -603,7 +589,6 @@ TEST_LIST = { { "sub context cannot send", test_sub_context_cannot_send }, { "sub not writeable", test_sub_not_writeable }, { "sub poll readable", test_sub_poll_readable }, - { "sub context does not poll", test_sub_context_no_poll }, { "sub validate peer", test_sub_validate_peer }, { "sub recv late", test_sub_recv_late }, { "sub recv ctx closed", test_sub_recv_ctx_closed }, diff --git a/src/sp/protocol/pubsub0/xsub_test.c b/src/sp/protocol/pubsub0/xsub_test.c index 9f2be2be..b8f303f3 100644 --- a/src/sp/protocol/pubsub0/xsub_test.c +++ b/src/sp/protocol/pubsub0/xsub_test.c @@ -7,6 +7,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "nng/nng.h" #include <nuts.h> static void @@ -47,7 +48,7 @@ test_xsub_not_writeable(void) nng_socket sub; NUTS_PASS(nng_sub0_open_raw(&sub)); - NUTS_FAIL(nng_socket_get_int(sub, NNG_OPT_SENDFD, &fd), NNG_ENOTSUP); + NUTS_FAIL(nng_socket_get_send_poll_fd(sub, &fd), NNG_ENOTSUP); NUTS_CLOSE(sub); } @@ -62,7 +63,7 @@ test_xsub_poll_readable(void) NUTS_PASS(nng_pub0_open(&pub)); NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); - NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd)); + NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd)); NUTS_TRUE(fd >= 0); // Not readable if not connected! @@ -101,7 +102,7 @@ test_xsub_recv_late(void) NUTS_PASS(nng_aio_alloc(&aio, NULL, NULL)); NUTS_PASS(nng_socket_set_ms(sub, NNG_OPT_RECVTIMEO, 1000)); NUTS_PASS(nng_socket_set_ms(pub, NNG_OPT_SENDTIMEO, 1000)); - NUTS_PASS(nng_socket_get_int(sub, NNG_OPT_RECVFD, &fd)); + NUTS_PASS(nng_socket_get_recv_poll_fd(sub, &fd)); NUTS_TRUE(fd >= 0); // Not readable if not connected! |
