diff options
| author | Garrett D'Amore <garrett@damore.org> | 2016-12-11 19:31:43 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2016-12-11 19:31:43 -0800 |
| commit | 660decae9ba5e2756950267587724c2d25953d00 (patch) | |
| tree | f5d346b972e5e1736c16faf56ece1eb95bcf0c26 /src/core | |
| parent | 877588b7448b2da2c6079c87528404e1f712e3e9 (diff) | |
| download | nng-660decae9ba5e2756950267587724c2d25953d00.tar.gz nng-660decae9ba5e2756950267587724c2d25953d00.tar.bz2 nng-660decae9ba5e2756950267587724c2d25953d00.zip | |
Work in progress on sendmsg.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/socket.c | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/src/core/socket.c b/src/core/socket.c index 247c5640..e9963b74 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -32,9 +32,83 @@ struct nng_socket { int s_proto; nni_mutex_t s_mx; + nni_msgqueue_t s_uwq; /* Upper write queue. */ + nni_msgqueue_t s_urq; /* Upper read queue. */ + /* uwq */ /* urq */ /* options */ /* pipes */ /* endpoints */ + + int s_besteffort; /* Best effort mode delivery. */ + int s_senderr; /* Protocol state machine use. */ }; + +int +nng_socket_create(nng_socket_t *sockp, int proto) +{ + return (NNG_EAGAIN); /* XXX: IMPLEMENT ME */ +} + +int +nng_socket_close(nng_socket_t sock) +{ + nni_msgqueue_close(sock->s_urq); + /* XXX: drain this? */ + nni_msgqueue_close(sock->s_uwq); + + /* XXX: close endpoints - no new pipes made... */ + + /* XXX: protocol shutdown */ + + /* XXX: close remaining pipes */ + + /* XXX: wait for workers to cease activity */ + + return (0); +} + +int +nng_socket_sendmsg(nng_socket_t sock, nng_msg_t msg, int tmout) +{ + int rv; + int besteffort; + + /* + * Senderr is typically set by protocols when the state machine + * indicates that it is no longer valid to send a message. E.g. + * a REP socket with no REQ pending. + */ + nni_mutex_enter(sock->s_mx); + if ((rv = sock->s_senderr) != 0) { + nni_mutex_exit(sock->s_mx); + return (rv); + } + besteffort = sock->s_besteffort; + nni_mutex_exit(sock->s_mx); + +#if 0 + if (s.ops.p_sendhook != NULL) { + if ((rv = s.ops.p_sendhook(sock->s_proto, msg)) != 0) { + nng_msg_free(msg); + return (0); + } + } +#endif + + if (besteffort) { + /* + * BestEffort mode -- if we cannot handle the message due + * to backpressure, we just throw it away, and don't complain. + */ + tmout = 0; + } + rv = nni_msgqueue_put(sock->s_uwq, msg, tmout); + if (besteffort && (rv == NNG_EAGAIN)) { + /* Pretend this worked... it didn't, but pretend. */ + nng_msg_free(msg); + return (0); + } + return (rv); +} |
