aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/msgqueue.c113
-rw-r--r--src/core/msgqueue.h36
-rw-r--r--src/core/protocol.h16
-rw-r--r--src/core/socket.c142
-rw-r--r--src/core/socket.h29
-rw-r--r--src/protocol/bus/bus.c28
-rw-r--r--src/protocol/pair/pair_v0.c20
-rw-r--r--src/protocol/pair/pair_v1.c20
-rw-r--r--src/protocol/pipeline/pull.c27
-rw-r--r--src/protocol/pipeline/push.c18
-rw-r--r--src/protocol/pubsub/pub.c18
-rw-r--r--src/protocol/pubsub/sub.c40
-rw-r--r--src/protocol/reqrep/rep.c109
-rw-r--r--src/protocol/reqrep/req.c91
-rw-r--r--src/protocol/survey/respond.c66
-rw-r--r--src/protocol/survey/survey.c64
16 files changed, 536 insertions, 301 deletions
diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c
index 6fd95f98..c1f3701c 100644
--- a/src/core/msgqueue.c
+++ b/src/core/msgqueue.c
@@ -27,12 +27,17 @@ struct nni_msgq {
int mq_puterr;
int mq_geterr;
int mq_draining;
+ int mq_besteffort;
nni_msg **mq_msgs;
nni_list mq_aio_putq;
nni_list mq_aio_getq;
nni_list mq_aio_notify_get;
nni_list mq_aio_notify_put;
+
+ // Filters.
+ nni_msgq_filter mq_filter_fn;
+ void * mq_filter_arg;
};
int
@@ -157,6 +162,13 @@ nni_msgq_set_error(nni_msgq *mq, int error)
nni_mtx_unlock(&mq->mq_lock);
}
+void
+nni_msgq_set_filter(nni_msgq *mq, nni_msgq_filter filter, void *arg)
+{
+ mq->mq_filter_fn = filter;
+ mq->mq_filter_arg = arg;
+}
+
static void
nni_msgq_run_putq(nni_msgq *mq)
{
@@ -173,12 +185,19 @@ nni_msgq_run_putq(nni_msgq *mq)
// the queue is empty, otherwise it would have just taken
// data from the queue.
if ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
- nni_aio_set_msg(waio, NULL);
- nni_aio_list_remove(raio);
+ nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
+
+ if (mq->mq_filter_fn != NULL) {
+ msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
+ }
+ if (msg != NULL) {
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
+ }
+
nni_aio_finish(waio, 0, len);
- nni_aio_finish_msg(raio, msg);
continue;
}
@@ -195,40 +214,76 @@ nni_msgq_run_putq(nni_msgq *mq)
continue;
}
+ // If we are in best effort mode, just drop the message
+ // as if we delivered.
+ if (mq->mq_besteffort) {
+ nni_list_remove(&mq->mq_aio_putq, waio);
+ nni_aio_set_msg(waio, NULL);
+ nni_msg_free(msg);
+ nni_aio_finish(waio, 0, len);
+ continue;
+ }
+
// Unable to make progress, leave the aio where it is.
break;
}
}
+void
+nni_msgq_set_best_effort(nni_msgq *mq, int on)
+{
+ nni_mtx_lock(&mq->mq_lock);
+ mq->mq_besteffort = on;
+ if (on) {
+ nni_msgq_run_putq(mq);
+ }
+ nni_mtx_unlock(&mq->mq_lock);
+}
+
static void
nni_msgq_run_getq(nni_msgq *mq)
{
nni_aio *raio;
nni_aio *waio;
- nni_msg *msg;
while ((raio = nni_list_first(&mq->mq_aio_getq)) != NULL) {
// If anything is waiting in the queue, get it first.
if (mq->mq_len != 0) {
- msg = mq->mq_msgs[mq->mq_get++];
+ nni_msg *msg = mq->mq_msgs[mq->mq_get++];
if (mq->mq_get == mq->mq_alloc) {
mq->mq_get = 0;
}
mq->mq_len--;
- nni_aio_list_remove(raio);
- nni_aio_finish_msg(raio, msg);
+
+ if (mq->mq_filter_fn != NULL) {
+ msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
+ }
+ if (msg != NULL) {
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
+ }
continue;
}
// Nothing queued (unbuffered?), maybe a writer is waiting.
if ((waio = nni_list_first(&mq->mq_aio_putq)) != NULL) {
+ nni_msg *msg;
+ size_t len;
msg = nni_aio_get_msg(waio);
+ len = nni_msg_len(msg);
+
nni_aio_set_msg(waio, NULL);
nni_aio_list_remove(waio);
- nni_aio_list_remove(raio);
- nni_aio_finish(waio, 0, nni_msg_len(msg));
- nni_aio_finish_msg(raio, msg);
+ if (mq->mq_filter_fn != NULL) {
+ msg = mq->mq_filter_fn(mq->mq_filter_arg, msg);
+ }
+ if (msg != NULL) {
+ nni_aio_list_remove(raio);
+ nni_aio_finish_msg(raio, msg);
+ }
+
+ nni_aio_finish(waio, 0, len);
continue;
}
@@ -393,44 +448,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg)
return (NNG_EAGAIN);
}
-int
-nni_msgq_get_until(nni_msgq *mq, nni_msg **msgp, nni_time expire)
-{
- nni_aio *aio;
- int rv;
-
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
- nni_aio_set_timeout(aio, expire);
- nni_msgq_aio_get(mq, aio);
- nni_aio_wait(aio);
- if ((rv = nni_aio_result(aio)) == 0) {
- *msgp = nni_aio_get_msg(aio);
- nni_aio_set_msg(aio, NULL);
- }
- nni_aio_fini(aio);
- return (rv);
-}
-
-int
-nni_msgq_put_until(nni_msgq *mq, nni_msg *msg, nni_time expire)
-{
- nni_aio *aio;
- int rv;
-
- if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
- return (rv);
- }
- nni_aio_set_timeout(aio, expire);
- nni_aio_set_msg(aio, msg);
- nni_msgq_aio_put(mq, aio);
- nni_aio_wait(aio);
- rv = nni_aio_result(aio);
- nni_aio_fini(aio);
- return (rv);
-}
-
void
nni_msgq_drain(nni_msgq *mq, nni_time expire)
{
diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h
index fa6b845a..bbac505d 100644
--- a/src/core/msgqueue.h
+++ b/src/core/msgqueue.h
@@ -41,28 +41,11 @@ extern void nni_msgq_aio_get(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_notify_get(nni_msgq *, nni_aio *);
extern void nni_msgq_aio_notify_put(nni_msgq *, nni_aio *);
-// nni_msgq_put puts the message to the queue. It blocks until it
-// was able to do so, or the queue is closed, or a timeout is reached.
-// It returns 0 on success, NNG_ECLOSED if the queue was closed, or
-// NNG_ETIMEDOUT if the timeout is reached. If an error is returned,
-// the caller is responsible for freeing the message with nni_msg_free(),
-// otherwise the message is "owned" by the queue, and the caller is not
-// permitted to access it further.
-extern int nni_msgq_put_until(nni_msgq *, nni_msg *, nni_time);
-
// nni_msgq_tryput performs a non-blocking attempt to put a message on
// the message queue. It is the same as calling nng_msgq_put_until with
// a zero time.
extern int nni_msgq_tryput(nni_msgq *, nni_msg *);
-// nni_msgq_get_until gets a message from the queue. It blocks until a
-// message is available, the queue is closed, or time out is reached.
-// It returns 0 on success, NNG_ECLOSED if the queue was closed, or
-// NNG_ETIMEDOUT if the timeout is reached. On successful return,
-// the caller assumes ownership of the message and must call
-// nni_msg_free() when it is finished with it.
-extern int nni_msgq_get_until(nni_msgq *, nni_msg **, nni_time);
-
// nni_msgq_set_error sets an error condition on the message queue,
// which causes all current and future readers/writes to return the
// given error condition (if non-zero). Threads waiting to put or get
@@ -81,6 +64,25 @@ extern void nni_msgq_set_put_error(nni_msgq *, int);
// Readers (nni_msgq_put*) are unaffected.
extern void nni_msgq_set_get_error(nni_msgq *, int);
+// nni_msgq_set_best_effort marks the message queue best effort on send.
+// What this does is treat the message queue condition as if it were
+// successful, returning 0, and discarding the message. If zero is
+// passed then this mode is reset to normal.
+extern void nni_msgq_set_best_effort(nni_msgq *, int);
+
+// nni_msgq_filter is a callback function used to filter messages.
+// The function is called on entry (put) or exit (get). The void
+// argument is an opaque pointer supplied with the function at registration
+// time. The primary use for these functions is to support the protocol
+// socket needs.
+typedef nni_msg *(*nni_msgq_filter)(void *, nni_msg *);
+
+// nni_msgq_set_filter sets the filter on the queue. Messages
+// are filtered through this just before they are returned via the get
+// functions. If the filter returns NULL, then the message is silently
+// discarded instead, and any get waiters remain waiting.
+extern void nni_msgq_set_filter(nni_msgq *, nni_msgq_filter, void *);
+
// nni_msgq_close closes the queue. After this all operates on the
// message queue will return NNG_ECLOSED. Messages inside the queue
// are freed. Unlike closing a go channel, this operation is idempotent.
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 0c0d93ce..253452d8 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -74,15 +74,17 @@ struct nni_proto_sock_ops {
// it can signal the socket worker threads to exit.
void (*sock_close)(void *);
- // Receive filter. This may be NULL, but if it isn't, then
+ // Send a message.
+ void (*sock_send)(void *, nni_aio *);
+
+ // Receive a message.
+ void (*sock_recv)(void *, nni_aio *);
+
+ // Message filter. This may be NULL, but if it isn't, then
// messages coming into the system are routed here just before being
- // delivered to the application. To drop the message, the prtocol
+ // delivered to the application. To drop the message, the protocol
// should return NULL, otherwise the message (possibly modified).
- nni_msg *(*sock_rfilter)(void *, nni_msg *);
-
- // Send filter. This may be NULL, but if it isn't, then messages
- // here are filtered just after they come from the application.
- nni_msg *(*sock_sfilter)(void *, nni_msg *);
+ nni_msg *(*sock_filter)(void *, nni_msg *);
// Options. Must not be NULL. Final entry should have NULL name.
nni_proto_sock_option *sock_options;
diff --git a/src/core/socket.c b/src/core/socket.c
index 8895f7a7..bc1f446d 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -66,12 +66,9 @@ struct nni_socket {
nni_list s_eps; // active endpoints
nni_list s_pipes; // active pipes
- int s_ep_pend; // EP dial/listen in progress
- int s_closing; // Socket is closing
- int s_closed; // Socket closed, protected by global lock
- int s_besteffort; // Best effort mode delivery
- int s_senderr; // Protocol state machine use
- int s_recverr; // Protocol state machine use
+ int s_ep_pend; // EP dial/listen in progress
+ int s_closing; // Socket is closing
+ int s_closed; // Socket closed, protected by global lock
nni_event s_recv_ev; // Event for readability
nni_event s_send_ev; // Event for sendability
@@ -374,15 +371,19 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
}
void
-nni_sock_lock(nni_sock *sock)
+nni_sock_send_pending(nni_sock *sock)
{
- nni_mtx_lock(&sock->s_mx);
+ if (sock->s_send_fd.sn_init) {
+ nni_plat_pipe_clear(sock->s_send_fd.sn_rfd);
+ }
}
void
-nni_sock_unlock(nni_sock *sock)
+nni_sock_recv_pending(nni_sock *sock)
{
- nni_mtx_unlock(&sock->s_mx);
+ if (sock->s_recv_fd.sn_init) {
+ nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd);
+ }
}
static void
@@ -551,6 +552,11 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
return (rv);
}
+ if (s->s_sock_ops.sock_filter != NULL) {
+ nni_msgq_set_filter(
+ s->s_urq, s->s_sock_ops.sock_filter, s->s_data);
+ }
+
*sp = s;
return (rv);
}
@@ -779,13 +785,23 @@ nni_sock_closeall(void)
}
}
+void
+nni_sock_send(nni_sock *sock, nni_aio *aio)
+{
+ sock->s_sock_ops.sock_send(sock->s_data, aio);
+}
+
int
nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags)
{
int rv;
- int besteffort;
nni_time expire;
nni_time timeo = sock->s_sndtimeo;
+ nni_aio *aio;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) {
expire = NNI_TIME_ZERO;
@@ -795,47 +811,24 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags)
expire = nni_clock();
expire += timeo;
}
+ nni_aio_set_timeout(aio, expire);
+ nni_aio_set_msg(aio, msg);
- // Senderr is typically set by protocols when the state machine
- // indicates that it is no longer valid to send a message. E.g.
- // a REP socket with no REQ pending.
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_ECLOSED);
- }
- if ((rv = sock->s_senderr) != 0) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
- }
- besteffort = sock->s_besteffort;
-
- if (sock->s_sock_ops.sock_sfilter != NULL) {
- msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg);
- }
- nni_mtx_unlock(&sock->s_mx);
+ nni_sock_send(sock, aio);
+ nni_aio_wait(aio);
- if (msg == NULL) {
- return (0);
- }
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
- if (besteffort) {
- // BestEffort mode -- if we cannot handle the message due to
- // backpressure, we just throw it away, and don't complain.
- expire = NNI_TIME_ZERO;
- }
- if (sock->s_send_fd.sn_init) {
- nni_plat_pipe_clear(sock->s_send_fd.sn_rfd);
- }
- rv = nni_msgq_put_until(sock->s_uwq, msg, expire);
- if (besteffort && (rv == NNG_ETIMEDOUT)) {
- // Pretend this worked... it didn't, but pretend.
- nni_msg_free(msg);
- return (0);
- }
return (rv);
}
+void
+nni_sock_recv(nni_sock *sock, nni_aio *aio)
+{
+ sock->s_sock_ops.sock_recv(sock->s_data, aio);
+}
+
int
nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
{
@@ -843,6 +836,11 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
nni_msg *msg;
nni_time expire;
nni_time timeo = sock->s_rcvtimeo;
+ nni_aio *aio;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) {
expire = NNI_TIME_ZERO;
@@ -853,39 +851,23 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
expire += timeo;
}
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_ECLOSED);
- }
- if ((rv = sock->s_recverr) != 0) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
- }
- nni_mtx_unlock(&sock->s_mx);
-
- if (sock->s_recv_fd.sn_init) {
- nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd);
- }
-
for (;;) {
- rv = nni_msgq_get_until(sock->s_urq, &msg, expire);
+ nni_aio_set_timeout(aio, expire);
+ nni_sock_recv(sock, aio);
+ nni_aio_wait(aio);
+
+ rv = nni_aio_result(aio);
if (rv != 0) {
- return (rv);
- }
- if (sock->s_sock_ops.sock_rfilter != NULL) {
- nni_mtx_lock(&sock->s_mx);
- msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg);
- nni_mtx_unlock(&sock->s_mx);
- }
- if (msg != NULL) {
break;
}
- // Protocol dropped the message; try again.
- }
+ msg = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
- *msgp = msg;
- return (0);
+ *msgp = msg;
+ break;
+ }
+ nni_aio_fini(aio);
+ return (rv);
}
// nni_sock_protocol returns the socket's 16-bit protocol number.
@@ -949,18 +931,6 @@ nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
nni_mtx_unlock(&sock->s_mx);
}
-void
-nni_sock_recverr(nni_sock *sock, int err)
-{
- sock->s_recverr = err;
-}
-
-void
-nni_sock_senderr(nni_sock *sock, int err)
-{
- sock->s_senderr = err;
-}
-
int
nni_sock_setopt(nni_sock *s, const char *name, const void *val, size_t size)
{
diff --git a/src/core/socket.h b/src/core/socket.h
index 850c4641..edfe6447 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -22,15 +22,14 @@ extern void nni_sock_closeall(void);
extern int nni_sock_shutdown(nni_sock *);
extern uint16_t nni_sock_proto(nni_sock *);
extern uint16_t nni_sock_peer(nni_sock *);
-extern int nni_sock_setopt(nni_sock *, const char *, const void *, size_t);
-extern int nni_sock_getopt(nni_sock *, const char *, void *, size_t *);
-extern int nni_sock_recvmsg(nni_sock *, nni_msg **, int);
-extern int nni_sock_sendmsg(nni_sock *, nni_msg *, int);
+extern int nni_sock_setopt(nni_sock *, const char *, const void *, size_t);
+extern int nni_sock_getopt(nni_sock *, const char *, void *, size_t *);
+extern int nni_sock_recvmsg(nni_sock *, nni_msg **, int);
+extern int nni_sock_sendmsg(nni_sock *, nni_msg *, int);
+extern void nni_sock_send(nni_sock *, nni_aio *);
+extern void nni_sock_recv(nni_sock *, nni_aio *);
extern uint32_t nni_sock_id(nni_sock *);
-extern void nni_sock_lock(nni_sock *);
-extern void nni_sock_unlock(nni_sock *);
-
extern nni_notify *nni_sock_notify(nni_sock *, int, nng_notify_func, void *);
extern void nni_sock_unnotify(nni_sock *, nni_notify *);
@@ -46,16 +45,20 @@ extern int nni_sock_pipe_start(nni_sock *, nni_pipe *p);
extern int nni_sock_ep_add(nni_sock *, nni_ep *);
extern void nni_sock_ep_remove(nni_sock *, nni_ep *);
-// Set error codes for applications. These are only ever
-// called from the filter functions in protocols, and thus
-// already have the socket lock held.
-extern void nni_sock_recverr(nni_sock *, int);
-extern void nni_sock_senderr(nni_sock *, int);
-
// These are socket methods that protocol operations can expect to call.
// Note that each of these should be called without any locks held, since
// the socket can reenter the protocol.
+// nni_sock_send_pending is called by the protocol when it enqueues
+// a send operation. The main purpose of this is to clear the raised
+// signal raised on the event descriptor.
+extern void nni_sock_send_pending(nni_sock *);
+
+// nni_sock_recv_pending is called by the protocl when it enqueues
+// a receive operation. The main purpose of this is to clear the raised
+// signal raised on the event descriptor.
+extern void nni_sock_recv_pending(nni_sock *);
+
// nni_socket_sendq obtains the upper writeq. The protocol should
// recieve messages from this, and place them on the appropriate pipe.
extern nni_msgq *nni_sock_sendq(nni_sock *);
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index cad21989..046aa19a 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -22,6 +22,9 @@ typedef struct bus_pipe bus_pipe;
typedef struct bus_sock bus_sock;
static void bus_sock_getq(bus_sock *);
+static void bus_sock_send(void *, nni_aio *);
+static void bus_sock_recv(void *, nni_aio *);
+
static void bus_pipe_getq(bus_pipe *);
static void bus_pipe_send(bus_pipe *);
static void bus_pipe_recv(bus_pipe *);
@@ -39,6 +42,8 @@ struct bus_sock {
nni_aio * aio_getq;
nni_list pipes;
nni_mtx mtx;
+ nni_msgq *uwq;
+ nni_msgq *urq;
};
// A bus_pipe is our per-pipe protocol private structure.
@@ -82,6 +87,8 @@ bus_sock_init(void **sp, nni_sock *nsock)
}
s->nsock = nsock;
s->raw = 0;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
*sp = s;
return (0);
@@ -259,7 +266,6 @@ bus_sock_getq_cb(void *arg)
bus_sock *s = arg;
bus_pipe *p;
bus_pipe *lastp;
- nni_msgq *uwq = nni_sock_sendq(s->nsock);
nni_msg * msg;
nni_msg * dup;
uint32_t sender;
@@ -338,6 +344,24 @@ bus_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+bus_sock_send(void *arg, nni_aio *aio)
+{
+ bus_sock *s = arg;
+
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+bus_sock_recv(void *arg, nni_aio *aio)
+{
+ bus_sock *s = arg;
+
+ nni_sock_recv_pending(s->nsock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops bus_pipe_ops = {
.pipe_init = bus_pipe_init,
.pipe_fini = bus_pipe_fini,
@@ -360,6 +384,8 @@ static nni_proto_sock_ops bus_sock_ops = {
.sock_fini = bus_sock_fini,
.sock_open = bus_sock_open,
.sock_close = bus_sock_close,
+ .sock_send = bus_sock_send,
+ .sock_recv = bus_sock_recv,
.sock_options = bus_sock_options,
};
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c
index a87af593..cca03cc8 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair/pair_v0.c
@@ -243,6 +243,24 @@ pair0_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+pair0_sock_send(void *arg, nni_aio *aio)
+{
+ pair0_sock *s = arg;
+
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+pair0_sock_recv(void *arg, nni_aio *aio)
+{
+ pair0_sock *s = arg;
+
+ nni_sock_recv_pending(s->nsock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops pair0_pipe_ops = {
.pipe_init = pair0_pipe_init,
.pipe_fini = pair0_pipe_fini,
@@ -265,6 +283,8 @@ static nni_proto_sock_ops pair0_sock_ops = {
.sock_fini = pair0_sock_fini,
.sock_open = pair0_sock_open,
.sock_close = pair0_sock_close,
+ .sock_send = pair0_sock_send,
+ .sock_recv = pair0_sock_recv,
.sock_options = pair0_sock_options,
};
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
index 6c9d4bd3..f43a4785 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair/pair_v1.c
@@ -446,6 +446,24 @@ pair1_sock_getopt_poly(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->poly, buf, szp));
}
+static void
+pair1_sock_send(void *arg, nni_aio *aio)
+{
+ pair1_sock *s = arg;
+
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+pair1_sock_recv(void *arg, nni_aio *aio)
+{
+ pair1_sock *s = arg;
+
+ nni_sock_recv_pending(s->nsock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops pair1_pipe_ops = {
.pipe_init = pair1_pipe_init,
.pipe_fini = pair1_pipe_fini,
@@ -478,6 +496,8 @@ static nni_proto_sock_ops pair1_sock_ops = {
.sock_fini = pair1_sock_fini,
.sock_open = pair1_sock_open,
.sock_close = pair1_sock_close,
+ .sock_recv = pair1_sock_recv,
+ .sock_send = pair1_sock_send,
.sock_options = pair1_sock_options,
};
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 80db2425..7dd0c8ed 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -26,6 +26,7 @@ static void pull_putq(pull_pipe *, nni_msg *);
struct pull_sock {
nni_msgq *urq;
int raw;
+ nni_sock *sock;
};
// A pull_pipe is our per-pipe protocol private structure.
@@ -44,10 +45,11 @@ pull_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->raw = 0;
- s->urq = nni_sock_recvq(sock);
- *sp = s;
- nni_sock_senderr(sock, NNG_ENOTSUP);
+ s->raw = 0;
+ s->urq = nni_sock_recvq(sock);
+ s->sock = sock;
+
+ *sp = s;
return (0);
}
@@ -185,6 +187,21 @@ pull_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+pull_sock_send(void *arg, nni_aio *aio)
+{
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+}
+
+static void
+pull_sock_recv(void *arg, nni_aio *aio)
+{
+ pull_sock *s = arg;
+
+ nni_sock_recv_pending(s->sock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops pull_pipe_ops = {
.pipe_init = pull_pipe_init,
.pipe_fini = pull_pipe_fini,
@@ -207,6 +224,8 @@ static nni_proto_sock_ops pull_sock_ops = {
.sock_fini = pull_sock_fini,
.sock_open = pull_sock_open,
.sock_close = pull_sock_close,
+ .sock_send = pull_sock_send,
+ .sock_recv = pull_sock_recv,
.sock_options = pull_sock_options,
};
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index 77db7fcb..af7b80ca 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -54,7 +54,6 @@ push_sock_init(void **sp, nni_sock *sock)
s->sock = sock;
s->uwq = nni_sock_sendq(sock);
*sp = s;
- nni_sock_recverr(sock, NNG_ENOTSUP);
return (0);
}
@@ -205,6 +204,21 @@ push_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+push_sock_send(void *arg, nni_aio *aio)
+{
+ push_sock *s = arg;
+
+ nni_sock_send_pending(s->sock);
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+push_sock_recv(void *arg, nni_aio *aio)
+{
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+}
+
static nni_proto_pipe_ops push_pipe_ops = {
.pipe_init = push_pipe_init,
.pipe_fini = push_pipe_fini,
@@ -228,6 +242,8 @@ static nni_proto_sock_ops push_sock_ops = {
.sock_open = push_sock_open,
.sock_close = push_sock_close,
.sock_options = push_sock_options,
+ .sock_send = push_sock_send,
+ .sock_recv = push_sock_recv,
};
static nni_proto push_proto = {
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 03f4603a..4604d0ff 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -82,7 +82,6 @@ pub_sock_init(void **sp, nni_sock *sock)
s->uwq = nni_sock_sendq(sock);
*sp = s;
- nni_sock_recverr(sock, NNG_ENOTSUP);
return (0);
}
@@ -281,6 +280,21 @@ pub_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+pub_sock_recv(void *arg, nni_aio *aio)
+{
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+}
+
+static void
+pub_sock_send(void *arg, nni_aio *aio)
+{
+ pub_sock *s = arg;
+
+ nni_sock_send_pending(s->sock);
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
static nni_proto_pipe_ops pub_pipe_ops = {
.pipe_init = pub_pipe_init,
.pipe_fini = pub_pipe_fini,
@@ -303,6 +317,8 @@ static nni_proto_sock_ops pub_sock_ops = {
.sock_fini = pub_sock_fini,
.sock_open = pub_sock_open,
.sock_close = pub_sock_close,
+ .sock_send = pub_sock_send,
+ .sock_recv = pub_sock_recv,
.sock_options = pub_sock_options,
};
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 8b2ed209..323bbb2f 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -40,6 +40,7 @@ struct sub_sock {
nni_list topics;
nni_msgq *urq;
int raw;
+ nni_mtx lk;
};
// An nni_rep_pipe is our per-pipe protocol private structure.
@@ -58,13 +59,13 @@ sub_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
+ nni_mtx_init(&s->lk);
NNI_LIST_INIT(&s->topics, sub_topic, node);
s->sock = sock;
s->raw = 0;
s->urq = nni_sock_recvq(sock);
- nni_sock_senderr(sock, NNG_ENOTSUP);
- *sp = s;
+ *sp = s;
return (0);
}
@@ -79,6 +80,7 @@ sub_sock_fini(void *arg)
nni_free(topic->buf, topic->len);
NNI_FREE_STRUCT(topic);
}
+ nni_mtx_fini(&s->lk);
NNI_FREE_STRUCT(s);
}
@@ -190,6 +192,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz)
sub_topic *topic;
sub_topic *newtopic;
+ nni_mtx_lock(&s->lk);
NNI_LIST_FOREACH (&s->topics, topic) {
int rv;
@@ -201,6 +204,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz)
if (rv == 0) {
if (topic->len == sz) {
// Already inserted.
+ nni_mtx_unlock(&s->lk);
return (0);
}
if (topic->len > sz) {
@@ -212,9 +216,11 @@ sub_subscribe(void *arg, const void *buf, size_t sz)
}
if ((newtopic = NNI_ALLOC_STRUCT(newtopic)) == NULL) {
+ nni_mtx_unlock(&s->lk);
return (NNG_ENOMEM);
}
if ((newtopic->buf = nni_alloc(sz)) == NULL) {
+ nni_mtx_unlock(&s->lk);
return (NNG_ENOMEM);
}
NNI_LIST_NODE_INIT(&newtopic->node);
@@ -225,6 +231,7 @@ sub_subscribe(void *arg, const void *buf, size_t sz)
} else {
nni_list_append(&s->topics, newtopic);
}
+ nni_mtx_unlock(&s->lk);
return (0);
}
@@ -235,6 +242,7 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz)
sub_topic *topic;
int rv;
+ nni_mtx_lock(&s->lk);
NNI_LIST_FOREACH (&s->topics, topic) {
if (topic->len >= sz) {
rv = memcmp(topic->buf, buf, sz);
@@ -244,18 +252,22 @@ sub_unsubscribe(void *arg, const void *buf, size_t sz)
if (rv == 0) {
if (topic->len == sz) {
nni_list_remove(&s->topics, topic);
+ nni_mtx_unlock(&s->lk);
nni_free(topic->buf, topic->len);
NNI_FREE_STRUCT(topic);
return (0);
}
if (topic->len > sz) {
+ nni_mtx_unlock(&s->lk);
return (NNG_ENOENT);
}
}
if (rv > 0) {
+ nni_mtx_unlock(&s->lk);
return (NNG_ENOENT);
}
}
+ nni_mtx_unlock(&s->lk);
return (NNG_ENOENT);
}
@@ -273,8 +285,23 @@ sub_sock_getopt_raw(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->raw, buf, szp));
}
+static void
+sub_sock_send(void *arg, nni_aio *aio)
+{
+ nni_aio_finish_error(aio, NNG_ENOTSUP);
+}
+
+static void
+sub_sock_recv(void *arg, nni_aio *aio)
+{
+ sub_sock *s = arg;
+
+ nni_sock_recv_pending(s->sock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_msg *
-sub_sock_rfilter(void *arg, nni_msg *msg)
+sub_sock_filter(void *arg, nni_msg *msg)
{
sub_sock * s = arg;
sub_topic *topic;
@@ -282,7 +309,9 @@ sub_sock_rfilter(void *arg, nni_msg *msg)
size_t len;
int match;
+ nni_mtx_lock(&s->lk);
if (s->raw) {
+ nni_mtx_unlock(&s->lk);
return (msg);
}
@@ -308,6 +337,7 @@ sub_sock_rfilter(void *arg, nni_msg *msg)
break;
}
}
+ nni_mtx_unlock(&s->lk);
if (!match) {
nni_msg_free(msg);
return (NULL);
@@ -349,8 +379,10 @@ static nni_proto_sock_ops sub_sock_ops = {
.sock_fini = sub_sock_fini,
.sock_open = sub_sock_open,
.sock_close = sub_sock_close,
+ .sock_send = sub_sock_send,
+ .sock_recv = sub_sock_recv,
+ .sock_filter = sub_sock_filter,
.sock_options = sub_sock_options,
- .sock_rfilter = sub_sock_rfilter,
};
static nni_proto sub_proto = {
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index cd9411d9..510beab3 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -32,6 +32,7 @@ struct rep_sock {
nni_sock * sock;
nni_msgq * uwq;
nni_msgq * urq;
+ nni_mtx lk;
int raw;
int ttl;
nni_idhash *pipes;
@@ -62,6 +63,7 @@ rep_sock_fini(void *arg)
if (s->btrace != NULL) {
nni_free(s->btrace, s->btrace_len);
}
+ nni_mtx_fini(&s->lk);
NNI_FREE_STRUCT(s);
}
@@ -74,6 +76,7 @@ rep_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
+ nni_mtx_init(&s->lk);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
((rv = nni_aio_init(&s->aio_getq, rep_sock_getq_cb, s)) != 0)) {
rep_sock_fini(s);
@@ -89,7 +92,6 @@ rep_sock_init(void **sp, nni_sock *sock)
s->urq = nni_sock_recvq(sock);
*sp = s;
- nni_sock_senderr(sock, NNG_ESTATE);
return (0);
}
@@ -215,6 +217,7 @@ rep_sock_getq_cb(void *arg)
// Look for the pipe, and attempt to put the message there
// (nonblocking) if we can. If we can't for any reason, then we
// free the message.
+ // XXX: LOCKING?!?!
if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) {
rv = nni_msgq_tryput(p->sendq, msg);
}
@@ -347,10 +350,10 @@ rep_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
rep_sock *s = arg;
int rv;
+
+ nni_mtx_lock(&s->lk);
rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
- if (rv == 0) {
- nni_sock_senderr(s->sock, s->raw ? 0 : NNG_ESTATE);
- }
+ nni_mtx_unlock(&s->lk);
return (rv);
}
@@ -376,53 +379,18 @@ rep_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
}
static nni_msg *
-rep_sock_sfilter(void *arg, nni_msg *msg)
-{
- rep_sock *s = arg;
-
- if (s->raw) {
- return (msg);
- }
-
- // Cannot send again until a receive is done...
- nni_sock_senderr(s->sock, NNG_ESTATE);
-
- // If we have a stored backtrace, append it to the header...
- // if we don't have a backtrace, discard the message.
- if (s->btrace == NULL) {
- nni_msg_free(msg);
- return (NULL);
- }
-
- // drop anything else in the header...
- nni_msg_header_clear(msg);
-
- if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) {
- nni_free(s->btrace, s->btrace_len);
- s->btrace = NULL;
- s->btrace_len = 0;
- nni_msg_free(msg);
- return (NULL);
- }
-
- nni_free(s->btrace, s->btrace_len);
- s->btrace = NULL;
- s->btrace_len = 0;
- return (msg);
-}
-
-static nni_msg *
-rep_sock_rfilter(void *arg, nni_msg *msg)
+rep_sock_filter(void *arg, nni_msg *msg)
{
rep_sock *s = arg;
char * header;
size_t len;
+ nni_mtx_lock(&s->lk);
if (s->raw) {
+ nni_mtx_unlock(&s->lk);
return (msg);
}
- nni_sock_senderr(s->sock, 0);
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
if (s->btrace != NULL) {
@@ -437,9 +405,61 @@ rep_sock_rfilter(void *arg, nni_msg *msg)
s->btrace_len = len;
memcpy(s->btrace, header, len);
nni_msg_header_clear(msg);
+ nni_mtx_unlock(&s->lk);
return (msg);
}
+static void
+rep_sock_send(void *arg, nni_aio *aio)
+{
+ rep_sock *s = arg;
+ int rv;
+ nni_msg * msg;
+
+ nni_mtx_lock(&s->lk);
+ if (s->raw) {
+ // Pass thru
+ nni_mtx_unlock(&s->lk);
+ nni_sock_send_pending(s->sock);
+ nni_msgq_aio_put(s->uwq, aio);
+ return;
+ }
+ if (s->btrace == NULL) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+
+ msg = nni_aio_get_msg(aio);
+
+ // drop anything else in the header... (it should already be
+ // empty, but there can be stale backtrace info there.)
+ nni_msg_header_clear(msg);
+
+ if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) {
+ nni_mtx_unlock(&s->lk);
+ nni_aio_finish_error(aio, rv);
+ return;
+ }
+
+ nni_free(s->btrace, s->btrace_len);
+ s->btrace = NULL;
+ s->btrace_len = 0;
+
+ nni_mtx_unlock(&s->lk);
+ nni_sock_send_pending(s->sock);
+ nni_msgq_aio_put(s->uwq, aio);
+}
+
+static void
+rep_sock_recv(void *arg, nni_aio *aio)
+{
+ rep_sock *s = arg;
+
+ nni_sock_recv_pending(s->sock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
static nni_proto_pipe_ops rep_pipe_ops = {
@@ -470,8 +490,9 @@ static nni_proto_sock_ops rep_sock_ops = {
.sock_open = rep_sock_open,
.sock_close = rep_sock_close,
.sock_options = rep_sock_options,
- .sock_rfilter = rep_sock_rfilter,
- .sock_sfilter = rep_sock_sfilter,
+ .sock_filter = rep_sock_filter,
+ .sock_send = rep_sock_send,
+ .sock_recv = rep_sock_recv,
};
static nni_proto nni_rep_proto = {
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 24d01df2..81abd306 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -99,7 +99,7 @@ req_sock_init(void **sp, nni_sock *sock)
s->uwq = nni_sock_sendq(sock);
s->urq = nni_sock_recvq(sock);
*sp = s;
- nni_sock_recverr(sock, NNG_ESTATE);
+
return (0);
}
@@ -249,12 +249,7 @@ static int
req_sock_setopt_raw(void *arg, const void *buf, size_t sz)
{
req_sock *s = arg;
- int rv;
- rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
- if (rv == 0) {
- nni_sock_recverr(s->sock, s->raw ? 0 : NNG_ESTATE);
- }
- return (rv);
+ return (nni_setopt_int(&s->raw, buf, sz, 0, 1));
}
static int
@@ -505,35 +500,46 @@ req_resend(req_sock *s)
}
}
-static nni_msg *
-req_sock_sfilter(void *arg, nni_msg *msg)
+static void
+req_sock_send(void *arg, nni_aio *aio)
{
req_sock *s = arg;
uint32_t id;
+ size_t len;
+ nni_msg * msg;
+ int rv;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
- // No automatic retry, and the request ID must
- // be in the header coming down.
- return (msg);
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_send_pending(s->sock);
+ nni_msgq_aio_put(s->uwq, aio);
+ return;
}
+ msg = nni_aio_get_msg(aio);
+ len = nni_msg_len(msg);
+
+ // In cooked mode, because we need to manage our own resend logic,
+ // we bypass the upper writeq entirely.
+
// Generate a new request ID. We always set the high
// order bit so that the peer can locate the end of the
// backtrace. (Pipe IDs have the high order bit clear.)
id = (s->nextid++) | 0x80000000u;
-
// Request ID is in big endian format.
NNI_PUT32(s->reqid, id);
- if (nni_msg_header_append(msg, s->reqid, 4) != 0) {
- // Should be ENOMEM.
- nni_msg_free(msg);
- return (NULL);
+ if ((rv = nni_msg_header_append(msg, s->reqid, 4)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
- // NB: The socket lock is also held, so this is always self-serialized.
- // But we have to serialize against other async callbacks.
- nni_mtx_lock(&s->mtx);
+ // XXX: I think we should just not do this... and leave the
+ // socket "permanently writeable". This does screw up all the
+ // backpressure.
+ // nni_sock_send_pending(s->sock);
// If another message is there, this cancels it.
if (s->reqmsg != NULL) {
@@ -541,6 +547,8 @@ req_sock_sfilter(void *arg, nni_msg *msg)
s->reqmsg = NULL;
}
+ nni_aio_set_msg(aio, NULL);
+
// Make a duplicate message... for retries.
s->reqmsg = msg;
// Schedule for immediate send
@@ -548,40 +556,41 @@ req_sock_sfilter(void *arg, nni_msg *msg)
s->wantw = 1;
req_resend(s);
- nni_mtx_unlock(&s->mtx);
- // Clear the error condition.
- nni_sock_recverr(s->sock, 0);
+ nni_mtx_unlock(&s->mtx);
- return (NULL);
+ nni_aio_finish(aio, 0, len);
}
static nni_msg *
-req_sock_rfilter(void *arg, nni_msg *msg)
+req_sock_filter(void *arg, nni_msg *msg)
{
req_sock *s = arg;
nni_msg * rmsg;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
// Pass it unmolested
+ nni_mtx_unlock(&s->mtx);
return (msg);
}
if (nni_msg_header_len(msg) < 4) {
+ nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
}
- nni_mtx_lock(&s->mtx);
-
if ((rmsg = s->reqmsg) == NULL) {
- // We had no outstanding request.
+ // We had no outstanding request. (Perhaps canceled,
+ // or duplicate response.)
nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
}
+
if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) {
- // Wrong request id
+ // Wrong request id.
nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
@@ -591,12 +600,29 @@ req_sock_rfilter(void *arg, nni_msg *msg)
s->pendpipe = NULL;
nni_mtx_unlock(&s->mtx);
- nni_sock_recverr(s->sock, NNG_ESTATE);
nni_msg_free(rmsg);
return (msg);
}
+static void
+req_sock_recv(void *arg, nni_aio *aio)
+{
+ req_sock *s = arg;
+
+ nni_mtx_lock(&s->mtx);
+ if (!s->raw) {
+ if (s->reqmsg == NULL) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ }
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_recv_pending(s->sock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops req_pipe_ops = {
.pipe_init = req_pipe_init,
.pipe_fini = req_pipe_fini,
@@ -630,8 +656,9 @@ static nni_proto_sock_ops req_sock_ops = {
.sock_open = req_sock_open,
.sock_close = req_sock_close,
.sock_options = req_sock_options,
- .sock_rfilter = req_sock_rfilter,
- .sock_sfilter = req_sock_sfilter,
+ .sock_filter = req_sock_filter,
+ .sock_send = req_sock_send,
+ .sock_recv = req_sock_recv,
};
static nni_proto req_proto = {
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index fcb067b0..4c3ea8e3 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -77,6 +77,7 @@ resp_sock_init(void **sp, nni_sock *nsock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
+ nni_mtx_init(&s->mtx);
if (((rv = nni_idhash_init(&s->pipes)) != 0) ||
((rv = nni_aio_init(&s->aio_getq, resp_sock_getq_cb, s)) != 0)) {
resp_sock_fini(s);
@@ -91,10 +92,7 @@ resp_sock_init(void **sp, nni_sock *nsock)
s->urq = nni_sock_recvq(nsock);
s->uwq = nni_sock_sendq(nsock);
- nni_mtx_init(&s->mtx);
-
*sp = s;
- nni_sock_senderr(nsock, NNG_ESTATE);
return (0);
}
@@ -349,9 +347,9 @@ resp_sock_setopt_raw(void *arg, const void *buf, size_t sz)
resp_sock *s = arg;
int rv;
- if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) {
- nni_sock_senderr(s->nsock, s->raw ? 0 : NNG_ESTATE);
- }
+ nni_mtx_lock(&s->mtx);
+ rv = nni_setopt_int(&s->raw, buf, sz, 0, 1);
+ nni_mtx_unlock(&s->mtx);
return (rv);
}
@@ -376,54 +374,62 @@ resp_sock_getopt_maxttl(void *arg, void *buf, size_t *szp)
return (nni_getopt_int(s->ttl, buf, szp));
}
-static nni_msg *
-resp_sock_sfilter(void *arg, nni_msg *msg)
+static void
+resp_sock_send(void *arg, nni_aio *aio)
{
resp_sock *s = arg;
+ nni_msg * msg;
+ int rv;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
- return (msg);
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
+ return;
}
- // Cannot send again until a receive is done...
- nni_sock_senderr(s->nsock, NNG_ESTATE);
+ msg = nni_aio_get_msg(aio);
// If we have a stored backtrace, append it to the header...
// if we don't have a backtrace, discard the message.
if (s->btrace == NULL) {
- nni_msg_free(msg);
- return (NULL);
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
}
// drop anything else in the header...
nni_msg_header_clear(msg);
- if (nni_msg_header_append(msg, s->btrace, s->btrace_len) != 0) {
- nni_free(s->btrace, s->btrace_len);
- s->btrace = NULL;
- s->btrace_len = 0;
- nni_msg_free(msg);
- return (NULL);
+ if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
nni_free(s->btrace, s->btrace_len);
s->btrace = NULL;
s->btrace_len = 0;
- return (msg);
+
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
}
static nni_msg *
-resp_sock_rfilter(void *arg, nni_msg *msg)
+resp_sock_filter(void *arg, nni_msg *msg)
{
resp_sock *s = arg;
char * header;
size_t len;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
+ nni_mtx_unlock(&s->mtx);
return (msg);
}
- nni_sock_senderr(s->nsock, 0);
len = nni_msg_header_len(msg);
header = nni_msg_header(msg);
if (s->btrace != NULL) {
@@ -432,15 +438,26 @@ resp_sock_rfilter(void *arg, nni_msg *msg)
s->btrace_len = 0;
}
if ((s->btrace = nni_alloc(len)) == NULL) {
+ nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
return (NULL);
}
s->btrace_len = len;
memcpy(s->btrace, header, len);
nni_msg_header_clear(msg);
+ nni_mtx_unlock(&s->mtx);
return (msg);
}
+static void
+resp_sock_recv(void *arg, nni_aio *aio)
+{
+ resp_sock *s = arg;
+
+ nni_sock_recv_pending(s->nsock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
static nni_proto_pipe_ops resp_pipe_ops = {
.pipe_init = resp_pipe_init,
.pipe_fini = resp_pipe_fini,
@@ -468,9 +485,10 @@ static nni_proto_sock_ops resp_sock_ops = {
.sock_fini = resp_sock_fini,
.sock_open = resp_sock_open,
.sock_close = resp_sock_close,
+ .sock_filter = resp_sock_filter,
+ .sock_send = resp_sock_send,
+ .sock_recv = resp_sock_recv,
.sock_options = resp_sock_options,
- .sock_rfilter = resp_sock_rfilter,
- .sock_sfilter = resp_sock_sfilter,
};
static nni_proto resp_proto = {
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 1205402e..1c15054f 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -92,7 +92,6 @@ surv_sock_init(void **sp, nni_sock *nsock)
s->urq = nni_sock_recvq(nsock);
*sp = s;
- nni_sock_recverr(nsock, NNG_ESTATE);
return (0);
}
@@ -273,11 +272,12 @@ surv_sock_setopt_raw(void *arg, const void *buf, size_t sz)
surv_sock *s = arg;
int rv;
+ nni_mtx_lock(&s->mtx);
if ((rv = nni_setopt_int(&s->raw, buf, sz, 0, 1)) == 0) {
- nni_sock_recverr(s->nsock, s->raw ? 0 : NNG_ESTATE);
s->survid = 0;
nni_timer_cancel(&s->timer);
}
+ nni_mtx_unlock(&s->mtx);
return (rv);
}
@@ -344,22 +344,43 @@ surv_timeout(void *arg)
{
surv_sock *s = arg;
- nni_sock_lock(s->nsock);
+ nni_mtx_lock(&s->mtx);
s->survid = 0;
- nni_sock_recverr(s->nsock, NNG_ESTATE);
+ nni_mtx_unlock(&s->mtx);
nni_msgq_set_get_error(s->urq, NNG_ETIMEDOUT);
- nni_sock_unlock(s->nsock);
}
-static nni_msg *
-surv_sock_sfilter(void *arg, nni_msg *msg)
+static void
+surv_sock_recv(void *arg, nni_aio *aio)
+{
+ surv_sock *s = arg;
+
+ nni_mtx_lock(&s->mtx);
+ if (s->survid == 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, NNG_ESTATE);
+ return;
+ }
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_recv_pending(s->nsock);
+ nni_msgq_aio_get(s->urq, aio);
+}
+
+static void
+surv_sock_send(void *arg, nni_aio *aio)
{
surv_sock *s = arg;
+ nni_msg * msg;
+ int rv;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
// No automatic retry, and the request ID must
// be in the header coming down.
- return (msg);
+ nni_mtx_unlock(&s->mtx);
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
+ return;
}
// Generate a new request ID. We always set the high
@@ -367,10 +388,12 @@ surv_sock_sfilter(void *arg, nni_msg *msg)
// backtrace. (Pipe IDs have the high order bit clear.)
s->survid = (s->nextid++) | 0x80000000u;
- if (nni_msg_header_append_u32(msg, s->survid) != 0) {
- // Should be ENOMEM.
- nni_msg_free(msg);
- return (NULL);
+ msg = nni_aio_get_msg(aio);
+ nni_msg_header_clear(msg);
+ if ((rv = nni_msg_header_append_u32(msg, s->survid)) != 0) {
+ nni_mtx_unlock(&s->mtx);
+ nni_aio_finish_error(aio, rv);
+ return;
}
// If another message is there, this cancels it. We move the
@@ -379,20 +402,21 @@ surv_sock_sfilter(void *arg, nni_msg *msg)
s->expire = nni_clock() + s->survtime;
nni_timer_schedule(&s->timer, s->expire);
- // Clear the error condition.
- nni_sock_recverr(s->nsock, 0);
- // nni_msgq_set_get_error(nni_sock_recvq(psock->nsock), 0);
+ nni_mtx_unlock(&s->mtx);
- return (msg);
+ nni_sock_send_pending(s->nsock);
+ nni_msgq_aio_put(s->uwq, aio);
}
static nni_msg *
-surv_sock_rfilter(void *arg, nni_msg *msg)
+surv_sock_filter(void *arg, nni_msg *msg)
{
surv_sock *s = arg;
+ nni_mtx_lock(&s->mtx);
if (s->raw) {
// Pass it unmolested
+ nni_mtx_unlock(&s->mtx);
return (msg);
}
@@ -402,6 +426,7 @@ surv_sock_rfilter(void *arg, nni_msg *msg)
nni_msg_free(msg);
return (NULL);
}
+ nni_mtx_unlock(&s->mtx);
return (msg);
}
@@ -433,9 +458,10 @@ static nni_proto_sock_ops surv_sock_ops = {
.sock_fini = surv_sock_fini,
.sock_open = surv_sock_open,
.sock_close = surv_sock_close,
+ .sock_send = surv_sock_send,
+ .sock_recv = surv_sock_recv,
+ .sock_filter = surv_sock_filter,
.sock_options = surv_sock_options,
- .sock_rfilter = surv_sock_rfilter,
- .sock_sfilter = surv_sock_sfilter,
};
static nni_proto surv_proto = {