aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/protocol/pair/pair_v1.c47
-rw-r--r--tests/pair1.c14
2 files changed, 37 insertions, 24 deletions
diff --git a/src/protocol/pair/pair_v1.c b/src/protocol/pair/pair_v1.c
index b76eecbb..183c260a 100644
--- a/src/protocol/pair/pair_v1.c
+++ b/src/protocol/pair/pair_v1.c
@@ -34,21 +34,22 @@ struct pair1_sock {
int ttl;
nni_mtx mtx;
nni_idhash *pipes;
+ nni_list plist;
int started;
int poly;
nni_aio aio_getq;
- pair1_pipe *pipe; // cooked mode only
};
// pair1_pipe is our per-pipe protocol private structure.
struct pair1_pipe {
- nni_pipe * npipe;
- pair1_sock *psock;
- nni_msgq * sendq;
- nni_aio aio_send;
- nni_aio aio_recv;
- nni_aio aio_getq;
- nni_aio aio_putq;
+ nni_pipe * npipe;
+ pair1_sock * psock;
+ nni_msgq * sendq;
+ nni_aio aio_send;
+ nni_aio aio_recv;
+ nni_aio aio_getq;
+ nni_aio aio_putq;
+ nni_list_node node;
};
static void
@@ -72,6 +73,8 @@ pair1_sock_init(void **sp, nni_sock *nsock)
if ((s = NNI_ALLOC_STRUCT(s)) == NULL) {
return (NNG_ENOMEM);
}
+ NNI_LIST_INIT(&s->plist, pair1_pipe, node);
+
// Raw mode uses this.
if (((rv = nni_aio_init(&s->aio_getq, pair1_sock_getq_cb, s)) != 0) ||
((rv = nni_mtx_init(&s->mtx)) != 0) ||
@@ -133,14 +136,15 @@ pair1_pipe_start(void *arg)
int rv;
id = nni_pipe_id(p->npipe);
+ nni_mtx_lock(&s->mtx);
if ((rv = nni_idhash_insert(s->pipes, id, p)) != 0) {
+ nni_mtx_unlock(&s->mtx);
return (rv);
}
- nni_mtx_lock(&s->mtx);
if (!s->poly) {
- if (s->pipe != NULL) {
- nni_mtx_unlock(&s->mtx);
+ if (!nni_list_empty(&s->plist)) {
nni_idhash_remove(s->pipes, id);
+ nni_mtx_unlock(&s->mtx);
return (NNG_EBUSY);
}
} else {
@@ -148,8 +152,7 @@ pair1_pipe_start(void *arg)
nni_msgq_aio_get(s->uwq, &s->aio_getq);
}
}
-
- s->pipe = p;
+ nni_list_append(&s->plist, p);
s->started = 1;
nni_mtx_unlock(&s->mtx);
@@ -181,11 +184,9 @@ pair1_pipe_stop(void *arg)
nni_aio_cancel(&p->aio_getq, NNG_ECANCELED);
nni_mtx_lock(&s->mtx);
- if (s->pipe == p) {
- s->pipe = NULL;
- }
- nni_mtx_unlock(&s->mtx);
nni_idhash_remove(s->pipes, nni_pipe_id(p->npipe));
+ nni_list_node_remove(&p->node);
+ nni_mtx_unlock(&s->mtx);
}
static void
@@ -261,14 +262,16 @@ pair1_sock_getq_cb(void *arg)
// By definition we are in polyamorous mode.
NNI_ASSERT(s->poly);
+ p = NULL;
nni_mtx_lock(&s->mtx);
-
// If no pipe was requested, we look for any connected peer.
- if (((id = nni_msg_get_pipe(msg)) == 0) && (s->pipe != NULL)) {
- id = nni_pipe_id(s->pipe->npipe);
+ if (((id = nni_msg_get_pipe(msg)) == 0) &&
+ (!nni_list_empty(&s->plist))) {
+ p = nni_list_first(&s->plist);
+ } else {
+ nni_idhash_find(s->pipes, id, (void **) &p);
}
-
- if (nni_idhash_find(s->pipes, id, (void **) &p) != 0) {
+ if (p == NULL) {
// Pipe not present!
nni_mtx_unlock(&s->mtx);
nni_msg_free(msg);
diff --git a/tests/pair1.c b/tests/pair1.c
index d81309ef..62f7b1d6 100644
--- a/tests/pair1.c
+++ b/tests/pair1.c
@@ -357,14 +357,24 @@ TestMain("PAIRv1 protocol", {
So(nng_listen(s1, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_dial(c1, addr, NULL, NNG_FLAG_SYNCH) == 0);
+ nng_usleep(100000);
+ So(nng_dial(c2, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_msg_alloc(&msg, 0) == 0);
APPENDSTR(msg, "YES");
So(nng_sendmsg(s1, msg, 0) == 0);
So(nng_recvmsg(c1, &msg, 0) == 0);
CHECKSTR(msg, "YES");
- p1 = nng_msg_get_pipe(msg);
- So(p1 != 0);
+ nng_msg_free(msg);
+
+ nng_close(c1);
+ nng_usleep(10000);
+
+ So(nng_msg_alloc(&msg, 0) == 0);
+ APPENDSTR(msg, "AGAIN");
+ So(nng_sendmsg(s1, msg, 0) == 0);
+ So(nng_recvmsg(c2, &msg, 0) == 0);
+ CHECKSTR(msg, "AGAIN");
nng_msg_free(msg);
});