summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/defs.h2
-rw-r--r--src/core/init.c1
-rw-r--r--src/core/socket.c72
-rw-r--r--src/core/socket.h8
-rw-r--r--src/nng.c181
-rw-r--r--src/nng.h40
6 files changed, 232 insertions, 72 deletions
diff --git a/src/core/defs.h b/src/core/defs.h
index eb1a6475..ff36cb3f 100644
--- a/src/core/defs.h
+++ b/src/core/defs.h
@@ -22,13 +22,13 @@
nni_panic("%s: %d: assert err: %s", __FILE__, __LINE__, # x)
// These types are common but have names shared with user space.
-typedef struct nng_socket nni_sock;
typedef struct nng_msg nni_msg;
typedef struct nng_sockaddr nni_sockaddr;
typedef struct nng_event nni_event;
typedef struct nng_notify nni_notify;
// These are our own names.
+typedef struct nni_socket nni_sock;
typedef struct nni_ep nni_ep;
typedef struct nni_pipe nni_pipe;
typedef struct nni_tran nni_tran;
diff --git a/src/core/init.c b/src/core/init.c
index e0ce3bae..48a5096c 100644
--- a/src/core/init.c
+++ b/src/core/init.c
@@ -37,6 +37,7 @@ nni_init_helper(void)
}
nni_idhash_set_limits(nni_pipes, 1, 0x7fffffff,
nni_random() & 0x7fffffff);
+ nni_idhash_set_limits(nni_sockets, 1, 0xffffffff, 1);
nni_idlock = &nni_idlock_x;
nni_tran_init();
return (0);
diff --git a/src/core/socket.c b/src/core/socket.c
index d9119a64..b8c5544b 100644
--- a/src/core/socket.c
+++ b/src/core/socket.c
@@ -13,6 +13,13 @@
// Socket implementation.
+uint32_t
+nni_sock_id(nni_sock *s)
+{
+ return (s->s_id);
+}
+
+
// nni_sock_sendq and nni_sock_recvq are called by the protocol to obtain
// the upper read and write queues.
nni_msgq *
@@ -29,6 +36,48 @@ nni_sock_recvq(nni_sock *s)
}
+int
+nni_sock_hold(nni_sock **sockp, uint32_t id)
+{
+ int rv;
+ nni_sock *sock;
+
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
+ nni_mtx_lock(nni_idlock);
+ rv = nni_idhash_find(nni_sockets, id, (void **) &sock);
+ if (rv == 0) {
+ if (sock->s_closing) {
+ rv = NNG_ECLOSED;
+ } else {
+ nni_mtx_lock(&sock->s_mx);
+ sock->s_refcnt++;
+ nni_mtx_unlock(&sock->s_mx);
+ *sockp = sock;
+ }
+ }
+ nni_mtx_unlock(nni_idlock);
+
+ if (rv == NNG_ENOENT) {
+ rv = NNG_ECLOSED;
+ }
+ return (rv);
+}
+
+
+void
+nni_sock_rele(nni_sock *sock)
+{
+ nni_mtx_lock(&sock->s_mx);
+ sock->s_refcnt--;
+ if (sock->s_closing) {
+ nni_cv_wake(&sock->s_cv);
+ }
+ nni_mtx_unlock(&sock->s_mx);
+}
+
+
// XXX: don't expose the upper queues to protocols, because we need to
// trap on activity in those queues!
@@ -182,6 +231,9 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
nni_proto_sock_ops *sops;
nni_proto_pipe_ops *pops;
+ if ((rv = nni_init()) != 0) {
+ return (rv);
+ }
if ((proto = nni_proto_find(pnum)) == NULL) {
return (NNG_ENOTSUP);
}
@@ -257,6 +309,16 @@ nni_sock_open(nni_sock **sockp, uint16_t pnum)
goto fail;
}
+ nni_mtx_lock(nni_idlock);
+ rv = nni_idhash_alloc(nni_sockets, &sock->s_id, sock);
+ nni_mtx_unlock(nni_idlock);
+ if (rv != 0) {
+ goto fail;
+ }
+
+ // Caller always gets the socket held.
+ sock->s_refcnt = 1;
+
if ((rv = sops->sock_init(&sock->s_data, sock)) != 0) {
goto fail;
}
@@ -290,6 +352,11 @@ fail:
for (i = 0; i < NNI_MAXWORKERS; i++) {
nni_thr_fini(&sock->s_worker_thr[i]);
}
+ if (sock->s_id != 0) {
+ nni_mtx_lock(nni_idlock);
+ nni_idhash_remove(nni_sockets, sock->s_id);
+ nni_mtx_unlock(nni_idlock);
+ }
nni_thr_fini(&sock->s_notifier);
nni_thr_fini(&sock->s_reaper);
nni_ev_fini(&sock->s_send_ev);
@@ -415,6 +482,11 @@ nni_sock_close(nni_sock *sock)
// Shutdown everything if not already done. This operation
// is idempotent.
nni_sock_shutdown(sock);
+ nni_mtx_lock(&sock->s_mx);
+ while (sock->s_refcnt > 1) {
+ nni_cv_wait(&sock->s_cv);
+ }
+ nni_mtx_unlock(&sock->s_mx);
// At this point nothing else should be referencing us.
// As with UNIX close, it is a gross error for the caller
diff --git a/src/core/socket.h b/src/core/socket.h
index 1743869a..2a30fae5 100644
--- a/src/core/socket.h
+++ b/src/core/socket.h
@@ -13,10 +13,13 @@
// NB: This structure is supplied here for use by the CORE. Use of this library
// OUSIDE of the core is STRICTLY VERBOTEN. NO DIRECT ACCESS BY PROTOCOLS OR
// TRANSPORTS.
-struct nng_socket {
+struct nni_socket {
nni_mtx s_mx;
nni_cv s_cv;
+ uint32_t s_id;
+ uint32_t s_refcnt;
+
nni_msgq * s_uwq; // Upper write queue
nni_msgq * s_urq; // Upper read queue
@@ -60,6 +63,8 @@ struct nng_socket {
uint32_t s_nextid; // Next Pipe ID.
};
+extern int nni_sock_hold(nni_sock **, uint32_t);
+extern void nni_sock_rele(nni_sock *);
extern int nni_sock_open(nni_sock **, uint16_t);
extern void nni_sock_close(nni_sock *);
extern int nni_sock_shutdown(nni_sock *);
@@ -71,6 +76,7 @@ extern int nni_sock_recvmsg(nni_sock *, nni_msg **, nni_time);
extern int nni_sock_sendmsg(nni_sock *, nni_msg *, nni_time);
extern int nni_sock_dial(nni_sock *, const char *, nni_ep **, int);
extern int nni_sock_listen(nni_sock *, const char *, nni_ep **, int);
+extern uint32_t nni_sock_id(nni_sock *);
// Set error codes for applications. These are only ever
// called from the filter functions in protocols, and thus
diff --git a/src/nng.c b/src/nng.c
index 77ebebaf..03d5a98c 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -30,156 +30,237 @@
(void) nni_init()
int
-nng_open(nng_socket **s, uint16_t proto)
+nng_open(nng_socket *sidp, uint16_t proto)
{
int rv;
+ nni_sock *sock;
- if ((rv = nni_init()) != 0) {
+ if ((rv = nni_sock_open(&sock, proto)) != 0) {
return (rv);
}
- return (nni_sock_open(s, proto));
+ *sidp = nni_sock_id(sock);
+ nni_sock_rele(sock);
+ return (0);
}
int
-nng_shutdown(nng_socket *s)
+nng_shutdown(nng_socket sid)
{
- NNI_INIT_INT();
- return (nni_sock_shutdown(s));
+ int rv;
+ nni_sock *sock;
+
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return (rv);
+ }
+ rv = nni_sock_shutdown(sock);
+ nni_sock_rele(sock);
+ return (rv);
}
-void
-nng_close(nng_socket *s)
+int
+nng_close(nng_socket sid)
{
- NNI_INIT_VOID();
- nni_sock_close(s);
+ int rv;
+ nni_sock *sock;
+
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return (rv);
+ }
+ // No release -- close releases it.
+ nni_sock_close(sock);
+ return (rv);
}
uint16_t
-nng_protocol(nng_socket *s)
+nng_protocol(nng_socket sid)
{
- NNI_INIT_VOID();
- return (nni_sock_proto(s));
+ int rv;
+ uint16_t pnum;
+ nni_sock *sock;
+
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return (rv);
+ }
+ pnum = nni_sock_proto(sock);
+ nni_sock_rele(sock);
+ return (pnum);
}
uint16_t
-nng_peer(nng_socket *s)
+nng_peer(nng_socket sid)
{
- NNI_INIT_VOID();
- return (nni_sock_peer(s));
+ int rv;
+ uint16_t pnum;
+ nni_sock *sock;
+
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return (rv);
+ }
+ pnum = nni_sock_peer(sock);
+ nni_sock_rele(sock);
+ return (pnum);
}
int
-nng_recvmsg(nng_socket *s, nng_msg **msgp, int flags)
+nng_recvmsg(nng_socket sid, nng_msg **msgp, int flags)
{
nni_time expire;
+ int rv;
+ nni_sock *sock;
- NNI_INIT_INT();
- if ((flags == NNG_FLAG_NONBLOCK) || (s->s_rcvtimeo == 0)) {
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return (rv);
+ }
+ if ((flags == NNG_FLAG_NONBLOCK) || (sock->s_rcvtimeo == 0)) {
expire = NNI_TIME_ZERO;
- } else if (s->s_rcvtimeo < 0) {
+ } else if (sock->s_rcvtimeo < 0) {
expire = NNI_TIME_NEVER;
} else {
expire = nni_clock();
- expire += s->s_rcvtimeo;
+ expire += sock->s_rcvtimeo;
}
- return (nni_sock_recvmsg(s, msgp, expire));
+ rv = nni_sock_recvmsg(sock, msgp, expire);
+ nni_sock_rele(sock);
+ return (rv);
}
int
-nng_sendmsg(nng_socket *s, nng_msg *msg, int flags)
+nng_sendmsg(nng_socket sid, nng_msg *msg, int flags)
{
nni_time expire;
+ int rv;
+ nni_sock *sock;
- NNI_INIT_INT();
-
- if ((flags == NNG_FLAG_NONBLOCK) || (s->s_sndtimeo == 0)) {
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return (rv);
+ }
+ if ((flags == NNG_FLAG_NONBLOCK) || (sock->s_sndtimeo == 0)) {
expire = NNI_TIME_ZERO;
- } else if (s->s_sndtimeo < 0) {
+ } else if (sock->s_sndtimeo < 0) {
expire = NNI_TIME_NEVER;
} else {
expire = nni_clock();
- expire += s->s_sndtimeo;
+ expire += sock->s_sndtimeo;
}
- return (nni_sock_sendmsg(s, msg, expire));
+ rv = nni_sock_sendmsg(sock, msg, expire);
+ nni_sock_rele(sock);
+ return (rv);
}
int
-nng_dial(nng_socket *s, const char *addr, nng_endpoint *epp, int flags)
+nng_dial(nng_socket sid, const char *addr, nng_endpoint *epp, int flags)
{
nni_ep *ep;
int rv;
+ nni_sock *sock;
- NNI_INIT_INT();
- if ((rv = nni_sock_dial(s, addr, &ep, flags)) == 0) {
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_sock_dial(sock, addr, &ep, flags)) == 0) {
if (epp != NULL) {
*epp = ep->ep_id;
}
}
+ nni_sock_rele(sock);
return (rv);
}
int
-nng_listen(nng_socket *s, const char *addr, nng_endpoint *epp, int flags)
+nng_listen(nng_socket sid, const char *addr, nng_endpoint *epp, int flags)
{
nni_ep *ep;
int rv;
+ nni_sock *sock;
- NNI_INIT_INT();
- if ((rv = nni_sock_listen(s, addr, &ep, flags)) == 0) {
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return (rv);
+ }
+ if ((rv = nni_sock_listen(sock, addr, &ep, flags)) == 0) {
if (epp != NULL) {
*epp = ep->ep_id;
}
}
+ nni_sock_rele(sock);
return (rv);
}
int
-nng_setopt(nng_socket *s, int opt, const void *val, size_t sz)
+nng_setopt(nng_socket sid, int opt, const void *val, size_t sz)
{
- NNI_INIT_INT();
- return (nni_sock_setopt(s, opt, val, sz));
+ nni_sock *sock;
+ int rv;
+
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return (rv);
+ }
+ rv = nni_sock_setopt(sock, opt, val, sz);
+ nni_sock_rele(sock);
+ return (rv);
}
int
-nng_getopt(nng_socket *s, int opt, void *val, size_t *szp)
+nng_getopt(nng_socket sid, int opt, void *val, size_t *szp)
{
- NNI_INIT_INT();
- return (nni_sock_getopt(s, opt, val, szp));
+ nni_sock *sock;
+ int rv;
+
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return (rv);
+ }
+ rv = nni_sock_getopt(sock, opt, val, szp);
+ nni_sock_rele(sock);
+ return (rv);
}
nng_notify *
-nng_setnotify(nng_socket *sock, int mask, nng_notify_func fn, void *arg)
+nng_setnotify(nng_socket sid, int mask, nng_notify_func fn, void *arg)
{
- NNI_INIT_VOID();
- return (nni_add_notify(sock, mask, fn, arg));
+ nni_sock *sock;
+ nng_notify *notify;
+ int rv;
+
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return (NULL);
+ }
+ notify = nni_add_notify(sock, mask, fn, arg);
+ nni_sock_rele(sock);
+ return (notify);
}
void
-nng_unsetnotify(nng_socket *sock, nng_notify *notify)
+nng_unsetnotify(nng_socket sid, nng_notify *notify)
{
- NNI_INIT_VOID();
+ nni_sock *sock;
+ int rv;
+
+ if ((rv = nni_sock_hold(&sock, sid)) != 0) {
+ return;
+ }
nni_rem_notify(sock, notify);
+ nni_sock_rele(sock);
}
-nng_socket *
+nng_socket
nng_event_socket(nng_event *ev)
{
- return (ev->e_sock);
+ // FOR NOW.... maybe evnet should contain socket Id instead?
+ return (nni_sock_id(ev->e_sock));
}
@@ -424,7 +505,7 @@ nng_snapshot_destroy(nng_snapshot *snap)
int
-nng_snapshot_update(nng_socket *sock, nng_snapshot *snap)
+nng_snapshot_update(nng_socket sock, nng_snapshot *snap)
{
// Stats TBD.
NNI_INIT_INT();
@@ -467,7 +548,7 @@ nng_stat_value(nng_stat *stat)
int
-nng_device(nng_socket *sock1, nng_socket *sock2)
+nng_device(nng_socket sock1, nng_socket sock2)
{
// Device TBD.
NNI_INIT_INT();
diff --git a/src/nng.h b/src/nng.h
index 9db80f68..5eb0edea 100644
--- a/src/nng.h
+++ b/src/nng.h
@@ -42,7 +42,7 @@ extern "C" {
#endif // NNG_DECL
// Types common to nng.
-typedef struct nng_socket nng_socket;
+typedef uint32_t nng_socket;
typedef uint32_t nng_endpoint;
typedef uint32_t nng_pipe;
typedef struct nng_msg nng_msg;
@@ -54,7 +54,7 @@ typedef struct nng_stat nng_stat;
// nng_open simply creates a socket of the given class. It returns an
// error code on failure, or zero on success. The socket starts in cooked
// mode.
-NNG_DECL int nng_open(nng_socket **, uint16_t proto);
+NNG_DECL int nng_open(nng_socket *, uint16_t proto);
// nng_close closes the socket, terminating all activity and
// closing any underlying connections and releasing any associated
@@ -62,26 +62,26 @@ NNG_DECL int nng_open(nng_socket **, uint16_t proto);
// error to reference the socket in any way after this is called. Likewise,
// it is an error to reference any resources such as endpoints or
// pipes associated with the socket.
-NNG_DECL void nng_close(nng_socket *);
+NNG_DECL int nng_close(nng_socket);
// nng_shutdown shuts down the socket. This causes any threads doing
// work for the socket or blocked in socket functions to be woken (and
// return NNG_ECLOSED). The socket resources are still present, so it
// is safe to call other functions; they will just return NNG_ECLOSED.
// A call to nng_close is still required to release the resources.
-NNG_DECL int nng_shutdown(nng_socket *);
+NNG_DECL int nng_shutdown(nng_socket);
// nng_protocol returns the protocol number of the socket.
-NNG_DECL uint16_t nng_protocol(nng_socket *);
+NNG_DECL uint16_t nng_protocol(nng_socket);
// nng_peer returns the protocol number for the socket's peer.
-NNG_DECL uint16_t nng_peer(nng_socket *);
+NNG_DECL uint16_t nng_peer(nng_socket);
// nng_setopt sets an option for a specific socket.
-NNG_DECL int nng_setopt(nng_socket *, int, const void *, size_t);
+NNG_DECL int nng_setopt(nng_socket, int, const void *, size_t);
// nng_socket_getopt obtains the option for a socket.
-NNG_DECL int nng_getopt(nng_socket *, int, void *, size_t *);
+NNG_DECL int nng_getopt(nng_socket, int, void *, size_t *);
// nng_notify_func is a user function that is executed upon certain
// events. See below.
@@ -92,14 +92,14 @@ typedef void (*nng_notify_func)(nng_event *, void *);
// separate thread. Event delivery is not guaranteed, and can fail
// if events occur more quickly than the callback can handle, or
// if memory or other resources are scarce.
-NNG_DECL nng_notify *nng_setnotify(nng_socket *, int, nng_notify_func, void *);
+NNG_DECL nng_notify *nng_setnotify(nng_socket, int, nng_notify_func, void *);
// nng_unsetnotify unregisters a previously registered notification callback.
// Once this returns, the associated callback will not be executed any longer.
// If the callback is running when this called, then it will wait until that
// callback completes. (The caller of this function should not hold any
// locks acqured by the callback, in order to avoid a deadlock.)
-NNG_DECL void nng_unsetnotify(nng_socket *, nng_notify *);
+NNG_DECL void nng_unsetnotify(nng_socket, nng_notify *);
// Event types. Sockets can have multiple different kind of events.
// Note that these are edge triggered -- therefore the status indicated
@@ -125,7 +125,7 @@ NNG_DECL void nng_unsetnotify(nng_socket *, nng_notify *);
// Some of the values will not make sense for some event types, in which case
// the value returned will be NULL.
NNG_DECL int nng_event_type(nng_event *);
-NNG_DECL nng_socket *nng_event_socket(nng_event *);
+NNG_DECL nng_socket nng_event_socket(nng_event *);
NNG_DECL nng_endpoint nng_event_endpoint(nng_event *);
NNG_DECL nng_pipe nng_event_pipe(nng_event *);
NNG_DECL const char *nng_event_reason(nng_event *);
@@ -136,7 +136,7 @@ NNG_DECL const char *nng_event_reason(nng_event *);
// endpoint pointer, if it is not NULL. The flags may be NNG_FLAG_SYNCH to
// indicate that a failure setting the socket up should return an error
// back to the caller immediately.
-NNG_DECL int nng_listen(nng_socket *, const char *, nng_endpoint *, int);
+NNG_DECL int nng_listen(nng_socket, const char *, nng_endpoint *, int);
// nng_dial creates a dialing endpoint, with no special options, and
// starts it dialing. Dialers have at most one active connection at a time
@@ -146,11 +146,11 @@ NNG_DECL int nng_listen(nng_socket *, const char *, nng_endpoint *, int);
// dial will be made synchronously, and a failure condition returned back
// to the caller. (If the connection is dropped, it will still be
// reconnected in the background -- only the initial connect is synchronous.)
-NNG_DECL int nng_dial(nng_socket *, const char *, nng_endpoint *, int);
+NNG_DECL int nng_dial(nng_socket, const char *, nng_endpoint *, int);
// nng_endpoint_create creates an endpoint on the socket, but does not
// start it either dialing or listening.
-NNG_DECL int nng_endpoint_create(nng_endpoint *, nng_socket *, const char *);
+NNG_DECL int nng_endpoint_create(nng_endpoint *, nng_socket, const char *);
// nng_endpoint_dial starts the endpoint dialing. This is only possible if
// the endpoint is not already dialing or listening.
@@ -181,7 +181,7 @@ NNG_DECL const char *nng_strerror(int);
// received the data. The return value will be zero to indicate that the
// socket has accepted the entire data for send, or an errno to indicate
// failure. The flags may include NNG_FLAG_NONBLOCK.
-NNG_DECL int nng_send(nng_socket *, const void *, size_t, int);
+NNG_DECL int nng_send(nng_socket, const void *, size_t, int);
// nng_recv receives message data into the socket, up to the supplied size.
// The actual size of the message data will be written to the value pointed
@@ -190,19 +190,19 @@ NNG_DECL int nng_send(nng_socket *, const void *, size_t, int);
// the caller. In that case the pointer to the allocated will be stored
// instead of the data itself. The caller is responsible for freeing the
// associated memory with free().
-NNG_DECL int nng_recv(nng_socket *, void *, size_t *, int);
+NNG_DECL int nng_recv(nng_socket, void *, size_t *, int);
// nng_sendmsg is like nng_send, but offers up a message structure, which
// gives the ability to provide more control over the message, including
// providing backtrace information. It also can take a message that was
// obtain via nn_recvmsg, allowing for zero copy forwarding.
-NNG_DECL int nng_sendmsg(nng_socket *, nng_msg *, int);
+NNG_DECL int nng_sendmsg(nng_socket, nng_msg *, int);
// nng_recvmsg is like nng_recv, but is used to obtain a message structure
// as well as the data buffer. This can be used to obtain more information
// about where the message came from, access raw headers, etc. It also
// can be passed off directly to nng_sendmsg.
-NNG_DECL int nng_recvmsg(nng_socket *, nng_msg **, int);
+NNG_DECL int nng_recvmsg(nng_socket, nng_msg **, int);
// Message API.
NNG_DECL int nng_msg_alloc(nng_msg **, size_t);
@@ -306,7 +306,7 @@ NNG_DECL void nng_snapshot_free(nng_snapshot *);
// relevant to a particular socket. All prior values are overwritten.
// It is acceptable to use the same snapshot object with different
// sockets.
-NNG_DECL int nng_snapshot_update(nng_socket *, nng_snapshot *);
+NNG_DECL int nng_snapshot_update(nng_socket, nng_snapshot *);
// nng_snapshot_next is used to iterate over the individual statistic
// objects inside the snapshot. Note that the statistic object, and the
@@ -356,7 +356,7 @@ NNG_DECL int64_t nng_stat_value(nng_stat *);
// Device functionality. This connects two sockets together in a device,
// which means that messages from one side are forwarded to the other.
-NNG_DECL int nng_device(nng_socket *, nng_socket *);
+NNG_DECL int nng_device(nng_socket, nng_socket);
// Pollset functionality. TBD. (Note that I'd rather avoid this
// altogether, because I believe that the notification mechanism I've