diff options
| -rw-r--r-- | src/core/defs.h | 2 | ||||
| -rw-r--r-- | src/core/init.c | 1 | ||||
| -rw-r--r-- | src/core/socket.c | 72 | ||||
| -rw-r--r-- | src/core/socket.h | 8 | ||||
| -rw-r--r-- | src/nng.c | 181 | ||||
| -rw-r--r-- | src/nng.h | 40 | ||||
| -rw-r--r-- | tests/bus.c | 23 | ||||
| -rw-r--r-- | tests/event.c | 4 | ||||
| -rw-r--r-- | tests/pipeline.c | 18 | ||||
| -rw-r--r-- | tests/pubsub.c | 8 | ||||
| -rw-r--r-- | tests/reqrep.c | 37 | ||||
| -rw-r--r-- | tests/sock.c | 4 | ||||
| -rw-r--r-- | tests/survey.c | 10 | ||||
| -rw-r--r-- | tests/tcp.c | 6 | ||||
| -rw-r--r-- | tests/trantest.h | 4 |
15 files changed, 279 insertions, 139 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index eb1a6475..ff36cb3f 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -22,13 +22,13 @@ nni_panic("%s: %d: assert err: %s", __FILE__, __LINE__, # x) // These types are common but have names shared with user space. -typedef struct nng_socket nni_sock; typedef struct nng_msg nni_msg; typedef struct nng_sockaddr nni_sockaddr; typedef struct nng_event nni_event; typedef struct nng_notify nni_notify; // These are our own names. +typedef struct nni_socket nni_sock; typedef struct nni_ep nni_ep; typedef struct nni_pipe nni_pipe; typedef struct nni_tran nni_tran; diff --git a/src/core/init.c b/src/core/init.c index e0ce3bae..48a5096c 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -37,6 +37,7 @@ nni_init_helper(void) } nni_idhash_set_limits(nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff); + nni_idhash_set_limits(nni_sockets, 1, 0xffffffff, 1); nni_idlock = &nni_idlock_x; nni_tran_init(); return (0); diff --git a/src/core/socket.c b/src/core/socket.c index d9119a64..b8c5544b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -13,6 +13,13 @@ // Socket implementation. +uint32_t +nni_sock_id(nni_sock *s) +{ + return (s->s_id); +} + + // nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain // the upper read and write queues. nni_msgq * @@ -29,6 +36,48 @@ nni_sock_recvq(nni_sock *s) } +int +nni_sock_hold(nni_sock **sockp, uint32_t id) +{ + int rv; + nni_sock *sock; + + if ((rv = nni_init()) != 0) { + return (rv); + } + nni_mtx_lock(nni_idlock); + rv = nni_idhash_find(nni_sockets, id, (void **) &sock); + if (rv == 0) { + if (sock->s_closing) { + rv = NNG_ECLOSED; + } else { + nni_mtx_lock(&sock->s_mx); + sock->s_refcnt++; + nni_mtx_unlock(&sock->s_mx); + *sockp = sock; + } + } + nni_mtx_unlock(nni_idlock); + + if (rv == NNG_ENOENT) { + rv = NNG_ECLOSED; + } + return (rv); +} + + +void +nni_sock_rele(nni_sock *sock) +{ + nni_mtx_lock(&sock->s_mx); + sock->s_refcnt--; + if (sock->s_closing) { + nni_cv_wake(&sock->s_cv); + } + nni_mtx_unlock(&sock->s_mx); +} + + // XXX: don't expose the upper queues to protocols, because we need to // trap on activity in those queues! @@ -182,6 +231,9 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_proto_sock_ops *sops; nni_proto_pipe_ops *pops; + if ((rv = nni_init()) != 0) { + return (rv); + } if ((proto = nni_proto_find(pnum)) == NULL) { return (NNG_ENOTSUP); } @@ -257,6 +309,16 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } + nni_mtx_lock(nni_idlock); + rv = nni_idhash_alloc(nni_sockets, &sock->s_id, sock); + nni_mtx_unlock(nni_idlock); + if (rv != 0) { + goto fail; + } + + // Caller always gets the socket held. + sock->s_refcnt = 1; + if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) { goto fail; } @@ -290,6 +352,11 @@ fail: for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + if (sock->s_id != 0) { + nni_mtx_lock(nni_idlock); + nni_idhash_remove(nni_sockets, sock->s_id); + nni_mtx_unlock(nni_idlock); + } nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); nni_ev_fini(&sock->s_send_ev); @@ -415,6 +482,11 @@ nni_sock_close(nni_sock *sock) // Shutdown everything if not already done. This operation // is idempotent. nni_sock_shutdown(sock); + nni_mtx_lock(&sock->s_mx); + while (sock->s_refcnt > 1) { + nni_cv_wait(&sock->s_cv); + } + nni_mtx_unlock(&sock->s_mx); // At this point nothing else should be referencing us. // As with UNIX close, it is a gross error for the caller diff --git a/src/core/socket.h b/src/core/socket.h index 1743869a..2a30fae5 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -13,10 +13,13 @@ // NB: This structure is supplied here for use by the CORE. Use of this library // OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR // TRANSPORTS. -struct nng_socket { +struct nni_socket { nni_mtx s_mx; nni_cv s_cv; + uint32_t s_id; + uint32_t s_refcnt; + nni_msgq * s_uwq; // Upper write queue nni_msgq * s_urq; // Upper read queue @@ -60,6 +63,8 @@ struct nng_socket { uint32_t s_nextid; // Next Pipe ID. }; +extern int nni_sock_hold(nni_sock **, uint32_t); +extern void nni_sock_rele(nni_sock *); extern int nni_sock_open(nni_sock **, uint16_t); extern void nni_sock_close(nni_sock *); extern int nni_sock_shutdown(nni_sock *); @@ -71,6 +76,7 @@ extern int nni_sock_recvmsg(nni_sock *, nni_msg **, nni_time); extern int nni_sock_sendmsg(nni_sock *, nni_msg *, nni_time); extern int nni_sock_dial(nni_sock *, const char *, nni_ep **, int); extern int nni_sock_listen(nni_sock *, const char *, nni_ep **, int); +extern uint32_t nni_sock_id(nni_sock *); // Set error codes for applications. These are only ever // called from the filter functions in protocols, and thus @@ -30,156 +30,237 @@ (void) nni_init() int -nng_open(nng_socket **s, uint16_t proto) +nng_open(nng_socket *sidp, uint16_t proto) { int rv; + nni_sock *sock; - if ((rv = nni_init()) != 0) { + if ((rv = nni_sock_open(&sock, proto)) != 0) { return (rv); } - return (nni_sock_open(s, proto)); + *sidp = nni_sock_id(sock); + nni_sock_rele(sock); + return (0); } int -nng_shutdown(nng_socket *s) +nng_shutdown(nng_socket sid) { - NNI_INIT_INT(); - return (nni_sock_shutdown(s)); + int rv; + nni_sock *sock; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + rv = nni_sock_shutdown(sock); + nni_sock_rele(sock); + return (rv); } -void -nng_close(nng_socket *s) +int +nng_close(nng_socket sid) { - NNI_INIT_VOID(); - nni_sock_close(s); + int rv; + nni_sock *sock; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + // No release -- close releases it. + nni_sock_close(sock); + return (rv); } uint16_t -nng_protocol(nng_socket *s) +nng_protocol(nng_socket sid) { - NNI_INIT_VOID(); - return (nni_sock_proto(s)); + int rv; + uint16_t pnum; + nni_sock *sock; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + pnum = nni_sock_proto(sock); + nni_sock_rele(sock); + return (pnum); } uint16_t -nng_peer(nng_socket *s) +nng_peer(nng_socket sid) { - NNI_INIT_VOID(); - return (nni_sock_peer(s)); + int rv; + uint16_t pnum; + nni_sock *sock; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + pnum = nni_sock_peer(sock); + nni_sock_rele(sock); + return (pnum); } int -nng_recvmsg(nng_socket *s, nng_msg **msgp, int flags) +nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags) { nni_time expire; + int rv; + nni_sock *sock; - NNI_INIT_INT(); - if ((flags == NNG_FLAG_NONBLOCK) || (s->s_rcvtimeo == 0)) { + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + if ((flags == NNG_FLAG_NONBLOCK) || (sock->s_rcvtimeo == 0)) { expire = NNI_TIME_ZERO; - } else if (s->s_rcvtimeo < 0) { + } else if (sock->s_rcvtimeo < 0) { expire = NNI_TIME_NEVER; } else { expire = nni_clock(); - expire += s->s_rcvtimeo; + expire += sock->s_rcvtimeo; } - return (nni_sock_recvmsg(s, msgp, expire)); + rv = nni_sock_recvmsg(sock, msgp, expire); + nni_sock_rele(sock); + return (rv); } int -nng_sendmsg(nng_socket *s, nng_msg *msg, int flags) +nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) { nni_time expire; + int rv; + nni_sock *sock; - NNI_INIT_INT(); - - if ((flags == NNG_FLAG_NONBLOCK) || (s->s_sndtimeo == 0)) { + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + if ((flags == NNG_FLAG_NONBLOCK) || (sock->s_sndtimeo == 0)) { expire = NNI_TIME_ZERO; - } else if (s->s_sndtimeo < 0) { + } else if (sock->s_sndtimeo < 0) { expire = NNI_TIME_NEVER; } else { expire = nni_clock(); - expire += s->s_sndtimeo; + expire += sock->s_sndtimeo; } - return (nni_sock_sendmsg(s, msg, expire)); + rv = nni_sock_sendmsg(sock, msg, expire); + nni_sock_rele(sock); + return (rv); } int -nng_dial(nng_socket *s, const char *addr, nng_endpoint *epp, int flags) +nng_dial(nng_socket sid, const char *addr, nng_endpoint *epp, int flags) { nni_ep *ep; int rv; + nni_sock *sock; - NNI_INIT_INT(); - if ((rv = nni_sock_dial(s, addr, &ep, flags)) == 0) { + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + if ((rv = nni_sock_dial(sock, addr, &ep, flags)) == 0) { if (epp != NULL) { *epp = ep->ep_id; } } + nni_sock_rele(sock); return (rv); } int -nng_listen(nng_socket *s, const char *addr, nng_endpoint *epp, int flags) +nng_listen(nng_socket sid, const char *addr, nng_endpoint *epp, int flags) { nni_ep *ep; int rv; + nni_sock *sock; - NNI_INIT_INT(); - if ((rv = nni_sock_listen(s, addr, &ep, flags)) == 0) { + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + if ((rv = nni_sock_listen(sock, addr, &ep, flags)) == 0) { if (epp != NULL) { *epp = ep->ep_id; } } + nni_sock_rele(sock); return (rv); } int -nng_setopt(nng_socket *s, int opt, const void *val, size_t sz) +nng_setopt(nng_socket sid, int opt, const void *val, size_t sz) { - NNI_INIT_INT(); - return (nni_sock_setopt(s, opt, val, sz)); + nni_sock *sock; + int rv; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + rv = nni_sock_setopt(sock, opt, val, sz); + nni_sock_rele(sock); + return (rv); } int -nng_getopt(nng_socket *s, int opt, void *val, size_t *szp) +nng_getopt(nng_socket sid, int opt, void *val, size_t *szp) { - NNI_INIT_INT(); - return (nni_sock_getopt(s, opt, val, szp)); + nni_sock *sock; + int rv; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + rv = nni_sock_getopt(sock, opt, val, szp); + nni_sock_rele(sock); + return (rv); } nng_notify * -nng_setnotify(nng_socket *sock, int mask, nng_notify_func fn, void *arg) +nng_setnotify(nng_socket sid, int mask, nng_notify_func fn, void *arg) { - NNI_INIT_VOID(); - return (nni_add_notify(sock, mask, fn, arg)); + nni_sock *sock; + nng_notify *notify; + int rv; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (NULL); + } + notify = nni_add_notify(sock, mask, fn, arg); + nni_sock_rele(sock); + return (notify); } void -nng_unsetnotify(nng_socket *sock, nng_notify *notify) +nng_unsetnotify(nng_socket sid, nng_notify *notify) { - NNI_INIT_VOID(); + nni_sock *sock; + int rv; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return; + } nni_rem_notify(sock, notify); + nni_sock_rele(sock); } -nng_socket * +nng_socket nng_event_socket(nng_event *ev) { - return (ev->e_sock); + // FOR NOW.... maybe evnet should contain socket Id instead? + return (nni_sock_id(ev->e_sock)); } @@ -424,7 +505,7 @@ nng_snapshot_destroy(nng_snapshot *snap) int -nng_snapshot_update(nng_socket *sock, nng_snapshot *snap) +nng_snapshot_update(nng_socket sock, nng_snapshot *snap) { // Stats TBD. NNI_INIT_INT(); @@ -467,7 +548,7 @@ nng_stat_value(nng_stat *stat) int -nng_device(nng_socket *sock1, nng_socket *sock2) +nng_device(nng_socket sock1, nng_socket sock2) { // Device TBD. NNI_INIT_INT(); @@ -42,7 +42,7 @@ extern "C" { #endif // NNG_DECL // Types common to nng. -typedef struct nng_socket nng_socket; +typedef uint32_t nng_socket; typedef uint32_t nng_endpoint; typedef uint32_t nng_pipe; typedef struct nng_msg nng_msg; @@ -54,7 +54,7 @@ typedef struct nng_stat nng_stat; // nng_open simply creates a socket of the given class. It returns an // error code on failure, or zero on success. The socket starts in cooked // mode. -NNG_DECL int nng_open(nng_socket **, uint16_t proto); +NNG_DECL int nng_open(nng_socket *, uint16_t proto); // nng_close closes the socket, terminating all activity and // closing any underlying connections and releasing any associated @@ -62,26 +62,26 @@ NNG_DECL int nng_open(nng_socket **, uint16_t proto); // error to reference the socket in any way after this is called. Likewise, // it is an error to reference any resources such as endpoints or // pipes associated with the socket. -NNG_DECL void nng_close(nng_socket *); +NNG_DECL int nng_close(nng_socket); // nng_shutdown shuts down the socket. This causes any threads doing // work for the socket or blocked in socket functions to be woken (and // return NNG_ECLOSED). The socket resources are still present, so it // is safe to call other functions; they will just return NNG_ECLOSED. // A call to nng_close is still required to release the resources. -NNG_DECL int nng_shutdown(nng_socket *); +NNG_DECL int nng_shutdown(nng_socket); // nng_protocol returns the protocol number of the socket. -NNG_DECL uint16_t nng_protocol(nng_socket *); +NNG_DECL uint16_t nng_protocol(nng_socket); // nng_peer returns the protocol number for the socket's peer. -NNG_DECL uint16_t nng_peer(nng_socket *); +NNG_DECL uint16_t nng_peer(nng_socket); // nng_setopt sets an option for a specific socket. -NNG_DECL int nng_setopt(nng_socket *, int, const void *, size_t); +NNG_DECL int nng_setopt(nng_socket, int, const void *, size_t); // nng_socket_getopt obtains the option for a socket. -NNG_DECL int nng_getopt(nng_socket *, int, void *, size_t *); +NNG_DECL int nng_getopt(nng_socket, int, void *, size_t *); // nng_notify_func is a user function that is executed upon certain // events. See below. @@ -92,14 +92,14 @@ typedef void (*nng_notify_func)(nng_event *, void *); // separate thread. Event delivery is not guaranteed, and can fail // if events occur more quickly than the callback can handle, or // if memory or other resources are scarce. -NNG_DECL nng_notify *nng_setnotify(nng_socket *, int, nng_notify_func, void *); +NNG_DECL nng_notify *nng_setnotify(nng_socket, int, nng_notify_func, void *); // nng_unsetnotify unregisters a previously registered notification callback. // Once this returns, the associated callback will not be executed any longer. // If the callback is running when this called, then it will wait until that // callback completes. (The caller of this function should not hold any // locks acqured by the callback, in order to avoid a deadlock.) -NNG_DECL void nng_unsetnotify(nng_socket *, nng_notify *); +NNG_DECL void nng_unsetnotify(nng_socket, nng_notify *); // Event types. Sockets can have multiple different kind of events. // Note that these are edge triggered -- therefore the status indicated @@ -125,7 +125,7 @@ NNG_DECL void nng_unsetnotify(nng_socket *, nng_notify *); // Some of the values will not make sense for some event types, in which case // the value returned will be NULL. NNG_DECL int nng_event_type(nng_event *); -NNG_DECL nng_socket *nng_event_socket(nng_event *); +NNG_DECL nng_socket nng_event_socket(nng_event *); NNG_DECL nng_endpoint nng_event_endpoint(nng_event *); NNG_DECL nng_pipe nng_event_pipe(nng_event *); NNG_DECL const char *nng_event_reason(nng_event *); @@ -136,7 +136,7 @@ NNG_DECL const char *nng_event_reason(nng_event *); // endpoint pointer, if it is not NULL. The flags may be NNG_FLAG_SYNCH to // indicate that a failure setting the socket up should return an error // back to the caller immediately. -NNG_DECL int nng_listen(nng_socket *, const char *, nng_endpoint *, int); +NNG_DECL int nng_listen(nng_socket, const char *, nng_endpoint *, int); // nng_dial creates a dialing endpoint, with no special options, and // starts it dialing. Dialers have at most one active connection at a time @@ -146,11 +146,11 @@ NNG_DECL int nng_listen(nng_socket *, const char *, nng_endpoint *, int); // dial will be made synchronously, and a failure condition returned back // to the caller. (If the connection is dropped, it will still be // reconnected in the background -- only the initial connect is synchronous.) -NNG_DECL int nng_dial(nng_socket *, const char *, nng_endpoint *, int); +NNG_DECL int nng_dial(nng_socket, const char *, nng_endpoint *, int); // nng_endpoint_create creates an endpoint on the socket, but does not // start it either dialing or listening. -NNG_DECL int nng_endpoint_create(nng_endpoint *, nng_socket *, const char *); +NNG_DECL int nng_endpoint_create(nng_endpoint *, nng_socket, const char *); // nng_endpoint_dial starts the endpoint dialing. This is only possible if // the endpoint is not already dialing or listening. @@ -181,7 +181,7 @@ NNG_DECL const char *nng_strerror(int); // received the data. The return value will be zero to indicate that the // socket has accepted the entire data for send, or an errno to indicate // failure. The flags may include NNG_FLAG_NONBLOCK. -NNG_DECL int nng_send(nng_socket *, const void *, size_t, int); +NNG_DECL int nng_send(nng_socket, const void *, size_t, int); // nng_recv receives message data into the socket, up to the supplied size. // The actual size of the message data will be written to the value pointed @@ -190,19 +190,19 @@ NNG_DECL int nng_send(nng_socket *, const void *, size_t, int); // the caller. In that case the pointer to the allocated will be stored // instead of the data itself. The caller is responsible for freeing the // associated memory with free(). -NNG_DECL int nng_recv(nng_socket *, void *, size_t *, int); +NNG_DECL int nng_recv(nng_socket, void *, size_t *, int); // nng_sendmsg is like nng_send, but offers up a message structure, which // gives the ability to provide more control over the message, including // providing backtrace information. It also can take a message that was // obtain via nn_recvmsg, allowing for zero copy forwarding. -NNG_DECL int nng_sendmsg(nng_socket *, nng_msg *, int); +NNG_DECL int nng_sendmsg(nng_socket, nng_msg *, int); // nng_recvmsg is like nng_recv, but is used to obtain a message structure // as well as the data buffer. This can be used to obtain more information // about where the message came from, access raw headers, etc. It also // can be passed off directly to nng_sendmsg. -NNG_DECL int nng_recvmsg(nng_socket *, nng_msg **, int); +NNG_DECL int nng_recvmsg(nng_socket, nng_msg **, int); // Message API. NNG_DECL int nng_msg_alloc(nng_msg **, size_t); @@ -306,7 +306,7 @@ NNG_DECL void nng_snapshot_free(nng_snapshot *); // relevant to a particular socket. All prior values are overwritten. // It is acceptable to use the same snapshot object with different // sockets. -NNG_DECL int nng_snapshot_update(nng_socket *, nng_snapshot *); +NNG_DECL int nng_snapshot_update(nng_socket, nng_snapshot *); // nng_snapshot_next is used to iterate over the individual statistic // objects inside the snapshot. Note that the statistic object, and the @@ -356,7 +356,7 @@ NNG_DECL int64_t nng_stat_value(nng_stat *); // Device functionality. This connects two sockets together in a device, // which means that messages from one side are forwarded to the other. -NNG_DECL int nng_device(nng_socket *, nng_socket *); +NNG_DECL int nng_device(nng_socket, nng_socket); // Pollset functionality. TBD. (Note that I'd rather avoid this // altogether, because I believe that the notification mechanism I've diff --git a/tests/bus.c b/tests/bus.c index bb8531bd..6703fa0e 100644 --- a/tests/bus.c +++ b/tests/bus.c @@ -18,16 +18,14 @@ So(memcmp(nng_msg_body(m), s, strlen(s)) == 0) Main({ - int rv; const char *addr = "inproc://test"; nni_init(); Test("BUS pattern", { Convey("We can create a BUS socket", { - nng_socket *bus; + nng_socket bus; So(nng_open(&bus, NNG_PROTO_BUS) == 0); - So(bus != NULL); Reset({ nng_close(bus); @@ -40,20 +38,15 @@ Main({ }) Convey("We can create a linked BUS topology", { - nng_socket *bus1; - nng_socket *bus2; - nng_socket *bus3; + nng_socket bus1; + nng_socket bus2; + nng_socket bus3; uint64_t rtimeo; - So((rv = nng_open(&bus1, NNG_PROTO_BUS)) == 0); - So(bus1 != NULL); - - So((rv = nng_open(&bus2, NNG_PROTO_BUS)) == 0); - So(bus2 != NULL); - - So((rv = nng_open(&bus3, NNG_PROTO_BUS)) == 0); - So(bus3 != NULL); - + So(nng_open(&bus1, NNG_PROTO_BUS) == 0); + So(nng_open(&bus2, NNG_PROTO_BUS) == 0); + So(nng_open(&bus3, NNG_PROTO_BUS) == 0); + Reset({ nng_close(bus1); nng_close(bus2); diff --git a/tests/event.c b/tests/event.c index ab42203b..f6e0ab3a 100644 --- a/tests/event.c +++ b/tests/event.c @@ -73,8 +73,8 @@ Main({ Test("Event Handling", { Convey("Given a connected pair of pair sockets", { - nng_socket *sock1; - nng_socket *sock2; + nng_socket sock1; + nng_socket sock2; struct evcnt evcnt1; struct evcnt evcnt2; nng_notify *notify1; diff --git a/tests/pipeline.c b/tests/pipeline.c index 5339146f..688c48bf 100644 --- a/tests/pipeline.c +++ b/tests/pipeline.c @@ -23,7 +23,7 @@ Main({ Test("PIPELINE (PUSH/PULL) pattern", { Convey("We can create a PUSH socket", { - nng_socket *push; + nng_socket push; So(nng_open(&push, NNG_PROTO_PUSH) == 0); So(push != NULL); @@ -44,7 +44,7 @@ Main({ }) Convey("We can create a PULL socket", { - nng_socket *pull; + nng_socket pull; So(nng_open(&pull, NNG_PROTO_PULL) == 0); So(pull != NULL); @@ -66,9 +66,9 @@ Main({ }) Convey("We can create a linked PUSH/PULL pair", { - nng_socket *push = NULL; - nng_socket *pull = NULL; - nng_socket *what = NULL; + nng_socket push = NULL; + nng_socket pull = NULL; + nng_socket what = NULL; So(nng_open(&push, NNG_PROTO_PUSH) == 0); So(nng_open(&pull, NNG_PROTO_PULL) == 0); @@ -107,10 +107,10 @@ Main({ nng_msg *def; uint64_t usecs; int len; - nng_socket *push; - nng_socket *pull1; - nng_socket *pull2; - nng_socket *pull3; + nng_socket push; + nng_socket pull1; + nng_socket pull2; + nng_socket pull3; So(nng_open(&push, NNG_PROTO_PUSH) == 0); So(nng_open(&pull1, NNG_PROTO_PULL) == 0); diff --git a/tests/pubsub.c b/tests/pubsub.c index 39e8fe12..12355554 100644 --- a/tests/pubsub.c +++ b/tests/pubsub.c @@ -24,7 +24,7 @@ Main({ Test("PUB/SUB pattern", { Convey("We can create a PUB socket", { - nng_socket *pub; + nng_socket pub; So(nng_open(&pub, NNG_PROTO_PUB) == 0); So(pub != NULL); @@ -45,7 +45,7 @@ Main({ }) Convey("We can create a SUB socket", { - nng_socket *sub; + nng_socket sub; So(nng_open(&sub, NNG_PROTO_SUB) == 0); So(sub != NULL); @@ -67,8 +67,8 @@ Main({ }) Convey("We can create a linked PUB/SUB pair", { - nng_socket *pub; - nng_socket *sub; + nng_socket pub; + nng_socket sub; So((rv = nng_open(&pub, NNG_PROTO_PUB)) == 0); So(pub != NULL); diff --git a/tests/reqrep.c b/tests/reqrep.c index 9fd2a206..52269711 100644 --- a/tests/reqrep.c +++ b/tests/reqrep.c @@ -20,11 +20,9 @@ Main({ Test("REQ/REP pattern", { Convey("We can create a REQ socket", { - nng_socket *req; + nng_socket req; - rv = nng_open(&req, NNG_PROTO_REQ); - So(rv == 0); - So(req != NULL); + So(nng_open(&req, NNG_PROTO_REQ) == 0); Reset({ nng_close(req); @@ -43,10 +41,8 @@ Main({ }) Convey("We can create a REP socket", { - nng_socket *rep; - rv = nng_open(&rep, NNG_PROTO_REP); - So(rv == 0); - So(rep != NULL); + nng_socket rep; + So(nng_open(&rep, NNG_PROTO_REP) == 0); Reset({ nng_close(rep); @@ -68,27 +64,20 @@ Main({ }) Convey("We can create a linked REQ/REP pair", { - nng_socket *req; - nng_socket *rep; + nng_socket req; + nng_socket rep; - rv = nng_open(&rep, NNG_PROTO_REP); - So(rv == 0); - So(rep != NULL); + So(nng_open(&rep, NNG_PROTO_REP) == 0); - rv = nng_open(&req, NNG_PROTO_REQ); - So(rv == 0); - So(req != NULL); + So(nng_open(&req, NNG_PROTO_REQ) == 0); Reset({ nng_close(rep); nng_close(req); }) - rv = nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH); - So(rv == 0); - - rv = nng_dial(req, addr, NULL, NNG_FLAG_SYNCH); - So(rv == 0); + So(nng_listen(rep, addr, NULL, NNG_FLAG_SYNCH) == 0); + So(nng_dial(req, addr, NULL, NNG_FLAG_SYNCH) == 0); Convey("They can REQ/REP exchange", { nng_msg *ping; @@ -123,14 +112,12 @@ Main({ uint64_t retry = 100000; // 100 ms size_t len; - nng_socket *req; - nng_socket *rep; + nng_socket req; + nng_socket rep; So(nng_open(&rep, NNG_PROTO_REP) == 0); - So(rep != NULL); So(nng_open(&req, NNG_PROTO_REQ) == 0); - So(req != NULL); Reset({ nng_close(rep); diff --git a/tests/sock.c b/tests/sock.c index 123354ac..9ceb99cb 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -20,7 +20,7 @@ Main({ Convey("We are able to open a PAIR socket", { int rv; - nng_socket *sock = NULL; + nng_socket sock = NULL; rv = nng_open(&sock, NNG_PROTO_PAIR); So(rv == 0); @@ -147,7 +147,7 @@ Main({ }) Convey("We can send and receive messages", { - nng_socket *sock2 = NULL; + nng_socket sock2 = NULL; int len = 1; nng_msg *msg; uint64_t second = 1000000; diff --git a/tests/survey.c b/tests/survey.c index a749b0b1..9ce7f20b 100644 --- a/tests/survey.c +++ b/tests/survey.c @@ -24,7 +24,7 @@ Main({ Test("SURVEY pattern", { Convey("We can create a SURVEYOR socket", { - nng_socket *surv; + nng_socket surv; So(nng_open(&surv, NNG_PROTO_SURVEYOR) == 0); So(surv != NULL); @@ -55,7 +55,7 @@ Main({ }) Convey("We can create a RESPONDENT socket", { - nng_socket *resp; + nng_socket resp; So(nng_open(&resp, NNG_PROTO_RESPONDENT) == 0); So(resp != NULL); @@ -77,9 +77,9 @@ Main({ }) Convey("We can create a linked survey pair", { - nng_socket *surv; - nng_socket *resp; - nng_socket *sock; + nng_socket surv; + nng_socket resp; + nng_socket sock; uint64_t expire; So(nng_open(&surv, NNG_PROTO_SURVEYOR) == 0); diff --git a/tests/tcp.c b/tests/tcp.c index 4cc9070e..0ec69121 100644 --- a/tests/tcp.c +++ b/tests/tcp.c @@ -21,7 +21,7 @@ TestMain("TCP Transport", { Convey("We cannot connect to wild cards", { - nng_socket *s; + nng_socket s; So(nng_open(&s, NNG_PROTO_PAIR) == 0); Reset({ @@ -31,8 +31,8 @@ TestMain("TCP Transport", { }) Convey("We can bind to wild card", { - nng_socket *s1; - nng_socket *s2; + nng_socket s1; + nng_socket s2; So(nng_open(&s1, NNG_PROTO_PAIR) == 0); So(nng_open(&s2, NNG_PROTO_PAIR) == 0); Reset({ diff --git a/tests/trantest.h b/tests/trantest.h index 23081c35..fab3371d 100644 --- a/tests/trantest.h +++ b/tests/trantest.h @@ -19,8 +19,8 @@ typedef struct { char addr[NNG_MAXADDRLEN+1]; - nng_socket *reqsock; - nng_socket *repsock; + nng_socket reqsock; + nng_socket repsock; nni_tran *tran; } trantest; |
