diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-04-04 13:36:54 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-04-10 15:40:00 -0700 |
| commit | 5f7289e1f8e1427c9214c8e3e96ad56b1f868d53 (patch) | |
| tree | 39debf4ecde234b2a0be19c9cb15628cc32c2edb /src/protocol/reqrep0/rep.c | |
| parent | 56f1bf30e61c53646dd2f8425da7c7fa0d97b3e1 (diff) | |
| download | nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.gz nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.tar.bz2 nng-5f7289e1f8e1427c9214c8e3e96ad56b1f868d53.zip | |
fixes #334 Separate context for state machines from sockets
This provides context support for REQ and REP sockets.
More discussion around this is in the issue itself.
Optionally we would like to extend this to the surveyor pattern.
Note that we specifically do not support pollable descriptors
for non-default contexts, and the results of using file descriptors
for polling (NNG_OPT_SENDFD and NNG_OPT_RECVFD) is undefined.
In the future, it might be nice to figure out how to factor in
optional use of a message queue for users who want more buffering,
but we think there is little need for this with cooked mode.
Diffstat (limited to 'src/protocol/reqrep0/rep.c')
| -rw-r--r-- | src/protocol/reqrep0/rep.c | 621 |
1 files changed, 412 insertions, 209 deletions
diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 78a1f2ee..e512c18b 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -28,48 +28,219 @@ typedef struct rep0_pipe rep0_pipe; typedef struct rep0_sock rep0_sock; +typedef struct rep0_ctx rep0_ctx; -static void rep0_sock_getq_cb(void *); -static void rep0_pipe_getq_cb(void *); -static void rep0_pipe_putq_cb(void *); static void rep0_pipe_send_cb(void *); static void rep0_pipe_recv_cb(void *); static void rep0_pipe_fini(void *); +struct rep0_ctx { + rep0_sock * sock; + bool closed; + char * btrace; + size_t btrace_len; + size_t btrace_size; + int ttl; + uint32_t pipe_id; + nni_aio * saio; // send aio + nni_aio * raio; // recv aio + nni_list_node sqnode; + nni_list_node rqnode; +}; + // rep0_sock is our per-socket protocol private structure. struct rep0_sock { - nni_msgq * uwq; - nni_msgq * urq; - nni_mtx lk; - int ttl; - nni_idhash *pipes; - char * btrace; - size_t btrace_len; - nni_aio * aio_getq; + nni_mtx lk; + int ttl; + nni_idhash * pipes; + nni_list recvpipes; // list of pipes with data to receive + nni_list recvq; + bool closed; + rep0_ctx * ctx; + nni_pollable *recvable; + nni_pollable *sendable; }; // rep0_pipe is our per-pipe protocol private structure. struct rep0_pipe { - nni_pipe * pipe; - rep0_sock *rep; - nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_send; - nni_aio * aio_recv; - nni_aio * aio_putq; + nni_pipe * pipe; + rep0_sock * rep; + uint32_t id; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_list_node rnode; // receivable list linkage + nni_list sendq; // contexts waiting to send + bool busy; }; static void +rep0_ctx_close(void *arg) +{ + rep0_ctx * ctx = arg; + rep0_sock *s = ctx->sock; + nni_aio * aio; + + nni_mtx_lock(&s->lk); + ctx->closed = true; + if ((aio = ctx->saio) != NULL) { + nni_msg *msg; + nni_list_node_remove(&ctx->sqnode); + msg = nni_aio_get_msg(aio); + nni_msg_free(msg); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + if ((aio = ctx->raio) != NULL) { + nni_list_remove(&s->recvq, ctx); + ctx->raio = NULL; + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&s->lk); +} + +static void +rep0_ctx_fini(void *arg) +{ + rep0_ctx *ctx = arg; + + rep0_ctx_close(ctx); + nni_free(ctx->btrace, ctx->btrace_size); + NNI_FREE_STRUCT(ctx); +} + +static int +rep0_ctx_init(void **ctxp, void *sarg) +{ + rep0_sock *s = sarg; + rep0_ctx * ctx; + + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); + } + + // this is 1kB, which covers the worst case. + ctx->btrace_size = 256 * sizeof(uint32_t); + if ((ctx->btrace = nni_alloc(ctx->btrace_size)) == NULL) { + NNI_FREE_STRUCT(ctx); + return (NNG_ENOMEM); + } + NNI_LIST_NODE_INIT(&ctx->sqnode); + NNI_LIST_NODE_INIT(&ctx->rqnode); + ctx->btrace_len = 0; + ctx->sock = s; + ctx->pipe_id = 0; + *ctxp = ctx; + + return (0); +} + +static void +rep0_ctx_cancel_send(nni_aio *aio, int rv) +{ + rep0_ctx * ctx = nni_aio_get_prov_data(aio); + rep0_sock *s = ctx->sock; + + nni_mtx_lock(&s->lk); + if (ctx->saio != aio) { + nni_mtx_unlock(&s->lk); + return; + } + nni_list_node_remove(&ctx->sqnode); + ctx->saio = NULL; + nni_mtx_unlock(&s->lk); + + nni_msg_header_clear(nni_aio_get_msg(aio)); // reset the headers + nni_aio_finish_error(aio, rv); +} + +static void +rep0_ctx_send(void *arg, nni_aio *aio) +{ + rep0_ctx * ctx = arg; + rep0_sock *s = ctx->sock; + rep0_pipe *p; + nni_msg * msg; + int rv; + size_t len; + uint32_t p_id; // pipe id + + msg = nni_aio_get_msg(aio); + nni_msg_header_clear(msg); + + nni_mtx_lock(&s->lk); + len = ctx->btrace_len; + p_id = ctx->pipe_id; + + // Assert "completion" of the previous req request. This ensures + // exactly one send for one receive ordering. + ctx->btrace_len = 0; + ctx->pipe_id = 0; + + if (ctx == s->ctx) { + // No matter how this goes, we will no longer be able + // to send on the socket (root context). That's because + // we will have finished (successfully or otherwise) the + // reply for the single request we got. + nni_pollable_clear(s->sendable); + } + + if (nni_aio_start(aio, rep0_ctx_cancel_send, ctx) != 0) { + nni_mtx_unlock(&s->lk); + return; + } + if (ctx->closed) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if (len == 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } + + if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) { + // Pipe is gone. Make this look like a good send to avoid + // disrupting the state machine. We don't care if the peer + // lost interest in our reply. + nni_aio_set_msg(aio, NULL); + nni_mtx_unlock(&s->lk); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + return; + } + if (p->busy) { + ctx->saio = aio; + nni_list_append(&p->sendq, ctx); + nni_mtx_unlock(&s->lk); + return; + } + + p->busy = true; + len = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->pipe, p->aio_send); + nni_mtx_unlock(&s->lk); + + nni_aio_finish(aio, 0, len); +} + +static void rep0_sock_fini(void *arg) { rep0_sock *s = arg; - nni_aio_stop(s->aio_getq); - nni_aio_fini(s->aio_getq); nni_idhash_fini(s->pipes); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); + if (s->ctx != NULL) { + rep0_ctx_fini(s->ctx); } + nni_pollable_free(s->sendable); + nni_pollable_free(s->recvable); nni_mtx_fini(&s->lk); NNI_FREE_STRUCT(s); } @@ -80,21 +251,34 @@ rep0_sock_init(void **sp, nni_sock *sock) rep0_sock *s; int rv; + NNI_ARG_UNUSED(sock); + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->lk); - if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) { + if ((rv = nni_idhash_init(&s->pipes)) != 0) { + rep0_sock_fini(s); + return (rv); + } + + NNI_LIST_INIT(&s->recvq, rep0_ctx, rqnode); + NNI_LIST_INIT(&s->recvpipes, rep0_pipe, rnode); + + s->ttl = 8; + + if ((rv = rep0_ctx_init((void **) &s->ctx, s)) != 0) { rep0_sock_fini(s); return (rv); } - s->ttl = 8; // Per RFC - s->btrace = NULL; - s->btrace_len = 0; - s->uwq = nni_sock_sendq(sock); - s->urq = nni_sock_recvq(sock); + // We start off without being either readable or pollable. + // Readability comes when there is something on the socket. + if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || + ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { + rep0_sock_fini(s); + return (rv); + } *sp = s; @@ -104,9 +288,7 @@ rep0_sock_init(void **sp, nni_sock *sock) static void rep0_sock_open(void *arg) { - rep0_sock *s = arg; - - nni_msgq_aio_get(s->uwq, s->aio_getq); + NNI_ARG_UNUSED(arg); } static void @@ -114,7 +296,7 @@ rep0_sock_close(void *arg) { rep0_sock *s = arg; - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + rep0_ctx_close(s->ctx); } static void @@ -122,11 +304,8 @@ rep0_pipe_fini(void *arg) { rep0_pipe *p = arg; - nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_msgq_fini(p->sendq); NNI_FREE_STRUCT(p); } @@ -139,15 +318,15 @@ rep0_pipe_init(void **pp, nni_pipe *pipe, void *s) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) { + if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) { rep0_pipe_fini(p); return (rv); } + NNI_LIST_INIT(&p->sendq, rep0_ctx, sqnode); + + p->id = nni_pipe_id(pipe); p->pipe = pipe; p->rep = s; *pp = p; @@ -164,8 +343,8 @@ rep0_pipe_start(void *arg) if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { return (rv); } - - nni_msgq_aio_get(p->sendq, p->aio_getq); + // By definition, we have not received a request yet on this pipe, + // so it cannot cause us to become sendable. nni_pipe_recv(p->pipe, p->aio_recv); return (0); } @@ -175,94 +354,136 @@ rep0_pipe_stop(void *arg) { rep0_pipe *p = arg; rep0_sock *s = p->rep; + rep0_ctx * ctx; + + nni_mtx_lock(&s->lk); + while ((ctx = nni_list_first(&p->sendq)) != NULL) { + nni_aio *aio; + nni_msg *msg; + // Pipe was closed. To avoid pushing an error back to the + // entire socket, we pretend we completed this successfully. + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + msg = nni_aio_get_msg(aio); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + } + if (p->id == s->ctx->pipe_id) { + // We "can" send. (Well, not really, but we will happily + // accept a message and discard it.) + nni_pollable_raise(s->sendable); + } + nni_mtx_unlock(&s->lk); - nni_msgq_close(p->sendq); - nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); } static void -rep0_sock_getq_cb(void *arg) +rep0_pipe_send_cb(void *arg) { - rep0_sock *s = arg; - nni_msgq * uwq = s->uwq; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + rep0_ctx * ctx; + nni_aio * aio; nni_msg * msg; - uint32_t id; - rep0_pipe *p; - int rv; - - // This watches for messages from the upper write queue, - // extracts the destination pipe, and forwards it to the appropriate - // destination pipe via a separate queue. This prevents a single bad - // or slow pipe from gumming up the works for the entire socket. + size_t len; - if (nni_aio_result(s->aio_getq) != 0) { - // Closed socket? + nni_mtx_lock(&s->lk); + p->busy = false; + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->pipe); + nni_mtx_unlock(&s->lk); return; } - - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); - - // We yank the outgoing pipe id from the header - if (nni_msg_header_len(msg) < 4) { - nni_msg_free(msg); - - // Look for another message on the upper write queue. - nni_msgq_aio_get(uwq, s->aio_getq); + if ((ctx = nni_list_first(&p->sendq)) == NULL) { + // Nothing else to send. + if (p->id == s->ctx->pipe_id) { + // Mark us ready for the other side to send! + nni_pollable_raise(s->sendable); + } + nni_mtx_unlock(&s->lk); return; } - id = nni_msg_header_trim_u32(msg); + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + p->busy = true; + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->pipe, p->aio_send); - // Look for the pipe, and attempt to put the message there - // (nonblocking) if we can. If we can't for any reason, then we - // free the message. - // XXX: LOCKING?!?! - if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) { - rv = nni_msgq_tryput(p->sendq, msg); - } - if (rv != 0) { - nni_msg_free(msg); - } + nni_mtx_unlock(&s->lk); - // Now look for another message on the upper write queue. - nni_msgq_aio_get(uwq, s->aio_getq); + nni_aio_finish_synch(aio, 0, len); } static void -rep0_pipe_getq_cb(void *arg) +rep0_cancel_recv(nni_aio *aio, int rv) { - rep0_pipe *p = arg; + rep0_ctx * ctx = nni_aio_get_prov_data(aio); + rep0_sock *s = ctx->sock; - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); - return; + nni_mtx_lock(&s->lk); + if (ctx->raio == aio) { + nni_list_remove(&s->recvq, ctx); + ctx->raio = NULL; + nni_aio_finish_error(aio, rv); } - - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - nni_pipe_send(p->pipe, p->aio_send); + nni_mtx_unlock(&s->lk); } static void -rep0_pipe_send_cb(void *arg) +rep0_ctx_recv(void *arg, nni_aio *aio) { - rep0_pipe *p = arg; + rep0_ctx * ctx = arg; + rep0_sock *s = ctx->sock; + rep0_pipe *p; + size_t len; + nni_msg * msg; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_mtx_lock(&s->lk); + if (nni_aio_start(aio, rep0_cancel_recv, ctx) != 0) { + nni_mtx_unlock(&s->lk); + return; + } + if (ctx->closed) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, NNG_ECLOSED); return; } + if ((p = nni_list_first(&s->recvpipes)) == NULL) { + nni_pollable_clear(s->recvable); + ctx->raio = aio; + nni_list_append(&s->recvq, ctx); + nni_mtx_unlock(&s->lk); + return; + } + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_list_remove(&s->recvpipes, p); + if (nni_list_empty(&s->recvpipes)) { + nni_pollable_clear(s->recvable); + } + nni_pipe_recv(p->pipe, p->aio_recv); + + len = nni_msg_header_len(msg); + memcpy(ctx->btrace, nni_msg_header(msg), len); + ctx->btrace_len = len; + ctx->pipe_id = nni_pipe_id(p->pipe); + nni_mtx_unlock(&s->lk); - nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_msg_header_clear(msg); + nni_aio_set_msg(aio, msg); + nni_aio_finish(aio, 0, nni_msg_len(msg)); } static void @@ -270,9 +491,12 @@ rep0_pipe_recv_cb(void *arg) { rep0_pipe *p = arg; rep0_sock *s = p->rep; + rep0_ctx * ctx; nni_msg * msg; int rv; uint8_t * body; + nni_aio * aio; + size_t len; int hops; if (nni_aio_result(p->aio_recv) != 0) { @@ -281,28 +505,22 @@ rep0_pipe_recv_cb(void *arg) } msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); - nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - - // Store the pipe id in the header, first thing. - rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); - if (rv != 0) { - // Failure here causes us to drop the message. - goto drop; - } + nni_msg_set_pipe(msg, p->id); // Move backtrace from body to header hops = 1; for (;;) { int end = 0; - if (hops >= s->ttl) { + + if (hops > s->ttl) { // This isn't malformed, but it has gone through // too many hops. Do not disconnect, because we // can legitimately receive messages with too many // hops from devices, etc. goto drop; } + hops++; if (nni_msg_len(msg) < 4) { // Peer is speaking garbage. Kick it. nni_msg_free(msg); @@ -313,10 +531,7 @@ rep0_pipe_recv_cb(void *arg) end = (body[0] & 0x80) ? 1 : 0; rv = nni_msg_header_append(msg, body, 4); if (rv != 0) { - // Presumably this is due to out of memory. - // We could just discard and try again, but we - // just toss the connection for now. Given the - // out of memory situation, this is not unreasonable. + // Out of memory, so drop it. goto drop; } nni_msg_trim(msg, 4); @@ -325,28 +540,46 @@ rep0_pipe_recv_cb(void *arg) } } - // Go ahead and send it up. - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(s->urq, p->aio_putq); - return; + len = nni_msg_header_len(msg); -drop: - nni_msg_free(msg); + nni_mtx_lock(&s->lk); + + if ((ctx = nni_list_first(&s->recvq)) == NULL) { + // No one waiting to receive yet, holding pattern. + nni_list_append(&s->recvpipes, p); + nni_pollable_raise(s->recvable); + nni_mtx_unlock(&s->lk); + return; + } + + nni_list_remove(&s->recvq, ctx); + aio = ctx->raio; + ctx->raio = NULL; + nni_aio_set_msg(aio, msg); + nni_aio_set_msg(p->aio_recv, NULL); + + // schedule another receive nni_pipe_recv(p->pipe, p->aio_recv); -} -static void -rep0_pipe_putq_cb(void *arg) -{ - rep0_pipe *p = arg; + ctx->btrace_len = len; + memcpy(ctx->btrace, nni_msg_header(msg), len); + nni_msg_header_clear(msg); + ctx->pipe_id = p->id; - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); - return; + // If we got a request on a pipe that wasn't busy, we should mark + // it sendable. (The sendable flag is not set when there is no + // request needing a reply.) + if ((ctx == s->ctx) && (!p->busy)) { + nni_pollable_raise(s->sendable); } + nni_mtx_unlock(&s->lk); + + nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); + return; + +drop: + nni_msg_free(msg); nni_pipe_recv(p->pipe, p->aio_recv); } @@ -354,6 +587,7 @@ static int rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { rep0_sock *s = arg; + return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); } @@ -361,75 +595,43 @@ static int rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) { rep0_sock *s = arg; + return (nni_copyout_int(s->ttl, buf, szp, typ)); } -static nni_msg * -rep0_sock_filter(void *arg, nni_msg *msg) +static int +rep0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ) { rep0_sock *s = arg; - char * header; - size_t len; - - nni_mtx_lock(&s->lk); + int rv; + int fd; - len = nni_msg_header_len(msg); - header = nni_msg_header(msg); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - } - if ((s->btrace = nni_alloc(len)) == NULL) { - nni_msg_free(msg); - return (NULL); + if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + return (rv); } - s->btrace_len = len; - memcpy(s->btrace, header, len); - nni_msg_header_clear(msg); - nni_mtx_unlock(&s->lk); - return (msg); + return (nni_copyout_int(fd, buf, szp, typ)); } -static void -rep0_sock_send_raw(void *arg, nni_aio *aio) +static int +rep0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ) { rep0_sock *s = arg; - nni_msgq_aio_put(s->uwq, aio); + int rv; + int fd; + + if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + return (rv); + } + + return (nni_copyout_int(fd, buf, szp, typ)); } static void rep0_sock_send(void *arg, nni_aio *aio) { rep0_sock *s = arg; - int rv; - nni_msg * msg; - - nni_mtx_lock(&s->lk); - if (s->btrace == NULL) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } - - msg = nni_aio_get_msg(aio); - - // drop anything else in the header... (it should already be - // empty, but there can be stale backtrace info there.) - nni_msg_header_clear(msg); - - if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); - return; - } - - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - nni_mtx_unlock(&s->lk); - nni_msgq_aio_put(s->uwq, aio); + rep0_ctx_send(s->ctx, aio); } static void @@ -437,7 +639,7 @@ rep0_sock_recv(void *arg, nni_aio *aio) { rep0_sock *s = arg; - nni_msgq_aio_get(s->urq, aio); + rep0_ctx_recv(s->ctx, aio); } // This is the global protocol structure -- our linkage to the core. @@ -449,6 +651,21 @@ static nni_proto_pipe_ops rep0_pipe_ops = { .pipe_stop = rep0_pipe_stop, }; +static nni_proto_ctx_option rep0_ctx_options[] = { + // terminate list + { + .co_name = NULL, + }, +}; + +static nni_proto_ctx_ops rep0_ctx_ops = { + .ctx_init = rep0_ctx_init, + .ctx_fini = rep0_ctx_fini, + .ctx_send = rep0_ctx_send, + .ctx_recv = rep0_ctx_recv, + .ctx_options = rep0_ctx_options, +}; + static nni_proto_sock_option rep0_sock_options[] = { { .pso_name = NNG_OPT_MAXTTL, @@ -456,6 +673,18 @@ static nni_proto_sock_option rep0_sock_options[] = { .pso_getopt = rep0_sock_getopt_maxttl, .pso_setopt = rep0_sock_setopt_maxttl, }, + { + .pso_name = NNG_OPT_RECVFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = rep0_sock_getopt_recvfd, + .pso_setopt = NULL, + }, + { + .pso_name = NNG_OPT_SENDFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = rep0_sock_getopt_sendfd, + .pso_setopt = NULL, + }, // terminate list { .pso_name = NULL, @@ -468,38 +697,18 @@ static nni_proto_sock_ops rep0_sock_ops = { .sock_open = rep0_sock_open, .sock_close = rep0_sock_close, .sock_options = rep0_sock_options, - .sock_filter = rep0_sock_filter, .sock_send = rep0_sock_send, .sock_recv = rep0_sock_recv, }; -static nni_proto_sock_ops rep0_sock_ops_raw = { - .sock_init = rep0_sock_init, - .sock_fini = rep0_sock_fini, - .sock_open = rep0_sock_open, - .sock_close = rep0_sock_close, - .sock_options = rep0_sock_options, - .sock_filter = NULL, // No filtering for raw mode - .sock_send = rep0_sock_send_raw, - .sock_recv = rep0_sock_recv, -}; - static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REP_V0, "rep" }, .proto_peer = { NNI_PROTO_REQ_V0, "req" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &rep0_sock_ops, .proto_pipe_ops = &rep0_pipe_ops, -}; - -static nni_proto rep0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REP_V0, "rep" }, - .proto_peer = { NNI_PROTO_REQ_V0, "req" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, - .proto_sock_ops = &rep0_sock_ops_raw, - .proto_pipe_ops = &rep0_pipe_ops, + .proto_ctx_ops = &rep0_ctx_ops, }; int @@ -507,9 +716,3 @@ nng_rep0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &rep0_proto)); } - -int -nng_rep0_open_raw(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &rep0_proto_raw)); -} |
