use crate::types::*; use crate::locker::AssetLocker; use crate::unlocker::AssetUnlocker; use crate::validator::ValidatorPool; use async_trait::async_trait; use std::sync::Arc; use tokio::sync::RwLock; use sled::Db; /// 中继节点trait #[async_trait] pub trait Relayer: Send + Sync { /// 启动中继节点 async fn start(&self) -> Result<()>; /// 停止中继节点 async fn stop(&self) -> Result<()>; /// 中继消息 async fn relay_message(&self, message: CrossChainMessage) -> Result<()>; /// 获取中继统计信息 async fn get_stats(&self) -> Result; } /// 中继统计信息 #[derive(Debug, Clone)] pub struct RelayerStats { pub total_relayed: u64, pub successful: u64, pub failed: u64, pub pending: u64, } /// 中继节点实现 pub struct RelayerImpl { db: Arc, locker: Arc, unlocker: Arc, validator_pool: Arc, config: BridgeConfig, is_running: Arc>, stats: Arc>, } impl RelayerImpl { pub fn new( db: Arc, locker: Arc, unlocker: Arc, validator_pool: Arc, config: BridgeConfig, ) -> Self { Self { db, locker, unlocker, validator_pool, config, is_running: Arc::new(RwLock::new(false)), stats: Arc::new(RwLock::new(RelayerStats { total_relayed: 0, successful: 0, failed: 0, pending: 0, })), } } /// 验证消息 async fn validate_message(&self, message: &CrossChainMessage) -> Result<()> { // 检查桥接是否暂停 if self.config.is_paused { return Err(BridgeError::BridgePaused); } // 验证签名 if !self.validator_pool.verify_message_signatures(message).await? { return Err(BridgeError::InvalidMessage("Invalid signatures".to_string())); } // 检查消息是否已处理(防止重放攻击) if self.is_message_processed(&message.id)? { return Err(BridgeError::DuplicateMessage(message.id)); } // 验证金额限制 if message.amount > self.config.max_single_amount { return Err(BridgeError::AmountExceedsLimit { amount: message.amount, limit: self.config.max_single_amount, }); } Ok(()) } /// 检查消息是否已处理 fn is_message_processed(&self, message_id: &Hash) -> Result { let key = format!("processed:{}", hex::encode(message_id)); Ok(self.db.contains_key(key.as_bytes()) .map_err(|e| BridgeError::DatabaseError(e.to_string()))?) } /// 标记消息已处理 fn mark_message_processed(&self, message_id: &Hash) -> Result<()> { let key = format!("processed:{}", hex::encode(message_id)); self.db.insert(key.as_bytes(), b"1") .map_err(|e| BridgeError::DatabaseError(e.to_string()))?; Ok(()) } /// 处理锁定消息 async fn handle_lock_message(&self, message: &CrossChainMessage) -> Result<()> { log::info!( "Handling lock message: id={}, amount={}", hex::encode(message.id), message.amount ); // 锁定资产 let receipt = self.locker.lock_asset( message.asset.clone(), message.amount, message.target_chain, message.receiver, ).await?; // 更新锁定状态 self.locker.update_lock_status(receipt.lock_id, LockStatus::Locked).await?; // TODO: 在目标链上铸造资产 Ok(()) } /// 处理解锁消息 async fn handle_unlock_message(&self, message: &CrossChainMessage) -> Result<()> { log::info!( "Handling unlock message: id={}, amount={}", hex::encode(message.id), message.amount ); // TODO: 验证销毁证明 let proof = UnlockProof { burn_tx_hash: Hash::from([0u8; 48]), burn_block_number: 0, burn_block_hash: Hash::from([0u8; 48]), merkle_proof: vec![], signatures: message.signatures.clone(), }; // 解锁资产 let lock_id = message.id; // 简化实现 let _receipt = self.unlocker.unlock_asset(lock_id, proof).await?; Ok(()) } /// 更新统计信息 async fn update_stats(&self, success: bool) { let mut stats = self.stats.write().await; stats.total_relayed += 1; if success { stats.successful += 1; } else { stats.failed += 1; } } } #[async_trait] impl Relayer for RelayerImpl { async fn start(&self) -> Result<()> { let mut is_running = self.is_running.write().await; if *is_running { return Err(BridgeError::Other("Relayer already running".to_string())); } *is_running = true; log::info!("Relayer started"); // TODO: 启动事件监听器 // TODO: 启动消息处理循环 Ok(()) } async fn stop(&self) -> Result<()> { let mut is_running = self.is_running.write().await; if !*is_running { return Err(BridgeError::Other("Relayer not running".to_string())); } *is_running = false; log::info!("Relayer stopped"); Ok(()) } async fn relay_message(&self, message: CrossChainMessage) -> Result<()> { // 验证消息 self.validate_message(&message).await?; // 根据消息类型处理 let result = match message.message_type { MessageType::Lock => self.handle_lock_message(&message).await, MessageType::Unlock => self.handle_unlock_message(&message).await, MessageType::Mint => { // TODO: 实现铸造逻辑 Ok(()) } MessageType::Burn => { // TODO: 实现销毁逻辑 Ok(()) } }; // 更新统计信息 self.update_stats(result.is_ok()).await; // 标记消息已处理 if result.is_ok() { self.mark_message_processed(&message.id)?; } result } async fn get_stats(&self) -> Result { let stats = self.stats.read().await; Ok(stats.clone()) } } #[cfg(test)] mod tests { use super::*; use crate::locker::AssetLockerImpl; use crate::unlocker::AssetUnlockerImpl; use crate::validator::ValidatorPoolImpl; async fn create_test_relayer() -> RelayerImpl { let db = Arc::new(sled::Config::new().temporary(true).open().unwrap()); let config = BridgeConfig::default(); let locker = Arc::new(AssetLockerImpl::new(db.clone(), config.clone())); let unlocker = Arc::new(AssetUnlockerImpl::new(db.clone(), config.clone())); let validator_pool = Arc::new(ValidatorPoolImpl::new(db.clone(), config.clone())); // 添加测试验证器 let validator = ValidatorInfo { address: [1u8; 32], stake_amount: 10000 * 10u128.pow(18), reputation: 100, is_active: true, }; // 使用register_validator方法添加验证器 validator_pool.register_validator(validator).await.unwrap(); RelayerImpl::new(db, locker, unlocker, validator_pool, config) } #[tokio::test] async fn test_start_stop_relayer() { let relayer = create_test_relayer().await; relayer.start().await.unwrap(); assert!(*relayer.is_running.read().await); relayer.stop().await.unwrap(); assert!(!*relayer.is_running.read().await); } #[tokio::test] async fn test_relay_lock_message() { let relayer = create_test_relayer().await; let message = CrossChainMessage { id: Hash::from([1u8; 48]), source_chain: ChainId::NAC, target_chain: ChainId::Ethereum, message_type: MessageType::Lock, asset: AssetInfo { asset_id: Hash::from([2u8; 48]), name: "Test Token".to_string(), symbol: "TEST".to_string(), decimals: 18, chain_id: ChainId::NAC, }, sender: [3u8; 32], receiver: [4u8; 32], amount: 1000 * 10u128.pow(18), timestamp: 1234567890, nonce: 1, signatures: vec![Signature::from([5u8; 96]), Signature::from([6u8; 96])], }; relayer.relay_message(message).await.unwrap(); let stats = relayer.get_stats().await.unwrap(); assert_eq!(stats.total_relayed, 1); assert_eq!(stats.successful, 1); } #[tokio::test] async fn test_duplicate_message() { let relayer = create_test_relayer().await; let message = CrossChainMessage { id: Hash::from([1u8; 48]), source_chain: ChainId::NAC, target_chain: ChainId::Ethereum, message_type: MessageType::Lock, asset: AssetInfo { asset_id: Hash::from([2u8; 48]), name: "Test Token".to_string(), symbol: "TEST".to_string(), decimals: 18, chain_id: ChainId::NAC, }, sender: [3u8; 32], receiver: [4u8; 32], amount: 1000 * 10u128.pow(18), timestamp: 1234567890, nonce: 1, signatures: vec![Signature::from([5u8; 96]), Signature::from([6u8; 96])], }; // 第一次中继成功 relayer.relay_message(message.clone()).await.unwrap(); // 第二次中继应该失败(重复消息) let result = relayer.relay_message(message).await; assert!(matches!(result, Err(BridgeError::DuplicateMessage(_)))); } }