aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/defs.h3
-rw-r--r--src/core/socket.c78
-rw-r--r--src/core/socket.h9
-rw-r--r--src/nng.c33
-rw-r--r--src/nng.h66
5 files changed, 133 insertions, 56 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index 478bb464..de81752c 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -36,10 +36,11 @@ typedef struct nni_protocol nni_protocol;
typedef int nni_signal; // Turnstile/wakeup channel.
typedef uint64_t nni_time; // Absolute time (usec).
-typedef int nni_duration; // Relative time (usec).
+typedef int64_t nni_duration; // Relative time (usec).
// Some default timing things.
#define NNI_TIME_NEVER ((nni_time) 0xffffffffull)
#define NNI_TIME_ZERO ((nni_time) 0)
+#define NNI_SECOND (1000000)
#endif // CORE_DEFS_H
diff --git a/src/core/socket.c b/src/core/socket.c
index dee1561d..f3afb7bb 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -9,6 +9,8 @@
#include "core/nng_impl.h"
+#include <string.h>
+
// Socket implementation.
// nni_socket_sendq and nni_socket_recvq are called by the protocol to obtain
@@ -42,7 +44,11 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
return (NNG_ENOMEM);
}
sock->s_ops = *ops;
- sock->s_linger = NNG_LINGER_DEFAULT;
+ sock->s_linger = 0;
+ sock->s_sndtimeo = -1;
+ sock->s_rcvtimeo = -1;
+ sock->s_reconn = NNI_SECOND;
+ sock->s_reconnmax = NNI_SECOND;
if ((rv = nni_mutex_init(&sock->s_mx)) != 0) {
nni_free(sock, sizeof (*sock));
@@ -174,19 +180,10 @@ nni_socket_close(nni_socket *sock)
int
-nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout)
+nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_time expire)
{
int rv;
int besteffort;
- nni_time expire;
-
- if (tmout > 0) {
- expire = nni_clock() + tmout;
- } else if (tmout < 0) {
- expire = NNI_TIME_NEVER;
- } else {
- expire = NNI_TIME_ZERO;
- }
// Senderr is typically set by protocols when the state machine
// indicates that it is no longer valid to send a message. E.g.
@@ -226,12 +223,12 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout)
int
-nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout)
+nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_time expire)
{
int rv;
- nni_time expire;
nni_msg *msg;
+#if 0
if (tmout > 0) {
expire = nni_clock() + tmout;
} else if (tmout < 0) {
@@ -239,6 +236,7 @@ nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout)
} else {
expire = NNI_TIME_ZERO;
}
+#endif
nni_mutex_enter(&sock->s_mx);
if (sock->s_closing) {
@@ -551,3 +549,57 @@ out:
return (rv);
}
+
+
+static int
+nni_setopt_duration(nni_duration *ptr, const void *val, size_t size)
+{
+ nni_duration dur;
+
+ if (size != sizeof (*ptr)) {
+ return (NNG_EINVAL);
+ }
+ memcpy(&dur, val, sizeof (dur));
+ if (dur < -1) {
+ return (-EINVAL);
+ }
+ *ptr = dur;
+ return (0);
+}
+
+
+int
+nni_socket_setopt(nni_socket *sock, int opt, const void *val, size_t size)
+{
+ size_t rsz;
+ void *ptr;
+ int rv = ENOTSUP;
+
+ nni_mutex_enter(&sock->s_mx);
+ if (sock->s_ops.proto_setopt != NULL) {
+ rv = sock->s_ops.proto_setopt(sock->s_data, opt, val, size);
+ if (rv != NNG_ENOTSUP) {
+ nni_mutex_exit(&sock->s_mx);
+ return (rv);
+ }
+ }
+ switch (opt) {
+ case NNG_OPT_LINGER:
+ rv = nni_setopt_duration(&sock->s_linger, val, size);
+ break;
+ case NNG_OPT_SNDTIMEO:
+ rv = nni_setopt_duration(&sock->s_sndtimeo, val, size);
+ break;
+ case NNG_OPT_RCVTIMEO:
+ rv = nni_setopt_duration(&sock->s_rcvtimeo, val, size);
+ break;
+ case NNG_OPT_RECONN_TIME:
+ rv = nni_setopt_duration(&sock->s_reconn, val, size);
+ break;
+ case NNG_OPT_RECONN_MAXTIME:
+ rv = nni_setopt_duration(&sock->s_reconnmax, val, size);
+ break;
+ }
+ nni_mutex_exit(&sock->s_mx);
+ return (rv);
+}
diff --git a/src/core/socket.h b/src/core/socket.h
index 4fa59355..a31e09e5 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -25,7 +25,11 @@ struct nng_socket {
void * s_data; // Protocol private
// XXX: options
- nni_duration s_linger;
+ nni_duration s_linger; // linger time
+ nni_duration s_sndtimeo; // send timeout
+ nni_duration s_rcvtimeo; // receive timeout
+ nni_duration s_reconn; // reconnect time
+ nni_duration s_reconnmax; // max reconnect time
nni_list s_eps; // active endpoints
nni_list s_pipes; // pipes for this socket
@@ -45,6 +49,7 @@ extern void nni_socket_rem_pipe(nni_socket *, nni_pipe *);
extern uint16_t nni_socket_proto(nni_socket *);
extern int nni_socket_setopt(nni_socket *, int, const void *, size_t);
extern int nni_socket_getopt(nni_socket *, int, void *, size_t *);
-extern int nni_socket_recvmsg(nni_socket *, nni_msg **, int);
+extern int nni_socket_recvmsg(nni_socket *, nni_msg **, nni_time);
+extern int nni_socket_sendmsg(nni_socket *, nni_msg *, nni_time);
#endif // CORE_SOCKET_H
diff --git a/src/nng.c b/src/nng.c
index 6363636d..fc57c037 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -18,7 +18,7 @@
// Pretty much every function calls the nni_platform_init to check against
// fork related activity.
int
-nng_socket_create(nng_socket **s, uint16_t proto)
+nng_open(nng_socket **s, uint16_t proto)
{
int rv;
@@ -30,7 +30,7 @@ nng_socket_create(nng_socket **s, uint16_t proto)
int
-nng_socket_close(nng_socket *s)
+nng_close(nng_socket *s)
{
int rv;
@@ -42,7 +42,7 @@ nng_socket_close(nng_socket *s)
uint16_t
-nng_socket_protocol(nng_socket *s)
+nng_protocol(nng_socket *s)
{
nni_init();
return (nni_socket_proto(s));
@@ -53,21 +53,36 @@ int
nng_recvmsg(nng_socket *s, nng_msg **msgp, int flags)
{
int rv;
- nni_duration expire;
+ nni_time expire;
if ((rv = nni_init()) != 0) {
return (rv);
}
- if (flags == NNG_FLAG_NONBLOCK) {
- expire = 0;
+
+ if ((flags == NNG_FLAG_NONBLOCK) || (s->s_rcvtimeo == 0)) {
+ expire = NNI_TIME_ZERO;
+ } else if (s->s_rcvtimeo < 0) {
+ expire = NNI_TIME_NEVER;
} else {
- // XXX: revise this timeout from socket option!!
- expire = 1000000;
+ expire = nni_clock() + s->s_rcvtimeo;
}
+
return (nni_socket_recvmsg(s, msgp, expire));
}
+int
+nng_setopt(nng_socket *s, int opt, const void *val, size_t sz)
+{
+ int rv;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ return (nni_socket_setopt(s, opt, val, sz));
+}
+
+
// Misc.
const char *
nng_strerror(int num)
@@ -75,7 +90,7 @@ nng_strerror(int num)
nni_init();
switch (num) {
case 0:
- return ("Hunky dory"); /* what did you expect? */
+ return ("Hunky dory"); // What did you expect?
case NNG_EINTR:
return ("Interrupted");
diff --git a/src/nng.h b/src/nng.h
index aa2ecd62..653fa3cf 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -43,36 +43,45 @@ typedef struct nng_notify nng_notify;
typedef struct nng_snapshot nng_snapshot;
typedef struct nng_stat nng_stat;
-// nng_socket simply creates a socket of the given class. It returns an
+// nng_open simply creates a socket of the given class. It returns an
// error code on failure, or zero on success. The socket starts in cooked
// mode.
-NNG_DECL int nng_socket_create(nng_socket **, uint16_t proto);
+NNG_DECL int nng_open(nng_socket **, uint16_t proto);
-// nng_socket_close closes the socket, terminating all activity and
+// nng_close closes the socket, terminating all activity and
// closing any underlying connections and releasing any associated
// resources. Memory associated with the socket is freed, so it is an
// error to reference the socket in any way after this is called. Likewise,
// it is an error to reference any resources such as endpoints or
// pipes associated with the socket.
-NNG_DECL int nng_socket_close(nng_socket *);
+NNG_DECL int nng_close(nng_socket *);
-// nng_socket_protocol returns the protocol number of the socket.
-uint16_t nng_socket_protocol(nng_socket *);
+// nng_protocol returns the protocol number of the socket.
+uint16_t nng_protocol(nng_socket *);
-// nng_socket_setopt sets an option for a specific socket.
-NNG_DECL int nng_socket_setopt(nng_socket *, int, const void *, size_t);
+// nng_setopt sets an option for a specific socket.
+NNG_DECL int nng_setopt(nng_socket *, int, const void *, size_t);
// nng_socket_getopt obtains the option for a socket.
-NNG_DECL int nng_socket_getopt(nng_socket *, int, void *, size_t *);
-
-// nng_notify_register sets a notification callback. The callback will be
-// called for any of the requested events. The callback can be deregistered
-// by calling nng_notify_unregister with the same handle. These notification
-// callbacks are executed on a separate thread, to avoid potential lock
-// recursion.
-NNG_DECL nng_notify *nng_notify_register(nng_socket *, int,
- void (*)(nng_event *, void *), void *);
-NNG_DECL int nng_notify_unregister(nng_socket *, nng_notify *);
+NNG_DECL int nng_getopt(nng_socket *, int, void *, size_t *);
+
+// nng_notify_func is a user function that is executed upon certain
+// events. See below.
+typedef void (*nng_notify_func)(nng_event *, void *);
+
+// nng_setnotify sets a notification callback. The callback will be
+// called for any of the requested events, and will be executed on a
+// separate thread. Event delivery is not guaranteed, and can fail
+// if events occur more quickly than the callback can handle, or
+// if memory or other resources are scarce.
+NNG_DECL nng_notify *nng_setnotify(nng_socket *, int, nng_notify_func, void *);
+
+// nng_unsetnotify unregisters a previously registered notification callback.
+// Once this returns, the associated callback will not be executed any longer.
+// If the callback is running when this called, then it will wait until that
+// callback completes. (The caller of this function should not hold any
+// locks acqured by the callback, in order to avoid a deadlock.)
+NNG_DECL int nng_unsetnotify(nng_socket *, nng_notify *);
// Event types. Sockets can have multiple different kind of events.
// Note that these are edge triggered -- therefore the status indicated
@@ -103,20 +112,20 @@ NNG_DECL nng_endpt *nng_event_endpt(nng_event *);
NNG_DECL nng_pipe *nng_event_pipe(nng_event *);
NNG_DECL const char *nng_event_reason(nng_event *);
-// nng_socket_listen creates a listening endpoint with no special options,
+// nng_listen creates a listening endpoint with no special options,
// and starts it listening. It is functionally equivalent to the legacy
// nn_bind(). The underlying endpoint is returned back to the caller.
-NNG_DECL int nng_socket_listen(nng_endpt **, nng_socket *, const char *);
+NNG_DECL int nng_listen(nng_endpt **, nng_socket *, const char *);
-// nng_socket_dial creates a dialing endpoint, with no special options,
-// and starts it dialing. Dialers have at most one active connection at a
-// time. This is similar to the legacy nn_connect(). The underlying endpoint
+// nng_dial creates a dialing endpoint, with no special options, and
+// starts it dialing. Dialers have at most one active connection at a time
+// This is similar to the legacy nn_connect(). The underlying endpoint
// is returned back to the caller.
-NNG_DECL int nng_socket_dial(nng_endpt **, nng_socket *, const char *);
+NNG_DECL int nng_dial(nng_endpt **, nng_socket *, const char *);
-// nng_socket_endpt creates an endpoint on the socket, but does not
+// nng_endpt_create creates an endpoint on the socket, but does not
// start it either dialing or listening.
-NNG_DECL int nng_socket_endpt(nng_endpt **, nng_socket *, const char *);
+NNG_DECL int nng_endpt_create(nng_endpt **, nng_socket *, const char *);
// nng_endpt_dial starts the endpoint dialing. This is only possible if
// the endpoint is not already dialing or listening.
@@ -350,11 +359,6 @@ NNG_DECL int nng_device(nng_socket *, nng_socket *);
// This limit is built into other implementations, so do not change it.
#define NNG_MAXADDRLEN (128)
-// Default linger time in microseconds. The framework will wait up until
-// this long for outgoing message queues to drain before closing underlying
-// connections, when closing the socket itself.
-#define NNG_LINGER_DEFAULT (1000000)
-
#ifdef __cplusplus
}
#endif