aboutsummaryrefslogtreecommitdiff
path: root/demo/stream/stream.c
blob: 0c194111b09ba4839f6dd153b89199021af9e9ec (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// Copyright 2020 Hugo Lindström <hugolm84@gmail.com>

// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
// file was obtained (LICENSE.txt).  A copy of the license may also be
// found online at https://opensource.org/licenses/MIT.
//

// This program serves as an example for how to write async communication with
// an arbitrary socket using nng_stream. The server receives a connection and
// sends a hello message to the nng_stream iov.

// To run this program, start the server as stream -s <portnumber>
// Then connect to it with the client as stream -c <url>
//
//  For example:
//
//  % ./stream -s 5555 &
//  % ./stream -c tcp://127.0.0.1:5555

#include <nng/nng.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

void
nng_fatal(const char *func, int rv)
{
	fprintf(stderr, "%s: %s\n", func, nng_strerror(rv));
	exit(1);
}

int server(const char *url);
int client(const char *url);

int
main(int argc, char **argv)
{
	int rc;

	if (argc < 3) {
		fprintf(stderr, "Usage: %s [-s url|-c url]\n", argv[0]);
		exit(EXIT_FAILURE);
	}

	if ((rc = nng_init(NULL)) != 0) {
		nng_fatal("nng_init", rc);
	};
	if (strcmp(argv[1], "-s") == 0) {
		rc = server(argv[2]);
	} else {
		rc = client(argv[2]);
	}
	nng_fini();
	exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
}

int
client(const char *url)
{
	nng_stream_dialer *dialer;
	nng_aio           *aio;
	nng_iov            iov;
	int                rv;

	// Allocatate dialer and aio assoicated with this connection
	if ((rv = nng_stream_dialer_alloc(&dialer, url)) != 0) {
		nng_fatal("call to nng_stream_dialer_alloc failed", rv);
	}

	if ((rv = nng_aio_alloc(&aio, NULL, NULL)) != 0) {
		nng_fatal("call to nng_aio_alloc", rv);
	}
	nng_aio_set_timeout(aio, 5000); // 5 sec

	// Allocatate a buffer to recv
	iov.iov_len = 100;
	iov.iov_buf = (char *) malloc(sizeof(char) * iov.iov_len);
	if ((rv = nng_aio_set_iov(aio, 1, &iov)) != 0) {
		nng_fatal("call to nng_aio_alloc", rv);
	}
	// Connect to the socket via url provided to alloc
	nng_stream_dialer_dial(dialer, aio);

	// Wait for connection
	nng_aio_wait(aio);
	if ((rv = nng_aio_result(aio)) != 0) {
		nng_fatal("waiting for ng_stream_dialer_dial failed", rv);
	}

	// Get the stream (connection) at position 0
	nng_stream *c1 = (nng_stream *) nng_aio_get_output(aio, 0);
	nng_stream_recv(c1, aio);
	nng_aio_wait(aio);
	if ((rv = nng_aio_result(aio)) != 0) {
		nng_fatal("waiting for nng_stream_recv failed", rv);
	}

	size_t recv_count = nng_aio_count(aio);
	if (recv_count <= 0) {
		nng_fatal("Recv count was 0!", NNG_ECONNABORTED);
	} else {
		printf("received %zu bytes, message: '%s'\n", recv_count,
		    (char *) iov.iov_buf);
	}

	// Send ELCOSE to send/recv associated wit this stream
	free(iov.iov_buf);

	// stop everything before freeing
	nng_stream_stop(c1);
	nng_stream_dialer_stop(dialer);

	nng_stream_free(c1);
	nng_aio_free(aio);
	nng_stream_dialer_free(dialer);
	return 0;
}

int
server(const char *url)
{
	nng_stream_listener *listener;
	nng_aio             *aio;
	nng_iov              iov;
	int                  rv;

	// Allocatate dialer and aio assoicated with this connection
	if ((rv = nng_stream_listener_alloc(&listener, url)) != 0) {
		nng_fatal("call to nng_stream_listener_alloc failed", rv);
	}

	if ((rv = nng_aio_alloc(&aio, NULL, NULL)) != 0) {
		nng_fatal("call to nng_aio_alloc", rv);
	}
	nng_aio_set_timeout(aio, 5000); // 5 sec

	iov.iov_buf = "This is a message.";
	iov.iov_len = strlen(iov.iov_buf);

	if ((rv = nng_aio_set_iov(aio, 1, &iov)) != 0) {
		nng_fatal("call to nng_aio_alloc", rv);
	}
	// Connect to the socket via url provided to alloc
	if ((rv = nng_stream_listener_listen(listener)) != 0) {
		nng_fatal("call to nng_stream_listener_listen failed", rv);
	}
	nng_stream_listener_accept(listener, aio);

	// Wait for connection
	nng_aio_wait(aio);
	if ((rv = nng_aio_result(aio)) != 0) {
		nng_fatal("waiting for nng_stream_listener_accept failed", rv);
	}

	// Get the stream (connection) at position 0
	nng_stream *c1 = (nng_stream *) nng_aio_get_output(aio, 0);
	nng_stream_send(c1, aio);
	nng_aio_wait(aio);
	if ((rv = nng_aio_result(aio)) != 0) {
		nng_fatal("waiting for nng_stream_recv failed", rv);
	}

	size_t sent_count = nng_aio_count(aio);
	if (sent_count <= 0) {
		nng_fatal("Recv count was 0!", NNG_ECONNABORTED);
	} else {
		printf("sent %zu bytes, message: '%s'\n", sent_count,
		    (char *) iov.iov_buf);
	}

	// stop everything before freeing
	nng_stream_stop(c1);
	nng_stream_listener_stop(listener);

	nng_stream_free(c1);
	nng_aio_free(aio);
	nng_stream_listener_free(listener);
	return 0;
}