diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-16 16:27:22 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-16 16:27:22 -0800 |
| commit | ac8415c24ffea645105c3859e814843e81c97f8a (patch) | |
| tree | 7b64b4aab3de6ce5bdd69c3d5b7ead57f4a4b4e7 /src | |
| parent | b8f7236aa2928d70d9bff2e1071654982539eeda (diff) | |
| download | nng-ac8415c24ffea645105c3859e814843e81c97f8a.tar.gz nng-ac8415c24ffea645105c3859e814843e81c97f8a.tar.bz2 nng-ac8415c24ffea645105c3859e814843e81c97f8a.zip | |
Start of event framework.
This compiles correctly, but doesn't actually deliver events yet.
As part of this, I've made most of the initializables in nng
safe to tear-down if uninitialized (or set to zero e.g. via calloc).
This makes it loads easier to write the teardown on error code, since
I can deinit everything, without worrying about which things have been
initialized and which have not.
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/core/defs.h | 2 | ||||
| -rw-r--r-- | src/core/event.c | 103 | ||||
| -rw-r--r-- | src/core/event.h | 35 | ||||
| -rw-r--r-- | src/core/idhash.c | 6 | ||||
| -rw-r--r-- | src/core/message.c | 14 | ||||
| -rw-r--r-- | src/core/nng_impl.h | 1 | ||||
| -rw-r--r-- | src/core/platform.h | 8 | ||||
| -rw-r--r-- | src/core/socket.c | 75 | ||||
| -rw-r--r-- | src/core/socket.h | 8 | ||||
| -rw-r--r-- | src/core/thread.c | 9 | ||||
| -rw-r--r-- | src/core/thread.h | 1 | ||||
| -rw-r--r-- | src/platform/posix/posix_impl.h | 1 | ||||
| -rw-r--r-- | src/platform/posix/posix_thread.c | 9 | ||||
| -rw-r--r-- | src/platform/windows/win_impl.h | 1 | ||||
| -rw-r--r-- | src/platform/windows/win_thread.c | 6 | ||||
| -rw-r--r-- | src/protocol/bus/bus.c | 8 | ||||
| -rw-r--r-- | src/protocol/pair/pair.c | 8 | ||||
| -rw-r--r-- | src/protocol/pipeline/pull.c | 8 | ||||
| -rw-r--r-- | src/protocol/pipeline/push.c | 12 | ||||
| -rw-r--r-- | src/protocol/pubsub/pub.c | 10 | ||||
| -rw-r--r-- | src/protocol/pubsub/sub.c | 16 | ||||
| -rw-r--r-- | src/protocol/reqrep/rep.c | 16 | ||||
| -rw-r--r-- | src/protocol/reqrep/req.c | 20 | ||||
| -rw-r--r-- | src/protocol/survey/respond.c | 16 | ||||
| -rw-r--r-- | src/protocol/survey/survey.c | 10 |
26 files changed, 308 insertions, 96 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 12078195..e70074c2 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -32,6 +32,7 @@ set (NNG_SOURCES core/clock.c core/clock.h core/endpt.c + core/event.c core/idhash.c core/idhash.h core/init.c diff --git a/src/core/defs.h b/src/core/defs.h index 9ba41fae..25b131d4 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -27,6 +27,8 @@ typedef struct nng_endpoint nni_ep; typedef struct nng_pipe nni_pipe; typedef struct nng_msg nni_msg; typedef struct nng_sockaddr nni_sockaddr; +typedef struct nng_event nni_event; +typedef struct nng_notify nni_notify; // These are our own names. typedef struct nni_tran nni_tran; diff --git a/src/core/event.c b/src/core/event.c new file mode 100644 index 00000000..3f7d1143 --- /dev/null +++ b/src/core/event.c @@ -0,0 +1,103 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +#include <stdlib.h> +#include <string.h> + +int +nni_event_init(nni_event *event, int type, nni_sock *sock) +{ + int rv; + + memset(event, 0, sizeof (*event)); + if ((rv = nni_cv_init(&event->e_cv, &sock->s_mx)) != 0) { + return (rv); + } + NNI_LIST_NODE_INIT(&event->e_node); + event->e_type = type; + event->e_sock = sock; + return (0); +} + + +void +nni_event_fini(nni_event *event) +{ + nni_cv_fini(&event->e_cv); +} + + +void +nni_event_submit(nni_sock *sock, nni_event *event) +{ + // Call with socket mutex owned! + if (event->e_pending == 0) { + event->e_pending = 1; + event->e_done = 0; + nni_list_append(&sock->s_events, event); + nni_cv_wake(&sock->s_notify_cv); + } +} + + +void +nni_event_wait(nni_sock *sock, nni_event *event) +{ + // Call with socket mutex owned! + // Note that the socket mutex is dropped during the call. + while ((event->e_pending) && (!event->e_done)) { + nni_cv_wait(&event->e_cv); + } +} + + +void +nni_event_notifier(void *arg) +{ + nni_sock *sock = arg; + nni_event *event; + nni_notify *notify; + + nni_mtx_lock(&sock->s_mx); + for (;;) { + if (sock->s_closing) { + break; + } + + if ((event = nni_list_first(&sock->s_events)) != NULL) { + event->e_pending = 0; + nni_list_remove(&sock->s_events, event); + nni_mtx_unlock(&sock->s_mx); + + // Lock the notify list, it must not change. + nni_mtx_lock(&sock->s_notify_mx); + NNI_LIST_FOREACH (&sock->s_notify, notify) { + if ((notify->n_mask & event->e_type) == 0) { + // No interest. + continue; + } + notify->n_func(event, ¬ify->n_arg); + } + nni_mtx_unlock(&sock->s_notify_mx); + + nni_mtx_lock(&sock->s_mx); + // Let the event submitter know we are done, unless + // they have resubmitted. Submitters can wait on this + // lock. + event->e_done = 1; + nni_cv_wake(&event->e_cv); + continue; + } + + nni_cv_wait(&sock->s_notify_cv); + } + nni_mtx_unlock(&sock->s_mx); +} diff --git a/src/core/event.h b/src/core/event.h new file mode 100644 index 00000000..d7faad2e --- /dev/null +++ b/src/core/event.h @@ -0,0 +1,35 @@ +// +// Copyright 2017 Garrett D'Amore <garrett@damore.org> +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_EVENT_H +#define CORE_EVENT_H + +#include "core/defs.h" +#include "core/list.h" + +struct nng_event { + int e_type; + nni_sock * e_sock; + nni_ep * e_ep; + nni_pipe * e_pipe; + + int e_done; // true when notify thr is finished + int e_pending; // true if event is queued + nni_cv e_cv; // signaled when e_done is noted + nni_list_node e_node; // location on the socket list +}; + +struct nng_notify { + nni_list_node n_node; + nng_notify_func n_func; + void * n_arg; + int n_mask; +}; + +#endif // CORE_EVENT_H diff --git a/src/core/idhash.c b/src/core/idhash.c index d816ddf4..e541ef36 100644 --- a/src/core/idhash.c +++ b/src/core/idhash.c @@ -55,8 +55,10 @@ nni_idhash_create(nni_idhash **hp) void nni_idhash_destroy(nni_idhash *h) { - nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry)); - NNI_FREE_STRUCT(h); + if (h != NULL) { + nni_free(h->ih_entries, h->ih_cap * sizeof (nni_idhash_entry)); + NNI_FREE_STRUCT(h); + } } diff --git a/src/core/message.c b/src/core/message.c index 778e7dfc..346570c9 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -363,13 +363,15 @@ nni_msg_free(nni_msg *m) { nni_msgopt *mo; - nni_chunk_free(&m->m_header); - nni_chunk_free(&m->m_body); - while ((mo = nni_list_first(&m->m_options)) != NULL) { - nni_list_remove(&m->m_options, mo); - nni_free(mo, sizeof (*mo) + mo->mo_sz); + if (m != NULL) { + nni_chunk_free(&m->m_header); + nni_chunk_free(&m->m_body); + while ((mo = nni_list_first(&m->m_options)) != NULL) { + nni_list_remove(&m->m_options, mo); + nni_free(mo, sizeof (*mo) + mo->mo_sz); + } + NNI_FREE_STRUCT(m); } - NNI_FREE_STRUCT(m); } diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index ff25f5ad..007e20a6 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -38,6 +38,7 @@ #include "core/transport.h" // These have to come after the others - particularly transport.h +#include "core/event.h" #include "core/pipe.h" #include "core/socket.h" #include "core/endpt.h" diff --git a/src/core/platform.h b/src/core/platform.h index 8d19a7c8..7afd33ef 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -70,11 +70,11 @@ typedef struct nni_plat_ipcsock nni_plat_ipcsock; // nni_plat_mtx_init initializes a mutex structure. This may require dynamic // allocation, depending on the platform. It can return NNG_ENOMEM if that -// fails. +// fails. An initialized mutex must be distinguishable from zeroed memory. extern int nni_plat_mtx_init(nni_plat_mtx *); // nni_plat_mtx_fini destroys the mutex and releases any resources allocated -// for it's use. +// for it's use. If the mutex is zeroed memory, this should do nothing. extern void nni_plat_mtx_fini(nni_plat_mtx *); // nni_plat_mtx_lock locks the mutex. This is not recursive -- a mutex can @@ -93,9 +93,13 @@ extern int nni_plat_mtx_trylock(nni_plat_mtx *); // supplied with it, and that mutex must always be held when performing any // operations on the condition variable (other than fini.) This may require // dynamic allocation, and if so this operation may fail with NNG_ENOMEM. +// As with mutexes, an initialized mutex should be distinguishable from +// zeroed memory. extern int nni_plat_cv_init(nni_plat_cv *, nni_plat_mtx *); // nni_plat_cv_fini releases all resources associated with condition variable. +// If the cv points to just zeroed memory (was never initialized), it does +// nothing. extern void nni_plat_cv_fini(nni_plat_cv *); // nni_plat_cv_wake wakes all waiters on the condition. This should be diff --git a/src/core/socket.c b/src/core/socket.c index ae7cbc13..56347e4e 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -165,6 +165,8 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node); NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node); + NNI_LIST_INIT(&sock->s_notify, nni_notify, n_node); + NNI_LIST_INIT(&sock->s_events, nni_event, e_node); sock->s_sock_ops = *proto->proto_sock_ops; sops = &sock->s_sock_ops; @@ -192,47 +194,24 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) pops->pipe_rem = nni_sock_nullop; } - if ((rv = nni_mtx_init(&sock->s_mx)) != 0) { - NNI_FREE_STRUCT(sock); - return (rv); - } - if ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) { - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (rv); + if (((rv = nni_mtx_init(&sock->s_mx)) != 0) || + ((rv = nni_mtx_init(&sock->s_notify_mx)) != 0) || + ((rv = nni_cv_init(&sock->s_cv, &sock->s_mx)) != 0) || + ((rv = nni_cv_init(&sock->s_notify_cv, &sock->s_mx)) != 0)) { + goto fail; } if ((rv = nni_thr_init(&sock->s_reaper, nni_reaper, sock)) != 0) { - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (rv); + goto fail; } - if ((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) { - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (rv); - } - if ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0) { - nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (rv); + if (((rv = nni_msgq_init(&sock->s_uwq, 0)) != 0) || + ((rv = nni_msgq_init(&sock->s_urq, 0)) != 0)) { + goto fail; } if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) { - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_thr_fini(&sock->s_reaper); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); - return (rv); + goto fail; } // NB: If worker functions are null, then the thread initialization @@ -241,16 +220,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_worker fn = sops->sock_worker[i]; rv = nni_thr_init(&sock->s_worker_thr[i], fn, sock->s_data); if (rv != 0) { - while (i > 0) { - i--; - nni_thr_fini(&sock->s_worker_thr[i]); - } - sops->sock_fini(&sock->s_data); - nni_msgq_fini(sock->s_urq); - nni_msgq_fini(sock->s_uwq); - nni_cv_fini(&sock->s_cv); - nni_mtx_fini(&sock->s_mx); - NNI_FREE_STRUCT(sock); + goto fail; } } @@ -261,6 +231,23 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_thr_run(&sock->s_reaper); *sockp = sock; return (0); + +fail: + sock->s_sock_ops.sock_fini(sock->s_data); + + // And we need to clean up *our* state. + for (i = 0; i < NNI_MAXWORKERS; i++) { + nni_thr_fini(&sock->s_worker_thr[i]); + } + nni_thr_fini(&sock->s_reaper); + nni_msgq_fini(sock->s_urq); + nni_msgq_fini(sock->s_uwq); + nni_cv_fini(&sock->s_notify_cv); + nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_notify_mx); + nni_mtx_fini(&sock->s_mx); + NNI_FREE_STRUCT(sock); + return (rv); } @@ -388,7 +375,9 @@ nni_sock_close(nni_sock *sock) nni_thr_fini(&sock->s_reaper); nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); + nni_cv_fini(&sock->s_notify_cv); nni_cv_fini(&sock->s_cv); + nni_mtx_fini(&sock->s_notify_mx); nni_mtx_fini(&sock->s_mx); NNI_FREE_STRUCT(sock); } diff --git a/src/core/socket.h b/src/core/socket.h index 4656c5d7..d8b37252 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -37,9 +37,14 @@ struct nng_socket { nni_list s_eps; // active endpoints nni_list s_pipes; // pipes for this socket + nni_list s_events; // pending events + nni_list s_notify; // event watchers + nni_cv s_notify_cv; // wakes notify thread + nni_mtx s_notify_mx; // protects s_notify list nni_list s_reaps; // pipes to reap nni_thr s_reaper; + nni_thr s_notify_thr; nni_thr s_worker_thr[NNI_MAXWORKERS]; int s_ep_pend; // EP dial/listen in progress @@ -48,6 +53,9 @@ struct nng_socket { int s_senderr; // Protocol state machine use int s_recverr; // Protocol state machine use + nni_event s_recv_ev; // Event for readability + nni_event s_send_ev; // Event for sendability + uint32_t s_nextid; // Next Pipe ID. }; diff --git a/src/core/thread.c b/src/core/thread.c index 50d63521..91bacdec 100644 --- a/src/core/thread.c +++ b/src/core/thread.c @@ -115,21 +115,26 @@ nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg) thr->arg = arg; if ((rv = nni_plat_mtx_init(&thr->mtx)) != 0) { + thr->done = 1; return (rv); } if ((rv = nni_plat_cv_init(&thr->cv, &thr->mtx)) != 0) { nni_plat_mtx_fini(&thr->mtx); + thr->done = 1; return (rv); } if (fn == NULL) { + thr->init = 1; thr->done = 1; return (0); } if ((rv = nni_plat_thr_init(&thr->thr, nni_thr_wrap, thr)) != 0) { + thr->done = 1; nni_plat_cv_fini(&thr->cv); nni_plat_mtx_fini(&thr->mtx); return (rv); } + thr->init = 1; return (0); } @@ -160,6 +165,9 @@ nni_thr_wait(nni_thr *thr) void nni_thr_fini(nni_thr *thr) { + if (!thr->init) { + return; + } nni_plat_mtx_lock(&thr->mtx); thr->stop = 1; nni_plat_cv_wake(&thr->cv); @@ -170,6 +178,7 @@ nni_thr_fini(nni_thr *thr) if (thr->fn != NULL) { nni_plat_thr_fini(&thr->thr); } + nni_plat_cv_fini(&thr->cv); nni_plat_mtx_fini(&thr->mtx); } diff --git a/src/core/thread.h b/src/core/thread.h index 95794a6e..44f5c9e4 100644 --- a/src/core/thread.h +++ b/src/core/thread.h @@ -31,6 +31,7 @@ typedef struct { int start; int stop; int done; + int init; } nni_thr; // nni_mtx_init initializes the mutex. (Win32 programmers take note; diff --git a/src/platform/posix/posix_impl.h b/src/platform/posix/posix_impl.h index 1ab728c5..3faebfd4 100644 --- a/src/platform/posix/posix_impl.h +++ b/src/platform/posix/posix_impl.h @@ -60,6 +60,7 @@ extern int nni_plat_devnull; // open descriptor on /dev/null // elsewhere. struct nni_plat_mtx { + int init; pthread_mutex_t mtx; }; diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c index 91fbdff7..113dd9ea 100644 --- a/src/platform/posix/posix_thread.c +++ b/src/platform/posix/posix_thread.c @@ -50,6 +50,7 @@ nni_plat_mtx_init(nni_plat_mtx *mtx) nni_panic("pthread_mutex_init: %s", strerror(rv)); } } + mtx->init = 1; return (0); } @@ -59,9 +60,13 @@ nni_plat_mtx_fini(nni_plat_mtx *mtx) { int rv; + if (!mtx->init) { + return; + } if ((rv = pthread_mutex_destroy(&mtx->mtx)) != 0) { nni_panic("pthread_mutex_fini: %s", strerror(rv)); } + mtx->init = 0; } @@ -159,9 +164,13 @@ nni_plat_cv_fini(nni_plat_cv *cv) { int rv; + if (cv->mtx == NULL) { + return; + } if ((rv = pthread_cond_destroy(&cv->cv)) != 0) { nni_panic("pthread_cond_destroy: %s", strerror(rv)); } + cv->mtx = NULL; } diff --git a/src/platform/windows/win_impl.h b/src/platform/windows/win_impl.h index 9de47ce4..c6a68142 100644 --- a/src/platform/windows/win_impl.h +++ b/src/platform/windows/win_impl.h @@ -60,6 +60,7 @@ struct nni_plat_thr { struct nni_plat_mtx { CRITICAL_SECTION cs; DWORD owner; + int init; }; struct nni_plat_cv { diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c index e2568626..1b903b26 100644 --- a/src/platform/windows/win_thread.c +++ b/src/platform/windows/win_thread.c @@ -35,6 +35,7 @@ int nni_plat_mtx_init(nni_plat_mtx *mtx) { InitializeCriticalSection(&mtx->cs); + mtx->init = 1; return (0); } @@ -42,7 +43,10 @@ nni_plat_mtx_init(nni_plat_mtx *mtx) void nni_plat_mtx_fini(nni_plat_mtx *mtx) { - DeleteCriticalSection(&mtx->cs); + if (mtx->init) { + DeleteCriticalSection(&mtx->cs); + mtx->init = 0; + } } diff --git a/src/protocol/bus/bus.c b/src/protocol/bus/bus.c index 0d57f1db..986d8ed7 100644 --- a/src/protocol/bus/bus.c +++ b/src/protocol/bus/bus.c @@ -59,7 +59,9 @@ nni_bus_sock_fini(void *arg) { nni_bus_sock *psock = arg; - NNI_FREE_STRUCT(psock); + if (psock != NULL) { + NNI_FREE_STRUCT(psock); + } } @@ -91,7 +93,9 @@ nni_bus_pipe_fini(void *arg) { nni_bus_pipe *ppipe = arg; - NNI_FREE_STRUCT(ppipe); + if (ppipe != NULL) { + NNI_FREE_STRUCT(ppipe); + } } diff --git a/src/protocol/pair/pair.c b/src/protocol/pair/pair.c index 0d8b32fe..d6c2eedf 100644 --- a/src/protocol/pair/pair.c +++ b/src/protocol/pair/pair.c @@ -64,7 +64,9 @@ nni_pair_sock_fini(void *arg) { nni_pair_sock *psock = arg; - NNI_FREE_STRUCT(psock); + if (psock != NULL) { + NNI_FREE_STRUCT(psock); + } } @@ -89,7 +91,9 @@ nni_pair_pipe_fini(void *arg) { nni_pair_pipe *ppipe = arg; - NNI_FREE_STRUCT(ppipe); + if (ppipe != NULL) { + NNI_FREE_STRUCT(ppipe); + } } diff --git a/src/protocol/pipeline/pull.c b/src/protocol/pipeline/pull.c index 20beb873..ca15efef 100644 --- a/src/protocol/pipeline/pull.c +++ b/src/protocol/pipeline/pull.c @@ -50,7 +50,9 @@ nni_pull_sock_fini(void *arg) { nni_pull_sock *pull = arg; - NNI_FREE_STRUCT(pull); + if (pull != NULL) { + NNI_FREE_STRUCT(pull); + } } @@ -74,7 +76,9 @@ nni_pull_pipe_fini(void *arg) { nni_pull_pipe *pp = arg; - NNI_FREE_STRUCT(pp); + if (pp != NULL) { + NNI_FREE_STRUCT(pp); + } } diff --git a/src/protocol/pipeline/push.c b/src/protocol/pipeline/push.c index 4704a323..0f973f8a 100644 --- a/src/protocol/pipeline/push.c +++ b/src/protocol/pipeline/push.c @@ -84,8 +84,10 @@ nni_push_sock_fini(void *arg) { nni_push_sock *push = arg; - nni_cv_fini(&push->cv); - NNI_FREE_STRUCT(push); + if (push != NULL) { + nni_cv_fini(&push->cv); + NNI_FREE_STRUCT(push); + } } @@ -116,8 +118,10 @@ nni_push_pipe_fini(void *arg) { nni_push_pipe *pp = arg; - nni_msgq_fini(pp->mq); - NNI_FREE_STRUCT(pp); + if (pp != NULL) { + nni_msgq_fini(pp->mq); + NNI_FREE_STRUCT(pp); + } } diff --git a/src/protocol/pubsub/pub.c b/src/protocol/pubsub/pub.c index a167d523..5617d92c 100644 --- a/src/protocol/pubsub/pub.c +++ b/src/protocol/pubsub/pub.c @@ -62,7 +62,9 @@ nni_pub_sock_fini(void *arg) { nni_pub_sock *pub = arg; - NNI_FREE_STRUCT(pub); + if (pub != NULL) { + NNI_FREE_STRUCT(pub); + } } @@ -93,8 +95,10 @@ nni_pub_pipe_fini(void *arg) { nni_pub_pipe *pp = arg; - nni_msgq_fini(pp->sendq); - NNI_FREE_STRUCT(pp); + if (pp != NULL) { + nni_msgq_fini(pp->sendq); + NNI_FREE_STRUCT(pp); + } } diff --git a/src/protocol/pubsub/sub.c b/src/protocol/pubsub/sub.c index 9390b0d2..cd8dd87c 100644 --- a/src/protocol/pubsub/sub.c +++ b/src/protocol/pubsub/sub.c @@ -65,12 +65,14 @@ nni_sub_sock_fini(void *arg) nni_sub_sock *sub = arg; nni_sub_topic *topic; - while ((topic = nni_list_first(&sub->topics)) != NULL) { - nni_list_remove(&sub->topics, topic); - nni_free(topic->buf, topic->len); - NNI_FREE_STRUCT(topic); + if (sub != NULL) { + while ((topic = nni_list_first(&sub->topics)) != NULL) { + nni_list_remove(&sub->topics, topic); + nni_free(topic->buf, topic->len); + NNI_FREE_STRUCT(topic); + } + NNI_FREE_STRUCT(sub); } - NNI_FREE_STRUCT(sub); } @@ -94,7 +96,9 @@ nni_sub_pipe_fini(void *arg) { nni_sub_pipe *sp = arg; - NNI_FREE_STRUCT(sp); + if (sp != NULL) { + NNI_FREE_STRUCT(sp); + } } diff --git a/src/protocol/reqrep/rep.c b/src/protocol/reqrep/rep.c index 90346236..6410e6db 100644 --- a/src/protocol/reqrep/rep.c +++ b/src/protocol/reqrep/rep.c @@ -72,11 +72,13 @@ nni_rep_sock_fini(void *arg) { nni_rep_sock *rep = arg; - nni_idhash_destroy(rep->pipes); - if (rep->btrace != NULL) { - nni_free(rep->btrace, rep->btrace_len); + if (rep != NULL) { + nni_idhash_destroy(rep->pipes); + if (rep->btrace != NULL) { + nni_free(rep->btrace, rep->btrace_len); + } + NNI_FREE_STRUCT(rep); } - NNI_FREE_STRUCT(rep); } @@ -106,8 +108,10 @@ nni_rep_pipe_fini(void *arg) { nni_rep_pipe *rp = arg; - nni_msgq_fini(rp->sendq); - NNI_FREE_STRUCT(rp); + if (rp != NULL) { + nni_msgq_fini(rp->sendq); + NNI_FREE_STRUCT(rp); + } } diff --git a/src/protocol/reqrep/req.c b/src/protocol/reqrep/req.c index 73ef4969..787997e8 100644 --- a/src/protocol/reqrep/req.c +++ b/src/protocol/reqrep/req.c @@ -89,14 +89,16 @@ nni_req_sock_fini(void *arg) { nni_req_sock *req = arg; - nni_cv_fini(&req->cv); - if (req->reqmsg != NULL) { - nni_msg_free(req->reqmsg); - } - if (req->retrymsg != NULL) { - nni_msg_free(req->retrymsg); + if (req != NULL) { + nni_cv_fini(&req->cv); + if (req->reqmsg != NULL) { + nni_msg_free(req->reqmsg); + } + if (req->retrymsg != NULL) { + nni_msg_free(req->retrymsg); + } + NNI_FREE_STRUCT(req); } - NNI_FREE_STRUCT(req); } @@ -121,7 +123,9 @@ nni_req_pipe_fini(void *arg) { nni_req_pipe *rp = arg; - NNI_FREE_STRUCT(rp); + if (rp != NULL) { + NNI_FREE_STRUCT(rp); + } } diff --git a/src/protocol/survey/respond.c b/src/protocol/survey/respond.c index 0cef5a68..3a4bf52d 100644 --- a/src/protocol/survey/respond.c +++ b/src/protocol/survey/respond.c @@ -67,11 +67,13 @@ nni_resp_sock_fini(void *arg) { nni_resp_sock *psock = arg; - nni_idhash_destroy(psock->pipes); - if (psock->btrace != NULL) { - nni_free(psock->btrace, psock->btrace_len); + if (psock != NULL) { + nni_idhash_destroy(psock->pipes); + if (psock->btrace != NULL) { + nni_free(psock->btrace, psock->btrace_len); + } + NNI_FREE_STRUCT(psock); } - NNI_FREE_STRUCT(psock); } @@ -101,8 +103,10 @@ nni_resp_pipe_fini(void *arg) { nni_resp_pipe *ppipe = arg; - nni_msgq_fini(ppipe->sendq); - NNI_FREE_STRUCT(ppipe); + if (ppipe != NULL) { + nni_msgq_fini(ppipe->sendq); + NNI_FREE_STRUCT(ppipe); + } } diff --git a/src/protocol/survey/survey.c b/src/protocol/survey/survey.c index 22baf2e8..c7ce1c16 100644 --- a/src/protocol/survey/survey.c +++ b/src/protocol/survey/survey.c @@ -82,8 +82,10 @@ nni_surv_sock_fini(void *arg) { nni_surv_sock *psock = arg; - nni_cv_fini(&psock->cv); - NNI_FREE_STRUCT(psock); + if (psock != NULL) { + nni_cv_fini(&psock->cv); + NNI_FREE_STRUCT(psock); + } } @@ -114,7 +116,9 @@ nni_surv_pipe_fini(void *arg) { nni_surv_pipe *sp = arg; - NNI_FREE_STRUCT(sp); + if (sp != NULL) { + NNI_FREE_STRUCT(sp); + } } |
