From 5f7289e1f8e1427c9214c8e3e96ad56b1f868d53 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Wed, 4 Apr 2018 13:36:54 -0700 Subject: fixes #334 Separate context for state machines from sockets This provides context support for REQ and REP sockets. More discussion around this is in the issue itself. Optionally we would like to extend this to the surveyor pattern. Note that we specifically do not support pollable descriptors for non-default contexts, and the results of using file descriptors for polling (NNG_OPT_SENDFD and NNG_OPT_RECVFD) is undefined. In the future, it might be nice to figure out how to factor in optional use of a message queue for users who want more buffering, but we think there is little need for this with cooked mode. --- docs/man/libnng.3.adoc | 16 + docs/man/nng_ctx.5.adoc | 161 +++++++ docs/man/nng_ctx_close.3.adoc | 53 +++ docs/man/nng_ctx_getopt.3.adoc | 128 ++++++ docs/man/nng_ctx_open.3.adoc | 69 +++ docs/man/nng_ctx_recv.3.adoc | 72 +++ docs/man/nng_ctx_send.3.adoc | 86 ++++ docs/man/nng_ctx_setopt.3.adoc | 109 +++++ docs/man/nng_getopt.3.adoc | 4 +- docs/man/nng_rep.7.adoc | 21 +- docs/man/nng_req.7.adoc | 28 +- src/CMakeLists.txt | 2 + src/core/aio.c | 2 +- src/core/defs.h | 3 + src/core/idhash.h | 2 - src/core/nng_impl.h | 1 + src/core/pollable.c | 101 +++++ src/core/pollable.h | 26 ++ src/core/protocol.h | 45 +- src/core/socket.c | 354 +++++++++++++-- src/core/socket.h | 42 ++ src/nng.c | 120 +++++ src/nng.h | 42 ++ src/protocol/reqrep0/CMakeLists.txt | 6 +- src/protocol/reqrep0/rep.c | 621 +++++++++++++++++--------- src/protocol/reqrep0/req.c | 861 ++++++++++++++++++++++-------------- src/protocol/reqrep0/xrep.c | 434 ++++++++++++++++++ src/protocol/reqrep0/xreq.c | 324 ++++++++++++++ tests/CMakeLists.txt | 8 +- tests/reqctx.c | 258 +++++++++++ tests/reqpoll.c | 148 +++++++ tests/reqrep.c | 84 +++- 32 files changed, 3631 insertions(+), 600 deletions(-) create mode 100644 docs/man/nng_ctx.5.adoc create mode 100644 docs/man/nng_ctx_close.3.adoc create mode 100644 docs/man/nng_ctx_getopt.3.adoc create mode 100644 docs/man/nng_ctx_open.3.adoc create mode 100644 docs/man/nng_ctx_recv.3.adoc create mode 100644 docs/man/nng_ctx_send.3.adoc create mode 100644 docs/man/nng_ctx_setopt.3.adoc create mode 100644 src/core/pollable.c create mode 100644 src/core/pollable.h create mode 100644 src/protocol/reqrep0/xrep.c create mode 100644 src/protocol/reqrep0/xreq.c create mode 100644 tests/reqctx.c create mode 100644 tests/reqpoll.c diff --git a/docs/man/libnng.3.adoc b/docs/man/libnng.3.adoc index feab3ebf..8df29861 100644 --- a/docs/man/libnng.3.adoc +++ b/docs/man/libnng.3.adoc @@ -195,6 +195,22 @@ The following functions are used to register a transport for use. | <>|register ZeroTier transport |=== +=== Protocol Contexts + +The following functions are useful to separate the protocol processing +from a socket object, into a separate context. +This can allow multiple contexts to be created on a single socket for +concurrent applications. + +|=== +|<>|close context +|<>|get context option +|<>|create context +|<>|receive message using context asynchronously +|<>|send message using context asynchronously +|<>|set context option +|=== + === URL Object Common functionality is supplied for parsing and handling diff --git a/docs/man/nng_ctx.5.adoc b/docs/man/nng_ctx.5.adoc new file mode 100644 index 00000000..c59a4a4c --- /dev/null +++ b/docs/man/nng_ctx.5.adoc @@ -0,0 +1,161 @@ += nng_ctx(5) +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_ctx - protocol context + +== SYNOPSIS + +[source, c] +---- +#include + +typedef uint32_t nng_ctx +---- + +== DESCRIPTION + +An `nng_ctx`(((context))) is a handle to an underlying "`context`" object, +which keeps the protocol state for some stateful protocols. +The purpose of a separate context object is to permit applications to +share a single socket, with its various underlying +<>, +<>, +and <>, +while still benefiting from separate state tracking. + +For example, a <> context will contain the request ID +of any sent request, a timer to retry the request on failure, and so forth. +A separate context on the same socket can have similar data, but corresponding +to a completely different request. + +All contexts share the same socket, and so some options, as well as the +underlying transport details, will be common to all contexts on that socket. + +NOTE: Not every protocol supports separate contexts. +See the protocol-specific documentation for further details about whether +contexts are supported, and details about what options are supported for +contexts. + +Protocols that make use of contexts will also have a "`default`" context +that is used when the socket global operations are used. +Operations using the global context will generally not interfere with +any other contexts, except that certain socket options may affect socket +global behavior. + +(((concurrent)))(((raw mode))) +Historically, applications wanting to use a stateful protocol concurrently +would have to resort to <> sockets, which bypasses +much of the various protocol handling, leaving it to up to the application +to do so. +Contexts make it possible to still benefit from advanced protocol handling, +including timeouts, retries, and matching requests to responses, while doing so +concurrently. + +NOTE: <> sockets do not support contexts, since +there is generally no state tracked for them, and thus contexts make no sense. + +TIP: Contexts are an excellent mechanism to use when building concurrent +applications, and should be used in lieu of +<> sockets when possible. + +WARNING: Use of file descriptor polling (with descriptors +obtained using the +<> or +<> options) while contexts +are in use on the same socket is not supported, and may lead to unpredictable +behavior. +These asynchronous methods should not be mixed on the same socket. + +== EXAMPLE + +The following program fragment demonstrates the use of contexts to implement +a concurrent <> service that simply echos messages back +to the sender. + +[source, c] +---- + +struct echo_context { + nng_ctx *ctx; + nng_aio *aio; + enum { INIT, RECV, SEND } state; +}; + +void +echo(void *arg) +{ + struct echo_context *ec = arg; + + switch (ec->state) { + case INIT: + ec->state = RECV; + nng_ctx_recv(ec->ctx, ec->aio); + return; + case RECV: + if (nng_aio_result(ec->aio) != 0) { + // ... handle error + } + // We reuse the message on the ec->aio + ec->state = SEND; + nng_ctx_send(ec->ctx, ec->aio); + return; + case SEND: + if (nng_aio_result(ec->aio) != 0) { + // ... handle error + } + ec->state = RECV; + nng_ctx_recv(ec->ctx, ec->aio); + return; + } +} +---- + +Given the above fragment, the following example shows setting up the +service. It assumes that the <> has already been +created and any transports set up as well with functions such as +<> +or <>. + +[source,c] +---- +#define CONCURRENCY 1024 + +echo_context ecs[CONCURRENCY]; + +void +start_echo_service(nng_socket rep_socket) +{ + for (int i = 0; i < CONCURRENCY; i++) { + // error checks elided for clarity + nng_ctx_open(ec[i].ctx, rep_socket) + nng_aio_alloc(ec[i].aio, echo, &e[i]); + ec[i].state = INIT; + echo(&ec[i]); // start it running + } +} +---- + +== SEE ALSO + +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<> diff --git a/docs/man/nng_ctx_close.3.adoc b/docs/man/nng_ctx_close.3.adoc new file mode 100644 index 00000000..16d9432e --- /dev/null +++ b/docs/man/nng_ctx_close.3.adoc @@ -0,0 +1,53 @@ += nng_ctx_close(3) +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_ctx_close - close context + +== SYNOPSIS + +[source, c] +---- +#include + +int nng_ctx_close(nng_ctx ctx); +---- + +== DESCRIPTION + +The `nng_ctx_close()` function closes the context _ctx_. +Messages that have been submitted for sending may be flushed or delivered, +depending upon the transport and the setting of the +<> option. + +Further attempts to use the context after this call returns will result +in `NNG_ECLOSED`. +Threads waiting for operations on the context when this +call is executed may also return with an `NNG_ECLOSED` result. + +NOTE: Closing the socket associated with _ctx_ +(using <>) also closes this context. + +== RETURN VALUES + +This function returns 0 on success, and non-zero otherwise. + +== ERRORS + +`NNG_ECLOSED`:: The context _ctx_ is already closed or was never opened. + +== SEE ALSO + +<>, +<>, +<>, +<> diff --git a/docs/man/nng_ctx_getopt.3.adoc b/docs/man/nng_ctx_getopt.3.adoc new file mode 100644 index 00000000..b20ea3b4 --- /dev/null +++ b/docs/man/nng_ctx_getopt.3.adoc @@ -0,0 +1,128 @@ += nng_ctx_getopt(3) +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_ctx_getopt - get context option + +== SYNOPSIS + +[source, c] +---- +#include + +int nng_ctx_getopt(nng_ctx ctx, const char *opt, void *val, size_t *valszp); + +int nng_ctx_getopt_bool(nng_ctx ctx, const char *opt, bool *bvalp); + +int nng_ctx_getopt_int(nng_ctx ctx, const char *opt, int *ivalp); + +int nng_ctx_getopt_ms(nng_ctx ctx, const char *opt, nng_duration *durp); + +int nng_ctx_getopt_size(nng_ctx ctx, const char *opt, size_t *zp); + +int nng_ctx_getopt_string(nng_ctx ctx, const char *opt, char **strp); + +int nng_ctx_getopt_uint64(nng_ctx ctx, const char *opt, uint64_t *u64p); +---- + +== DESCRIPTION + +(((options, context))) +The `nng_ctx_getopt()` functions are used to retrieve option values for +the <> _ctx_. +The actual options that may be retrieved in this way vary. +A number of them are documented in <>. + +NOTE: Context options are protocol specific. +The details will be documented with the protocol. + +=== Forms + +In all of these forms, the option _opt_ is retrieved from the context _ctx_. +The forms vary based on the type of the option they take. + +The details of the type, size, and semantics of the option will depend +on the actual option, and will be documented with the option itself. + +TIP: Generally, it will be easier to use one of the typed forms instead. + +==== `nng_ctx_getopt()` +This function is untyped and can be used to retrieve the value of any option. +The caller must store a pointer to a buffer to receive the value in _val_, +and the size of the buffer shall be stored at the location referenced by +_valszp_. + +When the function returns, the actual size of the data copied (or that +would have been copied if sufficient space were present) is stored at +the location referened by _valszp_. +If the caller's buffer is not large enough to hold the entire object, +then the copy is truncated. +Therefore the caller should check for truncation by verifyng that the +returned size in _valszp_ does not exceed the original buffer size. + +It is acceptable to pass `NULL` for _val_ if the value in _valszp_ is zero. +This can be used to determine the size of the buffer needed to receive +the object. + +==== `nng_ctx_getopt_bool()` +This function is for options which take a boolean (`bool`). +The value will be stored at _ivalp_. + +==== `nng_ctx_getopt_int()` +This function is for options which take an integer (`int`). +The value will be stored at _ivalp_. + +==== `nng_ctx_getopt_ms()` +This function is used to retrieve time <> +(such as timeouts), stored in _durp_ as a number of milliseconds. +(The special value ((`NNG_DUR_INFINITE`)) means an infinite amount of time, and +the special value ((`NNG_DUR_DEFAULT`)) means a context-specific default.) + +==== `nng_ctx_getopt_size()` +This function is used to retrieve a size into the pointer _zp_, +typically for buffer sizes, message maximum sizes, and similar options. + +==== `nng_ctx_getopt_string()` +This function is used to retrieve a string into _strp_. +This string is created from the source using <> +and consequently must be freed by the caller using +<> when it is no longer needed. + +==== `nng_ctx_getopt_uint64()` +This function is used to retrieve a 64-bit unsigned value into the value +referenced by _u64p_. +This is typically used for options related to identifiers, network +numbers, and similar. + +== RETURN VALUES + +These functions return 0 on success, and non-zero otherwise. + +== ERRORS + +`NNG_EBADTYPE`:: Incorrect type for option. +`NNG_ECLOSED`:: Parameter _s_ does not refer to an open socket. +`NNG_EINVAL`:: Size of destination _val_ too small for object. +`NNG_ENOMEM`:: Insufficient memory exists. +`NNG_ENOTSUP`:: The option _opt_ is not supported. +`NNG_EWRITEONLY`:: The option _opt_ is write-only. + +== SEE ALSO + +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<> diff --git a/docs/man/nng_ctx_open.3.adoc b/docs/man/nng_ctx_open.3.adoc new file mode 100644 index 00000000..86686417 --- /dev/null +++ b/docs/man/nng_ctx_open.3.adoc @@ -0,0 +1,69 @@ += nng_ctx_open(3) +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_ctx_open - create context + +== SYNOPSIS + +[source,c] +---- +#include + +int nng_ctx_open(nng_ctx *ctxp, nng_socket s); +---- + +== DESCRIPTION + +The `nng_ctx0_open()` function creates a separate ((context)) to be used with +the socket _s_, +and returns it at the location pointed by _ctxp_. + +NOTE: Not every protocol supports creation of separate contexts. + +Contexts allow the independent and concurrent use of stateful operations +using the same socket. +For example, two different contexts created on a <> +socket can each receive requests, and send replies to them, without any +regard to or interference with each other. + +(((raw mode))) +TIP: Using contexts is an excellent way to write simpler concurrent +applications, while retaining the benefits of the protocol-specific +advanced processing, avoiding the need to bypass that with +<> sockets. + +NOTE: Use of contexts with <> sockets is +nonsensical, and not supported. + +== RETURN VALUES + +This function returns 0 on success, and non-zero otherwise. + +== ERRORS + +`NNG_ENOMEM`:: Insufficient memory is available. +`NNG_ENOTSUP`:: The protocol does not support separate contexts, or the socket was opened in raw mode. + +== SEE ALSO + +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<> diff --git a/docs/man/nng_ctx_recv.3.adoc b/docs/man/nng_ctx_recv.3.adoc new file mode 100644 index 00000000..d4e84b00 --- /dev/null +++ b/docs/man/nng_ctx_recv.3.adoc @@ -0,0 +1,72 @@ += nng_ctx_recv(3) +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_ctx_recv - receive message using context asynchronously + +== SYNOPSIS + +[source, c] +---- +#include + +void nng_ctx_recv(nng_ctx ctx, nng_aio *aio); +---- + +== DESCRIPTION + +The `nng_ctx_recv()` receives a <> using the +<> _s_ asynchronously. + +When a message is successfully received by the context, it is +stored in the _aio_ by an internal call equivalent to +<>, then the completion +callback on the _aio_ is executed. +In this case, <> will +return zero. +The callback function is responsible for retrieving the message +and disposing of it appropriately. + +IMPORTANT: Failing to accept and dispose of messages in this +case can lead to memory leaks. + +If for some reason the asynchronous receive cannot be completed +successfully (including by being canceled or timing out), then +the callback will still be executed, +but <> will be non-zero. + +NOTE: The semantics of what receiving a message means varies from protocol to +protocol, so examination of the protocol documentation is encouraged. + +== RETURN VALUES + +None. (The operation completes asynchronously.) + +== ERRORS + +`NNG_ECANCELED`:: The operation was aborted. +`NNG_ECLOSED`:: The context _ctx_ is not open. +`NNG_ENOMEM`:: Insufficient memory is available. +`NNG_ENOTSUP`:: The protocol for context _ctx_ does not support receiving. +`NNG_ESTATE`:: The context _ctx_ cannot receive data in this state. +`NNG_ETIMEDOUT`:: The receive timeout expired. + +== SEE ALSO + +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<> diff --git a/docs/man/nng_ctx_send.3.adoc b/docs/man/nng_ctx_send.3.adoc new file mode 100644 index 00000000..95c1a118 --- /dev/null +++ b/docs/man/nng_ctx_send.3.adoc @@ -0,0 +1,86 @@ += nng_ctx_send(3) +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_ctx_send - send message using context asynchronously + +== SYNOPSIS + +[source, c] +---- +#include + +void nng_ctx_send(nng_ctx ctx, nng_aio *aio); +---- + +== DESCRIPTION + +The `nng_ctx_send()` sends a <> using the +<> _ctx_ asynchronously. + +The message to send must have previously been set on the _aio_ +using the <> function. +The function assumes "`ownership`" of the message. + +If the message was successfully queued for delivery to the socket, +then the _aio_ will be completed, and <> +will return zero. +In this case the socket will dispose of the message when it is finished with it. + +NOTE: The operation will be "`completed`", and the callback associated +with the _aio_ executed, as soon as the socket accepts the message +for sending. +This does _not_ indicate that the message was actually delivered, as it +may still be buffered in the sending socket, buffered in the receiving +socket, or in flight over physical media. + +If the operation fails for any reason (including cancellation or timeout), +then the _aio_ callback will be executed and <> +will return a non-zero error status. +In this case, the callback has a responsibity to retrieve the message from +the _aio_ with <> and dispose of +it appropriately. +(This may include retrying the send operation on the same or a different +socket, or deallocating the message with <>.) + +NOTE: The semantics of what sending a message means varies from protocol to +protocol, so examination of the protocol documentation is encouraged. + +TIP: Context send operations are asynchronous. +If a synchronous operation is needed, one can be constructed by using a +`NULL` callback on the _aio_ and then waiting for the operation using +<>. + +== RETURN VALUES + +None. (The operation completes asynchronously.) + +== ERRORS + +`NNG_ECANCELED`:: The operation was aborted. +`NNG_ECLOSED`:: The context _ctx_ is not open. +`NNG_EMSGSIZE`:: The message is too large. +`NNG_ENOMEM`:: Insufficient memory is available. +`NNG_ENOTSUP`:: The protocol for context _ctx_ does not support sending. +`NNG_ESTATE`:: The context _ctx_ cannot send data in this state. +`NNG_ETIMEDOUT`:: The send timeout expired. + +== SEE ALSO + +<>, +<>, +<>, +<>, +<>, +<>, +<>, +<> diff --git a/docs/man/nng_ctx_setopt.3.adoc b/docs/man/nng_ctx_setopt.3.adoc new file mode 100644 index 00000000..4ab36e07 --- /dev/null +++ b/docs/man/nng_ctx_setopt.3.adoc @@ -0,0 +1,109 @@ += nng_ctx_setopt(3) +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This document is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +== NAME + +nng_ctx_setopt - set context option + +== SYNOPSIS + +[source, c] +---- +#include + +int nng_ctx_setopt(nng_ctx ctx, const char *opt, const void *val, size_t valsz); + +int nng_ctx_setopt_bool(nng_ctx ctx, const char *opt, int bval); + +int nng_ctx_setopt_int(nng_ctx ctx, const char *opt, int ival); + +int nng_ctx_setopt_ms(nng_ctx ctx, const char *opt, nng_duration dur); + +int nng_ctx_setopt_size(nng_ctx ctx, const char *opt, size_t z); + +int nng_ctx_setopt_string(nng_ctx ctx, const char *opt, const char *str); + +int nng_ctx_setopt_uint64(nng_ctx ctx, const char *opt, uint64_t u64); +---- + +== DESCRIPTION +(((options, context))) +The `nng_ctx_setopt()` functions are used to configure options for +the context _ctx_. +The actual options that may be configured in this way vary, and are +specified by _opt_. + +NOTE: Context options are protocol specific. +The details will be documented with the protocol. + +=== Forms + +The details of the type, size, and semantics of the option will depend +on the actual option, and will be documented with the option itself. + +TIP: Generally, it will be easier to use one of the typed versions +of this function. + +==== `nng_ctx_setopt()` +This function is untyped, and can be used to configure any arbitrary data. +The _val_ pointer addresses the data to copy, and _valsz_ is the +size of the objected located at _val_. + +==== `nng_ctx_setopt_bool()` +This function is for options which take a boolean (`bool`). +The _bval_ is passed to the option. + +==== `nng_ctx_setopt_int()` +This function is for options which take an integer (`int`). +The _ival_ is passed to the option. + +==== `nng_ctx_setopt_ms()` +This function is used to configure time durations (such as timeouts) using +type <>. +The duration _dur_ is an integer number of milliseconds. + +==== `nng_ctx_setopt_size()` +This function is used to configure a size, _z_, typically for buffer sizes, +message maximum sizes, and similar options. + +==== `nng_ctx_setopt_string()` +This function is used to pass configure a string, _str_. +Strings passed this way must be legal UTF-8 or ASCII strings, terminated +with a `NUL` (`\0`) byte. +(Other constraints may apply as well, see the documentation for each option +for details.) + +==== `nng_ctx_setopt_uint64()` +This function is used to configure a 64-bit unsigned value, _u64_. +This is typically used for options related to identifiers, network numbers, +and similar. + +== RETURN VALUES + +These functions return 0 on success, and non-zero otherwise. + +== ERRORS + +`NNG_ECLOSED`:: Parameter _s_ does not refer to an open socket. +`NNG_EINVAL`:: The value being passed is invalid. +`NNG_ENOTSUP`:: The option _opt_ is not supported. +`NNG_EREADONLY`:: The option _opt_ is read-only. +`NNG_ESTATE`:: The socket is in an inappropriate state for setting this option. + +== SEE ALSO + +<>, +<>, +<>, +<>, +<>, +<>, +<> diff --git a/docs/man/nng_getopt.3.adoc b/docs/man/nng_getopt.3.adoc index af61132c..306efbe9 100644 --- a/docs/man/nng_getopt.3.adoc +++ b/docs/man/nng_getopt.3.adoc @@ -16,7 +16,7 @@ nng_getopt - get socket option == SYNOPSIS [source, c] ------------ +---- #include int nng_getopt(nng_socket s, const char *opt, void *val, size_t *valszp); @@ -34,7 +34,7 @@ int nng_getopt_size(nng_socket s, const char *opt, size_t *zp); int nng_getopt_string(nng_socket s, const char *opt, char **strp); int nng_getopt_uint64(nng_socket s, const char *opt, uint64_t *u64p); ------------ +---- == DESCRIPTION diff --git a/docs/man/nng_rep.7.adoc b/docs/man/nng_rep.7.adoc index 255a8f4b..789b515d 100644 --- a/docs/man/nng_rep.7.adoc +++ b/docs/man/nng_rep.7.adoc @@ -41,15 +41,28 @@ The _rep_ protocol is the replier side, and the The <> functions create a replier socket. This socket may be used to receive messages (requests), and then to send replies. + Generally a reply can only be sent after receiving a request. -(Attempts to receive a message will result in `NNG_ESTATE` if there -is no outstanding request.) -Attempts to send on a socket with no outstanding requests will result -in `NNG_ESTATE`. +Send operations will result in `NNG_ESTATE` if no corresponding request +was previously received. + +Likewise, only one receive operation may be pending at a time. +Any additional concurrent receive operations will result in `NNG_ESTATE`. <> mode sockets ignore all these restrictions. +=== Context Operations + +This protocol supports the creation of <> for concurrent +use cases using <>. + +Each context may have at most one outstanding request, and operates +independently from the others. +The restrictions for order of operations with sockets apply equally +well for contexts, except that each context will be treated as if it were +a separate socket. + === Protocol Versions Only version 0 of this protocol is supported. diff --git a/docs/man/nng_req.7.adoc b/docs/man/nng_req.7.adoc index 9b956172..221023c4 100644 --- a/docs/man/nng_req.7.adoc +++ b/docs/man/nng_req.7.adoc @@ -53,23 +53,37 @@ The _req_ protocol is the requester side, and the === Socket Operations The <> functions create a requester socket. -This socket may be used to send messages (requests), -and then to receive replies. +This socket may be used to send messages (requests), and then to receive replies. + Generally a reply can only be received after sending a request. (Attempts to receive a message will result in `NNG_ESTATE` if there is no outstanding request.) +Furthermore, only a single receive operation may be pending at a time. +Attempts to post more receive operations concurrently will result in +`NNG_ESTATE`. + Requests may be canceled by sending a different request. This will cause the requester to discard any reply from the earlier request, but it will not stop a replier from processing a request it has already received or terminate a request that has already been placed on the wire. -Attempts to receive on a socket with no outstanding requests will result -in `NNG_ESTATE`. - <> mode sockets ignore all these restrictions. +=== Context Operations + +This protocol supports the creation of <> for concurrent +use cases using <>. +The `NNG_OPT_REQ_RESENDTIME` value may be configured differently +on contexts created this way. + +Each context may have at most one outstanding request, and operates +independently from the others. +The restrictions for order of operations with sockets apply equally +well for contexts, except that each context will be treated as if it were +a separate socket. + === Protocol Versions Only version 0 of this protocol is supported. @@ -82,7 +96,7 @@ The following protocol-specific option is available. ((`NNG_OPT_REQ_RESENDTIME`)):: This read/write option is a duration (32-bit unsigned integer) representing - a relative number of milliseconds. + a relative number of milliseconds. When a new request is started, a timer of this duration is also started. If no reply is received before this timer expires, then the request will be resent. (Requests are also automatically resent if the peer to whom @@ -129,7 +143,9 @@ request ID it originally used for the request. == SEE ALSO +<>, <>, <>, +<>, <>, <> diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 92f75f94..a3e36ee8 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -53,6 +53,8 @@ set (NNG_SOURCES core/nng_impl.h core/options.c core/options.h + core/pollable.c + core/pollable.h core/panic.c core/panic.h core/pipe.c diff --git a/src/core/aio.c b/src/core/aio.c index 9cf04217..3027b0c1 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -324,6 +324,7 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) } if (!aio->a_sleep) { + // Convert the relative timeout to an absolute timeout. switch (aio->a_timeout) { case NNG_DURATION_ZERO: aio->a_expire = NNI_TIME_ZERO; @@ -338,7 +339,6 @@ nni_aio_start(nni_aio *aio, nni_aio_cancelfn cancelfn, void *data) } } - // Convert the relative timeout to an absolute timeout. if (aio->a_expire != NNI_TIME_NEVER) { nni_aio_expire_add(aio); } diff --git a/src/core/defs.h b/src/core/defs.h index f64d3df7..1d656bb2 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -41,6 +41,7 @@ typedef struct nng_notify nni_notify; // These are our own names. typedef struct nni_socket nni_sock; +typedef struct nni_ctx nni_ctx; typedef struct nni_ep nni_ep; typedef struct nni_pipe nni_pipe; typedef struct nni_tran nni_tran; @@ -49,6 +50,8 @@ typedef struct nni_tran_ep_option nni_tran_ep_option; typedef struct nni_tran_pipe nni_tran_pipe; typedef struct nni_tran_pipe_option nni_tran_pipe_option; +typedef struct nni_proto_ctx_option nni_proto_ctx_option; +typedef struct nni_proto_ctx_ops nni_proto_ctx_ops; typedef struct nni_proto_sock_ops nni_proto_sock_ops; typedef struct nni_proto_pipe_ops nni_proto_pipe_ops; typedef struct nni_proto_sock_option nni_proto_sock_option; diff --git a/src/core/idhash.h b/src/core/idhash.h index c2cecf81..9dbdd45e 100644 --- a/src/core/idhash.h +++ b/src/core/idhash.h @@ -29,8 +29,6 @@ typedef struct nni_idhash_entry nni_idhash_entry; extern int nni_idhash_init(nni_idhash **); extern void nni_idhash_fini(nni_idhash *); extern void nni_idhash_set_limits(nni_idhash *, uint64_t, uint64_t, uint64_t); -extern int nni_idhash_create(nni_idhash **); -extern void nni_idhash_destroy(nni_idhash *); extern int nni_idhash_find(nni_idhash *, uint64_t, void **); extern int nni_idhash_remove(nni_idhash *, uint64_t); extern int nni_idhash_insert(nni_idhash *, uint64_t, void *); diff --git a/src/core/nng_impl.h b/src/core/nng_impl.h index 5c750ec7..fdb2ce94 100644 --- a/src/core/nng_impl.h +++ b/src/core/nng_impl.h @@ -37,6 +37,7 @@ #include "core/msgqueue.h" #include "core/options.h" #include "core/panic.h" +#include "core/pollable.h" #include "core/protocol.h" #include "core/random.h" #include "core/reap.h" diff --git a/src/core/pollable.c b/src/core/pollable.c new file mode 100644 index 00000000..b5cecf37 --- /dev/null +++ b/src/core/pollable.c @@ -0,0 +1,101 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/nng_impl.h" + +struct nni_pollable { + int p_rfd; + int p_wfd; + nni_mtx p_lock; + bool p_raised; + bool p_open; +}; + +int +nni_pollable_alloc(nni_pollable **pp) +{ + nni_pollable *p; + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + p->p_open = false; + p->p_raised = false; + nni_mtx_init(&p->p_lock); + *pp = p; + return (0); +} + +void +nni_pollable_free(nni_pollable *p) +{ + if (p == NULL) { + return; + } + if (p->p_open) { + nni_plat_pipe_close(p->p_rfd, p->p_wfd); + } + nni_mtx_fini(&p->p_lock); + NNI_FREE_STRUCT(p); +} + +void +nni_pollable_raise(nni_pollable *p) +{ + if (p == NULL) { + return; + } + nni_mtx_lock(&p->p_lock); + p->p_raised = true; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_raise(p->p_wfd); + return; + } + nni_mtx_unlock(&p->p_lock); +} + +void +nni_pollable_clear(nni_pollable *p) +{ + if (p == NULL) { + return; + } + nni_mtx_lock(&p->p_lock); + p->p_raised = false; + if (p->p_open) { + nni_mtx_unlock(&p->p_lock); + nni_plat_pipe_clear(p->p_rfd); + return; + } + nni_mtx_unlock(&p->p_lock); +} + +int +nni_pollable_getfd(nni_pollable *p, int *fdp) +{ + if (p == NULL) { + return (NNG_EINVAL); + } + nni_mtx_lock(&p->p_lock); + if (!p->p_open) { + int rv; + if ((rv = nni_plat_pipe_open(&p->p_wfd, &p->p_rfd)) != 0) { + nni_mtx_unlock(&p->p_lock); + return (rv); + } + p->p_open = true; + if (p->p_raised) { + nni_plat_pipe_raise(p->p_wfd); + } + } + nni_mtx_unlock(&p->p_lock); + *fdp = p->p_rfd; + return (0); +} diff --git a/src/core/pollable.h b/src/core/pollable.h new file mode 100644 index 00000000..50ec9bf6 --- /dev/null +++ b/src/core/pollable.h @@ -0,0 +1,26 @@ +// +// Copyright 2017 Garrett D'Amore +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef CORE_POLLABLE_H +#define CORE_POLLABLE_H + +#include "core/defs.h" +#include "core/list.h" + +// For the sake of simplicity, we just maintain a single global timer thread. + +typedef struct nni_pollable nni_pollable; + +extern int nni_pollable_alloc(nni_pollable **); +extern void nni_pollable_free(nni_pollable *); +extern void nni_pollable_raise(nni_pollable *); +extern void nni_pollable_clear(nni_pollable *); +extern int nni_pollable_getfd(nni_pollable *, int *); + +#endif // CORE_POLLABLE_H diff --git a/src/core/protocol.h b/src/core/protocol.h index 9b241138..964aee1a 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -47,6 +47,39 @@ struct nni_proto_pipe_ops { void (*pipe_stop)(void *); }; +struct nni_proto_ctx_option { + const char *co_name; + int co_type; + int (*co_getopt)(void *, void *, size_t *, int); + int (*co_setopt)(void *, const void *, size_t, int); +}; + +struct nni_proto_ctx_ops { + // ctx_init creates a new context. The second argument is the + // protocol specific socket structure. + int (*ctx_init)(void **, void *); + + // ctx_fini destroys a context. + void (*ctx_fini)(void *); + + // ctx_recv is an asynchronous recv. + void (*ctx_recv)(void *, nni_aio *); + + // ctx_send is an asynchronous send. + void (*ctx_send)(void *, nni_aio *); + + // ctx_drain drains the context, signaling the aio when done. + // This should prevent any further receives from completing, + // and only sends that had already been submitted should be + // permitted to continue. It may be NULL for protocols where + // draining without an ability to receive makes no sense + // (e.g. REQ or SURVEY). + void (*ctx_drain)(void *, nni_aio *); + + // ctx_options array. + nni_proto_ctx_option *ctx_options; +}; + struct nni_proto_sock_option { const char *pso_name; int pso_type; @@ -87,6 +120,12 @@ struct nni_proto_sock_ops { // should return NULL, otherwise the message (possibly modified). nni_msg *(*sock_filter)(void *, nni_msg *); + // Socket draining is intended to permit protocols to "drain" + // before exiting. For protocols where draining makes no + // sense, this may be NULL. (Example: REQ and SURVEYOR should + // not drain, because they cannot receive a reply!) + void (*sock_drain)(void *, nni_aio *); + // Options. Must not be NULL. Final entry should have NULL name. nni_proto_sock_option *sock_options; }; @@ -103,6 +142,7 @@ struct nni_proto { uint32_t proto_flags; // Protocol flags const nni_proto_sock_ops *proto_sock_ops; // Per-socket opeations const nni_proto_pipe_ops *proto_pipe_ops; // Per-pipe operations. + const nni_proto_ctx_ops * proto_ctx_ops; // Context operations. // proto_init, if not NULL, provides a function that initializes // global values. The main purpose of this may be to initialize @@ -128,11 +168,14 @@ struct nni_proto { // These flags determine which operations make sense. We use them so that // we can reject attempts to create notification fds for operations that make // no sense. Also, we can detect raw mode, thereby providing handling for -// that at the socket layer (NNG_PROTO_FLAG_RAW). +// that at the socket layer (NNG_PROTO_FLAG_RAW). Finally, we provide the +// NNI_PROTO_FLAG_NOMSGQ flag for protocols that do not use the upper write +// or upper read queues. #define NNI_PROTO_FLAG_RCV 1 // Protocol can receive #define NNI_PROTO_FLAG_SND 2 // Protocol can send #define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv #define NNI_PROTO_FLAG_RAW 4 // Protocol is raw +#define NNI_PROTO_FLAG_NOMSGQ 8 // Protocol bypasses the upper queues // nni_proto_open is called by the protocol to create a socket instance // with its ops vector. The intent is that applications will only see diff --git a/src/core/socket.c b/src/core/socket.c index 62950b1c..67a2991c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -18,6 +18,19 @@ static nni_list nni_sock_list; static nni_idhash *nni_sock_hash; static nni_mtx nni_sock_lk; +static nni_idhash *nni_ctx_hash; + +struct nni_ctx { + nni_list_node c_node; + nni_sock * c_sock; + nni_proto_ctx_ops c_ops; + void * c_data; + bool c_closed; + unsigned c_refcnt; // protected by global lock + uint32_t c_id; + nng_duration c_sndtimeo; + nng_duration c_rcvtimeo; +}; typedef struct nni_socket_option { const char *so_name; @@ -53,6 +66,7 @@ struct nni_socket { nni_proto_pipe_ops s_pipe_ops; nni_proto_sock_ops s_sock_ops; + nni_proto_ctx_ops s_ctx_ops; // options nni_duration s_linger; // linger time @@ -66,15 +80,19 @@ struct nni_socket { nni_list s_eps; // active endpoints nni_list s_pipes; // active pipes + nni_list s_ctxs; // active contexts (protected by global nni_sock_lk) - int s_ep_pend; // EP dial/listen in progress - int s_closing; // Socket is closing - int s_closed; // Socket closed, protected by global lock + int s_ep_pend; // EP dial/listen in progress + int s_closing; // Socket is closing + int s_closed; // Socket closed, protected by global lock + bool s_ctxwait; // Waiting for contexts to close. nni_notifyfd s_send_fd; nni_notifyfd s_recv_fd; }; +static void nni_ctx_destroy(nni_ctx *); + static void nni_sock_can_send_cb(void *arg, int flags) { @@ -99,32 +117,6 @@ nni_sock_can_recv_cb(void *arg, int flags) } } -void -nni_sock_set_sendable(nni_sock *s, bool cansend) -{ - nni_notifyfd *fd = &s->s_send_fd; - if (fd->sn_init) { - if (cansend) { - nni_plat_pipe_raise(fd->sn_wfd); - } else { - nni_plat_pipe_clear(fd->sn_rfd); - } - } -} - -void -nni_sock_set_recvable(nni_sock *s, bool canrecv) -{ - nni_notifyfd *fd = &s->s_recv_fd; - if (fd->sn_init) { - if (canrecv) { - nni_plat_pipe_raise(fd->sn_wfd); - } else { - nni_plat_pipe_clear(fd->sn_rfd); - } - } -} - static int nni_sock_get_fd(nni_sock *s, int flag, int *fdp) { @@ -159,7 +151,15 @@ nni_sock_get_fd(nni_sock *s, int flag, int *fdp) return (rv); } - nni_msgq_set_cb(mq, cb, fd); + // Only set the callback on the message queue if we are + // using it. The message queue automatically updates + // the pipe when the callback is first established. + // If we are not using the message queue, then we have + // to update the initial state explicitly ourselves. + + if ((nni_sock_flags(s) & NNI_PROTO_FLAG_NOMSGQ) == 0) { + nni_msgq_set_cb(mq, cb, fd); + } fd->sn_init = 1; } @@ -563,6 +563,10 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) s->s_sock_ops = *proto->proto_sock_ops; s->s_pipe_ops = *proto->proto_pipe_ops; + if (proto->proto_ctx_ops != NULL) { + s->s_ctx_ops = *proto->proto_ctx_ops; + } + NNI_ASSERT(s->s_sock_ops.sock_open != NULL); NNI_ASSERT(s->s_sock_ops.sock_close != NULL); @@ -571,6 +575,8 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto) NNI_LIST_NODE_INIT(&s->s_node); NNI_LIST_INIT(&s->s_options, nni_sockopt, node); + NNI_LIST_INIT(&s->s_ctxs, nni_ctx, c_node); + nni_pipe_sock_list_init(&s->s_pipes); nni_ep_list_init(&s->s_eps); nni_mtx_init(&s->s_mx); @@ -613,19 +619,27 @@ nni_sock_sys_init(void) NNI_LIST_INIT(&nni_sock_list, nni_sock, s_node); nni_mtx_init(&nni_sock_lk); - if ((rv = nni_idhash_init(&nni_sock_hash)) != 0) { + if (((rv = nni_idhash_init(&nni_sock_hash)) != 0) || + ((rv = nni_idhash_init(&nni_ctx_hash)) != 0)) { nni_sock_sys_fini(); - } else { - nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); + return (rv); } - return (rv); + nni_idhash_set_limits(nni_sock_hash, 1, 0x7fffffff, 1); + nni_idhash_set_limits(nni_ctx_hash, 1, 0x7fffffff, 1); + return (0); } void nni_sock_sys_fini(void) { - nni_idhash_fini(nni_sock_hash); - nni_sock_hash = NULL; + if (nni_sock_hash != NULL) { + nni_idhash_fini(nni_sock_hash); + nni_sock_hash = NULL; + } + if (nni_ctx_hash != NULL) { + nni_idhash_fini(nni_ctx_hash); + nni_ctx_hash = NULL; + } nni_mtx_fini(&nni_sock_lk); } @@ -653,10 +667,11 @@ nni_sock_open(nni_sock **sockp, const nni_proto *proto) s->s_sock_ops.sock_open(s->s_data); *sockp = s; } + nni_mtx_unlock(&nni_sock_lk); + // Set the sockname. (void) snprintf( s->s_name, sizeof(s->s_name), "%u", (unsigned) s->s_id); - nni_mtx_unlock(&nni_sock_lk); return (rv); } @@ -672,6 +687,8 @@ nni_sock_shutdown(nni_sock *sock) nni_ep * ep; nni_ep * nep; nni_time linger; + nni_ctx * ctx; + nni_ctx * nctx; nni_mtx_lock(&sock->s_mx); if (sock->s_closing) { @@ -697,16 +714,51 @@ nni_sock_shutdown(nni_sock *sock) } nni_mtx_unlock(&sock->s_mx); + // We now mark any owned contexts as closing. + // XXX: Add context draining support here! + nni_mtx_lock(&nni_sock_lk); + nctx = nni_list_first(&sock->s_ctxs); + while ((ctx = nctx) != NULL) { + nctx = nni_list_next(&sock->s_ctxs, ctx); + ctx->c_closed = true; + if (ctx->c_refcnt == 0) { + // No open operations. So close it. + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_list_remove(&sock->s_ctxs, ctx); + nni_ctx_destroy(ctx); + } + // If still has a reference count, then wait for last + // reference to close before nuking it. + } + nni_mtx_unlock(&nni_sock_lk); + + // XXX: Add protocol specific drain here. This should replace the + // msgq_drain feature below. Probably msgq_drain will need to + // be changed to take an AIO for completion. + // We drain the upper write queue. This is just like closing it, // except that the protocol gets a chance to get the messages and // push them down to the transport. This operation can *block* - // until the linger time has expired. - nni_msgq_drain(sock->s_uwq, linger); + // until the linger time has expired. We only do this for sendable + // sockets that are actually using the message queue of course. + if ((nni_sock_flags(sock) & + (NNI_PROTO_FLAG_NOMSGQ | NNI_PROTO_FLAG_SND)) == + NNI_PROTO_FLAG_SND) { + nni_msgq_drain(sock->s_uwq, linger); + } // Generally, unless the protocol is blocked trying to perform // writes (e.g. a slow reader on the other side), it should be // trying to shut things down. We wait to give it // a chance to do so gracefully. + + nni_mtx_lock(&nni_sock_lk); + while (!nni_list_empty(&sock->s_ctxs)) { + sock->s_ctxwait = true; + nni_cv_wait(&sock->s_close_cv); + } + nni_mtx_unlock(&nni_sock_lk); + nni_mtx_lock(&sock->s_mx); while (nni_list_first(&sock->s_pipes) != NULL) { if (nni_cv_until(&sock->s_cv, linger) == NNG_ETIMEDOUT) { @@ -790,7 +842,8 @@ nni_sock_close(nni_sock *s) // Wait for all other references to drop. Note that we // have a reference already (from our caller). - while (s->s_refcnt > 1) { + s->s_ctxwait = true; + while ((s->s_refcnt > 1) || (!nni_list_empty(&s->s_ctxs))) { nni_cv_wait(&s->s_close_cv); } nni_mtx_unlock(&nni_sock_lk); @@ -1147,3 +1200,224 @@ nni_sock_flags(nni_sock *sock) { return (sock->s_flags); } + +int +nni_ctx_find(nni_ctx **ctxp, uint32_t id, bool closing) +{ + int rv; + nni_ctx *ctx; + + if ((rv = nni_init()) != 0) { + return (rv); + } + nni_mtx_lock(&nni_sock_lk); + if ((rv = nni_idhash_find(nni_ctx_hash, id, (void **) &ctx)) == 0) { + // We refuse a reference if either the socket is closed, + // or the context is closed. (If the socket is closed, + // and we are only getting the reference so we can close it, + // then we still allow. In the case the only valid operation + // will be to close the socket.) + if (ctx->c_closed || ((!closing) && ctx->c_sock->s_closed)) { + rv = NNG_ECLOSED; + } else { + ctx->c_refcnt++; + *ctxp = ctx; + } + } + nni_mtx_unlock(&nni_sock_lk); + + if (rv == NNG_ENOENT) { + rv = NNG_ECLOSED; + } + + return (rv); +} + +static void +nni_ctx_destroy(nni_ctx *ctx) +{ + if (ctx->c_data != NULL) { + ctx->c_ops.ctx_fini(ctx->c_data); + } + + // Let the socket go, our hold on it is done. + NNI_FREE_STRUCT(ctx); +} + +void +nni_ctx_rele(nni_ctx *ctx) +{ + nni_sock *sock = ctx->c_sock; + nni_mtx_lock(&nni_sock_lk); + ctx->c_refcnt--; + if ((ctx->c_refcnt > 0) || (!ctx->c_closed)) { + // Either still have an active reference, or not actually + // closing yet. + nni_mtx_unlock(&nni_sock_lk); + return; + } + + // Remove us from the hash, so we can't be found any more. + // This allows our ID to be reused later, although the system + // tries to avoid ID reuse. + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_list_remove(&sock->s_ctxs, ctx); + if (sock->s_closed || sock->s_ctxwait) { + nni_cv_wake(&sock->s_close_cv); + } + nni_mtx_unlock(&nni_sock_lk); + + nni_ctx_destroy(ctx); +} + +int +nni_ctx_open(nni_ctx **ctxp, nni_sock *sock) +{ + nni_ctx *ctx; + int rv; + uint64_t id; + + if (sock->s_ctx_ops.ctx_init == NULL) { + return (NNG_ENOTSUP); + } + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); + } + + nni_mtx_lock(&nni_sock_lk); + if (sock->s_closed) { + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (NNG_ECLOSED); + } + if ((rv = nni_idhash_alloc(nni_ctx_hash, &id, ctx)) != 0) { + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (rv); + } + ctx->c_id = (uint32_t) id; + + if ((rv = sock->s_ctx_ops.ctx_init(&ctx->c_data, sock->s_data)) != 0) { + nni_idhash_remove(nni_ctx_hash, ctx->c_id); + nni_mtx_unlock(&nni_sock_lk); + NNI_FREE_STRUCT(ctx); + return (rv); + } + + ctx->c_closed = false; + ctx->c_refcnt = 1; // Caller implicitly gets a reference. + ctx->c_sock = sock; + ctx->c_ops = sock->s_ctx_ops; + ctx->c_rcvtimeo = sock->s_rcvtimeo; + ctx->c_sndtimeo = sock->s_sndtimeo; + + nni_list_append(&sock->s_ctxs, ctx); + nni_mtx_unlock(&nni_sock_lk); + + // Paranoia, fixing a possible race in close. Don't let us + // give back a context if the socket is being shutdown (it might + // not have reached the "closed" state yet.) + nni_mtx_lock(&sock->s_mx); + if (sock->s_closing) { + nni_mtx_unlock(&sock->s_mx); + nni_ctx_rele(ctx); + return (NNG_ECLOSED); + } + nni_mtx_unlock(&sock->s_mx); + *ctxp = ctx; + + return (0); +} + +void +nni_ctx_close(nni_ctx *ctx) +{ + nni_mtx_lock(&nni_sock_lk); + ctx->c_closed = true; + nni_mtx_unlock(&nni_sock_lk); + + nni_ctx_rele(ctx); +} + +uint32_t +nni_ctx_id(nni_ctx *ctx) +{ + return (ctx->c_id); +} + +void +nni_ctx_send(nni_ctx *ctx, nni_aio *aio) +{ + nni_aio_normalize_timeout(aio, ctx->c_sndtimeo); + ctx->c_ops.ctx_send(ctx->c_data, aio); +} + +void +nni_ctx_recv(nni_ctx *ctx, nni_aio *aio) +{ + nni_aio_normalize_timeout(aio, ctx->c_rcvtimeo); + ctx->c_ops.ctx_recv(ctx->c_data, aio); +} + +int +nni_ctx_getopt(nni_ctx *ctx, const char *opt, void *v, size_t *szp, int typ) +{ + nni_sock * sock = ctx->c_sock; + nni_proto_ctx_option *co; + int rv; + + nni_mtx_lock(&sock->s_mx); + if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) { + rv = nni_copyout_ms(ctx->c_rcvtimeo, v, szp, typ); + } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) { + rv = nni_copyout_ms(ctx->c_sndtimeo, v, szp, typ); + } else { + rv = NNG_ENOTSUP; + for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) { + if (strcmp(opt, co->co_name) != 0) { + continue; + } + if (co->co_getopt == NULL) { + rv = NNG_EWRITEONLY; + break; + } + rv = co->co_getopt(ctx->c_data, v, szp, typ); + break; + } + } + + nni_mtx_unlock(&sock->s_mx); + return (rv); +} + +int +nni_ctx_setopt( + nni_ctx *ctx, const char *opt, const void *v, size_t sz, int typ) +{ + nni_sock * sock = ctx->c_sock; + nni_proto_ctx_option *co; + int rv; + + nni_mtx_lock(&sock->s_mx); + if (strcmp(opt, NNG_OPT_RECVTIMEO) == 0) { + rv = nni_copyin_ms(&ctx->c_rcvtimeo, v, sz, typ); + } else if (strcmp(opt, NNG_OPT_SENDTIMEO) == 0) { + rv = nni_copyin_ms(&ctx->c_sndtimeo, v, sz, typ); + } else { + rv = NNG_ENOTSUP; + for (co = ctx->c_ops.ctx_options; co->co_name != NULL; co++) { + if (strcmp(opt, co->co_name) != 0) { + continue; + } + if (co->co_setopt == NULL) { + rv = NNG_EREADONLY; + break; + } + rv = co->co_setopt(ctx->c_data, v, sz, typ); + break; + } + } + + nni_mtx_unlock(&sock->s_mx); + return (rv); +} diff --git a/src/core/socket.h b/src/core/socket.h index 3af6bb9e..87bf1374 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -63,4 +63,46 @@ extern void nni_sock_reconntimes(nni_sock *, nni_duration *, nni_duration *); // nni_sock_flags returns the socket flags, used to indicate whether read // and or write are appropriate for the protocol. extern uint32_t nni_sock_flags(nni_sock *); + +// nni_ctx_open is used to open/create a new context structure. +// Contexts are not supported by most protocols, but for those that do, +// this can offer some improvements for massive concurrency/scalability. +// Returns NNG_ENOTSUP for protocols that lack context support. This adds +// another reference (hold) on the socket on success, and the newly +// created context is also held by the caller. Not supported by raw mode +// sockets (will also return NNG_ENOTSUP). +extern int nni_ctx_open(nni_ctx **, nni_sock *); + +// nni_ctx_find finds a context given its id. The last argument should +// be true if the context is acquired merely to close it, false otherwise. +// (If the socket for the context is being closed, then this will return +// NNG_ECLOSED unless the final argument is true.) +extern int nni_ctx_find(nni_ctx **, uint32_t, bool); + +// nni_ctx_rele is called to release a hold on the context. These holds +// are acquired by either nni_ctx_open or nni_ctx_find. If the context +// is being closed (nni_ctx_close was called), and this is the last reference, +// then the underlying context is freed, and the implicit socket hold +// by the context is also released. +extern void nni_ctx_rele(nni_ctx *); + +// nni_ctx_close is used to close the context. It also implictly releases +// the context. +extern void nni_ctx_close(nni_ctx *); + +// nni_ctx_id returns the context ID, which can be used with nni_ctx_find. +extern uint32_t nni_ctx_id(nni_ctx *); + +// nni_ctx_recv is an asychronous receive. +extern void nni_ctx_recv(nni_ctx *, nni_aio *); + +// nni_ctx_send is an asychronous receive. +extern void nni_ctx_send(nni_ctx *, nni_aio *); + +// nni_ctx_getopt is used to get a context option. +extern int nni_ctx_getopt(nni_ctx *, const char *, void *, size_t *, int); + +// nni_ctx_setopt is used to set a context option. +extern int nni_ctx_setopt(nni_ctx *, const char *, const void *, size_t, int); + #endif // CORE_SOCKET_H diff --git a/src/nng.c b/src/nng.c index c7c51672..e3108e47 100644 --- a/src/nng.c +++ b/src/nng.c @@ -219,6 +219,126 @@ nng_send_aio(nng_socket sid, nng_aio *aio) nni_sock_rele(sock); } +int +nng_ctx_open(nng_ctx *idp, nng_socket sid) +{ + nni_sock *sock; + nni_ctx * ctx; + int rv; + + if ((rv = nni_sock_find(&sock, sid)) != 0) { + return (rv); + } + if ((rv = nni_ctx_open(&ctx, sock)) != 0) { + nni_sock_rele(sock); + return (rv); + } + *idp = nni_ctx_id(ctx); + nni_ctx_rele(ctx); + nni_sock_rele(sock); + return (0); +} + +int +nng_ctx_close(nng_ctx cid) +{ + int rv; + nni_ctx *ctx; + + if ((rv = nni_ctx_find(&ctx, cid, true)) != 0) { + return (rv); + } + // no release, close releases implicitly. + nni_ctx_close(ctx); + return (0); +} + +void +nng_ctx_recv(nng_ctx cid, nng_aio *aio) +{ + int rv; + nni_ctx *ctx; + + if ((rv = nni_ctx_find(&ctx, cid, false)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_ctx_recv(ctx, aio); + nni_ctx_rele(ctx); +} + +void +nng_ctx_send(nng_ctx cid, nng_aio *aio) +{ + int rv; + nni_ctx *ctx; + + if ((rv = nni_ctx_find(&ctx, cid, false)) != 0) { + nni_aio_finish_error(aio, rv); + return; + } + nni_ctx_send(ctx, aio); + nni_ctx_rele(ctx); +} + +static int +nng_ctx_getx(nng_ctx id, const char *n, void *v, size_t *szp, int t) +{ + nni_ctx *ctx; + int rv; + + if ((rv = nni_init()) != 0) { + return (rv); + } + if ((rv = nni_ctx_find(&ctx, id, false)) != 0) { + return (rv); + } + rv = nni_ctx_getopt(ctx, n, v, szp, t); + nni_ctx_rele(ctx); + return (rv); +} + +int +nng_ctx_getopt(nng_ctx id, const char *name, void *val, size_t *szp) +{ + return (nng_ctx_getx(id, name, val, szp, NNI_TYPE_OPAQUE)); +} + +int +nng_ctx_getopt_bool(nng_ctx id, const char *name, bool *vp) +{ + size_t sz = sizeof(*vp); + return (nng_ctx_getx(id, name, vp, &sz, NNI_TYPE_BOOL)); +} + +int +nng_ctx_getopt_int(nng_ctx id, const char *name, int *vp) +{ + size_t sz = sizeof(*vp); + return (nng_ctx_getx(id, name, vp, &sz, NNI_TYPE_INT32)); +} + +int +nng_ctx_getopt_size(nng_ctx id, const char *name, size_t *vp) +{ + size_t sz = sizeof(*vp); + return (nng_ctx_getx(id, name, vp, &sz, NNI_TYPE_SIZE)); +} + +int +nng_ctx_getopt_string(nng_ctx id, const char *name, char **vp) +{ + size_t sz = sizeof(*vp); + return (nng_ctx_getx(id, name, vp, &sz, NNI_TYPE_STRING)); +} + +int +nng_ctx_getopt_ms(nng_ctx id, const char *name, nng_duration *vp) +{ + size_t sz = sizeof(*vp); + return (nng_ctx_getx(id, name, vp, &sz, NNI_TYPE_DURATION)); +} + int nng_dial(nng_socket sid, const char *addr, nng_dialer *dp, int flags) { diff --git a/src/nng.h b/src/nng.h index be24113a..66aee6d6 100644 --- a/src/nng.h +++ b/src/nng.h @@ -57,6 +57,7 @@ extern "C" { // Types common to nng. typedef uint32_t nng_socket; +typedef uint32_t nng_ctx; typedef uint32_t nng_dialer; typedef uint32_t nng_listener; typedef uint32_t nng_pipe; @@ -336,6 +337,47 @@ NNG_DECL void nng_send_aio(nng_socket, nng_aio *); // this point. NNG_DECL void nng_recv_aio(nng_socket, nng_aio *); +// Context support. User contexts are not supported by all protocols, +// but for those that do, they give a way to create multiple contexts +// on a single socket, each of which runs the protocol's state machinery +// independently, offering a way to achieve concurrent protocol support +// without resorting to raw mode sockets. See the protocol specific +// documentation for further details. (Note that at this time, only +// asynchronous send/recv are supported for contexts, but its easy enough +// to make synchronous versions with nng_aio_wait().) Note that nng_close +// of the parent socket will *block* as long as any contexts are open. + +// nng_ctx_open creates a context. This returns NNG_ENOTSUP if the +// protocol implementation does not support separate contexts. +NNG_DECL int nng_ctx_open(nng_ctx *, nng_socket); + +// nng_ctx_close closes the context. +NNG_DECL int nng_ctx_close(nng_ctx); + +// nng_ctx_recv receives asynchronously. It works like nng_recv_aio, but +// uses a local context instead of the socket global context. +NNG_DECL void nng_ctx_recv(nng_ctx, nng_aio *); + +// nng_ctx_send sends asynchronously. It works like nng_send_aio, but +// uses a local context instead of the socket global context. +NNG_DECL void nng_ctx_send(nng_ctx, nng_aio *); + +// nng_ctx_getopt is used to retrieve a context-specific option. This +// can only be used for those options that relate to specific context +// tunables (which does include NNG_OPT_SENDTIMEO and NNG_OPT_RECVTIMEO); +// see the protocol documentation for more details. +NNG_DECL int nng_ctx_getopt(nng_ctx, const char *, void *, size_t *); +NNG_DECL int nng_ctx_getopt_bool(nng_ctx, const char *, bool *); +NNG_DECL int nng_ctx_getopt_int(nng_ctx, const char *, int *); +NNG_DECL int nng_ctx_getopt_ms(nng_ctx, const char *, nng_duration *); +NNG_DECL int nng_ctx_getopt_size(nng_ctx, const char *, size_t *); + +// nng_ctx_setopt is used to set a context-specific option. This +// can only be used for those options that relate to specific context +// tunables (which does include NNG_OPT_SENDTIMEO and NNG_OPT_RECVTIMEO); +// see the protocol documentation for more details. +NNG_DECL int nng_ctx_setopt(nng_ctx, const char *, const void *, size_t); + // nng_alloc is used to allocate memory. It's intended purpose is for // allocating memory suitable for message buffers with nng_send(). // Applications that need memory for other purposes should use their platform diff --git a/src/protocol/reqrep0/CMakeLists.txt b/src/protocol/reqrep0/CMakeLists.txt index 7b04aa2d..071c28f1 100644 --- a/src/protocol/reqrep0/CMakeLists.txt +++ b/src/protocol/reqrep0/CMakeLists.txt @@ -11,12 +11,14 @@ # Req/Rep protocol if (NNG_PROTO_REQ0) - set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/req.h) + set(REQ0_SOURCES protocol/reqrep0/req.c protocol/reqrep0/xreq.c + protocol/reqrep0/req.h) set(REQ0_HEADERS protocol/reqrep0/req.h) endif() if (NNG_PROTO_REP0) - set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/rep.h) + set(REP0_SOURCES protocol/reqrep0/rep.c protocol/reqrep0/xrep.c + protocol/reqrep0/rep.h) set(REP0_HEADERS protocol/reqrep0/rep.h) endif() diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index 78a1f2ee..e512c18b 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -28,48 +28,219 @@ typedef struct rep0_pipe rep0_pipe; typedef struct rep0_sock rep0_sock; +typedef struct rep0_ctx rep0_ctx; -static void rep0_sock_getq_cb(void *); -static void rep0_pipe_getq_cb(void *); -static void rep0_pipe_putq_cb(void *); static void rep0_pipe_send_cb(void *); static void rep0_pipe_recv_cb(void *); static void rep0_pipe_fini(void *); +struct rep0_ctx { + rep0_sock * sock; + bool closed; + char * btrace; + size_t btrace_len; + size_t btrace_size; + int ttl; + uint32_t pipe_id; + nni_aio * saio; // send aio + nni_aio * raio; // recv aio + nni_list_node sqnode; + nni_list_node rqnode; +}; + // rep0_sock is our per-socket protocol private structure. struct rep0_sock { - nni_msgq * uwq; - nni_msgq * urq; - nni_mtx lk; - int ttl; - nni_idhash *pipes; - char * btrace; - size_t btrace_len; - nni_aio * aio_getq; + nni_mtx lk; + int ttl; + nni_idhash * pipes; + nni_list recvpipes; // list of pipes with data to receive + nni_list recvq; + bool closed; + rep0_ctx * ctx; + nni_pollable *recvable; + nni_pollable *sendable; }; // rep0_pipe is our per-pipe protocol private structure. struct rep0_pipe { - nni_pipe * pipe; - rep0_sock *rep; - nni_msgq * sendq; - nni_aio * aio_getq; - nni_aio * aio_send; - nni_aio * aio_recv; - nni_aio * aio_putq; + nni_pipe * pipe; + rep0_sock * rep; + uint32_t id; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_list_node rnode; // receivable list linkage + nni_list sendq; // contexts waiting to send + bool busy; }; +static void +rep0_ctx_close(void *arg) +{ + rep0_ctx * ctx = arg; + rep0_sock *s = ctx->sock; + nni_aio * aio; + + nni_mtx_lock(&s->lk); + ctx->closed = true; + if ((aio = ctx->saio) != NULL) { + nni_msg *msg; + nni_list_node_remove(&ctx->sqnode); + msg = nni_aio_get_msg(aio); + nni_msg_free(msg); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + if ((aio = ctx->raio) != NULL) { + nni_list_remove(&s->recvq, ctx); + ctx->raio = NULL; + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&s->lk); +} + +static void +rep0_ctx_fini(void *arg) +{ + rep0_ctx *ctx = arg; + + rep0_ctx_close(ctx); + nni_free(ctx->btrace, ctx->btrace_size); + NNI_FREE_STRUCT(ctx); +} + +static int +rep0_ctx_init(void **ctxp, void *sarg) +{ + rep0_sock *s = sarg; + rep0_ctx * ctx; + + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); + } + + // this is 1kB, which covers the worst case. + ctx->btrace_size = 256 * sizeof(uint32_t); + if ((ctx->btrace = nni_alloc(ctx->btrace_size)) == NULL) { + NNI_FREE_STRUCT(ctx); + return (NNG_ENOMEM); + } + NNI_LIST_NODE_INIT(&ctx->sqnode); + NNI_LIST_NODE_INIT(&ctx->rqnode); + ctx->btrace_len = 0; + ctx->sock = s; + ctx->pipe_id = 0; + *ctxp = ctx; + + return (0); +} + +static void +rep0_ctx_cancel_send(nni_aio *aio, int rv) +{ + rep0_ctx * ctx = nni_aio_get_prov_data(aio); + rep0_sock *s = ctx->sock; + + nni_mtx_lock(&s->lk); + if (ctx->saio != aio) { + nni_mtx_unlock(&s->lk); + return; + } + nni_list_node_remove(&ctx->sqnode); + ctx->saio = NULL; + nni_mtx_unlock(&s->lk); + + nni_msg_header_clear(nni_aio_get_msg(aio)); // reset the headers + nni_aio_finish_error(aio, rv); +} + +static void +rep0_ctx_send(void *arg, nni_aio *aio) +{ + rep0_ctx * ctx = arg; + rep0_sock *s = ctx->sock; + rep0_pipe *p; + nni_msg * msg; + int rv; + size_t len; + uint32_t p_id; // pipe id + + msg = nni_aio_get_msg(aio); + nni_msg_header_clear(msg); + + nni_mtx_lock(&s->lk); + len = ctx->btrace_len; + p_id = ctx->pipe_id; + + // Assert "completion" of the previous req request. This ensures + // exactly one send for one receive ordering. + ctx->btrace_len = 0; + ctx->pipe_id = 0; + + if (ctx == s->ctx) { + // No matter how this goes, we will no longer be able + // to send on the socket (root context). That's because + // we will have finished (successfully or otherwise) the + // reply for the single request we got. + nni_pollable_clear(s->sendable); + } + + if (nni_aio_start(aio, rep0_ctx_cancel_send, ctx) != 0) { + nni_mtx_unlock(&s->lk); + return; + } + if (ctx->closed) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if (len == 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, NNG_ESTATE); + return; + } + if ((rv = nni_msg_header_append(msg, ctx->btrace, len)) != 0) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, rv); + return; + } + + if ((rv = nni_idhash_find(s->pipes, p_id, (void **) &p)) != 0) { + // Pipe is gone. Make this look like a good send to avoid + // disrupting the state machine. We don't care if the peer + // lost interest in our reply. + nni_aio_set_msg(aio, NULL); + nni_mtx_unlock(&s->lk); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + return; + } + if (p->busy) { + ctx->saio = aio; + nni_list_append(&p->sendq, ctx); + nni_mtx_unlock(&s->lk); + return; + } + + p->busy = true; + len = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->pipe, p->aio_send); + nni_mtx_unlock(&s->lk); + + nni_aio_finish(aio, 0, len); +} + static void rep0_sock_fini(void *arg) { rep0_sock *s = arg; - nni_aio_stop(s->aio_getq); - nni_aio_fini(s->aio_getq); nni_idhash_fini(s->pipes); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); + if (s->ctx != NULL) { + rep0_ctx_fini(s->ctx); } + nni_pollable_free(s->sendable); + nni_pollable_free(s->recvable); nni_mtx_fini(&s->lk); NNI_FREE_STRUCT(s); } @@ -80,21 +251,34 @@ rep0_sock_init(void **sp, nni_sock *sock) rep0_sock *s; int rv; + NNI_ARG_UNUSED(sock); + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } nni_mtx_init(&s->lk); - if (((rv = nni_idhash_init(&s->pipes)) != 0) || - ((rv = nni_aio_init(&s->aio_getq, rep0_sock_getq_cb, s)) != 0)) { + if ((rv = nni_idhash_init(&s->pipes)) != 0) { + rep0_sock_fini(s); + return (rv); + } + + NNI_LIST_INIT(&s->recvq, rep0_ctx, rqnode); + NNI_LIST_INIT(&s->recvpipes, rep0_pipe, rnode); + + s->ttl = 8; + + if ((rv = rep0_ctx_init((void **) &s->ctx, s)) != 0) { rep0_sock_fini(s); return (rv); } - s->ttl = 8; // Per RFC - s->btrace = NULL; - s->btrace_len = 0; - s->uwq = nni_sock_sendq(sock); - s->urq = nni_sock_recvq(sock); + // We start off without being either readable or pollable. + // Readability comes when there is something on the socket. + if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || + ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { + rep0_sock_fini(s); + return (rv); + } *sp = s; @@ -104,9 +288,7 @@ rep0_sock_init(void **sp, nni_sock *sock) static void rep0_sock_open(void *arg) { - rep0_sock *s = arg; - - nni_msgq_aio_get(s->uwq, s->aio_getq); + NNI_ARG_UNUSED(arg); } static void @@ -114,7 +296,7 @@ rep0_sock_close(void *arg) { rep0_sock *s = arg; - nni_aio_abort(s->aio_getq, NNG_ECLOSED); + rep0_ctx_close(s->ctx); } static void @@ -122,11 +304,8 @@ rep0_pipe_fini(void *arg) { rep0_pipe *p = arg; - nni_aio_fini(p->aio_getq); nni_aio_fini(p->aio_send); nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_putq); - nni_msgq_fini(p->sendq); NNI_FREE_STRUCT(p); } @@ -139,15 +318,15 @@ rep0_pipe_init(void **pp, nni_pipe *pipe, void *s) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - if (((rv = nni_msgq_init(&p->sendq, 2)) != 0) || - ((rv = nni_aio_init(&p->aio_getq, rep0_pipe_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, rep0_pipe_putq_cb, p)) != 0)) { + if (((rv = nni_aio_init(&p->aio_send, rep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, rep0_pipe_recv_cb, p)) != 0)) { rep0_pipe_fini(p); return (rv); } + NNI_LIST_INIT(&p->sendq, rep0_ctx, sqnode); + + p->id = nni_pipe_id(pipe); p->pipe = pipe; p->rep = s; *pp = p; @@ -164,8 +343,8 @@ rep0_pipe_start(void *arg) if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { return (rv); } - - nni_msgq_aio_get(p->sendq, p->aio_getq); + // By definition, we have not received a request yet on this pipe, + // so it cannot cause us to become sendable. nni_pipe_recv(p->pipe, p->aio_recv); return (0); } @@ -175,94 +354,136 @@ rep0_pipe_stop(void *arg) { rep0_pipe *p = arg; rep0_sock *s = p->rep; + rep0_ctx * ctx; + + nni_mtx_lock(&s->lk); + while ((ctx = nni_list_first(&p->sendq)) != NULL) { + nni_aio *aio; + nni_msg *msg; + // Pipe was closed. To avoid pushing an error back to the + // entire socket, we pretend we completed this successfully. + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + msg = nni_aio_get_msg(aio); + nni_aio_finish(aio, 0, nni_msg_len(msg)); + nni_msg_free(msg); + } + if (p->id == s->ctx->pipe_id) { + // We "can" send. (Well, not really, but we will happily + // accept a message and discard it.) + nni_pollable_raise(s->sendable); + } + nni_mtx_unlock(&s->lk); - nni_msgq_close(p->sendq); - nni_aio_stop(p->aio_getq); nni_aio_stop(p->aio_send); nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_putq); nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); } static void -rep0_sock_getq_cb(void *arg) +rep0_pipe_send_cb(void *arg) { - rep0_sock *s = arg; - nni_msgq * uwq = s->uwq; + rep0_pipe *p = arg; + rep0_sock *s = p->rep; + rep0_ctx * ctx; + nni_aio * aio; nni_msg * msg; - uint32_t id; - rep0_pipe *p; - int rv; - - // This watches for messages from the upper write queue, - // extracts the destination pipe, and forwards it to the appropriate - // destination pipe via a separate queue. This prevents a single bad - // or slow pipe from gumming up the works for the entire socket. + size_t len; - if (nni_aio_result(s->aio_getq) != 0) { - // Closed socket? + nni_mtx_lock(&s->lk); + p->busy = false; + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->pipe); + nni_mtx_unlock(&s->lk); return; } - - msg = nni_aio_get_msg(s->aio_getq); - nni_aio_set_msg(s->aio_getq, NULL); - - // We yank the outgoing pipe id from the header - if (nni_msg_header_len(msg) < 4) { - nni_msg_free(msg); - - // Look for another message on the upper write queue. - nni_msgq_aio_get(uwq, s->aio_getq); + if ((ctx = nni_list_first(&p->sendq)) == NULL) { + // Nothing else to send. + if (p->id == s->ctx->pipe_id) { + // Mark us ready for the other side to send! + nni_pollable_raise(s->sendable); + } + nni_mtx_unlock(&s->lk); return; } - id = nni_msg_header_trim_u32(msg); + nni_list_remove(&p->sendq, ctx); + aio = ctx->saio; + ctx->saio = NULL; + p->busy = true; + msg = nni_aio_get_msg(aio); + len = nni_msg_len(msg); + nni_aio_set_msg(aio, NULL); + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->pipe, p->aio_send); - // Look for the pipe, and attempt to put the message there - // (nonblocking) if we can. If we can't for any reason, then we - // free the message. - // XXX: LOCKING?!?! - if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) { - rv = nni_msgq_tryput(p->sendq, msg); - } - if (rv != 0) { - nni_msg_free(msg); - } + nni_mtx_unlock(&s->lk); - // Now look for another message on the upper write queue. - nni_msgq_aio_get(uwq, s->aio_getq); + nni_aio_finish_synch(aio, 0, len); } static void -rep0_pipe_getq_cb(void *arg) +rep0_cancel_recv(nni_aio *aio, int rv) { - rep0_pipe *p = arg; + rep0_ctx * ctx = nni_aio_get_prov_data(aio); + rep0_sock *s = ctx->sock; - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); - return; + nni_mtx_lock(&s->lk); + if (ctx->raio == aio) { + nni_list_remove(&s->recvq, ctx); + ctx->raio = NULL; + nni_aio_finish_error(aio, rv); } - - nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - nni_pipe_send(p->pipe, p->aio_send); + nni_mtx_unlock(&s->lk); } static void -rep0_pipe_send_cb(void *arg) +rep0_ctx_recv(void *arg, nni_aio *aio) { - rep0_pipe *p = arg; + rep0_ctx * ctx = arg; + rep0_sock *s = ctx->sock; + rep0_pipe *p; + size_t len; + nni_msg * msg; - if (nni_aio_result(p->aio_send) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_send)); - nni_aio_set_msg(p->aio_send, NULL); - nni_pipe_stop(p->pipe); + nni_mtx_lock(&s->lk); + if (nni_aio_start(aio, rep0_cancel_recv, ctx) != 0) { + nni_mtx_unlock(&s->lk); + return; + } + if (ctx->closed) { + nni_mtx_unlock(&s->lk); + nni_aio_finish_error(aio, NNG_ECLOSED); return; } + if ((p = nni_list_first(&s->recvpipes)) == NULL) { + nni_pollable_clear(s->recvable); + ctx->raio = aio; + nni_list_append(&s->recvq, ctx); + nni_mtx_unlock(&s->lk); + return; + } + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_list_remove(&s->recvpipes, p); + if (nni_list_empty(&s->recvpipes)) { + nni_pollable_clear(s->recvable); + } + nni_pipe_recv(p->pipe, p->aio_recv); + + len = nni_msg_header_len(msg); + memcpy(ctx->btrace, nni_msg_header(msg), len); + ctx->btrace_len = len; + ctx->pipe_id = nni_pipe_id(p->pipe); + nni_mtx_unlock(&s->lk); - nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_msg_header_clear(msg); + nni_aio_set_msg(aio, msg); + nni_aio_finish(aio, 0, nni_msg_len(msg)); } static void @@ -270,9 +491,12 @@ rep0_pipe_recv_cb(void *arg) { rep0_pipe *p = arg; rep0_sock *s = p->rep; + rep0_ctx * ctx; nni_msg * msg; int rv; uint8_t * body; + nni_aio * aio; + size_t len; int hops; if (nni_aio_result(p->aio_recv) != 0) { @@ -281,28 +505,22 @@ rep0_pipe_recv_cb(void *arg) } msg = nni_aio_get_msg(p->aio_recv); - nni_aio_set_msg(p->aio_recv, NULL); - nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - - // Store the pipe id in the header, first thing. - rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); - if (rv != 0) { - // Failure here causes us to drop the message. - goto drop; - } + nni_msg_set_pipe(msg, p->id); // Move backtrace from body to header hops = 1; for (;;) { int end = 0; - if (hops >= s->ttl) { + + if (hops > s->ttl) { // This isn't malformed, but it has gone through // too many hops. Do not disconnect, because we // can legitimately receive messages with too many // hops from devices, etc. goto drop; } + hops++; if (nni_msg_len(msg) < 4) { // Peer is speaking garbage. Kick it. nni_msg_free(msg); @@ -313,10 +531,7 @@ rep0_pipe_recv_cb(void *arg) end = (body[0] & 0x80) ? 1 : 0; rv = nni_msg_header_append(msg, body, 4); if (rv != 0) { - // Presumably this is due to out of memory. - // We could just discard and try again, but we - // just toss the connection for now. Given the - // out of memory situation, this is not unreasonable. + // Out of memory, so drop it. goto drop; } nni_msg_trim(msg, 4); @@ -325,28 +540,46 @@ rep0_pipe_recv_cb(void *arg) } } - // Go ahead and send it up. - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(s->urq, p->aio_putq); - return; + len = nni_msg_header_len(msg); -drop: - nni_msg_free(msg); + nni_mtx_lock(&s->lk); + + if ((ctx = nni_list_first(&s->recvq)) == NULL) { + // No one waiting to receive yet, holding pattern. + nni_list_append(&s->recvpipes, p); + nni_pollable_raise(s->recvable); + nni_mtx_unlock(&s->lk); + return; + } + + nni_list_remove(&s->recvq, ctx); + aio = ctx->raio; + ctx->raio = NULL; + nni_aio_set_msg(aio, msg); + nni_aio_set_msg(p->aio_recv, NULL); + + // schedule another receive nni_pipe_recv(p->pipe, p->aio_recv); -} -static void -rep0_pipe_putq_cb(void *arg) -{ - rep0_pipe *p = arg; + ctx->btrace_len = len; + memcpy(ctx->btrace, nni_msg_header(msg), len); + nni_msg_header_clear(msg); + ctx->pipe_id = p->id; - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); - return; + // If we got a request on a pipe that wasn't busy, we should mark + // it sendable. (The sendable flag is not set when there is no + // request needing a reply.) + if ((ctx == s->ctx) && (!p->busy)) { + nni_pollable_raise(s->sendable); } + nni_mtx_unlock(&s->lk); + + nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); + return; + +drop: + nni_msg_free(msg); nni_pipe_recv(p->pipe, p->aio_recv); } @@ -354,6 +587,7 @@ static int rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { rep0_sock *s = arg; + return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); } @@ -361,75 +595,43 @@ static int rep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) { rep0_sock *s = arg; + return (nni_copyout_int(s->ttl, buf, szp, typ)); } -static nni_msg * -rep0_sock_filter(void *arg, nni_msg *msg) +static int +rep0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ) { rep0_sock *s = arg; - char * header; - size_t len; - - nni_mtx_lock(&s->lk); + int rv; + int fd; - len = nni_msg_header_len(msg); - header = nni_msg_header(msg); - if (s->btrace != NULL) { - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - } - if ((s->btrace = nni_alloc(len)) == NULL) { - nni_msg_free(msg); - return (NULL); + if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + return (rv); } - s->btrace_len = len; - memcpy(s->btrace, header, len); - nni_msg_header_clear(msg); - nni_mtx_unlock(&s->lk); - return (msg); + return (nni_copyout_int(fd, buf, szp, typ)); } -static void -rep0_sock_send_raw(void *arg, nni_aio *aio) +static int +rep0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ) { rep0_sock *s = arg; - nni_msgq_aio_put(s->uwq, aio); + int rv; + int fd; + + if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + return (rv); + } + + return (nni_copyout_int(fd, buf, szp, typ)); } static void rep0_sock_send(void *arg, nni_aio *aio) { rep0_sock *s = arg; - int rv; - nni_msg * msg; - - nni_mtx_lock(&s->lk); - if (s->btrace == NULL) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } - - msg = nni_aio_get_msg(aio); - - // drop anything else in the header... (it should already be - // empty, but there can be stale backtrace info there.) - nni_msg_header_clear(msg); - - if ((rv = nni_msg_header_append(msg, s->btrace, s->btrace_len)) != 0) { - nni_mtx_unlock(&s->lk); - nni_aio_finish_error(aio, rv); - return; - } - - nni_free(s->btrace, s->btrace_len); - s->btrace = NULL; - s->btrace_len = 0; - nni_mtx_unlock(&s->lk); - nni_msgq_aio_put(s->uwq, aio); + rep0_ctx_send(s->ctx, aio); } static void @@ -437,7 +639,7 @@ rep0_sock_recv(void *arg, nni_aio *aio) { rep0_sock *s = arg; - nni_msgq_aio_get(s->urq, aio); + rep0_ctx_recv(s->ctx, aio); } // This is the global protocol structure -- our linkage to the core. @@ -449,6 +651,21 @@ static nni_proto_pipe_ops rep0_pipe_ops = { .pipe_stop = rep0_pipe_stop, }; +static nni_proto_ctx_option rep0_ctx_options[] = { + // terminate list + { + .co_name = NULL, + }, +}; + +static nni_proto_ctx_ops rep0_ctx_ops = { + .ctx_init = rep0_ctx_init, + .ctx_fini = rep0_ctx_fini, + .ctx_send = rep0_ctx_send, + .ctx_recv = rep0_ctx_recv, + .ctx_options = rep0_ctx_options, +}; + static nni_proto_sock_option rep0_sock_options[] = { { .pso_name = NNG_OPT_MAXTTL, @@ -456,6 +673,18 @@ static nni_proto_sock_option rep0_sock_options[] = { .pso_getopt = rep0_sock_getopt_maxttl, .pso_setopt = rep0_sock_setopt_maxttl, }, + { + .pso_name = NNG_OPT_RECVFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = rep0_sock_getopt_recvfd, + .pso_setopt = NULL, + }, + { + .pso_name = NNG_OPT_SENDFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = rep0_sock_getopt_sendfd, + .pso_setopt = NULL, + }, // terminate list { .pso_name = NULL, @@ -468,38 +697,18 @@ static nni_proto_sock_ops rep0_sock_ops = { .sock_open = rep0_sock_open, .sock_close = rep0_sock_close, .sock_options = rep0_sock_options, - .sock_filter = rep0_sock_filter, .sock_send = rep0_sock_send, .sock_recv = rep0_sock_recv, }; -static nni_proto_sock_ops rep0_sock_ops_raw = { - .sock_init = rep0_sock_init, - .sock_fini = rep0_sock_fini, - .sock_open = rep0_sock_open, - .sock_close = rep0_sock_close, - .sock_options = rep0_sock_options, - .sock_filter = NULL, // No filtering for raw mode - .sock_send = rep0_sock_send_raw, - .sock_recv = rep0_sock_recv, -}; - static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REP_V0, "rep" }, .proto_peer = { NNI_PROTO_REQ_V0, "req" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &rep0_sock_ops, .proto_pipe_ops = &rep0_pipe_ops, -}; - -static nni_proto rep0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REP_V0, "rep" }, - .proto_peer = { NNI_PROTO_REQ_V0, "req" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, - .proto_sock_ops = &rep0_sock_ops_raw, - .proto_pipe_ops = &rep0_pipe_ops, + .proto_ctx_ops = &rep0_ctx_ops, }; int @@ -507,9 +716,3 @@ nng_rep0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &rep0_proto)); } - -int -nng_rep0_open_raw(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &rep0_proto_raw)); -} diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 4d35ca1f..8149ce08 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -28,34 +28,53 @@ typedef struct req0_pipe req0_pipe; typedef struct req0_sock req0_sock; +typedef struct req0_ctx req0_ctx; -static void req0_resend(req0_sock *); -static void req0_timeout(void *); +static void req0_run_sendq(req0_sock *, nni_list *); +static void req0_ctx_reset(req0_ctx *); +static void req0_ctx_timeout(void *); static void req0_pipe_fini(void *); +static void req0_ctx_fini(void *); +static int req0_ctx_init(void **, void *); + +// A req0_ctx is a "context" for the request. It uses most of the +// socket, but keeps track of its own outstanding replays, the request ID, +// and so forth. +struct req0_ctx { + nni_list_node snode; + nni_list_node sqnode; // node on the sendq + nni_list_node pnode; // node on the pipe list + uint32_t reqid; + req0_sock * sock; + nni_aio * raio; // user aio waiting to receive - only one! + nni_aio * saio; + nng_msg * reqmsg; // request message + size_t reqlen; + nng_msg * repmsg; // reply message + nni_timer_node timer; + nni_duration retry; +}; // A req0_sock is our per-socket protocol private structure. struct req0_sock { - nni_msgq * uwq; - nni_msgq * urq; + nni_sock * nsock; nni_duration retry; - nni_time resend; - bool raw; - bool wantw; bool closed; int ttl; - nni_msg * reqmsg; - req0_pipe *pendpipe; + req0_ctx *ctx; // base socket ctx nni_list readypipes; nni_list busypipes; + nni_list ctxs; - nni_timer_node timer; + nni_list sendq; // contexts waiting to send. + nni_idhash * reqids; // contexts by request ID + nni_pollable *recvable; + nni_pollable *sendable; - uint32_t nextid; // next id - uint8_t reqid[4]; // outstanding request ID (big endian) - nni_mtx mtx; - nni_cv cv; + nni_mtx mtx; + nni_cv cv; }; // A req0_pipe is our per-pipe protocol private structure. @@ -63,60 +82,61 @@ struct req0_pipe { nni_pipe * pipe; req0_sock * req; nni_list_node node; - nni_aio * aio_getq; // raw mode only - nni_aio * aio_sendraw; // raw mode only - nni_aio * aio_sendcooked; // cooked mode only + nni_list ctxs; // ctxs with pending traffic + nni_aio * aio_send; nni_aio * aio_recv; - nni_aio * aio_putq; - nni_mtx mtx; }; -static void req0_getq_cb(void *); -static void req0_sendraw_cb(void *); -static void req0_sendcooked_cb(void *); +static void req0_sock_fini(void *); +static void req0_send_cb(void *); static void req0_recv_cb(void *); -static void req0_putq_cb(void *); static int -req0_sock_init_impl(void **sp, nni_sock *sock, bool raw) +req0_sock_init(void **sp, nni_sock *sock) { req0_sock *s; + int rv; if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } + if ((rv = nni_idhash_init(&s->reqids)) != 0) { + NNI_FREE_STRUCT(s); + return (rv); + } + + // Request IDs are 32 bits, with the high order bit set. + // We start at a random point, to minimize likelihood of + // accidental collision across restarts. + nni_idhash_set_limits( + s->reqids, 0x80000000u, 0xffffffffu, nni_random() | 0x80000000u); + nni_mtx_init(&s->mtx); nni_cv_init(&s->cv, &s->mtx); NNI_LIST_INIT(&s->readypipes, req0_pipe, node); NNI_LIST_INIT(&s->busypipes, req0_pipe, node); - nni_timer_init(&s->timer, req0_timeout, s); + NNI_LIST_INIT(&s->sendq, req0_ctx, sqnode); + NNI_LIST_INIT(&s->ctxs, req0_ctx, snode); // this is "semi random" start for request IDs. - s->nextid = nni_random(); - s->retry = NNI_SECOND * 60; - s->reqmsg = NULL; - s->raw = raw; - s->wantw = false; - s->resend = NNI_TIME_ZERO; - s->ttl = 8; - s->uwq = nni_sock_sendq(sock); - s->urq = nni_sock_recvq(sock); - *sp = s; + s->nsock = sock; + s->retry = NNI_SECOND * 60; - return (0); -} + if ((rv = req0_ctx_init((void **) &s->ctx, s)) != 0) { + req0_sock_fini(s); + return (rv); + } + if (((rv = nni_pollable_alloc(&s->sendable)) != 0) || + ((rv = nni_pollable_alloc(&s->recvable)) != 0)) { + req0_sock_fini(s); + return (rv); + } -static int -req0_sock_init(void **sp, nni_sock *sock) -{ - return (req0_sock_init_impl(sp, sock, false)); -} + s->ttl = 8; + *sp = s; -static int -req0_sock_init_raw(void **sp, nni_sock *sock) -{ - return (req0_sock_init_impl(sp, sock, true)); + return (0); } static void @@ -129,12 +149,18 @@ static void req0_sock_close(void *arg) { req0_sock *s = arg; + req0_ctx * ctx; nni_mtx_lock(&s->mtx); s->closed = true; + NNI_LIST_FOREACH (&s->ctxs, ctx) { + if (ctx->raio != NULL) { + nni_aio_finish_error(ctx->raio, NNG_ECLOSED); + ctx->raio = NULL; + req0_ctx_reset(ctx); + } + } nni_mtx_unlock(&s->mtx); - - nni_timer_cancel(&s->timer); } static void @@ -147,10 +173,13 @@ req0_sock_fini(void *arg) (!nni_list_empty(&s->busypipes))) { nni_cv_wait(&s->cv); } - if (s->reqmsg != NULL) { - nni_msg_free(s->reqmsg); - } nni_mtx_unlock(&s->mtx); + if (s->ctx) { + req0_ctx_fini(s->ctx); + } + nni_pollable_free(s->recvable); + nni_pollable_free(s->sendable); + nni_idhash_fini(s->reqids); nni_cv_fini(&s->cv); nni_mtx_fini(&s->mtx); NNI_FREE_STRUCT(s); @@ -161,12 +190,8 @@ req0_pipe_fini(void *arg) { req0_pipe *p = arg; - nni_aio_fini(p->aio_getq); - nni_aio_fini(p->aio_putq); nni_aio_fini(p->aio_recv); - nni_aio_fini(p->aio_sendcooked); - nni_aio_fini(p->aio_sendraw); - nni_mtx_fini(&p->mtx); + nni_aio_fini(p->aio_send); NNI_FREE_STRUCT(p); } @@ -179,18 +204,14 @@ req0_pipe_init(void **pp, nni_pipe *pipe, void *s) if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { return (NNG_ENOMEM); } - nni_mtx_init(&p->mtx); - if (((rv = nni_aio_init(&p->aio_getq, req0_getq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_putq, req0_putq_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendraw, req0_sendraw_cb, p)) != 0) || - ((rv = nni_aio_init(&p->aio_sendcooked, req0_sendcooked_cb, p)) != - 0)) { + if (((rv = nni_aio_init(&p->aio_recv, req0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, req0_send_cb, p)) != 0)) { req0_pipe_fini(p); return (rv); } NNI_LIST_NODE_INIT(&p->node); + NNI_LIST_INIT(&p->ctxs, req0_ctx, pnode); p->pipe = pipe; p->req = s; *pp = p; @@ -213,14 +234,10 @@ req0_pipe_start(void *arg) return (NNG_ECLOSED); } nni_list_append(&s->readypipes, p); - // If sock was waiting for somewhere to send data, go ahead and - // send it to this pipe. - if (s->wantw) { - req0_resend(s); - } + nni_pollable_raise(s->sendable); + req0_run_sendq(s, NULL); nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->uwq, p->aio_getq); nni_pipe_recv(p->pipe, p->aio_recv); return (0); } @@ -230,12 +247,10 @@ req0_pipe_stop(void *arg) { req0_pipe *p = arg; req0_sock *s = p->req; + req0_ctx * ctx; - nni_aio_stop(p->aio_getq); - nni_aio_stop(p->aio_putq); nni_aio_stop(p->aio_recv); - nni_aio_stop(p->aio_sendcooked); - nni_aio_stop(p->aio_sendraw); + nni_aio_stop(p->aio_send); // At this point there should not be any further AIOs running. // Further, any completion tasks have completed. @@ -249,126 +264,54 @@ req0_pipe_stop(void *arg) nni_cv_wake(&s->cv); } } - - if ((p == s->pendpipe) && (s->reqmsg != NULL)) { - // removing the pipe we sent the last request on... - // schedule immediate resend. - s->pendpipe = NULL; - s->resend = NNI_TIME_ZERO; - s->wantw = true; - req0_resend(s); + if (nni_list_empty(&s->readypipes)) { + nni_pollable_clear(s->sendable); } - nni_mtx_unlock(&s->mtx); -} -static int -req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) -{ - req0_sock *s = arg; - return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); -} - -static int -req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) -{ - req0_sock *s = arg; - return (nni_copyout_int(s->ttl, buf, szp, typ)); -} - -static int -req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz, int typ) -{ - req0_sock *s = arg; - return (nni_copyin_ms(&s->retry, buf, sz, typ)); -} - -static int -req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp, int typ) -{ - req0_sock *s = arg; - return (nni_copyout_ms(s->retry, buf, szp, typ)); -} - -// Raw and cooked mode differ in the way they send messages out. -// -// For cooked mdes, we have a getq callback on the upper write queue, which -// when it finds a message, cancels any current processing, and saves a copy -// of the message, and then tries to "resend" the message, looking for a -// suitable available outgoing pipe. If no suitable pipe is available, -// a flag is set, so that as soon as such a pipe is available we trigger -// a resend attempt. We also trigger the attempt on either timeout, or if -// the underlying pipe we chose disconnects. -// -// For raw mode we can just let the pipes "contend" via getq to get a -// message from the upper write queue. The msgqueue implementation -// actually provides ordering, so load will be spread automatically. -// (NB: We may have to revise this in the future if we want to provide some -// kind of priority.) - -static void -req0_getq_cb(void *arg) -{ - req0_pipe *p = arg; - - // We should be in RAW mode. Cooked mode traffic bypasses - // the upper write queue entirely, and should never end up here. - // If the mode changes, we may briefly deliver a message, but - // that's ok (there's an inherent race anyway). (One minor - // exception: we wind up here in error state when the uwq is closed.) - - if (nni_aio_result(p->aio_getq) != 0) { - nni_pipe_stop(p->pipe); - return; + while ((ctx = nni_list_first(&p->ctxs)) != NULL) { + nni_list_remove(&p->ctxs, ctx); + // Reset the timer on this so it expires immediately. + // This is actually easier than canceling the timer and + // running the sendq separately. (In particular, it avoids + // a potential deadlock on cancelling the timer.) + nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO); } - - nni_aio_set_msg(p->aio_sendraw, nni_aio_get_msg(p->aio_getq)); - nni_aio_set_msg(p->aio_getq, NULL); - - // Send the message, but use the raw mode aio. - nni_pipe_send(p->pipe, p->aio_sendraw); + nni_mtx_unlock(&s->mtx); } -static void -req0_sendraw_cb(void *arg) -{ - req0_pipe *p = arg; - - if (nni_aio_result(p->aio_sendraw) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_sendraw)); - nni_aio_set_msg(p->aio_sendraw, NULL); - nni_pipe_stop(p->pipe); - return; - } - - // Sent a message so we just need to look for another one. - nni_msgq_aio_get(p->req->uwq, p->aio_getq); -} +// For cooked mode, we use a context, and send out that way. This +// completely bypasses the upper write queue. Each context keeps one +// message pending; these are "scheduled" via the sendq. The sendq +// is ordered, so FIFO ordering between contexts is provided for. static void -req0_sendcooked_cb(void *arg) +req0_send_cb(void *arg) { req0_pipe *p = arg; req0_sock *s = p->req; + nni_aio * aio; + nni_list aios; - if (nni_aio_result(p->aio_sendcooked) != 0) { + nni_aio_list_init(&aios); + if (nni_aio_result(p->aio_send) != 0) { // We failed to send... clean up and deal with it. - // We leave ourselves on the busy list for now, which - // means no new asynchronous traffic can occur here. - nni_msg_free(nni_aio_get_msg(p->aio_sendcooked)); - nni_aio_set_msg(p->aio_sendcooked, NULL); + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); nni_pipe_stop(p->pipe); return; } - // Cooked mode. We completed a cooked send, so we need to - // reinsert ourselves in the ready list, and possibly schedule - // a resend. + // We completed a cooked send, so we need to reinsert ourselves + // in the ready list, and re-run the sendq. nni_mtx_lock(&s->mtx); if (nni_list_active(&s->busypipes, p)) { nni_list_remove(&s->busypipes, p); nni_list_append(&s->readypipes, p); - req0_resend(s); + if (nni_list_empty(&s->sendq)) { + nni_pollable_raise(s->sendable); + } + req0_run_sendq(s, &aios); } else { // We wind up here if stop was called from the reader // side while we were waiting to be scheduled to run for the @@ -377,29 +320,22 @@ req0_sendcooked_cb(void *arg) nni_pipe_stop(p->pipe); } nni_mtx_unlock(&s->mtx); -} -static void -req0_putq_cb(void *arg) -{ - req0_pipe *p = arg; - - if (nni_aio_result(p->aio_putq) != 0) { - nni_msg_free(nni_aio_get_msg(p->aio_putq)); - nni_aio_set_msg(p->aio_putq, NULL); - nni_pipe_stop(p->pipe); - return; + while ((aio = nni_list_first(&aios)) != NULL) { + nni_list_remove(&aios, aio); + nni_aio_finish_synch(aio, 0, 0); } - nni_aio_set_msg(p->aio_putq, NULL); - - nni_pipe_recv(p->pipe, p->aio_recv); } static void req0_recv_cb(void *arg) { req0_pipe *p = arg; + req0_sock *s = p->req; + req0_ctx * ctx; nni_msg * msg; + nni_aio * aio; + uint32_t id; if (nni_aio_result(p->aio_recv) != 0) { nni_pipe_stop(p->pipe); @@ -410,22 +346,58 @@ req0_recv_cb(void *arg) nni_aio_set_msg(p->aio_recv, NULL); nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); - // We yank 4 bytes of body, and move them to the header. + // We yank 4 bytes from front of body, and move them to the header. if (nni_msg_len(msg) < 4) { // Malformed message. goto malformed; } - if (nni_msg_header_append(msg, nni_msg_body(msg), 4) != 0) { + id = nni_msg_trim_u32(msg); + if (nni_msg_header_append_u32(msg, id) != 0) { // Arguably we could just discard and carry on. But // dropping the connection is probably more helpful since // it lets the other side see that a problem occurred. // Plus it gives us a chance to reclaim some memory. goto malformed; } - (void) nni_msg_trim(msg, 4); // Cannot fail - nni_aio_set_msg(p->aio_putq, msg); - nni_msgq_aio_put(p->req->urq, p->aio_putq); + // Schedule another receive while we are processing this. + nni_mtx_lock(&s->mtx); + nni_pipe_recv(p->pipe, p->aio_recv); + + // Look for a context to receive it. + if ((nni_idhash_find(s->reqids, id, (void **) &ctx) != 0) || + (ctx->saio != NULL) || (ctx->repmsg != NULL)) { + nni_mtx_unlock(&s->mtx); + // No waiting context, we have not sent the request out to + // the wire yet, or context already has a reply ready. + // Discard the message. + nni_msg_free(msg); + return; + } + + // We have our match, so we can remove this. + nni_list_node_remove(&ctx->sqnode); + nni_idhash_remove(s->reqids, id); + ctx->reqid = 0; + if (ctx->reqmsg != NULL) { + nni_msg_free(ctx->reqmsg); + ctx->reqmsg = NULL; + } + + // Is there an aio waiting for us? + if ((aio = ctx->raio) != NULL) { + ctx->raio = NULL; + nni_mtx_unlock(&s->mtx); + nni_aio_set_msg(aio, msg); + nni_aio_finish_synch(aio, 0, nni_msg_len(msg)); + } else { + // No AIO, so stash msg. Receive will pick it up later. + ctx->repmsg = msg; + if (ctx == s->ctx) { + nni_pollable_raise(s->recvable); + } + nni_mtx_unlock(&s->mtx); + } return; malformed: @@ -434,191 +406,417 @@ malformed: } static void -req0_timeout(void *arg) +req0_ctx_timeout(void *arg) { - req0_sock *s = arg; + req0_ctx * ctx = arg; + req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); - if (s->reqmsg != NULL) { - s->wantw = true; - req0_resend(s); + if ((ctx->reqmsg != NULL) && (!s->closed)) { + if (!nni_list_node_active(&ctx->sqnode)) { + nni_list_append(&s->sendq, ctx); + } + req0_run_sendq(s, NULL); } nni_mtx_unlock(&s->mtx); } -static void -req0_resend(req0_sock *s) +static int +req0_ctx_init(void **cpp, void *sarg) { - req0_pipe *p; - nni_msg * msg; + req0_sock *s = sarg; + req0_ctx * ctx; - // Note: This routine should be called with the socket lock held. - // Also, this should only be called while handling cooked mode - // requests. - if ((msg = s->reqmsg) == NULL) { - return; + if ((ctx = NNI_ALLOC_STRUCT(ctx)) == NULL) { + return (NNG_ENOMEM); } - if (s->closed) { - s->reqmsg = NULL; - nni_msg_free(msg); + nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx); + + nni_mtx_lock(&s->mtx); + ctx->sock = s; + ctx->raio = NULL; + ctx->retry = s->retry; + nni_list_append(&s->ctxs, ctx); + nni_mtx_unlock(&s->mtx); + + *cpp = ctx; + return (0); +} + +static void +req0_ctx_fini(void *arg) +{ + req0_ctx * ctx = arg; + req0_sock *s = ctx->sock; + nni_aio * aio; + + nni_mtx_lock(&s->mtx); + if ((aio = ctx->raio) != NULL) { + ctx->raio = NULL; + nni_aio_finish_error(aio, NNG_ECLOSED); } + if ((aio = ctx->saio) != NULL) { + ctx->saio = NULL; + nni_aio_set_msg(aio, ctx->reqmsg); + ctx->reqmsg = NULL; + nni_aio_finish_error(aio, NNG_ECLOSED); + } + req0_ctx_reset(ctx); + nni_list_remove(&s->ctxs, ctx); + nni_mtx_unlock(&s->mtx); - if (s->wantw) { - s->wantw = false; + nni_timer_cancel(&ctx->timer); + nni_timer_fini(&ctx->timer); - if (nni_msg_dup(&msg, s->reqmsg) != 0) { - // Failed to alloc message, reschedule it. Also, - // mark that we have a message we want to resend, - // in case something comes available. - s->wantw = true; - nni_timer_schedule(&s->timer, nni_clock() + s->retry); - return; - } + NNI_FREE_STRUCT(ctx); +} + +static int +req0_ctx_setopt_resendtime(void *arg, const void *buf, size_t sz, int typ) +{ + req0_ctx *ctx = arg; + return (nni_copyin_ms(&ctx->retry, buf, sz, typ)); +} + +static int +req0_ctx_getopt_resendtime(void *arg, void *buf, size_t *szp, int typ) +{ + req0_ctx *ctx = arg; + return (nni_copyout_ms(ctx->retry, buf, szp, typ)); +} + +static void +req0_run_sendq(req0_sock *s, nni_list *aiolist) +{ + req0_ctx *ctx; + nni_aio * aio; + + // Note: This routine should be called with the socket lock held. + while ((ctx = nni_list_first(&s->sendq)) != NULL) { + nni_msg * msg; + req0_pipe *p; - // Now we iterate across all possible outpipes, until - // one accepts it. if ((p = nni_list_first(&s->readypipes)) == NULL) { - // No pipes ready to process us. Note that we have - // something to send, and schedule it. - nni_msg_free(msg); - s->wantw = true; return; } + // We have a place to send it, so do the send. + // If a sending error occurs that causes the message to + // be dropped, we rely on the resend timer to pick it up. + // We also notify the completion callback if this is the + // first send attempt. + nni_list_remove(&s->sendq, ctx); + + // Schedule a resubmit timer. We only do this if we got + // a pipe to send to. Otherwise, we should get handled + // the next time that the sendq is run. + nni_timer_schedule(&ctx->timer, nni_clock() + ctx->retry); + + if (nni_msg_dup(&msg, ctx->reqmsg) != 0) { + // Oops. Well, keep trying each context; maybe + // one of them will get lucky. + continue; + } + + // Put us on the pipe list of active contexts. + // This gives the pipe a chance to kick a resubmit + // if the pipe is removed. + nni_list_node_remove(&ctx->pnode); + nni_list_append(&p->ctxs, ctx); + nni_list_remove(&s->readypipes, p); nni_list_append(&s->busypipes, p); - s->pendpipe = p; - s->resend = nni_clock() + s->retry; - nni_aio_set_msg(p->aio_sendcooked, msg); + if ((aio = ctx->saio) != NULL) { + ctx->saio = NULL; + nni_aio_bump_count(aio, ctx->reqlen); + // If the list was passed in, we want to do a + // synchronous completion later. + if (aiolist != NULL) { + nni_list_append(aiolist, aio); + } else { + nni_aio_finish(aio, 0, 0); + } + if (ctx == s->ctx) { + if (nni_list_empty(&s->readypipes)) { + nni_pollable_clear(s->sendable); + } else { + nni_pollable_raise(s->sendable); + } + } + } + + nni_aio_set_msg(p->aio_send, msg); + nni_pipe_send(p->pipe, p->aio_send); + } +} - // Note that because we were ready rather than busy, we - // should not have any I/O oustanding and hence the aio - // object will be available for our use. - nni_pipe_send(p->pipe, p->aio_sendcooked); - nni_timer_schedule(&s->timer, s->resend); +void +req0_ctx_reset(req0_ctx *ctx) +{ + req0_sock *s = ctx->sock; + // Call with sock lock held! + + // We cannot safely "wait" using nni_timer_cancel, but this removes + // any scheduled timer activation. If the timeout is already running + // concurrently, it will still run. It should do nothing, because + // we toss the reqmsg. There is still a very narrow race if the + // timeout fires, but doesn't actually start running before we + // both finish this function, *and* manage to reschedule another + // request. The consequence of that occurring is that the request + // will be emitted on the wire twice. This is not actually tragic. + nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER); + + nni_list_node_remove(&ctx->pnode); + nni_list_node_remove(&ctx->sqnode); + if (ctx->reqid != 0) { + nni_idhash_remove(s->reqids, ctx->reqid); + ctx->reqid = 0; + } + if (ctx->reqmsg != NULL) { + nni_msg_free(ctx->reqmsg); + ctx->reqmsg = NULL; + } + if (ctx->repmsg != NULL) { + nni_msg_free(ctx->repmsg); + ctx->repmsg = NULL; } } static void -req0_sock_send(void *arg, nni_aio *aio) +req0_ctx_cancel_recv(nni_aio *aio, int rv) { - req0_sock *s = arg; - uint32_t id; - size_t len; - nni_msg * msg; - int rv; + req0_ctx * ctx = nni_aio_get_prov_data(aio); + req0_sock *s = ctx->sock; nni_mtx_lock(&s->mtx); + if (ctx->raio != aio) { + // already completed, ignore this. + nni_mtx_unlock(&s->mtx); + return; + } + ctx->raio = NULL; - msg = nni_aio_get_msg(aio); - len = nni_msg_len(msg); + // Cancellation of a pending receive is treated as aborting the + // entire state machine. This allows us to preserve the semantic of + // exactly one receive operation per send operation, and should + // be the least surprising for users. The main consequence is that + // if a receive operation is completed (in error or otherwise), the + // user must submit a new send operation to restart the state machine. + req0_ctx_reset(ctx); - // In cooked mode, because we need to manage our own resend logic, - // we bypass the upper writeq entirely. + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&s->mtx); +} - // Generate a new request ID. We always set the high - // order bit so that the peer can locate the end of the - // backtrace. (Pipe IDs have the high order bit clear.) - id = (s->nextid++) | 0x80000000u; - // Request ID is in big endian format. - NNI_PUT32(s->reqid, id); +static void +req0_ctx_recv(void *arg, nni_aio *aio) +{ + req0_ctx * ctx = arg; + req0_sock *s = ctx->sock; + nni_msg * msg; - if ((rv = nni_msg_header_append(msg, s->reqid, 4)) != 0) { + nni_mtx_lock(&s->mtx); + if (nni_aio_start(aio, req0_ctx_cancel_recv, ctx) != 0) { nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, rv); return; } - - // If another message is there, this cancels it. - if (s->reqmsg != NULL) { - nni_msg_free(s->reqmsg); - s->reqmsg = NULL; + if (s->closed) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((ctx->raio != NULL) || + ((ctx->reqmsg == NULL) && (ctx->repmsg == NULL))) { + // We have already got a pending receive or have not + // tried to send a request yet. + // Either of these violate our basic state assumptions. + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; } - nni_aio_set_msg(aio, NULL); - - // Make a duplicate message... for retries. - s->reqmsg = msg; - // Schedule for immediate send - s->resend = NNI_TIME_ZERO; - s->wantw = true; + if ((msg = ctx->repmsg) == NULL) { + ctx->raio = aio; + nni_mtx_unlock(&s->mtx); + return; + } - req0_resend(s); + ctx->repmsg = NULL; + // We have got a message to pass up, yay! + nni_aio_set_msg(aio, msg); + if (ctx == s->ctx) { + nni_pollable_clear(s->recvable); + } nni_mtx_unlock(&s->mtx); - - nni_aio_finish(aio, 0, len); + nni_aio_finish(aio, 0, nni_msg_len(msg)); } static void -req0_sock_send_raw(void *arg, nni_aio *aio) +req0_ctx_cancel_send(nni_aio *aio, int rv) { - req0_sock *s = arg; + req0_ctx * ctx = nni_aio_get_prov_data(aio); + req0_sock *s = ctx->sock; + + nni_mtx_lock(&s->mtx); + if (ctx->saio != aio) { + // already completed, ignore this. + nni_mtx_unlock(&s->mtx); + return; + } - nni_msgq_aio_put(s->uwq, aio); + // There should not be a pending reply, because we canceled + // it while we were waiting. + NNI_ASSERT(ctx->raio == NULL); + ctx->saio = NULL; + // Restore the message back to the aio. + nni_aio_set_msg(aio, ctx->reqmsg); + nni_msg_header_clear(ctx->reqmsg); + ctx->reqmsg = NULL; + + // Cancellation of a pending receive is treated as aborting the + // entire state machine. This allows us to preserve the semantic of + // exactly one receive operation per send operation, and should + // be the least surprising for users. The main consequence is that + // if a receive operation is completed (in error or otherwise), the + // user must submit a new send operation to restart the state machine. + req0_ctx_reset(ctx); + + nni_aio_finish_error(aio, rv); + nni_mtx_unlock(&s->mtx); } -static nni_msg * -req0_sock_filter(void *arg, nni_msg *msg) +static void +req0_ctx_send(void *arg, nni_aio *aio) { - req0_sock *s = arg; - nni_msg * rmsg; + req0_ctx * ctx = arg; + req0_sock *s = ctx->sock; + nng_msg * msg = nni_aio_get_msg(aio); + uint64_t id; + int rv; nni_mtx_lock(&s->mtx); - - if (nni_msg_header_len(msg) < 4) { + // Even though we always complete synchronously, this guards against + // restarting a request that was stopped. + if (nni_aio_start(aio, req0_ctx_cancel_send, ctx) != 0) { nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); + return; + } + // Sending a new requst cancels the old one, including any + // outstanding reply. + if (ctx->raio != NULL) { + nni_aio_finish_error(ctx->raio, NNG_ECANCELED); + ctx->raio = NULL; + } + if (ctx->saio != NULL) { + nni_aio_set_msg(ctx->saio, ctx->reqmsg); + nni_msg_header_clear(ctx->reqmsg); + ctx->reqmsg = NULL; + nni_aio_finish_error(ctx->saio, NNG_ECANCELED); + ctx->saio = NULL; + nni_list_remove(&s->sendq, ctx); } - if ((rmsg = s->reqmsg) == NULL) { - // We had no outstanding request. (Perhaps canceled, - // or duplicate response.) + // This resets the entire state machine. + req0_ctx_reset(ctx); + + // Insert us on the per ID hash list, so that receives can find us. + if ((rv = nni_idhash_alloc(s->reqids, &id, ctx)) != 0) { nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); + nni_aio_finish_error(aio, rv); + return; } - - if (memcmp(nni_msg_header(msg), s->reqid, 4) != 0) { - // Wrong request id. + ctx->reqid = (uint32_t) id; + if ((rv = nni_msg_header_append_u32(msg, ctx->reqid)) != 0) { + nni_idhash_remove(s->reqids, id); nni_mtx_unlock(&s->mtx); - nni_msg_free(msg); - return (NULL); + nni_aio_finish_error(aio, rv); + return; } + ctx->reqlen = nni_msg_len(msg); + ctx->reqmsg = msg; + ctx->saio = aio; + nni_aio_set_msg(aio, NULL); - s->reqmsg = NULL; - s->pendpipe = NULL; - nni_mtx_unlock(&s->mtx); + // Stick us on the sendq list. + nni_list_append(&s->sendq, ctx); - nni_msg_free(rmsg); + req0_run_sendq(s, NULL); + nni_mtx_unlock(&s->mtx); +} - return (msg); +static void +req0_sock_send(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + req0_ctx_send(s->ctx, aio); } static void req0_sock_recv(void *arg, nni_aio *aio) { req0_sock *s = arg; + req0_ctx_recv(s->ctx, aio); +} - nni_mtx_lock(&s->mtx); - if (s->reqmsg == NULL) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; +static int +req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) +{ + req0_sock *s = arg; + return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); +} + +static int +req0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) +{ + req0_sock *s = arg; + return (nni_copyout_int(s->ttl, buf, szp, typ)); +} + +static int +req0_sock_setopt_resendtime(void *arg, const void *buf, size_t sz, int typ) +{ + req0_sock *s = arg; + int rv; + rv = req0_ctx_setopt_resendtime(s->ctx, buf, sz, typ); + s->retry = s->ctx->retry; + return (rv); +} + +static int +req0_sock_getopt_resendtime(void *arg, void *buf, size_t *szp, int typ) +{ + req0_sock *s = arg; + return (req0_ctx_getopt_resendtime(s->ctx, buf, szp, typ)); +} + +static int +req0_sock_getopt_sendfd(void *arg, void *buf, size_t *szp, int typ) +{ + req0_sock *s = arg; + int rv; + int fd; + + if ((rv = nni_pollable_getfd(s->sendable, &fd)) != 0) { + return (rv); } - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_get(s->urq, aio); + return (nni_copyout_int(fd, buf, szp, typ)); } -static void -req0_sock_recv_raw(void *arg, nni_aio *aio) +static int +req0_sock_getopt_recvfd(void *arg, void *buf, size_t *szp, int typ) { req0_sock *s = arg; + int rv; + int fd; - nni_msgq_aio_get(s->urq, aio); + if ((rv = nni_pollable_getfd(s->recvable, &fd)) != 0) { + return (rv); + } + + return (nni_copyout_int(fd, buf, szp, typ)); } static nni_proto_pipe_ops req0_pipe_ops = { @@ -628,6 +826,26 @@ static nni_proto_pipe_ops req0_pipe_ops = { .pipe_stop = req0_pipe_stop, }; +static nni_proto_ctx_option req0_ctx_options[] = { + { + .co_name = NNG_OPT_REQ_RESENDTIME, + .co_type = NNI_TYPE_DURATION, + .co_getopt = req0_ctx_getopt_resendtime, + .co_setopt = req0_ctx_setopt_resendtime, + }, + { + .co_name = NULL, + }, +}; + +static nni_proto_ctx_ops req0_ctx_ops = { + .ctx_init = req0_ctx_init, + .ctx_fini = req0_ctx_fini, + .ctx_recv = req0_ctx_recv, + .ctx_send = req0_ctx_send, + .ctx_options = req0_ctx_options, +}; + static nni_proto_sock_option req0_sock_options[] = { { .pso_name = NNG_OPT_MAXTTL, @@ -641,6 +859,18 @@ static nni_proto_sock_option req0_sock_options[] = { .pso_getopt = req0_sock_getopt_resendtime, .pso_setopt = req0_sock_setopt_resendtime, }, + { + .pso_name = NNG_OPT_RECVFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = req0_sock_getopt_recvfd, + .pso_setopt = NULL, + }, + { + .pso_name = NNG_OPT_SENDFD, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = req0_sock_getopt_sendfd, + .pso_setopt = NULL, + }, // terminate list { .pso_name = NULL, @@ -653,7 +883,6 @@ static nni_proto_sock_ops req0_sock_ops = { .sock_open = req0_sock_open, .sock_close = req0_sock_close, .sock_options = req0_sock_options, - .sock_filter = req0_sock_filter, .sock_send = req0_sock_send, .sock_recv = req0_sock_recv, }; @@ -662,9 +891,10 @@ static nni_proto req0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REQ_V0, "req" }, .proto_peer = { NNI_PROTO_REP_V0, "rep" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_NOMSGQ, .proto_sock_ops = &req0_sock_ops, .proto_pipe_ops = &req0_pipe_ops, + .proto_ctx_ops = &req0_ctx_ops, }; int @@ -672,28 +902,3 @@ nng_req0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &req0_proto)); } - -static nni_proto_sock_ops req0_sock_ops_raw = { - .sock_init = req0_sock_init_raw, - .sock_fini = req0_sock_fini, - .sock_open = req0_sock_open, - .sock_close = req0_sock_close, - .sock_options = req0_sock_options, - .sock_send = req0_sock_send_raw, - .sock_recv = req0_sock_recv_raw, -}; - -static nni_proto req0_proto_raw = { - .proto_version = NNI_PROTOCOL_VERSION, - .proto_self = { NNI_PROTO_REQ_V0, "req" }, - .proto_peer = { NNI_PROTO_REP_V0, "rep" }, - .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, - .proto_sock_ops = &req0_sock_ops_raw, - .proto_pipe_ops = &req0_pipe_ops, -}; - -int -nng_req0_open_raw(nng_socket *sidp) -{ - return (nni_proto_open(sidp, &req0_proto_raw)); -} \ No newline at end of file diff --git a/src/protocol/reqrep0/xrep.c b/src/protocol/reqrep0/xrep.c new file mode 100644 index 00000000..f7189453 --- /dev/null +++ b/src/protocol/reqrep0/xrep.c @@ -0,0 +1,434 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include + +#include "core/nng_impl.h" +#include "protocol/reqrep0/rep.h" + +// Response protocol in raw mode. The REP protocol is the "reply" side of a +// request-reply pair. This is useful for building RPC servers, for +// example. + +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif + +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif + +typedef struct xrep0_pipe xrep0_pipe; +typedef struct xrep0_sock xrep0_sock; + +static void xrep0_sock_getq_cb(void *); +static void xrep0_pipe_getq_cb(void *); +static void xrep0_pipe_putq_cb(void *); +static void xrep0_pipe_send_cb(void *); +static void xrep0_pipe_recv_cb(void *); +static void xrep0_pipe_fini(void *); + +// xrep0_sock is our per-socket protocol private structure. +struct xrep0_sock { + nni_msgq * uwq; + nni_msgq * urq; + nni_mtx lk; + int ttl; + nni_idhash *pipes; + nni_aio * aio_getq; +}; + +// xrep0_pipe is our per-pipe protocol private structure. +struct xrep0_pipe { + nni_pipe * pipe; + xrep0_sock *rep; + nni_msgq * sendq; + nni_aio * aio_getq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_putq; +}; + +static void +xrep0_sock_fini(void *arg) +{ + xrep0_sock *s = arg; + + nni_aio_stop(s->aio_getq); + nni_aio_fini(s->aio_getq); + nni_idhash_fini(s->pipes); + nni_mtx_fini(&s->lk); + NNI_FREE_STRUCT(s); +} + +static int +xrep0_sock_init(void **sp, nni_sock *sock) +{ + xrep0_sock *s; + int rv; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + nni_mtx_init(&s->lk); + if (((rv = nni_idhash_init(&s->pipes)) != 0) || + ((rv = nni_aio_init(&s->aio_getq, xrep0_sock_getq_cb, s)) != 0)) { + xrep0_sock_fini(s); + return (rv); + } + + s->ttl = 8; // Per RFC + s->uwq = nni_sock_sendq(sock); + s->urq = nni_sock_recvq(sock); + + *sp = s; + + return (0); +} + +static void +xrep0_sock_open(void *arg) +{ + xrep0_sock *s = arg; + + // This starts us retrieving message from the upper write q. + nni_msgq_aio_get(s->uwq, s->aio_getq); +} + +static void +xrep0_sock_close(void *arg) +{ + xrep0_sock *s = arg; + + nni_aio_abort(s->aio_getq, NNG_ECLOSED); +} + +static void +xrep0_pipe_fini(void *arg) +{ + xrep0_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_send); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_putq); + nni_msgq_fini(p->sendq); + NNI_FREE_STRUCT(p); +} + +static int +xrep0_pipe_init(void **pp, nni_pipe *pipe, void *s) +{ + xrep0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + + // We want a pretty deep sendq on pipes. The rationale here is + // that the send rate will be mitigated by the receive rate. + // If a slow pipe (req pipe not reading its own responses!?) + // comes up, then we will start discarding its replies eventually, + // but it takes some time. It would be poor form for a peer to + // smash us with requests, but be unable to handle replies faster + // than we can forward them. If they do that, their replies get + // dropped. (From a DDoS perspective, it might be nice in the + // future if we had a way to exert backpressure to the send side -- + // essentially don't let peers send requests faster than they are + // willing to receive replies. Something to think about for the + // future.) + if (((rv = nni_msgq_init(&p->sendq, 64)) != 0) || + ((rv = nni_aio_init(&p->aio_getq, xrep0_pipe_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, xrep0_pipe_send_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, xrep0_pipe_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, xrep0_pipe_putq_cb, p)) != 0)) { + xrep0_pipe_fini(p); + return (rv); + } + + p->pipe = pipe; + p->rep = s; + *pp = p; + return (0); +} + +static int +xrep0_pipe_start(void *arg) +{ + xrep0_pipe *p = arg; + xrep0_sock *s = p->rep; + int rv; + + if ((rv = nni_idhash_insert(s->pipes, nni_pipe_id(p->pipe), p)) != 0) { + return (rv); + } + + nni_msgq_aio_get(p->sendq, p->aio_getq); + nni_pipe_recv(p->pipe, p->aio_recv); + return (0); +} + +static void +xrep0_pipe_stop(void *arg) +{ + xrep0_pipe *p = arg; + xrep0_sock *s = p->rep; + + nni_msgq_close(p->sendq); + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_send); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_putq); + + nni_idhash_remove(s->pipes, nni_pipe_id(p->pipe)); +} + +static void +xrep0_sock_getq_cb(void *arg) +{ + xrep0_sock *s = arg; + nni_msgq * uwq = s->uwq; + nni_msg * msg; + uint32_t id; + xrep0_pipe *p; + int rv; + + // This watches for messages from the upper write queue, + // extracts the destination pipe, and forwards it to the appropriate + // destination pipe via a separate queue. This prevents a single bad + // or slow pipe from gumming up the works for the entire socket. + + if (nni_aio_result(s->aio_getq) != 0) { + // Closed socket? + return; + } + + msg = nni_aio_get_msg(s->aio_getq); + nni_aio_set_msg(s->aio_getq, NULL); + + // We yank the outgoing pipe id from the header + if (nni_msg_header_len(msg) < 4) { + nni_msg_free(msg); + + // Look for another message on the upper write queue. + nni_msgq_aio_get(uwq, s->aio_getq); + return; + } + + id = nni_msg_header_trim_u32(msg); + + // Look for the pipe, and attempt to put the message there + // (nonblocking) if we can. If we can't for any reason, then we + // free the message. + if ((rv = nni_idhash_find(s->pipes, id, (void **) &p)) == 0) { + rv = nni_msgq_tryput(p->sendq, msg); + } + if (rv != 0) { + nni_msg_free(msg); + } + + // Now look for another message on the upper write queue. + nni_msgq_aio_get(uwq, s->aio_getq); +} + +static void +xrep0_pipe_getq_cb(void *arg) +{ + xrep0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->pipe, p->aio_send); +} + +static void +xrep0_pipe_send_cb(void *arg) +{ + xrep0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->pipe); + return; + } + + nni_msgq_aio_get(p->sendq, p->aio_getq); +} + +static void +xrep0_pipe_recv_cb(void *arg) +{ + xrep0_pipe *p = arg; + xrep0_sock *s = p->rep; + nni_msg * msg; + int rv; + uint8_t * body; + int hops; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + // Store the pipe id in the header, first thing. + rv = nni_msg_header_append_u32(msg, nni_pipe_id(p->pipe)); + if (rv != 0) { + // Failure here causes us to drop the message. + goto drop; + } + + // Move backtrace from body to header + hops = 1; + for (;;) { + int end = 0; + if (hops > s->ttl) { + // This isn't malformed, but it has gone through + // too many hops. Do not disconnect, because we + // can legitimately receive messages with too many + // hops from devices, etc. + goto drop; + } + hops++; + if (nni_msg_len(msg) < 4) { + // Peer is speaking garbage. Kick it. + nni_msg_free(msg); + nni_pipe_stop(p->pipe); + return; + } + body = nni_msg_body(msg); + end = (body[0] & 0x80) ? 1 : 0; + rv = nni_msg_header_append(msg, body, 4); + if (rv != 0) { + // Out of memory most likely, but keep going to + // avoid breaking things. + goto drop; + } + nni_msg_trim(msg, 4); + if (end) { + break; + } + } + + // Go ahead and send it up. + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(s->urq, p->aio_putq); + return; + +drop: + nni_msg_free(msg); + nni_pipe_recv(p->pipe, p->aio_recv); +} + +static void +xrep0_pipe_putq_cb(void *arg) +{ + xrep0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->pipe); + return; + } + + nni_pipe_recv(p->pipe, p->aio_recv); +} + +static int +xrep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) +{ + xrep0_sock *s = arg; + return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); +} + +static int +xrep0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) +{ + xrep0_sock *s = arg; + return (nni_copyout_int(s->ttl, buf, szp, typ)); +} + +static void +xrep0_sock_send(void *arg, nni_aio *aio) +{ + xrep0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static void +xrep0_sock_recv(void *arg, nni_aio *aio) +{ + xrep0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +// This is the global protocol structure -- our linkage to the core. +// This should be the only global non-static symbol in this file. +static nni_proto_pipe_ops xrep0_pipe_ops = { + .pipe_init = xrep0_pipe_init, + .pipe_fini = xrep0_pipe_fini, + .pipe_start = xrep0_pipe_start, + .pipe_stop = xrep0_pipe_stop, +}; + +static nni_proto_sock_option xrep0_sock_options[] = { + { + .pso_name = NNG_OPT_MAXTTL, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = xrep0_sock_getopt_maxttl, + .pso_setopt = xrep0_sock_setopt_maxttl, + }, + // terminate list + { + .pso_name = NULL, + }, +}; + +static nni_proto_sock_ops xrep0_sock_ops = { + .sock_init = xrep0_sock_init, + .sock_fini = xrep0_sock_fini, + .sock_open = xrep0_sock_open, + .sock_close = xrep0_sock_close, + .sock_options = xrep0_sock_options, + .sock_filter = NULL, // No filtering for raw mode + .sock_send = xrep0_sock_send, + .sock_recv = xrep0_sock_recv, +}; + +static nni_proto xrep0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REP_V0, "rep" }, + .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &xrep0_sock_ops, + .proto_pipe_ops = &xrep0_pipe_ops, +}; + +int +nng_rep0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &xrep0_proto)); +} diff --git a/src/protocol/reqrep0/xreq.c b/src/protocol/reqrep0/xreq.c new file mode 100644 index 00000000..5c1841b2 --- /dev/null +++ b/src/protocol/reqrep0/xreq.c @@ -0,0 +1,324 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include +#include +#include + +#include "core/nng_impl.h" +#include "protocol/reqrep0/req.h" + +// Request protocol. The REQ protocol is the "request" side of a +// request-reply pair. This is useful for building RPC clients, for example. + +#ifndef NNI_PROTO_REQ_V0 +#define NNI_PROTO_REQ_V0 NNI_PROTO(3, 0) +#endif + +#ifndef NNI_PROTO_REP_V0 +#define NNI_PROTO_REP_V0 NNI_PROTO(3, 1) +#endif + +typedef struct xreq0_pipe xreq0_pipe; +typedef struct xreq0_sock xreq0_sock; + +// An xreq0_sock is our per-socket protocol private structure. +struct xreq0_sock { + nni_msgq *uwq; + nni_msgq *urq; + int ttl; +}; + +// A req0_pipe is our per-pipe protocol private structure. +struct xreq0_pipe { + nni_pipe * pipe; + xreq0_sock *req; + nni_aio * aio_getq; + nni_aio * aio_send; + nni_aio * aio_recv; + nni_aio * aio_putq; +}; + +static void xreq0_sock_fini(void *); +static void xreq0_getq_cb(void *); +static void xreq0_send_cb(void *); +static void xreq0_recv_cb(void *); +static void xreq0_putq_cb(void *); + +static int +xreq0_sock_init(void **sp, nni_sock *sock) +{ + xreq0_sock *s; + + if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { + return (NNG_ENOMEM); + } + + s->ttl = 8; + s->uwq = nni_sock_sendq(sock); + s->urq = nni_sock_recvq(sock); + *sp = s; + + return (0); +} + +static void +xreq0_sock_open(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +xreq0_sock_close(void *arg) +{ + NNI_ARG_UNUSED(arg); +} + +static void +xreq0_sock_fini(void *arg) +{ + xreq0_sock *s = arg; + + NNI_FREE_STRUCT(s); +} + +static void +xreq0_pipe_fini(void *arg) +{ + xreq0_pipe *p = arg; + + nni_aio_fini(p->aio_getq); + nni_aio_fini(p->aio_putq); + nni_aio_fini(p->aio_recv); + nni_aio_fini(p->aio_send); + NNI_FREE_STRUCT(p); +} + +static int +xreq0_pipe_init(void **pp, nni_pipe *pipe, void *s) +{ + xreq0_pipe *p; + int rv; + + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if (((rv = nni_aio_init(&p->aio_getq, xreq0_getq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_putq, xreq0_putq_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_recv, xreq0_recv_cb, p)) != 0) || + ((rv = nni_aio_init(&p->aio_send, xreq0_send_cb, p)) != 0)) { + xreq0_pipe_fini(p); + return (rv); + } + + p->pipe = pipe; + p->req = s; + *pp = p; + return (0); +} + +static int +xreq0_pipe_start(void *arg) +{ + xreq0_pipe *p = arg; + xreq0_sock *s = p->req; + + if (nni_pipe_peer(p->pipe) != NNI_PROTO_REP_V0) { + return (NNG_EPROTO); + } + + nni_msgq_aio_get(s->uwq, p->aio_getq); + nni_pipe_recv(p->pipe, p->aio_recv); + return (0); +} + +static void +xreq0_pipe_stop(void *arg) +{ + xreq0_pipe *p = arg; + + nni_aio_stop(p->aio_getq); + nni_aio_stop(p->aio_putq); + nni_aio_stop(p->aio_recv); + nni_aio_stop(p->aio_send); + + // At this point there should not be any further AIOs running. + // Further, any completion tasks have completed. +} + +// For raw mode we can just let the pipes "contend" via getq to get a +// message from the upper write queue. The msgqueue implementation +// actually provides ordering, so load will be spread automatically. +// (NB: We may have to revise this in the future if we want to provide some +// kind of priority.) + +static void +xreq0_getq_cb(void *arg) +{ + xreq0_pipe *p = arg; + + if (nni_aio_result(p->aio_getq) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + nni_aio_set_msg(p->aio_send, nni_aio_get_msg(p->aio_getq)); + nni_aio_set_msg(p->aio_getq, NULL); + + nni_pipe_send(p->pipe, p->aio_send); +} + +static void +xreq0_send_cb(void *arg) +{ + xreq0_pipe *p = arg; + + if (nni_aio_result(p->aio_send) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_send)); + nni_aio_set_msg(p->aio_send, NULL); + nni_pipe_stop(p->pipe); + return; + } + + // Sent a message so we just need to look for another one. + nni_msgq_aio_get(p->req->uwq, p->aio_getq); +} + +static void +xreq0_putq_cb(void *arg) +{ + xreq0_pipe *p = arg; + + if (nni_aio_result(p->aio_putq) != 0) { + nni_msg_free(nni_aio_get_msg(p->aio_putq)); + nni_aio_set_msg(p->aio_putq, NULL); + nni_pipe_stop(p->pipe); + return; + } + nni_aio_set_msg(p->aio_putq, NULL); + + nni_pipe_recv(p->pipe, p->aio_recv); +} + +static void +xreq0_recv_cb(void *arg) +{ + xreq0_pipe *p = arg; + xreq0_sock *sock = p->req; + nni_msg * msg; + uint32_t id; + + if (nni_aio_result(p->aio_recv) != 0) { + nni_pipe_stop(p->pipe); + return; + } + + msg = nni_aio_get_msg(p->aio_recv); + nni_aio_set_msg(p->aio_recv, NULL); + nni_msg_set_pipe(msg, nni_pipe_id(p->pipe)); + + // We yank 4 bytes from front of body, and move them to the header. + if (nni_msg_len(msg) < 4) { + // Malformed message. + goto malformed; + } + id = nni_msg_trim_u32(msg); + if (nni_msg_header_append_u32(msg, id) != 0) { + // Arguably we could just discard and carry on. But + // dropping the connection is probably more helpful since + // it lets the other side see that a problem occurred. + // Plus it gives us a chance to reclaim some memory. + goto malformed; + } + + nni_aio_set_msg(p->aio_putq, msg); + nni_msgq_aio_put(sock->urq, p->aio_putq); + return; + +malformed: + nni_msg_free(msg); + nni_pipe_stop(p->pipe); +} + +static void +xreq0_sock_send(void *arg, nni_aio *aio) +{ + xreq0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static void +xreq0_sock_recv(void *arg, nni_aio *aio) +{ + xreq0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + +static int +xreq0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) +{ + xreq0_sock *s = arg; + return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); +} + +static int +xreq0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) +{ + xreq0_sock *s = arg; + return (nni_copyout_int(s->ttl, buf, szp, typ)); +} + +static nni_proto_pipe_ops xreq0_pipe_ops = { + .pipe_init = xreq0_pipe_init, + .pipe_fini = xreq0_pipe_fini, + .pipe_start = xreq0_pipe_start, + .pipe_stop = xreq0_pipe_stop, +}; + +static nni_proto_sock_option xreq0_sock_options[] = { + { + .pso_name = NNG_OPT_MAXTTL, + .pso_type = NNI_TYPE_INT32, + .pso_getopt = xreq0_sock_getopt_maxttl, + .pso_setopt = xreq0_sock_setopt_maxttl, + }, + // terminate list + { + .pso_name = NULL, + }, +}; + +static nni_proto_sock_ops xreq0_sock_ops = { + .sock_init = xreq0_sock_init, + .sock_fini = xreq0_sock_fini, + .sock_open = xreq0_sock_open, + .sock_close = xreq0_sock_close, + .sock_options = xreq0_sock_options, + .sock_send = xreq0_sock_send, + .sock_recv = xreq0_sock_recv, +}; + +static nni_proto xreq0_proto = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REQ_V0, "req" }, + .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &xreq0_sock_ops, + .proto_pipe_ops = &xreq0_pipe_ops, + .proto_ctx_ops = NULL, // raw mode does not support contexts +}; + +int +nng_req0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &xreq0_proto)); +} diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index df7e5701..2177c4df 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -147,9 +147,9 @@ add_nng_test(scalability 20 ON) add_nng_test(sha1 5 NNG_SUPP_SHA1) add_nng_test(sock 5 ON) add_nng_test(synch 5 ON) -add_nng_test(tls 10 NNG_TRANSPORT_TLS) -add_nng_test(tcp 5 NNG_TRANSPORT_TCP) -add_nng_test(tcp6 5 NNG_TRANSPORT_TCP) +add_nng_test(tls 60 NNG_TRANSPORT_TLS) +add_nng_test(tcp 60 NNG_TRANSPORT_TCP) +add_nng_test(tcp6 60 NNG_TRANSPORT_TCP) add_nng_test(transport 5 ON) add_nng_test(udp 5 ON) add_nng_test(url 5 ON) @@ -162,6 +162,8 @@ add_nng_proto_test(bus 5 NNG_PROTO_BUS0 NNG_PROTO_BUS0) add_nng_test(pipeline 5 NNG_PROTO_PULL0 NNG_PROTO_PIPELINE0) add_nng_proto_test(pair1 5 NNG_PROTO_PAIR1 NNG_PROTO_PAIR1) add_nng_proto_test(pubsub 5 NNG_PROTO_PUB0 NNG_PROTO_SUB0) +add_nng_proto_test(reqctx 5 NNG_PROTO_REQ0 NNG_PROTO_REP0) +add_nng_proto_test(reqpoll 5 NNG_PROTO_REQ0 NNG_PROTO_REP0) add_nng_proto_test(reqrep 5 NNG_PROTO_REQ0 NNG_PROTO_REP0) add_nng_test(survey 5 NNG_PROTO_SURVEYOR0 NNG_PROTO_RESPONDENT0) diff --git a/tests/reqctx.c b/tests/reqctx.c new file mode 100644 index 00000000..4aae2e24 --- /dev/null +++ b/tests/reqctx.c @@ -0,0 +1,258 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "convey.h" +#include "nng.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "stubs.h" +#include "supplemental/util/platform.h" + +#include + +static struct { + nng_aio *aio; + enum { START, SEND, RECV } state; + nng_socket s; + nng_msg * msg; + int cnt; +} rep_state; + +void +rep_cb(void) +{ + int rv; + + if (rep_state.state == START) { + rep_state.state = RECV; + nng_recv_aio(rep_state.s, rep_state.aio); + return; + } + if ((rv = nng_aio_result(rep_state.aio)) != 0) { + if (rep_state.msg != NULL) { + nng_msg_free(rep_state.msg); + rep_state.msg = NULL; + } + return; + } + switch (rep_state.state) { + case START: + break; + case RECV: + rep_state.msg = nng_aio_get_msg(rep_state.aio); + rep_state.state = SEND; + nng_aio_set_msg(rep_state.aio, rep_state.msg); + nng_send_aio(rep_state.s, rep_state.aio); + break; + case SEND: + rep_state.msg = NULL; + rep_state.state = RECV; + nng_aio_set_msg(rep_state.aio, NULL); + nng_recv_aio(rep_state.s, rep_state.aio); + rep_state.cnt++; + break; + } +} + +#define NCTX 1000 + +void +markr(void *arg) +{ + *(bool *) arg = true; +} + +static void +marks(void *arg) +{ + *(bool *) arg = true; +} + +nng_ctx ctxs[NCTX]; +uint32_t recv_order[NCTX]; +nng_aio *saios[NCTX]; +nng_aio *raios[NCTX]; +bool recd[NCTX]; +bool sent[NCTX]; + +TestMain("REQ concurrent contexts", { + int rv; + const char *addr = "inproc://test"; + int i; + + memset(recv_order, 0, NCTX * sizeof(int)); + + atexit(nng_fini); + + Convey("We can use REQ contexts concurrently", { + nng_socket req; + + So(nng_aio_alloc(&rep_state.aio, (void *) rep_cb, NULL) == 0); + So(nng_rep_open(&rep_state.s) == 0); + So(nng_req_open(&req) == 0); + + for (i = 0; i < NCTX; i++) { + sent[i] = recd[i] = false; + recv_order[i] = (uint32_t) i; + if (nng_aio_alloc(&raios[i], markr, &(recd[i])) != 0) { + break; + } + nng_aio_set_timeout(raios[i], 5000); + if (nng_aio_alloc(&saios[i], marks, &(sent[i])) != 0) { + break; + } + nng_aio_set_timeout(saios[i], 5000); + } + + So(nng_setopt_int(rep_state.s, NNG_OPT_SENDBUF, NCTX) == 0); + So(i == NCTX); + for (i = 0; i < NCTX; i++) { + uint32_t tmp; + int ni = rand() % NCTX; // recv index + + tmp = recv_order[i]; + recv_order[i] = recv_order[ni]; + recv_order[ni] = tmp; + } + Reset({ + for (i = 0; i < NCTX; i++) { + nng_aio_free(saios[i]); + nng_aio_free(raios[i]); + } + nng_close(req); + nng_close(rep_state.s); + nng_aio_free(rep_state.aio); + }); + + So(nng_listen(rep_state.s, addr, NULL, 0) == 0); + So(nng_dial(req, addr, NULL, 0) == 0); + + nng_msleep(100); // let things establish. + + // Start the rep state machine going. + rep_cb(); + + for (i = 0; i < NCTX; i++) { + if ((rv = nng_ctx_open(&ctxs[i], req)) != 0) { + break; + } + } + So(rv == 0); + So(i == NCTX); + + // Send messages + for (i = 0; i < NCTX; i++) { + nng_msg *m; + if ((rv = nng_msg_alloc(&m, sizeof(uint32_t))) != 0) { + Fail("msg alloc failed: %s", nng_strerror(rv)); + } + if ((rv = nng_msg_append_u32(m, i)) != 0) { + Fail("append failed: %s", nng_strerror(rv)); + } + nng_aio_set_msg(saios[i], m); + nng_ctx_send(ctxs[i], saios[i]); + } + So(rv == 0); + So(i == NCTX); + + for (i = 0; i < NCTX; i++) { + nng_aio_wait(saios[i]); + if ((rv = nng_aio_result(saios[i])) != 0) { + Fail("send failed: %s", nng_strerror(rv)); + So(false); + break; + } + } + for (i = 0; i < NCTX; i++) { + if (!sent[i]) { + Fail("Index %d (%d) not sent", i, i); + } + } + + So(rv == 0); + So(i == NCTX); + // Receive answers + for (i = 0; i < NCTX; i++) { + int ri = recv_order[i]; + nng_ctx_recv(ctxs[ri], raios[ri]); + } + + for (i = 0; i < NCTX; i++) { + nng_msg *msg; + uint32_t x; + + nng_aio_wait(raios[i]); + if ((rv = nng_aio_result(raios[i])) != 0) { + Fail("recv %d (%d) %d failed: %s", i, + recv_order[i], rep_state.cnt, + nng_strerror(rv)); + continue; + } + msg = nng_aio_get_msg(raios[i]); + if ((rv = nng_msg_chop_u32(msg, &x)) != 0) { + Fail("recv msg trim: %s", nng_strerror(rv)); + break; + } + if (x != (uint32_t) i) { + Fail("message body mismatch: %x %x\n", x, + (uint32_t) i); + break; + } + + nng_msg_free(msg); + } + for (i = 0; i < NCTX; i++) { + if (!recd[i]) { + Fail("Index %d (%d) not received", i, + recv_order[i]); + break; + } + } + + So(rv == 0); + So(i == NCTX); + }); + + Convey("Given a socket and a context", { + nng_socket req; + nng_ctx ctx; + nng_aio * aio; + + So(nng_req0_open(&req) == 0); + So(nng_ctx_open(&ctx, req) == 0); + So(nng_aio_alloc(&aio, NULL, NULL) == 0); + nng_aio_set_timeout(aio, 1000); + + Reset({ nng_aio_free(aio); }); + + Convey("Closing the socket aborts a context send", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_close(req); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECLOSED); + nng_msg_free(msg); + }); + + Convey("Closing the context aborts a context send", { + nng_msg *msg; + So(nng_msg_alloc(&msg, 0) == 0); + nng_aio_set_msg(aio, msg); + nng_ctx_send(ctx, aio); + nng_ctx_close(ctx); + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECLOSED); + nng_msg_free(msg); + nng_close(req); + }); + }); +}); diff --git a/tests/reqpoll.c b/tests/reqpoll.c new file mode 100644 index 00000000..64c8df66 --- /dev/null +++ b/tests/reqpoll.c @@ -0,0 +1,148 @@ +// +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#ifndef _WIN32 +#include +#include +#define SOCKET int +#else + +#define poll WSAPoll +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif + +#include +#include + +#include +#endif + +#include "convey.h" +#include "nng.h" +#include "protocol/reqrep0/rep.h" +#include "protocol/reqrep0/req.h" +#include "stubs.h" +#include "supplemental/util/platform.h" + +bool +isready(SOCKET fd) +{ + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLRDNORM; + pfd.revents = 0; + + switch (poll(&pfd, 1, 0)) { + case 0: + return (false); + case 1: + return (true); + default: + printf("BAD POLL RETURN!\n"); + abort(); + } +} + +TestMain("REQ pollable", { + + atexit(nng_fini); + + Convey("Given a connected REQ/REP pair", { + nng_socket req; + nng_socket rep; + nng_ctx ctx; + + So(nng_req0_open(&req) == 0); + So(nng_rep0_open(&rep) == 0); + So(nng_ctx_open(&ctx, req) == 0); + + Reset({ + nng_ctx_close(ctx); + nng_close(req); + nng_close(rep); + }); + So(nng_listen(rep, "inproc://ctx1", NULL, 0) == 0); + + Convey("REQ ctx not pollable", { + int fd; + So(nng_ctx_open(&ctx, req) == 0); + Reset({ nng_ctx_close(req); }); + So(nng_ctx_getopt_int(ctx, NNG_OPT_SENDFD, &fd) == + NNG_ENOTSUP); + So(nng_ctx_getopt_int(ctx, NNG_OPT_RECVFD, &fd) == + NNG_ENOTSUP); + }); + + Convey("REQ starts not writable", { + int fd; + + So(nng_getopt_int(req, NNG_OPT_SENDFD, &fd) == 0); + So(isready(fd) == false); + + Convey("And becomes readable on connect", { + So(nng_dial(req, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(100); + So(isready(fd) == true); + + Convey("Not writable with message pending", { + for (int i = 0; i < 10; i++) { + nng_msg *m; + So(nng_msg_alloc(&m, 0) == 0); + // Fill intermediate queues. + if (nng_sendmsg(req, m, + NNG_FLAG_NONBLOCK) != + 0) { + nng_msg_free(m); + } + } + So(isready(fd) == false); + }); + }); + }); + + Convey("REQ starts not readable", { + int fd; + + So(nng_getopt_int(req, NNG_OPT_RECVFD, &fd) == 0); + So(isready(fd) == false); + + Convey("And doesn't become readable on connect", { + So(nng_dial(req, "inproc://ctx1", NULL, 0) == + 0); + nng_msleep(100); + So(isready(fd) == false); + }); + }); + + Convey("REQ becomes readable", { + int fd; + nng_msg *msg; + + So(nng_dial(req, "inproc://ctx1", NULL, 0) == 0); + + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_getopt_int(req, NNG_OPT_RECVFD, &fd) == 0); + So(isready(fd) == false); + So(nng_msg_append(msg, "xyz", 3) == 0); + So(nng_sendmsg(req, msg, 0) == 0); + So(nng_recvmsg(rep, &msg, 0) == 0); // recv on rep + So(nng_sendmsg(rep, msg, 0) == 0); // echo it back + nng_msleep(200); // give time for message to arrive + So(isready(fd) == true); + Convey("And is no longer readable after receive", { + So(nng_recvmsg(req, &msg, 0) == 0); + nng_msg_free(msg); + So(isready(fd) == false); + }); + }); + }); +}); diff --git a/tests/reqrep.c b/tests/reqrep.c index 97ed371a..76cf279c 100644 --- a/tests/reqrep.c +++ b/tests/reqrep.c @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore -// Copyright 2017 Capitar IT Group BV +// Copyright 2018 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -175,5 +175,85 @@ TestMain("REQ/REP pattern", { nng_msg_free(cmd); }); + Convey("Request cancellation aborts pending recv", { + nng_msg * abc; + nng_msg * def; + nng_msg * cmd; + nng_aio * aio; + nng_duration retry = 100; // 100 ms + + nng_socket req; + nng_socket rep; + + So(nng_rep_open(&rep) == 0); + + So(nng_req_open(&req) == 0); + So(nng_aio_alloc(&aio, NULL, NULL) == 0); + + Reset({ + nng_close(rep); + nng_close(req); + nng_aio_free(aio); + }); + + So(nng_setopt_ms(req, NNG_OPT_REQ_RESENDTIME, retry) == 0); + So(nng_setopt_int(req, NNG_OPT_SENDBUF, 16) == 0); + + So(nng_msg_alloc(&abc, 0) == 0); + So(nng_msg_append(abc, "abc", 4) == 0); + So(nng_msg_alloc(&def, 0) == 0); + So(nng_msg_append(def, "def", 4) == 0); + + So(nng_listen(rep, addr, NULL, 0) == 0); + So(nng_dial(req, addr, NULL, 0) == 0); + + // Send req #1 (abc). + So(nng_sendmsg(req, abc, 0) == 0); + + // Sleep a bit. This is so that we ensure that our + // request gets to the far side. (If we cancel too + // fast, then our outgoing send will be canceled before + // it gets to the wire.) + nng_msleep(20); + + nng_aio_set_timeout(aio, 1000); // an entire second + nng_recv_aio(req, aio); + + // Give time for this recv to post properly. + nng_msleep(20); + + // Send the next next request ("def"). Note that + // the REP side server will have already buffered the receive + // request, and should simply be waiting for us to reply to + // abc. + So(nng_sendmsg(req, def, 0) == 0); + + nng_aio_wait(aio); + So(nng_aio_result(aio) == NNG_ECANCELED); + + // Receive the first request (should be abc) on the REP server. + So(nng_recvmsg(rep, &cmd, 0) == 0); + So(nng_msg_len(cmd) == 4); + So(strcmp(nng_msg_body(cmd), "abc") == 0); + + // REP sends the reply to first command. This will be + // discarded by the REQ server. + So(nng_sendmsg(rep, cmd, 0) == 0); + + // Now get the next command from the REP; should be "def". + So(nng_recvmsg(rep, &cmd, 0) == 0); + So(nng_msg_len(cmd) == 4); + So(strcmp(nng_msg_body(cmd), "def") == 0); + + // And send it back to REQ. + So(nng_sendmsg(rep, cmd, 0) == 0); + + // Try a req command. This should give back "def" + So(nng_recvmsg(req, &cmd, 0) == 0); + So(nng_msg_len(cmd) == 4); + So(strcmp(nng_msg_body(cmd), "def") == 0); + nng_msg_free(cmd); + }); + nng_fini(); }) -- cgit v1.2.3-70-g09d2