diff options
55 files changed, 944 insertions, 700 deletions
diff --git a/docs/man/nng.7.adoc b/docs/man/nng.7.adoc index ee120774..507ddeec 100644 --- a/docs/man/nng.7.adoc +++ b/docs/man/nng.7.adoc @@ -73,46 +73,73 @@ other languages please check the http://nanomsg.org/[website]. == Conceptual Overview -_nng_ presents a _socket_ view of networking. The sockets are constructed -using protocol-specific functions, as a given socket implements precisely -one _nng_ protocol. +_nng_ presents a _socket_ view of networking. +The sockets are constructed using protocol-specific functions, as a given +socket implements precisely one _nng_ protocol. Each socket can be used to send and receive messages (if the protocol) -supports it, and implements the appropriate protocol semantics. For -example, <<nng_sub.7#,nng_sub(7)>> sockets automatically filter incoming +supports it, and implements the appropriate protocol semantics. +For example, <<nng_sub.7#,_sub_>> sockets automatically filter incoming messages to discard those for topics that have not been subscribed. _nng_ sockets are message oriented, so that messages are either delivered -wholly, or not at all. Partial delivery is not possible. Furthermore, -_nng_ does not provide any other delivery or ordering guarantees; -messages may be dropped or reordered. (Some protocols, such as -<<nng_req.7#,nng_req(7)>> may offer stronger guarantees by -performing their own retry and validation schemes.) +wholly, or not at all. Partial delivery is not possible. +Furthermore, _nng_ does not provide any other delivery or ordering guarantees; +messages may be dropped or reordered +(Some protocols, such as <<nng_req.7#,_req_>> may offer stronger +guarantees by performing their own retry and validation schemes.) Each socket can have zero, one, or many "endpoints", which are either -_listeners_ or _dialers_. (A given socket may freely choose whether it uses -listeners, dialers, or both.) These "endpoints" provide access to -underlying transports, such as TCP, etc. - -Each endpoint is associated with a URL, which is a service address. For -dialers, this will be the service address that will be contacted, whereas -for listeners this is where the listener will bind and watch for new -connections. - -Endpoints do not themselves transport data. They are instead responsible -for the creation of _pipes_, which can be thought of as message-oriented, -connected, streams. Pipes frequently correspond to a single underlying -byte stream -- for example both IPC and TCP transports implement their -pipes using a 1:1 relationship with a connected socket. - -Endpoints create pipes as needed. Listeners will create them when a new -client connection request arrives, and dialers will generally create one, -then wait for it to disconnect before reconnecting. +_listeners_ or _dialers_. +(A given socket may freely choose whether it uses listeners, dialers, or both.) +These "endpoints" provide access to underlying transports, such as TCP, etc. + +Each endpoint is associated with a URL, which is a service address. +For dialers, this will be the service address that will be contacted, whereas +for listeners this is where the listener will accept new connections. + +Endpoints do not themselves transport data. +They are instead responsible for the creation of _pipes_, which can be +thought of as message-oriented connected streams. +Pipes frequently correspond to a single underlying byte stream. +For example both IPC and TCP transports implement their +pipes using a 1:1 relationship with a connected operating system socket. + +Endpoints create pipes as needed. +Listeners will create them when a new client connection request arrives, +and dialers will generally create one, then wait for it to disconnect before +reconnecting. Most applications should not have to worry about endpoints or pipes at all; the socket abstraction should provide all the functionality needed other than in a few specific circumstances. +[[raw_mode]](((raw mode))) +=== Raw Mode + +(((cooked mode))) +Most applications will use _nng_ sockets in "`cooked`" mode. +This mode provides the full semantics of the protocol. +For example, <<nng_req.7#,_req_>> sockets will automatically +match a reply to a request, and resend requests periodically if no reply +was received. + +There are situations, such as with <<nng_device.7#,proxies>>, +where it is desirable to bypass these semantics and simply pass messages +to and from the socket with no extra semantic handling. +This is possible using "`raw`" mode sockets. + +Raw mode sockets are generally constructed with a different function, +such as <<nng_req_open.3#,`nng_req0_open_raw()`>>. +Using these sockets, the application can simply send and receive messages, +and is responsible for supplying any additional socket semantics. +Typically this means that the application will need to inspect message +headers on incoming messages, and supply them on outgoing messages. + +TIP: The <<nng_device.3#,`nng_device()`>> function only works with raw mode +sockets, but as it only forwards the messages, no additional application +processing is needed. + === URLs (((URL))) diff --git a/docs/man/nng_bus.7.adoc b/docs/man/nng_bus.7.adoc index a05a9d04..d5c57710 100644 --- a/docs/man/nng_bus.7.adoc +++ b/docs/man/nng_bus.7.adoc @@ -18,8 +18,6 @@ nng_bus - bus protocol [source,c] ---- #include <nng/protocol/bus0/bus.h> - -int nng_bus0_open(nng_socket *s); ---- == DESCRIPTION @@ -51,7 +49,7 @@ the more likely that message loss is to occur. === Socket Operations -The <<nng_bus_open.3#,`nng_bus0_open()`>> call creates a bus socket. +The <<nng_bus_open.3#,`nng_bus0_open()`>> functions create a bus socket. This socket may be used to send and receive messages. Sending messages will attempt to deliver to each directly connected peer. diff --git a/docs/man/nng_bus_open.3.adoc b/docs/man/nng_bus_open.3.adoc index 119dc4e3..50369c07 100644 --- a/docs/man/nng_bus_open.3.adoc +++ b/docs/man/nng_bus_open.3.adoc @@ -21,6 +21,8 @@ nng_bus_open - create bus socket #include <nng/protocol/bus0/bus.h> int nng_bus0_open(nng_socket *s); + +int nng_bus0_open_raw(nng_socket *s); ---- == DESCRIPTION @@ -28,9 +30,13 @@ int nng_bus0_open(nng_socket *s); The `nng_bus0_open()` function creates a <<nng_bus.7#,_bus_>> version 0 <<nng_socket.5#,socket>> and returns it at the location pointed to by _s_. +The `nng_bus0_open_raw()` function creates a <<nng_bus.7#,_bus_>> version 0 +<<nng_socket.5#,socket>> in +<<nng.7#raw_mode,raw>> mode, and returns it at the location pointed to by _s_. + == RETURN VALUES -This function returns 0 on success, and non-zero otherwise. +These functions return 0 on success, and non-zero otherwise. == ERRORS diff --git a/docs/man/nng_device.3.adoc b/docs/man/nng_device.3.adoc index 80780884..09717e21 100644 --- a/docs/man/nng_device.3.adoc +++ b/docs/man/nng_device.3.adoc @@ -31,6 +31,11 @@ This function is used to create forwarders, which can be used to create complex network topologies to provide for improved ((horizontal scalability)), reliability, and isolation. +Only <<nng_options.5#NNG_OPT_RAW,raw>> mode sockets may be used with this +function. +These can be created using `_raw` forms of the various socket constructors, +such as <<nng_req_open.3#,`nng_req0_open_raw()`>>. + The `nng_device()` function does not return until one of the sockets is closed. @@ -60,11 +65,14 @@ be a _bus_ socket. === Operation -This `nng_device()` function puts each socket into raw mode -(see <<nng_options.5#NNG_OPT_RAW,`NNG_OPT_RAW`>>), and then moves messages -between them. -When a protocol has a ((backtrace)) style header, routing information is -added as the message crosses the forwarder, allowing replies to be +The `nng_device()` function moves messages between the provided sockets. + +When a protocol has a ((backtrace)) style header, routing information +is present in the header of received messages, and is copied to the +header of the output bound message. +The underlying raw mode protocols supply the necessary header +adjustments to add or remove routing headers as needed. +This allows replies to be returned to requestors, and responses to be routed back to surveyors. Additionally, some protocols have a maximum ((time-to-live)) to protect diff --git a/docs/man/nng_options.5.adoc b/docs/man/nng_options.5.adoc index f07194d1..b5b62815 100644 --- a/docs/man/nng_options.5.adoc +++ b/docs/man/nng_options.5.adoc @@ -99,7 +99,7 @@ listeners but not dialers. (((raw mode))) (((cooked mode))) (`bool`) -This option determines whether the socket is in "`raw`" mode. +This read-only option indicates whether the socket is in "`raw`" mode. If `true`, the socket is in "`raw`" mode, and if `false` the socket is in "`cooked`" mode. Raw mode sockets generally do not have any protocol-specific semantics applied @@ -107,6 +107,7 @@ to them; instead the application is expected to perform such semantics itself. (For example, in "`cooked`" mode a <<nng_rep.7#,_rep_>> socket would automatically copy message headers from a received message to the corresponding reply, whereas in "`raw`" mode this is not done.) +See <<nng.7#raw_mode,Raw Mode>> for more details. [[NNG_OPT_RECONNMINT]] ((`NNG_OPT_RECONNMINT`)):: diff --git a/docs/man/nng_pair.7.adoc b/docs/man/nng_pair.7.adoc index f0824555..f03a4a5f 100644 --- a/docs/man/nng_pair.7.adoc +++ b/docs/man/nng_pair.7.adoc @@ -19,16 +19,12 @@ nng_pair - pair protocol [source,c] ---- #include <nng/protocol/pair0/pair.h> - -int nng_pair0_open(nng_socket *s); ---- .Version 1 [source,c] ---- #include <nng/protocol/pair1/pair.h> - -int nng_pair1_open(nng_socket *s); ---- == DESCRIPTION @@ -43,9 +39,10 @@ some additional sophistication in the application. === Socket Operations -The `nng_pair_open()` call creates a _pair_ socket. Normally, this -pattern will block when attempting to send a message, if no peer is -able to receive the message. +The <<nng_pair_open.3#,`nng_pair_open()`>> functions create _pair_ socket. + +Normally, this pattern will block when attempting to send a message if +no peer is able to receive the message. NOTE: Even though this mode may appear to be "reliable", because back-pressure prevents discarding messages most of the time, there are topologies involving diff --git a/docs/man/nng_pair_open.3.adoc b/docs/man/nng_pair_open.3.adoc index 35bda5e5..054c3ffc 100644 --- a/docs/man/nng_pair_open.3.adoc +++ b/docs/man/nng_pair_open.3.adoc @@ -21,6 +21,8 @@ nng_pair_open - create pair socket #include <nng/protocol/pair0/pair.h> int nng_pair0_open(nng_socket *s); + +int nng_pair0_open_raw(nng_socket *s); ---- .Version 1 @@ -29,6 +31,8 @@ int nng_pair0_open(nng_socket *s); #include <nng/protocol/pair1/pair.h> int nng_pair1_open(nng_socket *s); + +int nng_pair1_open_raw(nng_socket *s); ---- == DESCRIPTION @@ -37,6 +41,11 @@ The `nng_pair0_open()` and `nng_pair1_open()` functions create a <<nng_pair.7#,_pair_>> version 0 or version 1 <<nng_socket.5#,socket>> and return it at the location pointed to by _s_. +The `nng_pair0_open_raw()` and `nng_pair1_open_raw()` functions +create a <<nng_pair.7#,_pair_>> version 0 or version 1 +<<nng_socket.5#,socket>> in +<<nng.7#raw_mode,raw>> mode and return it at the location pointed to by _s_. + == RETURN VALUES These functions returns 0 on success, and non-zero otherwise. diff --git a/docs/man/nng_pub.7.adoc b/docs/man/nng_pub.7.adoc index 6f88115c..e45201b7 100644 --- a/docs/man/nng_pub.7.adoc +++ b/docs/man/nng_pub.7.adoc @@ -18,8 +18,6 @@ nng_pub - publisher protocol [source,c] ---- #include <nng/protocol/pubsub0/pub.h> - -int nng_pub0_open(nng_socket *s); ---- == DESCRIPTION @@ -46,7 +44,7 @@ Applications should construct their messages accordingly. === Socket Operations -The <<nng_pub_open.3#,`nng_pub0_open()`>> call creates a publisher socket. +The <<nng_pub_open.3#,`nng_pub0_open()`>> functions create a publisher socket. This socket may be used to send messages, but is unable to receive them. Attempts to receive messages will result in `NNG_ENOTSUP`. diff --git a/docs/man/nng_pub_open.3.adoc b/docs/man/nng_pub_open.3.adoc index ad46c740..32655b1d 100644 --- a/docs/man/nng_pub_open.3.adoc +++ b/docs/man/nng_pub_open.3.adoc @@ -21,6 +21,8 @@ nng_pub_open - create pub socket #include <nng/protocol/pubsub0/pub.h> int nng_pub0_open(nng_socket *s); + +int nng_pub0_open_raw(nng_socket *s); ---- == DESCRIPTION @@ -28,9 +30,13 @@ int nng_pub0_open(nng_socket *s); The `nng_pub0_open()` function creates a <<nng_pub.7#,_pub_>> version 0 <<nng_socket.5#,socket>> and returns it at the location pointed to by _s_. +The `nng_pub0_open_raw()` function creates a <<nng_pub.7#,_pub_>> version 0 +<<nng_socket.5#,socket>> in +<<nng.7#raw_mode,raw>> mode and returns it at the location pointed to by _s_. + == RETURN VALUES -This function returns 0 on success, and non-zero otherwise. +These functions return 0 on success, and non-zero otherwise. == ERRORS diff --git a/docs/man/nng_pull.7.adoc b/docs/man/nng_pull.7.adoc index 9cc17bca..c57515c5 100644 --- a/docs/man/nng_pull.7.adoc +++ b/docs/man/nng_pull.7.adoc @@ -16,11 +16,9 @@ nng_pull - pull protocol == SYNOPSIS [source,c] ----------- +---- #include <nng/protocol/pipeline0/pull.h> - -int nng_pull0_open(nng_socket *s); ----------- +---- == DESCRIPTION @@ -37,7 +35,7 @@ This property makes this pattern useful in ((load-balancing)) scenarios. === Socket Operations -The <<nng_pull_open.3#,`nng_pull0_open()`>> call creates a puller socket. +The <<nng_pull_open.3#,`nng_pull0_open()`>> functions create a puller socket. This socket may be used to receive messages, but is unable to send them. Attempts to send messages will result in `NNG_ENOTSUP`. diff --git a/docs/man/nng_pull_open.3.adoc b/docs/man/nng_pull_open.3.adoc index 8b4b2389..1fd7eac4 100644 --- a/docs/man/nng_pull_open.3.adoc +++ b/docs/man/nng_pull_open.3.adoc @@ -21,6 +21,8 @@ nng_pull_open - create pull socket #include <nng/protocol/pipeline0/pull.h> int nng_pull0_open(nng_socket *s); + +int nng_pull0_open_raw(nng_socket *s); ---- == DESCRIPTION @@ -28,9 +30,13 @@ int nng_pull0_open(nng_socket *s); The `nng_pull0_open()` function creates a <<nng_pull.7#,_pull_>> version 0 <<nng_socket.5#,socket>> and returns it at the location pointed to by _s_. +The `nng_pull0_open_raw()` function creates a <<nng_pull.7#,_pull_>> version 0 +<<nng_socket.5#,socket>> in +<<nng.7#raw_mode,raw>> mode and returns it at the location pointed to by _s_. + == RETURN VALUES -This function returns 0 on success, and non-zero otherwise. +These functions return 0 on success, and non-zero otherwise. == ERRORS diff --git a/docs/man/nng_push.7.adoc b/docs/man/nng_push.7.adoc index 27ddd925..e2ab50ab 100644 --- a/docs/man/nng_push.7.adoc +++ b/docs/man/nng_push.7.adoc @@ -16,11 +16,9 @@ nng_push - push protocol == SYNOPSIS [source,c] ----------- +---- #include <nng/protocol/pipeline0/push.h> - -int nng_push0_open(nng_socket *s); ----------- +---- == DESCRIPTION diff --git a/docs/man/nng_push_open.3.adoc b/docs/man/nng_push_open.3.adoc index 8470b632..1769b2f2 100644 --- a/docs/man/nng_push_open.3.adoc +++ b/docs/man/nng_push_open.3.adoc @@ -21,6 +21,8 @@ nng_push_open - create push socket #include <nng/protocol/pipeline0/push.h> int nng_push0_open(nng_socket *s); + +int nng_push0_open_raw(nng_socket *s); ---- == DESCRIPTION @@ -28,9 +30,13 @@ int nng_push0_open(nng_socket *s); The `nng_push0_open()` function creates a <<nng_push.7#,_push_>> version 0 <<nng_socket.5#,socket>> and returns it at the location pointed to by _s_. +The `nng_push0_open_raw()` function creates a <<nng_push.7#,_push_>> version 0 +<<nng_socket.5#,socket>> in +<<nng.7#raw_mode,raw>> mode and returns it at the location pointed to by _s_. + == RETURN VALUES -This function returns 0 on success, and non-zero otherwise. +These functions return 0 on success, and non-zero otherwise. == ERRORS diff --git a/docs/man/nng_rep.7.adoc b/docs/man/nng_rep.7.adoc index 7b3635c4..255a8f4b 100644 --- a/docs/man/nng_rep.7.adoc +++ b/docs/man/nng_rep.7.adoc @@ -18,8 +18,6 @@ nng_rep - reply protocol [source,c] ---- #include <nng/protocol/reqrep0/rep.h> - -int nng_rep0_open(nng_socket *s); ---- == DESCRIPTION @@ -40,7 +38,7 @@ The _rep_ protocol is the replier side, and the === Socket Operations -The <<nng_rep_open.3#,`nng_rep0_open()`>> call creates a replier socket. +The <<nng_rep_open.3#,`nng_rep0_open()`>> functions create a replier socket. This socket may be used to receive messages (requests), and then to send replies. Generally a reply can only be sent after receiving a request. @@ -50,8 +48,7 @@ is no outstanding request.) Attempts to send on a socket with no outstanding requests will result in `NNG_ESTATE`. -Raw mode sockets (set with <<nng_options.5#NNG_OPT_RAW,`NNG_OPT_RAW`>>) -ignore all these restrictions. +<<nng.7#raw_mode,Raw>> mode sockets ignore all these restrictions. === Protocol Versions diff --git a/docs/man/nng_rep_open.3.adoc b/docs/man/nng_rep_open.3.adoc index 4be697bb..f90cefe2 100644 --- a/docs/man/nng_rep_open.3.adoc +++ b/docs/man/nng_rep_open.3.adoc @@ -28,9 +28,13 @@ int nng_rep0_open(nng_socket *s); The `nng_rep0_open()` function creates a <<nng_rep.7#,_rep_>> version 0 <<nng_socket.5#,socket>> and returns it at the location pointed to by _s_. +The `nng_rep0_open_raw()` function creates a <<nng_rep.7#,_rep_>> version 0 +<<nng_socket.5#,socket>> +in <<nng.7#,raw>> mode and returns it at the location pointed to by _s_. + == RETURN VALUES -This function returns 0 on success, and non-zero otherwise. +These functions return 0 on success, and non-zero otherwise. == ERRORS diff --git a/docs/man/nng_req.7.adoc b/docs/man/nng_req.7.adoc index 9d93eade..9b956172 100644 --- a/docs/man/nng_req.7.adoc +++ b/docs/man/nng_req.7.adoc @@ -16,11 +16,9 @@ nng_req - request protocol == SYNOPSIS [source,c] ----------- +---- #include <nng/protocol/reqrep0/req.h> - -int nng_req0_open(nng_socket *s); ----------- +---- == DESCRIPTION @@ -54,7 +52,7 @@ The _req_ protocol is the requester side, and the === Socket Operations -The <<nng_req_open.3#,`nng_req0_open()`>> call creates a requester socket. +The <<nng_req_open.3#,`nng_req0_open()`>> functions create a requester socket. This socket may be used to send messages (requests), and then to receive replies. Generally a reply can only be received after sending a request. @@ -70,8 +68,7 @@ that has already been placed on the wire. Attempts to receive on a socket with no outstanding requests will result in `NNG_ESTATE`. -Raw mode sockets (set with <<nng_options.5#NNG_OPT_RAW,`NNG_OPT_RAW`>>) -ignore all these restrictions. +<<nng.7#raw_mode,Raw>> mode sockets ignore all these restrictions. === Protocol Versions diff --git a/docs/man/nng_req_open.3.adoc b/docs/man/nng_req_open.3.adoc index 638ef43a..dd2e7cb6 100644 --- a/docs/man/nng_req_open.3.adoc +++ b/docs/man/nng_req_open.3.adoc @@ -21,6 +21,8 @@ nng_req_open - create rep socket #include <nng/protocol/reqrep0/req.h> int nng_req0_open(nng_socket *s); + +int nng_req0_open_raw(nng_socket *s); ---- == DESCRIPTION @@ -28,9 +30,14 @@ int nng_req0_open(nng_socket *s); The `nng_req0_open()` function creates a <<nng_req.7#,_req_>> version 0 <<nng_socket.5#,socket>> and returns it at the location pointed to by _s_. +The `nng_req0_open_raw()` function creates a <<nng_req.7#,_req_>> version 0 +<<nng_socket.5#,socket>> in +<<nng.7#raw_mode,raw>> mode +and returns it at the location pointed to by _s_. + == RETURN VALUES -This function returns 0 on success, and non-zero otherwise. +These functions return 0 on success, and non-zero otherwise. == ERRORS @@ -39,6 +46,7 @@ This function returns 0 on success, and non-zero otherwise. == SEE ALSO +<<nng_options.5#,nng_options(5)>>, <<nng_socket.5#,nng_socket(5)>>, <<nng_rep.7#,nng_rep(7)>>, <<nng_req.7#,nng_req(7)>>, diff --git a/docs/man/nng_respondent.7.adoc b/docs/man/nng_respondent.7.adoc index 1235941a..2db78866 100644 --- a/docs/man/nng_respondent.7.adoc +++ b/docs/man/nng_respondent.7.adoc @@ -18,8 +18,6 @@ nng_respondent - respondent protocol [source,c] ---------- #include <nng/protocol/survey0/respond.h> - -int nng_respondent0_open(nng_socket *s); ---------- == DESCRIPTION @@ -41,7 +39,7 @@ The _respondent_ protocol is the respondent side, and the === Socket Operations -The <<nng_respondent_open.3#,`nng_respondent0_open()`>> call creates a +The <<nng_respondent_open.3#,`nng_respondent0_open()`>> functions create a respondent socket. This socket may be used to receive messages, and then to send replies. A reply can only be sent after receiving a survey, and generally the diff --git a/docs/man/nng_respondent_open.3.adoc b/docs/man/nng_respondent_open.3.adoc index a62fc9d8..33e9904f 100644 --- a/docs/man/nng_respondent_open.3.adoc +++ b/docs/man/nng_respondent_open.3.adoc @@ -21,6 +21,8 @@ nng_respondent_open - create respondent socket #include <nng/protocol/survey0/respond.h> int nng_respondent0_open(nng_socket *s); + +int nng_respondent0_open_raw(nng_socket *s); ---- == DESCRIPTION @@ -30,9 +32,14 @@ The `nng_respondent0_open()` function creates a version 0 <<nng_socket.5#,socket>> and returns it at the location pointed to by _s_. +The `nng_respondent0_open_raw()` function creates a +<<nng_respondent.7#,_respondent_>> +version 0 <<nng_socket.5#,socket>> in +<<nng.7#raw_mode,raw>> mode and returns it at the location pointed to by _s_. + == RETURN VALUES -This function returns 0 on success, and non-zero otherwise. +These functions return 0 on success, and non-zero otherwise. == ERRORS diff --git a/docs/man/nng_sockaddr_in6.5.adoc b/docs/man/nng_sockaddr_in6.5.adoc index f4164dd1..95e6ec8c 100644 --- a/docs/man/nng_sockaddr_in6.5.adoc +++ b/docs/man/nng_sockaddr_in6.5.adoc @@ -28,7 +28,6 @@ typedef struct { uint16_t sa_port; uint8_t sa_addr[16]; } nng_sockaddr_in6; - ---- == DESCRIPTION diff --git a/docs/man/nng_sub.7.adoc b/docs/man/nng_sub.7.adoc index f222327d..18606f2b 100644 --- a/docs/man/nng_sub.7.adoc +++ b/docs/man/nng_sub.7.adoc @@ -19,8 +19,6 @@ nng_sub - subscriber protocol ---- #include <nng/nng.h> #include <nng/protocol/pubsub0/sub.h> - -int nng_sub0_open(nng_socket *s); ---- == DESCRIPTION @@ -45,7 +43,7 @@ Applications should construct their messages accordingly. === Socket Operations -The <<nng_sub_open.3#,`nng_sub0_open()`>> call creates a subscriber socket. +The <<nng_sub_open.3#,`nng_sub0_open()`>> functions create a subscriber socket. This socket may be used to receive messages, but is unable to send them. Attempts to send messages will result in `NNG_ENOTSUP`. diff --git a/docs/man/nng_sub_open.3.adoc b/docs/man/nng_sub_open.3.adoc index bfc315bb..0e7263dd 100644 --- a/docs/man/nng_sub_open.3.adoc +++ b/docs/man/nng_sub_open.3.adoc @@ -21,6 +21,8 @@ nng_sub_open - create sub socket #include <nng/protocol/pubsub0/sub.h> int nng_sub0_open(nng_socket *s); + +int nng_sub0_open_raw(nng_socket *s); ---- == DESCRIPTION @@ -28,9 +30,13 @@ int nng_sub0_open(nng_socket *s); The `nng_sub0_open()` function creates a <<nng_sub.7#,_sub_>> version 0 <<nng_socket.5#,socket>> and returns it at the location pointed to by _s_. +The `nng_sub0_open()` function creates a <<nng_sub.7#,_sub_>> version 0 +<<nng_socket.5#,socket>> in +<<nng.7#raw_mode,raw>> mode and returns it at the location pointed to by _s_. + == RETURN VALUES -This function returns 0 on success, and non-zero otherwise. +These functions return 0 on success, and non-zero otherwise. == ERRORS diff --git a/docs/man/nng_surveyor.7.adoc b/docs/man/nng_surveyor.7.adoc index 2c53e3ab..97578b33 100644 --- a/docs/man/nng_surveyor.7.adoc +++ b/docs/man/nng_surveyor.7.adoc @@ -19,8 +19,6 @@ nng_surveyor - surveyor protocol ---- #include <nng/nng.h> #include <nng/protocol/survey0/survey.h> - -int nng_surveyor0_open(nng_socket *s); ---- == DESCRIPTION @@ -43,7 +41,7 @@ The _surveyor_ protocol is the surveyor side, and the === Socket Operations The <<nng_surveyor_open.3#,`nng_surveyor0_open()`>> -call creates a surveyor socket. +functions create a surveyor socket. This socket may be used to send messages (surveys), and then to receive replies. A reply can only be received after sending a survey. A surveyor can normally expect to receive at most one reply from each responder. @@ -59,8 +57,7 @@ Only one survey can be outstanding at a time; sending another survey will cancel the prior one, and any responses from respondents from the prior survey that arrive after this will be discarded. -Raw mode sockets (set with <<nng_options.5#NNG_OPT_RAW,`NNG_OPT_RAW`>>) -ignore all these restrictions. +<<nng.7#raw_mode,Raw>> mode sockets ignore all these restrictions. === Protocol Versions diff --git a/docs/man/nng_surveyor_open.3.adoc b/docs/man/nng_surveyor_open.3.adoc index 17be9889..0ca95d90 100644 --- a/docs/man/nng_surveyor_open.3.adoc +++ b/docs/man/nng_surveyor_open.3.adoc @@ -21,6 +21,8 @@ nng_surveyor_open - create surveyor socket #include <nng/protocol/survey0/survey.h> int nng_surveyor0_open(nng_socket *s); + +int nng_surveyor0_open_raw(nng_socket *s); ---- == DESCRIPTION @@ -29,9 +31,13 @@ The `nng_surveyor0_open()` function creates a <<nng_surveyor.7#,_surveyor_>> version 0 <<nng_socket.5#,socket>> and returns it at the location pointed to by _s_. +The `nng_surveyor0_open_raw()` function creates a <<nng_surveyor.7#,_surveyor_>> +version 0 <<nng_socket.5#,socket>> in +<<nng.7#raw_mode,raw>> mode and returns it at the location pointed to by _s_. + == RETURN VALUES -This function returns 0 on success, and non-zero otherwise. +These functions return 0 on success, and non-zero otherwise. == ERRORS diff --git a/src/compat/nanomsg/nn.c b/src/compat/nanomsg/nn.c index caa02c72..a95481bf 100644 --- a/src/compat/nanomsg/nn.c +++ b/src/compat/nanomsg/nn.c @@ -110,40 +110,81 @@ nn_errno(void) static const struct { uint16_t p_id; int (*p_open)(nng_socket *); + int (*p_open_raw)(nng_socket *); } nn_protocols[] = { -// clang-format off #ifdef NNG_HAVE_BUS0 - { NN_BUS, nng_bus0_open }, + { + .p_id = NN_BUS, + .p_open = nng_bus0_open, + .p_open_raw = nng_bus0_open_raw, + }, #endif #ifdef NNG_HAVE_PAIR0 - { NN_PAIR, nng_pair0_open }, -#endif -#ifdef NNG_HAVE_PUSH0 - { NN_PUSH, nng_push0_open }, + { + .p_id = NN_PAIR, + .p_open = nng_pair0_open, + .p_open_raw = nng_pair0_open_raw, + }, #endif #ifdef NNG_HAVE_PULL0 - { NN_PULL, nng_pull0_open }, + { + .p_id = NN_PULL, + .p_open = nng_pull0_open, + .p_open_raw = nng_pull0_open_raw, + }, +#endif +#ifdef NNG_HAVE_PUSH0 + { + .p_id = NN_PUSH, + .p_open = nng_push0_open, + .p_open_raw = nng_push0_open_raw, + }, #endif #ifdef NNG_HAVE_PUB0 - { NN_PUB, nng_pub0_open }, + { + .p_id = NN_PUB, + .p_open = nng_pub0_open, + .p_open_raw = nng_pub0_open_raw, + }, #endif #ifdef NNG_HAVE_SUB0 - { NN_SUB, nng_sub0_open }, + { + .p_id = NN_SUB, + .p_open = nng_sub0_open, + .p_open_raw = nng_sub0_open_raw, + }, #endif #ifdef NNG_HAVE_REQ0 - { NN_REQ, nng_req0_open }, + { + .p_id = NN_REQ, + .p_open = nng_req0_open, + .p_open_raw = nng_req0_open_raw, + }, #endif #ifdef NNG_HAVE_REP0 - { NN_REP, nng_rep0_open }, + { + .p_id = NN_REP, + .p_open = nng_rep0_open, + .p_open_raw = nng_rep0_open_raw, + }, #endif #ifdef NNG_HAVE_SURVEYOR0 - { NN_SURVEYOR, nng_surveyor0_open }, + { + .p_id = NN_SURVEYOR, + .p_open = nng_surveyor0_open, + .p_open_raw = nng_surveyor0_open_raw, + }, #endif #ifdef NNG_HAVE_RESPONDENT0 - { NN_RESPONDENT, nng_respondent0_open }, + { + .p_id = NN_RESPONDENT, + .p_open = nng_respondent0_open, + .p_open_raw = nng_respondent0_open_raw, + }, #endif - { 0, NULL }, - // clang-format on + { + .p_id = 0, + }, }; int @@ -168,17 +209,16 @@ nn_socket(int domain, int protocol) return (-1); } - if ((rv = nn_protocols[i].p_open(&sock)) != 0) { + if (domain == AF_SP_RAW) { + rv = nn_protocols[i].p_open_raw(&sock); + } else { + rv = nn_protocols[i].p_open(&sock); + } + if (rv != 0) { nn_seterror(rv); return (-1); } - if (domain == AF_SP_RAW) { - if ((rv = nng_setopt_bool(sock, NNG_OPT_RAW, true)) != 0) { - nn_seterror(rv); - nng_close(sock); - return (-1); - } - } + return ((int) sock); } diff --git a/src/core/device.c b/src/core/device.c index 0eaec30e..6def9f64 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -105,6 +105,8 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) nni_device_data *dd; int npath = 2; int i; + bool raw; + size_t rsz; // Specifying either of these as null turns the device into // a loopback reflector. @@ -123,6 +125,21 @@ nni_device_init(nni_device_data **dp, nni_sock *s1, nni_sock *s2) return (NNG_EINVAL); } + raw = false; + rsz = sizeof(raw); + if (((nni_sock_getopt(s1, NNG_OPT_RAW, &raw, &rsz, NNI_TYPE_BOOL) != + 0)) || + (!raw)) { + return (NNG_EINVAL); + } + + rsz = sizeof(raw); + if (((nni_sock_getopt(s2, NNG_OPT_RAW, &raw, &rsz, NNI_TYPE_BOOL) != + 0)) || + (!raw)) { + return (NNG_EINVAL); + } + // Note we assume that since they peers, we only need to look // at the receive flags -- the other side is assumed to be able // to send. @@ -201,7 +218,7 @@ nni_device(nni_sock *s1, nni_sock *s2) if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) { return (rv); } - if (nni_device_init(&dd, s1, s2) != 0) { + if ((rv = nni_device_init(&dd, s1, s2)) != 0) { nni_aio_fini(aio); return (rv); } diff --git a/src/core/protocol.h b/src/core/protocol.h index 01e3d11c..9b241138 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -127,10 +127,12 @@ struct nni_proto { // These flags determine which operations make sense. We use them so that // we can reject attempts to create notification fds for operations that make -// no sense. +// no sense. Also, we can detect raw mode, thereby providing handling for +// that at the socket layer (NNG_PROTO_FLAG_RAW). #define NNI_PROTO_FLAG_RCV 1 // Protocol can receive #define NNI_PROTO_FLAG_SND 2 // Protocol can send #define NNI_PROTO_FLAG_SNDRCV 3 // Protocol can both send & recv +#define NNI_PROTO_FLAG_RAW 4 // Protocol is raw // nni_proto_open is called by the protocol to create a socket instance // with its ops vector. The intent is that applications will only see diff --git a/src/core/socket.c b/src/core/socket.c index 40ef32f3..62950b1c 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -193,6 +193,13 @@ nni_sock_getopt_recvfd(nni_sock *s, void *buf, size_t *szp, int typ) } static int +nni_sock_getopt_raw(nni_sock *s, void *buf, size_t *szp, int typ) +{ + bool raw = ((nni_sock_flags(s) & NNI_PROTO_FLAG_RAW) != 0); + return (nni_copyout_bool(raw, buf, szp, typ)); +} + +static int nni_sock_setopt_recvtimeo(nni_sock *s, const void *buf, size_t sz, int typ) { return (nni_copyin_ms(&s->s_rcvtimeo, buf, sz, typ)); @@ -347,6 +354,12 @@ static const nni_socket_option nni_sock_options[] = { .so_getopt = nni_sock_getopt_sockname, .so_setopt = nni_sock_setopt_sockname, }, + { + .so_name = NNG_OPT_RAW, + .so_type = NNI_TYPE_BOOL, + .so_getopt = nni_sock_getopt_raw, + .so_setopt = NULL, + }, // terminate list { .so_name = NULL, @@ -920,7 +933,8 @@ nni_sock_setopt(nni_sock *s, const char *name, const void *v, size_t sz, int t) return (NNG_ECLOSED); } - // Protocol options. + // Protocol options. The protocol can override options that + // the socket framework would otherwise supply, like buffer sizes. for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) { if (strcmp(pso->pso_name, name) != 0) { continue; @@ -1076,7 +1090,8 @@ nni_sock_getopt(nni_sock *s, const char *name, void *val, size_t *szp, int t) return (NNG_ECLOSED); } - // Protocol specific options. + // Protocol specific options. The protocol can override + // options like the send buffer or notification descriptors this way. for (pso = s->s_sock_ops.sock_options; pso->pso_name != NULL; pso++) { if (strcmp(name, pso->pso_name) != 0) { continue; diff --git a/src/protocol/bus0/bus.c b/src/protocol/bus0/bus.c index 57fad341..2a2a1228 100644 --- a/src/protocol/bus0/bus.c +++ b/src/protocol/bus0/bus.c @@ -42,7 +42,6 @@ static void bus0_pipe_putq_cb(void *); // bus0_sock is our per-socket protocol private structure. struct bus0_sock { - bool raw; nni_aio * aio_getq; nni_list pipes; nni_mtx mtx; @@ -89,7 +88,6 @@ bus0_sock_init(void **sp, nni_sock *nsock) bus0_sock_fini(s); return (rv); } - s->raw = false; s->uwq = nni_sock_sendq(nsock); s->urq = nni_sock_recvq(nsock); @@ -333,20 +331,6 @@ bus0_pipe_recv(bus0_pipe *p) nni_pipe_recv(p->npipe, p->aio_recv); } -static int -bus0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - bus0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -bus0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - bus0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void bus0_sock_send(void *arg, nni_aio *aio) { @@ -371,12 +355,6 @@ static nni_proto_pipe_ops bus0_pipe_ops = { }; static nni_proto_sock_option bus0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = bus0_sock_getopt_raw, - .pso_setopt = bus0_sock_setopt_raw, - }, // terminate list { .pso_name = NULL, @@ -402,8 +380,23 @@ static nni_proto bus0_proto = { .proto_pipe_ops = &bus0_pipe_ops, }; +static nni_proto bus0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_BUS_V0, "bus" }, + .proto_peer = { NNI_PROTO_BUS_V0, "bus" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &bus0_sock_ops, + .proto_pipe_ops = &bus0_pipe_ops, +}; + int nng_bus0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &bus0_proto)); } + +int +nng_bus0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &bus0_proto_raw)); +} diff --git a/src/protocol/bus0/bus.h b/src/protocol/bus0/bus.h index 0ef3d391..c8c23d84 100644 --- a/src/protocol/bus0/bus.h +++ b/src/protocol/bus0/bus.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -17,10 +17,16 @@ extern "C" { NNG_DECL int nng_bus0_open(nng_socket *); +NNG_DECL int nng_bus0_open_raw(nng_socket *); + #ifndef nng_bus_open #define nng_bus_open nng_bus0_open #endif +#ifndef nng_bus_open_raw +#define nng_bus_open_raw nng_bus0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/pair0/pair.c b/src/protocol/pair0/pair.c index ccece972..e275e52c 100644 --- a/src/protocol/pair0/pair.c +++ b/src/protocol/pair0/pair.c @@ -36,7 +36,6 @@ struct pair0_sock { pair0_pipe *ppipe; nni_msgq * uwq; nni_msgq * urq; - bool raw; nni_mtx mtx; }; @@ -63,7 +62,6 @@ pair0_sock_init(void **sp, nni_sock *nsock) } nni_mtx_init(&s->mtx); s->ppipe = NULL; - s->raw = false; s->uwq = nni_sock_sendq(nsock); s->urq = nni_sock_recvq(nsock); *sp = s; @@ -231,20 +229,6 @@ pair0_sock_close(void *arg) NNI_ARG_UNUSED(arg); } -static int -pair0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - pair0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -pair0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - pair0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void pair0_sock_send(void *arg, nni_aio *aio) { @@ -269,12 +253,6 @@ static nni_proto_pipe_ops pair0_pipe_ops = { }; static nni_proto_sock_option pair0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = pair0_sock_getopt_raw, - .pso_setopt = pair0_sock_setopt_raw, - }, // terminate list { .pso_name = NULL, @@ -301,8 +279,23 @@ static nni_proto pair0_proto = { .proto_pipe_ops = &pair0_pipe_ops, }; +static nni_proto pair0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PAIR_V0, "pair" }, + .proto_peer = { NNI_PROTO_PAIR_V0, "pair" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &pair0_sock_ops, + .proto_pipe_ops = &pair0_pipe_ops, +}; + int nng_pair0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &pair0_proto)); } + +int +nng_pair0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pair0_proto_raw)); +} diff --git a/src/protocol/pair0/pair.h b/src/protocol/pair0/pair.h index 6828c921..1356f1cd 100644 --- a/src/protocol/pair0/pair.h +++ b/src/protocol/pair0/pair.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -17,10 +17,16 @@ extern "C" { NNG_DECL int nng_pair0_open(nng_socket *); +NNG_DECL int nng_pair0_open_raw(nng_socket *); + #ifndef nng_pair_open #define nng_pair_open nng_pair0_open #endif +#ifndef nng_pair_open_raw +#define nng_pair_open_raw nng_pair0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/pair1/pair.c b/src/protocol/pair1/pair.c index becbbfa7..a3c01d46 100644 --- a/src/protocol/pair1/pair.c +++ b/src/protocol/pair1/pair.c @@ -72,7 +72,7 @@ pair1_sock_fini(void *arg) } static int -pair1_sock_init(void **sp, nni_sock *nsock) +pair1_sock_init_impl(void **sp, nni_sock *nsock, bool raw) { pair1_sock *s; int rv; @@ -94,7 +94,7 @@ pair1_sock_init(void **sp, nni_sock *nsock) return (rv); } - s->raw = false; + s->raw = raw; s->poly = false; s->uwq = nni_sock_sendq(nsock); s->urq = nni_sock_recvq(nsock); @@ -104,6 +104,18 @@ pair1_sock_init(void **sp, nni_sock *nsock) return (0); } +static int +pair1_sock_init(void **sp, nni_sock *nsock) +{ + return (pair1_sock_init_impl(sp, nsock, false)); +} + +static int +pair1_sock_init_raw(void **sp, nni_sock *nsock) +{ + return (pair1_sock_init_impl(sp, nsock, true)); +} + static void pair1_pipe_fini(void *arg) { @@ -397,24 +409,6 @@ pair1_sock_close(void *arg) } static int -pair1_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - pair1_sock *s = arg; - int rv; - nni_mtx_lock(&s->mtx); - rv = s->started ? NNG_ESTATE : nni_copyin_bool(&s->raw, buf, sz, typ); - nni_mtx_unlock(&s->mtx); - return (rv); -} - -static int -pair1_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - pair1_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - -static int pair1_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { pair1_sock *s = arg; @@ -475,12 +469,6 @@ static nni_proto_pipe_ops pair1_pipe_ops = { static nni_proto_sock_option pair1_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = pair1_sock_getopt_raw, - .pso_setopt = pair1_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_MAXTTL, .pso_type = NNI_TYPE_INT32, .pso_getopt = pair1_sock_getopt_maxttl, @@ -522,3 +510,28 @@ nng_pair1_open(nng_socket *sidp) { return (nni_proto_open(sidp, &pair1_proto)); } + +static nni_proto_sock_ops pair1_sock_ops_raw = { + .sock_init = pair1_sock_init_raw, + .sock_fini = pair1_sock_fini, + .sock_open = pair1_sock_open, + .sock_close = pair1_sock_close, + .sock_recv = pair1_sock_recv, + .sock_send = pair1_sock_send, + .sock_options = pair1_sock_options, +}; + +static nni_proto pair1_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PAIR_V1, "pair1" }, + .proto_peer = { NNI_PROTO_PAIR_V1, "pair1" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &pair1_sock_ops_raw, + .proto_pipe_ops = &pair1_pipe_ops, +}; + +int +nng_pair1_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pair1_proto_raw)); +} diff --git a/src/protocol/pair1/pair.h b/src/protocol/pair1/pair.h index bc519d9f..85da9d45 100644 --- a/src/protocol/pair1/pair.h +++ b/src/protocol/pair1/pair.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_pair1_open(nng_socket *); +NNG_DECL int nng_pair1_open_raw(nng_socket *); #ifndef nng_pair_open #define nng_pair_open nng_pair1_open #endif +#ifndef nng_pair_open_raw +#define nng_pair_open_raw nng_pair1_open_raw +#endif + #define NNG_OPT_PAIR1_POLY "pair1:polyamorous" #ifdef __cplusplus diff --git a/src/protocol/pipeline0/pull.c b/src/protocol/pipeline0/pull.c index 9aa7bea9..c5017d50 100644 --- a/src/protocol/pipeline0/pull.c +++ b/src/protocol/pipeline0/pull.c @@ -53,7 +53,6 @@ pull0_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = false; s->urq = nni_sock_recvq(sock); *sp = s; @@ -180,20 +179,6 @@ pull0_sock_close(void *arg) NNI_ARG_UNUSED(arg); } -static int -pull0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - pull0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -pull0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - pull0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void pull0_sock_send(void *arg, nni_aio *aio) { @@ -217,12 +202,6 @@ static nni_proto_pipe_ops pull0_pipe_ops = { }; static nni_proto_sock_option pull0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = pull0_sock_getopt_raw, - .pso_setopt = pull0_sock_setopt_raw, - }, // terminate list { .pso_name = NULL, @@ -248,8 +227,23 @@ static nni_proto pull0_proto = { .proto_sock_ops = &pull0_sock_ops, }; +static nni_proto pull0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PULL_V0, "pull" }, + .proto_peer = { NNI_PROTO_PUSH_V0, "push" }, + .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW, + .proto_pipe_ops = &pull0_pipe_ops, + .proto_sock_ops = &pull0_sock_ops, +}; + int nng_pull0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &pull0_proto)); } + +int +nng_pull0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pull0_proto_raw)); +} diff --git a/src/protocol/pipeline0/pull.h b/src/protocol/pipeline0/pull.h index 75bded03..1c5d63e3 100644 --- a/src/protocol/pipeline0/pull.h +++ b/src/protocol/pipeline0/pull.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_pull0_open(nng_socket *); +NNG_DECL int nng_pull0_open_raw(nng_socket *); #ifndef nng_pull_open #define nng_pull_open nng_pull0_open #endif +#ifndef nng_pull_open_raw +#define nng_pull_open_raw nng_pull0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/pipeline0/push.c b/src/protocol/pipeline0/push.c index 8c8fa13e..2ad657b6 100644 --- a/src/protocol/pipeline0/push.c +++ b/src/protocol/pipeline0/push.c @@ -36,7 +36,6 @@ static void push0_getq_cb(void *); // push0_sock is our per-socket protocol private structure. struct push0_sock { nni_msgq *uwq; - bool raw; }; // push0_pipe is our per-pipe protocol private structure. @@ -58,7 +57,6 @@ push0_sock_init(void **sp, nni_sock *sock) if ((s = NNI_ALLOC_STRUCT(s)) == NULL) { return (NNG_ENOMEM); } - s->raw = false; s->uwq = nni_sock_sendq(sock); *sp = s; return (0); @@ -197,20 +195,6 @@ push0_getq_cb(void *arg) nni_pipe_send(p->pipe, p->aio_send); } -static int -push0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - push0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -push0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - push0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void push0_sock_send(void *arg, nni_aio *aio) { @@ -234,12 +218,6 @@ static nni_proto_pipe_ops push0_pipe_ops = { }; static nni_proto_sock_option push0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = push0_sock_getopt_raw, - .pso_setopt = push0_sock_setopt_raw, - }, // terminate list { .pso_name = NULL, @@ -265,8 +243,23 @@ static nni_proto push0_proto = { .proto_sock_ops = &push0_sock_ops, }; +static nni_proto push0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUSH_V0, "push" }, + .proto_peer = { NNI_PROTO_PULL_V0, "pull" }, + .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW, + .proto_pipe_ops = &push0_pipe_ops, + .proto_sock_ops = &push0_sock_ops, +}; + int nng_push0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &push0_proto)); } + +int +nng_push0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &push0_proto_raw)); +} diff --git a/src/protocol/pipeline0/push.h b/src/protocol/pipeline0/push.h index c7303b92..a1384e0a 100644 --- a/src/protocol/pipeline0/push.h +++ b/src/protocol/pipeline0/push.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_push0_open(nng_socket *); +NNG_DECL int nng_push0_open_raw(nng_socket *); #ifndef nng_push_open #define nng_push_open nng_push0_open #endif +#ifndef nng_push_open_raw +#define nng_push_open_raw nng_push0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/pubsub0/pub.c b/src/protocol/pubsub0/pub.c index aaa22801..45f4b7d9 100644 --- a/src/protocol/pubsub0/pub.c +++ b/src/protocol/pubsub0/pub.c @@ -40,7 +40,6 @@ static void pub0_pipe_fini(void *); // pub0_sock is our per-socket protocol private structure. struct pub0_sock { nni_msgq *uwq; - bool raw; nni_aio * aio_getq; nni_list pipes; nni_mtx mtx; @@ -83,7 +82,6 @@ pub0_sock_init(void **sp, nni_sock *sock) return (rv); } - s->raw = false; NNI_LIST_INIT(&s->pipes, pub0_pipe, node); s->uwq = nni_sock_sendq(sock); @@ -273,20 +271,6 @@ pub0_pipe_send_cb(void *arg) nni_msgq_aio_get(p->sendq, p->aio_getq); } -static int -pub0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - pub0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -pub0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - pub0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void pub0_sock_recv(void *arg, nni_aio *aio) { @@ -310,12 +294,6 @@ static nni_proto_pipe_ops pub0_pipe_ops = { }; static nni_proto_sock_option pub0_sock_options[] = { - { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = pub0_sock_getopt_raw, - .pso_setopt = pub0_sock_setopt_raw, - }, // terminate list { .pso_name = NULL, @@ -341,8 +319,23 @@ static nni_proto pub0_proto = { .proto_pipe_ops = &pub0_pipe_ops, }; +static nni_proto pub0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_PUB_V0, "pub" }, + .proto_peer = { NNI_PROTO_SUB_V0, "sub" }, + .proto_flags = NNI_PROTO_FLAG_SND | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &pub0_sock_ops, + .proto_pipe_ops = &pub0_pipe_ops, +}; + int nng_pub0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &pub0_proto)); } + +int +nng_pub0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &pub0_proto_raw)); +} diff --git a/src/protocol/pubsub0/pub.h b/src/protocol/pubsub0/pub.h index 2388a292..877f2f1c 100644 --- a/src/protocol/pubsub0/pub.h +++ b/src/protocol/pubsub0/pub.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_pub0_open(nng_socket *); +NNG_DECL int nng_pub0_open_raw(nng_socket *); #ifndef nng_pub_open #define nng_pub_open nng_pub0_open #endif +#ifndef nng_pub_open_raw +#define nng_pub_open_raw nng_pub0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/pubsub0/sub.c b/src/protocol/pubsub0/sub.c index 6b1f1173..b41b33ea 100644 --- a/src/protocol/pubsub0/sub.c +++ b/src/protocol/pubsub0/sub.c @@ -44,7 +44,6 @@ struct sub0_topic { struct sub0_sock { nni_list topics; nni_msgq *urq; - bool raw; nni_mtx lk; }; @@ -66,7 +65,6 @@ sub0_sock_init(void **sp, nni_sock *sock) } nni_mtx_init(&s->lk); NNI_LIST_INIT(&s->topics, sub0_topic, node); - s->raw = false; s->urq = nni_sock_recvq(sock); *sp = s; @@ -277,20 +275,6 @@ sub0_unsubscribe(void *arg, const void *buf, size_t sz, int typ) return (NNG_ENOENT); } -static int -sub0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - sub0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -sub0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - sub0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - static void sub0_sock_send(void *arg, nni_aio *aio) { @@ -315,16 +299,13 @@ sub0_sock_filter(void *arg, nni_msg *msg) size_t len; int match; - nni_mtx_lock(&s->lk); - if (s->raw) { - nni_mtx_unlock(&s->lk); - return (msg); - } - body = nni_msg_body(msg); len = nni_msg_len(msg); match = 0; + + nni_mtx_lock(&s->lk); + // Check to see if the message matches one of our subscriptions. NNI_LIST_FOREACH (&s->topics, topic) { if (len >= topic->len) { @@ -362,12 +343,6 @@ static nni_proto_pipe_ops sub0_pipe_ops = { static nni_proto_sock_option sub0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = sub0_sock_getopt_raw, - .pso_setopt = sub0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_SUB_SUBSCRIBE, .pso_type = NNI_TYPE_OPAQUE, .pso_getopt = NULL, @@ -396,6 +371,17 @@ static nni_proto_sock_ops sub0_sock_ops = { .sock_options = sub0_sock_options, }; +static nni_proto_sock_ops sub0_sock_ops_raw = { + .sock_init = sub0_sock_init, + .sock_fini = sub0_sock_fini, + .sock_open = sub0_sock_open, + .sock_close = sub0_sock_close, + .sock_send = sub0_sock_send, + .sock_recv = sub0_sock_recv, + .sock_filter = NULL, // raw does not filter + .sock_options = sub0_sock_options, +}; + static nni_proto sub0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SUB_V0, "sub" }, @@ -405,8 +391,23 @@ static nni_proto sub0_proto = { .proto_pipe_ops = &sub0_pipe_ops, }; +static nni_proto sub0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SUB_V0, "sub" }, + .proto_peer = { NNI_PROTO_PUB_V0, "pub" }, + .proto_flags = NNI_PROTO_FLAG_RCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &sub0_sock_ops_raw, + .proto_pipe_ops = &sub0_pipe_ops, +}; + int nng_sub0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &sub0_proto)); } + +int +nng_sub0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &sub0_proto_raw)); +} diff --git a/src/protocol/pubsub0/sub.h b/src/protocol/pubsub0/sub.h index 1a09145d..acb5cda3 100644 --- a/src/protocol/pubsub0/sub.h +++ b/src/protocol/pubsub0/sub.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -17,10 +17,16 @@ extern "C" { NNG_DECL int nng_sub0_open(nng_socket *); +NNG_DECL int nng_sub0_open_raw(nng_socket *); + #ifndef nng_sub_open #define nng_sub_open nng_sub0_open #endif +#ifndef nng_sub_open_raw +#define nng_sub_open_raw nng_sub0_open_raw +#endif + #define NNG_OPT_SUB_SUBSCRIBE "sub:subscribe" #define NNG_OPT_SUB_UNSUBSCRIBE "sub:unsubscribe" diff --git a/src/protocol/reqrep0/rep.c b/src/protocol/reqrep0/rep.c index f62406cd..78a1f2ee 100644 --- a/src/protocol/reqrep0/rep.c +++ b/src/protocol/reqrep0/rep.c @@ -41,7 +41,6 @@ struct rep0_sock { nni_msgq * uwq; nni_msgq * urq; nni_mtx lk; - bool raw; int ttl; nni_idhash *pipes; char * btrace; @@ -92,7 +91,6 @@ rep0_sock_init(void **sp, nni_sock *sock) } s->ttl = 8; // Per RFC - s->raw = false; s->btrace = NULL; s->btrace_len = 0; s->uwq = nni_sock_sendq(sock); @@ -353,25 +351,6 @@ rep0_pipe_putq_cb(void *arg) } static int -rep0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - rep0_sock *s = arg; - int rv; - - nni_mtx_lock(&s->lk); - rv = nni_copyin_bool(&s->raw, buf, sz, typ); - nni_mtx_unlock(&s->lk); - return (rv); -} - -static int -rep0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - rep0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - -static int rep0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { rep0_sock *s = arg; @@ -393,10 +372,6 @@ rep0_sock_filter(void *arg, nni_msg *msg) size_t len; nni_mtx_lock(&s->lk); - if (s->raw) { - nni_mtx_unlock(&s->lk); - return (msg); - } len = nni_msg_header_len(msg); header = nni_msg_header(msg); @@ -417,6 +392,13 @@ rep0_sock_filter(void *arg, nni_msg *msg) } static void +rep0_sock_send_raw(void *arg, nni_aio *aio) +{ + rep0_sock *s = arg; + nni_msgq_aio_put(s->uwq, aio); +} + +static void rep0_sock_send(void *arg, nni_aio *aio) { rep0_sock *s = arg; @@ -424,12 +406,6 @@ rep0_sock_send(void *arg, nni_aio *aio) nni_msg * msg; nni_mtx_lock(&s->lk); - if (s->raw) { - // Pass thru - nni_mtx_unlock(&s->lk); - nni_msgq_aio_put(s->uwq, aio); - return; - } if (s->btrace == NULL) { nni_mtx_unlock(&s->lk); nni_aio_finish_error(aio, NNG_ESTATE); @@ -475,12 +451,6 @@ static nni_proto_pipe_ops rep0_pipe_ops = { static nni_proto_sock_option rep0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = rep0_sock_getopt_raw, - .pso_setopt = rep0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_MAXTTL, .pso_type = NNI_TYPE_INT32, .pso_getopt = rep0_sock_getopt_maxttl, @@ -503,6 +473,17 @@ static nni_proto_sock_ops rep0_sock_ops = { .sock_recv = rep0_sock_recv, }; +static nni_proto_sock_ops rep0_sock_ops_raw = { + .sock_init = rep0_sock_init, + .sock_fini = rep0_sock_fini, + .sock_open = rep0_sock_open, + .sock_close = rep0_sock_close, + .sock_options = rep0_sock_options, + .sock_filter = NULL, // No filtering for raw mode + .sock_send = rep0_sock_send_raw, + .sock_recv = rep0_sock_recv, +}; + static nni_proto rep0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_REP_V0, "rep" }, @@ -512,8 +493,23 @@ static nni_proto rep0_proto = { .proto_pipe_ops = &rep0_pipe_ops, }; +static nni_proto rep0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REP_V0, "rep" }, + .proto_peer = { NNI_PROTO_REQ_V0, "req" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &rep0_sock_ops_raw, + .proto_pipe_ops = &rep0_pipe_ops, +}; + int nng_rep0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &rep0_proto)); } + +int +nng_rep0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &rep0_proto_raw)); +} diff --git a/src/protocol/reqrep0/rep.h b/src/protocol/reqrep0/rep.h index 93df9379..612127a2 100644 --- a/src/protocol/reqrep0/rep.h +++ b/src/protocol/reqrep0/rep.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_rep0_open(nng_socket *); +NNG_DECL int nng_rep0_open_raw(nng_socket *); #ifndef nng_rep_open #define nng_rep_open nng_rep0_open #endif +#ifndef nng_rep_open +#define nng_rep_open_raw nng_rep0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/reqrep0/req.c b/src/protocol/reqrep0/req.c index 0a5b566a..4d35ca1f 100644 --- a/src/protocol/reqrep0/req.c +++ b/src/protocol/reqrep0/req.c @@ -78,7 +78,7 @@ static void req0_recv_cb(void *); static void req0_putq_cb(void *); static int -req0_sock_init(void **sp, nni_sock *sock) +req0_sock_init_impl(void **sp, nni_sock *sock, bool raw) { req0_sock *s; @@ -96,7 +96,7 @@ req0_sock_init(void **sp, nni_sock *sock) s->nextid = nni_random(); s->retry = NNI_SECOND * 60; s->reqmsg = NULL; - s->raw = false; + s->raw = raw; s->wantw = false; s->resend = NNI_TIME_ZERO; s->ttl = 8; @@ -107,6 +107,18 @@ req0_sock_init(void **sp, nni_sock *sock) return (0); } +static int +req0_sock_init(void **sp, nni_sock *sock) +{ + return (req0_sock_init_impl(sp, sock, false)); +} + +static int +req0_sock_init_raw(void **sp, nni_sock *sock) +{ + return (req0_sock_init_impl(sp, sock, true)); +} + static void req0_sock_open(void *arg) { @@ -250,20 +262,6 @@ req0_pipe_stop(void *arg) } static int -req0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - req0_sock *s = arg; - return (nni_copyin_bool(&s->raw, buf, sz, typ)); -} - -static int -req0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - req0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - -static int req0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { req0_sock *s = arg; @@ -513,11 +511,6 @@ req0_sock_send(void *arg, nni_aio *aio) int rv; nni_mtx_lock(&s->mtx); - if (s->raw) { - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); - return; - } msg = nni_aio_get_msg(aio); len = nni_msg_len(msg); @@ -559,6 +552,14 @@ req0_sock_send(void *arg, nni_aio *aio) nni_aio_finish(aio, 0, len); } +static void +req0_sock_send_raw(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + static nni_msg * req0_sock_filter(void *arg, nni_msg *msg) { @@ -566,11 +567,6 @@ req0_sock_filter(void *arg, nni_msg *msg) nni_msg * rmsg; nni_mtx_lock(&s->mtx); - if (s->raw) { - // Pass it unmolested - nni_mtx_unlock(&s->mtx); - return (msg); - } if (nni_msg_header_len(msg) < 4) { nni_mtx_unlock(&s->mtx); @@ -608,17 +604,23 @@ req0_sock_recv(void *arg, nni_aio *aio) req0_sock *s = arg; nni_mtx_lock(&s->mtx); - if (!s->raw) { - if (s->reqmsg == NULL) { - nni_mtx_unlock(&s->mtx); - nni_aio_finish_error(aio, NNG_ESTATE); - return; - } + if (s->reqmsg == NULL) { + nni_mtx_unlock(&s->mtx); + nni_aio_finish_error(aio, NNG_ESTATE); + return; } nni_mtx_unlock(&s->mtx); nni_msgq_aio_get(s->urq, aio); } +static void +req0_sock_recv_raw(void *arg, nni_aio *aio) +{ + req0_sock *s = arg; + + nni_msgq_aio_get(s->urq, aio); +} + static nni_proto_pipe_ops req0_pipe_ops = { .pipe_init = req0_pipe_init, .pipe_fini = req0_pipe_fini, @@ -628,12 +630,6 @@ static nni_proto_pipe_ops req0_pipe_ops = { static nni_proto_sock_option req0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = req0_sock_getopt_raw, - .pso_setopt = req0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_MAXTTL, .pso_type = NNI_TYPE_INT32, .pso_getopt = req0_sock_getopt_maxttl, @@ -676,3 +672,28 @@ nng_req0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &req0_proto)); } + +static nni_proto_sock_ops req0_sock_ops_raw = { + .sock_init = req0_sock_init_raw, + .sock_fini = req0_sock_fini, + .sock_open = req0_sock_open, + .sock_close = req0_sock_close, + .sock_options = req0_sock_options, + .sock_send = req0_sock_send_raw, + .sock_recv = req0_sock_recv_raw, +}; + +static nni_proto req0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_REQ_V0, "req" }, + .proto_peer = { NNI_PROTO_REP_V0, "rep" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &req0_sock_ops_raw, + .proto_pipe_ops = &req0_pipe_ops, +}; + +int +nng_req0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &req0_proto_raw)); +}
\ No newline at end of file diff --git a/src/protocol/reqrep0/req.h b/src/protocol/reqrep0/req.h index 99c9bf62..392c7932 100644 --- a/src/protocol/reqrep0/req.h +++ b/src/protocol/reqrep0/req.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,10 +16,14 @@ extern "C" { #endif NNG_DECL int nng_req0_open(nng_socket *); +NNG_DECL int nng_req0_open_raw(nng_socket *); #ifndef nng_req_open #define nng_req_open nng_req0_open #endif +#ifndef nng_req_open_raw +#define nng_req_open_raw nng_req0_open_raw +#endif #define NNG_OPT_REQ_RESENDTIME "req:resend-time" diff --git a/src/protocol/survey0/respond.c b/src/protocol/survey0/respond.c index eeb09d2a..1605d9e6 100644 --- a/src/protocol/survey0/respond.c +++ b/src/protocol/survey0/respond.c @@ -40,7 +40,6 @@ static void resp0_pipe_fini(void *); struct resp0_sock { nni_msgq * urq; nni_msgq * uwq; - bool raw; int ttl; nni_idhash *pipes; char * btrace; @@ -93,7 +92,6 @@ resp0_sock_init(void **sp, nni_sock *nsock) } s->ttl = 8; // Per RFC - s->raw = false; s->btrace = NULL; s->btrace_len = 0; s->urq = nni_sock_recvq(nsock); @@ -347,36 +345,25 @@ resp0_putq_cb(void *arg) } static int -resp0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) +resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { resp0_sock *s = arg; - int rv; - - nni_mtx_lock(&s->mtx); - rv = nni_copyin_bool(&s->raw, buf, sz, typ); - nni_mtx_unlock(&s->mtx); - return (rv); + return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); } static int -resp0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) +resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) { resp0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); + return (nni_copyout_int(s->ttl, buf, szp, typ)); } -static int -resp0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) +static void +resp0_sock_send_raw(void *arg, nni_aio *aio) { resp0_sock *s = arg; - return (nni_copyin_int(&s->ttl, buf, sz, 1, 255, typ)); -} -static int -resp0_sock_getopt_maxttl(void *arg, void *buf, size_t *szp, int typ) -{ - resp0_sock *s = arg; - return (nni_copyout_int(s->ttl, buf, szp, typ)); + nni_msgq_aio_put(s->uwq, aio); } static void @@ -387,11 +374,6 @@ resp0_sock_send(void *arg, nni_aio *aio) int rv; nni_mtx_lock(&s->mtx); - if (s->raw) { - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); - return; - } msg = nni_aio_get_msg(aio); @@ -428,10 +410,6 @@ resp0_sock_filter(void *arg, nni_msg *msg) size_t len; nni_mtx_lock(&s->mtx); - if (s->raw) { - nni_mtx_unlock(&s->mtx); - return (msg); - } len = nni_msg_header_len(msg); header = nni_msg_header(msg); @@ -469,12 +447,6 @@ static nni_proto_pipe_ops resp0_pipe_ops = { static nni_proto_sock_option resp0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = resp0_sock_getopt_raw, - .pso_setopt = resp0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_MAXTTL, .pso_type = NNI_TYPE_INT32, .pso_getopt = resp0_sock_getopt_maxttl, @@ -497,6 +469,17 @@ static nni_proto_sock_ops resp0_sock_ops = { .sock_options = resp0_sock_options, }; +static nni_proto_sock_ops resp0_sock_ops_raw = { + .sock_init = resp0_sock_init, + .sock_fini = resp0_sock_fini, + .sock_open = resp0_sock_open, + .sock_close = resp0_sock_close, + .sock_filter = NULL, // no filter for raw + .sock_send = resp0_sock_send_raw, + .sock_recv = resp0_sock_recv, + .sock_options = resp0_sock_options, +}; + static nni_proto resp0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, @@ -506,8 +489,23 @@ static nni_proto resp0_proto = { .proto_pipe_ops = &resp0_pipe_ops, }; +static nni_proto resp0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_peer = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &resp0_sock_ops_raw, + .proto_pipe_ops = &resp0_pipe_ops, +}; + int nng_respondent0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &resp0_proto)); } + +int +nng_respondent0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &resp0_proto_raw)); +} diff --git a/src/protocol/survey0/respond.h b/src/protocol/survey0/respond.h index 58c65298..b865b2ac 100644 --- a/src/protocol/survey0/respond.h +++ b/src/protocol/survey0/respond.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_respondent0_open(nng_socket *); +NNG_DECL int nng_respondent0_open_raw(nng_socket *); #ifndef nng_respondent_open #define nng_respondent_open nng_respondent0_open #endif +#ifndef nng_respondent_open_raw +#define nng_respondent_open_raw nng_respondent0_open_raw +#endif + #ifdef __cplusplus } #endif diff --git a/src/protocol/survey0/survey.c b/src/protocol/survey0/survey.c index a5909015..b7158464 100644 --- a/src/protocol/survey0/survey.c +++ b/src/protocol/survey0/survey.c @@ -39,7 +39,6 @@ static void surv0_timeout(void *); struct surv0_sock { nni_duration survtime; nni_time expire; - bool raw; int ttl; uint32_t nextid; // next id uint32_t survid; // outstanding request ID (big endian) @@ -92,7 +91,6 @@ surv0_sock_init(void **sp, nni_sock *nsock) nni_timer_init(&s->timer, surv0_timeout, s); s->nextid = nni_random(); - s->raw = false; s->survtime = NNI_SECOND; s->expire = NNI_TIME_ZERO; s->uwq = nni_sock_sendq(nsock); @@ -275,28 +273,6 @@ failed: } static int -surv0_sock_setopt_raw(void *arg, const void *buf, size_t sz, int typ) -{ - surv0_sock *s = arg; - int rv; - - nni_mtx_lock(&s->mtx); - if ((rv = nni_copyin_bool(&s->raw, buf, sz, typ)) == 0) { - s->survid = 0; - nni_timer_cancel(&s->timer); - } - nni_mtx_unlock(&s->mtx); - return (rv); -} - -static int -surv0_sock_getopt_raw(void *arg, void *buf, size_t *szp, int typ) -{ - surv0_sock *s = arg; - return (nni_copyout_bool(s->raw, buf, szp, typ)); -} - -static int surv0_sock_setopt_maxttl(void *arg, const void *buf, size_t sz, int typ) { surv0_sock *s = arg; @@ -391,6 +367,14 @@ surv0_sock_recv(void *arg, nni_aio *aio) } static void +surv0_sock_send_raw(void *arg, nni_aio *aio) +{ + surv0_sock *s = arg; + + nni_msgq_aio_put(s->uwq, aio); +} + +static void surv0_sock_send(void *arg, nni_aio *aio) { surv0_sock *s = arg; @@ -398,13 +382,6 @@ surv0_sock_send(void *arg, nni_aio *aio) int rv; nni_mtx_lock(&s->mtx); - if (s->raw) { - // No automatic retry, and the request ID must - // be in the header coming down. - nni_mtx_unlock(&s->mtx); - nni_msgq_aio_put(s->uwq, aio); - return; - } // Generate a new request ID. We always set the high // order bit so that the peer can locate the end of the @@ -437,11 +414,6 @@ surv0_sock_filter(void *arg, nni_msg *msg) surv0_sock *s = arg; nni_mtx_lock(&s->mtx); - if (s->raw) { - // Pass it unmolested - nni_mtx_unlock(&s->mtx); - return (msg); - } if ((nni_msg_header_len(msg) < sizeof(uint32_t)) || (nni_msg_header_trim_u32(msg) != s->survid)) { @@ -464,12 +436,6 @@ static nni_proto_pipe_ops surv0_pipe_ops = { static nni_proto_sock_option surv0_sock_options[] = { { - .pso_name = NNG_OPT_RAW, - .pso_type = NNI_TYPE_BOOL, - .pso_getopt = surv0_sock_getopt_raw, - .pso_setopt = surv0_sock_setopt_raw, - }, - { .pso_name = NNG_OPT_SURVEYOR_SURVEYTIME, .pso_type = NNI_TYPE_DURATION, .pso_getopt = surv0_sock_getopt_surveytime, @@ -498,6 +464,17 @@ static nni_proto_sock_ops surv0_sock_ops = { .sock_options = surv0_sock_options, }; +static nni_proto_sock_ops surv0_sock_ops_raw = { + .sock_init = surv0_sock_init, + .sock_fini = surv0_sock_fini, + .sock_open = surv0_sock_open, + .sock_close = surv0_sock_close, + .sock_send = surv0_sock_send_raw, + .sock_recv = surv0_sock_recv, + .sock_filter = surv0_sock_filter, + .sock_options = surv0_sock_options, +}; + static nni_proto surv0_proto = { .proto_version = NNI_PROTOCOL_VERSION, .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, @@ -507,8 +484,23 @@ static nni_proto surv0_proto = { .proto_pipe_ops = &surv0_pipe_ops, }; +static nni_proto surv0_proto_raw = { + .proto_version = NNI_PROTOCOL_VERSION, + .proto_self = { NNI_PROTO_SURVEYOR_V0, "surveyor" }, + .proto_peer = { NNI_PROTO_RESPONDENT_V0, "respondent" }, + .proto_flags = NNI_PROTO_FLAG_SNDRCV | NNI_PROTO_FLAG_RAW, + .proto_sock_ops = &surv0_sock_ops_raw, + .proto_pipe_ops = &surv0_pipe_ops, +}; + int nng_surveyor0_open(nng_socket *sidp) { return (nni_proto_open(sidp, &surv0_proto)); } + +int +nng_surveyor0_open_raw(nng_socket *sidp) +{ + return (nni_proto_open(sidp, &surv0_proto_raw)); +} diff --git a/src/protocol/survey0/survey.h b/src/protocol/survey0/survey.h index a7b6d943..37f76fbf 100644 --- a/src/protocol/survey0/survey.h +++ b/src/protocol/survey0/survey.h @@ -1,6 +1,6 @@ // -// Copyright 2017 Garrett D'Amore <garrett@damore.org> -// Copyright 2017 Capitar IT Group BV <info@capitar.com> +// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech> +// Copyright 2018 Capitar IT Group BV <info@capitar.com> // // This software is supplied under the terms of the MIT License, a // copy of which should be located in the distribution where this @@ -16,11 +16,16 @@ extern "C" { #endif NNG_DECL int nng_surveyor0_open(nng_socket *); +NNG_DECL int nng_surveyor0_open_raw(nng_socket *); #ifndef nng_surveyor_open #define nng_surveyor_open nng_surveyor0_open #endif +#ifndef nng_surveyor_open_raw +#define nng_surveyor_open_raw nng_surveyor0_open_raw +#endif + #define NNG_OPT_SURVEYOR_SURVEYTIME "surveyor:survey-time" #ifdef __cplusplus diff --git a/tests/device.c b/tests/device.c index c177a2e5..b1f97123 100644 --- a/tests/device.c +++ b/tests/device.c @@ -42,6 +42,12 @@ Main({ const char *addr1 = "inproc://dev1"; const char *addr2 = "inproc://dev2"; + Convey("We cannot create cooked mode device", { + nng_socket s1; + So(nng_pair1_open(&s1) == 0); + Reset({ nng_close(s1); }); + So(nng_device(s1, s1) == NNG_EINVAL); + }); Convey("We can create a PAIRv1 device", { nng_socket dev1; nng_socket dev2; @@ -51,11 +57,8 @@ Main({ nng_msg * msg; nng_thread * thr; - So(nng_pair1_open(&dev1) == 0); - So(nng_pair1_open(&dev2) == 0); - - So(nng_setopt_bool(dev1, NNG_OPT_RAW, true) == 0); - So(nng_setopt_bool(dev2, NNG_OPT_RAW, true) == 0); + So(nng_pair1_open_raw(&dev1) == 0); + So(nng_pair1_open_raw(&dev2) == 0); struct dev_data ddata; ddata.s1 = dev1; diff --git a/tests/pair1.c b/tests/pair1.c index bff6a44b..ef3fe263 100644 --- a/tests/pair1.c +++ b/tests/pair1.c @@ -101,14 +101,8 @@ TestMain("PAIRv1 protocol", { }); Convey("Cannot set raw mode after connect", { - So(nng_listen(s1, addr, NULL, 0) == 0); - So(nng_dial(c1, addr, NULL, 0) == 0); - nng_msleep(100); - So(nng_setopt_bool(s1, NNG_OPT_RAW, true) == - NNG_ESTATE); - So(nng_setopt_bool(c1, NNG_OPT_RAW, false) == - NNG_ESTATE); + NNG_EREADONLY); }); Convey("Polyamorous mode is best effort", { @@ -176,150 +170,6 @@ TestMain("PAIRv1 protocol", { NNG_ESTATE); }); - Convey("Monogamous raw mode works", { - nng_msg *msg; - uint32_t hops; - - So(nng_setopt_bool(s1, NNG_OPT_RAW, true) == 0); - So(nng_setopt_bool(c1, NNG_OPT_RAW, true) == 0); - So(nng_setopt_bool(c2, NNG_OPT_RAW, true) == 0); - - So(nng_listen(s1, addr, NULL, 0) == 0); - So(nng_dial(c1, addr, NULL, 0) == 0); - nng_msleep(20); - - Convey("Send/recv work", { - So(nng_msg_alloc(&msg, 0) == 0); - APPENDSTR(msg, "GAMMA"); - So(nng_msg_header_append_u32(msg, 1) == 0); - So(nng_msg_header_len(msg) == - sizeof(uint32_t)); - So(nng_sendmsg(c1, msg, 0) == 0); - So(nng_recvmsg(s1, &msg, 0) == 0); - So(nng_msg_get_pipe(msg) != 0); - CHECKSTR(msg, "GAMMA"); - So(nng_msg_header_len(msg) == - sizeof(uint32_t)); - So(nng_msg_header_trim_u32(msg, &hops) == 0); - So(hops == 2); - nng_msg_free(msg); - - So(nng_msg_alloc(&msg, 0) == 0); - APPENDSTR(msg, "EPSILON"); - So(nng_msg_header_append_u32(msg, 1) == 0); - So(nng_sendmsg(s1, msg, 0) == 0); - So(nng_recvmsg(c1, &msg, 0) == 0); - CHECKSTR(msg, "EPSILON"); - So(nng_msg_header_len(msg) == - sizeof(uint32_t)); - So(nng_msg_header_trim_u32(msg, &hops) == 0); - So(nng_msg_get_pipe(msg) != 0); - So(hops == 2); - nng_msg_free(msg); - }); - - Convey("Missing raw header fails", { - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_sendmsg(c1, msg, 0) == 0); - So(nng_recvmsg(s1, &msg, 0) == NNG_ETIMEDOUT); - - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_msg_append_u32(msg, 0xFEEDFACE) == 0); - So(nng_msg_header_append_u32(msg, 1) == 0); - So(nng_sendmsg(c1, msg, 0) == 0); - So(nng_recvmsg(s1, &msg, 0) == 0); - So(nng_msg_trim_u32(msg, &v) == 0); - So(v == 0xFEEDFACE); - nng_msg_free(msg); - }); - - Convey("Reserved bits in raw header", { - - Convey("Nonzero bits fail", { - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_msg_header_append_u32( - msg, 0xDEAD0000) == 0); - So(nng_sendmsg(c1, msg, 0) == 0); - So(nng_recvmsg(s1, &msg, 0) == - NNG_ETIMEDOUT); - }); - Convey("Zero bits pass", { - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_msg_append_u32( - msg, 0xFEEDFACE) == 0); - So(nng_msg_header_append_u32(msg, 1) == - 0); - So(nng_sendmsg(c1, msg, 0) == 0); - So(nng_recvmsg(s1, &msg, 0) == 0); - So(nng_msg_trim_u32(msg, &v) == 0); - So(v == 0xFEEDFACE); - nng_msg_free(msg); - }); - }); - - Convey("TTL is honored", { - int ttl; - - So(nng_setopt_int(s1, NNG_OPT_MAXTTL, 4) == 0); - So(nng_getopt_int(s1, NNG_OPT_MAXTTL, &ttl) == - 0); - So(ttl == 4); - Convey("Bad TTL bounces", { - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_msg_header_append_u32(msg, 4) == - 0); - So(nng_sendmsg(c1, msg, 0) == 0); - So(nng_recvmsg(s1, &msg, 0) == - NNG_ETIMEDOUT); - }); - Convey("Good TTL passes", { - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_msg_append_u32( - msg, 0xFEEDFACE) == 0); - So(nng_msg_header_append_u32(msg, 3) == - 0); - So(nng_sendmsg(c1, msg, 0) == 0); - So(nng_recvmsg(s1, &msg, 0) == 0); - So(nng_msg_trim_u32(msg, &v) == 0); - So(v == 0xFEEDFACE); - So(nng_msg_header_trim_u32(msg, &v) == - 0); - So(v == 4); - nng_msg_free(msg); - }); - - Convey("Large TTL passes", { - ttl = 0xff; - So(nng_setopt_int( - s1, NNG_OPT_MAXTTL, 0xff) == 0); - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_msg_append_u32(msg, 1234) == 0); - So(nng_msg_header_append_u32( - msg, 0xfe) == 0); - So(nng_sendmsg(c1, msg, 0) == 0); - So(nng_recvmsg(s1, &msg, 0) == 0); - So(nng_msg_trim_u32(msg, &v) == 0); - So(v == 1234); - So(nng_msg_header_trim_u32(msg, &v) == - 0); - So(v == 0xff); - nng_msg_free(msg); - }); - - Convey("Max TTL fails", { - ttl = 0xff; - So(nng_setopt_int( - s1, NNG_OPT_MAXTTL, 0xff) == 0); - So(nng_msg_alloc(&msg, 0) == 0); - So(nng_msg_header_append_u32( - msg, 0xff) == 0); - So(nng_sendmsg(c1, msg, 0) == 0); - So(nng_recvmsg(s1, &msg, 0) == - NNG_ETIMEDOUT); - }); - }); - }); - Convey("We cannot set insane TTLs", { int ttl; @@ -428,94 +278,253 @@ TestMain("PAIRv1 protocol", { CHECKSTR(msg, "AGAIN"); nng_msg_free(msg); }); + }); - Convey("Polyamorous raw mode works", { - nng_msg *msg; - bool v; - uint32_t hops; - nng_pipe p1; - nng_pipe p2; + Convey("Monogamous raw mode works", { + nng_msg *msg; + uint32_t hops; - So(nng_getopt_bool(s1, NNG_OPT_PAIR1_POLY, &v) == 0); - So(v == 0); + So(nng_pair1_open_raw(&s1) == 0); + So(nng_pair1_open_raw(&c1) == 0); + So(nng_pair1_open_raw(&c2) == 0); - So(nng_setopt_bool(s1, NNG_OPT_PAIR1_POLY, true) == 0); - So(nng_getopt_bool(s1, NNG_OPT_PAIR1_POLY, &v) == 0); - So(v == true); + Reset({ + nng_close(s1); + nng_close(c1); + nng_close(c2); + }); - v = false; - So(nng_setopt_bool(s1, NNG_OPT_RAW, true) == 0); - So(nng_getopt_bool(s1, NNG_OPT_RAW, &v) == 0); - So(v == true); + tmo = MILLISECOND(300); + So(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, tmo) == 0); + So(nng_setopt_ms(c1, NNG_OPT_RECVTIMEO, tmo) == 0); + So(nng_setopt_ms(c2, NNG_OPT_RECVTIMEO, tmo) == 0); + tmo = 0; + So(nng_getopt_ms(s1, NNG_OPT_RECVTIMEO, &tmo) == 0); + So(tmo == MILLISECOND(300)); - So(nng_listen(s1, addr, NULL, 0) == 0); - So(nng_dial(c1, addr, NULL, 0) == 0); - So(nng_dial(c2, addr, NULL, 0) == 0); - nng_msleep(20); + So(nng_listen(s1, addr, NULL, 0) == 0); + So(nng_dial(c1, addr, NULL, 0) == 0); + nng_msleep(20); + + Convey("Send/recv work", { + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "GAMMA"); + So(nng_msg_header_append_u32(msg, 1) == 0); + So(nng_msg_header_len(msg) == sizeof(uint32_t)); + So(nng_sendmsg(c1, msg, 0) == 0); + So(nng_recvmsg(s1, &msg, 0) == 0); + So(nng_msg_get_pipe(msg) != 0); + CHECKSTR(msg, "GAMMA"); + So(nng_msg_header_len(msg) == sizeof(uint32_t)); + So(nng_msg_header_trim_u32(msg, &hops) == 0); + So(hops == 2); + nng_msg_free(msg); + + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "EPSILON"); + So(nng_msg_header_append_u32(msg, 1) == 0); + So(nng_sendmsg(s1, msg, 0) == 0); + So(nng_recvmsg(c1, &msg, 0) == 0); + CHECKSTR(msg, "EPSILON"); + So(nng_msg_header_len(msg) == sizeof(uint32_t)); + So(nng_msg_header_trim_u32(msg, &hops) == 0); + So(nng_msg_get_pipe(msg) != 0); + So(hops == 2); + nng_msg_free(msg); + }); + + Convey("Missing raw header fails", { + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_sendmsg(c1, msg, 0) == 0); + So(nng_recvmsg(s1, &msg, 0) == NNG_ETIMEDOUT); + + So(nng_msg_alloc(&msg, 0) == 0); + So(nng_msg_append_u32(msg, 0xFEEDFACE) == 0); + So(nng_msg_header_append_u32(msg, 1) == 0); + So(nng_sendmsg(c1, msg, 0) == 0); + So(nng_recvmsg(s1, &msg, 0) == 0); + So(nng_msg_trim_u32(msg, &v) == 0); + So(v == 0xFEEDFACE); + nng_msg_free(msg); + }); - Convey("Send/recv works", { + Convey("Reserved bits in raw header", { + + Convey("Nonzero bits fail", { So(nng_msg_alloc(&msg, 0) == 0); - APPENDSTR(msg, "ONE"); + So(nng_msg_header_append_u32( + msg, 0xDEAD0000) == 0); So(nng_sendmsg(c1, msg, 0) == 0); - So(nng_recvmsg(s1, &msg, 0) == 0); - CHECKSTR(msg, "ONE"); - p1 = nng_msg_get_pipe(msg); - So(p1 != 0); - So(nng_msg_header_trim_u32(msg, &hops) == 0); - So(hops == 1); - nng_msg_free(msg); - + So(nng_recvmsg(s1, &msg, 0) == NNG_ETIMEDOUT); + }); + Convey("Zero bits pass", { So(nng_msg_alloc(&msg, 0) == 0); - APPENDSTR(msg, "TWO"); - So(nng_sendmsg(c2, msg, 0) == 0); + So(nng_msg_append_u32(msg, 0xFEEDFACE) == 0); + So(nng_msg_header_append_u32(msg, 1) == 0); + So(nng_sendmsg(c1, msg, 0) == 0); So(nng_recvmsg(s1, &msg, 0) == 0); - CHECKSTR(msg, "TWO"); - p2 = nng_msg_get_pipe(msg); - So(p2 != 0); - So(nng_msg_header_trim_u32(msg, &hops) == 0); - So(hops == 1); + So(nng_msg_trim_u32(msg, &v) == 0); + So(v == 0xFEEDFACE); nng_msg_free(msg); + }); + }); - So(p1 != p2); + Convey("TTL is honored", { + int ttl; + So(nng_setopt_int(s1, NNG_OPT_MAXTTL, 4) == 0); + So(nng_getopt_int(s1, NNG_OPT_MAXTTL, &ttl) == 0); + So(ttl == 4); + Convey("Bad TTL bounces", { So(nng_msg_alloc(&msg, 0) == 0); - nng_msg_set_pipe(msg, p1); - APPENDSTR(msg, "UNO"); - So(nng_msg_header_append_u32(msg, 1) == 0); - So(nng_sendmsg(s1, msg, 0) == 0); - So(nng_recvmsg(c1, &msg, 0) == 0); - CHECKSTR(msg, "UNO"); - nng_msg_free(msg); - + So(nng_msg_header_append_u32(msg, 4) == 0); + So(nng_sendmsg(c1, msg, 0) == 0); + So(nng_recvmsg(s1, &msg, 0) == NNG_ETIMEDOUT); + }); + Convey("Good TTL passes", { So(nng_msg_alloc(&msg, 0) == 0); - nng_msg_set_pipe(msg, p2); - APPENDSTR(msg, "DOS"); - So(nng_msg_header_append_u32(msg, 1) == 0); - So(nng_sendmsg(s1, msg, 0) == 0); - So(nng_recvmsg(c2, &msg, 0) == 0); - CHECKSTR(msg, "DOS"); + So(nng_msg_append_u32(msg, 0xFEEDFACE) == 0); + So(nng_msg_header_append_u32(msg, 3) == 0); + So(nng_sendmsg(c1, msg, 0) == 0); + So(nng_recvmsg(s1, &msg, 0) == 0); + So(nng_msg_trim_u32(msg, &v) == 0); + So(v == 0xFEEDFACE); + So(nng_msg_header_trim_u32(msg, &v) == 0); + So(v == 4); nng_msg_free(msg); }); - Convey("Closed pipes don't work", { + Convey("Large TTL passes", { + ttl = 0xff; + So(nng_setopt_int(s1, NNG_OPT_MAXTTL, 0xff) == + 0); So(nng_msg_alloc(&msg, 0) == 0); - APPENDSTR(msg, "ONE"); + So(nng_msg_append_u32(msg, 1234) == 0); + So(nng_msg_header_append_u32(msg, 0xfe) == 0); So(nng_sendmsg(c1, msg, 0) == 0); So(nng_recvmsg(s1, &msg, 0) == 0); - CHECKSTR(msg, "ONE"); - p1 = nng_msg_get_pipe(msg); - So(p1 != 0); + So(nng_msg_trim_u32(msg, &v) == 0); + So(v == 1234); + So(nng_msg_header_trim_u32(msg, &v) == 0); + So(v == 0xff); nng_msg_free(msg); + }); - nng_close(c1); - + Convey("Max TTL fails", { + ttl = 0xff; + So(nng_setopt_int(s1, NNG_OPT_MAXTTL, 0xff) == + 0); So(nng_msg_alloc(&msg, 0) == 0); - nng_msg_set_pipe(msg, p1); - APPENDSTR(msg, "EIN"); - So(nng_msg_header_append_u32(msg, 1) == 0); - So(nng_sendmsg(s1, msg, 0) == 0); - So(nng_recvmsg(c2, &msg, 0) == NNG_ETIMEDOUT); + So(nng_msg_header_append_u32(msg, 0xff) == 0); + So(nng_sendmsg(c1, msg, 0) == 0); + So(nng_recvmsg(s1, &msg, 0) == NNG_ETIMEDOUT); }); }); }); + + Convey("Polyamorous raw mode works", { + nng_msg *msg; + bool v; + uint32_t hops; + nng_pipe p1; + nng_pipe p2; + + So(nng_pair1_open_raw(&s1) == 0); + So(nng_pair1_open(&c1) == 0); + So(nng_pair1_open(&c2) == 0); + + Reset({ + nng_close(s1); + nng_close(c1); + nng_close(c2); + }); + + tmo = MILLISECOND(300); + So(nng_setopt_ms(s1, NNG_OPT_RECVTIMEO, tmo) == 0); + So(nng_setopt_ms(c1, NNG_OPT_RECVTIMEO, tmo) == 0); + So(nng_setopt_ms(c2, NNG_OPT_RECVTIMEO, tmo) == 0); + tmo = 0; + So(nng_getopt_ms(s1, NNG_OPT_RECVTIMEO, &tmo) == 0); + So(tmo == MILLISECOND(300)); + + So(nng_getopt_bool(s1, NNG_OPT_PAIR1_POLY, &v) == 0); + So(v == 0); + + So(nng_setopt_bool(s1, NNG_OPT_PAIR1_POLY, true) == 0); + So(nng_getopt_bool(s1, NNG_OPT_PAIR1_POLY, &v) == 0); + So(v == true); + + v = false; + So(nng_getopt_bool(s1, NNG_OPT_RAW, &v) == 0); + So(v == true); + + So(nng_listen(s1, addr, NULL, 0) == 0); + So(nng_dial(c1, addr, NULL, 0) == 0); + So(nng_dial(c2, addr, NULL, 0) == 0); + nng_msleep(20); + + Convey("Send/recv works", { + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "ONE"); + So(nng_sendmsg(c1, msg, 0) == 0); + So(nng_recvmsg(s1, &msg, 0) == 0); + CHECKSTR(msg, "ONE"); + p1 = nng_msg_get_pipe(msg); + So(p1 != 0); + So(nng_msg_header_trim_u32(msg, &hops) == 0); + So(hops == 1); + nng_msg_free(msg); + + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "TWO"); + So(nng_sendmsg(c2, msg, 0) == 0); + So(nng_recvmsg(s1, &msg, 0) == 0); + CHECKSTR(msg, "TWO"); + p2 = nng_msg_get_pipe(msg); + So(p2 != 0); + So(nng_msg_header_trim_u32(msg, &hops) == 0); + So(hops == 1); + nng_msg_free(msg); + + So(p1 != p2); + + So(nng_msg_alloc(&msg, 0) == 0); + nng_msg_set_pipe(msg, p1); + APPENDSTR(msg, "UNO"); + So(nng_msg_header_append_u32(msg, 1) == 0); + So(nng_sendmsg(s1, msg, 0) == 0); + So(nng_recvmsg(c1, &msg, 0) == 0); + CHECKSTR(msg, "UNO"); + nng_msg_free(msg); + + So(nng_msg_alloc(&msg, 0) == 0); + nng_msg_set_pipe(msg, p2); + APPENDSTR(msg, "DOS"); + So(nng_msg_header_append_u32(msg, 1) == 0); + So(nng_sendmsg(s1, msg, 0) == 0); + So(nng_recvmsg(c2, &msg, 0) == 0); + CHECKSTR(msg, "DOS"); + nng_msg_free(msg); + }); + + Convey("Closed pipes don't work", { + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "ONE"); + So(nng_sendmsg(c1, msg, 0) == 0); + So(nng_recvmsg(s1, &msg, 0) == 0); + CHECKSTR(msg, "ONE"); + p1 = nng_msg_get_pipe(msg); + So(p1 != 0); + nng_msg_free(msg); + + nng_close(c1); + + So(nng_msg_alloc(&msg, 0) == 0); + nng_msg_set_pipe(msg, p1); + APPENDSTR(msg, "EIN"); + So(nng_msg_header_append_u32(msg, 1) == 0); + So(nng_sendmsg(s1, msg, 0) == 0); + So(nng_recvmsg(c2, &msg, 0) == NNG_ETIMEDOUT); + }); + }); }) diff --git a/tests/pubsub.c b/tests/pubsub.c index bd0d7f56..2151611d 100644 --- a/tests/pubsub.c +++ b/tests/pubsub.c @@ -146,19 +146,44 @@ TestMain("PUB/SUB pattern", { So(nng_recvmsg(sub, &msg, 0) == NNG_ETIMEDOUT); }); - Convey("Subs in raw receive", { + }); - nng_msg *msg; + Convey("Subs in raw receive", { - So(nng_setopt_ms(sub, NNG_OPT_RECVTIMEO, 90) == 0); - So(nng_setopt_bool(sub, NNG_OPT_RAW, true) == 0); + nng_msg * msg; + nng_socket pub; + nng_socket sub; + bool raw; - So(nng_msg_alloc(&msg, 0) == 0); - APPENDSTR(msg, "/some/like/it/raw"); - So(nng_sendmsg(pub, msg, 0) == 0); - So(nng_recvmsg(sub, &msg, 0) == 0); - CHECKSTR(msg, "/some/like/it/raw"); - nng_msg_free(msg); + So(nng_pub_open(&pub) == 0); + + So(nng_sub_open_raw(&sub) == 0); + + Reset({ + nng_close(pub); + nng_close(sub); }); + + // Most applications will usually have the pub listen, + // and the sub dial. However, this creates a problem + // for our tests, since we can wind up trying to push + // data before the pipe is fully registered (the accept + // runs asynchronously.) + So(nng_listen(sub, addr, NULL, 0) == 0); + So(nng_dial(pub, addr, NULL, 0) == 0); + + nng_msleep(20); // give time for connecting threads + + So(nng_setopt_ms(sub, NNG_OPT_RECVTIMEO, 90) == 0); + So(nng_getopt_bool(sub, NNG_OPT_RAW, &raw) == 0); + So(raw == true); + + So(nng_msg_alloc(&msg, 0) == 0); + APPENDSTR(msg, "/some/like/it/raw"); + So(nng_sendmsg(pub, msg, 0) == 0); + So(nng_recvmsg(sub, &msg, 0) == 0); + CHECKSTR(msg, "/some/like/it/raw"); + nng_msg_free(msg); }); + }) diff --git a/tests/reqrep.c b/tests/reqrep.c index 4e837c34..97ed371a 100644 --- a/tests/reqrep.c +++ b/tests/reqrep.c @@ -13,6 +13,7 @@ #include "protocol/reqrep0/rep.h" #include "protocol/reqrep0/req.h" #include "stubs.h" +#include "supplemental/util/platform.h" #include <string.h> @@ -135,19 +136,44 @@ TestMain("REQ/REP pattern", { So(nng_listen(rep, addr, NULL, 0) == 0); So(nng_dial(req, addr, NULL, 0) == 0); + // Send req #1 (abc). So(nng_sendmsg(req, abc, 0) == 0); + + // Sleep a bit. This is so that we ensure that our + // request gets to the far side. (If we cancel too + // fast, then our outgoing send will be canceled before + // it gets to the wire.) + nng_msleep(20); + + // Send the next next request ("def"). Note that + // the REP side server will have already buffered the receive + // request, and should simply be waiting for us to reply to + // abc. So(nng_sendmsg(req, def, 0) == 0); + + // Receive the first request (should be abc) on the REP server. So(nng_recvmsg(rep, &cmd, 0) == 0); - So(cmd != NULL); + So(nng_msg_len(cmd) == 4); + So(strcmp(nng_msg_body(cmd), "abc") == 0); + // REP sends the reply to first command. This will be + // discarded by the REQ server. So(nng_sendmsg(rep, cmd, 0) == 0); + + // Now get the next command from the REP; should be "def". So(nng_recvmsg(rep, &cmd, 0) == 0); + So(nng_msg_len(cmd) == 4); + So(strcmp(nng_msg_body(cmd), "def") == 0); + + // And send it back to REQ. So(nng_sendmsg(rep, cmd, 0) == 0); - So(nng_recvmsg(req, &cmd, 0) == 0); + // Try a req command. This should give back "def" + So(nng_recvmsg(req, &cmd, 0) == 0); So(nng_msg_len(cmd) == 4); - So(memcmp(nng_msg_body(cmd), "def", 4) == 0); + So(strcmp(nng_msg_body(cmd), "def") == 0); nng_msg_free(cmd); }); + nng_fini(); }) diff --git a/tests/sock.c b/tests/sock.c index a5b9bdc1..c4de6ad4 100644 --- a/tests/sock.c +++ b/tests/sock.c @@ -128,11 +128,6 @@ TestMain("Socket Operations", { So(nng_getopt_bool(s1, NNG_OPT_RAW, &raw) == 0); So(raw == false); - So(nng_setopt_bool(s1, NNG_OPT_RAW, true) == - 0); - So(nng_getopt_bool(s1, NNG_OPT_RAW, &raw) == - 0); - So(raw == true); }); Convey("URL option works", { @@ -255,16 +250,9 @@ TestMain("Socket Operations", { sz) == NNG_EINVAL); }); - Convey("Bogus raw fails", { - // Bool type is 1 byte. - So(nng_setopt_int(s1, NNG_OPT_RAW, 42) == - NNG_EBADTYPE); - So(nng_setopt_int(s1, NNG_OPT_RAW, -42) == - NNG_EBADTYPE); - So(nng_setopt_int(s1, NNG_OPT_RAW, 0) == - NNG_EBADTYPE); - So(nng_setopt(s1, NNG_OPT_RAW, "abcd", 4) == - NNG_EINVAL); + Convey("Cannot set raw", { + So(nng_setopt_bool(s1, NNG_OPT_RAW, true) == + NNG_EREADONLY); }); Convey("Unsupported options fail", { |
