diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-01 19:28:46 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-01 19:28:46 -0800 |
| commit | eae9af1d228cd92387a978e52b9f5032a06dc175 (patch) | |
| tree | 4e822d6208ebf4d1d7708ac3176ef348c42cac8e /src/protocol/reqrep/rep.c | |
| parent | 3a421d08f87b8b34786ac47e30552fbdb2cf4371 (diff) | |
| download | nng-eae9af1d228cd92387a978e52b9f5032a06dc175.tar.gz nng-eae9af1d228cd92387a978e52b9f5032a06dc175.tar.bz2 nng-eae9af1d228cd92387a978e52b9f5032a06dc175.zip | |
REQ/REP use new style locks. Also, enable compilation for REP.
Diffstat (limited to 'src/protocol/reqrep/rep.c')
| -rw-r--r-- | src/protocol/reqrep/rep.c | 66 |
1 files changed, 34 insertions, 32 deletions
diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index ddfcc0c7..7623bd2c 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -22,12 +22,12 @@ typedef struct nni_rep_sock nni_rep_sock; // An nni_rep_sock is our per-socket protocol private structure. struct nni_rep_sock { nni_socket * sock; - nni_mutex mx; + nni_mtx mx; nni_msgqueue * uwq; nni_msgqueue * urq; int raw; int ttl; - nni_thread * sender; + nni_thr sender; nni_idhash * pipes; char * btrace; size_t btrace_len; @@ -54,7 +54,7 @@ nni_rep_create(void **repp, nni_socket *sock) if ((rep = nni_alloc(sizeof (*rep))) == NULL) { return (NNG_ENOMEM); } - if ((rv = nni_mutex_init(&rep->mx)) != 0) { + if ((rv = nni_mtx_init(&rep->mx)) != 0) { nni_free(rep, sizeof (*rep)); return (rv); } @@ -64,7 +64,7 @@ nni_rep_create(void **repp, nni_socket *sock) rep->btrace = NULL; rep->btrace_len = 0; if ((rv = nni_idhash_create(&rep->pipes)) != 0) { - nni_mutex_fini(&rep->mx); + nni_mtx_fini(&rep->mx); nni_free(rep, sizeof (*rep)); return (rv); } @@ -72,15 +72,16 @@ nni_rep_create(void **repp, nni_socket *sock) rep->uwq = nni_socket_sendq(sock); rep->urq = nni_socket_recvq(sock); - rv = nni_thread_create(&rep->sender, nni_rep_topsender, rep); + rv = nni_thr_init(&rep->sender, nni_rep_topsender, rep); if (rv != 0) { nni_idhash_destroy(rep->pipes); - nni_mutex_fini(&rep->mx); + nni_mtx_fini(&rep->mx); nni_free(rep, sizeof (*rep)); return (rv); } *repp = rep; nni_socket_senderr(sock, NNG_ESTATE); + nni_thr_run(&rep->sender); return (0); } @@ -90,9 +91,9 @@ nni_rep_destroy(void *arg) { nni_rep_sock *rep = arg; - nni_thread_reap(rep->sender); + nni_thr_fini(&rep->sender); nni_idhash_destroy(rep->pipes); - nni_mutex_fini(&rep->mx); + nni_mtx_fini(&rep->mx); if (rep->btrace != NULL) { nni_free(rep->btrace, rep->btrace_len); } @@ -115,13 +116,13 @@ nni_rep_add_pipe(void *arg, nni_pipe *pipe, void *datap) return (rv); } - nni_mutex_enter(&rep->mx); + nni_mtx_lock(&rep->mx); if ((rv = nni_idhash_insert(rep->pipes, nni_pipe_id(pipe), rp)) != 0) { nni_msgqueue_destroy(rp->sendq); - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); return (rv); } - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); return (0); } @@ -132,9 +133,9 @@ nni_rep_rem_pipe(void *arg, void *data) nni_rep_sock *rep = arg; nni_rep_pipe *rp = data; - nni_mutex_enter(&rep->mx); + nni_mtx_lock(&rep->mx); nni_idhash_remove(rep->pipes, nni_pipe_id(rp->pipe)); - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); nni_msgqueue_destroy(rp->sendq); } @@ -176,9 +177,9 @@ nni_rep_topsender(void *arg) id += header[3]; nni_msg_trim_header(msg, 4); - nni_mutex_enter(&rep->mx); + nni_mtx_lock(&rep->mx); if (nni_idhash_find(rep->pipes, id, (void **) &rp) != 0) { - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); nni_msg_free(msg); continue; } @@ -190,7 +191,7 @@ nni_rep_topsender(void *arg) // circumstances. nni_msg_free(msg); } - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); } } @@ -306,14 +307,14 @@ nni_rep_setopt(void *arg, int opt, const void *buf, size_t sz) switch (opt) { case NNG_OPT_MAXTTL: - nni_mutex_enter(&rep->mx); + nni_mtx_lock(&rep->mx); rv = nni_setopt_int(&rep->ttl, buf, sz, 1, 255); - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); break; case NNG_OPT_RAW: - nni_mutex_enter(&rep->mx); + nni_mtx_lock(&rep->mx); rv = nni_setopt_int(&rep->raw, buf, sz, 0, 1); - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); break; default: rv = NNG_ENOTSUP; @@ -330,14 +331,14 @@ nni_rep_getopt(void *arg, int opt, void *buf, size_t *szp) switch (opt) { case NNG_OPT_MAXTTL: - nni_mutex_enter(&rep->mx); + nni_mtx_lock(&rep->mx); rv = nni_getopt_int(&rep->ttl, buf, szp); - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); break; case NNG_OPT_RAW: - nni_mutex_enter(&rep->mx); + nni_mtx_lock(&rep->mx); rv = nni_getopt_int(&rep->raw, buf, szp); - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); break; default: rv = NNG_ENOTSUP; @@ -352,9 +353,9 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) nni_rep_sock *rep = arg; size_t len; - nni_mutex_enter(&rep->mx); + nni_mtx_lock(&rep->mx); if (rep->raw) { - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); return (msg); } @@ -364,7 +365,7 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) // If we have a stored backtrace, append it to the header... // if we don't have a backtrace, discard the message. if (rep->btrace == NULL) { - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } @@ -377,7 +378,7 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) nni_free(rep->btrace, rep->btrace_len); rep->btrace = NULL; rep->btrace_len = 0; - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } @@ -385,6 +386,7 @@ nni_rep_sendfilter(void *arg, nni_msg *msg) nni_free(rep->btrace, rep->btrace_len); rep->btrace = NULL; rep->btrace_len = 0; + nni_mtx_unlock(&rep->mx); return (msg); } @@ -396,9 +398,9 @@ nni_rep_recvfilter(void *arg, nni_msg *msg) char *header; size_t len; - nni_mutex_enter(&rep->mx); + nni_mtx_lock(&rep->mx); if (rep->raw) { - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); return (msg); } @@ -410,14 +412,14 @@ nni_rep_recvfilter(void *arg, nni_msg *msg) rep->btrace_len = 0; } if ((rep->btrace = nni_alloc(len)) == NULL) { - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); nni_msg_free(msg); return (NULL); } rep->btrace_len = len; memcpy(rep->btrace, 0, len); nni_msg_trim_header(msg, len); - nni_mutex_exit(&rep->mx); + nni_mtx_unlock(&rep->mx); return (msg); } |
