diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/defs.h | 6 | ||||
| -rw-r--r-- | src/core/endpt.c | 8 | ||||
| -rw-r--r-- | src/core/message.c | 4 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 10 | ||||
| -rw-r--r-- | src/core/pipe.h | 2 | ||||
| -rw-r--r-- | src/core/platform.h | 2 | ||||
| -rw-r--r-- | src/nng.c | 3 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_synch.c | 2 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 11 |
10 files changed, 33 insertions, 17 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index a37377f7..478bb464 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -34,9 +34,9 @@ typedef struct nni_pipe_ops nni_pipe_ops; typedef struct nni_protocol nni_protocol; -typedef int nni_signal; // Used as a turnstile/wakeup channel. -typedef uint64_t nni_time; // An absolute time in microseconds. -typedef int nni_duration; // A relative time in microseconds. +typedef int nni_signal; // Turnstile/wakeup channel. +typedef uint64_t nni_time; // Absolute time (usec). +typedef int nni_duration; // Relative time (usec). // Some default timing things. #define NNI_TIME_NEVER ((nni_time) 0xffffffffull) diff --git a/src/core/endpt.c b/src/core/endpt.c index ad1a0b9a..ba557e3a 100644 --- a/src/core/endpt.c +++ b/src/core/endpt.c @@ -20,7 +20,7 @@ struct nng_endpt { void * ep_data; nni_list_node_t ep_sock_node; nni_socket * ep_sock; - char ep_addr[NNG_MAXADDRLEN]; + char ep_addr[NNG_MAXADDRLEN]; nni_thread * ep_dialer; nni_thread * ep_listener; int ep_close; @@ -77,6 +77,7 @@ nni_endpt_create(nni_endpt **epp, nni_socket *sock, const char *addr) return (0); } + void nni_endpt_destroy(nni_endpt *ep) { @@ -96,10 +97,12 @@ nni_endpt_destroy(nni_endpt *ep) nni_free(ep, sizeof (*ep)); } + void nni_endpt_close(nni_endpt *ep) { nni_pipe *pipe; + nni_mutex_enter(&ep->ep_mx); if (ep->ep_close) { nni_mutex_exit(&ep->ep_mx); @@ -115,6 +118,7 @@ nni_endpt_close(nni_endpt *ep) nni_mutex_exit(&ep->ep_mx); } + int nni_endpt_listen(nni_endpt *ep) { @@ -124,6 +128,7 @@ nni_endpt_listen(nni_endpt *ep) return (ep->ep_ops.ep_listen(ep->ep_data)); } + int nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) { @@ -145,6 +150,7 @@ nni_endpt_dial(nni_endpt *ep, nni_pipe **pp) return (0); } + int nni_endpt_accept(nni_endpt *ep, nni_pipe **pp) { diff --git a/src/core/message.c b/src/core/message.c index 39b87c25..f1e1fc99 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -186,10 +186,10 @@ nni_chunk_prepend(nni_chunk *ch, const void *data, size_t len) // We had enough capacity, just shuffle data down. memmove(ch->ch_ptr + len, ch->ch_ptr, ch->ch_len); } else if ((rv = nni_chunk_grow(ch, 0, len)) == 0) { - // We grew the chunk, so adjust. + // We grew the chunk, so adjust. ch->ch_ptr -= len; } else { - // Couldn't grow the chunk either. Error. + // Couldn't grow the chunk either. Error. return (rv); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index fd89b7ad..859c0bd9 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -20,12 +20,12 @@ struct nni_msgqueue { nni_cond mq_writeable; nni_cond mq_drained; int mq_cap; - int mq_alloc; // alloc is cap + 2... + int mq_alloc; // alloc is cap + 2... int mq_len; int mq_get; int mq_put; int mq_closed; - int mq_rwait; // readers waiting (unbuffered) + int mq_rwait; // readers waiting (unbuffered) nni_msg ** mq_msgs; }; @@ -146,12 +146,14 @@ nni_msgqueue_put_impl(nni_msgqueue *mq, nni_msg *msg, } // room in the queue? - if (mq->mq_len < mq->mq_cap) + if (mq->mq_len < mq->mq_cap) { break; + } // unbuffered, room for one, and a reader waiting? - if (mq->mq_rwait && (mq->mq_len == mq->mq_cap)) + if (mq->mq_rwait && (mq->mq_len == mq->mq_cap)) { break; + } // interrupted? if (*signal) { diff --git a/src/core/pipe.h b/src/core/pipe.h index 4bd11a17..6688f837 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -25,7 +25,7 @@ struct nng_pipe { nni_endpt * p_ep; }; - // Pipe operations that protocols use. +// Pipe operations that protocols use. extern int nni_pipe_recv(nni_pipe *, nng_msg **); extern int nni_pipe_send(nni_pipe *, nng_msg *); extern uint32_t nni_pipe_id(nni_pipe *); diff --git a/src/core/platform.h b/src/core/platform.h index 84477853..21793086 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -113,7 +113,7 @@ extern void nni_cond_wait(nni_cond *); // check the condition. It will return either NNG_ETIMEDOUT, or 0. extern int nni_cond_waituntil(nni_cond *, nni_time); -typedef struct nni_thread nni_thread; +typedef struct nni_thread nni_thread; // nni_thread_creates a thread that runs the given function. The thread // receives a single argument. @@ -48,11 +48,13 @@ nng_socket_protocol(nng_socket *s) return (nni_socket_proto(s)); } + int nng_recvmsg(nng_socket *s, nng_msg **msgp, int flags) { int rv; nni_duration expire; + if ((rv = nni_init()) != 0) { return (rv); } @@ -65,6 +67,7 @@ nng_recvmsg(nng_socket *s, nng_msg **msgp, int flags) return (nni_socket_recvmsg(s, msgp, expire)); } + // Misc. const char * nng_strerror(int num) diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index a07b5c92..2df96c71 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -21,7 +21,7 @@ #define PLATFORM_POSIX_ALLOC #define PLATFORM_POSIX_DEBUG #define PLATFORM_POSIX_CLOCK -#define PLATFORM_POSIX_RANDOM +#define PLATFORM_POSIX_RANDOM #define PLATFORM_POSIX_SYNCH #define PLATFORM_POSIX_THREAD diff --git a/src/platform/posix/posix_synch.c b/src/platform/posix/posix_synch.c index d4c4d6ac..e693a641 100644 --- a/src/platform/posix/posix_synch.c +++ b/src/platform/posix/posix_synch.c @@ -35,6 +35,7 @@ void nni_mutex_fini(nni_mutex *mp) { int rv; + if ((rv = pthread_mutex_destroy(&mp->mx)) != 0) { nni_panic("pthread_mutex_destroy failed: %s", strerror(rv)); } @@ -139,4 +140,5 @@ nni_cond_waituntil(nni_cond *c, uint64_t usec) return (0); } + #endif diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 24f97b81..42057f39 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -33,12 +33,14 @@ uint32_t nni_plat_nextid(void) { uint32_t id; + pthread_mutex_lock(&nni_plat_lock); id = nni_plat_next++; pthread_mutex_unlock(&nni_plat_lock); return (id); } + static void * nni_thrfunc(void *arg) { @@ -71,7 +73,7 @@ nni_thread_create(nni_thread **tp, void (*fn)(void *), void *arg) void -nni_thread_reap(nni_thread * thr) +nni_thread_reap(nni_thread *thr) { int rv; @@ -138,9 +140,9 @@ nni_plat_init(int (*helper)(void)) uint16_t xsub[3]; nni_time now = nni_clock(); - xsub[0] = (uint16_t)now; - xsub[1] = (uint16_t)(now >> 16); - xsub[2] = (uint16_t)(now >> 24); + xsub[0] = (uint16_t) now; + xsub[1] = (uint16_t) (now >> 16); + xsub[2] = (uint16_t) (now >> 24); nni_plat_next = nrand48(xsub); } #endif @@ -170,4 +172,5 @@ nni_plat_fini(void) pthread_mutex_unlock(&nni_plat_lock); } + #endif |
