1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
//
// Copyright 2017 Garrett D'Amore <garrett@damore.org>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt). A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//
#include "convey.h"
#include "nng.h"
#include <string.h>
#include <assert.h>
#define APPENDSTR(m, s) nng_msg_append(m, s, strlen(s))
#define CHECKSTR(m, s) So(nng_msg_len(m) == strlen(s));\
So(memcmp(nng_msg_body(m), s, strlen(s)) == 0)
struct evcnt {
nng_socket sock;
int readable;
int writeable;
int pipeadd;
int piperem;
int epadd;
int eprem;
int err;
};
void
bump(nng_event *ev, void *arg)
{
struct evcnt *cnt = arg;
assert(nng_event_socket(ev) == cnt->sock);
switch (nng_event_type(ev)) {
case NNG_EV_CAN_SEND:
cnt->writeable++;
break;
case NNG_EV_CAN_RECV:
cnt->readable++;
break;
case NNG_EV_PIPE_ADD:
cnt->pipeadd++;
break;
case NNG_EV_PIPE_REM:
cnt->piperem++;
break;
case NNG_EV_ENDPT_ADD:
cnt->epadd++;
break;
case NNG_EV_ENDPT_REM:
cnt->eprem++;
break;
default:
assert(0);
break;
}
}
Main({
const char *addr = "inproc://test";
Test("Event Handling", {
Convey("Given a connected pair of pair sockets", {
nng_socket sock1;
nng_socket sock2;
struct evcnt evcnt1;
struct evcnt evcnt2;
nng_notify *notify1;
nng_notify *notify2;
So(nng_open(&sock1, NNG_PROTO_PAIR) == 0);
So(nng_open(&sock2, NNG_PROTO_PAIR) == 0);
memset(&evcnt1, 0, sizeof (evcnt1));
memset(&evcnt2, 0, sizeof (evcnt2));
evcnt1.sock = sock1;
evcnt2.sock = sock2;
Reset({
nng_close(sock1);
nng_close(sock2);
})
So(nng_listen(sock1, addr, NULL, NNG_FLAG_SYNCH) == 0);
So(nng_dial(sock2, addr, NULL, NNG_FLAG_SYNCH) == 0);
// Let everything connect.
nng_usleep(100000);
Convey("We can register callbacks", {
So((notify1 = nng_setnotify(sock1, NNG_EV_CAN_SEND, bump, &evcnt1)) != NULL);
So((notify2 = nng_setnotify(sock2, NNG_EV_CAN_RECV, bump, &evcnt2)) != NULL);
Convey("They are called", {
nng_msg *msg;
So(nng_msg_alloc(&msg, 0) == 0);
APPENDSTR(msg, "abc");
So(nng_sendmsg(sock1, msg, 0) == 0);
So(nng_recvmsg(sock2, &msg, 0) == 0);
CHECKSTR(msg, "abc");
nng_msg_free(msg);
// The notify runs async...
nng_usleep(100000);
So(evcnt1.writeable == 1);
So(evcnt2.readable == 1);
})
Convey("We can unregister them", {
nng_unsetnotify(sock1, notify1);
So(1);
nng_unsetnotify(sock2, notify2);
So(1);
})
})
})
})
})
|