From c953c40e04c52da4c8c0cafc666767da75744e37 Mon Sep 17 00:00:00 2001 From: conduition Date: Mon, 18 Mar 2024 18:08:45 +0000 Subject: [PATCH] send retry messages to players if some players reject --- demo/mm_server/src/server/mod.rs | 2 +- demo/mm_server/src/server/offer_and_ack.rs | 36 +++++++++++++++++----- 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/demo/mm_server/src/server/mod.rs b/demo/mm_server/src/server/mod.rs index d984e58..c4390c8 100644 --- a/demo/mm_server/src/server/mod.rs +++ b/demo/mm_server/src/server/mod.rs @@ -41,7 +41,7 @@ fn handle_tcp_conn( state_wlock.stage = Stage::OfferAndAck; } - if let Some(accepted_players) = offer_and_ack::offer_and_ack_cycle(&state)? { + if let Some(accepted_offers) = offer_and_ack::offer_and_ack_cycle(&state)? { // TODO prompt all players for signatures } } diff --git a/demo/mm_server/src/server/offer_and_ack.rs b/demo/mm_server/src/server/offer_and_ack.rs index 9889455..5120f9c 100644 --- a/demo/mm_server/src/server/offer_and_ack.rs +++ b/demo/mm_server/src/server/offer_and_ack.rs @@ -1,6 +1,6 @@ use super::{MIN_PLAYERS, SOCKET_DEFAULT_READ_TIMEOUT}; use crate::global_state::GlobalState; -use common::{ClientOfferAck, PlayerID, ServerOffer}; +use common::{ClientOfferAck, PlayerID, ServerOffer, ServerOfferAck}; use std::{ collections::{BTreeMap, BTreeSet}, @@ -50,13 +50,13 @@ pub(crate) fn offer_and_ack_cycle( } drop(ack_sender); // Otherwise the ack_receiver channel will stay open. - let mut accepted_players = BTreeMap::new(); + let mut accepted_offers = BTreeMap::new(); let mut rejected_players = BTreeSet::new(); while let Ok((player_id, offer, client_offer_ack)) = ack_receiver.recv() { match client_offer_ack { ClientOfferAck::Accept => { - accepted_players.insert(player_id, offer); + accepted_offers.insert(player_id, offer); } ClientOfferAck::Reject => { rejected_players.insert(player_id); @@ -65,22 +65,42 @@ pub(crate) fn offer_and_ack_cycle( } if rejected_players.len() > 0 { + let thread_handles: Vec> = accepted_offers + .into_keys() + .map(|player_id| { + let state = Arc::clone(state); + thread::spawn(move || { + let state_rlock = state.read().unwrap(); + let conn = &state_rlock.registrations[&player_id].connection; + if serde_cbor::to_writer(conn, &ServerOfferAck::Retry).is_err() { + drop(state_rlock); + state.write().unwrap().registrations.remove(&player_id); + } + }) + }) + .collect(); + // Disconnect rejecting players. { let mut state_wlock = state.write().unwrap(); for player_id in rejected_players { state_wlock.registrations.remove(&player_id); } + } - // We don't have enough accepting players to try again. - if state_wlock.registrations.len() < MIN_PLAYERS { - return Ok(None); - } + // Wait for all ACKs to be sent + for handle in thread_handles { + handle.join().unwrap(); + } + + // We don't have enough accepting players to try again. + if state.read().unwrap().registrations.len() < MIN_PLAYERS { + return Ok(None); } // Retry with accepting players. return offer_and_ack_cycle(state); } - Ok(Some(accepted_players)) + Ok(Some(accepted_offers)) }