diff options
Diffstat (limited to 'src/protocol')
| -rw-r--r-- | src/protocol/bus/bus.c | 14 | ||||
| -rw-r--r-- | src/protocol/pair/pair_v0.c | 4 | ||||
| -rw-r--r-- | src/protocol/pair/pair_v1.c | 16 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 7 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 9 | ||||
| -rw-r--r-- | src/protocol/pubsub/pub.c | 5 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 5 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 5 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 9 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 13 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 5 |
11 files changed, 21 insertions, 71 deletions
diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 046aa19a..07f91602 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -37,7 +37,6 @@ static void bus_pipe_putq_cb(void *); // A bus_sock is our per-socket protocol private structure. struct bus_sock { - nni_sock *nsock; int raw; nni_aio * aio_getq; nni_list pipes; @@ -85,10 +84,9 @@ bus_sock_init(void **sp, nni_sock *nsock) bus_sock_fini(s); return (rv); } - s->nsock = nsock; - s->raw = 0; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); + s->raw = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); *sp = s; return (0); @@ -241,7 +239,7 @@ bus_pipe_recv_cb(void *arg) nni_aio_set_msg(p->aio_putq, msg); nni_aio_set_msg(p->aio_recv, NULL); - nni_msgq_aio_put(nni_sock_recvq(s->nsock), p->aio_putq); + nni_msgq_aio_put(s->urq, p->aio_putq); } static void @@ -315,7 +313,7 @@ bus_sock_getq_cb(void *arg) static void bus_sock_getq(bus_sock *s) { - nni_msgq_aio_get(nni_sock_sendq(s->nsock), s->aio_getq); + nni_msgq_aio_get(s->uwq, s->aio_getq); } static void @@ -349,7 +347,6 @@ bus_sock_send(void *arg, nni_aio *aio) { bus_sock *s = arg; - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -358,7 +355,6 @@ bus_sock_recv(void *arg, nni_aio *aio) { bus_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pair/pair_v0.c b/src/protocol/pair/pair_v0.c index cca03cc8..29f3b59c 100644 --- a/src/protocol/pair/pair_v0.c +++ b/src/protocol/pair/pair_v0.c @@ -28,7 +28,6 @@ static void pair0_pipe_fini(void *); // pair0_sock is our per-socket protocol private structure. struct pair0_sock { - nni_sock * nsock; pair0_pipe *ppipe; nni_msgq * uwq; nni_msgq * urq; @@ -58,7 +57,6 @@ pair0_sock_init(void **sp, nni_sock *nsock) return (NNG_ENOMEM); } nni_mtx_init(&s->mtx); - s->nsock = nsock; s->ppipe = NULL; s->raw = 0; s->uwq = nni_sock_sendq(nsock); @@ -248,7 +246,6 @@ pair0_sock_send(void *arg, nni_aio *aio) { pair0_sock *s = arg; - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -257,7 +254,6 @@ pair0_sock_recv(void *arg, nni_aio *aio) { pair0_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c index f43a4785..86ee97eb 100644 --- a/src/protocol/pair/pair_v1.c +++ b/src/protocol/pair/pair_v1.c @@ -29,7 +29,6 @@ static void pair1_pipe_fini(void *); // pair1_sock is our per-socket protocol private structure. struct pair1_sock { - nni_sock * nsock; nni_msgq * uwq; nni_msgq * urq; int raw; @@ -89,13 +88,12 @@ pair1_sock_init(void **sp, nni_sock *nsock) return (rv); } - s->nsock = nsock; - s->raw = 0; - s->poly = 0; - s->uwq = nni_sock_sendq(nsock); - s->urq = nni_sock_recvq(nsock); - s->ttl = 8; - *sp = s; + s->raw = 0; + s->poly = 0; + s->uwq = nni_sock_sendq(nsock); + s->urq = nni_sock_recvq(nsock); + s->ttl = 8; + *sp = s; return (0); } @@ -451,7 +449,6 @@ pair1_sock_send(void *arg, nni_aio *aio) { pair1_sock *s = arg; - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -460,7 +457,6 @@ pair1_sock_recv(void *arg, nni_aio *aio) { pair1_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 7dd0c8ed..267352c5 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -26,7 +26,6 @@ static void pull_putq(pull_pipe *, nni_msg *); struct pull_sock { nni_msgq *urq; int raw; - nni_sock *sock; }; // A pull_pipe is our per-pipe protocol private structure. @@ -45,9 +44,8 @@ pull_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = 0; - s->urq = nni_sock_recvq(sock); - s->sock = sock; + s->raw = 0; + s->urq = nni_sock_recvq(sock); *sp = s; return (0); @@ -198,7 +196,6 @@ pull_sock_recv(void *arg, nni_aio *aio) { pull_sock *s = arg; - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index af7b80ca..995ac56d 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -28,7 +28,6 @@ static void push_getq_cb(void *); struct push_sock { nni_msgq *uwq; int raw; - nni_sock *sock; }; // An nni_push_pipe is our per-pipe protocol private structure. @@ -50,10 +49,9 @@ push_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = 0; - s->sock = sock; - s->uwq = nni_sock_sendq(sock); - *sp = s; + s->raw = 0; + s->uwq = nni_sock_sendq(sock); + *sp = s; return (0); } @@ -209,7 +207,6 @@ push_sock_send(void *arg, nni_aio *aio) { push_sock *s = arg; - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); } diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index 4604d0ff..29863322 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -30,7 +30,6 @@ static void pub_pipe_fini(void *); // A pub_sock is our per-socket protocol private structure. struct pub_sock { - nni_sock *sock; nni_msgq *uwq; int raw; nni_aio * aio_getq; @@ -75,8 +74,7 @@ pub_sock_init(void **sp, nni_sock *sock) return (rv); } - s->sock = sock; - s->raw = 0; + s->raw = 0; NNI_LIST_INIT(&s->pipes, pub_pipe, node); s->uwq = nni_sock_sendq(sock); @@ -291,7 +289,6 @@ pub_sock_send(void *arg, nni_aio *aio) { pub_sock *s = arg; - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); } diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 323bbb2f..d87b42ec 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -36,7 +36,6 @@ struct sub_topic { // An nni_rep_sock is our per-socket protocol private structure. struct sub_sock { - nni_sock *sock; nni_list topics; nni_msgq *urq; int raw; @@ -61,8 +60,7 @@ sub_sock_init(void **sp, nni_sock *sock) } nni_mtx_init(&s->lk); NNI_LIST_INIT(&s->topics, sub_topic, node); - s->sock = sock; - s->raw = 0; + s->raw = 0; s->urq = nni_sock_recvq(sock); *sp = s; @@ -296,7 +294,6 @@ sub_sock_recv(void *arg, nni_aio *aio) { sub_sock *s = arg; - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 510beab3..5d924e32 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -29,7 +29,6 @@ static void rep_pipe_fini(void *); // A rep_sock is our per-socket protocol private structure. struct rep_sock { - nni_sock * sock; nni_msgq * uwq; nni_msgq * urq; nni_mtx lk; @@ -84,7 +83,6 @@ rep_sock_init(void **sp, nni_sock *sock) } s->ttl = 8; // Per RFC - s->sock = sock; s->raw = 0; s->btrace = NULL; s->btrace_len = 0; @@ -420,7 +418,6 @@ rep_sock_send(void *arg, nni_aio *aio) if (s->raw) { // Pass thru nni_mtx_unlock(&s->lk); - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -447,7 +444,6 @@ rep_sock_send(void *arg, nni_aio *aio) s->btrace_len = 0; nni_mtx_unlock(&s->lk); - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); } @@ -456,7 +452,6 @@ rep_sock_recv(void *arg, nni_aio *aio) { rep_sock *s = arg; - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 81abd306..1ab49f3e 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -28,7 +28,6 @@ static void req_pipe_fini(void *); // A req_sock is our per-socket protocol private structure. struct req_sock { - nni_sock * sock; nni_msgq * uwq; nni_msgq * urq; nni_duration retry; @@ -90,7 +89,6 @@ req_sock_init(void **sp, nni_sock *sock) // this is "semi random" start for request IDs. s->nextid = nni_random(); s->retry = NNI_SECOND * 60; - s->sock = sock; s->reqmsg = NULL; s->raw = 0; s->wantw = 0; @@ -512,7 +510,6 @@ req_sock_send(void *arg, nni_aio *aio) nni_mtx_lock(&s->mtx); if (s->raw) { nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->sock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -536,11 +533,6 @@ req_sock_send(void *arg, nni_aio *aio) return; } - // XXX: I think we should just not do this... and leave the - // socket "permanently writeable". This does screw up all the - // backpressure. - // nni_sock_send_pending(s->sock); - // If another message is there, this cancels it. if (s->reqmsg != NULL) { nni_msg_free(s->reqmsg); @@ -619,7 +611,6 @@ req_sock_recv(void *arg, nni_aio *aio) } } nni_mtx_unlock(&s->mtx); - nni_sock_recv_pending(s->sock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 4c3ea8e3..dbce0751 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -29,7 +29,6 @@ static void resp_pipe_fini(void *); // A resp_sock is our per-socket protocol private structure. struct resp_sock { - nni_sock * nsock; nni_msgq * urq; nni_msgq * uwq; int raw; @@ -85,7 +84,6 @@ resp_sock_init(void **sp, nni_sock *nsock) } s->ttl = 8; // Per RFC - s->nsock = nsock; s->raw = 0; s->btrace = NULL; s->btrace_len = 0; @@ -268,9 +266,9 @@ resp_send_cb(void *arg) static void resp_recv_cb(void *arg) { - resp_pipe *p = arg; - resp_sock *s = p->psock; - nni_msgq * urq; + resp_pipe *p = arg; + resp_sock *s = p->psock; + nni_msgq * urq = s->urq; nni_msg * msg; int hops; int rv; @@ -279,8 +277,6 @@ resp_recv_cb(void *arg) goto error; } - urq = nni_sock_recvq(s->nsock); - msg = nni_aio_get_msg(p->aio_recv); nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, p->id); @@ -384,7 +380,6 @@ resp_sock_send(void *arg, nni_aio *aio) nni_mtx_lock(&s->mtx); if (s->raw) { nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -413,7 +408,6 @@ resp_sock_send(void *arg, nni_aio *aio) s->btrace_len = 0; nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } @@ -454,7 +448,6 @@ resp_sock_recv(void *arg, nni_aio *aio) { resp_sock *s = arg; - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 1c15054f..f44cd63a 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -28,7 +28,6 @@ static void surv_timeout(void *); // A surv_sock is our per-socket protocol private structure. struct surv_sock { - nni_sock * nsock; nni_duration survtime; nni_time expire; int raw; @@ -84,7 +83,6 @@ surv_sock_init(void **sp, nni_sock *nsock) nni_timer_init(&s->timer, surv_timeout, s); s->nextid = nni_random(); - s->nsock = nsock; s->raw = 0; s->survtime = NNI_SECOND * 60; s->expire = NNI_TIME_ZERO; @@ -362,7 +360,6 @@ surv_sock_recv(void *arg, nni_aio *aio) return; } nni_mtx_unlock(&s->mtx); - nni_sock_recv_pending(s->nsock); nni_msgq_aio_get(s->urq, aio); } @@ -378,7 +375,6 @@ surv_sock_send(void *arg, nni_aio *aio) // No automatic retry, and the request ID must // be in the header coming down. nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); return; } @@ -404,7 +400,6 @@ surv_sock_send(void *arg, nni_aio *aio) nni_mtx_unlock(&s->mtx); - nni_sock_send_pending(s->nsock); nni_msgq_aio_put(s->uwq, aio); } |
