diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-10-20 17:03:12 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-10-23 16:14:53 -0700 |
| commit | 3585000ca027740dbdb4599f4991cd2bf562e2f2 (patch) | |
| tree | a45b4c1bcc2d11777dde0e38d4b742d121d55e45 /src/protocol/survey | |
| parent | fdb73b69a887d868f8e976ef8a990a5d7f6687f9 (diff) | |
| download | nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.gz nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.tar.bz2 nng-3585000ca027740dbdb4599f4991cd2bf562e2f2.zip | |
fixes #112 Need to move some stuff from socket to message queues
Diffstat (limited to 'src/protocol/survey')
| -rw-r--r-- | src/protocol/survey/respond.c | 66 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 64 |
2 files changed, 87 insertions, 43 deletions
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index fcb067b0..4c3ea8e3 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -77,6 +77,7 @@ resp_sock_init(void **sp, nni_sock *nsock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } + nni_mtx_init(&s->mtx); if (((rv = nni_idhash_init(&s->pipes)) != 0) || ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) { resp_sock_fini(s); @@ -91,10 +92,7 @@ resp_sock_init(void **sp, nni_sock *nsock) s->urq = nni_sock_recvq(nsock); s->uwq = nni_sock_sendq(nsock); - nni_mtx_init(&s->mtx); - *sp = s; - nni_sock_senderr(nsock, NNG_ESTATE); return (0); } @@ -349,9 +347,9 @@ resp_sock_setopt_raw(void *arg, const void *buf, size_t sz) resp_sock *s = arg; int rv; - if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) { - nni_sock_senderr(s->nsock, s->raw ? 0 : NNG_ESTATE); - } + nni_mtx_lock(&s->mtx); + rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); + nni_mtx_unlock(&s->mtx); return (rv); } @@ -376,54 +374,62 @@ resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->ttl, buf, szp)); } -static nni_msg * -resp_sock_sfilter(void *arg, nni_msg *msg) +static void +resp_sock_send(void *arg, nni_aio *aio) { resp_sock *s = arg; + nni_msg * msg; + int rv; + nni_mtx_lock(&s->mtx); if (s->raw) { - return (msg); + nni_mtx_unlock(&s->mtx); + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); + return; } - // Cannot send again until a receive is done... - nni_sock_senderr(s->nsock, NNG_ESTATE); + msg = nni_aio_get_msg(aio); // If we have a stored backtrace, append it to the header... // if we don't have a backtrace, discard the message. if (s->btrace == NULL) { - nni_msg_free(msg); - return (NULL); + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; } // drop anything else in the header... nni_msg_header_clear(msg); - if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) { - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - nni_msg_free(msg); - return (NULL); + if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; } nni_free(s->btrace, s->btrace_len); s->btrace = NULL; s->btrace_len = 0; - return (msg); + + nni_mtx_unlock(&s->mtx); + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); } static nni_msg * -resp_sock_rfilter(void *arg, nni_msg *msg) +resp_sock_filter(void *arg, nni_msg *msg) { resp_sock *s = arg; char * header; size_t len; + nni_mtx_lock(&s->mtx); if (s->raw) { + nni_mtx_unlock(&s->mtx); return (msg); } - nni_sock_senderr(s->nsock, 0); len = nni_msg_header_len(msg); header = nni_msg_header(msg); if (s->btrace != NULL) { @@ -432,15 +438,26 @@ resp_sock_rfilter(void *arg, nni_msg *msg) s->btrace_len = 0; } if ((s->btrace = nni_alloc(len)) == NULL) { + nni_mtx_unlock(&s->mtx); nni_msg_free(msg); return (NULL); } s->btrace_len = len; memcpy(s->btrace, header, len); nni_msg_header_clear(msg); + nni_mtx_unlock(&s->mtx); return (msg); } +static void +resp_sock_recv(void *arg, nni_aio *aio) +{ + resp_sock *s = arg; + + nni_sock_recv_pending(s->nsock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops resp_pipe_ops = { .pipe_init = resp_pipe_init, .pipe_fini = resp_pipe_fini, @@ -468,9 +485,10 @@ static nni_proto_sock_ops resp_sock_ops = { .sock_fini = resp_sock_fini, .sock_open = resp_sock_open, .sock_close = resp_sock_close, + .sock_filter = resp_sock_filter, + .sock_send = resp_sock_send, + .sock_recv = resp_sock_recv, .sock_options = resp_sock_options, - .sock_rfilter = resp_sock_rfilter, - .sock_sfilter = resp_sock_sfilter, }; static nni_proto resp_proto = { diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 1205402e..1c15054f 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -92,7 +92,6 @@ surv_sock_init(void **sp, nni_sock *nsock) s->urq = nni_sock_recvq(nsock); *sp = s; - nni_sock_recverr(nsock, NNG_ESTATE); return (0); } @@ -273,11 +272,12 @@ surv_sock_setopt_raw(void *arg, const void *buf, size_t sz) surv_sock *s = arg; int rv; + nni_mtx_lock(&s->mtx); if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) { - nni_sock_recverr(s->nsock, s->raw ? 0 : NNG_ESTATE); s->survid = 0; nni_timer_cancel(&s->timer); } + nni_mtx_unlock(&s->mtx); return (rv); } @@ -344,22 +344,43 @@ surv_timeout(void *arg) { surv_sock *s = arg; - nni_sock_lock(s->nsock); + nni_mtx_lock(&s->mtx); s->survid = 0; - nni_sock_recverr(s->nsock, NNG_ESTATE); + nni_mtx_unlock(&s->mtx); nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT); - nni_sock_unlock(s->nsock); } -static nni_msg * -surv_sock_sfilter(void *arg, nni_msg *msg) +static void +surv_sock_recv(void *arg, nni_aio *aio) +{ + surv_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (s->survid == 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + nni_mtx_unlock(&s->mtx); + nni_sock_recv_pending(s->nsock); + nni_msgq_aio_get(s->urq, aio); +} + +static void +surv_sock_send(void *arg, nni_aio *aio) { surv_sock *s = arg; + nni_msg * msg; + int rv; + nni_mtx_lock(&s->mtx); if (s->raw) { // No automatic retry, and the request ID must // be in the header coming down. - return (msg); + nni_mtx_unlock(&s->mtx); + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); + return; } // Generate a new request ID. We always set the high @@ -367,10 +388,12 @@ surv_sock_sfilter(void *arg, nni_msg *msg) // backtrace. (Pipe IDs have the high order bit clear.) s->survid = (s->nextid++) | 0x80000000u; - if (nni_msg_header_append_u32(msg, s->survid) != 0) { - // Should be ENOMEM. - nni_msg_free(msg); - return (NULL); + msg = nni_aio_get_msg(aio); + nni_msg_header_clear(msg); + if ((rv = nni_msg_header_append_u32(msg, s->survid)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; } // If another message is there, this cancels it. We move the @@ -379,20 +402,21 @@ surv_sock_sfilter(void *arg, nni_msg *msg) s->expire = nni_clock() + s->survtime; nni_timer_schedule(&s->timer, s->expire); - // Clear the error condition. - nni_sock_recverr(s->nsock, 0); - // nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0); + nni_mtx_unlock(&s->mtx); - return (msg); + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); } static nni_msg * -surv_sock_rfilter(void *arg, nni_msg *msg) +surv_sock_filter(void *arg, nni_msg *msg) { surv_sock *s = arg; + nni_mtx_lock(&s->mtx); if (s->raw) { // Pass it unmolested + nni_mtx_unlock(&s->mtx); return (msg); } @@ -402,6 +426,7 @@ surv_sock_rfilter(void *arg, nni_msg *msg) nni_msg_free(msg); return (NULL); } + nni_mtx_unlock(&s->mtx); return (msg); } @@ -433,9 +458,10 @@ static nni_proto_sock_ops surv_sock_ops = { .sock_fini = surv_sock_fini, .sock_open = surv_sock_open, .sock_close = surv_sock_close, + .sock_send = surv_sock_send, + .sock_recv = surv_sock_recv, + .sock_filter = surv_sock_filter, .sock_options = surv_sock_options, - .sock_rfilter = surv_sock_rfilter, - .sock_sfilter = surv_sock_sfilter, }; static nni_proto surv_proto = { |
