aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/survey0/survey.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/survey0/survey.c')
-rw-r--r--src/protocol/survey0/survey.c546
1 files changed, 326 insertions, 220 deletions
diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c
index b7158464..e725d2b3 100644
--- a/src/protocol/survey0/survey.c
+++ b/src/protocol/survey0/survey.c
@@ -16,6 +16,9 @@
// Surveyor protocol. The SURVEYOR protocol is the "survey" side of the
// survey pattern. This is useful for building service discovery, voting, etc.
+// Note that this pattern is not optimized for extreme low latency, as it makes
+// multiple use of queues for simplicity. Typically this is used in cases
+// where a few dozen extra microseconds does not matter.
#ifndef NNI_PROTO_SURVEYOR_V0
#define NNI_PROTO_SURVEYOR_V0 NNI_PROTO(6, 2)
@@ -27,86 +30,249 @@
typedef struct surv0_pipe surv0_pipe;
typedef struct surv0_sock surv0_sock;
+typedef struct surv0_ctx surv0_ctx;
-static void surv0_sock_getq_cb(void *);
-static void surv0_getq_cb(void *);
-static void surv0_putq_cb(void *);
-static void surv0_send_cb(void *);
-static void surv0_recv_cb(void *);
-static void surv0_timeout(void *);
+static void surv0_pipe_getq_cb(void *);
+static void surv0_pipe_send_cb(void *);
+static void surv0_pipe_recv_cb(void *);
+static void surv0_ctx_timeout(void *);
+
+struct surv0_ctx {
+ surv0_sock * sock;
+ uint64_t survid; // survey id
+ nni_timer_node timer;
+ nni_time expire;
+ nni_duration survtime;
+ nni_msgq * rq; // recv message queue
+};
// surv0_sock is our per-socket protocol private structure.
struct surv0_sock {
- nni_duration survtime;
- nni_time expire;
- int ttl;
- uint32_t nextid; // next id
- uint32_t survid; // outstanding request ID (big endian)
- nni_list pipes;
- nni_aio * aio_getq;
- nni_timer_node timer;
- nni_msgq * uwq;
- nni_msgq * urq;
- nni_mtx mtx;
+ int ttl;
+ nni_list pipes;
+ nni_mtx mtx;
+ surv0_ctx * ctx;
+ nni_idhash * surveys;
+ nni_pollable *sendable;
};
// surv0_pipe is our per-pipe protocol private structure.
struct surv0_pipe {
nni_pipe * npipe;
- surv0_sock * psock;
+ surv0_sock * sock;
nni_msgq * sendq;
nni_list_node node;
nni_aio * aio_getq;
- nni_aio * aio_putq;
nni_aio * aio_send;
nni_aio * aio_recv;
};
static void
+surv0_ctx_fini(void *arg)
+{
+ surv0_ctx *ctx = arg;
+
+ if (ctx->rq != NULL) {
+ nni_msgq_close(ctx->rq);
+ nni_msgq_fini(ctx->rq);
+ }
+ nni_timer_cancel(&ctx->timer);
+ NNI_FREE_STRUCT(ctx);
+}
+
+static int
+surv0_ctx_init(void **ctxp, void *sarg)
+{
+ surv0_ctx * ctx;
+ surv0_sock *sock = sarg;
+ int rv;
+
+ if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) {
+ return (NNG_ENOMEM);
+ }
+ nni_mtx_lock(&sock->mtx);
+ if (sock->ctx != NULL) {
+ ctx->survtime = sock->ctx->survtime;
+ }
+ nni_mtx_unlock(&sock->mtx);
+ ctx->sock = sock;
+ // 126 is a deep enough queue, and leaves 2 extra cells for the
+ // pushback bit in msgqs. This can result in up to 1kB of allocation
+ // for the message queue.
+ if ((rv = nni_msgq_init(&ctx->rq, 126)) != 0) {
+ surv0_ctx_fini(ctx);
+ return (rv);
+ }
+
+ nni_timer_init(&ctx->timer, surv0_ctx_timeout, ctx);
+ *ctxp = ctx;
+ return (0);
+}
+
+static void
+surv0_ctx_recv(void *arg, nni_aio *aio)
+{
+ surv0_ctx * ctx = arg;
+ surv0_sock *sock = ctx->sock;
+
+ nni_mtx_lock(&sock->mtx);
+ if (ctx->survid == 0) {
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ nni_msgq_aio_get(ctx->rq, aio);
+ nni_mtx_unlock(&sock->mtx);
+}
+
+void
+surv0_ctx_timeout(void *arg)
+{
+ surv0_ctx * ctx = arg;
+ surv0_sock *sock = ctx->sock;
+
+ nni_mtx_lock(&sock->mtx);
+ if (nni_clock() < ctx->expire) {
+ nni_mtx_unlock(&sock->mtx);
+ return;
+ }
+ // Abort any pending receives.
+ nni_msgq_set_get_error(ctx->rq, NNG_ETIMEDOUT);
+ if (ctx->survid != 0) {
+ nni_idhash_remove(sock->surveys, ctx->survid);
+ ctx->survid = 0;
+ }
+ nni_mtx_unlock(&sock->mtx);
+}
+
+static void
+surv0_ctx_send(void *arg, nni_aio *aio)
+{
+ surv0_ctx * ctx = arg;
+ surv0_sock *sock = ctx->sock;
+ surv0_pipe *pipe;
+ nni_msg * msg = nni_aio_get_msg(aio);
+ size_t len = nni_msg_len(msg);
+ nni_time now = nni_clock();
+ int rv;
+
+ if (nni_aio_begin(aio) != 0) {
+ return;
+ }
+
+ nni_mtx_lock(&sock->mtx);
+
+ // Abort any pending receives -- this is the same as cancellation.
+ nni_msgq_set_get_error(ctx->rq, NNG_ECANCELED);
+ nni_msgq_flush(ctx->rq);
+
+ // New survey id will be generated, so unregister the old one.
+ if (ctx->survid) {
+ nni_idhash_remove(sock->surveys, ctx->survid);
+ ctx->survid = 0;
+ }
+ // Allocate the new ID.
+ if ((rv = nni_idhash_alloc(sock->surveys, &ctx->survid, ctx)) != 0) {
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+ // Insert it into the message. We report an error if one occurs,
+ // although arguably at this point we could just discard silently.
+ if ((rv = nni_msg_header_append_u32(msg, (uint32_t) ctx->survid)) !=
+ 0) {
+ nni_idhash_remove(sock->surveys, ctx->survid);
+ ctx->survid = 0;
+ nni_mtx_unlock(&sock->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ // From this point, we're committed to success. Note that we send
+ // regardless of whether there are any pipes or not. If no pipes,
+ // then it just gets discarded.
+ nni_aio_set_msg(aio, NULL);
+ NNI_LIST_FOREACH (&sock->pipes, pipe) {
+ nni_msg *dmsg;
+
+ if (nni_list_next(&sock->pipes, pipe) != NULL) {
+ if (nni_msg_dup(&dmsg, msg) != 0) {
+ continue;
+ }
+ } else {
+ dmsg = msg;
+ msg = NULL;
+ }
+ if (nni_msgq_tryput(pipe->sendq, dmsg) != 0) {
+ nni_msg_free(dmsg);
+ }
+ }
+
+ ctx->expire = now + ctx->survtime;
+ nni_timer_schedule(&ctx->timer, ctx->expire);
+
+ // Allow recv to run.
+ nni_msgq_set_get_error(ctx->rq, 0);
+
+ nni_mtx_unlock(&sock->mtx);
+ if (msg != NULL) {
+ nni_msg_free(msg);
+ }
+
+ nni_aio_finish(aio, 0, len);
+}
+
+static void
surv0_sock_fini(void *arg)
{
- surv0_sock *s = arg;
+ surv0_sock *sock = arg;
- nni_aio_stop(s->aio_getq);
- nni_aio_fini(s->aio_getq);
- nni_mtx_fini(&s->mtx);
- NNI_FREE_STRUCT(s);
+ if (sock->ctx != NULL) {
+ surv0_ctx_fini(sock->ctx);
+ }
+ nni_idhash_fini(sock->surveys);
+ nni_pollable_free(sock->sendable);
+ nni_mtx_fini(&sock->mtx);
+ NNI_FREE_STRUCT(sock);
}
static int
surv0_sock_init(void **sp, nni_sock *nsock)
{
- surv0_sock *s;
+ surv0_sock *sock;
int rv;
- if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
+ NNI_ARG_UNUSED(nsock);
+
+ if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_aio_init(&s->aio_getq, surv0_sock_getq_cb, s)) != 0) {
- surv0_sock_fini(s);
+ NNI_LIST_INIT(&sock->pipes, surv0_pipe, node);
+ nni_mtx_init(&sock->mtx);
+
+ if (((rv = nni_idhash_init(&sock->surveys)) != 0) ||
+ ((rv = surv0_ctx_init((void **) &sock->ctx, sock)) != 0)) {
+ surv0_sock_fini(sock);
return (rv);
}
- NNI_LIST_INIT(&s->pipes, surv0_pipe, node);
- nni_mtx_init(&s->mtx);
- nni_timer_init(&s->timer, surv0_timeout, s);
-
- s->nextid = nni_random();
- s->survtime = NNI_SECOND;
- s->expire = NNI_TIME_ZERO;
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
- s->ttl = 8;
-
- *sp = s;
+
+ // Survey IDs are 32 bits, with the high order bit set.
+ // We start at a random point, to minimize likelihood of
+ // accidental collision across restarts.
+ nni_idhash_set_limits(sock->surveys, 0x80000000u, 0xffffffffu,
+ nni_random() | 0x80000000u);
+
+ sock->ctx->survtime = NNI_SECOND;
+ sock->ttl = 8;
+
+ *sp = sock;
return (0);
}
static void
surv0_sock_open(void *arg)
{
- surv0_sock *s = arg;
-
- nni_msgq_aio_get(s->uwq, s->aio_getq);
+ NNI_ARG_UNUSED(arg);
}
static void
@@ -114,8 +280,7 @@ surv0_sock_close(void *arg)
{
surv0_sock *s = arg;
- nni_timer_cancel(&s->timer);
- nni_aio_abort(s->aio_getq, NNG_ECLOSED);
+ nni_msgq_close(s->ctx->rq);
}
static void
@@ -126,7 +291,6 @@ surv0_pipe_fini(void *arg)
nni_aio_fini(p->aio_getq);
nni_aio_fini(p->aio_send);
nni_aio_fini(p->aio_recv);
- nni_aio_fini(p->aio_putq);
nni_msgq_fini(p->sendq);
NNI_FREE_STRUCT(p);
}
@@ -140,18 +304,20 @@ surv0_pipe_init(void **pp, nni_pipe *npipe, void *s)
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
- // This depth could be tunable.
+ // This depth could be tunable. The deeper the queue, the more
+ // concurrent surveys that can be delivered. Having said that, this
+ // is best effort, and a deep queue doesn't really do much for us.
+ // Note that surveys can be *outstanding*, but not yet put on the wire.
if (((rv = nni_msgq_init(&p->sendq, 16)) != 0) ||
- ((rv = nni_aio_init(&p->aio_getq, surv0_getq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_putq, surv0_putq_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_send, surv0_send_cb, p)) != 0) ||
- ((rv = nni_aio_init(&p->aio_recv, surv0_recv_cb, p)) != 0)) {
+ ((rv = nni_aio_init(&p->aio_getq, surv0_pipe_getq_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_send, surv0_pipe_send_cb, p)) != 0) ||
+ ((rv = nni_aio_init(&p->aio_recv, surv0_pipe_recv_cb, p)) != 0)) {
surv0_pipe_fini(p);
return (rv);
}
p->npipe = npipe;
- p->psock = s;
+ p->sock = s;
*pp = p;
return (0);
}
@@ -160,7 +326,7 @@ static int
surv0_pipe_start(void *arg)
{
surv0_pipe *p = arg;
- surv0_sock *s = p->psock;
+ surv0_sock *s = p->sock;
nni_mtx_lock(&s->mtx);
nni_list_append(&s->pipes, p);
@@ -175,12 +341,11 @@ static void
surv0_pipe_stop(void *arg)
{
surv0_pipe *p = arg;
- surv0_sock *s = p->psock;
+ surv0_sock *s = p->sock;
nni_aio_stop(p->aio_getq);
nni_aio_stop(p->aio_send);
nni_aio_stop(p->aio_recv);
- nni_aio_stop(p->aio_putq);
nni_msgq_close(p->sendq);
@@ -192,7 +357,7 @@ surv0_pipe_stop(void *arg)
}
static void
-surv0_getq_cb(void *arg)
+surv0_pipe_getq_cb(void *arg)
{
surv0_pipe *p = arg;
@@ -208,7 +373,7 @@ surv0_getq_cb(void *arg)
}
static void
-surv0_send_cb(void *arg)
+surv0_pipe_send_cb(void *arg)
{
surv0_pipe *p = arg;
@@ -223,28 +388,17 @@ surv0_send_cb(void *arg)
}
static void
-surv0_putq_cb(void *arg)
+surv0_pipe_recv_cb(void *arg)
{
- surv0_pipe *p = arg;
-
- if (nni_aio_result(p->aio_putq) != 0) {
- nni_msg_free(nni_aio_get_msg(p->aio_putq));
- nni_aio_set_msg(p->aio_putq, NULL);
- nni_pipe_stop(p->npipe);
- return;
- }
-
- nni_pipe_recv(p->npipe, p->aio_recv);
-}
-
-static void
-surv0_recv_cb(void *arg)
-{
- surv0_pipe *p = arg;
+ surv0_pipe *p = arg;
+ surv0_sock *sock = p->sock;
+ surv0_ctx * ctx;
nni_msg * msg;
+ uint32_t id;
if (nni_aio_result(p->aio_recv) != 0) {
- goto failed;
+ nni_pipe_stop(p->npipe);
+ return;
}
msg = nni_aio_get_msg(p->aio_recv);
@@ -253,23 +407,45 @@ surv0_recv_cb(void *arg)
// We yank 4 bytes of body, and move them to the header.
if (nni_msg_len(msg) < 4) {
- // Not enough data, just toss it.
+ // Peer sent us garbage. Kick it.
nni_msg_free(msg);
- goto failed;
+ nni_pipe_stop(p->npipe);
+ return;
+ }
+ id = nni_msg_trim_u32(msg);
+ if (nni_msg_header_append_u32(msg, id) != 0) {
+ // Should be NNG_ENOMEM - discard and try again.
+ nni_msg_free(msg);
+ nni_pipe_recv(p->npipe, p->aio_recv);
+ return;
}
- if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) {
- // Should be NNG_ENOMEM
+
+ nni_mtx_lock(&sock->mtx);
+
+ // Best effort at delivery. Discard if no context or context is
+ // unable to receive it.
+ if ((nni_idhash_find(sock->surveys, id, (void **) &ctx) != 0) ||
+ (nni_msgq_tryput(ctx->rq, msg) != 0)) {
nni_msg_free(msg);
- goto failed;
}
- (void) nni_msg_trim(msg, 4);
- nni_aio_set_msg(p->aio_putq, msg);
- nni_msgq_aio_put(p->psock->urq, p->aio_putq);
- return;
+ nni_mtx_unlock(&sock->mtx);
-failed:
- nni_pipe_stop(p->npipe);
+ nni_pipe_recv(p->npipe, p->aio_recv);
+}
+
+static int
+surv0_ctx_setopt_surveytime(void *arg, const void *buf, size_t sz, int typ)
+{
+ surv0_ctx *ctx = arg;
+ return (nni_copyin_ms(&ctx->survtime, buf, sz, typ));
+}
+
+static int
+surv0_ctx_getopt_surveytime(void *arg, void *buf, size_t *szp, int typ)
+{
+ surv0_ctx *ctx = arg;
+ return (nni_copyout_ms(ctx->survtime, buf, szp, typ));
}
static int
@@ -290,141 +466,66 @@ static int
surv0_sock_setopt_surveytime(void *arg, const void *buf, size_t sz, int typ)
{
surv0_sock *s = arg;
- return (nni_copyin_ms(&s->survtime, buf, sz, typ));
+ return (surv0_ctx_setopt_surveytime(s->ctx, buf, sz, typ));
}
static int
surv0_sock_getopt_surveytime(void *arg, void *buf, size_t *szp, int typ)
{
surv0_sock *s = arg;
- return (nni_copyout_ms(s->survtime, buf, szp, typ));
+ return (surv0_ctx_getopt_surveytime(s->ctx, buf, szp, typ));
}
-static void
-surv0_sock_getq_cb(void *arg)
+static int
+surv0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ)
{
- surv0_sock *s = arg;
- surv0_pipe *p;
- surv0_pipe *last;
- nni_msg * msg, *dup;
-
- if (nni_aio_result(s->aio_getq) != 0) {
- // Should be NNG_ECLOSED.
- return;
- }
- msg = nni_aio_get_msg(s->aio_getq);
- nni_aio_set_msg(s->aio_getq, NULL);
+ surv0_sock *sock = arg;
+ int rv;
+ int fd;
- nni_mtx_lock(&s->mtx);
- last = nni_list_last(&s->pipes);
- NNI_LIST_FOREACH (&s->pipes, p) {
- if (p != last) {
- if (nni_msg_dup(&dup, msg) != 0) {
- continue;
- }
- } else {
- dup = msg;
- }
- if (nni_msgq_tryput(p->sendq, dup) != 0) {
- nni_msg_free(dup);
+ nni_mtx_lock(&sock->mtx);
+ if (sock->sendable == NULL) {
+ if ((rv = nni_pollable_alloc(&sock->sendable)) != 0) {
+ nni_mtx_unlock(&sock->mtx);
+ return (rv);
}
+ // We are always sendable.
+ nni_pollable_raise(sock->sendable);
}
-
- nni_msgq_aio_get(s->uwq, s->aio_getq);
- nni_mtx_unlock(&s->mtx);
-
- if (last == NULL) {
- // If there were no pipes to send on, just toss the message.
- nni_msg_free(msg);
+ nni_mtx_unlock(&sock->mtx);
+ if ((rv = nni_pollable_getfd(sock->sendable, &fd)) != 0) {
+ return (rv);
}
+ return (nni_copyout_int(fd, buf, szp, typ));
}
-static void
-surv0_timeout(void *arg)
-{
- surv0_sock *s = arg;
-
- nni_mtx_lock(&s->mtx);
- s->survid = 0;
- nni_mtx_unlock(&s->mtx);
-
- nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT);
-}
-
-static void
-surv0_sock_recv(void *arg, nni_aio *aio)
+static int
+surv0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ)
{
- surv0_sock *s = arg;
+ surv0_sock * sock = arg;
+ nni_pollable *recvable;
+ int rv;
+ int fd;
- nni_mtx_lock(&s->mtx);
- if (s->survid == 0) {
- nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, NNG_ESTATE);
- return;
+ if (((rv = nni_msgq_get_recvable(sock->ctx->rq, &recvable)) != 0) ||
+ ((rv = nni_pollable_getfd(recvable, &fd)) != 0)) {
+ return (rv);
}
- nni_mtx_unlock(&s->mtx);
- nni_msgq_aio_get(s->urq, aio);
+ return (nni_copyout_int(fd, buf, szp, typ));
}
static void
-surv0_sock_send_raw(void *arg, nni_aio *aio)
+surv0_sock_recv(void *arg, nni_aio *aio)
{
surv0_sock *s = arg;
-
- nni_msgq_aio_put(s->uwq, aio);
+ surv0_ctx_recv(s->ctx, aio);
}
static void
surv0_sock_send(void *arg, nni_aio *aio)
{
surv0_sock *s = arg;
- nni_msg * msg;
- int rv;
-
- nni_mtx_lock(&s->mtx);
-
- // Generate a new request ID. We always set the high
- // order bit so that the peer can locate the end of the
- // backtrace. (Pipe IDs have the high order bit clear.)
- s->survid = (s->nextid++) | 0x80000000u;
-
- msg = nni_aio_get_msg(aio);
- nni_msg_header_clear(msg);
- if ((rv = nni_msg_header_append_u32(msg, s->survid)) != 0) {
- nni_mtx_unlock(&s->mtx);
- nni_aio_finish_error(aio, rv);
- return;
- }
-
- // If another message is there, this cancels it. We move the
- // survey expiration out. The timeout thread will wake up in
- // the wake below, and reschedule itself appropriately.
- nni_msgq_set_get_error(s->urq, 0);
- s->expire = nni_clock() + s->survtime;
- nni_timer_schedule(&s->timer, s->expire);
-
- nni_mtx_unlock(&s->mtx);
-
- nni_msgq_aio_put(s->uwq, aio);
-}
-
-static nni_msg *
-surv0_sock_filter(void *arg, nni_msg *msg)
-{
- surv0_sock *s = arg;
-
- nni_mtx_lock(&s->mtx);
-
- if ((nni_msg_header_len(msg) < sizeof(uint32_t)) ||
- (nni_msg_header_trim_u32(msg) != s->survid)) {
- // Wrong request id
- nni_mtx_unlock(&s->mtx);
- nni_msg_free(msg);
- return (NULL);
- }
- nni_mtx_unlock(&s->mtx);
-
- return (msg);
+ surv0_ctx_send(s->ctx, aio);
}
static nni_proto_pipe_ops surv0_pipe_ops = {
@@ -434,6 +535,25 @@ static nni_proto_pipe_ops surv0_pipe_ops = {
.pipe_stop = surv0_pipe_stop,
};
+static nni_proto_ctx_option surv0_ctx_options[] = {
+ {
+ .co_name = NNG_OPT_SURVEYOR_SURVEYTIME,
+ .co_type = NNI_TYPE_DURATION,
+ .co_getopt = surv0_ctx_getopt_surveytime,
+ .co_setopt = surv0_ctx_setopt_surveytime,
+ },
+ {
+ .co_name = NULL,
+ }
+};
+static nni_proto_ctx_ops surv0_ctx_ops = {
+ .ctx_init = surv0_ctx_init,
+ .ctx_fini = surv0_ctx_fini,
+ .ctx_send = surv0_ctx_send,
+ .ctx_recv = surv0_ctx_recv,
+ .ctx_options = surv0_ctx_options,
+};
+
static nni_proto_sock_option surv0_sock_options[] = {
{
.pso_name = NNG_OPT_SURVEYOR_SURVEYTIME,
@@ -447,6 +567,18 @@ static nni_proto_sock_option surv0_sock_options[] = {
.pso_getopt = surv0_sock_getopt_maxttl,
.pso_setopt = surv0_sock_setopt_maxttl,
},
+ {
+ .pso_name = NNG_OPT_RECVFD,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = surv0_sock_getopt_recvfd,
+ .pso_setopt = NULL,
+ },
+ {
+ .pso_name = NNG_OPT_SENDFD,
+ .pso_type = NNI_TYPE_INT32,
+ .pso_getopt = surv0_sock_getopt_sendfd,
+ .pso_setopt = NULL,
+ },
// terminate list
{
.pso_name = NULL,
@@ -460,18 +592,6 @@ static nni_proto_sock_ops surv0_sock_ops = {
.sock_close = surv0_sock_close,
.sock_send = surv0_sock_send,
.sock_recv = surv0_sock_recv,
- .sock_filter = surv0_sock_filter,
- .sock_options = surv0_sock_options,
-};
-
-static nni_proto_sock_ops surv0_sock_ops_raw = {
- .sock_init = surv0_sock_init,
- .sock_fini = surv0_sock_fini,
- .sock_open = surv0_sock_open,
- .sock_close = surv0_sock_close,
- .sock_send = surv0_sock_send_raw,
- .sock_recv = surv0_sock_recv,
- .sock_filter = surv0_sock_filter,
.sock_options = surv0_sock_options,
};
@@ -479,18 +599,10 @@ static nni_proto surv0_proto = {
.proto_version = NNI_PROTOCOL_VERSION,
.proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
.proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
- .proto_flags = NNI_PROTO_FLAG_SNDRCV,
+ .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ,
.proto_sock_ops = &surv0_sock_ops,
.proto_pipe_ops = &surv0_pipe_ops,
-};
-
-static nni_proto surv0_proto_raw = {
- .proto_version = NNI_PROTOCOL_VERSION,
- .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" },
- .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" },
- .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW,
- .proto_sock_ops = &surv0_sock_ops_raw,
- .proto_pipe_ops = &surv0_pipe_ops,
+ .proto_ctx_ops = &surv0_ctx_ops,
};
int
@@ -498,9 +610,3 @@ nng_surveyor0_open(nng_socket *sidp)
{
return (nni_proto_open(sidp, &surv0_proto));
}
-
-int
-nng_surveyor0_open_raw(nng_socket *sidp)
-{
- return (nni_proto_open(sidp, &surv0_proto_raw));
-}