aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c142
1 files changed, 56 insertions, 86 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 8895f7a7..bc1f446d 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -66,12 +66,9 @@ struct nni_socket {
nni_list s_eps; // active endpoints
nni_list s_pipes; // active pipes
- int s_ep_pend; // EP dial/listen in progress
- int s_closing; // Socket is closing
- int s_closed; // Socket closed, protected by global lock
- int s_besteffort; // Best effort mode delivery
- int s_senderr; // Protocol state machine use
- int s_recverr; // Protocol state machine use
+ int s_ep_pend; // EP dial/listen in progress
+ int s_closing; // Socket is closing
+ int s_closed; // Socket closed, protected by global lock
nni_event s_recv_ev; // Event for readability
nni_event s_send_ev; // Event for sendability
@@ -374,15 +371,19 @@ nni_sock_pipe_remove(nni_sock *sock, nni_pipe *pipe)
}
void
-nni_sock_lock(nni_sock *sock)
+nni_sock_send_pending(nni_sock *sock)
{
- nni_mtx_lock(&sock->s_mx);
+ if (sock->s_send_fd.sn_init) {
+ nni_plat_pipe_clear(sock->s_send_fd.sn_rfd);
+ }
}
void
-nni_sock_unlock(nni_sock *sock)
+nni_sock_recv_pending(nni_sock *sock)
{
- nni_mtx_unlock(&sock->s_mx);
+ if (sock->s_recv_fd.sn_init) {
+ nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd);
+ }
}
static void
@@ -551,6 +552,11 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
return (rv);
}
+ if (s->s_sock_ops.sock_filter != NULL) {
+ nni_msgq_set_filter(
+ s->s_urq, s->s_sock_ops.sock_filter, s->s_data);
+ }
+
*sp = s;
return (rv);
}
@@ -779,13 +785,23 @@ nni_sock_closeall(void)
}
}
+void
+nni_sock_send(nni_sock *sock, nni_aio *aio)
+{
+ sock->s_sock_ops.sock_send(sock->s_data, aio);
+}
+
int
nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags)
{
int rv;
- int besteffort;
nni_time expire;
nni_time timeo = sock->s_sndtimeo;
+ nni_aio *aio;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) {
expire = NNI_TIME_ZERO;
@@ -795,47 +811,24 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, int flags)
expire = nni_clock();
expire += timeo;
}
+ nni_aio_set_timeout(aio, expire);
+ nni_aio_set_msg(aio, msg);
- // Senderr is typically set by protocols when the state machine
- // indicates that it is no longer valid to send a message. E.g.
- // a REP socket with no REQ pending.
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_ECLOSED);
- }
- if ((rv = sock->s_senderr) != 0) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
- }
- besteffort = sock->s_besteffort;
-
- if (sock->s_sock_ops.sock_sfilter != NULL) {
- msg = sock->s_sock_ops.sock_sfilter(sock->s_data, msg);
- }
- nni_mtx_unlock(&sock->s_mx);
+ nni_sock_send(sock, aio);
+ nni_aio_wait(aio);
- if (msg == NULL) {
- return (0);
- }
+ rv = nni_aio_result(aio);
+ nni_aio_fini(aio);
- if (besteffort) {
- // BestEffort mode -- if we cannot handle the message due to
- // backpressure, we just throw it away, and don't complain.
- expire = NNI_TIME_ZERO;
- }
- if (sock->s_send_fd.sn_init) {
- nni_plat_pipe_clear(sock->s_send_fd.sn_rfd);
- }
- rv = nni_msgq_put_until(sock->s_uwq, msg, expire);
- if (besteffort && (rv == NNG_ETIMEDOUT)) {
- // Pretend this worked... it didn't, but pretend.
- nni_msg_free(msg);
- return (0);
- }
return (rv);
}
+void
+nni_sock_recv(nni_sock *sock, nni_aio *aio)
+{
+ sock->s_sock_ops.sock_recv(sock->s_data, aio);
+}
+
int
nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
{
@@ -843,6 +836,11 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
nni_msg *msg;
nni_time expire;
nni_time timeo = sock->s_rcvtimeo;
+ nni_aio *aio;
+
+ if ((rv = nni_aio_init(&aio, NULL, NULL)) != 0) {
+ return (rv);
+ }
if ((flags == NNG_FLAG_NONBLOCK) || (timeo == 0)) {
expire = NNI_TIME_ZERO;
@@ -853,39 +851,23 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, int flags)
expire += timeo;
}
- nni_mtx_lock(&sock->s_mx);
- if (sock->s_closing) {
- nni_mtx_unlock(&sock->s_mx);
- return (NNG_ECLOSED);
- }
- if ((rv = sock->s_recverr) != 0) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
- }
- nni_mtx_unlock(&sock->s_mx);
-
- if (sock->s_recv_fd.sn_init) {
- nni_plat_pipe_clear(sock->s_recv_fd.sn_rfd);
- }
-
for (;;) {
- rv = nni_msgq_get_until(sock->s_urq, &msg, expire);
+ nni_aio_set_timeout(aio, expire);
+ nni_sock_recv(sock, aio);
+ nni_aio_wait(aio);
+
+ rv = nni_aio_result(aio);
if (rv != 0) {
- return (rv);
- }
- if (sock->s_sock_ops.sock_rfilter != NULL) {
- nni_mtx_lock(&sock->s_mx);
- msg = sock->s_sock_ops.sock_rfilter(sock->s_data, msg);
- nni_mtx_unlock(&sock->s_mx);
- }
- if (msg != NULL) {
break;
}
- // Protocol dropped the message; try again.
- }
+ msg = nni_aio_get_msg(aio);
+ nni_aio_set_msg(aio, NULL);
- *msgp = msg;
- return (0);
+ *msgp = msg;
+ break;
+ }
+ nni_aio_fini(aio);
+ return (rv);
}
// nni_sock_protocol returns the socket's 16-bit protocol number.
@@ -949,18 +931,6 @@ nni_sock_ep_remove(nni_sock *sock, nni_ep *ep)
nni_mtx_unlock(&sock->s_mx);
}
-void
-nni_sock_recverr(nni_sock *sock, int err)
-{
- sock->s_recverr = err;
-}
-
-void
-nni_sock_senderr(nni_sock *sock, int err)
-{
- sock->s_senderr = err;
-}
-
int
nni_sock_setopt(nni_sock *s, const char *name, const void *val, size_t size)
{