aboutsummaryrefslogtreecommitdiff
path: root/src/core/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/socket.c')
-rw-r--r--src/core/socket.c157
1 files changed, 84 insertions, 73 deletions
diff --git a/src/core/socket.c b/src/core/socket.c
index bc1f446d..dfa49317 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -70,23 +70,102 @@ struct nni_socket {
int s_closing; // Socket is closing
int s_closed; // Socket closed, protected by global lock
- nni_event s_recv_ev; // Event for readability
- nni_event s_send_ev; // Event for sendability
-
nni_notifyfd s_send_fd;
nni_notifyfd s_recv_fd;
};
+static void
+nni_sock_can_send_cb(void *arg, int flags)
+{
+ nni_notifyfd *fd = arg;
+
+ if ((flags & nni_msgq_f_can_put) == 0) {
+ nni_plat_pipe_clear(fd->sn_rfd);
+ } else {
+ nni_plat_pipe_raise(fd->sn_wfd);
+ }
+}
+
+static void
+nni_sock_can_recv_cb(void *arg, int flags)
+{
+ nni_notifyfd *fd = arg;
+
+ if ((flags & nni_msgq_f_can_get) == 0) {
+ nni_plat_pipe_clear(fd->sn_rfd);
+ } else {
+ nni_plat_pipe_raise(fd->sn_wfd);
+ }
+}
+
+static int
+nni_sock_getopt_fd(nni_sock *s, int flag, void *val, size_t *szp)
+{
+ int rv;
+ uint32_t flags;
+ nni_notifyfd *fd;
+ nni_msgq * mq;
+ nni_msgq_cb cb;
+
+ if ((*szp < sizeof(int))) {
+ return (NNG_EINVAL);
+ }
+
+ if ((flag & nni_sock_flags(s)) == 0) {
+ return (NNG_ENOTSUP);
+ }
+
+ switch (flag) {
+ case NNI_PROTO_FLAG_SND:
+ fd = &s->s_send_fd;
+ mq = s->s_uwq;
+ cb = nni_sock_can_send_cb;
+ break;
+ case NNI_PROTO_FLAG_RCV:
+ fd = &s->s_recv_fd;
+ mq = s->s_urq;
+ cb = nni_sock_can_recv_cb;
+ break;
+ default:
+ nni_panic("default case!");
+ }
+
+ // If we already inited this, just give back the same file descriptor.
+ if (fd->sn_init) {
+ memcpy(val, &fd->sn_rfd, sizeof(int));
+ *szp = sizeof(int);
+ return (0);
+ }
+
+ if ((rv = nni_plat_pipe_open(&fd->sn_wfd, &fd->sn_rfd)) != 0) {
+ return (rv);
+ }
+
+ nni_msgq_set_cb(mq, cb, fd);
+
+#if 0
+ if (nni_sock_notify(s, mask, nni_notifyfd_push, fd) == NULL) {
+ nni_plat_pipe_close(fd->sn_wfd, fd->sn_rfd);
+ return (NNG_ENOMEM);
+ }
+#endif
+
+ fd->sn_init = 1;
+ *szp = sizeof(int);
+ memcpy(val, &fd->sn_rfd, sizeof(int));
+ return (0);
+}
+
static int
nni_sock_getopt_sendfd(nni_sock *s, void *buf, size_t *szp)
{
- return (nni_getopt_fd(s, &s->s_send_fd, NNG_EV_CAN_SND, buf, szp));
+ return (nni_sock_getopt_fd(s, NNI_PROTO_FLAG_SND, buf, szp));
}
static int
nni_sock_getopt_recvfd(nni_sock *s, void *buf, size_t *szp)
{
- return (nni_getopt_fd(s, &s->s_recv_fd, NNG_EV_CAN_RCV, buf, szp));
+ return (nni_sock_getopt_fd(s, NNI_PROTO_FLAG_RCV, buf, szp));
}
static int
@@ -387,70 +466,6 @@ nni_sock_recv_pending(nni_sock *sock)
}
static void
-nni_sock_cansend_cb(void *arg)
-{
- nni_notify *notify = arg;
- nni_sock * sock = notify->n_sock;
-
- if (nni_aio_result(notify->n_aio) != 0) {
- return;
- }
-
- notify->n_func(&sock->s_send_ev, notify->n_arg);
-}
-
-static void
-nni_sock_canrecv_cb(void *arg)
-{
- nni_notify *notify = arg;
- nni_sock * sock = notify->n_sock;
-
- if (nni_aio_result(notify->n_aio) != 0) {
- return;
- }
-
- notify->n_func(&sock->s_recv_ev, notify->n_arg);
-}
-
-nni_notify *
-nni_sock_notify(nni_sock *sock, int type, nng_notify_func fn, void *arg)
-{
- nni_notify *notify;
-
- if ((notify = NNI_ALLOC_STRUCT(notify)) == NULL) {
- return (NULL);
- }
-
- notify->n_func = fn;
- notify->n_arg = arg;
- notify->n_type = type;
- notify->n_sock = sock;
-
- switch (type) {
- case NNG_EV_CAN_RCV:
- nni_aio_init(&notify->n_aio, nni_sock_canrecv_cb, notify);
- nni_msgq_aio_notify_get(sock->s_urq, notify->n_aio);
- break;
- case NNG_EV_CAN_SND:
- nni_aio_init(&notify->n_aio, nni_sock_cansend_cb, notify);
- nni_msgq_aio_notify_put(sock->s_uwq, notify->n_aio);
- break;
- default:
- NNI_FREE_STRUCT(notify);
- return (NULL);
- }
-
- return (notify);
-}
-
-void
-nni_sock_unnotify(nni_sock *sock, nni_notify *notify)
-{
- nni_aio_fini(notify->n_aio);
- NNI_FREE_STRUCT(notify);
-}
-
-static void
nni_sock_destroy(nni_sock *s)
{
nni_sockopt *sopt;
@@ -481,8 +496,6 @@ nni_sock_destroy(nni_sock *s)
nni_mtx_lock(&s->s_mx);
nni_mtx_unlock(&s->s_mx);
- nni_ev_fini(&s->s_send_ev);
- nni_ev_fini(&s->s_recv_ev);
nni_msgq_fini(s->s_urq);
nni_msgq_fini(s->s_uwq);
nni_cv_fini(&s->s_close_cv);
@@ -530,8 +543,6 @@ nni_sock_create(nni_sock **sp, const nni_proto *proto)
nni_mtx_init(&s->s_mx);
nni_cv_init(&s->s_cv, &s->s_mx);
nni_cv_init(&s->s_close_cv, &nni_sock_lk);
- nni_ev_init(&s->s_recv_ev, NNG_EV_CAN_RCV, s);
- nni_ev_init(&s->s_send_ev, NNG_EV_CAN_SND, s);
if (((rv = nni_msgq_init(&s->s_uwq, 0)) != 0) ||
((rv = nni_msgq_init(&s->s_urq, 0)) != 0) ||