From 3585000ca027740dbdb4599f4991cd2bf562e2f2 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Fri, 20 Oct 2017 17:03:12 -0700 Subject: fixes #112 Need to move some stuff from socket to message queues --- src/core/msgqueue.c | 113 +++++++++++++++++++-------------- src/core/msgqueue.h | 36 ++++++----- src/core/protocol.h | 16 ++--- src/core/socket.c | 142 +++++++++++++++++------------------------- src/core/socket.h | 29 +++++---- src/protocol/bus/bus.c | 28 ++++++++- src/protocol/pair/pair_v0.c | 20 ++++++ src/protocol/pair/pair_v1.c | 20 ++++++ src/protocol/pipeline/pull.c | 27 ++++++-- src/protocol/pipeline/push.c | 18 +++++- src/protocol/pubsub/pub.c | 18 +++++- src/protocol/pubsub/sub.c | 40 ++++++++++-- src/protocol/reqrep/rep.c | 109 +++++++++++++++++++------------- src/protocol/reqrep/req.c | 91 +++++++++++++++++---------- src/protocol/survey/respond.c | 66 +++++++++++++------- src/protocol/survey/survey.c | 64 +++++++++++++------ 16 files changed, 536 insertions(+), 301 deletions(-) (limited to 'src') diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 6fd95f98..c1f3701c 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -27,12 +27,17 @@ struct nni_msgq { int mq_puterr; int mq_geterr; int mq_draining; + int mq_besteffort; nni_msg **mq_msgs; nni_list mq_aio_putq; nni_list mq_aio_getq; nni_list mq_aio_notify_get; nni_list mq_aio_notify_put; + + // Filters. + nni_msgq_filter mq_filter_fn; + void * mq_filter_arg; }; int @@ -157,6 +162,13 @@ nni_msgq_set_error(nni_msgq *mq, int error) nni_mtx_unlock(&mq->mq_lock); } +void +nni_msgq_set_filter(nni_msgq *mq, nni_msgq_filter filter, void *arg) +{ + mq->mq_filter_fn = filter; + mq->mq_filter_arg = arg; +} + static void nni_msgq_run_putq(nni_msgq *mq) { @@ -173,12 +185,19 @@ nni_msgq_run_putq(nni_msgq *mq) // the queue is empty, otherwise it would have just taken // data from the queue. if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { - nni_aio_set_msg(waio, NULL); - nni_aio_list_remove(raio); + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); + + if (mq->mq_filter_fn != NULL) { + msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); + } + if (msg != NULL) { + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); + } + nni_aio_finish(waio, 0, len); - nni_aio_finish_msg(raio, msg); continue; } @@ -195,40 +214,76 @@ nni_msgq_run_putq(nni_msgq *mq) continue; } + // If we are in best effort mode, just drop the message + // as if we delivered. + if (mq->mq_besteffort) { + nni_list_remove(&mq->mq_aio_putq, waio); + nni_aio_set_msg(waio, NULL); + nni_msg_free(msg); + nni_aio_finish(waio, 0, len); + continue; + } + // Unable to make progress, leave the aio where it is. break; } } +void +nni_msgq_set_best_effort(nni_msgq *mq, int on) +{ + nni_mtx_lock(&mq->mq_lock); + mq->mq_besteffort = on; + if (on) { + nni_msgq_run_putq(mq); + } + nni_mtx_unlock(&mq->mq_lock); +} + static void nni_msgq_run_getq(nni_msgq *mq) { nni_aio *raio; nni_aio *waio; - nni_msg *msg; while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) { // If anything is waiting in the queue, get it first. if (mq->mq_len != 0) { - msg = mq->mq_msgs[mq->mq_get++]; + nni_msg *msg = mq->mq_msgs[mq->mq_get++]; if (mq->mq_get == mq->mq_alloc) { mq->mq_get = 0; } mq->mq_len--; - nni_aio_list_remove(raio); - nni_aio_finish_msg(raio, msg); + + if (mq->mq_filter_fn != NULL) { + msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); + } + if (msg != NULL) { + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); + } continue; } // Nothing queued (unbuffered?), maybe a writer is waiting. if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) { + nni_msg *msg; + size_t len; msg = nni_aio_get_msg(waio); + len = nni_msg_len(msg); + nni_aio_set_msg(waio, NULL); nni_aio_list_remove(waio); - nni_aio_list_remove(raio); - nni_aio_finish(waio, 0, nni_msg_len(msg)); - nni_aio_finish_msg(raio, msg); + if (mq->mq_filter_fn != NULL) { + msg = mq->mq_filter_fn(mq->mq_filter_arg, msg); + } + if (msg != NULL) { + nni_aio_list_remove(raio); + nni_aio_finish_msg(raio, msg); + } + + nni_aio_finish(waio, 0, len); continue; } @@ -393,44 +448,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) return (NNG_EAGAIN); } -int -nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire) -{ - nni_aio *aio; - int rv; - - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } - nni_aio_set_timeout(aio, expire); - nni_msgq_aio_get(mq, aio); - nni_aio_wait(aio); - if ((rv = nni_aio_result(aio)) == 0) { - *msgp = nni_aio_get_msg(aio); - nni_aio_set_msg(aio, NULL); - } - nni_aio_fini(aio); - return (rv); -} - -int -nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire) -{ - nni_aio *aio; - int rv; - - if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { - return (rv); - } - nni_aio_set_timeout(aio, expire); - nni_aio_set_msg(aio, msg); - nni_msgq_aio_put(mq, aio); - nni_aio_wait(aio); - rv = nni_aio_result(aio); - nni_aio_fini(aio); - return (rv); -} - void nni_msgq_drain(nni_msgq *mq, nni_time expire) { diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index fa6b845a..bbac505d 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -41,28 +41,11 @@ extern void nni_msgq_aio_get(nni_msgq *, nni_aio *); extern void nni_msgq_aio_notify_get(nni_msgq *, nni_aio *); extern void nni_msgq_aio_notify_put(nni_msgq *, nni_aio *); -// nni_msgq_put puts the message to the queue. It blocks until it -// was able to do so, or the queue is closed, or a timeout is reached. -// It returns 0 on success, NNG_ECLOSED if the queue was closed, or -// NNG_ETIMEDOUT if the timeout is reached. If an error is returned, -// the caller is responsible for freeing the message with nni_msg_free(), -// otherwise the message is "owned" by the queue, and the caller is not -// permitted to access it further. -extern int nni_msgq_put_until(nni_msgq *, nni_msg *, nni_time); - // nni_msgq_tryput performs a non-blocking attempt to put a message on // the message queue. It is the same as calling nng_msgq_put_until with // a zero time. extern int nni_msgq_tryput(nni_msgq *, nni_msg *); -// nni_msgq_get_until gets a message from the queue. It blocks until a -// message is available, the queue is closed, or time out is reached. -// It returns 0 on success, NNG_ECLOSED if the queue was closed, or -// NNG_ETIMEDOUT if the timeout is reached. On successful return, -// the caller assumes ownership of the message and must call -// nni_msg_free() when it is finished with it. -extern int nni_msgq_get_until(nni_msgq *, nni_msg **, nni_time); - // nni_msgq_set_error sets an error condition on the message queue, // which causes all current and future readers/writes to return the // given error condition (if non-zero). Threads waiting to put or get @@ -81,6 +64,25 @@ extern void nni_msgq_set_put_error(nni_msgq *, int); // Readers (nni_msgq_put*) are unaffected. extern void nni_msgq_set_get_error(nni_msgq *, int); +// nni_msgq_set_best_effort marks the message queue best effort on send. +// What this does is treat the message queue condition as if it were +// successful, returning 0, and discarding the message. If zero is +// passed then this mode is reset to normal. +extern void nni_msgq_set_best_effort(nni_msgq *, int); + +// nni_msgq_filter is a callback function used to filter messages. +// The function is called on entry (put) or exit (get). The void +// argument is an opaque pointer supplied with the function at registration +// time. The primary use for these functions is to support the protocol +// socket needs. +typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *); + +// nni_msgq_set_filter sets the filter on the queue. Messages +// are filtered through this just before they are returned via the get +// functions. If the filter returns NULL, then the message is silently +// discarded instead, and any get waiters remain waiting. +extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *); + // nni_msgq_close closes the queue. After this all operates on the // message queue will return NNG_ECLOSED. Messages inside the queue // are freed. Unlike closing a go channel, this operation is idempotent. diff --git a/src/core/protocol.h b/src/core/protocol.h index 0c0d93ce..253452d8 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -74,15 +74,17 @@ struct nni_proto_sock_ops { // it can signal the socket worker threads to exit. void (*sock_close)(void *); - // Receive filter. This may be NULL, but if it isn't, then + // Send a message. + void (*sock_send)(void *, nni_aio *); + + // Receive a message. + void (*sock_recv)(void *, nni_aio *); + + // Message filter. This may be NULL, but if it isn't, then // messages coming into the system are routed here just before being - // delivered to the application. To drop the message, the prtocol + // delivered to the application. To drop the message, the protocol // should return NULL, otherwise the message (possibly modified). - nni_msg *(*sock_rfilter)(void *, nni_msg *); - - // Send filter. This may be NULL, but if it isn't, then messages - // here are filtered just after they come from the application. - nni_msg *(*sock_sfilter)(void *, nni_msg *); + nni_msg *(*sock_filter)(void *, nni_msg *); // Options. Must not be NULL. Final entry should have NULL name. nni_proto_sock_option *sock_options; diff --git a/src/core/socket.c b/src/core/socket.c index 8895f7a7..bc1f446d 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -66,12 +66,9 @@ struct nni_socket { nni_list s_eps; // active endpoints nni_list s_pipes; // active pipes - int s_ep_pend; // EP dial/listen in progress - int s_closing; // Socket is closing - int s_closed; // Socket closed, protected by global lock - int s_besteffort; // Best effort mode delivery - int s_senderr; // Protocol state machine use - int s_recverr; // Protocol state machine use + int s_ep_pend; // EP dial/listen in progress + int s_closing; // Socket is closing + int s_closed; // Socket closed, protected by global lock nni_event s_recv_ev; // Event for readability nni_event s_send_ev; // Event for sendability @@ -374,15 +371,19 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe) } void -nni_sock_lock(nni_sock *sock) +nni_sock_send_pending(nni_sock *sock) { - nni_mtx_lock(&sock->s_mx); + if (sock->s_send_fd.sn_init) { + nni_plat_pipe_clear(sock->s_send_fd.sn_rfd); + } } void -nni_sock_unlock(nni_sock *sock) +nni_sock_recv_pending(nni_sock *sock) { - nni_mtx_unlock(&sock->s_mx); + if (sock->s_recv_fd.sn_init) { + nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd); + } } static void @@ -551,6 +552,11 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) return (rv); } + if (s->s_sock_ops.sock_filter != NULL) { + nni_msgq_set_filter( + s->s_urq, s->s_sock_ops.sock_filter, s->s_data); + } + *sp = s; return (rv); } @@ -779,13 +785,23 @@ nni_sock_closeall(void) } } +void +nni_sock_send(nni_sock *sock, nni_aio *aio) +{ + sock->s_sock_ops.sock_send(sock->s_data, aio); +} + int nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags) { int rv; - int besteffort; nni_time expire; nni_time timeo = sock->s_sndtimeo; + nni_aio *aio; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) { expire = NNI_TIME_ZERO; @@ -795,47 +811,24 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags) expire = nni_clock(); expire += timeo; } + nni_aio_set_timeout(aio, expire); + nni_aio_set_msg(aio, msg); - // Senderr is typically set by protocols when the state machine - // indicates that it is no longer valid to send a message. E.g. - // a REP socket with no REQ pending. - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); - return (NNG_ECLOSED); - } - if ((rv = sock->s_senderr) != 0) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } - besteffort = sock->s_besteffort; - - if (sock->s_sock_ops.sock_sfilter != NULL) { - msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg); - } - nni_mtx_unlock(&sock->s_mx); + nni_sock_send(sock, aio); + nni_aio_wait(aio); - if (msg == NULL) { - return (0); - } + rv = nni_aio_result(aio); + nni_aio_fini(aio); - if (besteffort) { - // BestEffort mode -- if we cannot handle the message due to - // backpressure, we just throw it away, and don't complain. - expire = NNI_TIME_ZERO; - } - if (sock->s_send_fd.sn_init) { - nni_plat_pipe_clear(sock->s_send_fd.sn_rfd); - } - rv = nni_msgq_put_until(sock->s_uwq, msg, expire); - if (besteffort && (rv == NNG_ETIMEDOUT)) { - // Pretend this worked... it didn't, but pretend. - nni_msg_free(msg); - return (0); - } return (rv); } +void +nni_sock_recv(nni_sock *sock, nni_aio *aio) +{ + sock->s_sock_ops.sock_recv(sock->s_data, aio); +} + int nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) { @@ -843,6 +836,11 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) nni_msg *msg; nni_time expire; nni_time timeo = sock->s_rcvtimeo; + nni_aio *aio; + + if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { + return (rv); + } if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) { expire = NNI_TIME_ZERO; @@ -853,39 +851,23 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags) expire += timeo; } - nni_mtx_lock(&sock->s_mx); - if (sock->s_closing) { - nni_mtx_unlock(&sock->s_mx); - return (NNG_ECLOSED); - } - if ((rv = sock->s_recverr) != 0) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } - nni_mtx_unlock(&sock->s_mx); - - if (sock->s_recv_fd.sn_init) { - nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd); - } - for (;;) { - rv = nni_msgq_get_until(sock->s_urq, &msg, expire); + nni_aio_set_timeout(aio, expire); + nni_sock_recv(sock, aio); + nni_aio_wait(aio); + + rv = nni_aio_result(aio); if (rv != 0) { - return (rv); - } - if (sock->s_sock_ops.sock_rfilter != NULL) { - nni_mtx_lock(&sock->s_mx); - msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg); - nni_mtx_unlock(&sock->s_mx); - } - if (msg != NULL) { break; } - // Protocol dropped the message; try again. - } + msg = nni_aio_get_msg(aio); + nni_aio_set_msg(aio, NULL); - *msgp = msg; - return (0); + *msgp = msg; + break; + } + nni_aio_fini(aio); + return (rv); } // nni_sock_protocol returns the socket's 16-bit protocol number. @@ -949,18 +931,6 @@ nni_sock_ep_remove(nni_sock *sock, nni_ep *ep) nni_mtx_unlock(&sock->s_mx); } -void -nni_sock_recverr(nni_sock *sock, int err) -{ - sock->s_recverr = err; -} - -void -nni_sock_senderr(nni_sock *sock, int err) -{ - sock->s_senderr = err; -} - int nni_sock_setopt(nni_sock *s, const char *name, const void *val, size_t size) { diff --git a/src/core/socket.h b/src/core/socket.h index 850c4641..edfe6447 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -22,15 +22,14 @@ extern void nni_sock_closeall(void); extern int nni_sock_shutdown(nni_sock *); extern uint16_t nni_sock_proto(nni_sock *); extern uint16_t nni_sock_peer(nni_sock *); -extern int nni_sock_setopt(nni_sock *, const char *, const void *, size_t); -extern int nni_sock_getopt(nni_sock *, const char *, void *, size_t *); -extern int nni_sock_recvmsg(nni_sock *, nni_msg **, int); -extern int nni_sock_sendmsg(nni_sock *, nni_msg *, int); +extern int nni_sock_setopt(nni_sock *, const char *, const void *, size_t); +extern int nni_sock_getopt(nni_sock *, const char *, void *, size_t *); +extern int nni_sock_recvmsg(nni_sock *, nni_msg **, int); +extern int nni_sock_sendmsg(nni_sock *, nni_msg *, int); +extern void nni_sock_send(nni_sock *, nni_aio *); +extern void nni_sock_recv(nni_sock *, nni_aio *); extern uint32_t nni_sock_id(nni_sock *); -extern void nni_sock_lock(nni_sock *); -extern void nni_sock_unlock(nni_sock *); - extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *); extern void nni_sock_unnotify(nni_sock *, nni_notify *); @@ -46,16 +45,20 @@ extern int nni_sock_pipe_start(nni_sock *, nni_pipe *p); extern int nni_sock_ep_add(nni_sock *, nni_ep *); extern void nni_sock_ep_remove(nni_sock *, nni_ep *); -// Set error codes for applications. These are only ever -// called from the filter functions in protocols, and thus -// already have the socket lock held. -extern void nni_sock_recverr(nni_sock *, int); -extern void nni_sock_senderr(nni_sock *, int); - // These are socket methods that protocol operations can expect to call. // Note that each of these should be called without any locks held, since // the socket can reenter the protocol. +// nni_sock_send_pending is called by the protocol when it enqueues +// a send operation. The main purpose of this is to clear the raised +// signal raised on the event descriptor. +extern void nni_sock_send_pending(nni_sock *); + +// nni_sock_recv_pending is called by the protocl when it enqueues +// a receive operation. The main purpose of this is to clear the raised +// signal raised on the event descriptor. +extern void nni_sock_recv_pending(nni_sock *); + // nni_socket_sendq obtains the upper writeq. The protocol should // recieve messages from this, and place them on the appropriate pipe. extern nni_msgq *nni_sock_sendq(nni_sock *); diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index cad21989..046aa19a 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -22,6 +22,9 @@ typedef struct bus_pipe bus_pipe; typedef struct bus_sock bus_sock; static void bus_sock_getq(bus_sock *); +static void bus_sock_send(void *, nni_aio *); +static void bus_sock_recv(void *, nni_aio *); + static void bus_pipe_getq(bus_pipe *); static void bus_pipe_send(bus_pipe *); static void bus_pipe_recv(bus_pipe *); @@ -39,6 +42,8 @@ struct bus_sock { nni_aio * aio_getq; nni_list pipes; nni_mtx mtx; + nni_msgq *uwq; + nni_msgq *urq; }; // A bus_pipe is our per-pipe protocol private structure. @@ -82,6 +87,8 @@ bus_sock_init(void **sp, nni_sock *nsock) } s->nsock = nsock; s->raw = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); *sp = s; return (0); @@ -259,7 +266,6 @@ bus_sock_getq_cb(void *arg) bus_sock *s = arg; bus_pipe *p; bus_pipe *lastp; - nni_msgq *uwq = nni_sock_sendq(s->nsock); nni_msg * msg; nni_msg * dup; uint32_t sender; @@ -338,6 +344,24 @@ bus_sock_getopt_raw(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->raw, buf, szp)); } +static void +bus_sock_send(void *arg, nni_aio *aio) +{ + bus_sock *s = arg; + + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); +} + +static void +bus_sock_recv(void *arg, nni_aio *aio) +{ + bus_sock *s = arg; + + nni_sock_recv_pending(s->nsock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops bus_pipe_ops = { .pipe_init = bus_pipe_init, .pipe_fini = bus_pipe_fini, @@ -360,6 +384,8 @@ static nni_proto_sock_ops bus_sock_ops = { .sock_fini = bus_sock_fini, .sock_open = bus_sock_open, .sock_close = bus_sock_close, + .sock_send = bus_sock_send, + .sock_recv = bus_sock_recv, .sock_options = bus_sock_options, }; diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c index a87af593..cca03cc8 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair/pair_v0.c @@ -243,6 +243,24 @@ pair0_sock_getopt_raw(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->raw, buf, szp)); } +static void +pair0_sock_send(void *arg, nni_aio *aio) +{ + pair0_sock *s = arg; + + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); +} + +static void +pair0_sock_recv(void *arg, nni_aio *aio) +{ + pair0_sock *s = arg; + + nni_sock_recv_pending(s->nsock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops pair0_pipe_ops = { .pipe_init = pair0_pipe_init, .pipe_fini = pair0_pipe_fini, @@ -265,6 +283,8 @@ static nni_proto_sock_ops pair0_sock_ops = { .sock_fini = pair0_sock_fini, .sock_open = pair0_sock_open, .sock_close = pair0_sock_close, + .sock_send = pair0_sock_send, + .sock_recv = pair0_sock_recv, .sock_options = pair0_sock_options, }; diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c index 6c9d4bd3..f43a4785 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair/pair_v1.c @@ -446,6 +446,24 @@ pair1_sock_getopt_poly(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->poly, buf, szp)); } +static void +pair1_sock_send(void *arg, nni_aio *aio) +{ + pair1_sock *s = arg; + + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); +} + +static void +pair1_sock_recv(void *arg, nni_aio *aio) +{ + pair1_sock *s = arg; + + nni_sock_recv_pending(s->nsock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops pair1_pipe_ops = { .pipe_init = pair1_pipe_init, .pipe_fini = pair1_pipe_fini, @@ -478,6 +496,8 @@ static nni_proto_sock_ops pair1_sock_ops = { .sock_fini = pair1_sock_fini, .sock_open = pair1_sock_open, .sock_close = pair1_sock_close, + .sock_recv = pair1_sock_recv, + .sock_send = pair1_sock_send, .sock_options = pair1_sock_options, }; diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 80db2425..7dd0c8ed 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -26,6 +26,7 @@ static void pull_putq(pull_pipe *, nni_msg *); struct pull_sock { nni_msgq *urq; int raw; + nni_sock *sock; }; // A pull_pipe is our per-pipe protocol private structure. @@ -44,10 +45,11 @@ pull_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = 0; - s->urq = nni_sock_recvq(sock); - *sp = s; - nni_sock_senderr(sock, NNG_ENOTSUP); + s->raw = 0; + s->urq = nni_sock_recvq(sock); + s->sock = sock; + + *sp = s; return (0); } @@ -185,6 +187,21 @@ pull_sock_getopt_raw(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->raw, buf, szp)); } +static void +pull_sock_send(void *arg, nni_aio *aio) +{ + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + +static void +pull_sock_recv(void *arg, nni_aio *aio) +{ + pull_sock *s = arg; + + nni_sock_recv_pending(s->sock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops pull_pipe_ops = { .pipe_init = pull_pipe_init, .pipe_fini = pull_pipe_fini, @@ -207,6 +224,8 @@ static nni_proto_sock_ops pull_sock_ops = { .sock_fini = pull_sock_fini, .sock_open = pull_sock_open, .sock_close = pull_sock_close, + .sock_send = pull_sock_send, + .sock_recv = pull_sock_recv, .sock_options = pull_sock_options, }; diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 77db7fcb..af7b80ca 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -54,7 +54,6 @@ push_sock_init(void **sp, nni_sock *sock) s->sock = sock; s->uwq = nni_sock_sendq(sock); *sp = s; - nni_sock_recverr(sock, NNG_ENOTSUP); return (0); } @@ -205,6 +204,21 @@ push_sock_getopt_raw(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->raw, buf, szp)); } +static void +push_sock_send(void *arg, nni_aio *aio) +{ + push_sock *s = arg; + + nni_sock_send_pending(s->sock); + nni_msgq_aio_put(s->uwq, aio); +} + +static void +push_sock_recv(void *arg, nni_aio *aio) +{ + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + static nni_proto_pipe_ops push_pipe_ops = { .pipe_init = push_pipe_init, .pipe_fini = push_pipe_fini, @@ -228,6 +242,8 @@ static nni_proto_sock_ops push_sock_ops = { .sock_open = push_sock_open, .sock_close = push_sock_close, .sock_options = push_sock_options, + .sock_send = push_sock_send, + .sock_recv = push_sock_recv, }; static nni_proto push_proto = { diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 03f4603a..4604d0ff 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -82,7 +82,6 @@ pub_sock_init(void **sp, nni_sock *sock) s->uwq = nni_sock_sendq(sock); *sp = s; - nni_sock_recverr(sock, NNG_ENOTSUP); return (0); } @@ -281,6 +280,21 @@ pub_sock_getopt_raw(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->raw, buf, szp)); } +static void +pub_sock_recv(void *arg, nni_aio *aio) +{ + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + +static void +pub_sock_send(void *arg, nni_aio *aio) +{ + pub_sock *s = arg; + + nni_sock_send_pending(s->sock); + nni_msgq_aio_put(s->uwq, aio); +} + static nni_proto_pipe_ops pub_pipe_ops = { .pipe_init = pub_pipe_init, .pipe_fini = pub_pipe_fini, @@ -303,6 +317,8 @@ static nni_proto_sock_ops pub_sock_ops = { .sock_fini = pub_sock_fini, .sock_open = pub_sock_open, .sock_close = pub_sock_close, + .sock_send = pub_sock_send, + .sock_recv = pub_sock_recv, .sock_options = pub_sock_options, }; diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 8b2ed209..323bbb2f 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -40,6 +40,7 @@ struct sub_sock { nni_list topics; nni_msgq *urq; int raw; + nni_mtx lk; }; // An nni_rep_pipe is our per-pipe protocol private structure. @@ -58,13 +59,13 @@ sub_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } + nni_mtx_init(&s->lk); NNI_LIST_INIT(&s->topics, sub_topic, node); s->sock = sock; s->raw = 0; s->urq = nni_sock_recvq(sock); - nni_sock_senderr(sock, NNG_ENOTSUP); - *sp = s; + *sp = s; return (0); } @@ -79,6 +80,7 @@ sub_sock_fini(void *arg) nni_free(topic->buf, topic->len); NNI_FREE_STRUCT(topic); } + nni_mtx_fini(&s->lk); NNI_FREE_STRUCT(s); } @@ -190,6 +192,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz) sub_topic *topic; sub_topic *newtopic; + nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { int rv; @@ -201,6 +204,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz) if (rv == 0) { if (topic->len == sz) { // Already inserted. + nni_mtx_unlock(&s->lk); return (0); } if (topic->len > sz) { @@ -212,9 +216,11 @@ sub_subscribe(void *arg, const void *buf, size_t sz) } if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) { + nni_mtx_unlock(&s->lk); return (NNG_ENOMEM); } if ((newtopic->buf = nni_alloc(sz)) == NULL) { + nni_mtx_unlock(&s->lk); return (NNG_ENOMEM); } NNI_LIST_NODE_INIT(&newtopic->node); @@ -225,6 +231,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz) } else { nni_list_append(&s->topics, newtopic); } + nni_mtx_unlock(&s->lk); return (0); } @@ -235,6 +242,7 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz) sub_topic *topic; int rv; + nni_mtx_lock(&s->lk); NNI_LIST_FOREACH (&s->topics, topic) { if (topic->len >= sz) { rv = memcmp(topic->buf, buf, sz); @@ -244,18 +252,22 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz) if (rv == 0) { if (topic->len == sz) { nni_list_remove(&s->topics, topic); + nni_mtx_unlock(&s->lk); nni_free(topic->buf, topic->len); NNI_FREE_STRUCT(topic); return (0); } if (topic->len > sz) { + nni_mtx_unlock(&s->lk); return (NNG_ENOENT); } } if (rv > 0) { + nni_mtx_unlock(&s->lk); return (NNG_ENOENT); } } + nni_mtx_unlock(&s->lk); return (NNG_ENOENT); } @@ -273,8 +285,23 @@ sub_sock_getopt_raw(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->raw, buf, szp)); } +static void +sub_sock_send(void *arg, nni_aio *aio) +{ + nni_aio_finish_error(aio, NNG_ENOTSUP); +} + +static void +sub_sock_recv(void *arg, nni_aio *aio) +{ + sub_sock *s = arg; + + nni_sock_recv_pending(s->sock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_msg * -sub_sock_rfilter(void *arg, nni_msg *msg) +sub_sock_filter(void *arg, nni_msg *msg) { sub_sock * s = arg; sub_topic *topic; @@ -282,7 +309,9 @@ sub_sock_rfilter(void *arg, nni_msg *msg) size_t len; int match; + nni_mtx_lock(&s->lk); if (s->raw) { + nni_mtx_unlock(&s->lk); return (msg); } @@ -308,6 +337,7 @@ sub_sock_rfilter(void *arg, nni_msg *msg) break; } } + nni_mtx_unlock(&s->lk); if (!match) { nni_msg_free(msg); return (NULL); @@ -349,8 +379,10 @@ static nni_proto_sock_ops sub_sock_ops = { .sock_fini = sub_sock_fini, .sock_open = sub_sock_open, .sock_close = sub_sock_close, + .sock_send = sub_sock_send, + .sock_recv = sub_sock_recv, + .sock_filter = sub_sock_filter, .sock_options = sub_sock_options, - .sock_rfilter = sub_sock_rfilter, }; static nni_proto sub_proto = { diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index cd9411d9..510beab3 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -32,6 +32,7 @@ struct rep_sock { nni_sock * sock; nni_msgq * uwq; nni_msgq * urq; + nni_mtx lk; int raw; int ttl; nni_idhash *pipes; @@ -62,6 +63,7 @@ rep_sock_fini(void *arg) if (s->btrace != NULL) { nni_free(s->btrace, s->btrace_len); } + nni_mtx_fini(&s->lk); NNI_FREE_STRUCT(s); } @@ -74,6 +76,7 @@ rep_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } + nni_mtx_init(&s->lk); if (((rv = nni_idhash_init(&s->pipes)) != 0) || ((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) { rep_sock_fini(s); @@ -89,7 +92,6 @@ rep_sock_init(void **sp, nni_sock *sock) s->urq = nni_sock_recvq(sock); *sp = s; - nni_sock_senderr(sock, NNG_ESTATE); return (0); } @@ -215,6 +217,7 @@ rep_sock_getq_cb(void *arg) // Look for the pipe, and attempt to put the message there // (nonblocking) if we can. If we can't for any reason, then we // free the message. + // XXX: LOCKING?!?! if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) { rv = nni_msgq_tryput(p->sendq, msg); } @@ -347,10 +350,10 @@ rep_sock_setopt_raw(void *arg, const void *buf, size_t sz) { rep_sock *s = arg; int rv; + + nni_mtx_lock(&s->lk); rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); - if (rv == 0) { - nni_sock_senderr(s->sock, s->raw ? 0 : NNG_ESTATE); - } + nni_mtx_unlock(&s->lk); return (rv); } @@ -376,53 +379,18 @@ rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) } static nni_msg * -rep_sock_sfilter(void *arg, nni_msg *msg) -{ - rep_sock *s = arg; - - if (s->raw) { - return (msg); - } - - // Cannot send again until a receive is done... - nni_sock_senderr(s->sock, NNG_ESTATE); - - // If we have a stored backtrace, append it to the header... - // if we don't have a backtrace, discard the message. - if (s->btrace == NULL) { - nni_msg_free(msg); - return (NULL); - } - - // drop anything else in the header... - nni_msg_header_clear(msg); - - if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) { - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - nni_msg_free(msg); - return (NULL); - } - - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - return (msg); -} - -static nni_msg * -rep_sock_rfilter(void *arg, nni_msg *msg) +rep_sock_filter(void *arg, nni_msg *msg) { rep_sock *s = arg; char * header; size_t len; + nni_mtx_lock(&s->lk); if (s->raw) { + nni_mtx_unlock(&s->lk); return (msg); } - nni_sock_senderr(s->sock, 0); len = nni_msg_header_len(msg); header = nni_msg_header(msg); if (s->btrace != NULL) { @@ -437,9 +405,61 @@ rep_sock_rfilter(void *arg, nni_msg *msg) s->btrace_len = len; memcpy(s->btrace, header, len); nni_msg_header_clear(msg); + nni_mtx_unlock(&s->lk); return (msg); } +static void +rep_sock_send(void *arg, nni_aio *aio) +{ + rep_sock *s = arg; + int rv; + nni_msg * msg; + + nni_mtx_lock(&s->lk); + if (s->raw) { + // Pass thru + nni_mtx_unlock(&s->lk); + nni_sock_send_pending(s->sock); + nni_msgq_aio_put(s->uwq, aio); + return; + } + if (s->btrace == NULL) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + + msg = nni_aio_get_msg(aio); + + // drop anything else in the header... (it should already be + // empty, but there can be stale backtrace info there.) + nni_msg_header_clear(msg); + + if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } + + nni_free(s->btrace, s->btrace_len); + s->btrace = NULL; + s->btrace_len = 0; + + nni_mtx_unlock(&s->lk); + nni_sock_send_pending(s->sock); + nni_msgq_aio_put(s->uwq, aio); +} + +static void +rep_sock_recv(void *arg, nni_aio *aio) +{ + rep_sock *s = arg; + + nni_sock_recv_pending(s->sock); + nni_msgq_aio_get(s->urq, aio); +} + // This is the global protocol structure -- our linkage to the core. // This should be the only global non-static symbol in this file. static nni_proto_pipe_ops rep_pipe_ops = { @@ -470,8 +490,9 @@ static nni_proto_sock_ops rep_sock_ops = { .sock_open = rep_sock_open, .sock_close = rep_sock_close, .sock_options = rep_sock_options, - .sock_rfilter = rep_sock_rfilter, - .sock_sfilter = rep_sock_sfilter, + .sock_filter = rep_sock_filter, + .sock_send = rep_sock_send, + .sock_recv = rep_sock_recv, }; static nni_proto nni_rep_proto = { diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 24d01df2..81abd306 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -99,7 +99,7 @@ req_sock_init(void **sp, nni_sock *sock) s->uwq = nni_sock_sendq(sock); s->urq = nni_sock_recvq(sock); *sp = s; - nni_sock_recverr(sock, NNG_ESTATE); + return (0); } @@ -249,12 +249,7 @@ static int req_sock_setopt_raw(void *arg, const void *buf, size_t sz) { req_sock *s = arg; - int rv; - rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); - if (rv == 0) { - nni_sock_recverr(s->sock, s->raw ? 0 : NNG_ESTATE); - } - return (rv); + return (nni_setopt_int(&s->raw, buf, sz, 0, 1)); } static int @@ -505,35 +500,46 @@ req_resend(req_sock *s) } } -static nni_msg * -req_sock_sfilter(void *arg, nni_msg *msg) +static void +req_sock_send(void *arg, nni_aio *aio) { req_sock *s = arg; uint32_t id; + size_t len; + nni_msg * msg; + int rv; + nni_mtx_lock(&s->mtx); if (s->raw) { - // No automatic retry, and the request ID must - // be in the header coming down. - return (msg); + nni_mtx_unlock(&s->mtx); + nni_sock_send_pending(s->sock); + nni_msgq_aio_put(s->uwq, aio); + return; } + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + + // In cooked mode, because we need to manage our own resend logic, + // we bypass the upper writeq entirely. + // Generate a new request ID. We always set the high // order bit so that the peer can locate the end of the // backtrace. (Pipe IDs have the high order bit clear.) id = (s->nextid++) | 0x80000000u; - // Request ID is in big endian format. NNI_PUT32(s->reqid, id); - if (nni_msg_header_append(msg, s->reqid, 4) != 0) { - // Should be ENOMEM. - nni_msg_free(msg); - return (NULL); + if ((rv = nni_msg_header_append(msg, s->reqid, 4)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; } - // NB: The socket lock is also held, so this is always self-serialized. - // But we have to serialize against other async callbacks. - nni_mtx_lock(&s->mtx); + // XXX: I think we should just not do this... and leave the + // socket "permanently writeable". This does screw up all the + // backpressure. + // nni_sock_send_pending(s->sock); // If another message is there, this cancels it. if (s->reqmsg != NULL) { @@ -541,6 +547,8 @@ req_sock_sfilter(void *arg, nni_msg *msg) s->reqmsg = NULL; } + nni_aio_set_msg(aio, NULL); + // Make a duplicate message... for retries. s->reqmsg = msg; // Schedule for immediate send @@ -548,40 +556,41 @@ req_sock_sfilter(void *arg, nni_msg *msg) s->wantw = 1; req_resend(s); - nni_mtx_unlock(&s->mtx); - // Clear the error condition. - nni_sock_recverr(s->sock, 0); + nni_mtx_unlock(&s->mtx); - return (NULL); + nni_aio_finish(aio, 0, len); } static nni_msg * -req_sock_rfilter(void *arg, nni_msg *msg) +req_sock_filter(void *arg, nni_msg *msg) { req_sock *s = arg; nni_msg * rmsg; + nni_mtx_lock(&s->mtx); if (s->raw) { // Pass it unmolested + nni_mtx_unlock(&s->mtx); return (msg); } if (nni_msg_header_len(msg) < 4) { + nni_mtx_unlock(&s->mtx); nni_msg_free(msg); return (NULL); } - nni_mtx_lock(&s->mtx); - if ((rmsg = s->reqmsg) == NULL) { - // We had no outstanding request. + // We had no outstanding request. (Perhaps canceled, + // or duplicate response.) nni_mtx_unlock(&s->mtx); nni_msg_free(msg); return (NULL); } + if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) { - // Wrong request id + // Wrong request id. nni_mtx_unlock(&s->mtx); nni_msg_free(msg); return (NULL); @@ -591,12 +600,29 @@ req_sock_rfilter(void *arg, nni_msg *msg) s->pendpipe = NULL; nni_mtx_unlock(&s->mtx); - nni_sock_recverr(s->sock, NNG_ESTATE); nni_msg_free(rmsg); return (msg); } +static void +req_sock_recv(void *arg, nni_aio *aio) +{ + req_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (!s->raw) { + if (s->reqmsg == NULL) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + } + nni_mtx_unlock(&s->mtx); + nni_sock_recv_pending(s->sock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops req_pipe_ops = { .pipe_init = req_pipe_init, .pipe_fini = req_pipe_fini, @@ -630,8 +656,9 @@ static nni_proto_sock_ops req_sock_ops = { .sock_open = req_sock_open, .sock_close = req_sock_close, .sock_options = req_sock_options, - .sock_rfilter = req_sock_rfilter, - .sock_sfilter = req_sock_sfilter, + .sock_filter = req_sock_filter, + .sock_send = req_sock_send, + .sock_recv = req_sock_recv, }; static nni_proto req_proto = { diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index fcb067b0..4c3ea8e3 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -77,6 +77,7 @@ resp_sock_init(void **sp, nni_sock *nsock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } + nni_mtx_init(&s->mtx); if (((rv = nni_idhash_init(&s->pipes)) != 0) || ((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) { resp_sock_fini(s); @@ -91,10 +92,7 @@ resp_sock_init(void **sp, nni_sock *nsock) s->urq = nni_sock_recvq(nsock); s->uwq = nni_sock_sendq(nsock); - nni_mtx_init(&s->mtx); - *sp = s; - nni_sock_senderr(nsock, NNG_ESTATE); return (0); } @@ -349,9 +347,9 @@ resp_sock_setopt_raw(void *arg, const void *buf, size_t sz) resp_sock *s = arg; int rv; - if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) { - nni_sock_senderr(s->nsock, s->raw ? 0 : NNG_ESTATE); - } + nni_mtx_lock(&s->mtx); + rv = nni_setopt_int(&s->raw, buf, sz, 0, 1); + nni_mtx_unlock(&s->mtx); return (rv); } @@ -376,54 +374,62 @@ resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp) return (nni_getopt_int(s->ttl, buf, szp)); } -static nni_msg * -resp_sock_sfilter(void *arg, nni_msg *msg) +static void +resp_sock_send(void *arg, nni_aio *aio) { resp_sock *s = arg; + nni_msg * msg; + int rv; + nni_mtx_lock(&s->mtx); if (s->raw) { - return (msg); + nni_mtx_unlock(&s->mtx); + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); + return; } - // Cannot send again until a receive is done... - nni_sock_senderr(s->nsock, NNG_ESTATE); + msg = nni_aio_get_msg(aio); // If we have a stored backtrace, append it to the header... // if we don't have a backtrace, discard the message. if (s->btrace == NULL) { - nni_msg_free(msg); - return (NULL); + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; } // drop anything else in the header... nni_msg_header_clear(msg); - if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) { - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - nni_msg_free(msg); - return (NULL); + if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; } nni_free(s->btrace, s->btrace_len); s->btrace = NULL; s->btrace_len = 0; - return (msg); + + nni_mtx_unlock(&s->mtx); + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); } static nni_msg * -resp_sock_rfilter(void *arg, nni_msg *msg) +resp_sock_filter(void *arg, nni_msg *msg) { resp_sock *s = arg; char * header; size_t len; + nni_mtx_lock(&s->mtx); if (s->raw) { + nni_mtx_unlock(&s->mtx); return (msg); } - nni_sock_senderr(s->nsock, 0); len = nni_msg_header_len(msg); header = nni_msg_header(msg); if (s->btrace != NULL) { @@ -432,15 +438,26 @@ resp_sock_rfilter(void *arg, nni_msg *msg) s->btrace_len = 0; } if ((s->btrace = nni_alloc(len)) == NULL) { + nni_mtx_unlock(&s->mtx); nni_msg_free(msg); return (NULL); } s->btrace_len = len; memcpy(s->btrace, header, len); nni_msg_header_clear(msg); + nni_mtx_unlock(&s->mtx); return (msg); } +static void +resp_sock_recv(void *arg, nni_aio *aio) +{ + resp_sock *s = arg; + + nni_sock_recv_pending(s->nsock); + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops resp_pipe_ops = { .pipe_init = resp_pipe_init, .pipe_fini = resp_pipe_fini, @@ -468,9 +485,10 @@ static nni_proto_sock_ops resp_sock_ops = { .sock_fini = resp_sock_fini, .sock_open = resp_sock_open, .sock_close = resp_sock_close, + .sock_filter = resp_sock_filter, + .sock_send = resp_sock_send, + .sock_recv = resp_sock_recv, .sock_options = resp_sock_options, - .sock_rfilter = resp_sock_rfilter, - .sock_sfilter = resp_sock_sfilter, }; static nni_proto resp_proto = { diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 1205402e..1c15054f 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -92,7 +92,6 @@ surv_sock_init(void **sp, nni_sock *nsock) s->urq = nni_sock_recvq(nsock); *sp = s; - nni_sock_recverr(nsock, NNG_ESTATE); return (0); } @@ -273,11 +272,12 @@ surv_sock_setopt_raw(void *arg, const void *buf, size_t sz) surv_sock *s = arg; int rv; + nni_mtx_lock(&s->mtx); if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) { - nni_sock_recverr(s->nsock, s->raw ? 0 : NNG_ESTATE); s->survid = 0; nni_timer_cancel(&s->timer); } + nni_mtx_unlock(&s->mtx); return (rv); } @@ -344,22 +344,43 @@ surv_timeout(void *arg) { surv_sock *s = arg; - nni_sock_lock(s->nsock); + nni_mtx_lock(&s->mtx); s->survid = 0; - nni_sock_recverr(s->nsock, NNG_ESTATE); + nni_mtx_unlock(&s->mtx); nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT); - nni_sock_unlock(s->nsock); } -static nni_msg * -surv_sock_sfilter(void *arg, nni_msg *msg) +static void +surv_sock_recv(void *arg, nni_aio *aio) +{ + surv_sock *s = arg; + + nni_mtx_lock(&s->mtx); + if (s->survid == 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + nni_mtx_unlock(&s->mtx); + nni_sock_recv_pending(s->nsock); + nni_msgq_aio_get(s->urq, aio); +} + +static void +surv_sock_send(void *arg, nni_aio *aio) { surv_sock *s = arg; + nni_msg * msg; + int rv; + nni_mtx_lock(&s->mtx); if (s->raw) { // No automatic retry, and the request ID must // be in the header coming down. - return (msg); + nni_mtx_unlock(&s->mtx); + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); + return; } // Generate a new request ID. We always set the high @@ -367,10 +388,12 @@ surv_sock_sfilter(void *arg, nni_msg *msg) // backtrace. (Pipe IDs have the high order bit clear.) s->survid = (s->nextid++) | 0x80000000u; - if (nni_msg_header_append_u32(msg, s->survid) != 0) { - // Should be ENOMEM. - nni_msg_free(msg); - return (NULL); + msg = nni_aio_get_msg(aio); + nni_msg_header_clear(msg); + if ((rv = nni_msg_header_append_u32(msg, s->survid)) != 0) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, rv); + return; } // If another message is there, this cancels it. We move the @@ -379,20 +402,21 @@ surv_sock_sfilter(void *arg, nni_msg *msg) s->expire = nni_clock() + s->survtime; nni_timer_schedule(&s->timer, s->expire); - // Clear the error condition. - nni_sock_recverr(s->nsock, 0); - // nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0); + nni_mtx_unlock(&s->mtx); - return (msg); + nni_sock_send_pending(s->nsock); + nni_msgq_aio_put(s->uwq, aio); } static nni_msg * -surv_sock_rfilter(void *arg, nni_msg *msg) +surv_sock_filter(void *arg, nni_msg *msg) { surv_sock *s = arg; + nni_mtx_lock(&s->mtx); if (s->raw) { // Pass it unmolested + nni_mtx_unlock(&s->mtx); return (msg); } @@ -402,6 +426,7 @@ surv_sock_rfilter(void *arg, nni_msg *msg) nni_msg_free(msg); return (NULL); } + nni_mtx_unlock(&s->mtx); return (msg); } @@ -433,9 +458,10 @@ static nni_proto_sock_ops surv_sock_ops = { .sock_fini = surv_sock_fini, .sock_open = surv_sock_open, .sock_close = surv_sock_close, + .sock_send = surv_sock_send, + .sock_recv = surv_sock_recv, + .sock_filter = surv_sock_filter, .sock_options = surv_sock_options, - .sock_rfilter = surv_sock_rfilter, - .sock_sfilter = surv_sock_sfilter, }; static nni_proto surv_proto = { -- cgit v1.2.3-70-g09d2