aboutsummaryrefslogtreecommitdiff
path: root/src/protocol
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol')
-rw-r--r--src/protocol/bus/bus.c14
-rw-r--r--src/protocol/pair/pair_v0.c4
-rw-r--r--src/protocol/pair/pair_v1.c16
-rw-r--r--src/protocol/pipeline/pull.c7
-rw-r--r--src/protocol/pipeline/push.c9
-rw-r--r--src/protocol/pubsub/pub.c5
-rw-r--r--src/protocol/pubsub/sub.c5
-rw-r--r--src/protocol/reqrep/rep.c5
-rw-r--r--src/protocol/reqrep/req.c9
-rw-r--r--src/protocol/survey/respond.c13
-rw-r--r--src/protocol/survey/survey.c5
11 files changed, 21 insertions, 71 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c
index 046aa19a..07f91602 100644
--- a/src/protocol/bus/bus.c
+++ b/src/protocol/bus/bus.c
@@ -37,7 +37,6 @@ static void bus_pipe_putq_cb(void *);
// A bus_sock is our per-socket protocol private structure.
struct bus_sock {
- nni_sock *nsock;
int raw;
nni_aio * aio_getq;
nni_list pipes;
@@ -85,10 +84,9 @@ bus_sock_init(void **sp, nni_sock *nsock)
bus_sock_fini(s);
return (rv);
}
- s->nsock = nsock;
- s->raw = 0;
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
+ s->raw = 0;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
*sp = s;
return (0);
@@ -241,7 +239,7 @@ bus_pipe_recv_cb(void *arg)
nni_aio_set_msg(p->aio_putq, msg);
nni_aio_set_msg(p->aio_recv, NULL);
- nni_msgq_aio_put(nni_sock_recvq(s->nsock), p->aio_putq);
+ nni_msgq_aio_put(s->urq, p->aio_putq);
}
static void
@@ -315,7 +313,7 @@ bus_sock_getq_cb(void *arg)
static void
bus_sock_getq(bus_sock *s)
{
- nni_msgq_aio_get(nni_sock_sendq(s->nsock), s->aio_getq);
+ nni_msgq_aio_get(s->uwq, s->aio_getq);
}
static void
@@ -349,7 +347,6 @@ bus_sock_send(void *arg, nni_aio *aio)
{
bus_sock *s = arg;
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
}
@@ -358,7 +355,6 @@ bus_sock_recv(void *arg, nni_aio *aio)
{
bus_sock *s = arg;
- nni_sock_recv_pending(s->nsock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c
index cca03cc8..29f3b59c 100644
--- a/src/protocol/pair/pair_v0.c
+++ b/src/protocol/pair/pair_v0.c
@@ -28,7 +28,6 @@ static void pair0_pipe_fini(void *);
// pair0_sock is our per-socket protocol private structure.
struct pair0_sock {
- nni_sock * nsock;
pair0_pipe *ppipe;
nni_msgq * uwq;
nni_msgq * urq;
@@ -58,7 +57,6 @@ pair0_sock_init(void **sp, nni_sock *nsock)
return (NNG_ENOMEM);
}
nni_mtx_init(&s->mtx);
- s->nsock = nsock;
s->ppipe = NULL;
s->raw = 0;
s->uwq = nni_sock_sendq(nsock);
@@ -248,7 +246,6 @@ pair0_sock_send(void *arg, nni_aio *aio)
{
pair0_sock *s = arg;
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
}
@@ -257,7 +254,6 @@ pair0_sock_recv(void *arg, nni_aio *aio)
{
pair0_sock *s = arg;
- nni_sock_recv_pending(s->nsock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
index f43a4785..86ee97eb 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair/pair_v1.c
@@ -29,7 +29,6 @@ static void pair1_pipe_fini(void *);
// pair1_sock is our per-socket protocol private structure.
struct pair1_sock {
- nni_sock * nsock;
nni_msgq * uwq;
nni_msgq * urq;
int raw;
@@ -89,13 +88,12 @@ pair1_sock_init(void **sp, nni_sock *nsock)
return (rv);
}
- s->nsock = nsock;
- s->raw = 0;
- s->poly = 0;
- s->uwq = nni_sock_sendq(nsock);
- s->urq = nni_sock_recvq(nsock);
- s->ttl = 8;
- *sp = s;
+ s->raw = 0;
+ s->poly = 0;
+ s->uwq = nni_sock_sendq(nsock);
+ s->urq = nni_sock_recvq(nsock);
+ s->ttl = 8;
+ *sp = s;
return (0);
}
@@ -451,7 +449,6 @@ pair1_sock_send(void *arg, nni_aio *aio)
{
pair1_sock *s = arg;
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
}
@@ -460,7 +457,6 @@ pair1_sock_recv(void *arg, nni_aio *aio)
{
pair1_sock *s = arg;
- nni_sock_recv_pending(s->nsock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c
index 7dd0c8ed..267352c5 100644
--- a/src/protocol/pipeline/pull.c
+++ b/src/protocol/pipeline/pull.c
@@ -26,7 +26,6 @@ static void pull_putq(pull_pipe *, nni_msg *);
struct pull_sock {
nni_msgq *urq;
int raw;
- nni_sock *sock;
};
// A pull_pipe is our per-pipe protocol private structure.
@@ -45,9 +44,8 @@ pull_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->raw = 0;
- s->urq = nni_sock_recvq(sock);
- s->sock = sock;
+ s->raw = 0;
+ s->urq = nni_sock_recvq(sock);
*sp = s;
return (0);
@@ -198,7 +196,6 @@ pull_sock_recv(void *arg, nni_aio *aio)
{
pull_sock *s = arg;
- nni_sock_recv_pending(s->sock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index af7b80ca..995ac56d 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -28,7 +28,6 @@ static void push_getq_cb(void *);
struct push_sock {
nni_msgq *uwq;
int raw;
- nni_sock *sock;
};
// An nni_push_pipe is our per-pipe protocol private structure.
@@ -50,10 +49,9 @@ push_sock_init(void **sp, nni_sock *sock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
- s->raw = 0;
- s->sock = sock;
- s->uwq = nni_sock_sendq(sock);
- *sp = s;
+ s->raw = 0;
+ s->uwq = nni_sock_sendq(sock);
+ *sp = s;
return (0);
}
@@ -209,7 +207,6 @@ push_sock_send(void *arg, nni_aio *aio)
{
push_sock *s = arg;
- nni_sock_send_pending(s->sock);
nni_msgq_aio_put(s->uwq, aio);
}
diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c
index 4604d0ff..29863322 100644
--- a/src/protocol/pubsub/pub.c
+++ b/src/protocol/pubsub/pub.c
@@ -30,7 +30,6 @@ static void pub_pipe_fini(void *);
// A pub_sock is our per-socket protocol private structure.
struct pub_sock {
- nni_sock *sock;
nni_msgq *uwq;
int raw;
nni_aio * aio_getq;
@@ -75,8 +74,7 @@ pub_sock_init(void **sp, nni_sock *sock)
return (rv);
}
- s->sock = sock;
- s->raw = 0;
+ s->raw = 0;
NNI_LIST_INIT(&s->pipes, pub_pipe, node);
s->uwq = nni_sock_sendq(sock);
@@ -291,7 +289,6 @@ pub_sock_send(void *arg, nni_aio *aio)
{
pub_sock *s = arg;
- nni_sock_send_pending(s->sock);
nni_msgq_aio_put(s->uwq, aio);
}
diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c
index 323bbb2f..d87b42ec 100644
--- a/src/protocol/pubsub/sub.c
+++ b/src/protocol/pubsub/sub.c
@@ -36,7 +36,6 @@ struct sub_topic {
// An nni_rep_sock is our per-socket protocol private structure.
struct sub_sock {
- nni_sock *sock;
nni_list topics;
nni_msgq *urq;
int raw;
@@ -61,8 +60,7 @@ sub_sock_init(void **sp, nni_sock *sock)
}
nni_mtx_init(&s->lk);
NNI_LIST_INIT(&s->topics, sub_topic, node);
- s->sock = sock;
- s->raw = 0;
+ s->raw = 0;
s->urq = nni_sock_recvq(sock);
*sp = s;
@@ -296,7 +294,6 @@ sub_sock_recv(void *arg, nni_aio *aio)
{
sub_sock *s = arg;
- nni_sock_recv_pending(s->sock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c
index 510beab3..5d924e32 100644
--- a/src/protocol/reqrep/rep.c
+++ b/src/protocol/reqrep/rep.c
@@ -29,7 +29,6 @@ static void rep_pipe_fini(void *);
// A rep_sock is our per-socket protocol private structure.
struct rep_sock {
- nni_sock * sock;
nni_msgq * uwq;
nni_msgq * urq;
nni_mtx lk;
@@ -84,7 +83,6 @@ rep_sock_init(void **sp, nni_sock *sock)
}
s->ttl = 8; // Per RFC
- s->sock = sock;
s->raw = 0;
s->btrace = NULL;
s->btrace_len = 0;
@@ -420,7 +418,6 @@ rep_sock_send(void *arg, nni_aio *aio)
if (s->raw) {
// Pass thru
nni_mtx_unlock(&s->lk);
- nni_sock_send_pending(s->sock);
nni_msgq_aio_put(s->uwq, aio);
return;
}
@@ -447,7 +444,6 @@ rep_sock_send(void *arg, nni_aio *aio)
s->btrace_len = 0;
nni_mtx_unlock(&s->lk);
- nni_sock_send_pending(s->sock);
nni_msgq_aio_put(s->uwq, aio);
}
@@ -456,7 +452,6 @@ rep_sock_recv(void *arg, nni_aio *aio)
{
rep_sock *s = arg;
- nni_sock_recv_pending(s->sock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c
index 81abd306..1ab49f3e 100644
--- a/src/protocol/reqrep/req.c
+++ b/src/protocol/reqrep/req.c
@@ -28,7 +28,6 @@ static void req_pipe_fini(void *);
// A req_sock is our per-socket protocol private structure.
struct req_sock {
- nni_sock * sock;
nni_msgq * uwq;
nni_msgq * urq;
nni_duration retry;
@@ -90,7 +89,6 @@ req_sock_init(void **sp, nni_sock *sock)
// this is "semi random" start for request IDs.
s->nextid = nni_random();
s->retry = NNI_SECOND * 60;
- s->sock = sock;
s->reqmsg = NULL;
s->raw = 0;
s->wantw = 0;
@@ -512,7 +510,6 @@ req_sock_send(void *arg, nni_aio *aio)
nni_mtx_lock(&s->mtx);
if (s->raw) {
nni_mtx_unlock(&s->mtx);
- nni_sock_send_pending(s->sock);
nni_msgq_aio_put(s->uwq, aio);
return;
}
@@ -536,11 +533,6 @@ req_sock_send(void *arg, nni_aio *aio)
return;
}
- // XXX: I think we should just not do this... and leave the
- // socket "permanently writeable". This does screw up all the
- // backpressure.
- // nni_sock_send_pending(s->sock);
-
// If another message is there, this cancels it.
if (s->reqmsg != NULL) {
nni_msg_free(s->reqmsg);
@@ -619,7 +611,6 @@ req_sock_recv(void *arg, nni_aio *aio)
}
}
nni_mtx_unlock(&s->mtx);
- nni_sock_recv_pending(s->sock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c
index 4c3ea8e3..dbce0751 100644
--- a/src/protocol/survey/respond.c
+++ b/src/protocol/survey/respond.c
@@ -29,7 +29,6 @@ static void resp_pipe_fini(void *);
// A resp_sock is our per-socket protocol private structure.
struct resp_sock {
- nni_sock * nsock;
nni_msgq * urq;
nni_msgq * uwq;
int raw;
@@ -85,7 +84,6 @@ resp_sock_init(void **sp, nni_sock *nsock)
}
s->ttl = 8; // Per RFC
- s->nsock = nsock;
s->raw = 0;
s->btrace = NULL;
s->btrace_len = 0;
@@ -268,9 +266,9 @@ resp_send_cb(void *arg)
static void
resp_recv_cb(void *arg)
{
- resp_pipe *p = arg;
- resp_sock *s = p->psock;
- nni_msgq * urq;
+ resp_pipe *p = arg;
+ resp_sock *s = p->psock;
+ nni_msgq * urq = s->urq;
nni_msg * msg;
int hops;
int rv;
@@ -279,8 +277,6 @@ resp_recv_cb(void *arg)
goto error;
}
- urq = nni_sock_recvq(s->nsock);
-
msg = nni_aio_get_msg(p->aio_recv);
nni_aio_set_msg(p->aio_recv, NULL);
nni_msg_set_pipe(msg, p->id);
@@ -384,7 +380,6 @@ resp_sock_send(void *arg, nni_aio *aio)
nni_mtx_lock(&s->mtx);
if (s->raw) {
nni_mtx_unlock(&s->mtx);
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
return;
}
@@ -413,7 +408,6 @@ resp_sock_send(void *arg, nni_aio *aio)
s->btrace_len = 0;
nni_mtx_unlock(&s->mtx);
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
}
@@ -454,7 +448,6 @@ resp_sock_recv(void *arg, nni_aio *aio)
{
resp_sock *s = arg;
- nni_sock_recv_pending(s->nsock);
nni_msgq_aio_get(s->urq, aio);
}
diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c
index 1c15054f..f44cd63a 100644
--- a/src/protocol/survey/survey.c
+++ b/src/protocol/survey/survey.c
@@ -28,7 +28,6 @@ static void surv_timeout(void *);
// A surv_sock is our per-socket protocol private structure.
struct surv_sock {
- nni_sock * nsock;
nni_duration survtime;
nni_time expire;
int raw;
@@ -84,7 +83,6 @@ surv_sock_init(void **sp, nni_sock *nsock)
nni_timer_init(&s->timer, surv_timeout, s);
s->nextid = nni_random();
- s->nsock = nsock;
s->raw = 0;
s->survtime = NNI_SECOND * 60;
s->expire = NNI_TIME_ZERO;
@@ -362,7 +360,6 @@ surv_sock_recv(void *arg, nni_aio *aio)
return;
}
nni_mtx_unlock(&s->mtx);
- nni_sock_recv_pending(s->nsock);
nni_msgq_aio_get(s->urq, aio);
}
@@ -378,7 +375,6 @@ surv_sock_send(void *arg, nni_aio *aio)
// No automatic retry, and the request ID must
// be in the header coming down.
nni_mtx_unlock(&s->mtx);
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
return;
}
@@ -404,7 +400,6 @@ surv_sock_send(void *arg, nni_aio *aio)
nni_mtx_unlock(&s->mtx);
- nni_sock_send_pending(s->nsock);
nni_msgq_aio_put(s->uwq, aio);
}