// Source: NAC_Blockchain/nac-cbpp-l0/src/propagation/mod.rs (Rust, ~360 lines, 11 KiB)

// NAC CBPP L0 - Block Propagation Optimization
// Block propagation optimization module (QUIC + 0-RTT + FEC)
use std::sync::Arc;
use tokio::sync::RwLock;
use std::collections::HashMap;
use nac_udm::primitives::{Hash, Address};
use thiserror::Error;
use serde::{Serialize, Deserialize};
/// Errors produced by the block-propagation subsystem.
#[derive(Error, Debug)]
pub enum PropagationError {
    /// A transport-level connection could not be established.
    #[error("Connection failed: {0}")]
    ConnectionFailed(String),
    /// A block announce could not be delivered to the backbone.
    #[error("Block announce failed: {0}")]
    AnnounceFailed(String),
    /// A generic network failure occurred.
    #[error("Network error: {0}")]
    NetworkError(String),
    /// The referenced peer is absent from the connection table.
    #[error("Peer not found: {0}")]
    PeerNotFound(Address),
}
/// Block announce message.
///
/// Broadcast ahead of the full block so peers can decide whether to
/// request the body for blocks they have not yet seen.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlockAnnounce {
    /// Hash identifying the announced block.
    pub block_hash: Hash,
    /// Height of the announced block.
    pub height: u64,
    /// Address of the block producer.
    pub producer: Address,
    /// Announce timestamp.
    pub timestamp: u64,
    /// Transaction count carried by the block.
    pub tx_count: u32,
}
/// CBP backbone network connection.
///
/// Describes one link to another CBP node in the backbone network,
/// together with the link-quality metrics used for peer selection.
#[derive(Debug, Clone)]
pub struct BackboneConnection {
    /// Address of the remote peer.
    pub peer_address: Address,
    /// Link quality score in the range 0-100.
    pub quality_score: u8,
    /// Measured round-trip time, in milliseconds.
    pub rtt_ms: u64,
    /// Unix timestamp (seconds) of the last activity on this link.
    pub last_seen: u64,
    /// Whether the link is direct (not relayed).
    pub is_direct: bool,
}
/// Block propagation manager.
///
/// Owns the CBP backbone connection table and the set of announces
/// awaiting full-block delivery, and optimizes block propagation over
/// that backbone.
pub struct PropagationManager {
    /// Backbone connections, keyed by peer address.
    connections: Arc<RwLock<HashMap<Address, BackboneConnection>>>,
    /// Announces broadcast but not yet resolved to full blocks.
    pending_announces: Arc<RwLock<HashMap<Hash, BlockAnnounce>>>,
    /// Propagation tuning parameters.
    config: PropagationConfig,
}
/// Tuning parameters for backbone connectivity and propagation.
#[derive(Debug, Clone)]
pub struct PropagationConfig {
    /// Minimum acceptable number of CBP connections.
    pub min_cbp_connections: usize,
    /// Number of CBP connections the manager aims to maintain.
    pub target_cbp_connections: usize,
    /// Maximum tolerated round-trip time for a backbone link (ms).
    pub max_rtt_ms: u64,
    /// Timeout applied when establishing a connection (ms).
    pub connection_timeout_ms: u64,
}
impl Default for PropagationConfig {
fn default() -> Self {
Self {
min_cbp_connections: 12, // From constitutional clause NET_CONN_MIN_CBP
target_cbp_connections: 24,
max_rtt_ms: 500,
connection_timeout_ms: 5000,
}
}
}
impl PropagationManager {
    /// Create a new propagation manager with the given configuration.
    pub fn new(config: PropagationConfig) -> Self {
        Self {
            connections: Arc::new(RwLock::new(HashMap::new())),
            pending_announces: Arc::new(RwLock::new(HashMap::new())),
            config,
        }
    }

    /// Create a propagation manager using `PropagationConfig::default()`.
    pub fn with_default_config() -> Self {
        Self::new(PropagationConfig::default())
    }

    /// Current Unix time in whole seconds.
    ///
    /// Returns 0 instead of panicking if the system clock reads before
    /// the epoch (the previous `.unwrap()` would abort the task).
    fn now_secs() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0)
    }

    /// Add (or replace) a CBP backbone connection, keyed by peer address.
    pub async fn add_connection(&self, connection: BackboneConnection) -> Result<(), PropagationError> {
        let mut connections = self.connections.write().await;
        connections.insert(connection.peer_address, connection);
        Ok(())
    }

    /// Remove a CBP backbone connection.
    ///
    /// # Errors
    /// Returns `PropagationError::PeerNotFound` if `peer` has no entry.
    pub async fn remove_connection(&self, peer: &Address) -> Result<(), PropagationError> {
        let mut connections = self.connections.write().await;
        connections.remove(peer)
            .ok_or_else(|| PropagationError::PeerNotFound(*peer))?;
        Ok(())
    }

    /// Snapshot of all active CBP connections (cloned out of the table).
    pub async fn get_connections(&self) -> Vec<BackboneConnection> {
        let connections = self.connections.read().await;
        connections.values().cloned().collect()
    }

    /// Whether the connection count meets `config.min_cbp_connections`.
    pub async fn has_sufficient_connections(&self) -> bool {
        let connections = self.connections.read().await;
        connections.len() >= self.config.min_cbp_connections
    }

    /// Broadcast a block announce to all CBP peers.
    ///
    /// # Errors
    /// Returns `PropagationError::AnnounceFailed` when no backbone
    /// connections exist. The announce is recorded as pending only after
    /// that check, so a failed broadcast no longer leaves a stale entry
    /// in `pending_announces`.
    pub async fn broadcast_block_announce(&self, announce: BlockAnnounce) -> Result<(), PropagationError> {
        // Verify we can actually reach someone before recording state.
        let connections = self.connections.read().await;
        if connections.is_empty() {
            return Err(PropagationError::AnnounceFailed(
                "No CBP connections available".to_string()
            ));
        }
        // Lock order (connections, then pending_announces) matches
        // get_stats, so the two read/write paths cannot deadlock.
        {
            let mut pending = self.pending_announces.write().await;
            pending.insert(announce.block_hash, announce.clone());
        }
        // In real implementation, this would send via QUIC
        // For now, we just log the intent
        log::info!(
            "Broadcasting block announce: hash={:?}, height={}, to {} peers",
            announce.block_hash,
            announce.height,
            connections.len()
        );
        Ok(())
    }

    /// Request a full block from a specific peer.
    ///
    /// # Errors
    /// Returns `PropagationError::PeerNotFound` if `from_peer` is not a
    /// known backbone connection.
    pub async fn request_block(&self, block_hash: Hash, from_peer: Address) -> Result<(), PropagationError> {
        let connections = self.connections.read().await;
        if !connections.contains_key(&from_peer) {
            return Err(PropagationError::PeerNotFound(from_peer));
        }
        // In real implementation, this would send request via QUIC
        log::info!(
            "Requesting block {:?} from peer {:?}",
            block_hash,
            from_peer
        );
        Ok(())
    }

    /// Update a peer's quality score and RTT, and refresh `last_seen`.
    ///
    /// # Errors
    /// Returns `PropagationError::PeerNotFound` if `peer` is unknown.
    pub async fn update_connection_quality(&self, peer: Address, new_score: u8, rtt_ms: u64) -> Result<(), PropagationError> {
        let mut connections = self.connections.write().await;
        if let Some(conn) = connections.get_mut(&peer) {
            conn.quality_score = new_score;
            conn.rtt_ms = rtt_ms;
            conn.last_seen = Self::now_secs();
            Ok(())
        } else {
            Err(PropagationError::PeerNotFound(peer))
        }
    }

    /// Best peers for block propagation: highest quality first, ties
    /// broken by lowest RTT; at most `count` entries.
    pub async fn get_best_peers(&self, count: usize) -> Vec<BackboneConnection> {
        let connections = self.connections.read().await;
        let mut peers: Vec<BackboneConnection> = connections.values().cloned().collect();
        peers.sort_by(|a, b| {
            b.quality_score.cmp(&a.quality_score)
                .then(a.rtt_ms.cmp(&b.rtt_ms))
        });
        peers.into_iter().take(count).collect()
    }

    /// Drop connections whose `last_seen` is older than `max_age_secs`.
    ///
    /// Uses `saturating_sub` so a peer stamped in the future (clock
    /// skew) is treated as fresh instead of triggering u64 underflow,
    /// which panics in debug builds.
    pub async fn cleanup_stale_connections(&self, max_age_secs: u64) {
        let mut connections = self.connections.write().await;
        let now = Self::now_secs();
        connections.retain(|_, conn| {
            now.saturating_sub(conn.last_seen) < max_age_secs
        });
    }

    /// Aggregate propagation statistics (averages are 0 when there are
    /// no connections, avoiding division by zero).
    pub async fn get_stats(&self) -> PropagationStats {
        let connections = self.connections.read().await;
        let pending = self.pending_announces.read().await;
        let avg_rtt = if !connections.is_empty() {
            connections.values().map(|c| c.rtt_ms).sum::<u64>() / connections.len() as u64
        } else {
            0
        };
        let avg_quality = if !connections.is_empty() {
            connections.values().map(|c| c.quality_score as u64).sum::<u64>() / connections.len() as u64
        } else {
            0
        };
        PropagationStats {
            total_connections: connections.len(),
            pending_announces: pending.len(),
            avg_rtt_ms: avg_rtt,
            // Average of values in 0-100 always fits in u8.
            avg_quality_score: avg_quality as u8,
        }
    }
}
/// Aggregate propagation statistics reported by the manager.
#[derive(Debug, Clone)]
pub struct PropagationStats {
    /// Number of active backbone connections.
    pub total_connections: usize,
    /// Number of announces awaiting full-block delivery.
    pub pending_announces: usize,
    /// Mean RTT across all connections, in milliseconds.
    pub avg_rtt_ms: u64,
    /// Mean quality score across all connections (0-100).
    pub avg_quality_score: u8,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a direct test connection for a synthetic peer id, stamped
    /// with the current Unix time.
    fn create_test_connection(peer: u64, quality: u8, rtt: u64) -> BackboneConnection {
        let stamped = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        BackboneConnection {
            peer_address: Address::from_low_u64_be(peer),
            quality_score: quality,
            rtt_ms: rtt,
            last_seen: stamped,
            is_direct: true,
        }
    }

    #[tokio::test]
    async fn test_add_remove_connection() {
        let mgr = PropagationManager::with_default_config();
        let conn = create_test_connection(1, 90, 100);

        // Adding one connection makes it visible in the snapshot.
        mgr.add_connection(conn.clone()).await.unwrap();
        assert_eq!(mgr.get_connections().await.len(), 1);

        // Removing it empties the table again.
        mgr.remove_connection(&conn.peer_address).await.unwrap();
        assert!(mgr.get_connections().await.is_empty());
    }

    #[tokio::test]
    async fn test_get_best_peers() {
        let mgr = PropagationManager::with_default_config();

        // Peers with differing quality scores and RTTs.
        for (peer, quality, rtt) in [(1u64, 90u8, 100u64), (2, 80, 150), (3, 95, 120)] {
            mgr.add_connection(create_test_connection(peer, quality, rtt))
                .await
                .unwrap();
        }

        let top = mgr.get_best_peers(2).await;
        assert_eq!(top.len(), 2);
        assert_eq!(top[0].quality_score, 95); // highest quality wins
        assert_eq!(top[1].quality_score, 90); // runner-up
    }

    #[tokio::test]
    async fn test_broadcast_block_announce() {
        let mgr = PropagationManager::with_default_config();

        // Broadcast needs at least one live connection.
        mgr.add_connection(create_test_connection(1, 90, 100)).await.unwrap();
        mgr.add_connection(create_test_connection(2, 85, 120)).await.unwrap();

        let announce = BlockAnnounce {
            block_hash: Hash::zero(),
            height: 1000,
            producer: Address::zero(),
            timestamp: 1000,
            tx_count: 10,
        };
        assert!(mgr.broadcast_block_announce(announce).await.is_ok());
    }

    #[tokio::test]
    async fn test_has_sufficient_connections() {
        let mgr = PropagationManager::with_default_config();
        assert!(!mgr.has_sufficient_connections().await);

        // Reach the constitutional minimum of 12 CBP connections.
        for peer in 0u64..12 {
            mgr.add_connection(create_test_connection(peer, 80, 100))
                .await
                .unwrap();
        }
        assert!(mgr.has_sufficient_connections().await);
    }
}