NAC_Blockchain/nac-udm/src/l1_protocol/cross_shard_transaction.rs

720 lines
22 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

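//! Cross-shard transaction processing module.
//!
//! Handles cross-shard asset transfers and certificate verification.
//!
//! **NAC native design principles**
//! - Uses Assets, not Tokens
//! - Uses Certificates, not Contracts
//! - Uses SHA3-384 hashes, not SHA256/Keccak256
//! - Coordinated through CBPP consensus
//! - Verified with Constitutional Receipts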
use crate::primitives::{Address, Hash};
use std::collections::{HashMap, VecDeque};
use serde::{Deserialize, Serialize};
/// A cross-shard transaction: one asset transfer from a sender on the source
/// shard to a receiver on the target shard, together with its processing
/// status and the proofs attached while it is processed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossShardTransaction {
/// Transaction ID.
pub tx_id: Hash,
/// Source shard ID.
pub source_shard: u64,
/// Target shard ID.
pub target_shard: u64,
/// Sender address.
pub sender: Address,
/// Receiver address.
pub receiver: Address,
/// Asset type, given as a GNACS code.
pub asset_type: String,
/// Asset amount.
pub amount: u64,
/// Current transaction status.
pub status: CrossShardTxStatus,
/// Creation time (the unit is not specified in this file).
pub created_at: u64,
/// Lock proof; `None` until `set_lock_proof` is called.
pub lock_proof: Option<LockProof>,
/// Unlock proof; `None` until `set_unlock_proof` is called.
pub unlock_proof: Option<UnlockProof>,
/// Constitutional Receipt hash; `None` until `set_cr_hash` is called.
pub cr_hash: Option<Hash>,
}
impl CrossShardTransaction {
    /// Builds a new cross-shard transaction.
    ///
    /// The transaction starts in [`CrossShardTxStatus::Pending`] with no lock
    /// proof, no unlock proof and no Constitutional Receipt hash; those are
    /// attached afterwards through the setters below.
    pub fn new(
        tx_id: Hash,
        source_shard: u64,
        target_shard: u64,
        sender: Address,
        receiver: Address,
        asset_type: String,
        amount: u64,
        created_at: u64,
    ) -> Self {
        // Everything the caller does not supply starts out empty / pending.
        let status = CrossShardTxStatus::Pending;
        let (lock_proof, unlock_proof, cr_hash) = (None, None, None);

        Self {
            tx_id,
            source_shard,
            target_shard,
            sender,
            receiver,
            asset_type,
            amount,
            status,
            created_at,
            lock_proof,
            unlock_proof,
            cr_hash,
        }
    }

    /// Attaches the lock proof and moves the transaction to `Locked`.
    pub fn set_lock_proof(&mut self, proof: LockProof) {
        self.status = CrossShardTxStatus::Locked;
        self.lock_proof = Some(proof);
    }

    /// Attaches the unlock proof and moves the transaction to `Completed`.
    pub fn set_unlock_proof(&mut self, proof: UnlockProof) {
        self.status = CrossShardTxStatus::Completed;
        self.unlock_proof = Some(proof);
    }

    /// Records the Constitutional Receipt hash; the status is left untouched.
    pub fn set_cr_hash(&mut self, cr_hash: Hash) {
        let receipt = Some(cr_hash);
        self.cr_hash = receipt;
    }
}
/// Status of a cross-shard transaction.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum CrossShardTxStatus {
/// Waiting to be processed; the status every new transaction starts with.
Pending,
/// Locked (on the source shard); set by `CrossShardTransaction::set_lock_proof`.
Locked,
/// Minted (on the target shard).
/// NOTE(review): no code in the visible part of this file assigns this variant.
Minted,
/// Completed; set by `CrossShardTransaction::set_unlock_proof`.
Completed,
/// Failed.
Failed,
/// Rolled back.
Rolledback,
}
/// Lock proof: evidence that the assets of a transaction were locked on the
/// source shard.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LockProof {
/// Source shard ID.
pub shard_id: u64,
/// Hash of the block containing the lock.
pub block_hash: Hash,
/// Hash of the lock transaction.
pub tx_hash: Hash,
/// Lock time (the unit is not specified in this file).
pub locked_at: u64,
/// Validator signatures, one byte vector per signature.
pub validator_signatures: Vec<Vec<u8>>,
}
/// Unlock proof: evidence that the assets of a transaction were minted on the
/// target shard.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UnlockProof {
/// Target shard ID.
pub shard_id: u64,
/// Hash of the block containing the mint.
pub block_hash: Hash,
/// Hash of the mint transaction.
pub tx_hash: Hash,
/// Mint time (the unit is not specified in this file).
pub minted_at: u64,
/// Validator signatures, one byte vector per signature.
pub validator_signatures: Vec<Vec<u8>>,
}
/// Cross-shard transaction processor for a single shard.
///
/// Tracks every submitted transaction in exactly one of four containers
/// (pending, locked, completed, failed) and keeps running counters in
/// `stats`.
#[derive(Debug, Clone)]
pub struct CrossShardTxProcessor {
/// ID of the shard this processor belongs to.
shard_id: u64,
/// Queue of transactions waiting to be locked, in submission order.
pending_queue: VecDeque<CrossShardTransaction>,
/// Locked transactions, keyed by transaction ID.
locked_txs: HashMap<Hash, CrossShardTransaction>,
/// Completed transactions, keyed by transaction ID.
completed_txs: HashMap<Hash, CrossShardTransaction>,
/// Failed or rolled-back transactions with the failure reason, keyed by
/// transaction ID.
failed_txs: HashMap<Hash, (CrossShardTransaction, String)>,
/// Processing statistics.
stats: ProcessorStats,
}
/// Counters maintained by `CrossShardTxProcessor`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ProcessorStats {
/// Total number of transactions accepted by `submit_transaction`.
pub total_txs: u64,
/// Number of transactions currently pending.
pub pending_count: u64,
/// Number of transactions currently locked.
pub locked_count: u64,
/// Number of completed transactions.
pub completed_count: u64,
/// Number of failed transactions (rollbacks are counted here as well).
pub failed_count: u64,
/// Average processing time in milliseconds.
/// NOTE(review): nothing in the visible part of this file updates this
/// field, so it stays at its default of 0.
pub avg_processing_time: u64,
}
impl CrossShardTxProcessor {
    /// Minimum number of validator signatures a lock/unlock proof must carry.
    ///
    /// NOTE(review): the original comment asks for "at least 2/3 of the
    /// validators", but the processor does not know the validator set, so a
    /// fixed lower bound is all that can be enforced here.
    const MIN_VALIDATOR_SIGNATURES: usize = 3;

    /// Creates an empty processor for the shard `shard_id`.
    pub fn new(shard_id: u64) -> Self {
        Self {
            shard_id,
            pending_queue: VecDeque::new(),
            locked_txs: HashMap::new(),
            completed_txs: HashMap::new(),
            failed_txs: HashMap::new(),
            stats: ProcessorStats::default(),
        }
    }

    /// Submits a cross-shard transaction and queues it as pending.
    ///
    /// # Errors
    ///
    /// Returns an error when
    /// - neither the source nor the target shard is this shard,
    /// - the amount is zero, or
    /// - a transaction with the same ID is already tracked by this processor
    ///   (pending, locked, completed or failed). Accepting a duplicate would
    ///   let a later map insert overwrite the first entry while the counters
    ///   keep growing.
    pub fn submit_transaction(&mut self, tx: CrossShardTransaction) -> Result<(), String> {
        if tx.source_shard != self.shard_id && tx.target_shard != self.shard_id {
            return Err("Transaction does not involve this shard".to_string());
        }
        if tx.amount == 0 {
            return Err("Amount must be greater than zero".to_string());
        }
        if self.get_transaction_status(&tx.tx_id).is_some() {
            return Err("Transaction already submitted".to_string());
        }

        self.pending_queue.push_back(tx);
        self.stats.total_txs += 1;
        self.stats.pending_count += 1;
        Ok(())
    }

    /// Handles the source-shard lock: moves a pending transaction to the
    /// locked set once its lock proof has been accepted.
    ///
    /// All validation happens *before* the transaction is taken out of the
    /// pending queue, so a rejected proof leaves the transaction (and the
    /// counters) exactly as they were and the call can be retried.
    ///
    /// # Errors
    ///
    /// Returns an error when the transaction is not pending, this shard is
    /// not its source shard, the proof names a different shard, or the proof
    /// fails signature validation.
    pub fn process_lock(&mut self, tx_id: &Hash, lock_proof: LockProof) -> Result<(), String> {
        let (tx_index, source_shard) = self
            .pending_queue
            .iter()
            .enumerate()
            .find(|(_, tx)| tx.tx_id == *tx_id)
            .map(|(index, tx)| (index, tx.source_shard))
            .ok_or("Transaction not found in pending queue")?;

        if source_shard != self.shard_id {
            return Err("This shard is not the source shard".to_string());
        }
        // `LockProof::shard_id` is documented as the source shard ID.
        if lock_proof.shard_id != source_shard {
            return Err("Lock proof shard does not match source shard".to_string());
        }
        self.verify_lock_proof(&lock_proof)?;

        // Only now is the transaction actually moved.
        let mut tx = self
            .pending_queue
            .remove(tx_index)
            .ok_or("Transaction not found in pending queue")?;
        tx.set_lock_proof(lock_proof);
        self.locked_txs.insert(tx_id.clone(), tx);
        self.stats.pending_count -= 1;
        self.stats.locked_count += 1;
        Ok(())
    }

    /// Handles the target-shard mint: moves a locked transaction to the
    /// completed set once its unlock proof has been accepted.
    ///
    /// As in `process_lock`, validation precedes removal so that a failed
    /// call does not lose the transaction.
    ///
    /// NOTE(review): a transaction only enters `locked_txs` through
    /// `process_lock`, which requires this shard to be the *source*, while
    /// this method requires this shard to be the *target*. As written both
    /// can only hold when source and target shard are identical; confirm how
    /// locked transactions are meant to reach the target shard's processor.
    ///
    /// # Errors
    ///
    /// Returns an error when the transaction is not locked, this shard is not
    /// its target shard, the proof names a different shard, or the proof
    /// fails signature validation.
    pub fn process_mint(&mut self, tx_id: &Hash, unlock_proof: UnlockProof) -> Result<(), String> {
        let target_shard = self
            .locked_txs
            .get(tx_id)
            .map(|tx| tx.target_shard)
            .ok_or("Transaction not found in locked transactions")?;

        if target_shard != self.shard_id {
            return Err("This shard is not the target shard".to_string());
        }
        // `UnlockProof::shard_id` is documented as the target shard ID.
        if unlock_proof.shard_id != target_shard {
            return Err("Unlock proof shard does not match target shard".to_string());
        }
        self.verify_unlock_proof(&unlock_proof)?;

        let mut tx = self
            .locked_txs
            .remove(tx_id)
            .ok_or("Transaction not found in locked transactions")?;
        tx.set_unlock_proof(unlock_proof);
        self.completed_txs.insert(tx_id.clone(), tx);
        self.stats.locked_count -= 1;
        self.stats.completed_count += 1;
        Ok(())
    }

    /// Validates the signatures of a lock proof.
    ///
    /// TODO: this is a placeholder. Real verification must check every
    /// signature cryptographically (e.g. ed25519 or secp256k1) against the
    /// validator's public key over the message
    /// `shard_id (LE) || block_hash || tx_hash || locked_at (LE)`.
    fn verify_lock_proof(&self, proof: &LockProof) -> Result<(), String> {
        Self::verify_validator_signatures(&proof.validator_signatures)
    }

    /// Validates the signatures of an unlock proof.
    ///
    /// TODO: this is a placeholder. Real verification must check every
    /// signature cryptographically (e.g. ed25519 or secp256k1) against the
    /// validator's public key over the message
    /// `shard_id (LE) || block_hash || tx_hash || minted_at (LE)`.
    fn verify_unlock_proof(&self, proof: &UnlockProof) -> Result<(), String> {
        Self::verify_validator_signatures(&proof.validator_signatures)
    }

    /// Shared structural check for both proof kinds: enough signatures, and
    /// none of them empty. No cryptographic verification is performed.
    fn verify_validator_signatures(signatures: &[Vec<u8>]) -> Result<(), String> {
        if signatures.len() < Self::MIN_VALIDATOR_SIGNATURES {
            return Err("Insufficient validator signatures".to_string());
        }
        if signatures.iter().any(|sig| sig.is_empty()) {
            return Err("Invalid signature: empty".to_string());
        }
        Ok(())
    }

    /// Marks a pending or locked transaction as failed and stores `reason`
    /// alongside it.
    ///
    /// The pending queue is searched first, then the locked set.
    ///
    /// # Errors
    ///
    /// Returns an error when the transaction is neither pending nor locked.
    pub fn mark_failed(&mut self, tx_id: &Hash, reason: String) -> Result<(), String> {
        let pending_index = self.pending_queue.iter().position(|tx| tx.tx_id == *tx_id);
        let from_pending = match pending_index {
            Some(index) => self.pending_queue.remove(index),
            None => None,
        };

        let mut tx = match from_pending {
            Some(tx) => {
                self.stats.pending_count -= 1;
                tx
            }
            None => match self.locked_txs.remove(tx_id) {
                Some(tx) => {
                    self.stats.locked_count -= 1;
                    tx
                }
                None => return Err("Transaction not found".to_string()),
            },
        };

        tx.status = CrossShardTxStatus::Failed;
        self.failed_txs.insert(tx_id.clone(), (tx, reason));
        self.stats.failed_count += 1;
        Ok(())
    }

    /// Rolls back a locked transaction.
    ///
    /// The transaction is stored in the failed set with status `Rolledback`
    /// and the reason `"Rolled back"`; it is counted in `failed_count`.
    ///
    /// # Errors
    ///
    /// Returns an error when the transaction is not in the locked set.
    pub fn rollback_transaction(&mut self, tx_id: &Hash) -> Result<(), String> {
        let mut tx = self
            .locked_txs
            .remove(tx_id)
            .ok_or("Transaction not found in locked transactions")?;
        tx.status = CrossShardTxStatus::Rolledback;
        self.failed_txs
            .insert(tx_id.clone(), (tx, "Rolled back".to_string()));
        self.stats.locked_count -= 1;
        self.stats.failed_count += 1;
        Ok(())
    }

    /// Returns the status of a tracked transaction, or `None` when the ID is
    /// unknown. Lookup order: pending, locked, completed, failed.
    pub fn get_transaction_status(&self, tx_id: &Hash) -> Option<CrossShardTxStatus> {
        self.pending_queue
            .iter()
            .find(|tx| tx.tx_id == *tx_id)
            .or_else(|| self.locked_txs.get(tx_id))
            .or_else(|| self.completed_txs.get(tx_id))
            .or_else(|| self.failed_txs.get(tx_id).map(|(tx, _)| tx))
            .map(|tx| tx.status.clone())
    }

    /// Returns a copy of all pending transactions in queue order.
    pub fn get_pending_transactions(&self) -> Vec<CrossShardTransaction> {
        self.pending_queue.iter().cloned().collect()
    }

    /// Returns a copy of all locked transactions (in no particular order).
    pub fn get_locked_transactions(&self) -> Vec<CrossShardTransaction> {
        self.locked_txs.values().cloned().collect()
    }

    /// Returns a snapshot of the statistics.
    pub fn get_stats(&self) -> ProcessorStats {
        self.stats.clone()
    }

    /// Drops the oldest completed transactions (by `created_at`) so that at
    /// most `keep_count` remain, and returns how many were removed.
    ///
    /// `stats.completed_count` is intentionally left untouched, matching the
    /// previous behaviour.
    pub fn cleanup_completed(&mut self, keep_count: usize) -> usize {
        let excess = self.completed_txs.len().saturating_sub(keep_count);
        if excess == 0 {
            return 0;
        }

        // Order the entries oldest-first, then clone only the IDs that will
        // actually be removed.
        let mut oldest_first: Vec<(&Hash, u64)> = self
            .completed_txs
            .iter()
            .map(|(id, tx)| (id, tx.created_at))
            .collect();
        oldest_first.sort_by_key(|&(_, created_at)| created_at);

        let stale_ids: Vec<Hash> = oldest_first
            .into_iter()
            .take(excess)
            .map(|(id, _)| id.clone())
            .collect();

        for id in &stale_ids {
            self.completed_txs.remove(id);
        }
        stale_ids.len()
    }
}
/// Cross-shard router: keeps an undirected graph of shard connections and
/// answers shortest-path (fewest hops) queries between shards.
#[derive(Debug, Clone, Default)]
pub struct CrossShardRouter {
    /// Shard topology: for every shard, the shards directly connected to it.
    shard_topology: HashMap<u64, Vec<u64>>,
    /// Cache of routes already computed, keyed by `(source, target)`.
    /// Cleared whenever the topology changes.
    routing_table: HashMap<(u64, u64), Vec<u64>>,
}

impl CrossShardRouter {
    /// Creates a router with no connections and an empty route cache.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds an undirected connection between `shard_a` and `shard_b`.
    ///
    /// Adding a connection that already exists is a no-op, so neighbour lists
    /// never contain duplicates. When the topology actually changes, the
    /// route cache is cleared: a new edge can make a cached route longer than
    /// the new shortest path.
    pub fn add_shard_connection(&mut self, shard_a: u64, shard_b: u64) {
        let forward_added = Self::link(&mut self.shard_topology, shard_a, shard_b);
        let backward_added = Self::link(&mut self.shard_topology, shard_b, shard_a);
        if forward_added || backward_added {
            self.routing_table.clear();
        }
    }

    /// Records `to` as a neighbour of `from`; returns whether it was new.
    fn link(topology: &mut HashMap<u64, Vec<u64>>, from: u64, to: u64) -> bool {
        let neighbors = topology.entry(from).or_default();
        if neighbors.contains(&to) {
            false
        } else {
            neighbors.push(to);
            true
        }
    }

    /// Finds a shortest route from `source` to `target` with a breadth-first
    /// search.
    ///
    /// Returns the full path including both endpoints, or `None` when the
    /// target is unreachable. `source == target` yields `Some(vec![source])`.
    /// Successful results are cached; failed lookups are not.
    pub fn find_route(&mut self, source: u64, target: u64) -> Option<Vec<u64>> {
        if let Some(route) = self.routing_table.get(&(source, target)) {
            return Some(route.clone());
        }

        // `parent` doubles as the visited set: a shard has been discovered
        // iff it is the source or has a parent entry.
        let mut parent: HashMap<u64, u64> = HashMap::new();
        let mut queue = VecDeque::new();
        queue.push_back(source);

        while let Some(current) = queue.pop_front() {
            if current == target {
                // Walk the parent links back to the source, then flip.
                let mut path = vec![target];
                let mut node = target;
                while let Some(&prev) = parent.get(&node) {
                    path.push(prev);
                    node = prev;
                }
                path.reverse();
                self.routing_table.insert((source, target), path.clone());
                return Some(path);
            }

            if let Some(neighbors) = self.shard_topology.get(&current) {
                for &neighbor in neighbors {
                    if neighbor == source {
                        continue;
                    }
                    if let std::collections::hash_map::Entry::Vacant(slot) = parent.entry(neighbor)
                    {
                        slot.insert(current);
                        queue.push_back(neighbor);
                    }
                }
            }
        }
        None
    }

    /// Returns the shards directly connected to `shard_id` (empty when the
    /// shard is unknown).
    pub fn get_neighbors(&self, shard_id: u64) -> Vec<u64> {
        self.shard_topology
            .get(&shard_id)
            .cloned()
            .unwrap_or_default()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a hash whose 32 bytes all equal `byte`.
    fn hash_of(byte: u8) -> Hash {
        Hash::from_slice(&[byte; 32]).unwrap()
    }

    /// Builds the fixture transaction used throughout these tests:
    /// shard 1 -> shard 2, asset "GNACS-001", amount 1000.
    fn sample_tx(tx_id: Hash, created_at: u64) -> CrossShardTransaction {
        let sender = Address::from_slice(&[1u8; 20]).unwrap();
        let receiver = Address::from_slice(&[2u8; 20]).unwrap();
        CrossShardTransaction::new(
            tx_id,
            1,
            2,
            sender,
            receiver,
            "GNACS-001".to_string(),
            1000,
            created_at,
        )
    }

    /// Returns a processor for shard 1 with the fixture transaction already
    /// submitted, together with that transaction's ID.
    fn processor_with_pending_tx() -> (CrossShardTxProcessor, Hash) {
        let mut processor = CrossShardTxProcessor::new(1);
        let tx_id = hash_of(1);
        processor
            .submit_transaction(sample_tx(tx_id.clone(), 1000))
            .unwrap();
        (processor, tx_id)
    }

    #[test]
    fn test_cross_shard_tx_creation() {
        let tx = sample_tx(hash_of(1), 1000);

        assert_eq!(tx.source_shard, 1);
        assert_eq!(tx.target_shard, 2);
        assert_eq!(tx.amount, 1000);
        assert_eq!(tx.status, CrossShardTxStatus::Pending);
    }

    #[test]
    fn test_processor_submit_transaction() {
        let mut processor = CrossShardTxProcessor::new(1);
        let tx = sample_tx(hash_of(1), 1000);

        assert!(processor.submit_transaction(tx).is_ok());
        assert_eq!(processor.stats.total_txs, 1);
        assert_eq!(processor.stats.pending_count, 1);
    }

    #[test]
    fn test_processor_process_lock() {
        let (mut processor, tx_id) = processor_with_pending_tx();

        let lock_proof = LockProof {
            shard_id: 1,
            block_hash: hash_of(2),
            tx_hash: tx_id.clone(),
            locked_at: 1001,
            validator_signatures: vec![vec![1, 2, 3]; 3],
        };

        assert!(processor.process_lock(&tx_id, lock_proof).is_ok());
        assert_eq!(processor.stats.locked_count, 1);
        assert_eq!(processor.stats.pending_count, 0);
    }

    #[test]
    fn test_processor_mark_failed() {
        let (mut processor, tx_id) = processor_with_pending_tx();

        let outcome = processor.mark_failed(&tx_id, "Test failure".to_string());

        assert!(outcome.is_ok());
        assert_eq!(processor.stats.failed_count, 1);
        assert_eq!(processor.stats.pending_count, 0);
    }

    #[test]
    fn test_processor_get_status() {
        let (processor, tx_id) = processor_with_pending_tx();

        let status = processor.get_transaction_status(&tx_id);

        assert_eq!(status, Some(CrossShardTxStatus::Pending));
    }

    #[test]
    fn test_router_add_connection() {
        let mut router = CrossShardRouter::new();
        for (a, b) in [(1, 2), (2, 3)] {
            router.add_shard_connection(a, b);
        }

        let neighbors = router.get_neighbors(2);

        assert_eq!(neighbors.len(), 2);
        assert!(neighbors.contains(&1));
        assert!(neighbors.contains(&3));
    }

    #[test]
    fn test_router_find_route() {
        let mut router = CrossShardRouter::new();
        for (a, b) in [(1, 2), (2, 3), (3, 4)] {
            router.add_shard_connection(a, b);
        }

        let route = router.find_route(1, 4);

        assert_eq!(route, Some(vec![1, 2, 3, 4]));
    }

    #[test]
    fn test_router_no_route() {
        let mut router = CrossShardRouter::new();
        for (a, b) in [(1, 2), (3, 4)] {
            router.add_shard_connection(a, b);
        }

        let route = router.find_route(1, 4);

        assert_eq!(route, None);
    }

    #[test]
    fn test_processor_cleanup_completed() {
        let mut processor = CrossShardTxProcessor::new(1);

        // Seed five completed transactions with increasing creation times.
        for i in 0u8..5 {
            let tx_id = hash_of(i);
            let mut tx = sample_tx(tx_id.clone(), 1000 + u64::from(i));
            tx.status = CrossShardTxStatus::Completed;
            processor.completed_txs.insert(tx_id, tx);
        }
        processor.stats.completed_count = 5;

        // Keep the three most recent ones.
        let removed = processor.cleanup_completed(3);

        assert_eq!(removed, 2);
        assert_eq!(processor.completed_txs.len(), 3);
    }
}