aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2016-12-11 19:31:43 -0800
committerGarrett D'Amore <garrett@damore.org>2016-12-11 19:31:43 -0800
commit660decae9ba5e2756950267587724c2d25953d00 (patch)
treef5d346b972e5e1736c16faf56ece1eb95bcf0c26 /src/core
parent877588b7448b2da2c6079c87528404e1f712e3e9 (diff)
downloadnng-660decae9ba5e2756950267587724c2d25953d00.tar.gz
nng-660decae9ba5e2756950267587724c2d25953d00.tar.bz2
nng-660decae9ba5e2756950267587724c2d25953d00.zip
Work in progress on sendmsg.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/socket.c74
1 files changed, 74 insertions, 0 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 247c5640..e9963b74 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -32,9 +32,83 @@ struct nng_socket {
int s_proto;
nni_mutex_t s_mx;
+ nni_msgqueue_t s_uwq; /* Upper write queue. */
+ nni_msgqueue_t s_urq; /* Upper read queue. */
+
/* uwq */
/* urq */
/* options */
/* pipes */
/* endpoints */
+
+ int s_besteffort; /* Best effort mode delivery. */
+ int s_senderr; /* Protocol state machine use. */
};
+
+int
+nng_socket_create(nng_socket_t *sockp, int proto)
+{
+ return (NNG_EAGAIN); /* XXX: IMPLEMENT ME */
+}
+
+int
+nng_socket_close(nng_socket_t sock)
+{
+ nni_msgqueue_close(sock->s_urq);
+ /* XXX: drain this? */
+ nni_msgqueue_close(sock->s_uwq);
+
+ /* XXX: close endpoints - no new pipes made... */
+
+ /* XXX: protocol shutdown */
+
+ /* XXX: close remaining pipes */
+
+ /* XXX: wait for workers to cease activity */
+
+ return (0);
+}
+
+int
+nng_socket_sendmsg(nng_socket_t sock, nng_msg_t msg, int tmout)
+{
+ int rv;
+ int besteffort;
+
+ /*
+ * Senderr is typically set by protocols when the state machine
+ * indicates that it is no longer valid to send a message. E.g.
+ * a REP socket with no REQ pending.
+ */
+ nni_mutex_enter(sock->s_mx);
+ if ((rv = sock->s_senderr) != 0) {
+ nni_mutex_exit(sock->s_mx);
+ return (rv);
+ }
+ besteffort = sock->s_besteffort;
+ nni_mutex_exit(sock->s_mx);
+
+#if 0
+ if (s.ops.p_sendhook != NULL) {
+ if ((rv = s.ops.p_sendhook(sock->s_proto, msg)) != 0) {
+ nng_msg_free(msg);
+ return (0);
+ }
+ }
+#endif
+
+ if (besteffort) {
+ /*
+ * BestEffort mode -- if we cannot handle the message due
+ * to backpressure, we just throw it away, and don't complain.
+ */
+ tmout = 0;
+ }
+ rv = nni_msgqueue_put(sock->s_uwq, msg, tmout);
+ if (besteffort && (rv == NNG_EAGAIN)) {
+ /* Pretend this worked... it didn't, but pretend. */
+ nng_msg_free(msg);
+ return (0);
+ }
+ return (rv);
+}