aboutsummaryrefslogtreecommitdiff
path: root/src/core
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2017-01-02 14:29:47 -0800
committerGarrett D'Amore <garrett@damore.org>2017-01-02 14:29:47 -0800
commitb6374f9d9b07c929522066f27ed9a7a05c6bb23b (patch)
tree9195694f13261ba5cd4d8f2446743f815a06619f /src/core
parentf729db021a4fd7c782cc08a07185c955f3567ea2 (diff)
downloadnng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.tar.gz
nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.tar.bz2
nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.zip
Protocol initialization restructuring.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/defs.h3
-rw-r--r--src/core/pipe.c37
-rw-r--r--src/core/pipe.h4
-rw-r--r--src/core/protocol.c34
-rw-r--r--src/core/protocol.h77
-rw-r--r--src/core/socket.c95
-rw-r--r--src/core/socket.h2
7 files changed, 163 insertions, 89 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index b10701e0..dd225567 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -28,7 +28,8 @@ typedef struct nni_tran nni_tran;
typedef struct nni_tran_ep nni_tran_ep;
typedef struct nni_tran_pipe nni_tran_pipe;
-typedef struct nni_protocol nni_protocol;
+typedef struct nni_proto_pipe nni_proto_pipe;
+typedef struct nni_proto nni_proto;
typedef int nni_signal; // Turnstile/wakeup channel.
typedef uint64_t nni_time; // Absolute time (usec).
diff --git a/src/core/pipe.c b/src/core/pipe.c
index e8864d58..768b7240 100644
--- a/src/core/pipe.c
+++ b/src/core/pipe.c
@@ -76,8 +76,8 @@ nni_pipe_destroy(nni_pipe *p)
if (p->p_tran_data != NULL) {
p->p_tran_ops.pipe_destroy(p->p_tran_data);
}
- if (p->p_pdata != NULL) {
- nni_free(p->p_pdata, p->p_psize);
+ if (p->p_proto_data != NULL) {
+ p->p_proto_ops.pipe_fini(p->p_proto_data);
}
NNI_FREE_STRUCT(p);
}
@@ -88,7 +88,8 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
{
nni_pipe *p;
nni_sock *sock = ep->ep_sock;
- nni_protocol *proto = &sock->s_ops;
+ nni_proto *proto = &sock->s_proto;
+ const nni_proto_pipe *pops = proto->proto_pipe;
int rv;
if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
@@ -96,32 +97,34 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep)
}
p->p_sock = sock;
p->p_tran_data = NULL;
+ p->p_proto_data = NULL;
p->p_active = 0;
- p->p_psize = proto->proto_pipe_size;
NNI_LIST_NODE_INIT(&p->p_node);
// Make a copy of the transport ops. We can override entry points
// and we avoid an extra dereference on hot code paths.
p->p_tran_ops = *ep->ep_tran->tran_pipe;
- if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) {
+ // Make a copy of the protocol ops. Same rationale.
+ p->p_proto_ops = *pops;
+
+ if ((rv = pops->pipe_init(&p->p_proto_data, p, sock->s_data)) != 0) {
NNI_FREE_STRUCT(p);
- return (NNG_ENOMEM);
+ return (rv);
}
- rv = nni_thr_init(&p->p_recv_thr, proto->proto_pipe_recv, p->p_pdata);
+ rv = nni_thr_init(&p->p_recv_thr, pops->pipe_recv, p->p_proto_data);
if (rv != 0) {
- nni_free(p->p_pdata, p->p_psize);
+ pops->pipe_fini(&p->p_proto_data);
NNI_FREE_STRUCT(p);
return (rv);
}
- rv = nni_thr_init(&p->p_send_thr, proto->proto_pipe_send, p->p_pdata);
+ rv = nni_thr_init(&p->p_send_thr, pops->pipe_send, p->p_proto_data);
if (rv != 0) {
nni_thr_fini(&p->p_recv_thr);
- nni_free(p->p_pdata, p->p_psize);
+ pops->pipe_fini(&p->p_proto_data);
NNI_FREE_STRUCT(p);
return (rv);
}
- p->p_psize = sock->s_ops.proto_pipe_size;
nni_mtx_lock(&sock->s_mx);
nni_list_append(&sock->s_pipes, p);
nni_mtx_unlock(&sock->s_mx);
@@ -171,9 +174,11 @@ nni_pipe_start(nni_pipe *pipe)
}
} while (collide);
- rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata);
+ rv = pipe->p_proto_ops.pipe_add(pipe->p_proto_data);
if (rv != 0) {
- goto fail;
+ nni_mtx_unlock(&sock->s_mx);
+ nni_pipe_close(pipe);
+ return (rv);
}
nni_thr_run(&pipe->p_send_thr);
nni_thr_run(&pipe->p_recv_thr);
@@ -183,10 +188,4 @@ nni_pipe_start(nni_pipe *pipe)
nni_mtx_unlock(&sock->s_mx);
return (0);
-
-fail:
- pipe->p_reap = 1;
- nni_mtx_unlock(&sock->s_mx);
- nni_pipe_close(pipe);
- return (rv);
}
diff --git a/src/core/pipe.h b/src/core/pipe.h
index f037bb3b..d2819a31 100644
--- a/src/core/pipe.h
+++ b/src/core/pipe.h
@@ -21,8 +21,8 @@ struct nng_pipe {
uint32_t p_id;
nni_tran_pipe p_tran_ops;
void * p_tran_data;
- void * p_pdata; // protocol specific data
- size_t p_psize; // size of protocol data
+ nni_proto_pipe p_proto_ops;
+ void * p_proto_data;
nni_list_node p_node;
nni_sock * p_sock;
nni_ep * p_ep;
diff --git a/src/core/protocol.c b/src/core/protocol.c
index 6399cd41..8669275a 100644
--- a/src/core/protocol.c
+++ b/src/core/protocol.c
@@ -16,18 +16,18 @@
// The list of protocols is hardwired. This is reasonably unlikely to
// change, as adding new protocols is not something intended to be done
// outside of the core.
-extern nni_protocol nni_pair_protocol;
+extern nni_proto nni_pair_proto;
-static nni_protocol *protocols[] = {
- &nni_pair_protocol,
+static nni_proto *protocols[] = {
+ &nni_pair_proto,
NULL
};
-nni_protocol *
-nni_protocol_find(uint16_t num)
+nni_proto *
+nni_proto_find(uint16_t num)
{
int i;
- nni_protocol *p;
+ nni_proto *p;
for (i = 0; (p = protocols[i]) != NULL; i++) {
if (p->proto_self == num) {
@@ -39,11 +39,11 @@ nni_protocol_find(uint16_t num)
const char *
-nni_protocol_name(uint16_t num)
+nni_proto_name(uint16_t num)
{
- nni_protocol *p;
+ nni_proto *p;
- if ((p = nni_protocol_find(num)) == NULL) {
+ if ((p = nni_proto_find(num)) == NULL) {
return (NULL);
}
return (p->proto_name);
@@ -51,9 +51,9 @@ nni_protocol_name(uint16_t num)
uint16_t
-nni_protocol_number(const char *name)
+nni_proto_number(const char *name)
{
- nni_protocol *p;
+ nni_proto *p;
int i;
for (i = 0; (p = protocols[i]) != NULL; i++) {
@@ -63,3 +63,15 @@ nni_protocol_number(const char *name)
}
return (NNG_PROTO_NONE);
}
+
+uint16_t
+nni_proto_peer(uint16_t num)
+{
+ nni_proto *p;
+
+ if ((p = nni_proto_find(num)) == NULL) {
+ return (NNG_PROTO_NONE);
+ }
+ return (p->proto_peer);
+}
+
diff --git a/src/core/protocol.h b/src/core/protocol.h
index 301782b4..c858ad39 100644
--- a/src/core/protocol.h
+++ b/src/core/protocol.h
@@ -19,48 +19,75 @@
// work, and the pipe functions generally assume no locking is needed.
// As a consequence, most of the concurrency in nng exists in the protocol
// implementations.
-struct nni_protocol {
- uint16_t proto_self; // our 16-bit protocol ID
- uint16_t proto_peer; // who we peer with (ID)
- const char * proto_name; // string version of our name
- size_t proto_pipe_size; // pipe private data size
- //Create protocol instance, which will be stored on the socket.
- int (*proto_create)(void **, nni_sock *);
+// nni_proto_pipe contains protocol-specific per-pipe operations.
+struct nni_proto_pipe {
+ // pipe_init creates the protocol-specific per pipe data structure.
+ // The last argument is the per-socket protocol private data.
+ int (*pipe_init)(void **, nni_pipe *, void *);
- // Destroy the protocol instance.
- void (*proto_destroy)(void *);
+ // pipe_fini releases any pipe data structures. This is called after
+ // the pipe has been removed from the protocol, and the generic
+ // pipe threads have been stopped.
+ void (*pipe_fini)(void *);
+
+ // pipe_add is called to register a pipe with the protocol. The
+ // protocol can reject this, for example if another pipe is already
+ // active on a 1:1 protocol. The protocol may not block during this,
+ // as the socket lock is held.
+ int (*pipe_add)(void *);
+
+ // pipe_rem is called to unregister a pipe from the protocol.
+ // Threads may still acccess data structures, so the protocol
+ // should not free anything yet. This is called with the socket
+ // lock held, so the protocol may not call back into the socket, and
+ // must not block.
+ void (*pipe_rem)(void *);
+
+ // pipe_send is a function run in a thread per pipe, to process
+ // send activity. This can be NULL.
+ void (*pipe_send)(void *);
- // Add and remove pipes. These are called as connections are
- // created or destroyed.
- int (*proto_add_pipe)(void *, nni_pipe *, void *);
- void (*proto_rem_pipe)(void *, void *);
+ // pipe_recv is a function run in a thread per pipe, to process
+ // receive activity. While this can be NULL, it should NOT be, as
+ // otherwise the protocol may not be able to discover the closure of
+ // the underlying transport (such as a remote disconnect).
+ void (*pipe_recv)(void *);
+};
+
+struct nni_proto {
+ uint16_t proto_self; // our 16-bit protocol ID
+ uint16_t proto_peer; // who we peer with (ID)
+ const char * proto_name; // string version of our name
+ const nni_proto_pipe * proto_pipe; // Per-pipe operations.
- // Thread functions for processing send & receive sides of
- // protocol pipes. Send may be NULL, but recv should should not.
- // (Recv needs to detect a closed pipe, if nothing else.)
- void (*proto_pipe_send)(void *);
- void (*proto_pipe_recv)(void *);
+ // Create protocol instance, which will be stored on the socket.
+ int (*proto_init)(void **, nni_sock *);
+
+ // Destroy the protocol instance.
+ void (*proto_fini)(void *);
// Option manipulation. These may be NULL.
- int (*proto_setopt)(void *, int, const void *, size_t);
- int (*proto_getopt)(void *, int, void *, size_t *);
+ int (*proto_setopt)(void *, int, const void *,
+ size_t);
+ int (*proto_getopt)(void *, int, void *, size_t *);
// Receive filter. This may be NULL, but if it isn't, then
// messages coming into the system are routed here just before being
// delivered to the application. To drop the message, the prtocol
// should return NULL, otherwise the message (possibly modified).
- nni_msg * (*proto_recv_filter)(void *, nni_msg *);
+ nni_msg * (*proto_recv_filter)(void *, nni_msg *);
// Send filter. This may be NULL, but if it isn't, then messages
// here are filtered just after they come from the application.
- nni_msg * (*proto_send_filter)(void *, nni_msg *);
+ nni_msg * (*proto_send_filter)(void *, nni_msg *);
};
// These functions are not used by protocols, but rather by the socket
// core implementation. The lookups can be used by transports as well.
-extern nni_protocol *nni_protocol_find(uint16_t);
-extern const char *nni_protocol_name(uint16_t);
-extern uint16_t nni_protocol_number(const char *);
+extern nni_proto *nni_proto_find(uint16_t);
+extern const char *nni_proto_name(uint16_t);
+extern uint16_t nni_proto_number(const char *);
+extern uint16_t nni_proto_peer(uint16_t);
#endif // CORE_PROTOCOL_H
diff --git a/src/core/socket.c b/src/core/socket.c
index 6aa40bfc..cad4b10a 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -50,16 +50,15 @@ nni_reaper(void *arg)
ep->ep_pipe = NULL;
nni_cv_wake(&ep->ep_cv);
}
- nni_mtx_unlock(&sock->s_mx);
// Remove the pipe from the protocol. Protocols may
// keep lists of pipes for managing their topologies.
// Note that if a protocol has rejected the pipe, it
// won't have any data.
if (pipe->p_active) {
- sock->s_ops.proto_rem_pipe(sock->s_data,
- pipe->p_pdata);
+ pipe->p_proto_ops.pipe_rem(pipe->p_proto_data);
}
+ nni_mtx_unlock(&sock->s_mx);
// XXX: also publish event...
nni_pipe_destroy(pipe);
@@ -79,21 +78,53 @@ nni_reaper(void *arg)
}
+static nni_msg *
+nni_sock_nullfilter(void *arg, nni_msg *mp)
+{
+ NNI_ARG_UNUSED(arg);
+ return (mp);
+}
+
+
+static int
+nni_sock_nullgetopt(void *arg, int num, void *data, size_t *szp)
+{
+ NNI_ARG_UNUSED(arg);
+ NNI_ARG_UNUSED(num);
+ NNI_ARG_UNUSED(data);
+ NNI_ARG_UNUSED(szp);
+ return (NNG_ENOTSUP);
+}
+
+
+static int
+nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz)
+{
+ NNI_ARG_UNUSED(arg);
+ NNI_ARG_UNUSED(num);
+ NNI_ARG_UNUSED(data);
+ NNI_ARG_UNUSED(sz);
+ return (NNG_ENOTSUP);
+}
+
+
// nn_sock_open creates the underlying socket.
int
-nni_sock_open(nni_sock **sockp, uint16_t proto)
+nni_sock_open(nni_sock **sockp, uint16_t pnum)
{
nni_sock *sock;
- nni_protocol *ops;
+ nni_proto *proto;
int rv;
- if ((ops = nni_protocol_find(proto)) == NULL) {
+ if ((proto = nni_proto_find(pnum)) == NULL) {
return (NNG_ENOTSUP);
}
if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) {
return (NNG_ENOMEM);
}
- sock->s_ops = *ops;
+
+ // We make a copy of the protocol operations.
+ sock->s_proto = *proto;
sock->s_linger = 0;
sock->s_sndtimeo = -1;
sock->s_rcvtimeo = -1;
@@ -137,7 +168,7 @@ nni_sock_open(nni_sock **sockp, uint16_t proto)
return (rv);
}
- if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) {
+ if ((rv = sock->s_proto.proto_init(&sock->s_data, sock)) != 0) {
nni_msgq_fini(sock->s_urq);
nni_msgq_fini(sock->s_uwq);
nni_thr_fini(&sock->s_reaper);
@@ -146,6 +177,18 @@ nni_sock_open(nni_sock **sockp, uint16_t proto)
NNI_FREE_STRUCT(sock);
return (rv);
}
+ if (sock->s_proto.proto_send_filter == NULL) {
+ sock->s_proto.proto_send_filter = nni_sock_nullfilter;
+ }
+ if (sock->s_proto.proto_recv_filter == NULL) {
+ sock->s_proto.proto_recv_filter = nni_sock_nullfilter;
+ }
+ if (sock->s_proto.proto_getopt == NULL) {
+ sock->s_proto.proto_getopt = nni_sock_nullgetopt;
+ }
+ if (sock->s_proto.proto_setopt == NULL) {
+ sock->s_proto.proto_setopt = nni_sock_nullsetopt;
+ }
nni_thr_run(&sock->s_reaper);
*sockp = sock;
return (0);
@@ -223,7 +266,7 @@ nni_sock_close(nni_sock *sock)
// At this point nothing else should be referencing us.
// The protocol needs to clean up its state.
- sock->s_ops.proto_destroy(sock->s_data);
+ sock->s_proto.proto_fini(sock->s_data);
// And we need to clean up *our* state.
nni_msgq_fini(sock->s_urq);
@@ -256,11 +299,9 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire)
besteffort = sock->s_besteffort;
nni_mtx_unlock(&sock->s_mx);
- if (sock->s_ops.proto_send_filter != NULL) {
- msg = sock->s_ops.proto_send_filter(sock->s_data, msg);
- if (msg == NULL) {
- return (0);
- }
+ msg = sock->s_proto.proto_send_filter(sock->s_data, msg);
+ if (msg == NULL) {
+ return (0);
}
if (besteffort) {
@@ -300,9 +341,7 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
if (rv != 0) {
return (rv);
}
- if (sock->s_ops.proto_recv_filter != NULL) {
- msg = sock->s_ops.proto_recv_filter(sock->s_data, msg);
- }
+ msg = sock->s_proto.proto_recv_filter(sock->s_data, msg);
if (msg != NULL) {
break;
}
@@ -318,7 +357,7 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire)
uint16_t
nni_sock_proto(nni_sock *sock)
{
- return (sock->s_ops.proto_self);
+ return (sock->s_proto.proto_self);
}
@@ -386,12 +425,10 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size)
int rv = ENOTSUP;
nni_mtx_lock(&sock->s_mx);
- if (sock->s_ops.proto_setopt != NULL) {
- rv = sock->s_ops.proto_setopt(sock->s_data, opt, val, size);
- if (rv != NNG_ENOTSUP) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
- }
+ rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size);
+ if (rv != NNG_ENOTSUP) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
}
switch (opt) {
case NNG_OPT_LINGER:
@@ -429,12 +466,10 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep)
int rv = ENOTSUP;
nni_mtx_lock(&sock->s_mx);
- if (sock->s_ops.proto_getopt != NULL) {
- rv = sock->s_ops.proto_getopt(sock->s_data, opt, val, sizep);
- if (rv != NNG_ENOTSUP) {
- nni_mtx_unlock(&sock->s_mx);
- return (rv);
- }
+ rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep);
+ if (rv != NNG_ENOTSUP) {
+ nni_mtx_unlock(&sock->s_mx);
+ return (rv);
}
switch (opt) {
case NNG_OPT_LINGER:
diff --git a/src/core/socket.h b/src/core/socket.h
index 548b4697..8b3c3ff9 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -20,7 +20,7 @@ struct nng_socket {
nni_msgq * s_uwq; // Upper write queue
nni_msgq * s_urq; // Upper read queue
- nni_protocol s_ops;
+ nni_proto s_proto;
void * s_data; // Protocol private