use crate::types::*; use crate::locker::{AssetLocker, AssetLockerImpl}; use crate::unlocker::{AssetUnlocker, AssetUnlockerImpl}; use crate::validator::{ValidatorPool, ValidatorPoolImpl}; use crate::relayer::{Relayer, RelayerImpl}; use crate::eth_listener::{EventListener, EthereumListener, BridgeEvent}; use async_trait::async_trait; use sled::Db; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; /// 桥接管理器trait #[async_trait] pub trait BridgeManager: Send + Sync { /// 初始化桥接 async fn initialize(&self) -> Result<()>; /// 启动桥接 async fn start(&self) -> Result<()>; /// 停止桥接 async fn stop(&self) -> Result<()>; /// 获取桥接配置 async fn get_config(&self) -> Result; /// 更新桥接配置 async fn update_config(&self, config: BridgeConfig) -> Result<()>; /// 暂停桥接 async fn pause(&self) -> Result<()>; /// 恢复桥接 async fn resume(&self) -> Result<()>; /// 获取桥接状态 async fn get_status(&self) -> Result; } /// 桥接状态 #[derive(Debug, Clone)] pub struct BridgeStatus { pub is_running: bool, pub is_paused: bool, pub total_locked: Amount, pub total_unlocked: Amount, pub active_validators: usize, pub active_relayers: usize, } /// 桥接管理器实现 pub struct BridgeManagerImpl { db: Arc, config: Arc>, locker: Arc, unlocker: Arc, validator_pool: Arc, relayer: Arc, listeners: Arc>>>, is_running: Arc>, } impl BridgeManagerImpl { pub fn new(db_path: &str) -> Result { let db = Arc::new( sled::open(db_path) .map_err(|e| BridgeError::DatabaseError(e.to_string()))? ); let config = Arc::new(RwLock::new(BridgeConfig::default())); let config_clone = config.clone(); // 创建组件 let locker = Arc::new(AssetLockerImpl::new( db.clone(), config_clone.try_read().expect("mainnet: handle error").clone(), )); let unlocker = Arc::new(AssetUnlockerImpl::new( db.clone(), config_clone.try_read().expect("mainnet: handle error").clone(), )); let validator_pool = Arc::new(ValidatorPoolImpl::new( db.clone(), config_clone.try_read().expect("mainnet: handle error").clone(), )); let relayer = Arc::new(RelayerImpl::new( db.clone(), locker.clone(), unlocker.clone(), validator_pool.clone(), config_clone.try_read().expect("mainnet: handle error").clone(), )); Ok(Self { db, config, locker, unlocker, validator_pool, relayer, listeners: Arc::new(RwLock::new(HashMap::new())), is_running: Arc::new(RwLock::new(false)), }) } /// 注册事件监听器 pub async fn register_listener( &self, chain_id: ChainId, listener: Arc, ) -> Result<()> { let mut listeners = self.listeners.write().await; listeners.insert(chain_id, listener); log::info!("Registered listener for chain: {}", chain_id); Ok(()) } /// 处理桥接事件 async fn handle_bridge_event(&self, event: BridgeEvent) -> Result<()> { match event { BridgeEvent::AssetLocked { lock_id, asset, amount, locker, target_chain, receiver, } => { log::info!( "Processing AssetLocked event: lock_id={}, amount={}", hex::encode(lock_id), amount ); // 创建跨链消息 let message = CrossChainMessage { id: lock_id, source_chain: asset.chain_id, target_chain, message_type: MessageType::Mint, asset, sender: locker, receiver, amount, timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("mainnet: handle error") .as_secs(), nonce: 0, signatures: vec![], }; // 中继消息 self.relayer.relay_message(message).await?; } BridgeEvent::AssetBurned { burn_id, asset, amount, burner, target_chain, receiver, } => { log::info!( "Processing AssetBurned event: burn_id={}, amount={}", hex::encode(burn_id), amount ); // 创建跨链消息 let message = CrossChainMessage { id: burn_id, source_chain: asset.chain_id, target_chain, message_type: MessageType::Unlock, asset, sender: burner, receiver, amount, timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .expect("mainnet: handle error") .as_secs(), nonce: 0, signatures: vec![], }; // 中继消息 self.relayer.relay_message(message).await?; } } Ok(()) } } #[async_trait] impl BridgeManager for BridgeManagerImpl { async fn initialize(&self) -> Result<()> { log::info!("Initializing bridge manager"); // 加载验证器 self.validator_pool.load_validators().await?; // TODO: 加载其他持久化数据 log::info!("Bridge manager initialized"); Ok(()) } async fn start(&self) -> Result<()> { let mut is_running = self.is_running.write().await; if *is_running { return Err(BridgeError::Other("Bridge already running".to_string())); } // 启动中继节点 self.relayer.start().await?; // 启动所有监听器 let listeners = self.listeners.read().await; for (chain_id, listener) in listeners.iter() { listener.start().await?; log::info!("Started listener for chain: {}", chain_id); } *is_running = true; log::info!("Bridge started"); Ok(()) } async fn stop(&self) -> Result<()> { let mut is_running = self.is_running.write().await; if !*is_running { return Err(BridgeError::Other("Bridge not running".to_string())); } // 停止所有监听器 let listeners = self.listeners.read().await; for (chain_id, listener) in listeners.iter() { listener.stop().await?; log::info!("Stopped listener for chain: {}", chain_id); } // 停止中继节点 self.relayer.stop().await?; *is_running = false; log::info!("Bridge stopped"); Ok(()) } async fn get_config(&self) -> Result { let config = self.config.read().await; Ok(config.clone()) } async fn update_config(&self, new_config: BridgeConfig) -> Result<()> { let mut config = self.config.write().await; *config = new_config; log::info!("Bridge config updated"); Ok(()) } async fn pause(&self) -> Result<()> { let mut config = self.config.write().await; config.is_paused = true; log::info!("Bridge paused"); Ok(()) } async fn resume(&self) -> Result<()> { let mut config = self.config.write().await; config.is_paused = false; log::info!("Bridge resumed"); Ok(()) } async fn get_status(&self) -> Result { let is_running = *self.is_running.read().await; let config = self.config.read().await; let active_validators = self.validator_pool.get_active_validators().await?.len(); Ok(BridgeStatus { is_running, is_paused: config.is_paused, total_locked: 0, // TODO: 计算实际值 total_unlocked: 0, // TODO: 计算实际值 active_validators, active_relayers: 1, // TODO: 支持多个中继节点 }) } } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_initialize_bridge() { let manager = BridgeManagerImpl::new("/tmp/test_bridge_db").expect("mainnet: handle error"); manager.initialize().await.expect("mainnet: handle error"); } #[tokio::test] async fn test_start_stop_bridge() { let manager = BridgeManagerImpl::new("/tmp/test_bridge_db2").expect("mainnet: handle error"); manager.initialize().await.expect("mainnet: handle error"); manager.start().await.expect("mainnet: handle error"); assert!(*manager.is_running.read().await); manager.stop().await.expect("mainnet: handle error"); assert!(!*manager.is_running.read().await); } #[tokio::test] async fn test_pause_resume_bridge() { let manager = BridgeManagerImpl::new("/tmp/test_bridge_db3").expect("mainnet: handle error"); manager.initialize().await.expect("mainnet: handle error"); manager.pause().await.expect("mainnet: handle error"); let config = manager.get_config().await.expect("mainnet: handle error"); assert!(config.is_paused); manager.resume().await.expect("mainnet: handle error"); let config = manager.get_config().await.expect("mainnet: handle error"); assert!(!config.is_paused); } #[tokio::test] async fn test_get_status() { let manager = BridgeManagerImpl::new("/tmp/test_bridge_db4").expect("mainnet: handle error"); manager.initialize().await.expect("mainnet: handle error"); let status = manager.get_status().await.expect("mainnet: handle error"); assert!(!status.is_running); assert!(!status.is_paused); } }