diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/core/defs.h | 2 | ||||
| -rw-r--r-- | src/core/init.c | 1 | ||||
| -rw-r--r-- | src/core/socket.c | 72 | ||||
| -rw-r--r-- | src/core/socket.h | 8 | ||||
| -rw-r--r-- | src/nng.c | 181 | ||||
| -rw-r--r-- | src/nng.h | 40 |
6 files changed, 232 insertions, 72 deletions
diff --git a/src/core/defs.h b/src/core/defs.h index eb1a6475..ff36cb3f 100644 --- a/src/core/defs.h +++ b/src/core/defs.h @@ -22,13 +22,13 @@ nni_panic("%s: %d: assert err: %s", __FILE__, __LINE__, # x) // These types are common but have names shared with user space. -typedef struct nng_socket nni_sock; typedef struct nng_msg nni_msg; typedef struct nng_sockaddr nni_sockaddr; typedef struct nng_event nni_event; typedef struct nng_notify nni_notify; // These are our own names. +typedef struct nni_socket nni_sock; typedef struct nni_ep nni_ep; typedef struct nni_pipe nni_pipe; typedef struct nni_tran nni_tran; diff --git a/src/core/init.c b/src/core/init.c index e0ce3bae..48a5096c 100644 --- a/src/core/init.c +++ b/src/core/init.c @@ -37,6 +37,7 @@ nni_init_helper(void) } nni_idhash_set_limits(nni_pipes, 1, 0x7fffffff, nni_random() & 0x7fffffff); + nni_idhash_set_limits(nni_sockets, 1, 0xffffffff, 1); nni_idlock = &nni_idlock_x; nni_tran_init(); return (0); diff --git a/src/core/socket.c b/src/core/socket.c index d9119a64..b8c5544b 100644 --- a/src/core/socket.c +++ b/src/core/socket.c @@ -13,6 +13,13 @@ // Socket implementation. +uint32_t +nni_sock_id(nni_sock *s) +{ + return (s->s_id); +} + + // nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain // the upper read and write queues. nni_msgq * @@ -29,6 +36,48 @@ nni_sock_recvq(nni_sock *s) } +int +nni_sock_hold(nni_sock **sockp, uint32_t id) +{ + int rv; + nni_sock *sock; + + if ((rv = nni_init()) != 0) { + return (rv); + } + nni_mtx_lock(nni_idlock); + rv = nni_idhash_find(nni_sockets, id, (void **) &sock); + if (rv == 0) { + if (sock->s_closing) { + rv = NNG_ECLOSED; + } else { + nni_mtx_lock(&sock->s_mx); + sock->s_refcnt++; + nni_mtx_unlock(&sock->s_mx); + *sockp = sock; + } + } + nni_mtx_unlock(nni_idlock); + + if (rv == NNG_ENOENT) { + rv = NNG_ECLOSED; + } + return (rv); +} + + +void +nni_sock_rele(nni_sock *sock) +{ + nni_mtx_lock(&sock->s_mx); + sock->s_refcnt--; + if (sock->s_closing) { + nni_cv_wake(&sock->s_cv); + } + nni_mtx_unlock(&sock->s_mx); +} + + // XXX: don't expose the upper queues to protocols, because we need to // trap on activity in those queues! @@ -182,6 +231,9 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) nni_proto_sock_ops *sops; nni_proto_pipe_ops *pops; + if ((rv = nni_init()) != 0) { + return (rv); + } if ((proto = nni_proto_find(pnum)) == NULL) { return (NNG_ENOTSUP); } @@ -257,6 +309,16 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum) goto fail; } + nni_mtx_lock(nni_idlock); + rv = nni_idhash_alloc(nni_sockets, &sock->s_id, sock); + nni_mtx_unlock(nni_idlock); + if (rv != 0) { + goto fail; + } + + // Caller always gets the socket held. + sock->s_refcnt = 1; + if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) { goto fail; } @@ -290,6 +352,11 @@ fail: for (i = 0; i < NNI_MAXWORKERS; i++) { nni_thr_fini(&sock->s_worker_thr[i]); } + if (sock->s_id != 0) { + nni_mtx_lock(nni_idlock); + nni_idhash_remove(nni_sockets, sock->s_id); + nni_mtx_unlock(nni_idlock); + } nni_thr_fini(&sock->s_notifier); nni_thr_fini(&sock->s_reaper); nni_ev_fini(&sock->s_send_ev); @@ -415,6 +482,11 @@ nni_sock_close(nni_sock *sock) // Shutdown everything if not already done. This operation // is idempotent. nni_sock_shutdown(sock); + nni_mtx_lock(&sock->s_mx); + while (sock->s_refcnt > 1) { + nni_cv_wait(&sock->s_cv); + } + nni_mtx_unlock(&sock->s_mx); // At this point nothing else should be referencing us. // As with UNIX close, it is a gross error for the caller diff --git a/src/core/socket.h b/src/core/socket.h index 1743869a..2a30fae5 100644 --- a/src/core/socket.h +++ b/src/core/socket.h @@ -13,10 +13,13 @@ // NB: This structure is supplied here for use by the CORE. Use of this library // OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR // TRANSPORTS. -struct nng_socket { +struct nni_socket { nni_mtx s_mx; nni_cv s_cv; + uint32_t s_id; + uint32_t s_refcnt; + nni_msgq * s_uwq; // Upper write queue nni_msgq * s_urq; // Upper read queue @@ -60,6 +63,8 @@ struct nng_socket { uint32_t s_nextid; // Next Pipe ID. }; +extern int nni_sock_hold(nni_sock **, uint32_t); +extern void nni_sock_rele(nni_sock *); extern int nni_sock_open(nni_sock **, uint16_t); extern void nni_sock_close(nni_sock *); extern int nni_sock_shutdown(nni_sock *); @@ -71,6 +76,7 @@ extern int nni_sock_recvmsg(nni_sock *, nni_msg **, nni_time); extern int nni_sock_sendmsg(nni_sock *, nni_msg *, nni_time); extern int nni_sock_dial(nni_sock *, const char *, nni_ep **, int); extern int nni_sock_listen(nni_sock *, const char *, nni_ep **, int); +extern uint32_t nni_sock_id(nni_sock *); // Set error codes for applications. These are only ever // called from the filter functions in protocols, and thus @@ -30,156 +30,237 @@ (void) nni_init() int -nng_open(nng_socket **s, uint16_t proto) +nng_open(nng_socket *sidp, uint16_t proto) { int rv; + nni_sock *sock; - if ((rv = nni_init()) != 0) { + if ((rv = nni_sock_open(&sock, proto)) != 0) { return (rv); } - return (nni_sock_open(s, proto)); + *sidp = nni_sock_id(sock); + nni_sock_rele(sock); + return (0); } int -nng_shutdown(nng_socket *s) +nng_shutdown(nng_socket sid) { - NNI_INIT_INT(); - return (nni_sock_shutdown(s)); + int rv; + nni_sock *sock; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + rv = nni_sock_shutdown(sock); + nni_sock_rele(sock); + return (rv); } -void -nng_close(nng_socket *s) +int +nng_close(nng_socket sid) { - NNI_INIT_VOID(); - nni_sock_close(s); + int rv; + nni_sock *sock; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + // No release -- close releases it. + nni_sock_close(sock); + return (rv); } uint16_t -nng_protocol(nng_socket *s) +nng_protocol(nng_socket sid) { - NNI_INIT_VOID(); - return (nni_sock_proto(s)); + int rv; + uint16_t pnum; + nni_sock *sock; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + pnum = nni_sock_proto(sock); + nni_sock_rele(sock); + return (pnum); } uint16_t -nng_peer(nng_socket *s) +nng_peer(nng_socket sid) { - NNI_INIT_VOID(); - return (nni_sock_peer(s)); + int rv; + uint16_t pnum; + nni_sock *sock; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + pnum = nni_sock_peer(sock); + nni_sock_rele(sock); + return (pnum); } int -nng_recvmsg(nng_socket *s, nng_msg **msgp, int flags) +nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags) { nni_time expire; + int rv; + nni_sock *sock; - NNI_INIT_INT(); - if ((flags == NNG_FLAG_NONBLOCK) || (s->s_rcvtimeo == 0)) { + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + if ((flags == NNG_FLAG_NONBLOCK) || (sock->s_rcvtimeo == 0)) { expire = NNI_TIME_ZERO; - } else if (s->s_rcvtimeo < 0) { + } else if (sock->s_rcvtimeo < 0) { expire = NNI_TIME_NEVER; } else { expire = nni_clock(); - expire += s->s_rcvtimeo; + expire += sock->s_rcvtimeo; } - return (nni_sock_recvmsg(s, msgp, expire)); + rv = nni_sock_recvmsg(sock, msgp, expire); + nni_sock_rele(sock); + return (rv); } int -nng_sendmsg(nng_socket *s, nng_msg *msg, int flags) +nng_sendmsg(nng_socket sid, nng_msg *msg, int flags) { nni_time expire; + int rv; + nni_sock *sock; - NNI_INIT_INT(); - - if ((flags == NNG_FLAG_NONBLOCK) || (s->s_sndtimeo == 0)) { + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + if ((flags == NNG_FLAG_NONBLOCK) || (sock->s_sndtimeo == 0)) { expire = NNI_TIME_ZERO; - } else if (s->s_sndtimeo < 0) { + } else if (sock->s_sndtimeo < 0) { expire = NNI_TIME_NEVER; } else { expire = nni_clock(); - expire += s->s_sndtimeo; + expire += sock->s_sndtimeo; } - return (nni_sock_sendmsg(s, msg, expire)); + rv = nni_sock_sendmsg(sock, msg, expire); + nni_sock_rele(sock); + return (rv); } int -nng_dial(nng_socket *s, const char *addr, nng_endpoint *epp, int flags) +nng_dial(nng_socket sid, const char *addr, nng_endpoint *epp, int flags) { nni_ep *ep; int rv; + nni_sock *sock; - NNI_INIT_INT(); - if ((rv = nni_sock_dial(s, addr, &ep, flags)) == 0) { + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + if ((rv = nni_sock_dial(sock, addr, &ep, flags)) == 0) { if (epp != NULL) { *epp = ep->ep_id; } } + nni_sock_rele(sock); return (rv); } int -nng_listen(nng_socket *s, const char *addr, nng_endpoint *epp, int flags) +nng_listen(nng_socket sid, const char *addr, nng_endpoint *epp, int flags) { nni_ep *ep; int rv; + nni_sock *sock; - NNI_INIT_INT(); - if ((rv = nni_sock_listen(s, addr, &ep, flags)) == 0) { + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + if ((rv = nni_sock_listen(sock, addr, &ep, flags)) == 0) { if (epp != NULL) { *epp = ep->ep_id; } } + nni_sock_rele(sock); return (rv); } int -nng_setopt(nng_socket *s, int opt, const void *val, size_t sz) +nng_setopt(nng_socket sid, int opt, const void *val, size_t sz) { - NNI_INIT_INT(); - return (nni_sock_setopt(s, opt, val, sz)); + nni_sock *sock; + int rv; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + rv = nni_sock_setopt(sock, opt, val, sz); + nni_sock_rele(sock); + return (rv); } int -nng_getopt(nng_socket *s, int opt, void *val, size_t *szp) +nng_getopt(nng_socket sid, int opt, void *val, size_t *szp) { - NNI_INIT_INT(); - return (nni_sock_getopt(s, opt, val, szp)); + nni_sock *sock; + int rv; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (rv); + } + rv = nni_sock_getopt(sock, opt, val, szp); + nni_sock_rele(sock); + return (rv); } nng_notify * -nng_setnotify(nng_socket *sock, int mask, nng_notify_func fn, void *arg) +nng_setnotify(nng_socket sid, int mask, nng_notify_func fn, void *arg) { - NNI_INIT_VOID(); - return (nni_add_notify(sock, mask, fn, arg)); + nni_sock *sock; + nng_notify *notify; + int rv; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return (NULL); + } + notify = nni_add_notify(sock, mask, fn, arg); + nni_sock_rele(sock); + return (notify); } void -nng_unsetnotify(nng_socket *sock, nng_notify *notify) +nng_unsetnotify(nng_socket sid, nng_notify *notify) { - NNI_INIT_VOID(); + nni_sock *sock; + int rv; + + if ((rv = nni_sock_hold(&sock, sid)) != 0) { + return; + } nni_rem_notify(sock, notify); + nni_sock_rele(sock); } -nng_socket * +nng_socket nng_event_socket(nng_event *ev) { - return (ev->e_sock); + // FOR NOW.... maybe evnet should contain socket Id instead? + return (nni_sock_id(ev->e_sock)); } @@ -424,7 +505,7 @@ nng_snapshot_destroy(nng_snapshot *snap) int -nng_snapshot_update(nng_socket *sock, nng_snapshot *snap) +nng_snapshot_update(nng_socket sock, nng_snapshot *snap) { // Stats TBD. NNI_INIT_INT(); @@ -467,7 +548,7 @@ nng_stat_value(nng_stat *stat) int -nng_device(nng_socket *sock1, nng_socket *sock2) +nng_device(nng_socket sock1, nng_socket sock2) { // Device TBD. NNI_INIT_INT(); @@ -42,7 +42,7 @@ extern "C" { #endif // NNG_DECL // Types common to nng. -typedef struct nng_socket nng_socket; +typedef uint32_t nng_socket; typedef uint32_t nng_endpoint; typedef uint32_t nng_pipe; typedef struct nng_msg nng_msg; @@ -54,7 +54,7 @@ typedef struct nng_stat nng_stat; // nng_open simply creates a socket of the given class. It returns an // error code on failure, or zero on success. The socket starts in cooked // mode. -NNG_DECL int nng_open(nng_socket **, uint16_t proto); +NNG_DECL int nng_open(nng_socket *, uint16_t proto); // nng_close closes the socket, terminating all activity and // closing any underlying connections and releasing any associated @@ -62,26 +62,26 @@ NNG_DECL int nng_open(nng_socket **, uint16_t proto); // error to reference the socket in any way after this is called. Likewise, // it is an error to reference any resources such as endpoints or // pipes associated with the socket. -NNG_DECL void nng_close(nng_socket *); +NNG_DECL int nng_close(nng_socket); // nng_shutdown shuts down the socket. This causes any threads doing // work for the socket or blocked in socket functions to be woken (and // return NNG_ECLOSED). The socket resources are still present, so it // is safe to call other functions; they will just return NNG_ECLOSED. // A call to nng_close is still required to release the resources. -NNG_DECL int nng_shutdown(nng_socket *); +NNG_DECL int nng_shutdown(nng_socket); // nng_protocol returns the protocol number of the socket. -NNG_DECL uint16_t nng_protocol(nng_socket *); +NNG_DECL uint16_t nng_protocol(nng_socket); // nng_peer returns the protocol number for the socket's peer. -NNG_DECL uint16_t nng_peer(nng_socket *); +NNG_DECL uint16_t nng_peer(nng_socket); // nng_setopt sets an option for a specific socket. -NNG_DECL int nng_setopt(nng_socket *, int, const void *, size_t); +NNG_DECL int nng_setopt(nng_socket, int, const void *, size_t); // nng_socket_getopt obtains the option for a socket. -NNG_DECL int nng_getopt(nng_socket *, int, void *, size_t *); +NNG_DECL int nng_getopt(nng_socket, int, void *, size_t *); // nng_notify_func is a user function that is executed upon certain // events. See below. @@ -92,14 +92,14 @@ typedef void (*nng_notify_func)(nng_event *, void *); // separate thread. Event delivery is not guaranteed, and can fail // if events occur more quickly than the callback can handle, or // if memory or other resources are scarce. -NNG_DECL nng_notify *nng_setnotify(nng_socket *, int, nng_notify_func, void *); +NNG_DECL nng_notify *nng_setnotify(nng_socket, int, nng_notify_func, void *); // nng_unsetnotify unregisters a previously registered notification callback. // Once this returns, the associated callback will not be executed any longer. // If the callback is running when this called, then it will wait until that // callback completes. (The caller of this function should not hold any // locks acqured by the callback, in order to avoid a deadlock.) -NNG_DECL void nng_unsetnotify(nng_socket *, nng_notify *); +NNG_DECL void nng_unsetnotify(nng_socket, nng_notify *); // Event types. Sockets can have multiple different kind of events. // Note that these are edge triggered -- therefore the status indicated @@ -125,7 +125,7 @@ NNG_DECL void nng_unsetnotify(nng_socket *, nng_notify *); // Some of the values will not make sense for some event types, in which case // the value returned will be NULL. NNG_DECL int nng_event_type(nng_event *); -NNG_DECL nng_socket *nng_event_socket(nng_event *); +NNG_DECL nng_socket nng_event_socket(nng_event *); NNG_DECL nng_endpoint nng_event_endpoint(nng_event *); NNG_DECL nng_pipe nng_event_pipe(nng_event *); NNG_DECL const char *nng_event_reason(nng_event *); @@ -136,7 +136,7 @@ NNG_DECL const char *nng_event_reason(nng_event *); // endpoint pointer, if it is not NULL. The flags may be NNG_FLAG_SYNCH to // indicate that a failure setting the socket up should return an error // back to the caller immediately. -NNG_DECL int nng_listen(nng_socket *, const char *, nng_endpoint *, int); +NNG_DECL int nng_listen(nng_socket, const char *, nng_endpoint *, int); // nng_dial creates a dialing endpoint, with no special options, and // starts it dialing. Dialers have at most one active connection at a time @@ -146,11 +146,11 @@ NNG_DECL int nng_listen(nng_socket *, const char *, nng_endpoint *, int); // dial will be made synchronously, and a failure condition returned back // to the caller. (If the connection is dropped, it will still be // reconnected in the background -- only the initial connect is synchronous.) -NNG_DECL int nng_dial(nng_socket *, const char *, nng_endpoint *, int); +NNG_DECL int nng_dial(nng_socket, const char *, nng_endpoint *, int); // nng_endpoint_create creates an endpoint on the socket, but does not // start it either dialing or listening. -NNG_DECL int nng_endpoint_create(nng_endpoint *, nng_socket *, const char *); +NNG_DECL int nng_endpoint_create(nng_endpoint *, nng_socket, const char *); // nng_endpoint_dial starts the endpoint dialing. This is only possible if // the endpoint is not already dialing or listening. @@ -181,7 +181,7 @@ NNG_DECL const char *nng_strerror(int); // received the data. The return value will be zero to indicate that the // socket has accepted the entire data for send, or an errno to indicate // failure. The flags may include NNG_FLAG_NONBLOCK. -NNG_DECL int nng_send(nng_socket *, const void *, size_t, int); +NNG_DECL int nng_send(nng_socket, const void *, size_t, int); // nng_recv receives message data into the socket, up to the supplied size. // The actual size of the message data will be written to the value pointed @@ -190,19 +190,19 @@ NNG_DECL int nng_send(nng_socket *, const void *, size_t, int); // the caller. In that case the pointer to the allocated will be stored // instead of the data itself. The caller is responsible for freeing the // associated memory with free(). -NNG_DECL int nng_recv(nng_socket *, void *, size_t *, int); +NNG_DECL int nng_recv(nng_socket, void *, size_t *, int); // nng_sendmsg is like nng_send, but offers up a message structure, which // gives the ability to provide more control over the message, including // providing backtrace information. It also can take a message that was // obtain via nn_recvmsg, allowing for zero copy forwarding. -NNG_DECL int nng_sendmsg(nng_socket *, nng_msg *, int); +NNG_DECL int nng_sendmsg(nng_socket, nng_msg *, int); // nng_recvmsg is like nng_recv, but is used to obtain a message structure // as well as the data buffer. This can be used to obtain more information // about where the message came from, access raw headers, etc. It also // can be passed off directly to nng_sendmsg. -NNG_DECL int nng_recvmsg(nng_socket *, nng_msg **, int); +NNG_DECL int nng_recvmsg(nng_socket, nng_msg **, int); // Message API. NNG_DECL int nng_msg_alloc(nng_msg **, size_t); @@ -306,7 +306,7 @@ NNG_DECL void nng_snapshot_free(nng_snapshot *); // relevant to a particular socket. All prior values are overwritten. // It is acceptable to use the same snapshot object with different // sockets. -NNG_DECL int nng_snapshot_update(nng_socket *, nng_snapshot *); +NNG_DECL int nng_snapshot_update(nng_socket, nng_snapshot *); // nng_snapshot_next is used to iterate over the individual statistic // objects inside the snapshot. Note that the statistic object, and the @@ -356,7 +356,7 @@ NNG_DECL int64_t nng_stat_value(nng_stat *); // Device functionality. This connects two sockets together in a device, // which means that messages from one side are forwarded to the other. -NNG_DECL int nng_device(nng_socket *, nng_socket *); +NNG_DECL int nng_device(nng_socket, nng_socket); // Pollset functionality. TBD. (Note that I'd rather avoid this // altogether, because I believe that the notification mechanism I've |
