Fixed bug with postgres reconnection in the connection pool (#1078)

This commit is contained in:
C
2025-09-15 19:19:48 -03:00
committed by GitHub
parent 4f65441c0d
commit 1cc4783198

View File

@@ -142,14 +142,10 @@ impl DatabasePool for PgConnectionPool {
fn new_resource( fn new_resource(
config: &Self::Config, config: &Self::Config,
still_valid: Arc<AtomicBool>, stale: Arc<AtomicBool>,
timeout: Duration, timeout: Duration,
) -> Result<Self::Connection, cdk_sql_common::pool::Error<Self::Error>> { ) -> Result<Self::Connection, cdk_sql_common::pool::Error<Self::Error>> {
Ok(PostgresConnection::new( Ok(PostgresConnection::new(config.to_owned(), timeout, stale))
config.to_owned(),
timeout,
still_valid,
))
} }
} }
@@ -164,7 +160,7 @@ pub struct PostgresConnection {
impl PostgresConnection { impl PostgresConnection {
/// Creates a new instance /// Creates a new instance
pub fn new(config: PgConfig, timeout: Duration, still_valid: Arc<AtomicBool>) -> Self { pub fn new(config: PgConfig, timeout: Duration, stale: Arc<AtomicBool>) -> Self {
let failed = Arc::new(Mutex::new(None)); let failed = Arc::new(Mutex::new(None));
let result = Arc::new(OnceLock::new()); let result = Arc::new(OnceLock::new());
let notify = Arc::new(Notify::new()); let notify = Arc::new(Notify::new());
@@ -191,22 +187,22 @@ impl PostgresConnection {
Err(err) => { Err(err) => {
*error_clone.lock().await = *error_clone.lock().await =
Some(cdk_common::database::Error::Database(Box::new(err))); Some(cdk_common::database::Error::Database(Box::new(err)));
still_valid.store(false, std::sync::atomic::Ordering::Release); stale.store(false, std::sync::atomic::Ordering::Release);
notify_clone.notify_waiters(); notify_clone.notify_waiters();
return; return;
} }
}; };
let still_valid_for_spawn = still_valid.clone(); let stale_for_spawn = stale.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _ = connection.await; let _ = connection.await;
still_valid_for_spawn.store(false, std::sync::atomic::Ordering::Release); stale_for_spawn.store(true, std::sync::atomic::Ordering::Release);
}); });
if let Some(schema) = config.schema.as_ref() { if let Some(schema) = config.schema.as_ref() {
if let Err(err) = select_schema(&client, schema).await { if let Err(err) = select_schema(&client, schema).await {
*error_clone.lock().await = Some(err); *error_clone.lock().await = Some(err);
still_valid.store(false, std::sync::atomic::Ordering::Release); stale.store(false, std::sync::atomic::Ordering::Release);
notify_clone.notify_waiters(); notify_clone.notify_waiters();
return; return;
} }
@@ -221,22 +217,22 @@ impl PostgresConnection {
Err(err) => { Err(err) => {
*error_clone.lock().await = *error_clone.lock().await =
Some(cdk_common::database::Error::Database(Box::new(err))); Some(cdk_common::database::Error::Database(Box::new(err)));
still_valid.store(false, std::sync::atomic::Ordering::Release); stale.store(false, std::sync::atomic::Ordering::Release);
notify_clone.notify_waiters(); notify_clone.notify_waiters();
return; return;
} }
}; };
let still_valid_for_spawn = still_valid.clone(); let stale_for_spawn = stale.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _ = connection.await; let _ = connection.await;
still_valid_for_spawn.store(false, std::sync::atomic::Ordering::Release); stale_for_spawn.store(true, std::sync::atomic::Ordering::Release);
}); });
if let Some(schema) = config.schema.as_ref() { if let Some(schema) = config.schema.as_ref() {
if let Err(err) = select_schema(&client, schema).await { if let Err(err) = select_schema(&client, schema).await {
*error_clone.lock().await = Some(err); *error_clone.lock().await = Some(err);
still_valid.store(false, std::sync::atomic::Ordering::Release); stale.store(true, std::sync::atomic::Ordering::Release);
notify_clone.notify_waiters(); notify_clone.notify_waiters();
return; return;
} }