aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/endpt.c2
-rw-r--r--src/core/options.c35
-rw-r--r--src/core/options.h11
-rw-r--r--src/core/socket.c25
-rw-r--r--src/core/socket.h5
-rw-r--r--src/core/transport.h2
6 files changed, 75 insertions, 5 deletions
diff --git a/src/core/endpt.c b/src/core/endpt.c
index c0890cd3..9411c220 100644
--- a/src/core/endpt.c
+++ b/src/core/endpt.c
@@ -131,7 +131,7 @@ nni_ep_create(nni_ep **epp, nni_sock *sock, const char *addr)
return (NNG_ECLOSED);
}
- rv = ep->ep_ops.ep_init(&ep->ep_data, addr, nni_sock_proto(sock));
+ rv = ep->ep_ops.ep_init(&ep->ep_data, addr, sock);
if (rv != 0) {
nni_mtx_unlock(&sock->s_mx);
nni_cv_fini(&ep->ep_cv);
diff --git a/src/core/options.c b/src/core/options.c
index a0b74014..cd67e0d0 100644
--- a/src/core/options.c
+++ b/src/core/options.c
@@ -49,6 +49,27 @@ nni_setopt_int(int *ptr, const void *val, size_t size, int minval, int maxval)
int
+nni_setopt_size(size_t *ptr, const void *val, size_t size, size_t minval,
+ size_t maxval)
+{
+ int v;
+
+ if (size != sizeof (v)) {
+ return (NNG_EINVAL);
+ }
+ memcpy(&v, val, sizeof (v));
+ if (v > maxval) {
+ return (NNG_EINVAL);
+ }
+ if (v < minval) {
+ return (NNG_EINVAL);
+ }
+ *ptr = v;
+ return (0);
+}
+
+
+int
nni_getopt_duration(nni_duration *ptr, void *val, size_t *sizep)
{
size_t sz = sizeof (*ptr);
@@ -77,6 +98,20 @@ nni_getopt_int(int *ptr, void *val, size_t *sizep)
int
+nni_getopt_size(size_t *ptr, void *val, size_t *sizep)
+{
+ size_t sz = sizeof (*ptr);
+
+ if (sz > *sizep) {
+ sz = *sizep;
+ }
+ *sizep = sizeof (*ptr);
+ memcpy(val, ptr, sz);
+ return (0);
+}
+
+
+int
nni_setopt_buf(nni_msgq *mq, const void *val, size_t sz)
{
int len;
diff --git a/src/core/options.h b/src/core/options.h
index 2d843a4c..ec5cce90 100644
--- a/src/core/options.h
+++ b/src/core/options.h
@@ -37,6 +37,17 @@ extern int nni_setopt_int(int *, const void *, size_t, int, int);
// nni_getopt_int gets an integer.
extern int nni_getopt_int(int *, void *, size_t *);
+// nni_setopt_size sets a size_t option.
+extern int nni_setopt_size(size_t *, const void *, size_t, size_t, size_t);
+
+// We limit the maximum size to 4GB. That's intentional because some of the
+// underlying protocols cannot cope with anything bigger than 32-bits.
+#define NNI_MINSZ (0)
+#define NNI_MAXSZ ((size_t) 0xffffffff)
+
+// nni_getopt_size obtains a size_t option.
+extern int nni_getopt_size(size_t *, void *, size_t *);
+
// nni_getopt_fd obtains a notification file descriptor.
extern int nni_getopt_fd(nni_sock *, nni_notifyfd *, int, void *, size_t *);
diff --git a/src/core/socket.c b/src/core/socket.c
index b0ce172f..6ccf3025 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -102,9 +102,6 @@ nni_sock_hold_close(nni_sock **sockp, uint32_t id)
}
-// XXX: don't expose the upper queues to protocols, because we need to
-// trap on activity in those queues!
-
// Because we have to call back into the socket, and possibly also the proto,
// and wait for threads to terminate, we do this in a special thread. The
// assumption is that closing is always a "fast" operation.
@@ -276,6 +273,7 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
sock->s_reconn = NNI_SECOND;
sock->s_reconnmax = 0;
sock->s_reapexit = 0;
+ sock->s_rcvmaxsz = 1024 * 1024; // 1 MB by default
NNI_LIST_INIT(&sock->s_pipes, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_reaps, nni_pipe, p_node);
NNI_LIST_INIT(&sock->s_eps, nni_ep, ep_node);
@@ -684,6 +682,20 @@ nni_sock_peer(nni_sock *sock)
}
+nni_duration
+nni_sock_linger(nni_sock *sock)
+{
+ return (sock->s_linger);
+}
+
+
+size_t
+nni_sock_rcvmaxsz(nni_sock *sock)
+{
+ return (sock->s_rcvmaxsz);
+}
+
+
int
nni_sock_dial(nni_sock *sock, const char *addr, nni_ep **epp, int flags)
{
@@ -775,6 +787,10 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size)
case NNG_OPT_RCVBUF:
rv = nni_setopt_buf(sock->s_urq, val, size);
break;
+ case NNG_OPT_RCVMAXSZ:
+ rv = nni_setopt_size(&sock->s_rcvmaxsz, val, size, 0,
+ NNI_MAXSZ);
+ break;
}
nni_mtx_unlock(&sock->s_mx);
return (rv);
@@ -818,6 +834,9 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
case NNG_OPT_RCVBUF:
rv = nni_getopt_buf(sock->s_urq, val, sizep);
break;
+ case NNG_OPT_RCVMAXSZ:
+ rv = nni_getopt_size(&sock->s_rcvmaxsz, val, sizep);
+ break;
case NNG_OPT_SNDFD:
rv = nni_getopt_fd(sock, &sock->s_send_fd, NNG_EV_CAN_SND,
val, sizep);
diff --git a/src/core/socket.h b/src/core/socket.h
index 42f42371..df11ded6 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -48,6 +48,8 @@ struct nni_socket {
nni_cv s_notify_cv; // wakes notify thread
nni_mtx s_notify_mx; // protects s_notify list
+ size_t s_rcvmaxsz; // maximum receive size
+
nni_list s_reaps; // pipes to reap
nni_thr s_reaper;
nni_thr s_notifier;
@@ -112,4 +114,7 @@ extern nni_msgq *nni_sock_recvq(nni_sock *);
// here so that protocols can use it to initialize condvars.
extern nni_mtx *nni_sock_mtx(nni_sock *);
+extern nni_duration nni_sock_linger(nni_sock *);
+extern size_t nni_sock_rcvmaxsz(nni_sock *);
+
#endif // CORE_SOCKET_H
diff --git a/src/core/transport.h b/src/core/transport.h
index 8544f097..c74ec497 100644
--- a/src/core/transport.h
+++ b/src/core/transport.h
@@ -40,7 +40,7 @@ struct nni_tran {
struct nni_tran_ep {
// ep_init creates a vanilla endpoint. The value created is
// used for the first argument for all other endpoint functions.
- int (*ep_init)(void **, const char *, uint16_t);
+ int (*ep_init)(void **, const char *, nni_sock *);
// ep_fini frees the resources associated with the endpoint.
// The endpoint will already have been closed.