aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-06-07 19:30:15 -0700
committerGarrett D'Amore <garrett@damore.org>2018-06-07 19:30:15 -0700
commit79dea9caa32faec3b8075e1457c9e1276032a93c (patch)
treef9569708314eb5f50f6bf75c90e2541a852f7e8c
parenta45fb3fecbe6321a03e76831019c2c0e777bda34 (diff)
downloadnng-79dea9caa32faec3b8075e1457c9e1276032a93c.tar.gz
nng-79dea9caa32faec3b8075e1457c9e1276032a93c.tar.bz2
nng-79dea9caa32faec3b8075e1457c9e1276032a93c.zip
fixes #508 nng_aio_free blocks during callback
This recycles the job structures, and so the demo seems to work. If you have sufficient concurrency, these aio structures will ultimately leak (with the level of concurrency), but for this demo that's fine. (To keep them from leaking, you'd walk the freelist and discard them just before exiting. The actual growth of the list should be quite small.) There are other strategies that could be used to avoid uncontrolled growth, but they aren't useful for our demo purposes. Also, when we have a fix for 511, we can go ahead and just do a deferred discard. As aios are used *heavily* in the code, the strategy of a deferred cache may help reduce presssure on the heap, andd might be something we want to explore for the core.
-rw-r--r--demo/rest/server.c101
1 files changed, 68 insertions, 33 deletions
diff --git a/demo/rest/server.c b/demo/rest/server.c
index 72c24cb1..45240c38 100644
--- a/demo/rest/server.c
+++ b/demo/rest/server.c
@@ -73,35 +73,66 @@ typedef enum {
} job_state;
typedef struct rest_job {
- nng_aio * http_aio; // aio from HTTP we must reply to
- nng_http_res *http_res; // HTTP response object
- job_state state; // 0 = sending, 1 = receiving
- nng_msg * msg; // request message
- nng_aio * aio; // request flow
- nng_ctx ctx; // context on the request socket
+ nng_aio * http_aio; // aio from HTTP we must reply to
+ nng_http_res * http_res; // HTTP response object
+ job_state state; // 0 = sending, 1 = receiving
+ nng_msg * msg; // request message
+ nng_aio * aio; // request flow
+ nng_ctx ctx; // context on the request socket
+ struct rest_job *next; // next on the freelist
} rest_job;
nng_socket req_sock;
-void
-rest_free_job(rest_job *job)
+// We maintain a queue of free jobs. This way we don't have to
+// deallocate them from the callback; we just reuse them.
+nng_mtx * job_lock;
+rest_job *job_freelist;
+
+static void rest_job_cb(void *arg);
+
+static void
+rest_recycle_job(rest_job *job)
{
- if (job == NULL) {
- return;
- }
- if (job->http_res != 0) {
+ if (job->http_res != NULL) {
nng_http_res_free(job->http_res);
- }
- if (job->aio != NULL) {
- nng_aio_free(job->aio);
+ job->http_res = NULL;
}
if (job->msg != NULL) {
nng_msg_free(job->msg);
+ job->msg = NULL;
}
if (nng_ctx_id(job->ctx) != 0) {
nng_ctx_close(job->ctx);
}
- free(job);
+
+ nng_mtx_lock(job_lock);
+ job->next = job_freelist;
+ job_freelist = job;
+ nng_mtx_unlock(job_lock);
+}
+
+static rest_job *
+rest_get_job(void)
+{
+ rest_job *job;
+
+ nng_mtx_lock(job_lock);
+ if ((job = job_freelist) != NULL) {
+ job_freelist = job->next;
+ nng_mtx_unlock(job_lock);
+ job->next = NULL;
+ return (job);
+ }
+ nng_mtx_unlock(job_lock);
+ if ((job = calloc(1, sizeof(*job))) == NULL) {
+ return (NULL);
+ }
+ if (nng_aio_alloc(&job->aio, rest_job_cb, job) != 0) {
+ free(job);
+ return (NULL);
+ }
+ return (job);
}
static void
@@ -118,10 +149,10 @@ rest_http_fatal(rest_job *job, const char *fmt, int rv)
nng_http_res_set_reason(res, buf);
nng_aio_set_output(aio, 0, res);
nng_aio_finish(aio, 0);
- rest_free_job(job);
+ rest_recycle_job(job);
}
-void
+static void
rest_job_cb(void *arg)
{
rest_job *job = arg;
@@ -172,7 +203,7 @@ rest_job_cb(void *arg)
job->http_aio = NULL;
job->http_res = NULL;
// We are done with the job.
- rest_free_job(job);
+ rest_recycle_job(job);
return;
default:
fatal("bad case", NNG_ESTATE);
@@ -185,22 +216,21 @@ rest_job_cb(void *arg)
void
rest_handle(nng_aio *aio)
{
- struct rest_job * job;
- nng_http_req * req = nng_aio_get_input(aio, 0);
- nng_http_conn * conn = nng_aio_get_input(aio, 2);
- const char * clen;
- size_t sz;
- nng_iov iov;
- int rv;
-
- if ((job = malloc(sizeof(*job))) == NULL) {
+ struct rest_job *job;
+ nng_http_req * req = nng_aio_get_input(aio, 0);
+ nng_http_conn * conn = nng_aio_get_input(aio, 2);
+ const char * clen;
+ size_t sz;
+ nng_iov iov;
+ int rv;
+
+ if ((job = rest_get_job()) == NULL) {
nng_aio_finish(aio, NNG_ENOMEM);
return;
}
- if (((rv = nng_aio_alloc(&job->aio, rest_job_cb, job)) != 0) ||
- ((rv = nng_http_res_alloc(&job->http_res)) != 0) ||
+ if (((rv = nng_http_res_alloc(&job->http_res)) != 0) ||
((rv = nng_ctx_open(&job->ctx, req_sock)) != 0)) {
- rest_free_job(job);
+ rest_recycle_job(job);
nng_aio_finish(aio, rv);
return;
}
@@ -213,7 +243,7 @@ rest_handle(nng_aio *aio)
nng_http_res_set_reason(res, NULL);
nng_aio_set_output(aio, 0, res);
nng_aio_finish(aio, 0);
- rest_free_job(job);
+ rest_recycle_job(job);
return;
}
// Arbitrary limit, reject jobs with no data, or more than 128KB.
@@ -227,7 +257,7 @@ rest_handle(nng_aio *aio)
nng_http_res_set_status(res, NNG_HTTP_STATUS_BAD_REQUEST);
nng_aio_set_output(aio, 0, res);
nng_aio_finish(aio, 0);
- rest_free_job(job);
+ rest_recycle_job(job);
return;
}
@@ -258,6 +288,11 @@ rest_start(uint16_t port)
nng_url * url;
int rv;
+ if ((rv = nng_mtx_alloc(&job_lock)) != 0) {
+ fatal("nng_mtx_alloc", rv);
+ }
+ job_freelist = NULL;
+
// Set up some strings, etc. We use the port number
// from the argument list.
snprintf(rest_addr, sizeof(rest_addr), REST_URL, port);