diff options
| -rw-r--r-- | docs/man/nng_options.5.adoc | 14 | ||||
| -rw-r--r-- | src/compat/nanomsg/nn.c | 173 | ||||
| -rw-r--r-- | src/core/msgqueue.c | 66 | ||||
| -rw-r--r-- | src/core/msgqueue.h | 10 | ||||
| -rw-r--r-- | src/core/socket.c | 47 | ||||
| -rw-r--r-- | src/nng.h | 1 | ||||
| -rw-r--r-- | src/transport/tcp/tcp.c | 27 | ||||
| -rw-r--r-- | src/transport/tls/tls.c | 27 | ||||
| -rw-r--r-- | tests/sock.c | 6 |
9 files changed, 152 insertions, 219 deletions
diff --git a/docs/man/nng_options.5.adoc b/docs/man/nng_options.5.adoc index b5b62815..d71a7b54 100644 --- a/docs/man/nng_options.5.adoc +++ b/docs/man/nng_options.5.adoc @@ -21,7 +21,6 @@ nng_options - socket, dialer, listener, and pipe options #define NNG_OPT_SOCKNAME "socket-name" #define NNG_OPT_RAW "raw" -#define NNG_OPT_LINGER "linger" #define NNG_OPT_RECVBUF "recv-buffer" #define NNG_OPT_SENDBUF "send-buffer" #define NNG_OPT_RECVFD "recv-fd" @@ -73,19 +72,6 @@ can have multiple dialers and endpoints associated with them. An attempt has been made to include details about such restrictions in the description of the option. -[[NNG_OPT_LINGER]] -((`NNG_OPT_LINGER`)):: -(((lingering))) -(`<<nng_duration.5#,nng_duration>>`) -This is the linger time of the socket in milliseconds. -When this value is non-zero, then the system will -attempt to defer closing until it has undelivered data, or until the specified -timeout has expired. - -NOTE: Not all transports support lingering, and -so closing a socket or exiting the application can still result in the loss -of undelivered messages. - [[NNG_OPT_LOCADDR]] ((`NNG_OPT_LOCADDR`)):: (`<<nng_sockaddr.5#,nng_sockaddr>>`) diff --git a/src/compat/nanomsg/nn.c b/src/compat/nanomsg/nn.c index ee237c40..5c6bbd7d 100644 --- a/src/compat/nanomsg/nn.c +++ b/src/compat/nanomsg/nn.c @@ -21,6 +21,8 @@ #include "protocol/survey0/respond.h" #include "protocol/survey0/survey.h" +#include "core/nng_impl.h" + #include <stdio.h> #include <string.h> @@ -645,39 +647,14 @@ nn_sendmsg(int s, const struct nn_msghdr *mh, int flags) return ((int) sz); } -// options which we convert -- most of the array is initialized at run time. -static const struct { - int nnlevel; - int nnopt; - const char *opt; -} options[] = { - { NN_SOL_SOCKET, NN_LINGER, NNG_OPT_LINGER }, // review - { NN_SOL_SOCKET, NN_SNDBUF, NNG_OPT_SENDBUF }, - { NN_SOL_SOCKET, NN_RCVBUF, NNG_OPT_RECVBUF }, - { NN_SOL_SOCKET, NN_RECONNECT_IVL, NNG_OPT_RECONNMINT }, - { NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX, NNG_OPT_RECONNMAXT }, - { NN_SOL_SOCKET, NN_SNDFD, NNG_OPT_SENDFD }, - { NN_SOL_SOCKET, NN_RCVFD, NNG_OPT_RECVFD }, - { NN_SOL_SOCKET, NN_RCVMAXSIZE, NNG_OPT_RECVMAXSZ }, - { NN_SOL_SOCKET, NN_MAXTTL, NNG_OPT_MAXTTL }, - { NN_SOL_SOCKET, NN_RCVTIMEO, NNG_OPT_RECVTIMEO }, - { NN_SOL_SOCKET, NN_SNDTIMEO, NNG_OPT_SENDTIMEO }, - { NN_SOL_SOCKET, NN_SOCKET_NAME, NNG_OPT_SOCKNAME }, - { NN_REQ, NN_REQ_RESEND_IVL, NNG_OPT_REQ_RESENDTIME }, - { NN_SUB, NN_SUB_SUBSCRIBE, NNG_OPT_SUB_SUBSCRIBE }, - { NN_SUB, NN_SUB_UNSUBSCRIBE, NNG_OPT_SUB_UNSUBSCRIBE }, - { NN_SURVEYOR, NN_SURVEYOR_DEADLINE, NNG_OPT_SURVEYOR_SURVEYTIME }, - // XXX: IPV4ONLY, SNDPRIO, RCVPRIO -}; - static int -nn_getdomain(int s, void *valp, size_t *szp) +nn_getdomain(nng_socket s, void *valp, size_t *szp) { int i; bool b; int rv; - if ((rv = nng_getopt_bool((nng_socket) s, NNG_OPT_RAW, &b)) != 0) { + if ((rv = nng_getopt_bool(s, NNG_OPT_RAW, &b)) != 0) { nn_seterror(rv); return (-1); } @@ -687,25 +664,143 @@ nn_getdomain(int s, void *valp, size_t *szp) return (0); } +static int +nn_getzero(nng_socket s, void *valp, size_t *szp) +{ + int zero = 0; + NNI_ARG_UNUSED(s); + memcpy(valp, &zero, *szp < sizeof(zero) ? *szp : sizeof(zero)); + *szp = sizeof(zero); + return (0); +} + +static int +nn_setignore(nng_socket s, const void *valp, size_t sz) +{ + NNI_ARG_UNUSED(valp); + NNI_ARG_UNUSED(s); + if (sz != sizeof(int)) { + nn_seterror(NNG_EINVAL); + return (-1); + } + return (0); +} + +// options which we convert -- most of the array is initialized at run time. +static const struct { + int nnlevel; + int nnopt; + const char *opt; + int (*get)(nng_socket, void *, size_t *); + int (*set)(nng_socket, const void *, size_t); +} options[] = { + { + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_LINGER, + .get = nn_getzero, + .set = nn_setignore, + }, // review + { + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_DOMAIN, + .get = nn_getdomain, + .set = NULL, + }, + { + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_SNDBUF, + .opt = NNG_OPT_SENDBUF, + }, + { + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_RCVBUF, + .opt = NNG_OPT_RECVBUF, + }, + { + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_RECONNECT_IVL, + .opt = NNG_OPT_RECONNMINT, + }, + { + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_RECONNECT_IVL_MAX, + .opt = NNG_OPT_RECONNMAXT, + }, + { + .nnlevel = NN_SOL_SOCKET, .nnopt = NN_SNDFD, .opt = NNG_OPT_SENDFD, + }, + { + .nnlevel = NN_SOL_SOCKET, .nnopt = NN_RCVFD, .opt = NNG_OPT_RECVFD, + }, + { + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_RCVMAXSIZE, + .opt = NNG_OPT_RECVMAXSZ, + }, + { + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_MAXTTL, + .opt = NNG_OPT_MAXTTL, + }, + { + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_RCVTIMEO, + .opt = NNG_OPT_RECVTIMEO, + }, + { + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_SNDTIMEO, + .opt = NNG_OPT_SENDTIMEO, + }, + { + .nnlevel = NN_SOL_SOCKET, + .nnopt = NN_SOCKET_NAME, + .opt = NNG_OPT_SOCKNAME, + }, + { + .nnlevel = NN_REQ, + .nnopt = NN_REQ_RESEND_IVL, + .opt = NNG_OPT_REQ_RESENDTIME, + }, + { + .nnlevel = NN_SUB, + .nnopt = NN_SUB_SUBSCRIBE, + .opt = NNG_OPT_SUB_SUBSCRIBE, + }, + { + .nnlevel = NN_SUB, + .nnopt = NN_SUB_UNSUBSCRIBE, + .opt = NNG_OPT_SUB_UNSUBSCRIBE, + }, + { + .nnlevel = NN_SURVEYOR, + .nnopt = NN_SURVEYOR_DEADLINE, + .opt = NNG_OPT_SURVEYOR_SURVEYTIME, + }, + // XXX: IPV4ONLY, SNDPRIO, RCVPRIO +}; + int nn_getsockopt(int s, int nnlevel, int nnopt, void *valp, size_t *szp) { - const char *name = NULL; - int rv; + const char *name = NULL; + int (*get)(nng_socket, void *, size_t *) = NULL; + int rv; for (unsigned i = 0; i < sizeof(options) / sizeof(options[0]); i++) { if ((options[i].nnlevel == nnlevel) && (options[i].nnopt == nnopt)) { + get = options[i].get; name = options[i].opt; break; } } - if (name == NULL) { - if (nnlevel == NN_SOL_SOCKET && nnopt == NN_DOMAIN) { - return (nn_getdomain(s, valp, szp)); - } + if (get != NULL) { + return (get((nng_socket) s, valp, szp)); + } + if (name == NULL) { errno = ENOPROTOOPT; return (-1); } @@ -721,16 +816,24 @@ nn_getsockopt(int s, int nnlevel, int nnopt, void *valp, size_t *szp) int nn_setsockopt(int s, int nnlevel, int nnopt, const void *valp, size_t sz) { - const char *name = NULL; - int rv; + const char *name = NULL; + int (*set)(nng_socket, const void *, size_t) = NULL; + int rv; for (unsigned i = 0; i < sizeof(options) / sizeof(options[0]); i++) { if ((options[i].nnlevel == nnlevel) && (options[i].nnopt == nnopt)) { + + set = options[i].set; name = options[i].opt; break; } } + + if (set != NULL) { + return (set((nng_socket) s, valp, sz)); + } + if (name == NULL) { return (ENOPROTOOPT); } diff --git a/src/core/msgqueue.c b/src/core/msgqueue.c index 8246279f..529e7f4d 100644 --- a/src/core/msgqueue.c +++ b/src/core/msgqueue.c @@ -17,7 +17,6 @@ struct nni_msgq { nni_mtx mq_lock; - nni_cv mq_drained; int mq_cap; int mq_alloc; // alloc is cap + 2... int mq_len; @@ -26,7 +25,6 @@ struct nni_msgq { int mq_closed; int mq_puterr; int mq_geterr; - int mq_draining; int mq_besteffort; nni_msg **mq_msgs; @@ -68,18 +66,16 @@ nni_msgq_init(nni_msgq **mqp, unsigned cap) nni_aio_list_init(&mq->mq_aio_putq); nni_aio_list_init(&mq->mq_aio_getq); nni_mtx_init(&mq->mq_lock); - nni_cv_init(&mq->mq_drained, &mq->mq_lock); - - mq->mq_cap = cap; - mq->mq_alloc = alloc; - mq->mq_len = 0; - mq->mq_get = 0; - mq->mq_put = 0; - mq->mq_closed = 0; - mq->mq_puterr = 0; - mq->mq_geterr = 0; - mq->mq_draining = 0; - *mqp = mq; + + mq->mq_cap = cap; + mq->mq_alloc = alloc; + mq->mq_len = 0; + mq->mq_get = 0; + mq->mq_put = 0; + mq->mq_closed = 0; + mq->mq_puterr = 0; + mq->mq_geterr = 0; + *mqp = mq; return (0); } @@ -92,7 +88,6 @@ nni_msgq_fini(nni_msgq *mq) if (mq == NULL) { return; } - nni_cv_fini(&mq->mq_drained); nni_mtx_fini(&mq->mq_lock); /* Free any orphaned messages. */ @@ -317,12 +312,6 @@ nni_msgq_run_notify(nni_msgq *mq) } mq->mq_cb_fn(mq->mq_cb_arg, flags); } - - if (mq->mq_draining) { - if ((mq->mq_len == 0) && !nni_list_empty(&mq->mq_aio_putq)) { - nni_cv_wake(&mq->mq_drained); - } - } } void @@ -438,40 +427,6 @@ nni_msgq_tryput(nni_msgq *mq, nni_msg *msg) } void -nni_msgq_drain(nni_msgq *mq, nni_time expire) -{ - nni_aio *aio; - - nni_mtx_lock(&mq->mq_lock); - mq->mq_closed = 1; - mq->mq_draining = 1; - while ((mq->mq_len > 0) || !nni_list_empty(&mq->mq_aio_putq)) { - if (nni_cv_until(&mq->mq_drained, expire) != 0) { - break; - } - } - - // Timed out or writers drained. - - // Complete the putq as NNG_ECLOSED. - while ((aio = nni_list_first(&mq->mq_aio_putq)) != NULL) { - nni_aio_list_remove(aio); - nni_aio_finish_error(aio, NNG_ECLOSED); - } - - // Free any remaining messages in the queue. - while (mq->mq_len > 0) { - nni_msg *msg = mq->mq_msgs[mq->mq_get++]; - if (mq->mq_get >= mq->mq_alloc) { - mq->mq_get = 0; - } - mq->mq_len--; - nni_msg_free(msg); - } - nni_mtx_unlock(&mq->mq_lock); -} - -void nni_msgq_close(nni_msgq *mq) { nni_aio *aio; @@ -585,7 +540,6 @@ nni_msgq_resize(nni_msgq *mq, int cap) out: // Wake everyone up -- we changed everything. - nni_cv_wake(&mq->mq_drained); nni_mtx_unlock(&mq->mq_lock); return (0); } diff --git a/src/core/msgqueue.h b/src/core/msgqueue.h index 9cc650e0..93a26eb6 100644 --- a/src/core/msgqueue.h +++ b/src/core/msgqueue.h @@ -17,10 +17,6 @@ // they are a thread-safe way to pass messages between subsystems. They // do have additional capabilities though. // -// A closed message queue cannot be written to, but if there are messages -// still in it and it is draining, it can be read from. This permits -// linger operations to work. -// // Message queues can be closed many times safely. // // Readers & writers in a message queue can be woken either by a timeout @@ -108,12 +104,6 @@ extern void nni_msgq_set_cb(nni_msgq *, nni_msgq_cb, void *); // are freed. Unlike closing a go channel, this operation is idempotent. extern void nni_msgq_close(nni_msgq *); -// nni_msgq_drain is like nng_msgq_close, except that reads -// against the queue are permitted for up to the time limit. The -// operation blocks until either the queue is empty, or the timeout -// has expired. Any messages still in the queue at the timeout are freed. -extern void nni_msgq_drain(nni_msgq *, nni_time); - // nni_msgq_resize resizes the message queue; messages already in the queue // will be preserved as long as there is room. Messages that are dropped // due to no room are taken from the most recent. (Oldest messages are diff --git a/src/core/socket.c b/src/core/socket.c index 67a2991c..4021c0c6 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -69,7 +69,6 @@ struct nni_socket { nni_proto_ctx_ops s_ctx_ops; // options - nni_duration s_linger; // linger time nni_duration s_sndtimeo; // send timeout nni_duration s_rcvtimeo; // receive timeout nni_duration s_reconn; // reconnect time @@ -546,7 +545,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->s_linger = 0; s->s_sndtimeo = -1; s->s_rcvtimeo = -1; s->s_closing = 0; @@ -586,8 +584,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) || ((rv = nni_msgq_init(&s->s_urq, 0)) != 0) || ((rv = s->s_sock_ops.sock_init(&s->s_data, s)) != 0) || - ((rv = nni_sock_setopt(s, NNG_OPT_LINGER, &s->s_linger, - sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) || ((rv = nni_sock_setopt(s, NNG_OPT_SENDTIMEO, &s->s_sndtimeo, sizeof(nni_duration), NNI_TYPE_DURATION)) != 0) || ((rv = nni_sock_setopt(s, NNG_OPT_RECVTIMEO, &s->s_rcvtimeo, @@ -686,7 +682,6 @@ nni_sock_shutdown(nni_sock *sock) nni_pipe *pipe; nni_ep * ep; nni_ep * nep; - nni_time linger; nni_ctx * ctx; nni_ctx * nctx; @@ -698,15 +693,6 @@ nni_sock_shutdown(nni_sock *sock) // Mark us closing, so no more EPs or changes can occur. sock->s_closing = 1; - // Special optimization; if there are no pipes connected, - // then there is no reason to linger since there's nothing that - // could possibly send this data out. - if (nni_list_first(&sock->s_pipes) == NULL) { - linger = NNI_TIME_ZERO; - } else { - linger = nni_clock() + sock->s_linger; - } - // Close the EPs. This prevents new connections from forming but // but allows existing ones to drain. NNI_LIST_FOREACH (&sock->s_eps, ep) { @@ -732,21 +718,6 @@ nni_sock_shutdown(nni_sock *sock) } nni_mtx_unlock(&nni_sock_lk); - // XXX: Add protocol specific drain here. This should replace the - // msgq_drain feature below. Probably msgq_drain will need to - // be changed to take an AIO for completion. - - // We drain the upper write queue. This is just like closing it, - // except that the protocol gets a chance to get the messages and - // push them down to the transport. This operation can *block* - // until the linger time has expired. We only do this for sendable - // sockets that are actually using the message queue of course. - if ((nni_sock_flags(sock) & - (NNI_PROTO_FLAG_NOMSGQ | NNI_PROTO_FLAG_SND)) == - NNI_PROTO_FLAG_SND) { - nni_msgq_drain(sock->s_uwq, linger); - } - // Generally, unless the protocol is blocked trying to perform // writes (e.g. a slow reader on the other side), it should be // trying to shut things down. We wait to give it @@ -760,11 +731,6 @@ nni_sock_shutdown(nni_sock *sock) nni_mtx_unlock(&nni_sock_lk); nni_mtx_lock(&sock->s_mx); - while (nni_list_first(&sock->s_pipes) != NULL) { - if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) { - break; - } - } // At this point, we've done everything we politely can to give // the protocol a chance to flush its write side. Now its time @@ -1029,10 +995,7 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) // Also check a few generic things. We do this if no transport // was found, or even if a transport rejected one of the settings. if ((rv == NNG_ENOTSUP) || (rv == 0)) { - if ((strcmp(name, NNG_OPT_LINGER) == 0)) { - nng_duration d; - rv = nni_copyin_ms(&d, v, sz, t); - } else if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) { + if (strcmp(name, NNG_OPT_RECVMAXSZ) == 0) { size_t z; // just a sanity test on the size; it also ensures that // a size can be set even with no transport configured. @@ -1102,14 +1065,6 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) } } - // For some options, which also have an impact on the socket - // behavior, we save a local value. Note that the transport - // will already have had a chance to veto this. - - if (strcmp(name, NNG_OPT_LINGER) == 0) { - rv = nni_copyin_ms(&s->s_linger, v, sz, t); - } - if (rv == 0) { // Remove and toss the old value, we are using a new one. if (oldv != NULL) { @@ -554,7 +554,6 @@ enum nng_flag_enum { // Options. #define NNG_OPT_SOCKNAME "socket-name" #define NNG_OPT_RAW "raw" -#define NNG_OPT_LINGER "linger" #define NNG_OPT_RECVBUF "recv-buffer" #define NNG_OPT_SENDBUF "send-buffer" #define NNG_OPT_RECVFD "recv-fd" diff --git a/src/transport/tcp/tcp.c b/src/transport/tcp/tcp.c index 80b372e9..7ea77035 100644 --- a/src/transport/tcp/tcp.c +++ b/src/transport/tcp/tcp.c @@ -48,7 +48,6 @@ struct nni_tcp_ep { nni_plat_tcp_ep *tep; uint16_t proto; size_t rcvmax; - nni_duration linger; nni_aio * aio; nni_aio * user_aio; nni_url * url; @@ -795,26 +794,6 @@ nni_tcp_ep_getopt_recvmaxsz(void *arg, void *v, size_t *szp, int typ) return (nni_copyout_size(ep->rcvmax, v, szp, typ)); } -static int -nni_tcp_ep_setopt_linger(void *arg, const void *v, size_t sz, int typ) -{ - nni_tcp_ep * ep = arg; - nng_duration val; - int rv; - - if (((rv = nni_copyin_ms(&val, v, sz, typ)) == 0) && (ep != NULL)) { - ep->linger = val; - } - return (rv); -} - -static int -nni_tcp_ep_getopt_linger(void *arg, void *v, size_t *szp, int typ) -{ - nni_tcp_ep *ep = arg; - return (nni_copyout_ms(ep->linger, v, szp, typ)); -} - static nni_tran_pipe_option nni_tcp_pipe_options[] = { { .po_name = NNG_OPT_LOCADDR, @@ -855,12 +834,6 @@ static nni_tran_ep_option nni_tcp_ep_options[] = { .eo_getopt = nni_tcp_ep_getopt_url, .eo_setopt = NULL, }, - { - .eo_name = NNG_OPT_LINGER, - .eo_type = NNI_TYPE_DURATION, - .eo_getopt = nni_tcp_ep_getopt_linger, - .eo_setopt = nni_tcp_ep_setopt_linger, - }, // terminate list { .eo_name = NULL, diff --git a/src/transport/tls/tls.c b/src/transport/tls/tls.c index 69e36609..a852ddd8 100644 --- a/src/transport/tls/tls.c +++ b/src/transport/tls/tls.c @@ -54,7 +54,6 @@ struct nni_tls_ep { nni_plat_tcp_ep *tep; uint16_t proto; size_t rcvmax; - nni_duration linger; int authmode; nni_aio * aio; nni_aio * user_aio; @@ -819,26 +818,6 @@ nni_tls_ep_getopt_recvmaxsz(void *arg, void *v, size_t *szp, int typ) } static int -nni_tls_ep_setopt_linger(void *arg, const void *v, size_t sz, int typ) -{ - nni_tls_ep * ep = arg; - nng_duration val; - int rv; - - if (((rv = nni_copyin_ms(&val, v, sz, typ)) == 0) && (ep != NULL)) { - ep->linger = val; - } - return (rv); -} - -static int -nni_tls_ep_getopt_linger(void *arg, void *v, size_t *szp, int typ) -{ - nni_tls_ep *ep = arg; - return (nni_copyout_ms(ep->linger, v, szp, typ)); -} - -static int tls_setopt_config(void *arg, const void *data, size_t sz, int typ) { nni_tls_ep * ep = arg; @@ -984,12 +963,6 @@ static nni_tran_ep_option nni_tls_ep_options[] = { .eo_setopt = nni_tls_ep_setopt_recvmaxsz, }, { - .eo_name = NNG_OPT_LINGER, - .eo_type = NNI_TYPE_DURATION, - .eo_getopt = nni_tls_ep_getopt_linger, - .eo_setopt = nni_tls_ep_setopt_linger, - }, - { .eo_name = NNG_OPT_URL, .eo_type = NNI_TYPE_STRING, .eo_getopt = nni_tls_ep_getopt_url, diff --git a/tests/sock.c b/tests/sock.c index c4de6ad4..17ea055d 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -471,10 +471,10 @@ TestMain("Socket Operations", { So(nng_listener_getopt_int(1999, NNG_OPT_RAW, &i) == NNG_ENOENT); - So(nng_dialer_getopt_ms(1999, NNG_OPT_LINGER, &t) == - NNG_ENOENT); - So(nng_listener_getopt_ms(1999, NNG_OPT_LINGER, &t) == + So(nng_dialer_getopt_ms(1999, NNG_OPT_RECVTIMEO, &t) == NNG_ENOENT); + So(nng_listener_getopt_ms( + 1999, NNG_OPT_SENDTIMEO, &t) == NNG_ENOENT); }); |
