diff options
| author | Garrett D'Amore <garrett@damore.org> | 2020-10-25 22:23:28 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-25 22:23:28 -0700 |
| commit | 4e7205cedb5f631d0fbbe72dbf89b5b9205a6260 (patch) | |
| tree | ee0d0a5009d31247520fafa1f3e6334d60a664c2 /src/protocol/reqrep0 | |
| parent | 206de0ad6f338c7e375885443836fbb44853550b (diff) | |
| download | nng-4e7205cedb5f631d0fbbe72dbf89b5b9205a6260.tar.gz nng-4e7205cedb5f631d0fbbe72dbf89b5b9205a6260.tar.bz2 nng-4e7205cedb5f631d0fbbe72dbf89b5b9205a6260.zip | |
fixes #1304 Non-blocking send on rep sockets always fails (#1305)
Diffstat (limited to 'src/protocol/reqrep0')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 12 | ||||
| -rw-r--r-- | src/protocol/reqrep0/rep_test.c | 101 |
2 files changed, 103 insertions, 10 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 6f859ee6..86593c26 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -161,12 +161,6 @@ rep0_ctx_send(void *arg, nni_aio *aio) // reply for the single request we got. nni_pollable_clear(&s->writable); } - if ((rv = nni_aio_schedule(aio, rep0_ctx_cancel_send, ctx)) != 0) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); - return; - } - if (len == 0) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, NNG_ESTATE); @@ -199,6 +193,12 @@ rep0_ctx_send(void *arg, nni_aio *aio) return; } + if ((rv = nni_aio_schedule(aio, rep0_ctx_cancel_send, ctx)) != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } + ctx->saio = aio; ctx->spipe = p; nni_list_append(&p->sendq, ctx); diff --git a/src/protocol/reqrep0/rep_test.c b/src/protocol/reqrep0/rep_test.c index 367ec66e..dcdee423 100644 --- a/src/protocol/reqrep0/rep_test.c +++ b/src/protocol/reqrep0/rep_test.c @@ -434,12 +434,12 @@ test_rep_ctx_send_nonblock(void) nng_socket req; nng_ctx ctx; nng_aio * aio; - nng_msg *msg; + nng_msg * msg; TEST_NNG_PASS(nng_req0_open(&req)); TEST_NNG_PASS(nng_rep0_open(&rep)); TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000)); - TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, 2000)); TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000)); TEST_NNG_PASS(nng_ctx_open(&ctx, rep)); TEST_NNG_PASS(nng_aio_alloc(&aio, NULL, NULL)); @@ -456,12 +456,103 @@ test_rep_ctx_send_nonblock(void) nng_ctx_send(ctx, aio); nng_aio_wait(aio); - TEST_NNG_FAIL(nng_aio_result(aio), NNG_ETIMEDOUT); + TEST_NNG_PASS(nng_aio_result(aio)); TEST_NNG_PASS(nng_close(rep)); TEST_NNG_PASS(nng_close(req)); nng_aio_free(aio); - nng_msg_free(msg); +} + +static void +test_rep_ctx_send_nonblock2(void) +{ + nng_socket rep; + nng_socket req; + nng_ctx rep_ctx[10]; + nng_aio * rep_aio[10]; + int num_good = 0; + int num_fail = 0; + + // We are going to send a bunch of requests, receive them, + // but then see that non-block pressure exerts for some, but + // that at least one non-blocking send works. + TEST_NNG_PASS(nng_req0_open_raw(&req)); + TEST_NNG_PASS(nng_rep0_open(&rep)); + TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000)); + for (int i = 0; i < 10; i++) { + TEST_NNG_PASS(nng_ctx_open(&rep_ctx[i], rep)); + TEST_NNG_PASS(nng_aio_alloc(&rep_aio[i], NULL, NULL)); + } + TEST_NNG_PASS(testutil_marry(req, rep)); + + for (int i = 0; i < 10; i++) { + nng_msg *msg; + TEST_NNG_PASS(nng_msg_alloc(&msg, 4)); + TEST_NNG_PASS( + nng_msg_append_u32(msg, (unsigned) i | 0x80000000u)); + nng_ctx_recv(rep_ctx[i], rep_aio[i]); + TEST_NNG_PASS(nng_sendmsg(req, msg, 0)); + } + for (int i = 0; i < 10; i++) { + nng_msg *msg; + nng_aio_wait(rep_aio[i]); + TEST_NNG_PASS(nng_aio_result(rep_aio[i])); + msg = nng_aio_get_msg(rep_aio[i]); + nng_aio_set_timeout(rep_aio[i], 0); + nng_aio_set_msg(rep_aio[i], msg); + nng_ctx_send(rep_ctx[i], rep_aio[i]); + } + + for (int i = 0; i < 10; i++) { + int rv; + nng_aio_wait(rep_aio[i]); + rv = nng_aio_result(rep_aio[i]); + if (rv == 0) { + num_good++; + } else { + TEST_NNG_FAIL(rv, NNG_ETIMEDOUT); + nng_msg_free(nng_aio_get_msg(rep_aio[i])); + num_fail++; + } + } + TEST_ASSERT(num_good > 0); + TEST_ASSERT(num_fail > 0); + + for (int i = 0; i < 10; i++) { + nng_aio_free(rep_aio[i]); + nng_ctx_close(rep_ctx[i]); + } + TEST_NNG_PASS(nng_close(rep)); + TEST_NNG_PASS(nng_close(req)); +} + +static void +test_rep_send_nonblock(void) +{ + nng_socket rep; + nng_socket req; + int rv; + + TEST_NNG_PASS(nng_req0_open(&req)); + TEST_NNG_PASS(nng_rep0_open(&rep)); + TEST_NNG_PASS(nng_setopt_ms(req, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_RECVTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(nng_setopt_ms(rep, NNG_OPT_SENDTIMEO, 1000)); + TEST_NNG_PASS(testutil_marry(req, rep)); + + TEST_NNG_SEND_STR(req, "SEND"); + TEST_NNG_RECV_STR(rep, "SEND"); + + // Use the nonblock flag + rv = nng_send(rep, "RECV", 5, NNG_FLAG_NONBLOCK); + + TEST_NNG_PASS(rv); + TEST_NNG_RECV_STR(req, "RECV"); + TEST_NNG_PASS(nng_close(rep)); + TEST_NNG_PASS(nng_close(req)); } void @@ -496,12 +587,14 @@ TEST_LIST = { { "rep context does not poll", test_rep_context_no_poll }, { "rep validate peer", test_rep_validate_peer }, { "rep double recv", test_rep_double_recv }, + { "rep send nonblock", test_rep_send_nonblock }, { "rep close pipe before send", test_rep_close_pipe_before_send }, { "rep close pipe during send", test_rep_close_pipe_during_send }, { "rep recv aio ctx stopped", test_rep_ctx_recv_aio_stopped }, { "rep close pipe context send", test_rep_close_pipe_context_send }, { "rep close context send", test_rep_close_context_send }, { "rep context send nonblock", test_rep_ctx_send_nonblock }, + { "rep context send nonblock 2", test_rep_ctx_send_nonblock2 }, { "rep context recv nonblock", test_rep_ctx_recv_nonblock }, { "rep recv garbage", test_rep_recv_garbage }, { NULL, NULL }, |
