diff options
| author | Garrett D'Amore <garrett@damore.org> | 2018-04-20 20:52:32 -0700 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2018-04-24 15:06:33 -0700 |
| commit | fdefff742662ed4eb476bf19b9dda245f86bc406 (patch) | |
| tree | a4e132716debd64e434478f8814f368db052cbc6 /src/protocol/survey0/survey.c | |
| parent | e0b47b12d3d1462d07c5038e4f34f5282eeec675 (diff) | |
| download | nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.gz nng-fdefff742662ed4eb476bf19b9dda245f86bc406.tar.bz2 nng-fdefff742662ed4eb476bf19b9dda245f86bc406.zip | |
fixes #342 Want Surveyor/Respondent context support
fixes #360 core should nng_aio_begin before nng_aio_finish_error
fixes #361 nng_send_aio should check for NULL message
fixes #362 nni_msgq does not signal pollable on certain events
This adds support for contexts for both sides of the surveyor pattern.
Prior to this commit, the raw mode was completely broken, and there
were numerous other bugs found and fixed. This integration includes
*much* deeper validation of this pattern.
Some changes to the core and other patterns have been made, where it
was obvious that we could make such improvements. (The obviousness
stemming from the fact that RESPONDENT in particular is very closely
derived from REP.)
Diffstat (limited to 'src/protocol/survey0/survey.c')
| -rw-r--r-- | src/protocol/survey0/survey.c | 546 |
1 file changed, 326 insertions, 220 deletions
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index b7158464..e725d2b3 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -16,6 +16,9 @@ // Surveyor protocol. The SURVEYOR protocol is the "survey" side of the // survey pattern. This is useful for building service discovery, voting, etc. +// Note that this pattern is not optimized for extreme low latency, as it makes +// multiple use of queues for simplicity. Typically this is used in cases +// where a few dozen extra microseconds does not matter. #ifndef NNI_PROTO_SURVEYOR_V0 #define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2) @@ -27,86 +30,249 @@ typedef struct surv0_pipe surv0_pipe; typedef struct surv0_sock surv0_sock; +typedef struct surv0_ctx surv0_ctx; -static void surv0_sock_getq_cb(void *); -static void surv0_getq_cb(void *); -static void surv0_putq_cb(void *); -static void surv0_send_cb(void *); -static void surv0_recv_cb(void *); -static void surv0_timeout(void *); +static void surv0_pipe_getq_cb(void *); +static void surv0_pipe_send_cb(void *); +static void surv0_pipe_recv_cb(void *); +static void surv0_ctx_timeout(void *); + +struct surv0_ctx { + surv0_sock * sock; + uint64_t survid; // survey id + nni_timer_node timer; + nni_time expire; + nni_duration survtime; + nni_msgq * rq; // recv message queue +}; // surv0_sock is our per-socket protocol private structure. struct surv0_sock { - nni_duration survtime; - nni_time expire; - int ttl; - uint32_t nextid; // next id - uint32_t survid; // outstanding request ID (big endian) - nni_list pipes; - nni_aio * aio_getq; - nni_timer_node timer; - nni_msgq * uwq; - nni_msgq * urq; - nni_mtx mtx; + int ttl; + nni_list pipes; + nni_mtx mtx; + surv0_ctx * ctx; + nni_idhash * surveys; + nni_pollable *sendable; }; // surv0_pipe is our per-pipe protocol private structure. 
struct surv0_pipe { nni_pipe * npipe; - surv0_sock * psock; + surv0_sock * sock; nni_msgq * sendq; nni_list_node node; nni_aio * aio_getq; - nni_aio * aio_putq; nni_aio * aio_send; nni_aio * aio_recv; }; static void +surv0_ctx_fini(void *arg) +{ + surv0_ctx *ctx = arg; + + if (ctx->rq != NULL) { + nni_msgq_close(ctx->rq); + nni_msgq_fini(ctx->rq); + } + nni_timer_cancel(&ctx->timer); + NNI_FREE_STRUCT(ctx); +} + +static int +surv0_ctx_init(void **ctxp, void *sarg) +{ + surv0_ctx * ctx; + surv0_sock *sock = sarg; + int rv; + + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_lock(&sock->mtx); + if (sock->ctx != NULL) { + ctx->survtime = sock->ctx->survtime; + } + nni_mtx_unlock(&sock->mtx); + ctx->sock = sock; + // 126 is a deep enough queue, and leaves 2 extra cells for the + // pushback bit in msgqs. This can result in up to 1kB of allocation + // for the message queue. + if ((rv = nni_msgq_init(&ctx->rq, 126)) != 0) { + surv0_ctx_fini(ctx); + return (rv); + } + + nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx); + *ctxp = ctx; + return (0); +} + +static void +surv0_ctx_recv(void *arg, nni_aio *aio) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + + nni_mtx_lock(&sock->mtx); + if (ctx->survid == 0) { + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + nni_msgq_aio_get(ctx->rq, aio); + nni_mtx_unlock(&sock->mtx); +} + +void +surv0_ctx_timeout(void *arg) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + + nni_mtx_lock(&sock->mtx); + if (nni_clock() < ctx->expire) { + nni_mtx_unlock(&sock->mtx); + return; + } + // Abort any pending receives. 
+ nni_msgq_set_get_error(ctx->rq, NNG_ETIMEDOUT); + if (ctx->survid != 0) { + nni_idhash_remove(sock->surveys, ctx->survid); + ctx->survid = 0; + } + nni_mtx_unlock(&sock->mtx); +} + +static void +surv0_ctx_send(void *arg, nni_aio *aio) +{ + surv0_ctx * ctx = arg; + surv0_sock *sock = ctx->sock; + surv0_pipe *pipe; + nni_msg * msg = nni_aio_get_msg(aio); + size_t len = nni_msg_len(msg); + nni_time now = nni_clock(); + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + + nni_mtx_lock(&sock->mtx); + + // Abort any pending receives -- this is the same as cancellation. + nni_msgq_set_get_error(ctx->rq, NNG_ECANCELED); + nni_msgq_flush(ctx->rq); + + // New survey id will be generated, so unregister the old one. + if (ctx->survid) { + nni_idhash_remove(sock->surveys, ctx->survid); + ctx->survid = 0; + } + // Allocate the new ID. + if ((rv = nni_idhash_alloc(sock->surveys, &ctx->survid, ctx)) != 0) { + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, rv); + return; + } + // Insert it into the message. We report an error if one occurs, + // although arguably at this point we could just discard silently. + if ((rv = nni_msg_header_append_u32(msg, (uint32_t) ctx->survid)) != + 0) { + nni_idhash_remove(sock->surveys, ctx->survid); + ctx->survid = 0; + nni_mtx_unlock(&sock->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + // From this point, we're committed to success. Note that we send + // regardless of whether there are any pipes or not. If no pipes, + // then it just gets discarded. + nni_aio_set_msg(aio, NULL); + NNI_LIST_FOREACH (&sock->pipes, pipe) { + nni_msg *dmsg; + + if (nni_list_next(&sock->pipes, pipe) != NULL) { + if (nni_msg_dup(&dmsg, msg) != 0) { + continue; + } + } else { + dmsg = msg; + msg = NULL; + } + if (nni_msgq_tryput(pipe->sendq, dmsg) != 0) { + nni_msg_free(dmsg); + } + } + + ctx->expire = now + ctx->survtime; + nni_timer_schedule(&ctx->timer, ctx->expire); + + // Allow recv to run. 
+ nni_msgq_set_get_error(ctx->rq, 0); + + nni_mtx_unlock(&sock->mtx); + if (msg != NULL) { + nni_msg_free(msg); + } + + nni_aio_finish(aio, 0, len); +} + +static void surv0_sock_fini(void *arg) { - surv0_sock *s = arg; + surv0_sock *sock = arg; - nni_aio_stop(s->aio_getq); - nni_aio_fini(s->aio_getq); - nni_mtx_fini(&s->mtx); - NNI_FREE_STRUCT(s); + if (sock->ctx != NULL) { + surv0_ctx_fini(sock->ctx); + } + nni_idhash_fini(sock->surveys); + nni_pollable_free(sock->sendable); + nni_mtx_fini(&sock->mtx); + NNI_FREE_STRUCT(sock); } static int surv0_sock_init(void **sp, nni_sock *nsock) { - surv0_sock *s; + surv0_sock *sock; int rv; - if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + NNI_ARG_UNUSED(nsock); + + if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) { - surv0_sock_fini(s); + NNI_LIST_INIT(&sock->pipes, surv0_pipe, node); + nni_mtx_init(&sock->mtx); + + if (((rv = nni_idhash_init(&sock->surveys)) != 0) || + ((rv = surv0_ctx_init((void **) &sock->ctx, sock)) != 0)) { + surv0_sock_fini(sock); return (rv); } - NNI_LIST_INIT(&s->pipes, surv0_pipe, node); - nni_mtx_init(&s->mtx); - nni_timer_init(&s->timer, surv0_timeout, s); - - s->nextid = nni_random(); - s->survtime = NNI_SECOND; - s->expire = NNI_TIME_ZERO; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - s->ttl = 8; - - *sp = s; + + // Survey IDs are 32 bits, with the high order bit set. + // We start at a random point, to minimize likelihood of + // accidental collision across restarts. 
+ nni_idhash_set_limits(sock->surveys, 0x80000000u, 0xffffffffu, + nni_random() | 0x80000000u); + + sock->ctx->survtime = NNI_SECOND; + sock->ttl = 8; + + *sp = sock; return (0); } static void surv0_sock_open(void *arg) { - surv0_sock *s = arg; - - nni_msgq_aio_get(s->uwq, s->aio_getq); + NNI_ARG_UNUSED(arg); } static void @@ -114,8 +280,7 @@ surv0_sock_close(void *arg) { surv0_sock *s = arg; - nni_timer_cancel(&s->timer); - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + nni_msgq_close(s->ctx->rq); } static void @@ -126,7 +291,6 @@ surv0_pipe_fini(void *arg) nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); nni_msgq_fini(p->sendq); NNI_FREE_STRUCT(p); } @@ -140,18 +304,20 @@ surv0_pipe_init(void **pp, nni_pipe *npipe, void *s) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - // This depth could be tunable. + // This depth could be tunable. The deeper the queue, the more + // concurrent surveys that can be delivered. Having said that, this + // is best effort, and a deep queue doesn't really do much for us. + // Note that surveys can be *outstanding*, but not yet put on the wire. 
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) { + ((rv = nni_aio_init(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) { surv0_pipe_fini(p); return (rv); } p->npipe = npipe; - p->psock = s; + p->sock = s; *pp = p; return (0); } @@ -160,7 +326,7 @@ static int surv0_pipe_start(void *arg) { surv0_pipe *p = arg; - surv0_sock *s = p->psock; + surv0_sock *s = p->sock; nni_mtx_lock(&s->mtx); nni_list_append(&s->pipes, p); @@ -175,12 +341,11 @@ static void surv0_pipe_stop(void *arg) { surv0_pipe *p = arg; - surv0_sock *s = p->psock; + surv0_sock *s = p->sock; nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); nni_msgq_close(p->sendq); @@ -192,7 +357,7 @@ surv0_pipe_stop(void *arg) } static void -surv0_getq_cb(void *arg) +surv0_pipe_getq_cb(void *arg) { surv0_pipe *p = arg; @@ -208,7 +373,7 @@ surv0_getq_cb(void *arg) } static void -surv0_send_cb(void *arg) +surv0_pipe_send_cb(void *arg) { surv0_pipe *p = arg; @@ -223,28 +388,17 @@ surv0_send_cb(void *arg) } static void -surv0_putq_cb(void *arg) +surv0_pipe_recv_cb(void *arg) { - surv0_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->npipe); - return; - } - - nni_pipe_recv(p->npipe, p->aio_recv); -} - -static void -surv0_recv_cb(void *arg) -{ - surv0_pipe *p = arg; + surv0_pipe *p = arg; + surv0_sock *sock = p->sock; + surv0_ctx * ctx; nni_msg * msg; + uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { - goto failed; + nni_pipe_stop(p->npipe); + return; } msg = 
nni_aio_get_msg(p->aio_recv); @@ -253,23 +407,45 @@ surv0_recv_cb(void *arg) // We yank 4 bytes of body, and move them to the header. if (nni_msg_len(msg) < 4) { - // Not enough data, just toss it. + // Peer sent us garbage. Kick it. nni_msg_free(msg); - goto failed; + nni_pipe_stop(p->npipe); + return; + } + id = nni_msg_trim_u32(msg); + if (nni_msg_header_append_u32(msg, id) != 0) { + // Should be NNG_ENOMEM - discard and try again. + nni_msg_free(msg); + nni_pipe_recv(p->npipe, p->aio_recv); + return; } - if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { - // Should be NNG_ENOMEM + + nni_mtx_lock(&sock->mtx); + + // Best effort at delivery. Discard if no context or context is + // unable to receive it. + if ((nni_idhash_find(sock->surveys, id, (void **) &ctx) != 0) || + (nni_msgq_tryput(ctx->rq, msg) != 0)) { nni_msg_free(msg); - goto failed; } - (void) nni_msg_trim(msg, 4); - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(p->psock->urq, p->aio_putq); - return; + nni_mtx_unlock(&sock->mtx); -failed: - nni_pipe_stop(p->npipe); + nni_pipe_recv(p->npipe, p->aio_recv); +} + +static int +surv0_ctx_setopt_surveytime(void *arg, const void *buf, size_t sz, int typ) +{ + surv0_ctx *ctx = arg; + return (nni_copyin_ms(&ctx->survtime, buf, sz, typ)); +} + +static int +surv0_ctx_getopt_surveytime(void *arg, void *buf, size_t *szp, int typ) +{ + surv0_ctx *ctx = arg; + return (nni_copyout_ms(ctx->survtime, buf, szp, typ)); } static int @@ -290,141 +466,66 @@ static int surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz, int typ) { surv0_sock *s = arg; - return (nni_copyin_ms(&s->survtime, buf, sz, typ)); + return (surv0_ctx_setopt_surveytime(s->ctx, buf, sz, typ)); } static int surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp, int typ) { surv0_sock *s = arg; - return (nni_copyout_ms(s->survtime, buf, szp, typ)); + return (surv0_ctx_getopt_surveytime(s->ctx, buf, szp, typ)); } -static void -surv0_sock_getq_cb(void *arg) +static 
int +surv0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ) { - surv0_sock *s = arg; - surv0_pipe *p; - surv0_pipe *last; - nni_msg * msg, *dup; - - if (nni_aio_result(s->aio_getq) != 0) { - // Should be NNG_ECLOSED. - return; - } - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); + surv0_sock *sock = arg; + int rv; + int fd; - nni_mtx_lock(&s->mtx); - last = nni_list_last(&s->pipes); - NNI_LIST_FOREACH (&s->pipes, p) { - if (p != last) { - if (nni_msg_dup(&dup, msg) != 0) { - continue; - } - } else { - dup = msg; - } - if (nni_msgq_tryput(p->sendq, dup) != 0) { - nni_msg_free(dup); + nni_mtx_lock(&sock->mtx); + if (sock->sendable == NULL) { + if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) { + nni_mtx_unlock(&sock->mtx); + return (rv); } + // We are always sendable. + nni_pollable_raise(sock->sendable); } - - nni_msgq_aio_get(s->uwq, s->aio_getq); - nni_mtx_unlock(&s->mtx); - - if (last == NULL) { - // If there were no pipes to send on, just toss the message. 
- nni_msg_free(msg); + nni_mtx_unlock(&sock->mtx); + if ((rv = nni_pollable_getfd(sock->sendable, &fd)) != 0) { + return (rv); } + return (nni_copyout_int(fd, buf, szp, typ)); } -static void -surv0_timeout(void *arg) -{ - surv0_sock *s = arg; - - nni_mtx_lock(&s->mtx); - s->survid = 0; - nni_mtx_unlock(&s->mtx); - - nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT); -} - -static void -surv0_sock_recv(void *arg, nni_aio *aio) +static int +surv0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ) { - surv0_sock *s = arg; + surv0_sock * sock = arg; + nni_pollable *recvable; + int rv; + int fd; - nni_mtx_lock(&s->mtx); - if (s->survid == 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; + if (((rv = nni_msgq_get_recvable(sock->ctx->rq, &recvable)) != 0) || + ((rv = nni_pollable_getfd(recvable, &fd)) != 0)) { + return (rv); } - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->urq, aio); + return (nni_copyout_int(fd, buf, szp, typ)); } static void -surv0_sock_send_raw(void *arg, nni_aio *aio) +surv0_sock_recv(void *arg, nni_aio *aio) { surv0_sock *s = arg; - - nni_msgq_aio_put(s->uwq, aio); + surv0_ctx_recv(s->ctx, aio); } static void surv0_sock_send(void *arg, nni_aio *aio) { surv0_sock *s = arg; - nni_msg * msg; - int rv; - - nni_mtx_lock(&s->mtx); - - // Generate a new request ID. We always set the high - // order bit so that the peer can locate the end of the - // backtrace. (Pipe IDs have the high order bit clear.) - s->survid = (s->nextid++) | 0x80000000u; - - msg = nni_aio_get_msg(aio); - nni_msg_header_clear(msg); - if ((rv = nni_msg_header_append_u32(msg, s->survid)) != 0) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); - return; - } - - // If another message is there, this cancels it. We move the - // survey expiration out. The timeout thread will wake up in - // the wake below, and reschedule itself appropriately. 
- nni_msgq_set_get_error(s->urq, 0); - s->expire = nni_clock() + s->survtime; - nni_timer_schedule(&s->timer, s->expire); - - nni_mtx_unlock(&s->mtx); - - nni_msgq_aio_put(s->uwq, aio); -} - -static nni_msg * -surv0_sock_filter(void *arg, nni_msg *msg) -{ - surv0_sock *s = arg; - - nni_mtx_lock(&s->mtx); - - if ((nni_msg_header_len(msg) < sizeof(uint32_t)) || - (nni_msg_header_trim_u32(msg) != s->survid)) { - // Wrong request id - nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); - } - nni_mtx_unlock(&s->mtx); - - return (msg); + surv0_ctx_send(s->ctx, aio); } static nni_proto_pipe_ops surv0_pipe_ops = { @@ -434,6 +535,25 @@ static nni_proto_pipe_ops surv0_pipe_ops = { .pipe_stop = surv0_pipe_stop, }; +static nni_proto_ctx_option surv0_ctx_options[] = { + { + .co_name = NNG_OPT_SURVEYOR_SURVEYTIME, + .co_type = NNI_TYPE_DURATION, + .co_getopt = surv0_ctx_getopt_surveytime, + .co_setopt = surv0_ctx_setopt_surveytime, + }, + { + .co_name = NULL, + } +}; +static nni_proto_ctx_ops surv0_ctx_ops = { + .ctx_init = surv0_ctx_init, + .ctx_fini = surv0_ctx_fini, + .ctx_send = surv0_ctx_send, + .ctx_recv = surv0_ctx_recv, + .ctx_options = surv0_ctx_options, +}; + static nni_proto_sock_option surv0_sock_options[] = { { .pso_name = NNG_OPT_SURVEYOR_SURVEYTIME, @@ -447,6 +567,18 @@ static nni_proto_sock_option surv0_sock_options[] = { .pso_getopt = surv0_sock_getopt_maxttl, .pso_setopt = surv0_sock_setopt_maxttl, }, + { + .pso_name = NNG_OPT_RECVFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = surv0_sock_getopt_recvfd, + .pso_setopt = NULL, + }, + { + .pso_name = NNG_OPT_SENDFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = surv0_sock_getopt_sendfd, + .pso_setopt = NULL, + }, // terminate list { .pso_name = NULL, @@ -460,18 +592,6 @@ static nni_proto_sock_ops surv0_sock_ops = { .sock_close = surv0_sock_close, .sock_send = surv0_sock_send, .sock_recv = surv0_sock_recv, - .sock_filter = surv0_sock_filter, - .sock_options = surv0_sock_options, -}; - -static 
nni_proto_sock_ops surv0_sock_ops_raw = { - .sock_init = surv0_sock_init, - .sock_fini = surv0_sock_fini, - .sock_open = surv0_sock_open, - .sock_close = surv0_sock_close, - .sock_send = surv0_sock_send_raw, - .sock_recv = surv0_sock_recv, - .sock_filter = surv0_sock_filter, .sock_options = surv0_sock_options, }; @@ -479,18 +599,10 @@ static nni_proto surv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &surv0_sock_ops, .proto_pipe_ops = &surv0_pipe_ops, -}; - -static nni_proto surv0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, - .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, - .proto_sock_ops = &surv0_sock_ops_raw, - .proto_pipe_ops = &surv0_pipe_ops, + .proto_ctx_ops = &surv0_ctx_ops, }; int @@ -498,9 +610,3 @@ nng_surveyor0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &surv0_proto)); } - -int -nng_surveyor0_open_raw(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &surv0_proto_raw)); -} |
