subdaemons: remove gossipd fd from per-peer daemons.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
This commit is contained in:
Rusty Russell
2022-01-29 14:03:05 +10:30
parent 1abbc3d06a
commit 3c5d27e3e9
27 changed files with 120 additions and 342 deletions

View File

@@ -45,9 +45,9 @@
#include <wire/peer_wire.h>
#include <wire/wire_sync.h>
/* stdin == requests, 3 == peer, 4 = gossip, 5 = HSM */
/* stdin == requests, 3 == peer, 4 = HSM */
#define MASTER_FD STDIN_FILENO
#define HSM_FD 5
#define HSM_FD 4
struct peer {
struct per_peer_state *pps;
@@ -2132,13 +2132,12 @@ static void peer_in(struct peer *peer, const u8 *msg)
{
enum peer_wire type = fromwire_peektype(msg);
if (handle_peer_gossip_or_error(peer->pps, &peer->channel_id, msg))
if (handle_peer_error(peer->pps, &peer->channel_id, msg))
return;
/* Must get funding_locked before almost anything. */
if (!peer->funding_locked[REMOTE]) {
if (type != WIRE_FUNDING_LOCKED
&& type != WIRE_PONG
&& type != WIRE_SHUTDOWN
/* We expect these for v2 !! */
&& type != WIRE_TX_SIGNATURES
@@ -2215,7 +2214,7 @@ static void peer_in(struct peer *peer, const u8 *msg)
handle_unexpected_reestablish(peer, msg);
return;
/* These are all swallowed by handle_peer_gossip_or_error */
/* These are all swallowed by connectd */
case WIRE_CHANNEL_ANNOUNCEMENT:
case WIRE_CHANNEL_UPDATE:
case WIRE_NODE_ANNOUNCEMENT:
@@ -2798,7 +2797,7 @@ skip_tlvs:
do {
clean_tmpctx();
msg = peer_read(tmpctx, peer->pps);
} while (handle_peer_gossip_or_error(peer->pps, &peer->channel_id, msg) ||
} while (handle_peer_error(peer->pps, &peer->channel_id, msg) ||
capture_premature_msg(&premature_msgs, msg));
got_reestablish:
@@ -3752,9 +3751,9 @@ static void init_channel(struct peer *peer)
tal_dup(peer, struct penalty_base, &pbases[i]));
tal_free(pbases);
/* stdin == requests, 3 == peer, 4 = gossip */
/* stdin == requests, 3 == peer */
peer->pps = new_per_peer_state(peer);
per_peer_state_set_fds(peer->pps, 3, 4);
per_peer_state_set_fd(peer->pps, 3);
status_debug("init %s: remote_per_commit = %s, old_remote_per_commit = %s"
" next_idx_local = %"PRIu64
@@ -3895,11 +3894,10 @@ int main(int argc, char *argv[])
FD_ZERO(&fds_in);
FD_SET(MASTER_FD, &fds_in);
FD_SET(peer->pps->peer_fd, &fds_in);
FD_SET(peer->pps->gossip_fd, &fds_in);
FD_ZERO(&fds_out);
FD_SET(peer->pps->peer_fd, &fds_out);
nfds = peer->pps->gossip_fd+1;
nfds = peer->pps->peer_fd+1;
while (!shutdown_complete(peer)) {
struct timemono first;
@@ -3958,13 +3956,6 @@ int main(int argc, char *argv[])
/* This could take forever, but who cares? */
msg = peer_read(tmpctx, peer->pps);
peer_in(peer, msg);
} else if (FD_ISSET(peer->pps->gossip_fd, &rfds)) {
msg = wire_sync_read(tmpctx, peer->pps->gossip_fd);
/* Gossipd hangs up on us to kill us when a new
* connection comes in. */
if (!msg)
peer_failed_connection_lost();
handle_gossip_msg(peer->pps, take(msg));
}
}

View File

@@ -29,9 +29,9 @@
#include <wire/peer_wire.h>
#include <wire/wire_sync.h>
/* stdin == requests, 3 == peer, 4 = gossip, 5 = hsmd */
/* stdin == requests, 3 == peer, 4 = hsmd */
#define REQ_FD STDIN_FILENO
#define HSM_FD 5
#define HSM_FD 4
static void notify(enum log_level level, const char *fmt, ...)
{
@@ -117,15 +117,10 @@ static u8 *closing_read_peer_msg(const tal_t *ctx,
{
for (;;) {
u8 *msg;
bool from_gossipd;
clean_tmpctx();
msg = peer_or_gossip_sync_read(ctx, pps, &from_gossipd);
if (from_gossipd) {
handle_gossip_msg(pps, take(msg));
continue;
}
if (!handle_peer_gossip_or_error(pps, channel_id, msg))
msg = peer_read(ctx, pps);
if (!handle_peer_error(pps, channel_id, msg))
return msg;
}
}
@@ -892,9 +887,9 @@ int main(int argc, char *argv[])
&wrong_funding))
master_badmsg(WIRE_CLOSINGD_INIT, msg);
/* stdin == requests, 3 == peer, 4 = gossip, 5 = hsmd */
/* stdin == requests, 3 == peer, 4 = hsmd */
pps = notleak(new_per_peer_state(ctx));
per_peer_state_set_fds(pps, 3, 4);
per_peer_state_set_fd(pps, 3);
funding_wscript = bitcoin_redeem_2of2(ctx,
&funding_pubkey[LOCAL],

View File

@@ -19,7 +19,6 @@ peer_fatal_continue(const u8 *msg TAKES, const struct per_peer_state *pps)
status_send(msg);
status_send_fd(pps->peer_fd);
status_send_fd(pps->gossip_fd);
exit(0x80 | (reason & 0xFF));
}

View File

@@ -12,39 +12,25 @@ static void destroy_per_peer_state(struct per_peer_state *pps)
{
if (pps->peer_fd != -1)
close(pps->peer_fd);
if (pps->gossip_fd != -1)
close(pps->gossip_fd);
}
struct per_peer_state *new_per_peer_state(const tal_t *ctx)
{
struct per_peer_state *pps = tal(ctx, struct per_peer_state);
pps->peer_fd = pps->gossip_fd = -1;
pps->peer_fd = -1;
tal_add_destructor(pps, destroy_per_peer_state);
return pps;
}
void per_peer_state_set_fds(struct per_peer_state *pps,
int peer_fd, int gossip_fd)
void per_peer_state_set_fd(struct per_peer_state *pps, int peer_fd)
{
assert(pps->peer_fd == -1);
assert(pps->gossip_fd == -1);
pps->peer_fd = peer_fd;
pps->gossip_fd = gossip_fd;
}
void per_peer_state_set_fds_arr(struct per_peer_state *pps, const int *fds)
{
/* We expect 2 fds. */
assert(tal_count(fds) == 2);
per_peer_state_set_fds(pps, fds[0], fds[1]);
}
void per_peer_state_fdpass_send(int fd, const struct per_peer_state *pps)
{
assert(pps->peer_fd != -1);
assert(pps->gossip_fd != -1);
fdpass_send(fd, pps->peer_fd);
fdpass_send(fd, pps->gossip_fd);
}

View File

@@ -9,19 +9,15 @@
/* Things we hand between daemons to talk to peers. */
struct per_peer_state {
/* If not -1, closed on freeing */
int peer_fd, gossip_fd;
int peer_fd;
};
/* Allocate a new per-peer state and add destructor to close fds if set;
* sets fds to -1. */
* sets peer_fd to -1. */
struct per_peer_state *new_per_peer_state(const tal_t *ctx);
/* Initialize the fds (must be -1 previous) */
void per_peer_state_set_fds(struct per_peer_state *pps,
int peer_fd, int gossip_fd);
/* Array version of above: tal_count(fds) must be 2 */
void per_peer_state_set_fds_arr(struct per_peer_state *pps, const int *fds);
void per_peer_state_set_fd(struct per_peer_state *pps, int peer_fd);
void per_peer_state_fdpass_send(int fd, const struct per_peer_state *pps);

View File

@@ -12,39 +12,6 @@
#include <wire/peer_wire.h>
#include <wire/wire_sync.h>
u8 *peer_or_gossip_sync_read(const tal_t *ctx,
struct per_peer_state *pps,
bool *from_gossipd)
{
fd_set readfds;
u8 *msg;
FD_ZERO(&readfds);
FD_SET(pps->peer_fd, &readfds);
FD_SET(pps->gossip_fd, &readfds);
if (select(pps->peer_fd > pps->gossip_fd
? pps->peer_fd + 1 : pps->gossip_fd + 1,
&readfds, NULL, NULL, NULL) <= 0) {
status_failed(STATUS_FAIL_GOSSIP_IO,
"select failed?: %s", strerror(errno));
}
if (FD_ISSET(pps->peer_fd, &readfds)) {
msg = peer_read(ctx, pps);
*from_gossipd = false;
return msg;
}
msg = wire_sync_read(ctx, pps->gossip_fd);
if (!msg)
status_failed(STATUS_FAIL_GOSSIP_IO,
"Error reading gossip msg: %s",
strerror(errno));
*from_gossipd = true;
return msg;
}
bool is_peer_error(const tal_t *ctx, const u8 *msg,
const struct channel_id *channel_id,
char **desc, bool *warning)
@@ -94,70 +61,23 @@ bool is_wrong_channel(const u8 *msg, const struct channel_id *expected,
return !channel_id_eq(expected, actual);
}
void handle_gossip_msg(struct per_peer_state *pps, const u8 *msg TAKES)
{
u8 *gossip;
/* It's a raw gossip msg: this copies or takes() */
gossip = tal_dup_talarr(tmpctx, u8, msg);
/* Gossipd can send us gossip messages, OR warnings */
if (fromwire_peektype(gossip) == WIRE_WARNING) {
peer_write(pps, gossip);
peer_failed_connection_lost();
} else {
peer_write(pps, gossip);
}
}
bool handle_peer_gossip_or_error(struct per_peer_state *pps,
bool handle_peer_error(struct per_peer_state *pps,
const struct channel_id *channel_id,
const u8 *msg TAKES)
{
char *err;
bool warning;
u8 *pong;
#if DEVELOPER
/* Any odd-typed unknown message is handled by the caller, so if we
* find one here it's an error. */
assert(!is_unknown_msg_discardable(msg));
#else
/* BOLT #1:
*
* A receiving node:
* - upon receiving a message of _odd_, unknown type:
* - MUST ignore the received message.
*/
if (is_unknown_msg_discardable(msg))
goto handled;
#endif
if (check_ping_make_pong(NULL, msg, &pong)) {
if (pong)
peer_write(pps, take(pong));
return true;
} else if (is_msg_for_gossipd(msg)) {
wire_sync_write(pps->gossip_fd, msg);
/* wire_sync_write takes, so don't take again. */
return true;
}
if (is_peer_error(tmpctx, msg, channel_id, &err, &warning)) {
/* Ignore unknown channel errors. */
if (!err)
goto handled;
/* We hang up when a warning is received. */
peer_failed_received_errmsg(pps, err, channel_id, warning);
goto handled;
}
return false;
handled:
if (!err) {
if (taken(msg))
tal_free(msg);
return true;
}
/* We hang up when a warning is received. */
peer_failed_received_errmsg(pps, err, channel_id, warning);
}
return false;
}

View File

@@ -8,25 +8,6 @@ struct crypto_state;
struct channel_id;
struct per_peer_state;
/**
* peer_or_gossip_sync_read - read a peer message, or maybe a gossip msg.
* @ctx: context to allocate return packet from.
* @pps: the per-peer peer state and fds
* @from_gossipd: true if the msg was from gossipd, otherwise false.
*
* Will call peer_failed_connection_lost() or
* status_failed(STATUS_FAIL_GOSSIP_IO) or return a message.
*
* Usually, you should call handle_gossip_msg if *@from_gossipd is
* true, otherwise if is_peer_error() handle the error, otherwise if
* is_msg_for_gossipd() then send to gossipd, otherwise if is
* is_wrong_channel() send that as a reply. Otherwise it should be
* a valid message.
*/
u8 *peer_or_gossip_sync_read(const tal_t *ctx,
struct per_peer_state *pps,
bool *from_gossipd);
/**
* is_peer_error - if it's an error, describe if it applies to this channel.
* @ctx: context to allocate return from.
@@ -55,26 +36,15 @@ bool is_wrong_channel(const u8 *msg, const struct channel_id *expected,
/**
* handle_peer_gossip_or_error - simple handler for all the above cases.
* handle_peer_error - simple handler for errors
* @pps: per-peer state.
* @channel_id: the channel id of the current channel.
* @msg: the peer message (only taken if returns true).
*
* This returns true if it handled the packet: a gossip packet (forwarded
* to gossipd), or an error packet (causes peer_failed_received_errmsg or
* ignored), or a ping (may reply with pong).
* This returns true if it handled the packet.
*/
bool handle_peer_gossip_or_error(struct per_peer_state *pps,
bool handle_peer_error(struct per_peer_state *pps,
const struct channel_id *channel_id,
const u8 *msg TAKES);
/**
* handle_timestamp_filter - deal with timestamp filter requests.
* @pps: per-peer state.
* @msg: the peer message (only taken if returns true).
*/
bool handle_timestamp_filter(struct per_peer_state *pps, const u8 *msg TAKES);
/* We got this message from gossipd: forward/quit as it asks. */
void handle_gossip_msg(struct per_peer_state *pps, const u8 *msg TAKES);
#endif /* LIGHTNING_COMMON_READ_PEER_MSG_H */

View File

@@ -336,6 +336,8 @@ static struct io_plan *peer_reconnected(struct io_conn *conn,
/*~ When we free a peer, we remove it from the daemon's hashtable */
static void destroy_peer(struct peer *peer, struct daemon *daemon)
{
if (peer->gossip_fd >= 0)
close(peer->gossip_fd);
peer_htable_del(&daemon->peers, peer);
}
@@ -395,7 +397,7 @@ struct io_plan *peer_connected(struct io_conn *conn,
struct peer *peer;
int unsup;
size_t depender, missing;
int subd_fd, gossip_fd;
int subd_fd;
peer = peer_htable_get(&daemon->peers, id);
if (peer)
@@ -454,8 +456,8 @@ struct io_plan *peer_connected(struct io_conn *conn,
return io_close(conn);
/* If gossipd can't give us a file descriptor, we give up connecting. */
gossip_fd = get_gossipfd(daemon, id, their_features);
if (gossip_fd < 0) {
peer->gossip_fd = get_gossipfd(daemon, id, their_features);
if (peer->gossip_fd < 0) {
close(subd_fd);
return tal_free(peer);
}
@@ -469,10 +471,9 @@ struct io_plan *peer_connected(struct io_conn *conn,
/*~ daemon_conn is a message queue for inter-daemon communication: we
* queue up the `connect_peer_connected` message to tell lightningd
* we have connected, and give the peer and gossip fds. */
* we have connected, and give the peer fd. */
daemon_conn_send(daemon->master, take(msg));
daemon_conn_send_fd(daemon->master, subd_fd);
daemon_conn_send_fd(daemon->master, gossip_fd);
/*~ Now we set up this connection to read/write from subd */
return multiplex_peer_setup(conn, peer);
@@ -1894,20 +1895,19 @@ static void peer_final_msg(struct io_conn *conn,
struct peer *peer;
struct node_id id;
u8 *finalmsg;
int peer_fd;
if (!fromwire_connectd_peer_final_msg(tmpctx, msg, &id, &finalmsg))
master_badmsg(WIRE_CONNECTD_PEER_FINAL_MSG, msg);
/* Get the peer_fd and gossip_fd for this peer: we don't need them. */
/* Get the peer_fd for this peer: we don't need it though! */
io_fd_block(io_conn_fd(conn), true);
for (size_t i = 0; i < 2; i++) {
int fd = fdpass_recv(io_conn_fd(conn));
if (fd == -1)
peer_fd = fdpass_recv(io_conn_fd(conn));
if (peer_fd == -1)
status_failed(STATUS_FAIL_MASTER_IO,
"Getting fd %zu after peer_final_msg: %s",
i, strerror(errno));
close(fd);
}
"Getting peer fd after peer_final_msg: %s",
strerror(errno));
close(peer_fd);
io_fd_block(io_conn_fd(conn), false);
/* This can happen if peer hung up on us. */

View File

@@ -81,6 +81,9 @@ struct peer {
/* Random ping timer, to detect dead connections. */
struct oneshot *ping_timer;
/* FIXME: remove! */
int gossip_fd;
#if DEVELOPER
bool dev_read_enabled;
/* If non-NULL, this counts down; 0 means disable */

View File

@@ -60,7 +60,7 @@ msgdata,connectd_connect_failed,failreason,wirestring,
msgdata,connectd_connect_failed,seconds_to_delay,u32,
msgdata,connectd_connect_failed,addrhint,?wireaddr_internal,
# Connectd -> master: we got a peer. Three fds: peer, gossip and gossip_store
# Connectd -> master: we got a peer. Plus fd for peer daemon
msgtype,connectd_peer_connected,2002
msgdata,connectd_peer_connected,id,node_id,
msgdata,connectd_peer_connected,addr,wireaddr_internal,
@@ -72,7 +72,7 @@ msgdata,connectd_peer_connected,features,u8,flen
msgtype,connectd_peer_disconnected,2015
msgdata,connectd_peer_disconnected,id,node_id,
# master -> connectd: give message to peer and disconnect. Three fds: peer, gossip and gossip_store
# master -> connectd: give message to peer and disconnect. Plus fd for peer
msgtype,connectd_peer_final_msg,2003
msgdata,connectd_peer_final_msg,id,node_id,
msgdata,connectd_peer_final_msg,len,u16,
1 #include <bitcoin/block.h>
60 msgdata,connectd_peer_connected,flen,u16,
61 msgdata,connectd_peer_connected,features,u8,flen
62 # master -> connectd: peer has disconnected.
63 msgtype,connectd_peer_disconnected,2015
64 msgdata,connectd_peer_disconnected,id,node_id,
65 # master -> connectd: give message to peer and disconnect. Three fds: peer, gossip and gossip_store # master -> connectd: give message to peer and disconnect. Plus fd for peer
66 msgtype,connectd_peer_final_msg,2003
72 msgtype,connectd_dev_memleak_reply,2133
73 msgdata,connectd_dev_memleak_reply,leak,bool,
74 # Ping/pong test. Waits for a reply if it expects one.
75 msgtype,connectd_ping,2030
76 msgdata,connectd_ping,id,node_id,
77 msgdata,connectd_ping,num_pong_bytes,u16,
78 msgdata,connectd_ping,len,u16,

View File

@@ -506,9 +506,9 @@ static unsigned channel_msg(struct subd *sd, const u8 *msg, const int *fds)
peer_got_shutdown(sd->channel, msg);
break;
case WIRE_CHANNELD_SHUTDOWN_COMPLETE:
/* We expect 2 fds. */
/* We expect 1 fd. */
if (!fds)
return 2;
return 1;
peer_start_closingd_after_shutdown(sd->channel, msg, fds);
break;
case WIRE_CHANNELD_FAIL_FALLEN_BEHIND:
@@ -603,7 +603,6 @@ void peer_start_channeld(struct channel *channel,
channel_errmsg,
channel_set_billboard,
take(&peer_fd->fd),
take(&peer_fd->gossip_fd),
take(&hsmfd), NULL));
if (!channel->owner) {

View File

@@ -341,8 +341,7 @@ static unsigned closing_msg(struct subd *sd, const u8 *msg, const int *fds UNUSE
return 0;
}
void peer_start_closingd(struct channel *channel,
struct peer_fd *peer_fd)
void peer_start_closingd(struct channel *channel, struct peer_fd *peer_fd)
{
u8 *initmsg;
u32 min_feerate, feerate, *max_feerate;
@@ -372,7 +371,6 @@ void peer_start_closingd(struct channel *channel,
channel_errmsg,
channel_set_billboard,
take(&peer_fd->fd),
take(&peer_fd->gossip_fd),
take(&hsmfd),
NULL));

View File

@@ -438,9 +438,9 @@ static unsigned connectd_msg(struct subd *connectd, const u8 *msg, const int *fd
break;
case WIRE_CONNECTD_PEER_CONNECTED:
if (tal_count(fds) != 2)
return 2;
peer_connected(connectd->ld, msg, fds[0], fds[1]);
if (tal_count(fds) != 1)
return 1;
peer_connected(connectd->ld, msg, fds[0]);
break;
case WIRE_CONNECTD_CONNECT_FAILED:

View File

@@ -2994,16 +2994,16 @@ static unsigned int dual_opend_msg(struct subd *dualopend,
handle_dry_run_finished(dualopend, msg);
return 0;
case WIRE_DUALOPEND_CHANNEL_LOCKED:
if (tal_count(fds) != 2)
return 2;
if (tal_count(fds) != 1)
return 1;
handle_channel_locked(dualopend, fds, msg);
return 0;
case WIRE_DUALOPEND_GOT_SHUTDOWN:
handle_peer_wants_to_close(dualopend, msg);
return 0;
case WIRE_DUALOPEND_SHUTDOWN_COMPLETE:
if (tal_count(fds) != 2)
return 2;
if (tal_count(fds) != 1)
return 1;
handle_channel_closed(dualopend, fds, msg);
return 0;
case WIRE_DUALOPEND_FAIL_FALLEN_BEHIND:
@@ -3230,7 +3230,6 @@ static void start_fresh_dualopend(struct peer *peer,
channel_errmsg,
channel_set_billboard,
take(&peer_fd->fd),
take(&peer_fd->gossip_fd),
take(&hsmfd), NULL);
if (!channel->owner) {
@@ -3297,7 +3296,6 @@ void peer_restart_dualopend(struct peer *peer,
channel_errmsg,
channel_set_billboard,
take(&peer_fd->fd),
take(&peer_fd->gossip_fd),
take(&hsmfd), NULL));
if (!channel->owner) {
log_broken(channel->log, "Could not subdaemon channel: %s",

View File

@@ -195,9 +195,8 @@ void handle_reestablish(struct lightningd *ld,
take(towire_connectd_peer_final_msg(NULL, peer_id,
err)));
subd_send_fd(ld->connectd, peer_fd->fd);
subd_send_fd(ld->connectd, peer_fd->gossip_fd);
/* Don't close those fds! */
peer_fd->fd = peer_fd->gossip_fd = -1;
/* Don't close this fd! */
peer_fd->fd = -1;
}
}

View File

@@ -838,8 +838,8 @@ static unsigned int openingd_msg(struct subd *openingd,
tal_free(openingd);
return 0;
}
if (tal_count(fds) != 2)
return 2;
if (tal_count(fds) != 1)
return 1;
opening_funder_finished(openingd, msg, fds, uc->fc);
return 0;
case WIRE_OPENINGD_FUNDER_START_REPLY:
@@ -862,8 +862,8 @@ static unsigned int openingd_msg(struct subd *openingd,
return 0;
case WIRE_OPENINGD_FUNDEE:
if (tal_count(fds) != 2)
return 2;
if (tal_count(fds) != 1)
return 1;
opening_fundee_finished(openingd, msg, fds, uc);
return 0;
@@ -872,8 +872,8 @@ static unsigned int openingd_msg(struct subd *openingd,
return 0;
case WIRE_OPENINGD_GOT_REESTABLISH:
if (tal_count(fds) != 2)
return 2;
if (tal_count(fds) != 1)
return 1;
opening_got_reestablish(openingd, msg, fds, uc);
return 0;
@@ -919,7 +919,6 @@ void peer_start_openingd(struct peer *peer, struct peer_fd *peer_fd)
opend_channel_errmsg,
opend_channel_set_billboard,
take(&peer_fd->fd),
take(&peer_fd->gossip_fd),
take(&hsmfd), NULL);
if (!uc->open_daemon) {
uncommitted_channel_disconnect(uc, LOG_BROKEN,

View File

@@ -1054,9 +1054,8 @@ send_error:
take(towire_connectd_peer_final_msg(NULL, &peer->id,
error)));
subd_send_fd(ld->connectd, payload->peer_fd->fd);
subd_send_fd(ld->connectd, payload->peer_fd->gossip_fd);
/* Don't close those fds! */
payload->peer_fd->fd = payload->peer_fd->gossip_fd = -1;
/* Don't close the fd! */
payload->peer_fd->fd = -1;
}
static bool
@@ -1112,8 +1111,7 @@ REGISTER_PLUGIN_HOOK(peer_connected,
/* Connectd tells us a peer has connected: it never hands us duplicates, since
* it holds them until we say peer_died. */
void peer_connected(struct lightningd *ld, const u8 *msg,
int peer_fd, int gossip_fd)
void peer_connected(struct lightningd *ld, const u8 *msg, int peer_fd)
{
struct node_id id;
u8 *their_features;
@@ -1130,7 +1128,7 @@ void peer_connected(struct lightningd *ld, const u8 *msg,
fatal("Connectd gave bad CONNECT_PEER_CONNECTED message %s",
tal_hex(msg, msg));
hook_payload->peer_fd = new_peer_fd(hook_payload, peer_fd, gossip_fd);
hook_payload->peer_fd = new_peer_fd(hook_payload, peer_fd);
/* If we're already dealing with this peer, hand off to correct
* subdaemon. Otherwise, we'll hand to openingd to wait there. */

View File

@@ -64,8 +64,7 @@ struct peer *peer_from_json(struct lightningd *ld,
const char *buffer,
const jsmntok_t *peeridtok);
void peer_connected(struct lightningd *ld, const u8 *msg,
int peer_fd, int gossip_fd);
void peer_connected(struct lightningd *ld, const u8 *msg, int peer_fd);
/* Could be configurable. */
#define OUR_CHANNEL_FLAGS CHANNEL_FLAGS_ANNOUNCE_CHANNEL

View File

@@ -7,23 +7,20 @@ static void destroy_peer_fd(struct peer_fd *peer_fd)
{
if (peer_fd->fd != -1)
close(peer_fd->fd);
if (peer_fd->gossip_fd != -1)
close(peer_fd->gossip_fd);
}
struct peer_fd *new_peer_fd(const tal_t *ctx, int peer_fdnum, int gossip_fd)
struct peer_fd *new_peer_fd(const tal_t *ctx, int peer_fdnum)
{
struct peer_fd *peer_fd = tal(ctx, struct peer_fd);
peer_fd->fd = peer_fdnum;
peer_fd->gossip_fd = gossip_fd;
tal_add_destructor(peer_fd, destroy_peer_fd);
return peer_fd;
}
struct peer_fd *new_peer_fd_arr(const tal_t *ctx, const int *fds)
struct peer_fd *new_peer_fd_arr(const tal_t *ctx, const int *fd)
{
/* We expect 2 fds. */
assert(tal_count(fds) == 2);
return new_peer_fd(ctx, fds[0], fds[1]);
/* We expect 1 fd. */
assert(tal_count(fd) == 1);
return new_peer_fd(ctx, fd[0]);
}

View File

@@ -3,19 +3,16 @@
#include "config.h"
#include <ccan/tal/tal.h>
/* This name is a little preemptive: it still contains the gossip_fd
* for now! */
/* Tal wrapper for fd connecting subd to connectd */
struct peer_fd {
/* If not -1, closed on freeing */
int fd;
int gossip_fd;
};
/* Allocate a new per-peer state and add destructor to close fds if set;
* sets fds to -1. */
struct peer_fd *new_peer_fd(const tal_t *ctx, int peer_fd, int gossip_fd);
/* Allocate a new per-peer state and add destructor to close fd if set. */
struct peer_fd *new_peer_fd(const tal_t *ctx, int peer_fd);
/* Array version of above: tal_count(fds) must be 2 */
struct peer_fd *new_peer_fd_arr(const tal_t *ctx, const int *fds);
/* Array version of above: tal_count(fds) must be 1 */
struct peer_fd *new_peer_fd_arr(const tal_t *ctx, const int *fd);
#endif /* LIGHTNING_LIGHTNINGD_PEER_FD_H */

View File

@@ -414,7 +414,7 @@ static bool log_status_fail(struct subd *sd, const u8 *msg)
return true;
}
static bool handle_peer_error(struct subd *sd, const u8 *msg, int fds[2])
static bool handle_peer_error(struct subd *sd, const u8 *msg, int fds[1])
{
void *channel = sd->channel;
struct channel_id channel_id;
@@ -533,11 +533,11 @@ static struct io_plan *sd_msg_read(struct io_conn *conn, struct subd *sd)
if (sd->channel) {
switch ((enum peer_status_wire)type) {
case WIRE_STATUS_PEER_ERROR:
/* We expect 2 fds after this */
/* We expect 1 fd after this */
if (!sd->fds_in) {
/* Don't free msg_in: we go around again. */
tal_steal(sd, sd->msg_in);
plan = sd_collect_fds(conn, sd, 2);
plan = sd_collect_fds(conn, sd, 1);
goto out;
}
if (!handle_peer_error(sd, sd->msg_in, sd->fds_in))

View File

@@ -168,7 +168,7 @@ struct log *new_log(const tal_t *ctx UNNEEDED, struct log_book *record UNNEEDED,
struct log_book *new_log_book(struct lightningd *ld UNNEEDED, size_t max_mem UNNEEDED)
{ fprintf(stderr, "new_log_book called!\n"); abort(); }
/* Generated stub for new_peer_fd_arr */
struct peer_fd *new_peer_fd_arr(const tal_t *ctx UNNEEDED, const int *fds UNNEEDED)
struct peer_fd *new_peer_fd_arr(const tal_t *ctx UNNEEDED, const int *fd UNNEEDED)
{ fprintf(stderr, "new_peer_fd_arr called!\n"); abort(); }
/* Generated stub for new_topology */
struct chain_topology *new_topology(struct lightningd *ld UNNEEDED, struct log *log UNNEEDED)

View File

@@ -437,7 +437,7 @@ struct height_states *new_height_states(const tal_t *ctx UNNEEDED,
const u32 *blockheight UNNEEDED)
{ fprintf(stderr, "new_height_states called!\n"); abort(); }
/* Generated stub for new_peer_fd */
struct peer_fd *new_peer_fd(const tal_t *ctx UNNEEDED, int peer_fd UNNEEDED, int gossip_fd UNNEEDED)
struct peer_fd *new_peer_fd(const tal_t *ctx UNNEEDED, int peer_fd UNNEEDED)
{ fprintf(stderr, "new_peer_fd called!\n"); abort(); }
/* Generated stub for new_reltimer_ */
struct oneshot *new_reltimer_(struct timers *timers UNNEEDED,

View File

@@ -109,7 +109,7 @@ struct log *new_log(const tal_t *ctx UNNEEDED, struct log_book *record UNNEEDED,
const char *fmt UNNEEDED, ...)
{ fprintf(stderr, "new_log called!\n"); abort(); }
/* Generated stub for new_peer_fd_arr */
struct peer_fd *new_peer_fd_arr(const tal_t *ctx UNNEEDED, const int *fds UNNEEDED)
struct peer_fd *new_peer_fd_arr(const tal_t *ctx UNNEEDED, const int *fd UNNEEDED)
{ fprintf(stderr, "new_peer_fd_arr called!\n"); abort(); }
/* Generated stub for subdaemon_path */
const char *subdaemon_path(const tal_t *ctx UNNEEDED, const struct lightningd *ld UNNEEDED, const char *name UNNEEDED)

View File

@@ -44,9 +44,9 @@
#include <unistd.h>
#include <wire/wire_sync.h>
/* stdin == lightningd, 3 == peer, 4 == gossipd, 5 = hsmd */
/* stdin == lightningd, 3 == peer, 4 = hsmd */
#define REQ_FD STDIN_FILENO
#define HSM_FD 5
#define HSM_FD 4
/* tx_add_input, tx_add_output, tx_rm_input, tx_rm_output */
#define NUM_TX_MSGS (TX_RM_OUTPUT + 1)
@@ -1164,7 +1164,6 @@ static u8 *opening_negotiate_msg(const tal_t *ctx, struct state *state)
* form, but we use it in a very limited way. */
for (;;) {
u8 *msg;
bool from_gossipd;
char *err;
bool warning;
struct channel_id actual;
@@ -1175,20 +1174,7 @@ static u8 *opening_negotiate_msg(const tal_t *ctx, struct state *state)
clean_tmpctx();
/* This helper routine polls both the peer and gossipd. */
msg = peer_or_gossip_sync_read(ctx, state->pps, &from_gossipd);
/* Use standard helper for gossip msgs (forwards, if it's an
* error, exits). */
if (from_gossipd) {
handle_gossip_msg(state->pps, take(msg));
continue;
}
/* Some messages go straight to gossipd. */
if (is_msg_for_gossipd(msg)) {
wire_sync_write(state->pps->gossip_fd, take(msg));
continue;
}
msg = peer_read(ctx, state->pps);
/* BOLT #1:
*
@@ -3440,19 +3426,6 @@ static u8 *handle_funding_depth(struct state *state, u8 *msg)
return NULL;
}
/*~ If we see the gossip_fd readable, we read a whole message. Sure, we might
* block, but we trust gossipd. */
static void handle_gossip_in(struct state *state)
{
u8 *msg = wire_sync_read(NULL, state->pps->gossip_fd);
if (!msg)
status_failed(STATUS_FAIL_GOSSIP_IO,
"Reading gossip: %s", strerror(errno));
handle_gossip_msg(state->pps, take(msg));
}
/* BOLT #2:
*
* A receiving node:
@@ -3553,7 +3526,7 @@ static void do_reconnect_dance(struct state *state)
do {
clean_tmpctx();
msg = peer_read(tmpctx, state->pps);
} while (handle_peer_gossip_or_error(state->pps,
} while (handle_peer_error(state->pps,
&state->channel_id,
msg));
@@ -3750,9 +3723,8 @@ static u8 *handle_peer_in(struct state *state)
break;
}
/* Handles standard cases, and legal unknown ones. */
if (handle_peer_gossip_or_error(state->pps,
&state->channel_id, msg))
/* Handles errors. */
if (handle_peer_error(state->pps, &state->channel_id, msg))
return NULL;
peer_write(state->pps,
@@ -3775,7 +3747,7 @@ int main(int argc, char *argv[])
{
common_setup(argv[0]);
struct pollfd pollfd[3];
struct pollfd pollfd[2];
struct state *state = tal(NULL, struct state);
struct secret *none;
struct fee_states *fee_states;
@@ -3901,9 +3873,9 @@ int main(int argc, char *argv[])
/* 3 == peer, 4 == gossipd, 5 = hsmd */
/* 3 == peer, 4 = hsmd */
state->pps = new_per_peer_state(state);
per_peer_state_set_fds(state->pps, 3, 4);
per_peer_state_set_fd(state->pps, 3);
/*~ We need an initial per-commitment point whether we're funding or
* they are, and lightningd has reserved a unique dbid for us already,
@@ -3927,10 +3899,8 @@ int main(int argc, char *argv[])
/*~ We manually run a little poll() loop here. With only three fds */
pollfd[0].fd = REQ_FD;
pollfd[0].events = POLLIN;
pollfd[1].fd = state->pps->gossip_fd;
pollfd[1].fd = state->pps->peer_fd;
pollfd[1].events = POLLIN;
pollfd[2].fd = state->pps->peer_fd;
pollfd[2].events = POLLIN;
/* Do reconnect, if need be */
if (state->channel) {
@@ -3946,7 +3916,7 @@ int main(int argc, char *argv[])
/*~ If we get a signal which aborts the poll() call, valgrind
* complains about revents being uninitialized. I'm not sure
* that's correct, but it's easy to be sure. */
pollfd[0].revents = pollfd[1].revents = pollfd[2].revents = 0;
pollfd[0].revents = pollfd[1].revents = 0;
poll(pollfd, ARRAY_SIZE(pollfd), -1);
/* Subtle: handle_master_in can do its own poll loop, so
@@ -3955,11 +3925,8 @@ int main(int argc, char *argv[])
if (pollfd[0].revents & POLLIN)
msg = handle_master_in(state);
/* Second priority: messages from peer. */
else if (pollfd[2].revents & POLLIN)
msg = handle_peer_in(state);
/* Last priority: chit-chat from gossipd. */
else if (pollfd[1].revents & POLLIN)
handle_gossip_in(state);
msg = handle_peer_in(state);
/* If we've shutdown, we're done */
if (shutdown_complete(state))

View File

@@ -35,9 +35,9 @@
#include <wire/peer_wire.h>
#include <wire/wire_sync.h>
/* stdin == lightningd, 3 == peer, 4 == gossipd, 5 = hsmd */
/* stdin == lightningd, 3 == peer, 4 = hsmd */
#define REQ_FD STDIN_FILENO
#define HSM_FD 5
#define HSM_FD 4
#if DEVELOPER
/* If --dev-force-tmp-channel-id is set, it ends up here */
@@ -184,7 +184,6 @@ static u8 *opening_negotiate_msg(const tal_t *ctx, struct state *state,
* form, but we use it in a very limited way. */
for (;;) {
u8 *msg;
bool from_gossipd;
char *err;
bool warning;
struct channel_id actual;
@@ -194,20 +193,7 @@ static u8 *opening_negotiate_msg(const tal_t *ctx, struct state *state,
clean_tmpctx();
/* This helper routine polls both the peer and gossipd. */
msg = peer_or_gossip_sync_read(ctx, state->pps, &from_gossipd);
/* Use standard helper for gossip msgs (forwards, if it's an
* error, exits). */
if (from_gossipd) {
handle_gossip_msg(state->pps, take(msg));
continue;
}
/* Some messages go straight to gossipd. */
if (is_msg_for_gossipd(msg)) {
wire_sync_write(state->pps->gossip_fd, take(msg));
continue;
}
msg = peer_read(ctx, state->pps);
/* BOLT #1:
*
@@ -1259,9 +1245,8 @@ static u8 *handle_peer_in(struct state *state)
if (t == WIRE_OPEN_CHANNEL)
return fundee_channel(state, msg);
/* Handles standard cases, and legal unknown ones. */
if (handle_peer_gossip_or_error(state->pps,
&state->channel_id, msg))
/* Handles error cases. */
if (handle_peer_error(state->pps, &state->channel_id, msg))
return NULL;
extracted = extract_channel_id(msg, &channel_id);
@@ -1285,19 +1270,6 @@ static u8 *handle_peer_in(struct state *state)
peer_failed_connection_lost();
}
/*~ If we see the gossip_fd readable, we read a whole message. Sure, we might
* block, but we trust gossipd. */
static void handle_gossip_in(struct state *state)
{
u8 *msg = wire_sync_read(NULL, state->pps->gossip_fd);
if (!msg)
status_failed(STATUS_FAIL_GOSSIP_IO,
"Reading gossip: %s", strerror(errno));
handle_gossip_msg(state->pps, take(msg));
}
/* Memory leak detection is DEVELOPER-only because we go to great lengths to
* record the backtrace when allocations occur: without that, the leak
* detection tends to be useless for diagnosing where the leak came from, but
@@ -1393,7 +1365,7 @@ int main(int argc, char *argv[])
setup_locale();
u8 *msg;
struct pollfd pollfd[3];
struct pollfd pollfd[2];
struct state *state = tal(NULL, struct state);
struct secret *none;
struct channel_id *force_tmp_channel_id;
@@ -1424,9 +1396,9 @@ int main(int argc, char *argv[])
dev_force_tmp_channel_id = force_tmp_channel_id;
#endif
/* 3 == peer, 4 == gossipd, 5 = hsmd */
/* 3 == peer, 4 = hsmd */
state->pps = new_per_peer_state(state);
per_peer_state_set_fds(state->pps, 3, 4);
per_peer_state_set_fd(state->pps, 3);
/*~ Initially we're not associated with a channel, but
* handle_peer_gossip_or_error compares this. */
@@ -1463,10 +1435,8 @@ int main(int argc, char *argv[])
/*~ We manually run a little poll() loop here. With only three fds */
pollfd[0].fd = REQ_FD;
pollfd[0].events = POLLIN;
pollfd[1].fd = state->pps->gossip_fd;
pollfd[1].fd = state->pps->peer_fd;
pollfd[1].events = POLLIN;
pollfd[2].fd = state->pps->peer_fd;
pollfd[2].events = POLLIN;
/* We exit when we get a conclusion to write to lightningd: either
* opening_funder_reply or opening_fundee. */
@@ -1475,7 +1445,7 @@ int main(int argc, char *argv[])
/*~ If we get a signal which aborts the poll() call, valgrind
* complains about revents being uninitialized. I'm not sure
* that's correct, but it's easy to be sure. */
pollfd[0].revents = pollfd[1].revents = pollfd[2].revents = 0;
pollfd[0].revents = pollfd[1].revents = 0;
poll(pollfd, ARRAY_SIZE(pollfd), -1);
/* Subtle: handle_master_in can do its own poll loop, so
@@ -1484,22 +1454,19 @@ int main(int argc, char *argv[])
if (pollfd[0].revents & POLLIN)
msg = handle_master_in(state);
/* Second priority: messages from peer. */
else if (pollfd[2].revents & POLLIN)
msg = handle_peer_in(state);
/* Last priority: chit-chat from gossipd. */
else if (pollfd[1].revents & POLLIN)
handle_gossip_in(state);
msg = handle_peer_in(state);
/* Since we're the top-level event loop, we clean up */
clean_tmpctx();
}
/*~ Write message and hand back the peer fd and gossipd fd. This also
* means that if the peer or gossipd wrote us any messages we didn't
* read yet, it will simply be read by the next daemon. */
/*~ Write message and hand back the peer fd. This also means that if
* the peer wrote us any messages we didn't read yet, it will simply
* be read by the next daemon. */
wire_sync_write(REQ_FD, msg);
per_peer_state_fdpass_send(REQ_FD, state->pps);
status_debug("Sent %s with fds",
status_debug("Sent %s with fd",
openingd_wire_name(fromwire_peektype(msg)));
/* This frees the entire tal tree. */

View File

@@ -442,7 +442,7 @@ struct chain_coin_mvt *new_coin_wallet_deposit(const tal_t *ctx UNNEEDED,
{ fprintf(stderr, "new_coin_wallet_deposit called!\n"); abort(); }
/* Generated stub for new_peer_fd */
struct peer_fd *new_peer_fd(const tal_t *ctx UNNEEDED, int peer_fd UNNEEDED, int gossip_fd UNNEEDED)
struct peer_fd *new_peer_fd(const tal_t *ctx UNNEEDED, int peer_fd UNNEEDED)
{ fprintf(stderr, "new_peer_fd called!\n"); abort(); }
/* Generated stub for notify_chain_mvt */
void notify_chain_mvt(struct lightningd *ld UNNEEDED, const struct chain_coin_mvt *mvt UNNEEDED)