diff --git a/connectd/connect.c b/connectd/connect.c index a5e730313..7cd4e4f4e 100644 --- a/connectd/connect.c +++ b/connectd/connect.c @@ -196,31 +196,13 @@ struct reaching { const char *connstate; }; -/* Things we need when we're talking direct to the peer. */ -struct local_peer_state { - /* Cryptostate */ - struct peer_crypto_state pcs; - - /* File descriptor corresponding to conn. */ - int fd; - - /* File descriptor for talking to gossipd. */ - int gossip_fd; - - /* Our connection (and owner) */ - struct io_conn *conn; - - /* Message queue for outgoing. */ - struct msg_queue peer_out; -}; - struct peer { struct daemon *daemon; /* For reconnecting peers, this is in daemon->reconnecting. */ struct list_node list; - /* The ID of the peer (not necessarily unique, in transit!) */ + /* The ID of the peer */ struct pubkey id; /* Where it's connected to. */ @@ -229,7 +211,11 @@ struct peer { /* Feature bitmaps. */ u8 *gfeatures, *lfeatures; - struct local_peer_state *local; + /* Cryptostate */ + struct peer_crypto_state pcs; + + /* Our connection (and owner) */ + struct io_conn *conn; }; struct addrhint { @@ -242,7 +228,6 @@ struct addrhint { }; /* FIXME: Reorder */ -static void send_peer_with_fds(struct peer *peer, const u8 *msg); static void retry_important(struct important_peerid *imp); static struct peer *find_reconnecting_peer(struct daemon *daemon, @@ -261,7 +246,7 @@ static void destroy_reconnecting_peer(struct peer *peer) list_del_from(&peer->daemon->reconnecting, &peer->list); /* This is safe even if we're being destroyed because of peer->conn, * since tal_free protects against loops. */ - io_close(peer->local->conn); + io_close(peer->conn); } static void add_reconnecting_peer(struct daemon *daemon, struct peer *peer) @@ -290,18 +275,6 @@ static struct addrhint *find_addrhint(struct daemon *daemon, return NULL; } -static struct local_peer_state * -new_local_peer_state(struct peer *peer, const struct crypto_state *cs) -{ - struct local_peer_state *lps = tal(peer, struct local_peer_state); - - init_peer_crypto_state(peer, &lps->pcs); - lps->pcs.cs = *cs; - msg_queue_init(&lps->peer_out, lps); - - return lps; -} - /** * Some ISP resolvers will reply with a dummy IP to queries that would otherwise * result in an NXDOMAIN reply. This just checks whether we have one such @@ -344,7 +317,8 @@ static struct peer *new_peer(const tal_t *ctx, peer->id = *their_id; peer->addr = *addr; peer->daemon = daemon; - peer->local = new_local_peer_state(peer, cs); + init_peer_crypto_state(peer, &peer->pcs); + peer->pcs.cs = *cs; return peer; } @@ -390,16 +364,6 @@ static void reached_peer(struct peer *peer, struct io_conn *conn) tal_free(r); } -static void queue_peer_msg(struct peer *peer, const u8 *msg TAKES) -{ - if (peer->local) { - msg_enqueue(&peer->local->peer_out, msg); - } else { /* Waiting to die. */ - if (taken(msg)) - tal_free(msg); - } -} - static int get_gossipfd(struct peer *peer) { bool gossip_queries_feature, initial_routing_sync, success; @@ -433,17 +397,6 @@ static int get_gossipfd(struct peer *peer) return fdpass_recv(GOSSIPCTL_FD); } -static bool is_all_channel_error(const u8 *msg) -{ - struct channel_id channel_id; - u8 *data; - - if (!fromwire_error(msg, msg, &channel_id, &data)) - return false; - tal_free(data); - return channel_id_is_all(&channel_id); -} - static struct io_plan *peer_close_after_error(struct io_conn *conn, struct peer *peer) { @@ -469,31 +422,40 @@ static struct io_plan *retry_peer_connected(struct io_conn *conn, static struct io_plan *peer_connected(struct io_conn *conn, struct peer *peer) { + struct daemon *daemon = peer->daemon; u8 *msg; + int gossip_fd; /* FIXME: We could do this before exchanging init msgs. */ - if (pubkey_set_get(&peer->daemon->peers, &peer->id)) { + if (pubkey_set_get(&daemon->peers, &peer->id)) { status_trace("peer %s: reconnect", type_to_string(tmpctx, struct pubkey, &peer->id)); /* Tell master to kill it: will send peer_disconnect */ msg = towire_connect_reconnected(NULL, &peer->id); - daemon_conn_send(&peer->daemon->master, take(msg)); - add_reconnecting_peer(peer->daemon, peer); + daemon_conn_send(&daemon->master, take(msg)); + add_reconnecting_peer(daemon, peer); return io_wait(conn, peer, retry_peer_connected, peer); } reached_peer(peer, conn); - peer->local->gossip_fd = get_gossipfd(peer); - if (peer->local->gossip_fd < 0) + gossip_fd = get_gossipfd(peer); + if (gossip_fd < 0) return io_close(conn); - /* We will not have anything queued, since we're not duplex. */ - msg = towire_connect_peer_connected(peer, &peer->id, &peer->addr, - &peer->local->pcs.cs, - peer->gfeatures, peer->lfeatures); - send_peer_with_fds(peer, msg); + msg = towire_connect_peer_connected(tmpctx, &peer->id, &peer->addr, + &peer->pcs.cs, + peer->gfeatures, peer->lfeatures); + daemon_conn_send(&daemon->master, msg); + daemon_conn_send_fd(&daemon->master, io_conn_fd(conn)); + daemon_conn_send_fd(&daemon->master, gossip_fd); + + pubkey_set_add(&daemon->peers, + tal_dup(daemon, struct pubkey, &peer->id)); + + /* We keep peer around until master says peer_disconnected */ + tal_steal(daemon, peer); return io_close_taken_fd(conn); } @@ -523,10 +485,8 @@ static struct io_plan *peer_init_received(struct io_conn *conn, tal_hexstr(msg, local_features, tal_count(local_features))); - queue_peer_msg(peer, take(msg)); - - /* Don't read any more */ - return io_wait(conn, peer, io_never, peer); + return peer_write_message(conn, &peer->pcs, take(msg), + peer_close_after_error); } return peer_connected(conn, peer); @@ -539,7 +499,7 @@ static struct io_plan *read_init(struct io_conn *conn, struct peer *peer) * The receiving node: * - MUST wait to receive `init` before sending any other messages. */ - return peer_read_message(conn, &peer->local->pcs, peer_init_received); + return peer_read_message(conn, &peer->pcs, peer_init_received); } /* This creates a temporary peer which is not in the list and is owner @@ -554,8 +514,6 @@ static struct io_plan *init_new_peer(struct io_conn *conn, struct peer *peer = new_peer(conn, daemon, their_id, addr, cs); u8 *initmsg; - peer->local->fd = io_conn_fd(conn); - /* BOLT #1: * * The sending node: @@ -564,39 +522,7 @@ static struct io_plan *init_new_peer(struct io_conn *conn, */ initmsg = towire_init(NULL, daemon->globalfeatures, daemon->localfeatures); - return peer_write_message(conn, &peer->local->pcs, - take(initmsg), read_init); -} - -static struct io_plan *peer_pkt_out(struct io_conn *conn, struct peer *peer) -{ - /* First priority is queued packets, if any */ - const u8 *out; - - assert(peer->local); - - out = msg_dequeue(&peer->local->peer_out); - if (out) { - if (is_all_channel_error(out)) - return peer_write_message(conn, &peer->local->pcs, - take(out), - peer_close_after_error); - return peer_write_message(conn, &peer->local->pcs, take(out), - peer_pkt_out); - } - - return msg_queue_wait(conn, &peer->local->peer_out, peer_pkt_out, peer); -} - -/* When a peer is to be owned by another daemon: will be freed by caller */ -static void send_peer_with_fds(struct peer *peer, const u8 *msg) -{ - daemon_conn_send(&peer->daemon->master, msg); - daemon_conn_send_fd(&peer->daemon->master, peer->local->fd); - daemon_conn_send_fd(&peer->daemon->master, peer->local->gossip_fd); - - pubkey_set_add(&peer->daemon->peers, - tal_dup(peer->daemon, struct pubkey, &peer->id)); + return peer_write_message(conn, &peer->pcs, take(initmsg), read_init); } static int make_listen_fd(int domain, void *addr, socklen_t len, bool mayfail)