NAC_Blockchain/nac-ftan/src/lib.rs

331 lines
8.9 KiB
Rust

//! NAC FTAN - 碎片化交易聚合网络
//! Fragmented Transaction Aggregation Network
//!
//! 基于CSNP V2.0技术白皮书第4章"FTAN"
use nac_udm::primitives::{Address, Hash};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum FtanError {
#[error("Transaction not found: {0:?}")]
TransactionNotFound(Hash),
#[error("Pool full: {0}")]
PoolFull(String),
#[error("Aggregation failed: {0}")]
AggregationFailed(String),
#[error("Invalid signature")]
InvalidSignature,
}
/// 交易碎片
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionFragment {
/// 原始交易哈希
pub tx_hash: Hash,
/// 碎片索引
pub fragment_index: u32,
/// 总碎片数
pub total_fragments: u32,
/// 碎片数据
pub data: Vec<u8>,
/// 发送者地址
pub sender: Address,
/// 签名
pub signature: Vec<u8>,
/// 时间戳
pub timestamp: u64,
}
/// 聚合池
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AggregationPool {
/// 池ID
pub pool_id: String,
/// 池中的交易碎片
pub fragments: HashMap<Hash, Vec<TransactionFragment>>,
/// 最大池大小
pub max_size: usize,
/// 聚合超时(毫秒)
pub timeout_ms: u64,
/// 创建时间
pub created_at: u64,
}
impl AggregationPool {
/// 创建新的聚合池
pub fn new(pool_id: String, max_size: usize, timeout_ms: u64, timestamp: u64) -> Self {
Self {
pool_id,
fragments: HashMap::new(),
max_size,
timeout_ms,
created_at: timestamp,
}
}
/// 添加交易碎片
pub fn add_fragment(&mut self, fragment: TransactionFragment) -> Result<(), FtanError> {
if self.fragments.len() >= self.max_size {
return Err(FtanError::PoolFull(self.pool_id.clone()));
}
self.fragments
.entry(fragment.tx_hash)
.or_insert_with(Vec::new)
.push(fragment);
Ok(())
}
/// 检查交易是否完整
pub fn is_transaction_complete(&self, tx_hash: &Hash) -> bool {
if let Some(fragments) = self.fragments.get(tx_hash) {
if fragments.is_empty() {
return false;
}
let total = fragments[0].total_fragments as usize;
if fragments.len() != total {
return false;
}
// 检查所有碎片索引是否连续
let mut indices: Vec<u32> = fragments.iter().map(|f| f.fragment_index).collect();
indices.sort_unstable();
for (i, &idx) in indices.iter().enumerate() {
if idx != i as u32 {
return false;
}
}
true
} else {
false
}
}
/// 聚合交易
pub fn aggregate_transaction(&self, tx_hash: &Hash) -> Result<Vec<u8>, FtanError> {
let fragments = self.fragments
.get(tx_hash)
.ok_or(FtanError::TransactionNotFound(*tx_hash))?;
if !self.is_transaction_complete(tx_hash) {
return Err(FtanError::AggregationFailed(
"Transaction incomplete".to_string()
));
}
// 按索引排序
let mut sorted_fragments = fragments.clone();
sorted_fragments.sort_by_key(|f| f.fragment_index);
// 合并数据
let mut aggregated_data = Vec::new();
for fragment in sorted_fragments {
aggregated_data.extend_from_slice(&fragment.data);
}
Ok(aggregated_data)
}
/// 获取池中交易数量
pub fn transaction_count(&self) -> usize {
self.fragments.len()
}
/// 获取池中碎片总数
pub fn fragment_count(&self) -> usize {
self.fragments.values().map(|v| v.len()).sum()
}
/// 检查池是否超时
pub fn is_expired(&self, current_time: u64) -> bool {
current_time - self.created_at > self.timeout_ms
}
}
/// FTAN聚合节点
pub struct FtanAggregator {
/// 节点ID
node_id: String,
/// 聚合池
pools: HashMap<String, AggregationPool>,
/// 默认池大小
default_pool_size: usize,
/// 默认超时
default_timeout_ms: u64,
/// 已聚合的交易数
aggregated_count: u64,
}
impl FtanAggregator {
/// 创建新的FTAN聚合节点
pub fn new(node_id: String) -> Self {
Self {
node_id,
pools: HashMap::new(),
default_pool_size: 100, // 默认100个交易
default_timeout_ms: 500, // 默认500ms超时
aggregated_count: 0,
}
}
/// 创建新的聚合池
pub fn create_pool(&mut self, pool_id: String, timestamp: u64) -> &mut AggregationPool {
let pool = AggregationPool::new(
pool_id.clone(),
self.default_pool_size,
self.default_timeout_ms,
timestamp,
);
self.pools.insert(pool_id.clone(), pool);
self.pools.get_mut(&pool_id).unwrap()
}
/// 接收交易碎片
pub fn receive_fragment(
&mut self,
pool_id: &str,
fragment: TransactionFragment,
timestamp: u64,
) -> Result<(), FtanError> {
// 如果池不存在,创建新池
if !self.pools.contains_key(pool_id) {
self.create_pool(pool_id.to_string(), timestamp);
}
let pool = self.pools.get_mut(pool_id).unwrap();
pool.add_fragment(fragment)?;
Ok(())
}
/// 尝试聚合完整的交易
pub fn try_aggregate(
&mut self,
pool_id: &str,
tx_hash: &Hash,
) -> Result<Option<Vec<u8>>, FtanError> {
let pool = self.pools
.get(pool_id)
.ok_or(FtanError::PoolFull(pool_id.to_string()))?;
if pool.is_transaction_complete(tx_hash) {
let data = pool.aggregate_transaction(tx_hash)?;
self.aggregated_count += 1;
Ok(Some(data))
} else {
Ok(None)
}
}
/// 清理超时的池
pub fn cleanup_expired_pools(&mut self, current_time: u64) {
self.pools.retain(|_, pool| !pool.is_expired(current_time));
}
/// 获取节点统计信息
pub fn get_stats(&self) -> FtanStats {
FtanStats {
node_id: self.node_id.clone(),
active_pools: self.pools.len(),
total_fragments: self.pools.values().map(|p| p.fragment_count()).sum(),
aggregated_count: self.aggregated_count,
}
}
/// 获取节点ID
pub fn node_id(&self) -> &str {
&self.node_id
}
}
/// FTAN统计信息
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FtanStats {
pub node_id: String,
pub active_pools: usize,
pub total_fragments: usize,
pub aggregated_count: u64,
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_hash(id: u8) -> Hash {
let mut bytes = [0u8; 48];
bytes[47] = id;
Hash::from_bytes(bytes)
}
fn create_test_address(id: u8) -> Address {
let mut bytes = [0u8; 32];
bytes[31] = id;
Address::from_bytes(bytes)
}
#[test]
fn test_aggregation_pool() {
let mut pool = AggregationPool::new("test_pool".to_string(), 100, 500, 1000);
let tx_hash = create_test_hash(1);
let sender = create_test_address(1);
// 添加3个碎片
for i in 0..3 {
let fragment = TransactionFragment {
tx_hash,
fragment_index: i,
total_fragments: 3,
data: vec![i as u8; 10],
sender,
signature: vec![],
timestamp: 1000,
};
pool.add_fragment(fragment).unwrap();
}
assert!(pool.is_transaction_complete(&tx_hash));
let aggregated = pool.aggregate_transaction(&tx_hash).unwrap();
assert_eq!(aggregated.len(), 30); // 3 * 10
}
#[test]
fn test_ftan_aggregator() {
let mut aggregator = FtanAggregator::new("node1".to_string());
let tx_hash = create_test_hash(1);
let sender = create_test_address(1);
// 接收3个碎片
for i in 0..3 {
let fragment = TransactionFragment {
tx_hash,
fragment_index: i,
total_fragments: 3,
data: vec![i as u8; 10],
sender,
signature: vec![],
timestamp: 1000,
};
aggregator.receive_fragment("pool1", fragment, 1000).unwrap();
}
// 尝试聚合
let result = aggregator.try_aggregate("pool1", &tx_hash).unwrap();
assert!(result.is_some());
assert_eq!(result.unwrap().len(), 30);
let stats = aggregator.get_stats();
assert_eq!(stats.aggregated_count, 1);
}
}