diff --git a/connectd/connectd.c b/connectd/connectd.c index b8343f6c6..aa24cd099 100644 --- a/connectd/connectd.c +++ b/connectd/connectd.c @@ -2028,7 +2028,7 @@ static struct io_plan *recv_gossip(struct io_conn *conn, peer = peer_htable_get(&daemon->peers, &dst); if (peer) - queue_peer_msg(peer, take(gossip_msg)); + inject_peer_msg(peer, take(gossip_msg)); return daemon_conn_read_next(conn, daemon->gossipd); } diff --git a/connectd/multiplex.c b/connectd/multiplex.c index 8637d01ff..384f12f62 100644 --- a/connectd/multiplex.c +++ b/connectd/multiplex.c @@ -36,8 +36,9 @@ #include #include -void queue_peer_msg(struct peer *peer, const u8 *msg TAKES) +void inject_peer_msg(struct peer *peer, const u8 *msg TAKES) { + status_peer_io(LOG_IO_OUT, &peer->id, msg); msg_enqueue(peer->peer_outq, msg); } @@ -360,7 +361,7 @@ static void send_ping(struct peer *peer) return; } - queue_peer_msg(peer, take(make_ping(NULL, 1, 0))); + inject_peer_msg(peer, take(make_ping(NULL, 1, 0))); peer->expecting_pong = PONG_EXPECTED_PROBING; set_ping_timer(peer); } @@ -378,7 +379,7 @@ static void handle_ping_in(struct peer *peer, const u8 *msg) } if (pong) - queue_peer_msg(peer, take(pong)); + inject_peer_msg(peer, take(pong)); } static void handle_ping_reply(struct peer *peer, const u8 *msg) @@ -582,7 +583,7 @@ static struct io_plan *read_from_subd_done(struct io_conn *subd_conn, struct peer *peer) { /* Tell them to encrypt & write. */ - queue_peer_msg(peer, take(peer->subd_in)); + msg_enqueue(peer->peer_outq, take(peer->subd_in)); peer->subd_in = NULL; /* Wait for them to wake us */ @@ -828,7 +829,7 @@ void send_manual_ping(struct daemon *daemon, const u8 *msg) if (tal_count(ping) > 65535) status_failed(STATUS_FAIL_MASTER_IO, "Oversize ping"); - queue_peer_msg(peer, take(ping)); + inject_peer_msg(peer, take(ping)); status_debug("sending ping expecting %sresponse", num_pong_bytes >= 65532 ? "no " : ""); diff --git a/connectd/multiplex.h b/connectd/multiplex.h index bbb67e755..fa0a032fb 100644 --- a/connectd/multiplex.h +++ b/connectd/multiplex.h @@ -21,8 +21,9 @@ struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn, void multiplex_final_msg(struct peer *peer, const u8 *final_msg TAKES); -/* Inject a message into the output stream */ -void queue_peer_msg(struct peer *peer, const u8 *msg TAKES); +/* Inject a message into the output stream. Unlike a raw msg_enqueue, + * this does io logging if required. */ +void inject_peer_msg(struct peer *peer, const u8 *msg TAKES); void setup_peer_gossip_store(struct peer *peer, const struct feature_set *our_features, diff --git a/connectd/onion_message.c b/connectd/onion_message.c index f19594899..5459fd57b 100644 --- a/connectd/onion_message.c +++ b/connectd/onion_message.c @@ -37,9 +37,9 @@ void handle_obs2_onion_message(struct daemon *daemon, /* FIXME: ratelimit! */ if (!fromwire_obs2_onion_message(msg, msg, &blinding, &onion)) { - queue_peer_msg(peer, - towire_warningfmt(NULL, NULL, - "Bad onion_message")); + inject_peer_msg(peer, + towire_warningfmt(NULL, NULL, + "Bad onion_message")); return; } @@ -161,10 +161,10 @@ void handle_obs2_onion_message(struct daemon *daemon, &next_node)); return; } - queue_peer_msg(next_peer, - take(towire_obs2_onion_message(NULL, - &next_blinding, - serialize_onionpacket(tmpctx, rs->next)))); + inject_peer_msg(next_peer, + take(towire_obs2_onion_message(NULL, + &next_blinding, + serialize_onionpacket(tmpctx, rs->next)))); } } @@ -188,7 +188,7 @@ void onionmsg_req(struct daemon *daemon, const u8 *msg) omsg = towire_obs2_onion_message(NULL, &blinding, onionmsg); else omsg = towire_onion_message(NULL, &blinding, onionmsg); - queue_peer_msg(peer, take(omsg)); + inject_peer_msg(peer, take(omsg)); } } @@ -213,9 +213,9 @@ void handle_onion_message(struct daemon *daemon, /* FIXME: ratelimit! */ if (!fromwire_onion_message(msg, msg, &blinding, &onion)) { - queue_peer_msg(peer, - towire_warningfmt(NULL, NULL, - "Bad onion_message")); + inject_peer_msg(peer, + towire_warningfmt(NULL, NULL, + "Bad onion_message")); return; } @@ -336,10 +336,10 @@ void handle_onion_message(struct daemon *daemon, &next_node)); return; } - queue_peer_msg(next_peer, - take(towire_onion_message(NULL, - &next_blinding, - serialize_onionpacket(tmpctx, rs->next)))); + inject_peer_msg(next_peer, + take(towire_onion_message(NULL, + &next_blinding, + serialize_onionpacket(tmpctx, rs->next)))); } } diff --git a/connectd/test/run-onion_message.c b/connectd/test/run-onion_message.c index 8533d718f..56ea1e268 100644 --- a/connectd/test/run-onion_message.c +++ b/connectd/test/run-onion_message.c @@ -88,6 +88,9 @@ bool fromwire_connectd_send_onionmsg(const tal_t *ctx UNNEEDED, const void *p UN /* Generated stub for fromwire_node_id */ void fromwire_node_id(const u8 **cursor UNNEEDED, size_t *max UNNEEDED, struct node_id *id UNNEEDED) { fprintf(stderr, "fromwire_node_id called!\n"); abort(); } +/* Generated stub for inject_peer_msg */ +void inject_peer_msg(struct peer *peer UNNEEDED, const u8 *msg TAKES UNNEEDED) +{ fprintf(stderr, "inject_peer_msg called!\n"); abort(); } /* Generated stub for master_badmsg */ void master_badmsg(u32 type_expected UNNEEDED, const u8 *msg) { fprintf(stderr, "master_badmsg called!\n"); abort(); } @@ -100,9 +103,6 @@ void node_id_from_pubkey(struct node_id *id UNNEEDED, const struct pubkey *key U /* Generated stub for pubkey_from_node_id */ bool pubkey_from_node_id(struct pubkey *key UNNEEDED, const struct node_id *id UNNEEDED) { fprintf(stderr, "pubkey_from_node_id called!\n"); abort(); } -/* Generated stub for queue_peer_msg */ -void queue_peer_msg(struct peer *peer UNNEEDED, const u8 *msg TAKES UNNEEDED) -{ fprintf(stderr, "queue_peer_msg called!\n"); abort(); } /* Generated stub for status_fmt */ void status_fmt(enum log_level level UNNEEDED, const struct node_id *peer UNNEEDED,