From 79dea9caa32faec3b8075e1457c9e1276032a93c Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Thu, 7 Jun 2018 19:30:15 -0700 Subject: fixes #508 nng_aio_free blocks during callback This recycles the job structures, and so the demo seems to work. If you have sufficient concurrency, these aio structures will ultimately leak (with the level of concurrency), but for this demo that's fine. (To keep them from leaking, you'd walk the freelist and discard them just before exiting. The actual growth of the list should be quite small.) There are other strategies that could be used to avoid uncontrolled growth, but they aren't useful for our demo purposes. Also, when we have a fix for 511, we can go ahead and just do a deferred discard. As aios are used *heavily* in the code, the strategy of a deferred cache may help reduce presssure on the heap, andd might be something we want to explore for the core. --- demo/rest/server.c | 101 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 68 insertions(+), 33 deletions(-) (limited to 'demo/rest') diff --git a/demo/rest/server.c b/demo/rest/server.c index 72c24cb1..45240c38 100644 --- a/demo/rest/server.c +++ b/demo/rest/server.c @@ -73,35 +73,66 @@ typedef enum { } job_state; typedef struct rest_job { - nng_aio * http_aio; // aio from HTTP we must reply to - nng_http_res *http_res; // HTTP response object - job_state state; // 0 = sending, 1 = receiving - nng_msg * msg; // request message - nng_aio * aio; // request flow - nng_ctx ctx; // context on the request socket + nng_aio * http_aio; // aio from HTTP we must reply to + nng_http_res * http_res; // HTTP response object + job_state state; // 0 = sending, 1 = receiving + nng_msg * msg; // request message + nng_aio * aio; // request flow + nng_ctx ctx; // context on the request socket + struct rest_job *next; // next on the freelist } rest_job; nng_socket req_sock; -void -rest_free_job(rest_job *job) +// We maintain a queue of free jobs. This way we don't have to +// deallocate them from the callback; we just reuse them. +nng_mtx * job_lock; +rest_job *job_freelist; + +static void rest_job_cb(void *arg); + +static void +rest_recycle_job(rest_job *job) { - if (job == NULL) { - return; - } - if (job->http_res != 0) { + if (job->http_res != NULL) { nng_http_res_free(job->http_res); - } - if (job->aio != NULL) { - nng_aio_free(job->aio); + job->http_res = NULL; } if (job->msg != NULL) { nng_msg_free(job->msg); + job->msg = NULL; } if (nng_ctx_id(job->ctx) != 0) { nng_ctx_close(job->ctx); } - free(job); + + nng_mtx_lock(job_lock); + job->next = job_freelist; + job_freelist = job; + nng_mtx_unlock(job_lock); +} + +static rest_job * +rest_get_job(void) +{ + rest_job *job; + + nng_mtx_lock(job_lock); + if ((job = job_freelist) != NULL) { + job_freelist = job->next; + nng_mtx_unlock(job_lock); + job->next = NULL; + return (job); + } + nng_mtx_unlock(job_lock); + if ((job = calloc(1, sizeof(*job))) == NULL) { + return (NULL); + } + if (nng_aio_alloc(&job->aio, rest_job_cb, job) != 0) { + free(job); + return (NULL); + } + return (job); } static void @@ -118,10 +149,10 @@ rest_http_fatal(rest_job *job, const char *fmt, int rv) nng_http_res_set_reason(res, buf); nng_aio_set_output(aio, 0, res); nng_aio_finish(aio, 0); - rest_free_job(job); + rest_recycle_job(job); } -void +static void rest_job_cb(void *arg) { rest_job *job = arg; @@ -172,7 +203,7 @@ rest_job_cb(void *arg) job->http_aio = NULL; job->http_res = NULL; // We are done with the job. - rest_free_job(job); + rest_recycle_job(job); return; default: fatal("bad case", NNG_ESTATE); @@ -185,22 +216,21 @@ rest_job_cb(void *arg) void rest_handle(nng_aio *aio) { - struct rest_job * job; - nng_http_req * req = nng_aio_get_input(aio, 0); - nng_http_conn * conn = nng_aio_get_input(aio, 2); - const char * clen; - size_t sz; - nng_iov iov; - int rv; - - if ((job = malloc(sizeof(*job))) == NULL) { + struct rest_job *job; + nng_http_req * req = nng_aio_get_input(aio, 0); + nng_http_conn * conn = nng_aio_get_input(aio, 2); + const char * clen; + size_t sz; + nng_iov iov; + int rv; + + if ((job = rest_get_job()) == NULL) { nng_aio_finish(aio, NNG_ENOMEM); return; } - if (((rv = nng_aio_alloc(&job->aio, rest_job_cb, job)) != 0) || - ((rv = nng_http_res_alloc(&job->http_res)) != 0) || + if (((rv = nng_http_res_alloc(&job->http_res)) != 0) || ((rv = nng_ctx_open(&job->ctx, req_sock)) != 0)) { - rest_free_job(job); + rest_recycle_job(job); nng_aio_finish(aio, rv); return; } @@ -213,7 +243,7 @@ rest_handle(nng_aio *aio) nng_http_res_set_reason(res, NULL); nng_aio_set_output(aio, 0, res); nng_aio_finish(aio, 0); - rest_free_job(job); + rest_recycle_job(job); return; } // Arbitrary limit, reject jobs with no data, or more than 128KB. @@ -227,7 +257,7 @@ rest_handle(nng_aio *aio) nng_http_res_set_status(res, NNG_HTTP_STATUS_BAD_REQUEST); nng_aio_set_output(aio, 0, res); nng_aio_finish(aio, 0); - rest_free_job(job); + rest_recycle_job(job); return; } @@ -258,6 +288,11 @@ rest_start(uint16_t port) nng_url * url; int rv; + if ((rv = nng_mtx_alloc(&job_lock)) != 0) { + fatal("nng_mtx_alloc", rv); + } + job_freelist = NULL; + // Set up some strings, etc. We use the port number // from the argument list. snprintf(rest_addr, sizeof(rest_addr), REST_URL, port); -- cgit v1.2.3-70-g09d2