diff options
| author | Garrett D'Amore <garrett@damore.org> | 2017-01-02 14:29:47 -0800 |
|---|---|---|
| committer | Garrett D'Amore <garrett@damore.org> | 2017-01-02 14:29:47 -0800 |
| commit | b6374f9d9b07c929522066f27ed9a7a05c6bb23b (patch) | |
| tree | 9195694f13261ba5cd4d8f2446743f815a06619f /src/core | |
| parent | f729db021a4fd7c782cc08a07185c955f3567ea2 (diff) | |
| download | nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.tar.gz nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.tar.bz2 nng-b6374f9d9b07c929522066f27ed9a7a05c6bb23b.zip | |
Protocol initialization restructuring.
Diffstat (limited to 'src/core')
| -rw-r--r-- | src/core/defs.h | 3 | ||||
| -rw-r--r-- | src/core/pipe.c | 37 | ||||
| -rw-r--r-- | src/core/pipe.h | 4 | ||||
| -rw-r--r-- | src/core/protocol.c | 34 | ||||
| -rw-r--r-- | src/core/protocol.h | 77 | ||||
| -rw-r--r-- | src/core/socket.c | 95 | ||||
| -rw-r--r-- | src/core/socket.h | 2 |
7 files changed, 163 insertions, 89 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index b10701e0..dd225567 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -28,7 +28,8 @@ typedef struct nni_tran nni_tran; typedef struct nni_tran_ep nni_tran_ep; typedef struct nni_tran_pipe nni_tran_pipe; -typedef struct nni_protocol nni_protocol; +typedef struct nni_proto_pipe nni_proto_pipe; +typedef struct nni_proto nni_proto; typedef int nni_signal; // Turnstile/wakeup channel. typedef uint64_t nni_time; // Absolute time (usec). diff --git a/src/core/pipe.c b/src/core/pipe.c index e8864d58..768b7240 100644 --- a/src/core/pipe.c +++ b/src/core/pipe.c @@ -76,8 +76,8 @@ nni_pipe_destroy(nni_pipe *p) if (p->p_tran_data != NULL) { p->p_tran_ops.pipe_destroy(p->p_tran_data); } - if (p->p_pdata != NULL) { - nni_free(p->p_pdata, p->p_psize); + if (p->p_proto_data != NULL) { + p->p_proto_ops.pipe_fini(p->p_proto_data); } NNI_FREE_STRUCT(p); } @@ -88,7 +88,8 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) { nni_pipe *p; nni_sock *sock = ep->ep_sock; - nni_protocol *proto = &sock->s_ops; + nni_proto *proto = &sock->s_proto; + const nni_proto_pipe *pops = proto->proto_pipe; int rv; if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { @@ -96,32 +97,34 @@ nni_pipe_create(nni_pipe **pp, nni_ep *ep) } p->p_sock = sock; p->p_tran_data = NULL; + p->p_proto_data = NULL; p->p_active = 0; - p->p_psize = proto->proto_pipe_size; NNI_LIST_NODE_INIT(&p->p_node); // Make a copy of the transport ops. We can override entry points // and we avoid an extra dereference on hot code paths. p->p_tran_ops = *ep->ep_tran->tran_pipe; - if ((p->p_pdata = nni_alloc(p->p_psize)) == NULL) { + // Make a copy of the protocol ops. Same rationale. + p->p_proto_ops = *pops; + + if ((rv = pops->pipe_init(&p->p_proto_data, p, sock->s_data)) != 0) { NNI_FREE_STRUCT(p); - return (NNG_ENOMEM); + return (rv); } - rv = nni_thr_init(&p->p_recv_thr, proto->proto_pipe_recv, p->p_pdata); + rv = nni_thr_init(&p->p_recv_thr, pops->pipe_recv, p->p_proto_data); if (rv != 0) { - nni_free(p->p_pdata, p->p_psize); + pops->pipe_fini(&p->p_proto_data); NNI_FREE_STRUCT(p); return (rv); } - rv = nni_thr_init(&p->p_send_thr, proto->proto_pipe_send, p->p_pdata); + rv = nni_thr_init(&p->p_send_thr, pops->pipe_send, p->p_proto_data); if (rv != 0) { nni_thr_fini(&p->p_recv_thr); - nni_free(p->p_pdata, p->p_psize); + pops->pipe_fini(&p->p_proto_data); NNI_FREE_STRUCT(p); return (rv); } - p->p_psize = sock->s_ops.proto_pipe_size; nni_mtx_lock(&sock->s_mx); nni_list_append(&sock->s_pipes, p); nni_mtx_unlock(&sock->s_mx); @@ -171,9 +174,11 @@ nni_pipe_start(nni_pipe *pipe) } } while (collide); - rv = sock->s_ops.proto_add_pipe(sock->s_data, pipe, pipe->p_pdata); + rv = pipe->p_proto_ops.pipe_add(pipe->p_proto_data); if (rv != 0) { - goto fail; + nni_mtx_unlock(&sock->s_mx); + nni_pipe_close(pipe); + return (rv); } nni_thr_run(&pipe->p_send_thr); nni_thr_run(&pipe->p_recv_thr); @@ -183,10 +188,4 @@ nni_pipe_start(nni_pipe *pipe) nni_mtx_unlock(&sock->s_mx); return (0); - -fail: - pipe->p_reap = 1; - nni_mtx_unlock(&sock->s_mx); - nni_pipe_close(pipe); - return (rv); } diff --git a/src/core/pipe.h b/src/core/pipe.h index f037bb3b..d2819a31 100644 --- a/src/core/pipe.h +++ b/src/core/pipe.h @@ -21,8 +21,8 @@ struct nng_pipe { uint32_t p_id; nni_tran_pipe p_tran_ops; void * p_tran_data; - void * p_pdata; // protocol specific data - size_t p_psize; // size of protocol data + nni_proto_pipe p_proto_ops; + void * p_proto_data; nni_list_node p_node; nni_sock * p_sock; nni_ep * p_ep; diff --git a/src/core/protocol.c b/src/core/protocol.c index 6399cd41..8669275a 100644 --- a/src/core/protocol.c +++ b/src/core/protocol.c @@ -16,18 +16,18 @@ // The list of protocols is hardwired. This is reasonably unlikely to // change, as adding new protocols is not something intended to be done // outside of the core. -extern nni_protocol nni_pair_protocol; +extern nni_proto nni_pair_proto; -static nni_protocol *protocols[] = { - &nni_pair_protocol, +static nni_proto *protocols[] = { + &nni_pair_proto, NULL }; -nni_protocol * -nni_protocol_find(uint16_t num) +nni_proto * +nni_proto_find(uint16_t num) { int i; - nni_protocol *p; + nni_proto *p; for (i = 0; (p = protocols[i]) != NULL; i++) { if (p->proto_self == num) { @@ -39,11 +39,11 @@ nni_protocol_find(uint16_t num) const char * -nni_protocol_name(uint16_t num) +nni_proto_name(uint16_t num) { - nni_protocol *p; + nni_proto *p; - if ((p = nni_protocol_find(num)) == NULL) { + if ((p = nni_proto_find(num)) == NULL) { return (NULL); } return (p->proto_name); @@ -51,9 +51,9 @@ nni_protocol_name(uint16_t num) uint16_t -nni_protocol_number(const char *name) +nni_proto_number(const char *name) { - nni_protocol *p; + nni_proto *p; int i; for (i = 0; (p = protocols[i]) != NULL; i++) { @@ -63,3 +63,15 @@ nni_protocol_number(const char *name) } return (NNG_PROTO_NONE); } + +uint16_t +nni_proto_peer(uint16_t num) +{ + nni_proto *p; + + if ((p = nni_proto_find(num)) == NULL) { + return (NNG_PROTO_NONE); + } + return (p->proto_peer); +} + diff --git a/src/core/protocol.h b/src/core/protocol.h index 301782b4..c858ad39 100644 --- a/src/core/protocol.h +++ b/src/core/protocol.h @@ -19,48 +19,75 @@ // work, and the pipe functions generally assume no locking is needed. // As a consequence, most of the concurrency in nng exists in the protocol // implementations. -struct nni_protocol { - uint16_t proto_self; // our 16-bit protocol ID - uint16_t proto_peer; // who we peer with (ID) - const char * proto_name; // string version of our name - size_t proto_pipe_size; // pipe private data size - //Create protocol instance, which will be stored on the socket. - int (*proto_create)(void **, nni_sock *); +// nni_proto_pipe contains protocol-specific per-pipe operations. +struct nni_proto_pipe { + // pipe_init creates the protocol-specific per pipe data structure. + // The last argument is the per-socket protocol private data. + int (*pipe_init)(void **, nni_pipe *, void *); - // Destroy the protocol instance. - void (*proto_destroy)(void *); + // pipe_fini releases any pipe data structures. This is called after + // the pipe has been removed from the protocol, and the generic + // pipe threads have been stopped. + void (*pipe_fini)(void *); + + // pipe_add is called to register a pipe with the protocol. The + // protocol can reject this, for example if another pipe is already + // active on a 1:1 protocol. The protocol may not block during this, + // as the socket lock is held. + int (*pipe_add)(void *); + + // pipe_rem is called to unregister a pipe from the protocol. + // Threads may still acccess data structures, so the protocol + // should not free anything yet. This is called with the socket + // lock held, so the protocol may not call back into the socket, and + // must not block. + void (*pipe_rem)(void *); + + // pipe_send is a function run in a thread per pipe, to process + // send activity. This can be NULL. + void (*pipe_send)(void *); - // Add and remove pipes. These are called as connections are - // created or destroyed. - int (*proto_add_pipe)(void *, nni_pipe *, void *); - void (*proto_rem_pipe)(void *, void *); + // pipe_recv is a function run in a thread per pipe, to process + // receive activity. While this can be NULL, it should NOT be, as + // otherwise the protocol may not be able to discover the closure of + // the underlying transport (such as a remote disconnect). + void (*pipe_recv)(void *); +}; + +struct nni_proto { + uint16_t proto_self; // our 16-bit protocol ID + uint16_t proto_peer; // who we peer with (ID) + const char * proto_name; // string version of our name + const nni_proto_pipe * proto_pipe; // Per-pipe operations. - // Thread functions for processing send & receive sides of - // protocol pipes. Send may be NULL, but recv should should not. - // (Recv needs to detect a closed pipe, if nothing else.) - void (*proto_pipe_send)(void *); - void (*proto_pipe_recv)(void *); + // Create protocol instance, which will be stored on the socket. + int (*proto_init)(void **, nni_sock *); + + // Destroy the protocol instance. + void (*proto_fini)(void *); // Option manipulation. These may be NULL. - int (*proto_setopt)(void *, int, const void *, size_t); - int (*proto_getopt)(void *, int, void *, size_t *); + int (*proto_setopt)(void *, int, const void *, + size_t); + int (*proto_getopt)(void *, int, void *, size_t *); // Receive filter. This may be NULL, but if it isn't, then // messages coming into the system are routed here just before being // delivered to the application. To drop the message, the prtocol // should return NULL, otherwise the message (possibly modified). - nni_msg * (*proto_recv_filter)(void *, nni_msg *); + nni_msg * (*proto_recv_filter)(void *, nni_msg *); // Send filter. This may be NULL, but if it isn't, then messages // here are filtered just after they come from the application. - nni_msg * (*proto_send_filter)(void *, nni_msg *); + nni_msg * (*proto_send_filter)(void *, nni_msg *); }; // These functions are not used by protocols, but rather by the socket // core implementation. The lookups can be used by transports as well. -extern nni_protocol *nni_protocol_find(uint16_t); -extern const char *nni_protocol_name(uint16_t); -extern uint16_t nni_protocol_number(const char *); +extern nni_proto *nni_proto_find(uint16_t); +extern const char *nni_proto_name(uint16_t); +extern uint16_t nni_proto_number(const char *); +extern uint16_t nni_proto_peer(uint16_t); #endif // CORE_PROTOCOL_H diff --git a/src/core/socket.c b/src/core/socket.c index 6aa40bfc..cad4b10a 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -50,16 +50,15 @@ nni_reaper(void *arg) ep->ep_pipe = NULL; nni_cv_wake(&ep->ep_cv); } - nni_mtx_unlock(&sock->s_mx); // Remove the pipe from the protocol. Protocols may // keep lists of pipes for managing their topologies. // Note that if a protocol has rejected the pipe, it // won't have any data. if (pipe->p_active) { - sock->s_ops.proto_rem_pipe(sock->s_data, - pipe->p_pdata); + pipe->p_proto_ops.pipe_rem(pipe->p_proto_data); } + nni_mtx_unlock(&sock->s_mx); // XXX: also publish event... nni_pipe_destroy(pipe); @@ -79,21 +78,53 @@ nni_reaper(void *arg) } +static nni_msg * +nni_sock_nullfilter(void *arg, nni_msg *mp) +{ + NNI_ARG_UNUSED(arg); + return (mp); +} + + +static int +nni_sock_nullgetopt(void *arg, int num, void *data, size_t *szp) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(num); + NNI_ARG_UNUSED(data); + NNI_ARG_UNUSED(szp); + return (NNG_ENOTSUP); +} + + +static int +nni_sock_nullsetopt(void *arg, int num, const void *data, size_t sz) +{ + NNI_ARG_UNUSED(arg); + NNI_ARG_UNUSED(num); + NNI_ARG_UNUSED(data); + NNI_ARG_UNUSED(sz); + return (NNG_ENOTSUP); +} + + // nn_sock_open creates the underlying socket. int -nni_sock_open(nni_sock **sockp, uint16_t proto) +nni_sock_open(nni_sock **sockp, uint16_t pnum) { nni_sock *sock; - nni_protocol *ops; + nni_proto *proto; int rv; - if ((ops = nni_protocol_find(proto)) == NULL) { + if ((proto = nni_proto_find(pnum)) == NULL) { return (NNG_ENOTSUP); } if ((sock = NNI_ALLOC_STRUCT(sock)) == NULL) { return (NNG_ENOMEM); } - sock->s_ops = *ops; + + // We make a copy of the protocol operations. + sock->s_proto = *proto; sock->s_linger = 0; sock->s_sndtimeo = -1; sock->s_rcvtimeo = -1; @@ -137,7 +168,7 @@ nni_sock_open(nni_sock **sockp, uint16_t proto) return (rv); } - if ((rv = sock->s_ops.proto_create(&sock->s_data, sock)) != 0) { + if ((rv = sock->s_proto.proto_init(&sock->s_data, sock)) != 0) { nni_msgq_fini(sock->s_urq); nni_msgq_fini(sock->s_uwq); nni_thr_fini(&sock->s_reaper); @@ -146,6 +177,18 @@ nni_sock_open(nni_sock **sockp, uint16_t proto) NNI_FREE_STRUCT(sock); return (rv); } + if (sock->s_proto.proto_send_filter == NULL) { + sock->s_proto.proto_send_filter = nni_sock_nullfilter; + } + if (sock->s_proto.proto_recv_filter == NULL) { + sock->s_proto.proto_recv_filter = nni_sock_nullfilter; + } + if (sock->s_proto.proto_getopt == NULL) { + sock->s_proto.proto_getopt = nni_sock_nullgetopt; + } + if (sock->s_proto.proto_setopt == NULL) { + sock->s_proto.proto_setopt = nni_sock_nullsetopt; + } nni_thr_run(&sock->s_reaper); *sockp = sock; return (0); @@ -223,7 +266,7 @@ nni_sock_close(nni_sock *sock) // At this point nothing else should be referencing us. // The protocol needs to clean up its state. - sock->s_ops.proto_destroy(sock->s_data); + sock->s_proto.proto_fini(sock->s_data); // And we need to clean up *our* state. nni_msgq_fini(sock->s_urq); @@ -256,11 +299,9 @@ nni_sock_sendmsg(nni_sock *sock, nni_msg *msg, nni_time expire) besteffort = sock->s_besteffort; nni_mtx_unlock(&sock->s_mx); - if (sock->s_ops.proto_send_filter != NULL) { - msg = sock->s_ops.proto_send_filter(sock->s_data, msg); - if (msg == NULL) { - return (0); - } + msg = sock->s_proto.proto_send_filter(sock->s_data, msg); + if (msg == NULL) { + return (0); } if (besteffort) { @@ -300,9 +341,7 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) if (rv != 0) { return (rv); } - if (sock->s_ops.proto_recv_filter != NULL) { - msg = sock->s_ops.proto_recv_filter(sock->s_data, msg); - } + msg = sock->s_proto.proto_recv_filter(sock->s_data, msg); if (msg != NULL) { break; } @@ -318,7 +357,7 @@ nni_sock_recvmsg(nni_sock *sock, nni_msg **msgp, nni_time expire) uint16_t nni_sock_proto(nni_sock *sock) { - return (sock->s_ops.proto_self); + return (sock->s_proto.proto_self); } @@ -386,12 +425,10 @@ nni_sock_setopt(nni_sock *sock, int opt, const void *val, size_t size) int rv = ENOTSUP; nni_mtx_lock(&sock->s_mx); - if (sock->s_ops.proto_setopt != NULL) { - rv = sock->s_ops.proto_setopt(sock->s_data, opt, val, size); - if (rv != NNG_ENOTSUP) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } + rv = sock->s_proto.proto_setopt(sock->s_data, opt, val, size); + if (rv != NNG_ENOTSUP) { + nni_mtx_unlock(&sock->s_mx); + return (rv); } switch (opt) { case NNG_OPT_LINGER: @@ -429,12 +466,10 @@ nni_sock_getopt(nni_sock *sock, int opt, void *val, size_t *sizep) int rv = ENOTSUP; nni_mtx_lock(&sock->s_mx); - if (sock->s_ops.proto_getopt != NULL) { - rv = sock->s_ops.proto_getopt(sock->s_data, opt, val, sizep); - if (rv != NNG_ENOTSUP) { - nni_mtx_unlock(&sock->s_mx); - return (rv); - } + rv = sock->s_proto.proto_getopt(sock->s_data, opt, val, sizep); + if (rv != NNG_ENOTSUP) { + nni_mtx_unlock(&sock->s_mx); + return (rv); } switch (opt) { case NNG_OPT_LINGER: diff --git a/src/core/socket.h b/src/core/socket.h index 548b4697..8b3c3ff9 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -20,7 +20,7 @@ struct nng_socket { nni_msgq * s_uwq; // Upper write queue nni_msgq * s_urq; // Upper read queue - nni_protocol s_ops; + nni_proto s_proto; void * s_data; // Protocol private |
