// NAC_Blockchain/nac-cross-chain-bridge/src/manager.rs
//
// 348 lines
// 11 KiB
// Rust

use crate::types::*;
use crate::locker::{AssetLocker, AssetLockerImpl};
use crate::unlocker::{AssetUnlocker, AssetUnlockerImpl};
use crate::validator::{ValidatorPool, ValidatorPoolImpl};
use crate::relayer::{Relayer, RelayerImpl};
use crate::eth_listener::{EventListener, EthereumListener, BridgeEvent};
use async_trait::async_trait;
use sled::Db;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Lifecycle and configuration interface of the cross-chain bridge.
///
/// Implementations must be `Send + Sync` so a manager can be shared across
/// async tasks.
#[async_trait]
pub trait BridgeManager: Send + Sync {
    /// Initializes the bridge.
    async fn initialize(&self) -> Result<()>;

    /// Starts the bridge.
    async fn start(&self) -> Result<()>;

    /// Stops the bridge.
    async fn stop(&self) -> Result<()>;

    /// Returns the bridge configuration.
    async fn get_config(&self) -> Result<BridgeConfig>;

    /// Replaces the bridge configuration with `config`.
    async fn update_config(&self, config: BridgeConfig) -> Result<()>;

    /// Pauses the bridge.
    async fn pause(&self) -> Result<()>;

    /// Resumes a paused bridge.
    async fn resume(&self) -> Result<()>;

    /// Returns a snapshot of the bridge status.
    async fn get_status(&self) -> Result<BridgeStatus>;
}
/// Point-in-time snapshot of the bridge, as returned by
/// `BridgeManager::get_status`.
#[derive(Debug, Clone)]
pub struct BridgeStatus {
    /// Whether the bridge has been started and not yet stopped.
    pub is_running: bool,
    /// Whether the bridge configuration is currently flagged as paused.
    pub is_paused: bool,
    /// Total locked amount.
    pub total_locked: Amount,
    /// Total unlocked amount.
    pub total_unlocked: Amount,
    /// Number of active validators.
    pub active_validators: usize,
    /// Number of active relayers.
    pub active_relayers: usize,
}
/// Default `BridgeManager` implementation.
///
/// Owns the bridge components (locker, unlocker, validator pool, relayer),
/// the per-chain event listeners and the run-state flag.
pub struct BridgeManagerImpl {
    /// Sled database handle opened in `new`.
    db: Arc<Db>,
    /// The manager's copy of the bridge configuration, behind an async lock.
    config: Arc<RwLock<BridgeConfig>>,
    /// Asset locking component.
    locker: Arc<dyn AssetLocker>,
    /// Asset unlocking component.
    unlocker: Arc<dyn AssetUnlocker>,
    /// Validator pool component.
    validator_pool: Arc<dyn ValidatorPool>,
    /// Relayer component that receives the cross-chain messages.
    relayer: Arc<dyn Relayer>,
    /// Registered event listeners, keyed by chain id.
    listeners: Arc<RwLock<HashMap<ChainId, Arc<dyn EventListener>>>>,
    /// Run-state flag: `true` between a successful `start` and `stop`.
    is_running: Arc<RwLock<bool>>,
}
impl BridgeManagerImpl {
    /// Opens the sled database at `db_path` and wires up the bridge
    /// components around it.
    ///
    /// The locker, unlocker, validator pool and relayer each receive a clone
    /// of the database handle and their own clone of
    /// `BridgeConfig::default()`. The manager keeps a further copy behind an
    /// async `RwLock`. The bridge starts with no listeners registered and in
    /// the "not running" state.
    ///
    /// # Errors
    ///
    /// Returns `BridgeError::DatabaseError` if sled cannot open `db_path`.
    pub fn new(db_path: &str) -> Result<Self> {
        let db = Arc::new(
            sled::open(db_path).map_err(|e| BridgeError::DatabaseError(e.to_string()))?,
        );

        // Build the default configuration once and hand out clones of the
        // plain value. This avoids locking the freshly created `RwLock` (via a
        // fallible `try_read`) just to read the same value back.
        let initial_config = BridgeConfig::default();

        // Create the components.
        let locker = Arc::new(AssetLockerImpl::new(db.clone(), initial_config.clone()));
        let unlocker = Arc::new(AssetUnlockerImpl::new(db.clone(), initial_config.clone()));
        let validator_pool = Arc::new(ValidatorPoolImpl::new(
            db.clone(),
            initial_config.clone(),
        ));
        let relayer = Arc::new(RelayerImpl::new(
            db.clone(),
            locker.clone(),
            unlocker.clone(),
            validator_pool.clone(),
            initial_config.clone(),
        ));

        Ok(Self {
            db,
            config: Arc::new(RwLock::new(initial_config)),
            locker,
            unlocker,
            validator_pool,
            relayer,
            listeners: Arc::new(RwLock::new(HashMap::new())),
            is_running: Arc::new(RwLock::new(false)),
        })
    }

    /// Registers `listener` as the event listener for `chain_id`.
    ///
    /// A listener previously registered for the same chain is replaced.
    /// This method only records the listener; it does not start it.
    pub async fn register_listener(
        &self,
        chain_id: ChainId,
        listener: Arc<dyn EventListener>,
    ) -> Result<()> {
        self.listeners.write().await.insert(chain_id, listener);
        log::info!("Registered listener for chain: {}", chain_id);
        Ok(())
    }

    /// Converts a bridge event into a `CrossChainMessage` and hands it to the
    /// relayer.
    ///
    /// * `AssetLocked` becomes a `MessageType::Mint` message.
    /// * `AssetBurned` becomes a `MessageType::Unlock` message.
    ///
    /// The message id is the event's lock/burn id, the source chain is taken
    /// from the asset, and the timestamp is the current wall-clock time in
    /// seconds.
    ///
    /// # Errors
    ///
    /// Returns `BridgeError::Other` if the system clock reports a time before
    /// the Unix epoch, and propagates any error from the relayer.
    async fn handle_bridge_event(&self, event: BridgeEvent) -> Result<()> {
        let message = match event {
            BridgeEvent::AssetLocked {
                lock_id,
                asset,
                amount,
                locker,
                target_chain,
                receiver,
            } => {
                log::info!(
                    "Processing AssetLocked event: lock_id={}, amount={}",
                    hex::encode(lock_id),
                    amount
                );
                CrossChainMessage {
                    id: lock_id,
                    // Read the chain id before `asset` is moved into the message.
                    source_chain: asset.chain_id,
                    target_chain,
                    message_type: MessageType::Mint,
                    asset,
                    sender: locker,
                    receiver,
                    amount,
                    timestamp: Self::unix_timestamp_secs()?,
                    // NOTE(review): the nonce is hard-coded to 0 for every
                    // message; confirm that it is assigned downstream.
                    nonce: 0,
                    signatures: vec![],
                }
            }
            BridgeEvent::AssetBurned {
                burn_id,
                asset,
                amount,
                burner,
                target_chain,
                receiver,
            } => {
                log::info!(
                    "Processing AssetBurned event: burn_id={}, amount={}",
                    hex::encode(burn_id),
                    amount
                );
                CrossChainMessage {
                    id: burn_id,
                    // Read the chain id before `asset` is moved into the message.
                    source_chain: asset.chain_id,
                    target_chain,
                    message_type: MessageType::Unlock,
                    asset,
                    sender: burner,
                    receiver,
                    amount,
                    timestamp: Self::unix_timestamp_secs()?,
                    // NOTE(review): the nonce is hard-coded to 0 for every
                    // message; confirm that it is assigned downstream.
                    nonce: 0,
                    signatures: vec![],
                }
            }
        };

        // Relay the message.
        self.relayer.relay_message(message).await?;
        Ok(())
    }

    /// Returns the current wall-clock time as whole seconds since the Unix
    /// epoch, or `BridgeError::Other` if the clock is set before the epoch.
    fn unix_timestamp_secs() -> Result<u64> {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .map_err(|e| {
                BridgeError::Other(format!("System clock is before the Unix epoch: {}", e))
            })
    }
}
#[async_trait]
impl BridgeManager for BridgeManagerImpl {
async fn initialize(&self) -> Result<()> {
log::info!("Initializing bridge manager");
// 加载验证器
self.validator_pool.load_validators().await?;
// TODO: 加载其他持久化数据
log::info!("Bridge manager initialized");
Ok(())
}
async fn start(&self) -> Result<()> {
let mut is_running = self.is_running.write().await;
if *is_running {
return Err(BridgeError::Other("Bridge already running".to_string()));
}
// 启动中继节点
self.relayer.start().await?;
// 启动所有监听器
let listeners = self.listeners.read().await;
for (chain_id, listener) in listeners.iter() {
listener.start().await?;
log::info!("Started listener for chain: {}", chain_id);
}
*is_running = true;
log::info!("Bridge started");
Ok(())
}
async fn stop(&self) -> Result<()> {
let mut is_running = self.is_running.write().await;
if !*is_running {
return Err(BridgeError::Other("Bridge not running".to_string()));
}
// 停止所有监听器
let listeners = self.listeners.read().await;
for (chain_id, listener) in listeners.iter() {
listener.stop().await?;
log::info!("Stopped listener for chain: {}", chain_id);
}
// 停止中继节点
self.relayer.stop().await?;
*is_running = false;
log::info!("Bridge stopped");
Ok(())
}
async fn get_config(&self) -> Result<BridgeConfig> {
let config = self.config.read().await;
Ok(config.clone())
}
async fn update_config(&self, new_config: BridgeConfig) -> Result<()> {
let mut config = self.config.write().await;
*config = new_config;
log::info!("Bridge config updated");
Ok(())
}
async fn pause(&self) -> Result<()> {
let mut config = self.config.write().await;
config.is_paused = true;
log::info!("Bridge paused");
Ok(())
}
async fn resume(&self) -> Result<()> {
let mut config = self.config.write().await;
config.is_paused = false;
log::info!("Bridge resumed");
Ok(())
}
async fn get_status(&self) -> Result<BridgeStatus> {
let is_running = *self.is_running.read().await;
let config = self.config.read().await;
let active_validators = self.validator_pool.get_active_validators().await?.len();
Ok(BridgeStatus {
is_running,
is_paused: config.is_paused,
total_locked: 0, // TODO: 计算实际值
total_unlocked: 0, // TODO: 计算实际值
active_validators,
active_relayers: 1, // TODO: 支持多个中继节点
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Per-test scratch directory for the sled database.
    ///
    /// The path lives under the platform temp directory and includes the test
    /// name and process id, so tests neither share a database nor depend on a
    /// hard-coded `/tmp`. The directory is removed when the value is dropped.
    struct TempDbDir(std::path::PathBuf);

    impl TempDbDir {
        fn new(test_name: &str) -> Self {
            let dir = std::env::temp_dir().join(format!(
                "nac_bridge_{}_{}",
                test_name,
                std::process::id()
            ));
            // Clear leftovers from an earlier run that happened to reuse this pid.
            let _ = std::fs::remove_dir_all(&dir);
            Self(dir)
        }

        /// Opens a manager backed by this scratch directory.
        fn manager(&self) -> BridgeManagerImpl {
            let path = self
                .0
                .to_str()
                .expect("temp dir path should be valid UTF-8");
            BridgeManagerImpl::new(path).expect("failed to open bridge database")
        }
    }

    impl Drop for TempDbDir {
        fn drop(&mut self) {
            // Best-effort cleanup; a leftover directory must not fail the test.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[tokio::test]
    async fn test_initialize_bridge() {
        // `scratch` is declared first so the manager is dropped before the
        // directory is removed.
        let scratch = TempDbDir::new("initialize");
        let manager = scratch.manager();

        manager.initialize().await.expect("initialize should succeed");
    }

    #[tokio::test]
    async fn test_start_stop_bridge() {
        let scratch = TempDbDir::new("start_stop");
        let manager = scratch.manager();
        manager.initialize().await.expect("initialize should succeed");

        manager.start().await.expect("start should succeed");
        assert!(*manager.is_running.read().await);

        manager.stop().await.expect("stop should succeed");
        assert!(!*manager.is_running.read().await);
    }

    #[tokio::test]
    async fn test_pause_resume_bridge() {
        let scratch = TempDbDir::new("pause_resume");
        let manager = scratch.manager();
        manager.initialize().await.expect("initialize should succeed");

        manager.pause().await.expect("pause should succeed");
        let config = manager.get_config().await.expect("get_config should succeed");
        assert!(config.is_paused);

        manager.resume().await.expect("resume should succeed");
        let config = manager.get_config().await.expect("get_config should succeed");
        assert!(!config.is_paused);
    }

    #[tokio::test]
    async fn test_get_status() {
        let scratch = TempDbDir::new("get_status");
        let manager = scratch.manager();
        manager.initialize().await.expect("initialize should succeed");

        let status = manager.get_status().await.expect("get_status should succeed");
        assert!(!status.is_running);
        assert!(!status.is_paused);
    }
}