
Commit

Formatting using lint
GilboaAWS committed Jan 5, 2025
1 parent bc1d66f commit 31cf02f
Showing 2 changed files with 106 additions and 48 deletions.
@@ -125,10 +125,9 @@ pub(crate) struct ConnectionsMap<Connection>(pub(crate) DashMap<String, ClusterN

pub(crate) struct RefreshState<Connection> {
pub handle: JoinHandle<()>, // The currently running refresh task
pub node_conn: Option<ClusterNode<Connection>> // The refreshed connection after the task is done
pub node_conn: Option<ClusterNode<Connection>>, // The refreshed connection after the task is done
}


impl<Connection> std::fmt::Display for ConnectionsMap<Connection> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for item in self.0.iter() {
@@ -148,7 +147,6 @@ pub(crate) struct ConnectionsContainer<Connection> {
read_from_replica_strategy: ReadFromReplicaStrategy,
topology_hash: TopologyHash,


// Holds all the failed addresses that started a refresh task.
pub(crate) refresh_addresses_started: DashSet<String>,
// Follow the refresh ops on the connections
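
To make the struct in the hunk above easier to follow, here is a minimal, self-contained sketch of the RefreshState idea (all names, the map type, and the address are illustrative stand-ins rather than the crate's real types, and a tokio runtime is assumed): the handle of the in-flight refresh task is kept next to the slot that will receive the refreshed connection once that task completes.

// Illustrative sketch only: simplified stand-ins for the crate's types.
use std::collections::HashMap;
use tokio::task::JoinHandle;

// Stand-in for ClusterNode<Connection>.
struct ClusterNode<C> {
    user_connection: C,
}

struct RefreshState<C> {
    handle: JoinHandle<()>,            // the currently running refresh task
    node_conn: Option<ClusterNode<C>>, // the refreshed connection, once the task is done
}

#[tokio::main]
async fn main() {
    let mut refresh_ops: HashMap<String, RefreshState<String>> = HashMap::new();

    // Spawn the task that would normally rebuild the connection to this node.
    let handle = tokio::spawn(async { /* reconnect logic would run here */ });

    // Register the in-flight refresh; node_conn stays None until the task fills it in.
    refresh_ops.insert(
        "node-1:6379".to_string(),
        RefreshState { handle, node_conn: None },
    );

    // Later, another part of the code awaits or aborts `handle` and, if a
    // connection was produced, moves it out of `node_conn`.
    for (_addr, state) in refresh_ops.iter_mut() {
        let _ = state.node_conn.take();
    }
}
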
150 changes: 105 additions & 45 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -43,10 +43,18 @@ use crate::{
use connections_container::RefreshState;
use dashmap::DashMap;
use std::{
collections::{HashMap, HashSet}, fmt, io, iter::once, mem, net::{IpAddr, SocketAddr}, pin::Pin, sync::{
collections::{HashMap, HashSet},
fmt, io,
iter::once,
mem,
net::{IpAddr, SocketAddr},
pin::Pin,
sync::{
atomic::{self, AtomicUsize, Ordering},
Arc, Mutex,
}, task::{self, Poll}, time::SystemTime
},
task::{self, Poll},
time::SystemTime,
};
use strum_macros::Display;
#[cfg(feature = "tokio-comp")]
@@ -1360,7 +1368,8 @@ where
inner.clone(),
addrs_to_refresh,
RefreshConnectionType::AllConnections,
).await;
)
.await;
}
}

@@ -1379,21 +1388,34 @@ where
info!("Skipping refresh for {}: already in progress", address);
continue;
}

let inner_clone = inner.clone();
let address_clone = address.clone();
let address_clone_for_task = address.clone();

let handle = tokio::spawn(async move {
info!("Refreshing connection task to {:?} started", address_clone_for_task);
info!(
"Refreshing connection task to {:?} started",
address_clone_for_task
);
let _ = async {
// Add this address to be removed in poll_flush so all requests see a consistent connection map.
// See next comment for elaborated explanation.
inner_clone.conn_lock.read().expect(MUTEX_READ_ERR).refresh_addresses_done.insert(address_clone_for_task.clone());

let mut cluster_params = inner_clone.cluster_params.read().expect(MUTEX_READ_ERR).clone();
// See next comment for elaborated explanation.
inner_clone
.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.refresh_addresses_done
.insert(address_clone_for_task.clone());

let mut cluster_params = inner_clone
.cluster_params
.read()
.expect(MUTEX_READ_ERR)
.clone();
let subs_guard = inner_clone.subscriptions_by_address.read().await;
cluster_params.pubsub_subscriptions = subs_guard.get(&address_clone_for_task).cloned();
cluster_params.pubsub_subscriptions =
subs_guard.get(&address_clone_for_task).cloned();
drop(subs_guard);

let node_result = get_or_create_conn(
@@ -1409,7 +1431,7 @@ where
Ok(node) => {
// Maintain the newly refreshed connection separately from the main connection map.
// This refreshed connection will be incorporated into the main connection map at the start of the poll_flush operation.
// This approach ensures that all requests within the current batch interact with a consistent connection map,
// This approach ensures that all requests within the current batch interact with a consistent connection map,
// preventing potential reordering issues.
//
// By delaying the integration of the refreshed connection:
@@ -1421,11 +1443,17 @@ where
// This strategy effectively creates a synchronization point at the beginning of poll_flush, where the connection map is
// updated atomically for the next batch of operations. This approach balances the need for up-to-date connection information
// with the requirement for consistent request handling within each processing cycle.
let connection_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR);
if let Some(mut refresh_state) = connection_container.refresh_operations.get_mut(&address_clone_for_task) {
let connection_container =
inner_clone.conn_lock.read().expect(MUTEX_READ_ERR);
if let Some(mut refresh_state) = connection_container
.refresh_operations
.get_mut(&address_clone_for_task)
{
refresh_state.node_conn = Some(node);
}
connection_container.refresh_addresses_done.insert(address_clone_for_task);
connection_container
.refresh_addresses_done
.insert(address_clone_for_task);
Ok(())
}
Err(err) => {
@@ -1436,17 +1464,24 @@ where
Err(err)
}
}
}.await;
}
.await;

info!("Refreshing connection task to {:?} is done", address_clone);
});

// Keep the task handle in the RefreshState of this address
info!("Inserting tokio task to refresh_ops map of address {:?}", address.clone());
refresh_ops_map.insert(address, RefreshState {
handle,
info!(
"Inserting tokio task to refresh_ops map of address {:?}",
address.clone()
);
refresh_ops_map.insert(
address,
RefreshState {
handle,
node_conn: None,
});
},
);
}
debug!("refresh connection tasks initiated");
}
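
The long comment in the hunk above describes a two-phase handoff: the refresh task only records its result and marks the address as done, and the live connection map is updated later, at a single synchronization point in poll_flush. A stripped-down sketch of that pattern follows, using plain std containers instead of the crate's DashMap/DashSet; every name here is illustrative and the error handling is omitted.

// Illustrative sketch only: std containers stand in for DashMap/DashSet.
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct Shared {
    refreshed: Mutex<HashMap<String, String>>, // address -> freshly built "connection"
    done: Mutex<HashSet<String>>,              // addresses whose refresh task finished
    live: Mutex<HashMap<String, String>>,      // the map requests actually read from
}

#[tokio::main]
async fn main() {
    let shared = Arc::new(Shared::default());

    // Phase 1: the refresh task publishes its result but never touches `live`,
    // so every request in the current batch keeps seeing the same connection map.
    let task_shared = Arc::clone(&shared);
    tokio::spawn(async move {
        let addr = "node-1:6379".to_string();
        task_shared.refreshed.lock().unwrap().insert(addr.clone(), "new-conn".to_string());
        task_shared.done.lock().unwrap().insert(addr);
    })
    .await
    .unwrap();

    // Phase 2: at the synchronization point (poll_flush in the real code),
    // finished refreshes are folded into `live` before the next batch starts.
    let finished: Vec<String> = shared.done.lock().unwrap().drain().collect();
    for addr in finished {
        if let Some(conn) = shared.refreshed.lock().unwrap().remove(&addr) {
            shared.live.lock().unwrap().insert(addr, conn);
        }
    }
}
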
@@ -1786,7 +1821,8 @@ where
inner.clone(),
addrs_to_refresh,
RefreshConnectionType::AllConnections,
).await;
)
.await;
}
}

@@ -2291,7 +2327,12 @@ where
ConnectionCheck::Found((address, connection)) => (address, connection.await),
ConnectionCheck::OnlyAddress(addr) => {
// No connection for this address in the conn_map
Self::refresh_connections(core, HashSet::from_iter(once(addr)),RefreshConnectionType::AllConnections).await;
Self::refresh_connections(
core,
HashSet::from_iter(once(addr)),
RefreshConnectionType::AllConnections,
)
.await;
return Err(RedisError::from((
ErrorKind::AllConnectionsUnavailable,
"No connection for the address, started a refresh task",
@@ -2395,39 +2436,58 @@ where
fn update_refreshed_connection(&mut self) {
loop {
let connections_container = self.inner.conn_lock.read().expect(MUTEX_WRITE_ERR);

// Process refresh_addresses_started
let addresses_to_remove: Vec<String> = connections_container.refresh_addresses_started.iter().map(|r| r.key().clone()).collect();
let addresses_to_remove: Vec<String> = connections_container
.refresh_addresses_started
.iter()
.map(|r| r.key().clone())
.collect();
for address in addresses_to_remove {
connections_container.refresh_addresses_started.remove(&address);
connections_container
.refresh_addresses_started
.remove(&address);
connections_container.remove_node(&address);
}

// Process refresh_addresses_done
let addresses_done: Vec<String> = connections_container.refresh_addresses_done.iter().map(|r| r.key().clone()).collect();
let addresses_done: Vec<String> = connections_container
.refresh_addresses_done
.iter()
.map(|r| r.key().clone())
.collect();
for address in addresses_done {
connections_container.refresh_addresses_done.remove(&address);

if let Some(mut refresh_state) = connections_container.refresh_operations.get_mut(&address) {
info!("update_refreshed_connection: Update conn for addr: {}", address);

connections_container
.refresh_addresses_done
.remove(&address);

if let Some(mut refresh_state) =
connections_container.refresh_operations.get_mut(&address)
{
info!(
"update_refreshed_connection: Update conn for addr: {}",
address
);

// Take the node_conn out of RefreshState, replacing it with None
if let Some(node_conn) = mem::take(&mut refresh_state.node_conn) {
info!("update_refreshed_connection: replacing/adding the conn");
// Move the node_conn into the function
connections_container.replace_or_add_connection_for_address(address.clone(), node_conn);
connections_container
.replace_or_add_connection_for_address(address.clone(), node_conn);
}
}
// Remove this entry from refresh_ops_map
connections_container.refresh_operations.remove(&address);

}

// Check if both sets are empty
if connections_container.refresh_addresses_started.is_empty() && connections_container.refresh_addresses_done.is_empty() {
if connections_container.refresh_addresses_started.is_empty()
&& connections_container.refresh_addresses_done.is_empty()
{
break;
}

// Release the lock before the next iteration
drop(connections_container);
}
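
A side note on the mem::take call above: it swaps the Option field for None and hands back the previous value, which is what lets the refreshed connection be moved into the live map without cloning. A tiny illustration of the idiom, with a String standing in for the real connection type:

use std::mem;

struct RefreshState {
    node_conn: Option<String>, // a String stands in for the real connection type
}

fn main() {
    let mut state = RefreshState {
        node_conn: Some("refreshed-conn".to_string()),
    };

    // mem::take replaces the field with None and returns what was there,
    // so the value can be moved onward without a clone.
    if let Some(conn) = mem::take(&mut state.node_conn) {
        println!("installing {conn}");
    }
    assert!(state.node_conn.is_none());
}
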
@@ -2537,8 +2597,8 @@ where
}
}
Next::Reconnect { request, target } => {
poll_flush_action =
poll_flush_action.change_state(PollFlushAction::Reconnect(HashSet::from_iter([target])));
poll_flush_action = poll_flush_action
.change_state(PollFlushAction::Reconnect(HashSet::from_iter([target])));
if let Some(request) = request {
self.inner.pending_requests.lock().unwrap().push(request);
}
@@ -2621,18 +2681,18 @@ where

fn start_send(self: Pin<&mut Self>, msg: Message<C>) -> Result<(), Self::Error> {
let Message { cmd, sender } = msg;

let info = RequestInfo { cmd };

self.inner
.pending_requests
.lock()
.unwrap()
.push(PendingRequest {
retry: 0,
sender,
info,
});
retry: 0,
sender,
info,
});
Ok(())
}

@@ -2659,7 +2719,7 @@ where

// Updating the connection_map with all the refreshed_connections
// In case of active poll_recovery, the <RecoverSlots / Reconnect(reconnect_to_initial_nodes)> work should
// take care of the refreshed_connection, add them if still relevant, and kill the refresh_tasks of
// take care of the refreshed_connection, add them if still relevant, and kill the refresh_tasks of
// non-relevant addresses.
self.update_refreshed_connection();

