summaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c201
1 files changed, 103 insertions, 98 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index f14c15fd..3a209441 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -47,6 +47,11 @@ typedef struct nni_sockopt {
void * data;
} nni_sockopt;
+typedef struct nni_sock_pipe_cb {
+ nng_pipe_cb cb_fn;
+ void * cb_arg;
+} nni_sock_pipe_cb;
+
struct nni_socket {
nni_list_node s_node;
nni_mtx s_mx;
@@ -81,11 +86,12 @@ struct nni_socket {
nni_list s_pipes; // active pipes
nni_list s_ctxs; // active contexts (protected by global nni_sock_lk)
- bool s_closing; // Socket is closing
- bool s_closed; // Socket closed, protected by global lock
- bool s_ctxwait; // Waiting for contexts to close.
- nng_pipe_cb s_pipe_cb; // User callback for pipe events.
- void * s_pipe_cbarg; // Argument for pipe events.
+ bool s_closing; // Socket is closing
+ bool s_closed; // Socket closed, protected by global lock
+ bool s_ctxwait; // Waiting for contexts to close.
+
+ nni_mtx s_pipe_cbs_mtx;
+ nni_sock_pipe_cb s_pipe_cbs[NNG_PIPE_EV_NUM];
};
static void nni_ctx_destroy(nni_ctx *);
@@ -431,39 +437,34 @@ nni_sock_rele(nni_sock *s)
nni_mtx_unlock(&nni_sock_lk);
}
-int
-nni_sock_pipe_start(nni_sock *s, nni_pipe *pipe)
+bool
+nni_sock_closing(nni_sock *s)
{
- nng_pipe_cb cb;
-
- NNI_ASSERT(s != NULL);
+ bool rv;
nni_mtx_lock(&s->s_mx);
- if (nni_pipe_peer(pipe) != s->s_peer_id.p_id) {
- // Peer protocol mismatch.
- nni_mtx_unlock(&s->s_mx);
- return (NNG_EPROTO);
- }
- if ((cb = s->s_pipe_cb) != NULL) {
- nng_pipe p;
- void * arg = s->s_pipe_cbarg;
- nni_mtx_unlock(&s->s_mx);
- p.id = nni_pipe_id(pipe);
- cb(p, NNG_PIPE_ADD, arg);
- if (nni_pipe_closed(pipe)) {
- return (NNG_ECLOSED);
+ rv = s->s_closing;
+ nni_mtx_unlock(&s->s_mx);
+ return (rv);
+}
+
+void
+nni_sock_run_pipe_cb(nni_sock *s, int ev, uint32_t id)
+{
+ if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
+ nng_pipe_cb cb;
+ void * arg;
+
+ nni_mtx_lock(&s->s_pipe_cbs_mtx);
+ cb = s->s_pipe_cbs[ev].cb_fn;
+ arg = s->s_pipe_cbs[ev].cb_arg;
+ nni_mtx_unlock(&s->s_pipe_cbs_mtx);
+
+ if (cb != NULL) {
+ nng_pipe p;
+ p.id = id;
+ cb(p, ev, arg);
}
- nni_mtx_lock(&s->s_mx);
- }
- if (s->s_closing) {
- // We're closing, bail out. This has to be done after
- // we have dropped the lock above in case the sock is closed
- // while the user callback runs.
- nni_mtx_unlock(&s->s_mx);
- return (NNG_ECLOSED);
}
-
- nni_mtx_unlock(&s->s_mx);
- return (0);
}
int
@@ -486,26 +487,16 @@ nni_sock_pipe_add(nni_sock *s, nni_pipe *p)
}
void
-nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
+nni_sock_pipe_remove(nni_sock *s, nni_pipe *p)
{
- nng_pipe_cb cb;
-
- nni_mtx_lock(&sock->s_mx);
- if ((cb = sock->s_pipe_cb) != NULL) {
- void * arg = sock->s_pipe_cbarg;
- nng_pipe p;
- nni_mtx_unlock(&sock->s_mx);
- p.id = nni_pipe_id(pipe);
- cb(p, NNG_PIPE_REM, arg);
- nni_mtx_lock(&sock->s_mx);
- }
- if (nni_list_active(&sock->s_pipes, pipe)) {
- nni_list_remove(&sock->s_pipes, pipe);
+ nni_mtx_lock(&s->s_mx);
+ if (nni_list_active(&s->s_pipes, p)) {
+ nni_list_remove(&s->s_pipes, p);
}
- if (sock->s_closing && nni_list_empty(&sock->s_pipes)) {
- nni_cv_wake(&sock->s_cv);
+ if (s->s_closing && nni_list_empty(&s->s_pipes)) {
+ nni_cv_wake(&s->s_cv);
}
- nni_mtx_unlock(&sock->s_mx);
+ nni_mtx_unlock(&s->s_mx);
}
static void
@@ -532,6 +523,7 @@ nni_sock_destroy(nni_sock *s)
nni_cv_fini(&s->s_close_cv);
nni_cv_fini(&s->s_cv);
nni_mtx_fini(&s->s_mx);
+ nni_mtx_fini(&s->s_pipe_cbs_mtx);
NNI_FREE_STRUCT(s);
}
@@ -574,6 +566,7 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
nni_pipe_sock_list_init(&s->s_pipes);
nni_ep_list_init(&s->s_eps);
nni_mtx_init(&s->s_mx);
+ nni_mtx_init(&s->s_pipe_cbs_mtx);
nni_cv_init(&s->s_cv, &s->s_mx);
nni_cv_init(&s->s_close_cv, &nni_sock_lk);
@@ -668,10 +661,10 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto)
return (rv);
}
-// nni_sock_shutdown shuts down the socket; after this point no further
-// access to the socket will function, and any threads blocked in entry
-// points will be woken (and the functions they are blocked in will return
-// NNG_ECLOSED.)
+// nni_sock_shutdown shuts down the socket; after this point no
+// further access to the socket will function, and any threads blocked
+// in entry points will be woken (and the functions they are blocked
+// in will return NNG_ECLOSED.)
int
nni_sock_shutdown(nni_sock *sock)
{
@@ -689,8 +682,8 @@ nni_sock_shutdown(nni_sock *sock)
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = true;
- // Close the EPs. This prevents new connections from forming but
- // but allows existing ones to drain.
+ // Close the EPs. This prevents new connections from forming
+ // but but allows existing ones to drain.
NNI_LIST_FOREACH (&sock->s_eps, ep) {
nni_ep_shutdown(ep);
}
@@ -728,9 +721,9 @@ nni_sock_shutdown(nni_sock *sock)
nni_mtx_lock(&sock->s_mx);
- // At this point, we've done everything we politely can to give
- // the protocol a chance to flush its write side. Now its time
- // to be a little more insistent.
+ // At this point, we've done everything we politely can to
+ // give the protocol a chance to flush its write side. Now
+ // its time to be a little more insistent.
// Close the upper queues immediately. This can happen
// safely while we hold the lock.
@@ -756,7 +749,8 @@ nni_sock_shutdown(nni_sock *sock)
nni_pipe_stop(pipe);
}
- // We have to wait for *both* endpoints and pipes to be removed.
+ // We have to wait for *both* endpoints and pipes to be
+ // removed.
while ((!nni_list_empty(&sock->s_pipes)) ||
(!nni_list_empty(&sock->s_eps))) {
nni_cv_wait(&sock->s_cv);
@@ -777,9 +771,9 @@ nni_sock_shutdown(nni_sock *sock)
}
// nni_sock_close shuts down the socket, then releases any resources
-// associated with it. It is a programmer error to reference the socket
-// after this function is called, as the pointer may reference invalid
-// memory or other objects.
+// associated with it. It is a programmer error to reference the
+// socket after this function is called, as the pointer may reference
+// invalid memory or other objects.
void
nni_sock_close(nni_sock *s)
{
@@ -789,8 +783,8 @@ nni_sock_close(nni_sock *s)
nni_mtx_lock(&nni_sock_lk);
if (s->s_closed) {
- // Some other thread called close. All we need to do is
- // drop our reference count.
+ // Some other thread called close. All we need to do
+ // is drop our reference count.
nni_mtx_unlock(&nni_sock_lk);
nni_sock_rele(s);
return;
@@ -835,8 +829,8 @@ nni_sock_closeall(void)
nni_mtx_unlock(&nni_sock_lk);
return;
}
- // Bump the reference count. The close call below will
- // drop it.
+ // Bump the reference count. The close call below
+ // will drop it.
s->s_refcnt++;
nni_list_node_remove(&s->s_node);
nni_mtx_unlock(&nni_sock_lk);
@@ -962,7 +956,8 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
}
// Protocol options. The protocol can override options that
- // the socket framework would otherwise supply, like buffer sizes.
+ // the socket framework would otherwise supply, like buffer
+ // sizes.
for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) {
if (strcmp(pso->pso_name, name) != 0) {
continue;
@@ -976,7 +971,8 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
return (rv);
}
- // Some options do not go down to transports. Handle them directly.
+ // Some options do not go down to transports. Handle them
+ // directly.
for (sso = nni_sock_options; sso->so_name != NULL; sso++) {
if (strcmp(sso->so_name, name) != 0) {
continue;
@@ -997,17 +993,20 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
return (rv);
}
- // Validation of transport options. This is stateless, so transports
- // should not fail to set an option later if they passed it here.
+ // Validation of transport options. This is stateless, so
+ // transports should not fail to set an option later if they
+ // passed it here.
rv = nni_tran_chkopt(name, v, sz, t);
- // Also check a few generic things. We do this if no transport
- // was found, or even if a transport rejected one of the settings.
+ // Also check a few generic things. We do this if no
+ // transport was found, or even if a transport rejected one of
+ // the settings.
if ((rv == NNG_ENOTSUP) || (rv == 0)) {
if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) {
size_t z;
- // just a sanity test on the size; it also ensures that
- // a size can be set even with no transport configured.
+ // just a sanity test on the size; it also
+ // ensures that a size can be set even with no
+ // transport configured.
rv = nni_copyin_size(&z, v, sz, 0, NNI_MAXSZ, t);
}
}
@@ -1049,9 +1048,10 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
}
}
- // Apply the options. Failure to set any option on any transport
- // (other than ENOTSUP) stops the operation altogether. Its
- // important that transport wide checks properly pre-validate.
+ // Apply the options. Failure to set any option on any
+ // transport (other than ENOTSUP) stops the operation
+ // altogether. Its important that transport wide checks
+ // properly pre-validate.
NNI_LIST_FOREACH (&s->s_eps, ep) {
int x;
if (optv->typ == NNI_TYPE_OPAQUE) {
@@ -1075,15 +1075,16 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t)
}
if (rv == 0) {
- // Remove and toss the old value, we are using a new one.
+ // Remove and toss the old value, we are using a new
+ // one.
if (oldv != NULL) {
nni_list_remove(&s->s_options, oldv);
nni_free_opt(oldv);
}
- // Insert our new value. This permits it to be compared
- // against later, and for new endpoints to automatically
- // receive these values,
+ // Insert our new value. This permits it to be
+ // compared against later, and for new endpoints to
+ // automatically receive these values,
nni_list_append(&s->s_options, optv);
} else {
nni_free_opt(optv);
@@ -1108,7 +1109,8 @@ nni_sock_getopt(nni_sock *s, const char *name, void *val, size_t *szp, int t)
}
// Protocol specific options. The protocol can override
- // options like the send buffer or notification descriptors this way.
+ // options like the send buffer or notification descriptors
+ // this way.
for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) {
if (strcmp(name, pso->pso_name) != 0) {
continue;
@@ -1166,12 +1168,14 @@ nni_sock_flags(nni_sock *sock)
}
void
-nni_sock_set_pipe_cb(nni_sock *sock, nng_pipe_cb cb, void *arg)
+nni_sock_set_pipe_cb(nni_sock *s, int ev, nng_pipe_cb cb, void *arg)
{
- nni_mtx_lock(&sock->s_mx);
- sock->s_pipe_cb = cb;
- sock->s_pipe_cbarg = arg;
- nni_mtx_unlock(&sock->s_mx);
+ if ((ev >= 0) && (ev < NNG_PIPE_EV_NUM)) {
+ nni_mtx_lock(&s->s_pipe_cbs_mtx);
+ s->s_pipe_cbs[ev].cb_fn = cb;
+ s->s_pipe_cbs[ev].cb_arg = arg;
+ nni_mtx_unlock(&s->s_pipe_cbs_mtx);
+ }
}
int
@@ -1185,11 +1189,12 @@ nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing)
}
nni_mtx_lock(&nni_sock_lk);
if ((rv = nni_idhash_find(nni_ctx_hash, id, (void **) &ctx)) == 0) {
- // We refuse a reference if either the socket is closed,
- // or the context is closed. (If the socket is closed,
- // and we are only getting the reference so we can close it,
- // then we still allow. In the case the only valid operation
- // will be to close the socket.)
+ // We refuse a reference if either the socket is
+ // closed, or the context is closed. (If the socket
+ // is closed, and we are only getting the reference so
+ // we can close it, then we still allow. In the case
+ // the only valid operation will be to close the
+ // socket.)
if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) {
rv = NNG_ECLOSED;
} else {
@@ -1224,8 +1229,8 @@ nni_ctx_rele(nni_ctx *ctx)
nni_mtx_lock(&nni_sock_lk);
ctx->c_refcnt--;
if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) {
- // Either still have an active reference, or not actually
- // closing yet.
+ // Either still have an active reference, or not
+ // actually closing yet.
nni_mtx_unlock(&nni_sock_lk);
return;
}
@@ -1288,8 +1293,8 @@ nni_ctx_open(nni_ctx **ctxp, nni_sock *sock)
nni_mtx_unlock(&nni_sock_lk);
// Paranoia, fixing a possible race in close. Don't let us
- // give back a context if the socket is being shutdown (it might
- // not have reached the "closed" state yet.)
+ // give back a context if the socket is being shutdown (it
+ // might not have reached the "closed" state yet.)
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
nni_mtx_unlock(&sock->s_mx);