// Copyright 2020 Hugo Lindström // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this // file was obtained (LICENSE.txt). A copy of the license may also be // found online at https://opensource.org/licenses/MIT. // // This program serves as an example for how to write async communication with // an arbitrary socket using nng_stream. The server receives a connection and // sends a hello message to the nng_stream iov. // To run this program, start the server as stream -s // Then connect to it with the client as stream -c // // For example: // // % ./stream -s 5555 & // % ./stream -c tcp://127.0.0.1:5555 #include #include #include #include void nng_fatal(const char *func, int rv) { fprintf(stderr, "%s: %s\n", func, nng_strerror(rv)); exit(1); } int server(const char *url); int client(const char *url); int main(int argc, char **argv) { int rc; if (argc < 3) { fprintf(stderr, "Usage: %s [-s url|-c url]\n", argv[0]); exit(EXIT_FAILURE); } if ((rc = nng_init(NULL)) != 0) { nng_fatal("nng_init", rc); }; if (strcmp(argv[1], "-s") == 0) { rc = server(argv[2]); } else { rc = client(argv[2]); } nng_fini(); exit(rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE); } int client(const char *url) { nng_stream_dialer *dialer; nng_aio *aio; nng_iov iov; int rv; // Allocate dialer and aio associated with this connection if ((rv = nng_stream_dialer_alloc(&dialer, url)) != 0) { nng_fatal("call to nng_stream_dialer_alloc failed", rv); } if ((rv = nng_aio_alloc(&aio, NULL, NULL)) != 0) { nng_fatal("call to nng_aio_alloc", rv); } nng_aio_set_timeout(aio, 5000); // 5 sec // Allocatate a buffer to recv iov.iov_len = 100; iov.iov_buf = (char *) malloc(sizeof(char) * iov.iov_len); if ((rv = nng_aio_set_iov(aio, 1, &iov)) != 0) { nng_fatal("call to nng_aio_alloc", rv); } // Connect to the socket via url provided to alloc nng_stream_dialer_dial(dialer, aio); // Wait for connection nng_aio_wait(aio); if ((rv = nng_aio_result(aio)) != 0) { nng_fatal("waiting for ng_stream_dialer_dial failed", rv); } // Get the stream (connection) at position 0 nng_stream *c1 = (nng_stream *) nng_aio_get_output(aio, 0); nng_stream_recv(c1, aio); nng_aio_wait(aio); if ((rv = nng_aio_result(aio)) != 0) { nng_fatal("waiting for nng_stream_recv failed", rv); } size_t recv_count = nng_aio_count(aio); if (recv_count <= 0) { nng_fatal("Recv count was 0!", NNG_ECONNABORTED); } else { printf("received %zu bytes, message: '%s'\n", recv_count, (char *) iov.iov_buf); } // Send ELCOSE to send/recv associated wit this stream free(iov.iov_buf); // stop everything before freeing nng_stream_stop(c1); nng_stream_dialer_stop(dialer); nng_stream_free(c1); nng_aio_free(aio); nng_stream_dialer_free(dialer); return 0; } int server(const char *url) { nng_stream_listener *listener; nng_aio *aio; nng_iov iov; int rv; // Allocate dialer and aio associated with this connection if ((rv = nng_stream_listener_alloc(&listener, url)) != 0) { nng_fatal("call to nng_stream_listener_alloc failed", rv); } if ((rv = nng_aio_alloc(&aio, NULL, NULL)) != 0) { nng_fatal("call to nng_aio_alloc", rv); } nng_aio_set_timeout(aio, 5000); // 5 sec iov.iov_buf = "This is a message."; iov.iov_len = strlen(iov.iov_buf); if ((rv = nng_aio_set_iov(aio, 1, &iov)) != 0) { nng_fatal("call to nng_aio_alloc", rv); } // Connect to the socket via url provided to alloc if ((rv = nng_stream_listener_listen(listener)) != 0) { nng_fatal("call to nng_stream_listener_listen failed", rv); } nng_stream_listener_accept(listener, aio); // Wait for connection nng_aio_wait(aio); if ((rv = nng_aio_result(aio)) != 0) { nng_fatal("waiting for nng_stream_listener_accept failed", rv); } // Get the stream (connection) at position 0 nng_stream *c1 = (nng_stream *) nng_aio_get_output(aio, 0); nng_stream_send(c1, aio); nng_aio_wait(aio); if ((rv = nng_aio_result(aio)) != 0) { nng_fatal("waiting for nng_stream_recv failed", rv); } size_t sent_count = nng_aio_count(aio); if (sent_count <= 0) { nng_fatal("Recv count was 0!", NNG_ECONNABORTED); } else { printf("sent %zu bytes, message: '%s'\n", sent_count, (char *) iov.iov_buf); } // stop everything before freeing nng_stream_stop(c1); nng_stream_listener_stop(listener); nng_stream_free(c1); nng_aio_free(aio); nng_stream_listener_free(listener); return 0; }