diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-04-20 20:52:32 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-04-24 15:06:33 -0700 |
| commit | fdefff742662ed4eb476bf19b9dda245f86bc406 (patch) | |
| tree | a4e132716debd64e434478f8814f368db052cbc6 /src/protocol/survey0/respond.c | |
| parent | e0b47b12d3d1462d07c5038e4f34f5282eeec675 (diff) | |
| download | nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.gz nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.bz2 nng-fdefff742662ed4eb476bf19b9dda245f86bc406.zip | |
fixes #342 Want Surveyor/Respondent context support
fixes #360 core should nng_aio_begin before nng_aio_finish_error
fixes #361 nng_send_aio should check for NULL message
fixes #362 nni_msgq does not signal pollable on certain events
This adds support for contexts for both sides of the surveyor pattern.
Prior to this commit, the raw mode was completely broken, and there
were numerous other bugs found and fixed. This integration includes
*much* deeper validation of this pattern.
Some changes to the core and other patterns have been made, where it
was obvioius that we could make such improvements. (The obviousness
stemming from the fact that RESPONDENT in particular is very closely
derived from REP.)
Diffstat (limited to 'src/protocol/survey0/respond.c')
| -rw-r--r-- | src/protocol/survey0/respond.c | 644 |
1 files changed, 411 insertions, 233 deletions
diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index 1605d9e6..60cf188a 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -28,49 +28,212 @@ typedef struct resp0_pipe resp0_pipe; typedef struct resp0_sock resp0_sock; +typedef struct resp0_ctx resp0_ctx; -static void resp0_recv_cb(void *); -static void resp0_putq_cb(void *); -static void resp0_getq_cb(void *); -static void resp0_send_cb(void *); -static void resp0_sock_getq_cb(void *); +static void resp0_pipe_send_cb(void *); +static void resp0_pipe_recv_cb(void *); static void resp0_pipe_fini(void *); +struct resp0_ctx { + resp0_sock * sock; + char * btrace; + size_t btrace_len; + size_t btrace_size; + uint32_t pipe_id; + resp0_pipe * spipe; // send pipe + nni_aio * saio; // send aio + nni_aio * raio; // recv aio + nni_list_node sqnode; + nni_list_node rqnode; +}; + // resp0_sock is our per-socket protocol private structure. struct resp0_sock { - nni_msgq * urq; - nni_msgq * uwq; - int ttl; - nni_idhash *pipes; - char * btrace; - size_t btrace_len; - nni_aio * aio_getq; - nni_mtx mtx; + nni_mtx mtx; + int ttl; + nni_idhash * pipes; + resp0_ctx * ctx; + nni_list recvpipes; + nni_list recvq; + nni_pollable *recvable; + nni_pollable *sendable; }; // resp0_pipe is our per-pipe protocol private structure. struct resp0_pipe { - nni_pipe * npipe; - resp0_sock *psock; - uint32_t id; - nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_putq; - nni_aio * aio_send; - nni_aio * aio_recv; + nni_pipe * npipe; + resp0_sock * psock; + bool busy; + uint32_t id; + nni_list sendq; // contexts waiting to send + nni_aio * aio_send; + nni_aio * aio_recv; + nni_list_node rnode; // receivable linkage }; static void +resp0_ctx_close(void *arg) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + nni_aio * aio; + + // complete any outstanding operations here, cancellation, etc. + + nni_mtx_lock(&s->mtx); + if ((aio = ctx->saio) != NULL) { + resp0_pipe *p = ctx->spipe; + ctx->saio = NULL; + ctx->spipe = NULL; + nni_list_remove(&p->sendq, ctx); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + if ((aio = ctx->raio) != NULL) { + ctx->raio = NULL; + nni_list_remove(&s->recvq, ctx); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&s->mtx); +} + +static void +resp0_ctx_fini(void *arg) +{ + resp0_ctx *ctx = arg; + + resp0_ctx_close(ctx); + nni_free(ctx->btrace, ctx->btrace_size); + NNI_FREE_STRUCT(ctx); +} + +static int +resp0_ctx_init(void **ctxp, void *sarg) +{ + resp0_sock *s = sarg; + resp0_ctx * ctx; + + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); + } + + // this is 1kB, which covers the worst case. + ctx->btrace_size = 256 * sizeof(uint32_t); + if ((ctx->btrace = nni_alloc(ctx->btrace_size)) == NULL) { + NNI_FREE_STRUCT(ctx); + return (NNG_ENOMEM); + } + NNI_LIST_NODE_INIT(&ctx->sqnode); + // XXX: NNI_LIST_NODE_INIT(&ctx->rqnode); + ctx->btrace_len = 0; + ctx->sock = s; + ctx->pipe_id = 0; + *ctxp = ctx; + + return (0); +} + +static void +resp0_ctx_cancel_send(nni_aio *aio, int rv) +{ + resp0_ctx * ctx = nni_aio_get_prov_data(aio); + resp0_sock *s = ctx->sock; + + nni_mtx_lock(&s->mtx); + if (ctx->saio != aio) { + nni_mtx_unlock(&s->mtx); + return; + } + nni_list_node_remove(&ctx->sqnode); + ctx->saio = NULL; + nni_mtx_unlock(&s->mtx); + nni_msg_header_clear(nni_aio_get_msg(aio)); // reset the headers + nni_aio_finish_error(aio, rv); +} + +static void +resp0_ctx_send(void *arg, nni_aio *aio) +{ + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + resp0_pipe *p; + nni_msg * msg; + size_t len; + uint32_t pid; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + msg = nni_aio_get_msg(aio); + nni_msg_header_clear(msg); + + if (ctx == s->ctx) { + // We can't send anymore, because only one send per request. + nni_pollable_clear(s->sendable); + } + + nni_mtx_lock(&s->mtx); + + if ((len = ctx->btrace_len) == 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + pid = ctx->pipe_id; + ctx->pipe_id = 0; + ctx->btrace_len = 0; + + if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + if (nni_idhash_find(s->pipes, pid, (void **) &p) != 0) { + // Surveyor has left the building. Just discard the reply. + nni_mtx_unlock(&s->mtx); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + return; + } + + if (!p->busy) { + p->busy = true; + len = nni_msg_len(msg); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->npipe, p->aio_send); + nni_mtx_unlock(&s->mtx); + + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, len); + return; + } + + if ((rv = nni_aio_schedule_verify(aio, resp0_ctx_cancel_send, ctx)) != + 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + ctx->saio = aio; + ctx->spipe = p; + nni_list_append(&p->sendq, ctx); + nni_mtx_unlock(&s->mtx); +} + +static void resp0_sock_fini(void *arg) { resp0_sock *s = arg; - nni_aio_stop(s->aio_getq); - nni_aio_fini(s->aio_getq); nni_idhash_fini(s->pipes); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); + if (s->ctx != NULL) { + resp0_ctx_fini(s->ctx); } + nni_pollable_free(s->sendable); + nni_pollable_free(s->recvable); nni_mtx_fini(&s->mtx); NNI_FREE_STRUCT(s); } @@ -81,22 +244,34 @@ resp0_sock_init(void **sp, nni_sock *nsock) resp0_sock *s; int rv; + NNI_ARG_UNUSED(nsock); + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); - if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, resp0_sock_getq_cb, s)) != 0)) { + if ((rv = nni_idhash_init(&s->pipes)) != 0) { resp0_sock_fini(s); return (rv); } - s->ttl = 8; // Per RFC - s->btrace = NULL; - s->btrace_len = 0; - s->urq = nni_sock_recvq(nsock); - s->uwq = nni_sock_sendq(nsock); + NNI_LIST_INIT(&s->recvq, resp0_ctx, rqnode); + NNI_LIST_INIT(&s->recvpipes, resp0_pipe, rnode); + + s->ttl = 8; // Per RFC + + if ((rv = resp0_ctx_init((void **) &s->ctx, s)) != 0) { + resp0_ctx_fini(s); + return (rv); + } + // We start off without being either readable or pollable. + // Readability comes when there is something on the socket. + if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || + ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { + resp0_sock_fini(s); + return (rv); + } *sp = s; return (0); } @@ -104,9 +279,7 @@ resp0_sock_init(void **sp, nni_sock *nsock) static void resp0_sock_open(void *arg) { - resp0_sock *s = arg; - - nni_msgq_aio_get(s->uwq, s->aio_getq); + NNI_ARG_UNUSED(arg); } static void @@ -114,7 +287,7 @@ resp0_sock_close(void *arg) { resp0_sock *s = arg; - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + resp0_ctx_close(s->ctx); } static void @@ -122,11 +295,8 @@ resp0_pipe_fini(void *arg) { resp0_pipe *p = arg; - nni_aio_fini(p->aio_putq); - nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); - nni_msgq_fini(p->sendq); NNI_FREE_STRUCT(p); } @@ -139,18 +309,20 @@ resp0_pipe_init(void **pp, nni_pipe *npipe, void *s) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, resp0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, resp0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, resp0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, resp0_send_cb, p)) != 0)) { + if (((rv = nni_aio_init(&p->aio_recv, resp0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, resp0_pipe_send_cb, p)) != 0)) { resp0_pipe_fini(p); return (rv); } + NNI_LIST_INIT(&p->sendq, resp0_ctx, sqnode); + p->npipe = npipe; p->psock = s; - *pp = p; + p->busy = false; + p->id = nni_pipe_id(npipe); + + *pp = p; return (0); } @@ -161,8 +333,6 @@ resp0_pipe_start(void *arg) resp0_sock *s = p->psock; int rv; - p->id = nni_pipe_id(p->npipe); - nni_mtx_lock(&s->mtx); rv = nni_idhash_insert(s->pipes, p->id, p); nni_mtx_unlock(&s->mtx); @@ -171,8 +341,6 @@ resp0_pipe_start(void *arg) } nni_pipe_recv(p->npipe, p->aio_recv); - nni_msgq_aio_get(p->sendq, p->aio_getq); - return (rv); } @@ -181,139 +349,179 @@ resp0_pipe_stop(void *arg) { resp0_pipe *p = arg; resp0_sock *s = p->psock; + resp0_ctx * ctx; + + nni_mtx_lock(&s->mtx); + while ((ctx = nni_list_first(&p->sendq)) != NULL) { + nni_aio *aio; + nni_msg *msg; + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + msg = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + } + if (p->id == s->ctx->pipe_id) { + // Make sure user space knows they can send a message to us, + // which we will happily discard. + nni_pollable_raise(s->sendable); + } + nni_idhash_remove(s->pipes, p->id); + nni_mtx_unlock(&s->mtx); - nni_msgq_close(p->sendq); - nni_aio_stop(p->aio_putq); - nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); nni_aio_stop(p->aio_recv); - - if (p->id != 0) { - nni_mtx_lock(&s->mtx); - nni_idhash_remove(s->pipes, p->id); - nni_mtx_unlock(&s->mtx); - p->id = 0; - } } -// resp0_sock_send watches for messages from the upper write queue, -// extracts the destination pipe, and forwards it to the appropriate -// destination pipe via a separate queue. This prevents a single bad -// or slow pipe from gumming up the works for the entire socket.s - -void -resp0_sock_getq_cb(void *arg) +static void +resp0_pipe_send_cb(void *arg) { - resp0_sock *s = arg; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + resp0_ctx * ctx; + nni_aio * aio; nni_msg * msg; - uint32_t id; - resp0_pipe *p; - int rv; + size_t len; - if (nni_aio_result(s->aio_getq) != 0) { + nni_mtx_lock(&s->mtx); + p->busy = false; + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->npipe); + nni_mtx_unlock(&s->mtx); return; } - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); - - // We yank the outgoing pipe id from the header - if (nni_msg_header_len(msg) < 4) { - nni_msg_free(msg); - // We can't really close down the socket, so just keep going. - nni_msgq_aio_get(s->uwq, s->aio_getq); + if ((ctx = nni_list_first(&p->sendq)) == NULL) { + // Nothing else to send. + if (p->id == s->ctx->pipe_id) { + // Mark us ready for the other side to send! + nni_pollable_raise(s->sendable); + } + nni_mtx_unlock(&s->mtx); return; } - id = nni_msg_header_trim_u32(msg); - nni_mtx_lock(&s->mtx); - if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) != 0) { - // Destination pipe not present. - nni_msg_free(msg); - } else { - // Non-blocking put. - if (nni_msgq_tryput(p->sendq, msg) != 0) { - nni_msg_free(msg); - } - } - nni_msgq_aio_get(s->uwq, s->aio_getq); + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + ctx->spipe = NULL; + p->busy = true; + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->npipe, p->aio_send); + nni_mtx_unlock(&s->mtx); + + nni_aio_finish_synch(aio, 0, len); } -void -resp0_getq_cb(void *arg) +static void +resp0_cancel_recv(nni_aio *aio, int rv) { - resp0_pipe *p = arg; + resp0_ctx * ctx = nni_aio_get_prov_data(aio); + resp0_sock *s = ctx->sock; - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->npipe); - return; + nni_mtx_lock(&s->mtx); + if (ctx->raio == aio) { + nni_list_remove(&s->recvq, ctx); + ctx->raio = NULL; + nni_aio_finish_error(aio, rv); } - - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - nni_pipe_send(p->npipe, p->aio_send); + nni_mtx_unlock(&s->mtx); } -void -resp0_send_cb(void *arg) +static void +resp0_ctx_recv(void *arg, nni_aio *aio) { - resp0_pipe *p = arg; + resp0_ctx * ctx = arg; + resp0_sock *s = ctx->sock; + resp0_pipe *p; + size_t len; + nni_msg * msg; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->npipe); + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&s->mtx); + if ((p = nni_list_first(&s->recvpipes)) == NULL) { + int rv; + rv = nni_aio_schedule_verify(aio, resp0_cancel_recv, ctx); + if (rv != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; + } + ctx->raio = aio; + nni_list_append(&s->recvq, ctx); + nni_mtx_unlock(&s->mtx); return; } + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_list_remove(&s->recvpipes, p); + if (nni_list_empty(&s->recvpipes)) { + nni_pollable_clear(s->recvable); + } + nni_pipe_recv(p->npipe, p->aio_recv); - nni_msgq_aio_get(p->sendq, p->aio_getq); + len = nni_msg_header_len(msg); + memcpy(ctx->btrace, nni_msg_header(msg), len); + ctx->btrace_len = len; + ctx->pipe_id = p->id; + if (ctx == s->ctx) { + nni_pollable_raise(s->sendable); + } + nni_mtx_unlock(&s->mtx); + + nni_msg_header_clear(msg); + nni_aio_set_msg(aio, msg); + nni_aio_finish(aio, 0, nni_msg_len(msg)); } static void -resp0_recv_cb(void *arg) +resp0_pipe_recv_cb(void *arg) { - resp0_pipe *p = arg; - resp0_sock *s = p->psock; - nni_msgq * urq = s->urq; + resp0_pipe *p = arg; + resp0_sock *s = p->psock; + resp0_ctx * ctx; nni_msg * msg; + nni_aio * aio; int hops; - int rv; + size_t len; if (nni_aio_result(p->aio_recv) != 0) { - goto error; + nni_pipe_stop(p->npipe); + return; } msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, p->id); - // Store the pipe id in the header, first thing. - if (nni_msg_header_append_u32(msg, p->id) != 0) { - nni_msg_free(msg); - goto error; - } - // Move backtrace from body to header - hops = 0; + hops = 1; for (;;) { - int end = 0; + bool end = 0; uint8_t *body; - if (hops >= s->ttl) { - nni_msg_free(msg); - goto error; + if (hops > s->ttl) { + goto drop; } + hops++; if (nni_msg_len(msg) < 4) { + // Peer is speaking garbage, kick it. nni_msg_free(msg); - goto error; + nni_pipe_stop(p->npipe); + return; } body = nni_msg_body(msg); - end = (body[0] & 0x80) ? 1 : 0; - rv = nni_msg_header_append(msg, body, 4); - if (rv != 0) { - nni_msg_free(msg); - goto error; + end = ((body[0] & 0x80) != 0); + if (nni_msg_header_append(msg, body, 4) != 0) { + goto drop; } nni_msg_trim(msg, 4); if (end) { @@ -321,26 +529,41 @@ resp0_recv_cb(void *arg) } } - // Now send it up. - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(urq, p->aio_putq); - return; + len = nni_msg_header_len(msg); -error: - nni_pipe_stop(p->npipe); -} + nni_mtx_lock(&s->mtx); + if ((ctx = nni_list_first(&s->recvq)) == NULL) { + // No one blocked in recv, stall. + nni_list_append(&s->recvpipes, p); + nni_pollable_raise(s->recvable); + nni_mtx_unlock(&s->mtx); + return; + } -static void -resp0_putq_cb(void *arg) -{ - resp0_pipe *p = arg; + nni_list_remove(&s->recvq, ctx); + aio = ctx->raio; + ctx->raio = NULL; + nni_aio_set_msg(p->aio_recv, NULL); - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); + // Start the next receive. + nni_pipe_recv(p->npipe, p->aio_recv); + + ctx->btrace_len = len; + memcpy(ctx->btrace, nni_msg_header(msg), len); + nni_msg_header_clear(msg); + ctx->pipe_id = p->id; + + if ((ctx == s->ctx) && (!p->busy)) { + nni_pollable_raise(s->sendable); } + nni_mtx_unlock(&s->mtx); + + nni_aio_set_msg(aio, msg); + nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); + return; +drop: + nni_msg_free(msg); nni_pipe_recv(p->npipe, p->aio_recv); } @@ -358,76 +581,38 @@ resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) return (nni_copyout_int(s->ttl, buf, szp, typ)); } -static void -resp0_sock_send_raw(void *arg, nni_aio *aio) +static int +resp0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ) { resp0_sock *s = arg; + int rv; + int fd; - nni_msgq_aio_put(s->uwq, aio); + if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + return (rv); + } + return (nni_copyout_int(fd, buf, szp, typ)); } -static void -resp0_sock_send(void *arg, nni_aio *aio) +static int +resp0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ) { resp0_sock *s = arg; - nni_msg * msg; int rv; + int fd; - nni_mtx_lock(&s->mtx); - - msg = nni_aio_get_msg(aio); - - // If we have a stored backtrace, append it to the header... - // if we don't have a backtrace, discard the message. - if (s->btrace == NULL) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } - - // drop anything else in the header... - nni_msg_header_clear(msg); - - if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); - return; + if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + return (rv); } - - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); + return (nni_copyout_int(fd, buf, szp, typ)); } -static nni_msg * -resp0_sock_filter(void *arg, nni_msg *msg) +static void +resp0_sock_send(void *arg, nni_aio *aio) { resp0_sock *s = arg; - char * header; - size_t len; - - nni_mtx_lock(&s->mtx); - len = nni_msg_header_len(msg); - header = nni_msg_header(msg); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - } - if ((s->btrace = nni_alloc(len)) == NULL) { - nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); - } - s->btrace_len = len; - memcpy(s->btrace, header, len); - nni_msg_header_clear(msg); - nni_mtx_unlock(&s->mtx); - return (msg); + resp0_ctx_send(s->ctx, aio); } static void @@ -435,7 +620,7 @@ resp0_sock_recv(void *arg, nni_aio *aio) { resp0_sock *s = arg; - nni_msgq_aio_get(s->urq, aio); + resp0_ctx_recv(s->ctx, aio); } static nni_proto_pipe_ops resp0_pipe_ops = { @@ -445,6 +630,13 @@ static nni_proto_pipe_ops resp0_pipe_ops = { .pipe_stop = resp0_pipe_stop, }; +static nni_proto_ctx_ops resp0_ctx_ops = { + .ctx_init = resp0_ctx_init, + .ctx_fini = resp0_ctx_fini, + .ctx_send = resp0_ctx_send, + .ctx_recv = resp0_ctx_recv, +}; + static nni_proto_sock_option resp0_sock_options[] = { { .pso_name = NNG_OPT_MAXTTL, @@ -452,6 +644,18 @@ static nni_proto_sock_option resp0_sock_options[] = { .pso_getopt = resp0_sock_getopt_maxttl, .pso_setopt = resp0_sock_setopt_maxttl, }, + { + .pso_name = NNG_OPT_RECVFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = resp0_sock_getopt_recvfd, + .pso_setopt = NULL, + }, + { + .pso_name = NNG_OPT_SENDFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = resp0_sock_getopt_sendfd, + .pso_setopt = NULL, + }, // terminate list { .pso_name = NULL, @@ -463,39 +667,19 @@ static nni_proto_sock_ops resp0_sock_ops = { .sock_fini = resp0_sock_fini, .sock_open = resp0_sock_open, .sock_close = resp0_sock_close, - .sock_filter = resp0_sock_filter, .sock_send = resp0_sock_send, .sock_recv = resp0_sock_recv, .sock_options = resp0_sock_options, }; -static nni_proto_sock_ops resp0_sock_ops_raw = { - .sock_init = resp0_sock_init, - .sock_fini = resp0_sock_fini, - .sock_open = resp0_sock_open, - .sock_close = resp0_sock_close, - .sock_filter = NULL, // no filter for raw - .sock_send = resp0_sock_send_raw, - .sock_recv = resp0_sock_recv, - .sock_options = resp0_sock_options, -}; - static nni_proto resp0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &resp0_sock_ops, .proto_pipe_ops = &resp0_pipe_ops, -}; - -static nni_proto resp0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, - .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, - .proto_sock_ops = &resp0_sock_ops_raw, - .proto_pipe_ops = &resp0_pipe_ops, + .proto_ctx_ops = &resp0_ctx_ops, }; int @@ -503,9 +687,3 @@ nng_respondent0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &resp0_proto)); } - -int -nng_respondent0_open_raw(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &resp0_proto_raw)); -} |
