aboutsummaryrefslogtreecommitdiff
path: root/src/protocol/pipeline/push.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/protocol/pipeline/push.c')
-rw-r--r--src/protocol/pipeline/push.c93
1 files changed, 40 insertions, 53 deletions
diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c
index de774125..6cdc9cc5 100644
--- a/src/protocol/pipeline/push.c
+++ b/src/protocol/pipeline/push.c
@@ -21,16 +21,15 @@ typedef struct nni_push_sock nni_push_sock;
// An nni_push_sock is our per-socket protocol private structure.
struct nni_push_sock {
- nni_mtx mx;
nni_cv cv;
nni_msgq * uwq;
- nni_thr sender;
int raw;
int closing;
int wantw;
nni_list pipes;
nni_push_pipe * nextpipe;
int npipes;
+ nni_sock * sock;
};
// An nni_push_pipe is our per-pipe protocol private structure.
@@ -42,10 +41,8 @@ struct nni_push_pipe {
nni_list_node node;
};
-static void nni_push_rrdist(void *);
-
static int
-nni_push_init(void **pushp, nni_sock *sock)
+nni_push_sock_init(void **pushp, nni_sock *sock)
{
nni_push_sock *push;
int rv;
@@ -53,12 +50,7 @@ nni_push_init(void **pushp, nni_sock *sock)
if ((push = NNI_ALLOC_STRUCT(push)) == NULL) {
return (NNG_ENOMEM);
}
- if ((rv = nni_mtx_init(&push->mx)) != 0) {
- NNI_FREE_STRUCT(push);
- return (rv);
- }
- if ((rv = nni_cv_init(&push->cv, &push->mx)) != 0) {
- nni_mtx_fini(&push->mx);
+ if ((rv = nni_cv_init(&push->cv, nni_sock_mtx(sock))) != 0) {
NNI_FREE_STRUCT(push);
return (rv);
}
@@ -67,36 +59,32 @@ nni_push_init(void **pushp, nni_sock *sock)
push->npipes = 0;
push->wantw = 0;
push->nextpipe = NULL;
+ push->sock = sock;
push->uwq = nni_sock_sendq(sock);
*pushp = push;
nni_sock_recverr(sock, NNG_ENOTSUP);
- rv = nni_thr_init(&push->sender, nni_push_rrdist, push);
- if (rv != 0) {
- nni_cv_fini(&push->cv);
- nni_mtx_fini(&push->mx);
- NNI_FREE_STRUCT(push);
- return (rv);
- }
- nni_thr_run(&push->sender);
return (0);
}
static void
-nni_push_fini(void *arg)
+nni_push_sock_close(void *arg)
{
nni_push_sock *push = arg;
// Shut down the resender. We request it to exit by clearing
// its old value, then kick it.
- nni_mtx_lock(&push->mx);
push->closing = 1;
nni_cv_wake(&push->cv);
- nni_mtx_unlock(&push->mx);
+}
+
+
+static void
+nni_push_sock_fini(void *arg)
+{
+ nni_push_sock *push = arg;
- nni_thr_fini(&push->sender);
nni_cv_fini(&push->cv);
- nni_mtx_fini(&push->mx);
NNI_FREE_STRUCT(push);
}
@@ -142,9 +130,6 @@ nni_push_pipe_add(void *arg)
if (nni_pipe_peer(pp->pipe) != NNG_PROTO_PULL) {
return (NNG_EPROTO);
}
- // Wake the sender since we have a new pipe.
- nni_mtx_lock(&push->mx);
-
// Turns out it should not really matter where we stick this.
// The end makes our test cases easier.
nni_list_append(&push->pipes, pp);
@@ -152,7 +137,6 @@ nni_push_pipe_add(void *arg)
// Wake the top sender, as we can accept a job.
push->npipes++;
nni_cv_wake(&push->cv);
- nni_mtx_unlock(&push->mx);
return (0);
}
@@ -163,13 +147,11 @@ nni_push_pipe_rem(void *arg)
nni_push_pipe *pp = arg;
nni_push_sock *push = pp->push;
- nni_mtx_lock(&push->mx);
if (pp == push->nextpipe) {
push->nextpipe = nni_list_next(&push->pipes, pp);
}
push->npipes--;
nni_list_remove(&push->pipes, pp);
- nni_mtx_unlock(&push->mx);
}
@@ -178,17 +160,18 @@ nni_push_pipe_send(void *arg)
{
nni_push_pipe *pp = arg;
nni_push_sock *push = pp->push;
+ nni_mtx *mx = nni_sock_mtx(push->sock);
nni_msg *msg;
for (;;) {
if (nni_msgq_get_sig(pp->mq, &msg, &pp->sigclose) != 0) {
break;
}
- nni_mtx_lock(&push->mx);
+ nni_mtx_lock(mx);
if (push->wantw) {
nni_cv_wake(&push->cv);
}
- nni_mtx_unlock(&push->mx);
+ nni_mtx_unlock(mx);
if (nni_pipe_send(pp->pipe, msg) != 0) {
nni_msg_free(msg);
break;
@@ -216,16 +199,14 @@ nni_push_pipe_recv(void *arg)
static int
-nni_push_setopt(void *arg, int opt, const void *buf, size_t sz)
+nni_push_sock_setopt(void *arg, int opt, const void *buf, size_t sz)
{
nni_push_sock *push = arg;
int rv;
switch (opt) {
case NNG_OPT_RAW:
- nni_mtx_lock(&push->mx);
rv = nni_setopt_int(&push->raw, buf, sz, 0, 1);
- nni_mtx_unlock(&push->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -235,16 +216,14 @@ nni_push_setopt(void *arg, int opt, const void *buf, size_t sz)
static int
-nni_push_getopt(void *arg, int opt, void *buf, size_t *szp)
+nni_push_sock_getopt(void *arg, int opt, void *buf, size_t *szp)
{
nni_push_sock *push = arg;
int rv;
switch (opt) {
case NNG_OPT_RAW:
- nni_mtx_lock(&push->mx);
rv = nni_getopt_int(&push->raw, buf, szp);
- nni_mtx_unlock(&push->mx);
break;
default:
rv = NNG_ENOTSUP;
@@ -254,12 +233,13 @@ nni_push_getopt(void *arg, int opt, void *buf, size_t *szp)
static void
-nni_push_rrdist(void *arg)
+nni_push_sock_send(void *arg)
{
nni_push_sock *push = arg;
nni_push_pipe *pp;
nni_msgq *uwq = push->uwq;
nni_msg *msg = NULL;
+ nni_mtx *mx = nni_sock_mtx(push->sock);
int rv;
int i;
@@ -269,10 +249,10 @@ nni_push_rrdist(void *arg)
return;
}
- nni_mtx_lock(&push->mx);
+ nni_mtx_lock(mx);
if (push->closing) {
if (msg != NULL) {
- nni_mtx_unlock(&push->mx);
+ nni_mtx_unlock(mx);
nni_msg_free(msg);
return;
}
@@ -296,14 +276,14 @@ nni_push_rrdist(void *arg)
} else {
push->wantw = 0;
}
- nni_mtx_unlock(&push->mx);
+ nni_mtx_unlock(mx);
}
}
// This is the global protocol structure -- our linkage to the core.
// This should be the only global non-static symbol in this file.
-static nni_proto_pipe nni_push_proto_pipe = {
+static nni_proto_pipe_ops nni_push_pipe_ops = {
.pipe_init = nni_push_pipe_init,
.pipe_fini = nni_push_pipe_fini,
.pipe_add = nni_push_pipe_add,
@@ -312,15 +292,22 @@ static nni_proto_pipe nni_push_proto_pipe = {
.pipe_recv = nni_push_pipe_recv,
};
+static nni_proto_sock_ops nni_push_sock_ops = {
+ .sock_init = nni_push_sock_init,
+ .sock_fini = nni_push_sock_fini,
+ .sock_close = nni_push_sock_close,
+ .sock_setopt = nni_push_sock_setopt,
+ .sock_getopt = nni_push_sock_getopt,
+ .sock_send = nni_push_sock_send,
+ .sock_recv = NULL,
+ .sock_rfilter = NULL,
+ .sock_sfilter = NULL,
+};
+
nni_proto nni_push_proto = {
- .proto_self = NNG_PROTO_PUSH,
- .proto_peer = NNG_PROTO_PULL,
- .proto_name = "push",
- .proto_pipe = &nni_push_proto_pipe,
- .proto_init = nni_push_init,
- .proto_fini = nni_push_fini,
- .proto_setopt = nni_push_setopt,
- .proto_getopt = nni_push_getopt,
- .proto_recv_filter = NULL,
- .proto_send_filter = NULL,
+ .proto_self = NNG_PROTO_PUSH,
+ .proto_peer = NNG_PROTO_PULL,
+ .proto_name = "push",
+ .proto_pipe_ops = &nni_push_pipe_ops,
+ .proto_sock_ops = &nni_push_sock_ops,
};