aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-08-14 23:25:34 -0700
committerGarrett D'Amore <garrett@damore.org>2017-08-14 23:25:34 -0700
commite7e2a6c14f0317eb77711951c6f1a650d4013dfe (patch)
tree7ed67e4dfbbebce4cc9d88417179c2d58fffb400
parentb47a223bfb2c7114154504ec8d6cdac5abd0b884 (diff)
downloadnng-e7e2a6c14f0317eb77711951c6f1a650d4013dfe.tar.gz
nng-e7e2a6c14f0317eb77711951c6f1a650d4013dfe.tar.bz2
nng-e7e2a6c14f0317eb77711951c6f1a650d4013dfe.zip
Move socket structure to private socket implementation.
We enable a few flags, but now the details of the socket internals are completely private to the socket.
-rw-r--r--src/core/device.c30
-rw-r--r--src/core/options.c9
-rw-r--r--src/core/socket.c81
-rw-r--r--src/core/socket.h55
-rw-r--r--src/nng.c27
5 files changed, 114 insertions, 88 deletions
diff --git a/src/core/device.c b/src/core/device.c
index 81aafc3d..bdfcf0a6 100644
--- a/src/core/device.c
+++ b/src/core/device.c
@@ -28,10 +28,10 @@ nni_device_loop(nni_sock *from, nni_sock *to)
for (;;) {
// Take messages sock[0], and send to sock[1].
// If an error occurs, we close both sockets.
- if ((rv = nni_sock_recvmsg(from, &msg, NNI_TIME_NEVER)) != 0) {
+ if ((rv = nni_sock_recvmsg(from, &msg, 0)) != 0) {
break;
}
- if ((rv = nni_sock_sendmsg(to, msg, NNI_TIME_NEVER)) != 0) {
+ if ((rv = nni_sock_sendmsg(to, msg, 0)) != 0) {
nni_msg_free(msg);
break;
}
@@ -65,6 +65,8 @@ nni_device(nni_sock *sock1, nni_sock *sock2)
{
nni_device_pair pair;
int rv;
+ nni_time never = NNI_TIME_NEVER;
+ size_t sz;
memset(&pair, 0, sizeof(pair));
pair.socks[0] = sock1;
@@ -80,8 +82,19 @@ nni_device(nni_sock *sock1, nni_sock *sock2)
rv = NNG_EINVAL;
goto out;
}
- if ((sock1->s_peer_id.p_id != sock2->s_self_id.p_id) ||
- (sock2->s_peer_id.p_id != sock1->s_self_id.p_id)) {
+ if ((nni_sock_peer(sock1) != nni_sock_proto(sock2)) ||
+ (nni_sock_peer(sock2) != nni_sock_proto(sock1))) {
+ rv = NNG_EINVAL;
+ goto out;
+ }
+
+ // No timeouts.
+ sz = sizeof(never);
+ if ((nni_sock_setopt(sock1, NNG_OPT_RCVTIMEO, &never, sz) != 0) ||
+ (nni_sock_setopt(sock2, NNG_OPT_RCVTIMEO, &never, sz) != 0) ||
+ (nni_sock_setopt(sock1, NNG_OPT_SNDTIMEO, &never, sz) != 0) ||
+ (nni_sock_setopt(sock2, NNG_OPT_SNDTIMEO, &never, sz) != 0)) {
+ // This should never happen.
rv = NNG_EINVAL;
goto out;
}
@@ -96,14 +109,15 @@ nni_device(nni_sock *sock1, nni_sock *sock2)
nni_thr_fini(&pair.thrs[0]);
goto out;
}
- if (((sock1->s_flags & NNI_PROTO_FLAG_RCV) != 0) &&
- ((sock2->s_flags & NNI_PROTO_FLAG_SND) != 0)) {
+ if (((nni_sock_flags(sock1) & NNI_PROTO_FLAG_RCV) != 0) &&
+ ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_SND) != 0)) {
nni_thr_run(&pair.thrs[0]);
}
// If the sockets are the same, then its a simple one way forwarder,
// and we don't need two workers (but would be harmless if we did it).
- if ((sock1 != sock2) && ((sock2->s_flags & NNI_PROTO_FLAG_RCV) != 0) &&
- ((sock1->s_flags & NNI_PROTO_FLAG_SND) != 0)) {
+ if ((sock1 != sock2) &&
+ ((nni_sock_flags(sock2) & NNI_PROTO_FLAG_RCV) != 0) &&
+ ((nni_sock_flags(sock1) & NNI_PROTO_FLAG_SND) != 0)) {
nni_thr_run(&pair.thrs[1]);
}
diff --git a/src/core/options.c b/src/core/options.c
index 403630a3..b243b262 100644
--- a/src/core/options.c
+++ b/src/core/options.c
@@ -155,20 +155,23 @@ nni_notifyfd_push(struct nng_event *ev, void *arg)
int
nni_getopt_fd(nni_sock *s, nni_notifyfd *fd, int mask, void *val, size_t *szp)
{
- int rv;
+ int rv;
+ uint32_t flags;
if ((*szp < sizeof(int))) {
return (NNG_EINVAL);
}
+ flags = nni_sock_flags(s);
+
switch (mask) {
case NNG_EV_CAN_SND:
- if ((s->s_flags & NNI_PROTO_FLAG_SND) == 0) {
+ if ((flags & NNI_PROTO_FLAG_SND) == 0) {
return (NNG_ENOTSUP);
}
break;
case NNG_EV_CAN_RCV:
- if ((s->s_flags & NNI_PROTO_FLAG_RCV) == 0) {
+ if ((flags & NNI_PROTO_FLAG_RCV) == 0) {
return (NNG_ENOTSUP);
}
break;
diff --git a/src/core/socket.c b/src/core/socket.c
index 541c2383..6a243650 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -18,6 +18,51 @@ static nni_list nni_sock_list;
static nni_idhash *nni_sock_hash;
static nni_mtx nni_sock_lk;
+struct nni_socket {
+ nni_list_node s_node;
+ nni_mtx s_mx;
+ nni_cv s_cv;
+ nni_cv s_close_cv;
+
+ uint32_t s_id;
+ uint32_t s_flags;
+ unsigned s_refcnt; // protected by global lock
+ void * s_data; // Protocol private
+
+ nni_msgq *s_uwq; // Upper write queue
+ nni_msgq *s_urq; // Upper read queue
+
+ nni_proto_id s_self_id;
+ nni_proto_id s_peer_id;
+
+ nni_proto_pipe_ops s_pipe_ops;
+ nni_proto_sock_ops s_sock_ops;
+
+ // XXX: options
+ nni_duration s_linger; // linger time
+ nni_duration s_sndtimeo; // send timeout
+ nni_duration s_rcvtimeo; // receive timeout
+ nni_duration s_reconn; // reconnect time
+ nni_duration s_reconnmax; // max reconnect time
+ size_t s_rcvmaxsz; // maximum receive size
+
+ nni_list s_eps; // active endpoints
+ nni_list s_pipes; // active pipes
+
+ int s_ep_pend; // EP dial/listen in progress
+ int s_closing; // Socket is closing
+ int s_closed; // Socket closed, protected by global lock
+ int s_besteffort; // Best effort mode delivery
+ int s_senderr; // Protocol state machine use
+ int s_recverr; // Protocol state machine use
+
+ nni_event s_recv_ev; // Event for readability
+ nni_event s_send_ev; // Event for sendability
+
+ nni_notifyfd s_send_fd;
+ nni_notifyfd s_recv_fd;
+};
+
uint32_t
nni_sock_id(nni_sock *s)
{
@@ -533,10 +578,21 @@ nni_sock_closeall(void)
}
int
-nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire)
+nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags)
{
- int rv;
- int besteffort;
+ int rv;
+ int besteffort;
+ nni_time expire;
+ nni_time timeo = sock->s_sndtimeo;
+
+ if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) {
+ expire = NNI_TIME_ZERO;
+ } else if (timeo == NNI_TIME_NEVER) {
+ expire = NNI_TIME_NEVER;
+ } else {
+ expire = nni_clock();
+ expire += timeo;
+ }
// Senderr is typically set by protocols when the state machine
// indicates that it is no longer valid to send a message. E.g.
@@ -579,10 +635,21 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire)
}
int
-nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
+nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
{
int rv;
nni_msg *msg;
+ nni_time expire;
+ nni_time timeo = sock->s_rcvtimeo;
+
+ if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) {
+ expire = NNI_TIME_ZERO;
+ } else if (timeo == NNI_TIME_NEVER) {
+ expire = NNI_TIME_NEVER;
+ } else {
+ expire = nni_clock();
+ expire += timeo;
+ }
nni_mtx_lock(&sock->s_mx);
if (sock->s_closing) {
@@ -791,3 +858,9 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
nni_mtx_unlock(&sock->s_mx);
return (rv);
}
+
+uint32_t
+nni_sock_flags(nni_sock *sock)
+{
+ return (sock->s_flags);
+}
diff --git a/src/core/socket.h b/src/core/socket.h
index 9b7ac1f9..9fa6c0fa 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -11,54 +11,6 @@
#ifndef CORE_SOCKET_H
#define CORE_SOCKET_H
-// NB: This structure is supplied here for use by the CORE. Use of this library
-// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
-// TRANSPORTS.
-struct nni_socket {
- nni_list_node s_node;
- nni_mtx s_mx;
- nni_cv s_cv;
- nni_cv s_close_cv;
-
- uint32_t s_id;
- uint32_t s_flags;
- unsigned s_refcnt; // protected by global lock
- void * s_data; // Protocol private
-
- nni_msgq *s_uwq; // Upper write queue
- nni_msgq *s_urq; // Upper read queue
-
- nni_proto_id s_self_id;
- nni_proto_id s_peer_id;
-
- nni_proto_pipe_ops s_pipe_ops;
- nni_proto_sock_ops s_sock_ops;
-
- // XXX: options
- nni_duration s_linger; // linger time
- nni_duration s_sndtimeo; // send timeout
- nni_duration s_rcvtimeo; // receive timeout
- nni_duration s_reconn; // reconnect time
- nni_duration s_reconnmax; // max reconnect time
- size_t s_rcvmaxsz; // maximum receive size
-
- nni_list s_eps; // active endpoints
- nni_list s_pipes; // active pipes
-
- int s_ep_pend; // EP dial/listen in progress
- int s_closing; // Socket is closing
- int s_closed; // Socket closed, protected by global lock
- int s_besteffort; // Best effort mode delivery
- int s_senderr; // Protocol state machine use
- int s_recverr; // Protocol state machine use
-
- nni_event s_recv_ev; // Event for readability
- nni_event s_send_ev; // Event for sendability
-
- nni_notifyfd s_send_fd;
- nni_notifyfd s_recv_fd;
-};
-
extern int nni_sock_sys_init(void);
extern void nni_sock_sys_fini(void);
@@ -72,8 +24,8 @@ extern uint16_t nni_sock_proto(nni_sock *);
extern uint16_t nni_sock_peer(nni_sock *);
extern int nni_sock_setopt(nni_sock *, int, const void *, size_t);
extern int nni_sock_getopt(nni_sock *, int, void *, size_t *);
-extern int nni_sock_recvmsg(nni_sock *, nni_msg **, nni_time);
-extern int nni_sock_sendmsg(nni_sock *, nni_msg *, nni_time);
+extern int nni_sock_recvmsg(nni_sock *, nni_msg **, int);
+extern int nni_sock_sendmsg(nni_sock *, nni_msg *, int);
extern uint32_t nni_sock_id(nni_sock *);
extern void nni_sock_lock(nni_sock *);
@@ -115,4 +67,7 @@ extern nni_msgq *nni_sock_recvq(nni_sock *);
extern size_t nni_sock_rcvmaxsz(nni_sock *);
extern void nni_sock_reconntimes(nni_sock *, nni_duration *, nni_duration *);
+// nni_sock_flags returns the socket flags, used to indicate whether read
+// and or write are appropriate for the protocol.
+extern uint32_t nni_sock_flags(nni_sock *);
#endif // CORE_SOCKET_H
diff --git a/src/nng.c b/src/nng.c
index 015da430..822d2713 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -132,28 +132,18 @@ nng_recv(nng_socket sid, void *buf, size_t *szp, int flags)
int
nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags)
{
- nni_time expire;
int rv;
nni_sock *sock;
if ((rv = nni_sock_find(&sock, sid)) != 0) {
return (rv);
}
- if ((flags == NNG_FLAG_NONBLOCK) || (sock->s_rcvtimeo == 0)) {
- expire = NNI_TIME_ZERO;
- } else if (sock->s_rcvtimeo < 0) {
- expire = NNI_TIME_NEVER;
- } else {
- expire = nni_clock();
- expire += sock->s_rcvtimeo;
- }
-
- rv = nni_sock_recvmsg(sock, msgp, expire);
+ rv = nni_sock_recvmsg(sock, msgp, flags);
nni_sock_rele(sock);
// Possibly massage nonblocking attempt. Note that nonblocking is
// still done asynchronously, and the calling thread loses context.
- if ((rv == NNG_ETIMEDOUT) && (expire == NNI_TIME_ZERO)) {
+ if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) {
rv = NNG_EAGAIN;
}
@@ -201,21 +191,12 @@ nng_sendmsg(nng_socket sid, nng_msg *msg, int flags)
if ((rv = nni_sock_find(&sock, sid)) != 0) {
return (rv);
}
- if ((flags == NNG_FLAG_NONBLOCK) || (sock->s_sndtimeo == 0)) {
- expire = NNI_TIME_ZERO;
- } else if (sock->s_sndtimeo < 0) {
- expire = NNI_TIME_NEVER;
- } else {
- expire = nni_clock();
- expire += sock->s_sndtimeo;
- }
-
- rv = nni_sock_sendmsg(sock, msg, expire);
+ rv = nni_sock_sendmsg(sock, msg, flags);
nni_sock_rele(sock);
// Possibly massage nonblocking attempt. Note that nonblocking is
// still done asynchronously, and the calling thread loses context.
- if ((rv == NNG_ETIMEDOUT) && (expire == NNI_TIME_ZERO)) {
+ if ((rv == NNG_ETIMEDOUT) && (flags == NNG_FLAG_NONBLOCK)) {
rv = NNG_EAGAIN;
}