diff --git a/connectd/Makefile b/connectd/Makefile index 76cb7079f..343a3624c 100644 --- a/connectd/Makefile +++ b/connectd/Makefile @@ -5,6 +5,7 @@ CONNECTD_HEADERS := connectd/connectd_wiregen.h \ connectd/connectd.h \ connectd/peer_exchange_initmsg.h \ connectd/handshake.h \ + connectd/multiplex.h \ connectd/netaddress.h \ connectd/tor_autoservice.h \ connectd/tor.h diff --git a/connectd/connectd.c b/connectd/connectd.c index c2482df82..c53811862 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -59,13 +60,14 @@ #define INITIAL_WAIT_SECONDS 1 #define MAX_WAIT_SECONDS 300 -/*~ We keep a hash table (ccan/htable) of public keys, which tells us what - * peers are already connected. The HTABLE_DEFINE_TYPE() macro needs a - * keyof() function to extract the key. For this simple use case, that's the - * identity function: */ -static const struct node_id *node_id_keyof(const struct node_id *pc) +/*~ We keep a hash table (ccan/htable) of peers, which tells us what peers are + * already connected (by peer->id). */ + +/*~ The HTABLE_DEFINE_TYPE() macro needs a keyof() function to extract the key: + */ +static const struct node_id *peer_keyof(const struct peer *peer) { - return pc; + return &peer->id; } /*~ We also need to define a hashing function. siphash24 is a fast yet @@ -79,12 +81,19 @@ static size_t node_id_hash(const struct node_id *id) return siphash24(siphash_seed(), id->k, sizeof(id->k)); } -/*~ This defines 'struct node_set' which contains 'struct node_id' pointers. */ -HTABLE_DEFINE_TYPE(struct node_id, - node_id_keyof, +/*~ We also define an equality function: is this element equal to this key? */ +static bool peer_eq_node_id(const struct peer *peer, + const struct node_id *id) +{ + return node_id_eq(&peer->id, id); +} + +/*~ This defines 'struct peer_htable' which contains 'struct peer' pointers. */ +HTABLE_DEFINE_TYPE(struct peer, + peer_keyof, node_id_hash, - node_id_eq, - node_set); + peer_eq_node_id, + peer_htable); /*~ This is the global state, like `struct lightningd *ld` in lightningd. */ struct daemon { @@ -100,7 +109,7 @@ struct daemon { /* Peers that we've handed to `lightningd`, which it hasn't told us * have disconnected. */ - struct node_set peers; + struct peer_htable peers; /* Peers we are trying to reach */ struct list_head connecting; @@ -414,7 +423,7 @@ static struct io_plan *peer_reconnected(struct io_conn *conn, /*~ ccan/io supports waiting on an address: in this case, the key in * the peer set. When someone calls `io_wake()` on that address, it * will call retry_peer_connected above. */ - return io_wait(conn, node_set_get(&daemon->peers, id), + return io_wait(conn, peer_htable_get(&daemon->peers, id), /*~ The notleak() wrapper is a DEVELOPER-mode hack so * that our memory leak detection doesn't consider 'pr' * (which is not referenced from our code) to be a @@ -422,6 +431,49 @@ static struct io_plan *peer_reconnected(struct io_conn *conn, retry_peer_connected, notleak(pr)); } +/*~ When we free a peer, we remove it from the daemon's hashtable */ +static void destroy_peer(struct peer *peer, struct daemon *daemon) +{ + peer_htable_del(&daemon->peers, peer); +} + +/*~ This is where we create a new peer. */ +static struct peer *new_peer(struct daemon *daemon, + const struct node_id *id, + const struct crypto_state *cs, + const u8 *their_features, + struct io_conn *conn STEALS, + int *fd_for_subd) +{ + struct peer *peer = tal(daemon, struct peer); + + peer->id = *id; + peer->pps = new_per_peer_state(peer, cs); + peer->final_msg = NULL; + peer->subd_in = NULL; + peer->peer_in = NULL; + peer->sent_to_subd = NULL; + peer->sent_to_peer = NULL; + peer->peer_outq = msg_queue_new(peer); + peer->subd_outq = msg_queue_new(peer); + + /* Aim for connection to shuffle data back and forth: sets up + * peer->to_subd */ + if (!multiplex_subd_setup(peer, fd_for_subd)) + return tal_free(peer); + + /* If gossipd can't give us a file descriptor, we give up connecting. */ + if (!get_gossipfds(daemon, id, their_features, peer->pps)) { + close(*fd_for_subd); + return tal_free(peer); + } + + peer->to_peer = tal_steal(peer, conn); + peer_htable_add(&daemon->peers, peer); + tal_add_destructor2(peer, destroy_peer, daemon); + return peer; +} + /*~ Note the lack of static: this is called by peer_exchange_initmsg.c once the * INIT messages are exchanged, and also by the retry code above. */ struct io_plan *peer_connected(struct io_conn *conn, @@ -433,11 +485,13 @@ struct io_plan *peer_connected(struct io_conn *conn, bool incoming) { u8 *msg; - struct per_peer_state *pps; + struct peer *peer; int unsup; size_t depender, missing; + int subd_fd; - if (node_set_get(&daemon->peers, id)) + peer = peer_htable_get(&daemon->peers, id); + if (peer) return peer_reconnected(conn, daemon, id, addr, cs, their_features, incoming); @@ -487,35 +541,28 @@ struct io_plan *peer_connected(struct io_conn *conn, conn, find_connecting(daemon, id)->conn); /* This contains the per-peer state info; gossipd fills in pps->gs */ - pps = new_per_peer_state(tmpctx, cs); - - /* If gossipd can't give us a file descriptor, we give up connecting. */ - if (!get_gossipfds(daemon, id, their_features, pps)) + peer = new_peer(daemon, id, cs, their_features, conn, &subd_fd); + /* Only takes over conn if it succeeds. */ + if (!peer) return io_close(conn); /* Create message to tell master peer has connected. */ msg = towire_connectd_peer_connected(NULL, id, addr, incoming, - pps, their_features); + peer->pps, their_features); /*~ daemon_conn is a message queue for inter-daemon communication: we * queue up the `connect_peer_connected` message to tell lightningd * we have connected, and give the peer and gossip fds. */ daemon_conn_send(daemon->master, take(msg)); - /* io_conn_fd() extracts the fd from ccan/io's io_conn */ - daemon_conn_send_fd(daemon->master, io_conn_fd(conn)); - daemon_conn_send_fd(daemon->master, pps->gossip_fd); - daemon_conn_send_fd(daemon->master, pps->gossip_store_fd); + daemon_conn_send_fd(daemon->master, subd_fd); + daemon_conn_send_fd(daemon->master, peer->pps->gossip_fd); + daemon_conn_send_fd(daemon->master, peer->pps->gossip_store_fd); /* Don't try to close these on freeing. */ - pps->gossip_store_fd = pps->gossip_fd = -1; + peer->pps->gossip_store_fd = peer->pps->gossip_fd = -1; - /*~ Finally, we add it to the set of pubkeys: tal_dup will handle - * take() args for us, by simply tal_steal()ing it. */ - node_set_add(&daemon->peers, tal_dup(daemon, struct node_id, id)); - - /*~ We want to free the connection, but not close the fd (which is - * queued to go to lightningd), so use this variation on io_close: */ - return io_close_taken_fd(conn); + /*~ Now we set up this connection to read/write from subd */ + return multiplex_peer_setup(conn, peer); } /*~ handshake.c's handles setting up the crypto state once we get a connection @@ -1774,14 +1821,14 @@ static void add_gossip_addrs(struct wireaddr_internal **addrs, static void try_connect_peer(struct daemon *daemon, const struct node_id *id, u32 seconds_waited, - struct wireaddr_internal *addrhint) + struct wireaddr_internal *addrhint STEALS) { struct wireaddr_internal *addrs; bool use_proxy = daemon->always_use_proxy; struct connecting *connect; /* Already done? May happen with timer. */ - if (node_set_get(&daemon->peers, id)) + if (peer_htable_get(&daemon->peers, id)) return; /* If we're trying to connect it right now, that's OK. */ @@ -1874,23 +1921,24 @@ static void connect_to_peer(struct daemon *daemon, const u8 *msg) /* A peer is gone: clean things up. */ static void cleanup_dead_peer(struct daemon *daemon, const struct node_id *id) { - struct node_id *node; + struct peer *peer; /* We should stay in sync with lightningd at all times. */ - node = node_set_get(&daemon->peers, id); - if (!node) + peer = peer_htable_get(&daemon->peers, id); + if (!peer) status_failed(STATUS_FAIL_INTERNAL_ERROR, "peer_disconnected unknown peer: %s", type_to_string(tmpctx, struct node_id, id)); - node_set_del(&daemon->peers, node); status_peer_debug(id, "disconnect"); /* Wake up in case there's a reconnecting peer waiting in io_wait. */ - io_wake(node); + io_wake(peer); /* Note: deleting from a htable (a-la node_set_del) does not free it: - * htable doesn't assume it's a tal object at all. */ - tal_free(node); + * htable doesn't assume it's a tal object at all. That's why we have + * a destructor attached to peer (called destroy_peer by + * convention). */ + tal_free(peer); } /* lightningd tells us a peer has disconnected. */ @@ -1904,40 +1952,20 @@ static void peer_disconnected(struct daemon *daemon, const u8 *msg) cleanup_dead_peer(daemon, &id); } -/* lightningd tells us to send a final (usually error) message to peer, then - * disconnect. */ -struct final_msg_data { - struct daemon *daemon; - struct node_id id; -}; - -static void destroy_final_msg_data(struct final_msg_data *f) -{ - cleanup_dead_peer(f->daemon, &f->id); -} - -static struct io_plan *send_final_msg(struct io_conn *conn, u8 *msg) -{ - return io_write(conn, msg, tal_bytelen(msg), io_close_cb, NULL); -} - /* lightningd tells us to send a msg and disconnect. */ static void peer_final_msg(struct io_conn *conn, struct daemon *daemon, const u8 *msg) { + struct peer *peer; struct per_peer_state *pps; - struct final_msg_data *f = tal(NULL, struct final_msg_data); + struct node_id id; u8 *finalmsg; int fds[3]; - f->daemon = daemon; /* pps is allocated off f, so fds are closed when f freed. */ - if (!fromwire_connectd_peer_final_msg(f, msg, &f->id, &pps, &finalmsg)) + if (!fromwire_connectd_peer_final_msg(tmpctx, msg, &id, &pps, &finalmsg)) master_badmsg(WIRE_CONNECTD_PEER_FINAL_MSG, msg); - /* When f is freed, we want to mark node as dead. */ - tal_add_destructor(f, destroy_final_msg_data); - /* Get the fds for this peer. */ io_fd_block(io_conn_fd(conn), true); for (size_t i = 0; i < ARRAY_SIZE(fds); i++) { @@ -1949,16 +1977,20 @@ static void peer_final_msg(struct io_conn *conn, } io_fd_block(io_conn_fd(conn), false); + /* Close fd to ourselves. */ + close(fds[0]); + /* We put peer fd into conn, but pps needs to free the rest */ per_peer_state_set_fds(pps, -1, fds[1], fds[2]); - /* Log and encrypt message for peer. */ - status_peer_io(LOG_IO_OUT, &f->id, finalmsg); - finalmsg = cryptomsg_encrypt_msg(f, &pps->cs, take(finalmsg)); - - /* Organize io loop to write out that message, it will free f - * once closed */ - tal_steal(io_new_conn(daemon, fds[0], send_final_msg, finalmsg), f); + /* This can happen if peer hung up on us. */ + peer = peer_htable_get(&daemon->peers, &id); + if (peer) { + /* Log and encrypt message for peer. */ + status_peer_io(LOG_IO_OUT, &id, finalmsg); + finalmsg = cryptomsg_encrypt_msg(NULL, &pps->cs, take(finalmsg)); + multiplex_final_msg(peer, take(finalmsg)); + } } #if DEVELOPER @@ -1971,6 +2003,7 @@ static void dev_connect_memleak(struct daemon *daemon, const u8 *msg) /* Now delete daemon and those which it has pointers to. */ memleak_remove_region(memtable, daemon, sizeof(daemon)); + memleak_remove_htable(memtable, &daemon->peers.raw); found_leak = dump_memleak(memtable, memleak_status_broken); daemon_conn_send(daemon->master, @@ -2064,7 +2097,7 @@ int main(int argc, char *argv[]) /* Allocate and set up our simple top-level structure. */ daemon = tal(NULL, struct daemon); - node_set_init(&daemon->peers); + peer_htable_init(&daemon->peers); memleak_add_helper(daemon, memleak_daemon_cb); list_head_init(&daemon->connecting); daemon->listen_fds = tal_arr(daemon, struct listen_fd, 0); diff --git a/connectd/multiplex.c b/connectd/multiplex.c new file mode 100644 index 000000000..5ae912671 --- /dev/null +++ b/connectd/multiplex.c @@ -0,0 +1,212 @@ +/*~ This contains all the code to shuffle data between socket to the peer + * itself, and the subdaemons. */ +#include "config.h" +#include +#include +#include +#include +#include +#include +#include +#include + +/* These four function handle subd->peer */ +static struct io_plan *after_final_msg(struct io_conn *peer_conn, + struct peer *peer) +{ + /* io_close will want to free this itself! */ + assert(peer->to_peer == peer_conn); + + /* Invert ownership, so io_close frees peer for us */ + tal_steal(NULL, peer_conn); + tal_steal(peer_conn, peer); + + return io_close(peer_conn); +} + +static struct io_plan *write_to_peer(struct io_conn *peer_conn, + struct peer *peer) +{ + assert(peer->to_peer == peer_conn); + + /* Free last sent one (if any) */ + tal_free(peer->sent_to_peer); + + /* Pop tail of send queue */ + peer->sent_to_peer = msg_dequeue(peer->peer_outq); + + /* Nothing to send? */ + if (!peer->sent_to_peer) { + /* Send final once subd is not longer connected */ + if (peer->final_msg && !peer->to_subd) { + return io_write(peer_conn, + peer->final_msg, + tal_bytelen(peer->final_msg), + after_final_msg, peer); + } + /* Tell them to read again, */ + io_wake(&peer->subd_in); + + /* Wait for them to wake us */ + return msg_queue_wait(peer_conn, peer->peer_outq, + write_to_peer, peer); + } + + return io_write(peer_conn, + peer->sent_to_peer, + tal_bytelen(peer->sent_to_peer), + write_to_peer, peer); +} + +static struct io_plan *read_from_subd(struct io_conn *subd_conn, + struct peer *peer); +static struct io_plan *read_from_subd_done(struct io_conn *subd_conn, + struct peer *peer) +{ + size_t len = ((size_t *)peer->subd_in)[1023]; + assert(peer->to_subd == subd_conn); + + /* Trim to length */ + tal_resize(&peer->subd_in, len); + + /* Tell them to write. */ + msg_enqueue(peer->peer_outq, take(peer->subd_in)); + peer->subd_in = NULL; + /* Wait for them to wake us */ + return io_wait(subd_conn, &peer->subd_in, read_from_subd, peer); +} + +static struct io_plan *read_from_subd(struct io_conn *subd_conn, + struct peer *peer) +{ + /* We stash the length at the end */ + size_t *buf = tal_arr(peer, size_t, 1024); + assert(peer->to_subd == subd_conn); + + peer->subd_in = (u8 *)buf; + return io_read_partial(subd_conn, peer->subd_in, + sizeof(size_t) * 1023, + &buf[1023], + read_from_subd_done, peer); +} + +/* These four function handle peer->subd */ +static struct io_plan *write_to_subd(struct io_conn *subd_conn, + struct peer *peer) +{ + assert(peer->to_subd == subd_conn); + + /* Free last sent one (if any) */ + tal_free(peer->sent_to_subd); + + /* Pop tail of send queue */ + peer->sent_to_subd = msg_dequeue(peer->subd_outq); + + /* Nothing to send? */ + if (!peer->sent_to_subd) { + /* Tell them to read again, */ + io_wake(&peer->peer_in); + + /* Wait for them to wake us */ + return msg_queue_wait(subd_conn, peer->subd_outq, + write_to_subd, peer); + } + + return io_write(subd_conn, + peer->sent_to_subd, + tal_bytelen(peer->sent_to_subd), + write_to_subd, peer); +} + +static struct io_plan *read_from_peer(struct io_conn *peer_conn, + struct peer *peer); +static struct io_plan *read_from_peer_done(struct io_conn *peer_conn, + struct peer *peer) +{ + size_t len = ((size_t *)peer->peer_in)[1023]; + assert(peer->to_peer == peer_conn); + + /* Trim to length */ + tal_resize(&peer->peer_in, len); + + /* Tell them to write. */ + msg_enqueue(peer->subd_outq, take(peer->peer_in)); + peer->peer_in = NULL; + /* Wait for them to wake us */ + return io_wait(peer_conn, &peer->peer_in, read_from_peer, peer); +} + +static struct io_plan *read_from_peer(struct io_conn *peer_conn, + struct peer *peer) +{ + /* We stash the length at the end */ + size_t *buf = tal_arr(peer, size_t, 1024); + assert(peer->to_peer == peer_conn); + + peer->peer_in = (u8 *)buf; + return io_read_partial(peer_conn, peer->peer_in, + sizeof(size_t) * 1023, + &buf[1023], + read_from_peer_done, peer); +} + +static struct io_plan *subd_conn_init(struct io_conn *subd_conn, struct peer *peer) +{ + peer->to_subd = subd_conn; + return io_duplex(subd_conn, + read_from_subd(subd_conn, peer), + write_to_subd(subd_conn, peer)); +} + +static void destroy_subd_conn(struct io_conn *subd_conn, struct peer *peer) +{ + assert(subd_conn == peer->to_subd); + peer->to_subd = NULL; + /* In case they were waiting for this to send final_msg */ + if (peer->final_msg) + msg_wake(peer->peer_outq); +} + +bool multiplex_subd_setup(struct peer *peer, int *fd_for_subd) +{ + int fds[2]; + + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fds) != 0) { + status_broken("Failed to create socketpair: %s", + strerror(errno)); + return false; + } + peer->to_subd = io_new_conn(peer, fds[0], subd_conn_init, peer); + tal_add_destructor2(peer->to_subd, destroy_subd_conn, peer); + *fd_for_subd = fds[1]; + return true; +} + +static void destroy_peer_conn(struct io_conn *peer_conn, struct peer *peer) +{ + assert(peer->to_peer == peer_conn); + peer->to_peer = NULL; + + /* Close internal connections if not already. */ + if (peer->to_subd) + io_close(peer->to_subd); +} + +struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn, + struct peer *peer) +{ + /*~ If conn closes, we close the subd connections and wait for + * lightningd to tell us to close with the peer */ + tal_add_destructor2(peer_conn, destroy_peer_conn, peer); + + return io_duplex(peer_conn, + read_from_peer(peer_conn, peer), + write_to_peer(peer_conn, peer)); +} + +void multiplex_final_msg(struct peer *peer, const u8 *final_msg TAKES) +{ + peer->final_msg = tal_dup_talarr(peer, u8, final_msg); + if (!peer->to_subd) + io_wake(peer->peer_outq); +} diff --git a/connectd/multiplex.h b/connectd/multiplex.h new file mode 100644 index 000000000..84be1f008 --- /dev/null +++ b/connectd/multiplex.h @@ -0,0 +1,41 @@ +#ifndef LIGHTNING_CONNECTD_MULTIPLEX_H +#define LIGHTNING_CONNECTD_MULTIPLEX_H +#include "config.h" +#include +#include +#include + +struct peer { + struct node_id id; + struct per_peer_state *pps; + + /* Connection to the peer */ + struct io_conn *to_peer; + + /* Connection to the subdaemon */ + struct io_conn *to_subd; + + /* Final message to send to peer (and hangup) */ + u8 *final_msg; + + /* Input buffers. */ + u8 *subd_in, *peer_in; + + /* Output buffers. */ + struct msg_queue *subd_outq, *peer_outq; + + /* Sent buffers (for freeing after sending) */ + const u8 *sent_to_subd, *sent_to_peer; +}; + +/* Set up peer->to_subd; sets fd_for_subd to pass to lightningd. */ +bool multiplex_subd_setup(struct peer *peer, int *fd_for_subd); + +/* Take over peer_conn as peer->to_peer */ +struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn, + struct peer *peer); + +/* Send this message to peer and disconnect. */ +void multiplex_final_msg(struct peer *peer, + const u8 *final_msg TAKES); +#endif /* LIGHTNING_CONNECTD_MULTIPLEX_H */