aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c115
1 files changed, 85 insertions, 30 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index 0272f454..c500db31 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -32,7 +32,7 @@ int
nni_socket_create(nni_socket **sockp, uint16_t proto)
{
nni_socket *sock;
- struct nni_protocol *ops;
+ nni_protocol *ops;
int rv;
if ((ops = nni_protocol_find(proto)) == NULL) {
@@ -43,12 +43,12 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
}
sock->s_ops = *ops;
- if ((rv = nni_mutex_create(&sock->s_mx)) != 0) {
+ if ((rv = nni_mutex_init(&sock->s_mx)) != 0) {
nni_free(sock, sizeof (*sock));
return (rv);
}
- if ((rv = nni_cond_create(&sock->s_cv, sock->s_mx)) != 0) {
- nni_mutex_destroy(sock->s_mx);
+ if ((rv = nni_cond_init(&sock->s_cv, &sock->s_mx)) != 0) {
+ nni_mutex_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
@@ -57,8 +57,8 @@ nni_socket_create(nni_socket **sockp, uint16_t proto)
// TODO: NNI_LIST_INIT(&sock->s_eps, nni_endpt_t, ep_node);
if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) {
- nni_cond_destroy(sock->s_cv);
- nni_mutex_destroy(sock->s_mx);
+ nni_cond_fini(&sock->s_cv);
+ nni_mutex_fini(&sock->s_mx);
nni_free(sock, sizeof (*sock));
return (rv);
}
@@ -73,9 +73,10 @@ nni_socket_close(nni_socket *sock)
{
nni_pipe *pipe;
nni_endpt *ep;
+ uint64_t linger;
- nni_mutex_enter(sock->s_mx);
+ nni_mutex_enter(&sock->s_mx);
// Mark us closing, so no more EPs or changes can occur.
sock->s_closing = 1;
@@ -90,9 +91,8 @@ nni_socket_close(nni_socket *sock)
or nni_ep_shutdown(ep);
#endif
- break; /* REMOVE ME */
}
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
// XXX: TODO. This is a place where we should drain the write side
// msgqueue, effectively getting a linger on the socket. The
@@ -110,11 +110,10 @@ nni_socket_close(nni_socket *sock)
// Now we should attempt to wait for the list of pipes to drop to
// zero -- indicating that the protocol has shut things down
// cleanly, voluntarily. (I.e. it finished its drain.)
- nni_mutex_enter(sock->s_mx);
+ linger = nni_clock();
+ nni_mutex_enter(&sock->s_mx);
while (nni_list_first(&sock->s_pipes) != NULL) {
- // rv = nn_cond_timedwait(sock->s_cv, sock->s_linger);
- int rv = NNG_ETIMEDOUT;
- if (rv == NNG_ETIMEDOUT) {
+ if (nni_cond_waituntil(&sock->s_cv, linger) == NNG_ETIMEDOUT) {
break;
}
}
@@ -130,35 +129,50 @@ nni_socket_close(nni_socket *sock)
// quickly too! If this blocks for any non-trivial amount of time
// here, it indicates a protocol implementation bug.
while (nni_list_first(&sock->s_pipes) != NULL) {
- nni_cond_wait(sock->s_cv);
+ nni_cond_wait(&sock->s_cv);
}
// Wait to make sure endpoint listeners have shutdown and exited
// as well. They should have done so *long* ago.
while (nni_list_first(&sock->s_eps) != NULL) {
- nni_cond_wait(sock->s_cv);
+ nni_cond_wait(&sock->s_cv);
}
+ nni_mutex_exit(&sock->s_mx);
+
return (0);
}
int
-nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout)
+nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, nni_duration tmout)
{
int rv;
int besteffort;
+ nni_time expire;
+
+ if (tmout > 0) {
+ expire = nni_clock() + tmout;
+ } else if (tmout < 0) {
+ expire = NNI_TIME_NEVER;
+ } else {
+ expire = NNI_TIME_ZERO;
+ }
// Senderr is typically set by protocols when the state machine
// indicates that it is no longer valid to send a message. E.g.
// a REP socket with no REQ pending.
- nni_mutex_enter(sock->s_mx);
+ nni_mutex_enter(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mutex_exit(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
if ((rv = sock->s_senderr) != 0) {
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
return (rv);
}
besteffort = sock->s_besteffort;
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
if (sock->s_ops.proto_send_filter != NULL) {
msg = sock->s_ops.proto_send_filter(sock->s_data, msg);
@@ -170,9 +184,9 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout)
if (besteffort) {
// BestEffort mode -- if we cannot handle the message due to
// backpressure, we just throw it away, and don't complain.
- tmout = 0;
+ expire = NNI_TIME_ZERO;
}
- rv = nni_msgqueue_put(sock->s_uwq, msg, tmout);
+ rv = nni_msgqueue_put_until(sock->s_uwq, msg, expire);
if (besteffort && (rv == NNG_EAGAIN)) {
// Pretend this worked... it didn't, but pretend.
nni_msg_free(msg);
@@ -181,6 +195,47 @@ nni_socket_sendmsg(nni_socket *sock, nni_msg *msg, int tmout)
return (rv);
}
+int
+nni_socket_recvmsg(nni_socket *sock, nni_msg **msgp, nni_duration tmout)
+{
+ int rv;
+ nni_time expire;
+ nni_msg *msg;
+
+ if (tmout > 0) {
+ expire = nni_clock() + tmout;
+ } else if (tmout < 0) {
+ expire = NNI_TIME_NEVER;
+ } else {
+ expire = NNI_TIME_ZERO;
+ }
+
+ nni_mutex_enter(&sock->s_mx);
+ if (sock->s_closing) {
+ nni_mutex_exit(&sock->s_mx);
+ return (NNG_ECLOSED);
+ }
+ if ((rv = sock->s_recverr) != 0) {
+ nni_mutex_exit(&sock->s_mx);
+ return (rv);
+ }
+ nni_mutex_exit(&sock->s_mx);
+
+ for (;;) {
+ rv = nni_msgqueue_get_until(sock->s_urq, &msg, expire);
+ if (rv != 0) {
+ return (rv);
+ }
+ msg = sock->s_ops.proto_recv_filter(sock->s_data, msg);
+ if (msg != NULL) {
+ break;
+ }
+ // Protocol dropped the message; try again.
+ }
+
+ *msgp = msg;
+ return (0);
+}
// nni_socket_protocol returns the socket's 16-bit protocol number.
uint16_t
@@ -195,9 +250,9 @@ nni_socket_proto(nni_socket *sock)
void
nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
{
- nni_mutex_enter(sock->s_mx);
+ nni_mutex_enter(&sock->s_mx);
if (pipe->p_sock != sock) {
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
}
// Remove the pipe from the protocol. Protocols may
@@ -215,9 +270,9 @@ nni_socket_rem_pipe(nni_socket *sock, nni_pipe *pipe)
// If we're closing, wake the socket if we finished draining.
if (sock->s_closing && (nni_list_first(&sock->s_pipes) == NULL)) {
- nni_cond_broadcast(sock->s_cv);
+ nni_cond_broadcast(&sock->s_cv);
}
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
}
@@ -226,18 +281,18 @@ nni_socket_add_pipe(nni_socket *sock, nni_pipe *pipe)
{
int rv;
- nni_mutex_enter(sock->s_mx);
+ nni_mutex_enter(&sock->s_mx);
if (sock->s_closing) {
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
return (NNG_ECLOSED);
}
if ((rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe)) != 0) {
- nni_mutex_exit(sock->s_mx);
+ nni_mutex_exit(&sock->s_mx);
return (rv);
}
nni_list_append(&sock->s_pipes, pipe);
pipe->p_sock = sock;
- /* XXX: Publish event */
- nni_mutex_exit(sock->s_mx);
+ // XXX: Publish event
+ nni_mutex_exit(&sock->s_mx);
return (0);
}