aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/platform.h5
-rw-r--r--src/core/sockfd.c7
-rw-r--r--src/core/stream.c37
-rw-r--r--src/core/stream.h5
-rw-r--r--src/core/tcp.c37
5 files changed, 80 insertions, 11 deletions
diff --git a/src/core/platform.h b/src/core/platform.h
index d2409e5b..2f5b5c17 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -293,6 +293,7 @@ extern void nni_tcp_dialer_fini(nni_tcp_dialer *);
// Further operations on it should return NNG_ECLOSED.
// Any in-progress connection will be aborted.
extern void nni_tcp_dialer_close(nni_tcp_dialer *);
+extern void nni_tcp_dialer_stop(nni_tcp_dialer *);
// nni_tcp_dial attempts to create an outgoing connection,
// asynchronously, to the address in the aio. On success, the first (and only)
@@ -318,6 +319,10 @@ extern void nni_tcp_listener_fini(nni_tcp_listener *);
// any bound socket, and further operations will result in NNG_ECLOSED.
extern void nni_tcp_listener_close(nni_tcp_listener *);
+// nni_tcp_listener_stop is close + waits for any operations to stop,
+// so there won't be any further accepts after this.
+extern void nni_tcp_listener_stop(nni_tcp_listener *);
+
// nni_tcp_listener_listen creates the socket in listening mode, bound
// to the specified address.
extern int nni_tcp_listener_listen(nni_tcp_listener *, const nni_sockaddr *);
diff --git a/src/core/sockfd.c b/src/core/sockfd.c
index cedd7436..787a0783 100644
--- a/src/core/sockfd.c
+++ b/src/core/sockfd.c
@@ -63,6 +63,12 @@ sfd_listener_close(void *arg)
nni_mtx_unlock(&l->mtx);
}
+static void
+sfd_listener_stop(void *arg)
+{
+ sfd_listener_close(arg);
+}
+
static int
sfd_listener_listen(void *arg)
{
@@ -222,6 +228,7 @@ nni_sfd_listener_alloc(nng_stream_listener **lp, const nng_url *url)
l->ops.sl_free = sfd_listener_free;
l->ops.sl_close = sfd_listener_close;
+ l->ops.sl_stop = sfd_listener_stop;
l->ops.sl_listen = sfd_listener_listen;
l->ops.sl_accept = sfd_listener_accept;
l->ops.sl_get = sfd_listener_get;
diff --git a/src/core/stream.c b/src/core/stream.c
index ad026586..16e11eca 100644
--- a/src/core/stream.c
+++ b/src/core/stream.c
@@ -116,7 +116,17 @@ static struct {
void
nng_stream_close(nng_stream *s)
{
- s->s_close(s);
+ if (s != NULL) {
+ s->s_close(s);
+ }
+}
+
+void
+nng_stream_stop(nng_stream *s)
+{
+ if (s != NULL) {
+ s->s_stop(s);
+ }
}
void
@@ -149,7 +159,17 @@ nni_stream_get(
void
nng_stream_dialer_close(nng_stream_dialer *d)
{
- d->sd_close(d);
+ if (d != NULL) {
+ d->sd_close(d);
+ }
+}
+
+void
+nng_stream_dialer_stop(nng_stream_dialer *d)
+{
+ if (d != NULL) {
+ d->sd_stop(d);
+ }
}
void
@@ -226,8 +246,19 @@ nni_stream_dialer_set_tls(nng_stream_dialer *d, nng_tls_config *cfg)
void
nng_stream_listener_close(nng_stream_listener *l)
{
- l->sl_close(l);
+ if (l != NULL) {
+ l->sl_close(l);
+ }
+}
+
+void
+nng_stream_listener_stop(nng_stream_listener *l)
+{
+ if (l != NULL) {
+ l->sl_stop(l);
+ }
}
+
void
nng_stream_listener_free(nng_stream_listener *l)
{
diff --git a/src/core/stream.h b/src/core/stream.h
index 9ea65834..e09dc9e5 100644
--- a/src/core/stream.h
+++ b/src/core/stream.h
@@ -1,5 +1,5 @@
//
-// Copyright 2020 Staysail Systems, Inc. <info@staysail.tech>
+// Copyright 2024 Staysail Systems, Inc. <info@staysail.tech>
//
// This software is supplied under the terms of the MIT License, a
// copy of which should be located in the distribution where this
@@ -43,6 +43,7 @@ extern int nni_stream_listener_get_tls(
struct nng_stream {
void (*s_free)(void *);
void (*s_close)(void *);
+ void (*s_stop)(void *);
void (*s_recv)(void *, nng_aio *);
void (*s_send)(void *, nng_aio *);
int (*s_get)(void *, const char *, void *, size_t *, nni_type);
@@ -53,6 +54,7 @@ struct nng_stream {
struct nng_stream_dialer {
void (*sd_free)(void *);
void (*sd_close)(void *);
+ void (*sd_stop)(void *);
void (*sd_dial)(void *, nng_aio *);
int (*sd_get)(void *, const char *, void *, size_t *, nni_type);
int (*sd_set)(void *, const char *, const void *, size_t, nni_type);
@@ -65,6 +67,7 @@ struct nng_stream_dialer {
struct nng_stream_listener {
void (*sl_free)(void *);
void (*sl_close)(void *);
+ void (*sl_stop)(void *);
int (*sl_listen)(void *);
void (*sl_accept)(void *, nng_aio *);
int (*sl_get)(void *, const char *, void *, size_t *, nni_type);
diff --git a/src/core/tcp.c b/src/core/tcp.c
index 5d324a13..d2e08493 100644
--- a/src/core/tcp.c
+++ b/src/core/tcp.c
@@ -104,6 +104,8 @@ tcp_dial_con_cb(void *arg)
if ((d->closed) || ((aio = nni_list_first(&d->conaios)) == NULL)) {
if (rv == 0) {
// Make sure we discard the underlying connection.
+ nng_stream_close(nni_aio_get_output(d->conaio, 0));
+ nng_stream_stop(nni_aio_get_output(d->conaio, 0));
nng_stream_free(nni_aio_get_output(d->conaio, 0));
nni_aio_set_output(d->conaio, 0, NULL);
}
@@ -127,14 +129,26 @@ tcp_dialer_close(void *arg)
{
tcp_dialer *d = arg;
nni_aio *aio;
- nni_mtx_lock(&d->mtx);
- d->closed = true;
- while ((aio = nni_list_first(&d->conaios)) != NULL) {
- nni_list_remove(&d->conaios, aio);
- nni_aio_finish_error(aio, NNG_ECLOSED);
+
+ if (d != NULL) {
+ nni_mtx_lock(&d->mtx);
+ d->closed = true;
+ while ((aio = nni_list_first(&d->conaios)) != NULL) {
+ nni_list_remove(&d->conaios, aio);
+ nni_aio_finish_error(aio, NNG_ECLOSED);
+ }
+ nni_tcp_dialer_close(d->d);
+ nni_mtx_unlock(&d->mtx);
+ }
+}
+
+static void
+tcp_dialer_stop(void *arg)
+{
+ tcp_dialer *d = arg;
+ if (d != NULL) {
+ nni_tcp_dialer_stop(d->d);
}
- nni_tcp_dialer_close(d->d);
- nni_mtx_unlock(&d->mtx);
}
static void
@@ -222,6 +236,7 @@ tcp_dialer_alloc(tcp_dialer **dp)
d->ops.sd_close = tcp_dialer_close;
d->ops.sd_free = tcp_dialer_free;
+ d->ops.sd_stop = tcp_dialer_stop;
d->ops.sd_dial = tcp_dialer_dial;
d->ops.sd_get = tcp_dialer_get;
d->ops.sd_set = tcp_dialer_set;
@@ -276,6 +291,13 @@ tcp_listener_close(void *arg)
}
static void
+tcp_listener_stop(void *arg)
+{
+ tcp_listener *l = arg;
+ nni_tcp_listener_stop(l->l);
+}
+
+static void
tcp_listener_free(void *arg)
{
tcp_listener *l = arg;
@@ -372,6 +394,7 @@ tcp_listener_alloc_addr(nng_stream_listener **lp, const nng_sockaddr *sa)
l->ops.sl_free = tcp_listener_free;
l->ops.sl_close = tcp_listener_close;
+ l->ops.sl_stop = tcp_listener_stop;
l->ops.sl_listen = tcp_listener_listen;
l->ops.sl_accept = tcp_listener_accept;
l->ops.sl_get = tcp_listener_get;