NAC_Blockchain/nac-integration-tests/tests/performance/concurrent_test.rs

233 lines
6.5 KiB
Rust

/// 性能测试:并发测试
///
/// 测试系统在高并发下的表现
use nac_integration_tests::common::*;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::task;
/// Performance test: many concurrent users submitting transactions.
///
/// Spawns 100 tasks ("users"). Each task builds 100 test transactions with
/// `create_test_transaction` and bumps a shared atomic counter for every
/// transaction whose `amount` is positive. Once every task has been joined,
/// the elapsed time and the throughput returned by `perf::calculate_tps` are
/// logged, and the counter is checked against the number of transactions
/// that were created.
///
/// # Panics
///
/// Panics if a spawned task cannot be joined, or if the final count differs
/// from `concurrent_users * tx_per_user`.
#[tokio::test]
async fn test_concurrent_transactions() {
    init_test_env();
    let concurrent_users = 100;
    let tx_per_user = 100;
    let success_count = Arc::new(AtomicU64::new(0));
    let mut handles = Vec::new();
    // The clock starts before spawning, so task creation is part of the measurement.
    let start = std::time::Instant::now();
    for user_id in 0..concurrent_users {
        let success_count = Arc::clone(&success_count);
        let handle = task::spawn(async move {
            // The iteration index is not needed; `_` avoids an unused-variable warning.
            for _ in 0..tx_per_user {
                // `% 50` keeps both index arguments well inside the `u8` range.
                // NOTE(review): arguments look like (sender index, receiver index,
                // amount) — confirm against `create_test_transaction`.
                let tx = create_test_transaction(
                    (user_id % 50) as u8,
                    ((user_id + 1) % 50) as u8,
                    100,
                );
                // Simulate transaction processing.
                if tx.amount > 0 {
                    success_count.fetch_add(1, Ordering::Relaxed);
                }
            }
        });
        handles.push(handle);
    }
    // Wait for all tasks to finish; a panicked task surfaces here as a join error.
    for handle in handles {
        handle.await.expect("mainnet: handle error");
    }
    let duration = start.elapsed();
    let total_tx = success_count.load(Ordering::Relaxed);
    let tps = perf::calculate_tps(total_tx as usize, duration);
    log::info!("Concurrent users: {}", concurrent_users);
    log::info!("Total transactions: {}", total_tx);
    log::info!("Duration: {:?}", duration);
    log::info!("TPS: {:.2}", tps);
    assert_eq!(total_tx, (concurrent_users * tx_per_user) as u64);
    log::info!("Concurrent transactions test passed");
}
/// Performance test: concurrent contract invocations.
///
/// Fifty caller tasks each construct fifty `TestTransaction` values targeting
/// the address built from index 100, passing the loop counter as the last
/// constructor argument. A shared atomic tallies every transaction whose
/// `amount` equals zero, and the tally must equal the number of calls issued.
///
/// # Panics
///
/// Panics if a spawned task cannot be joined, or if the tally differs from
/// `concurrent_callers * calls_per_caller`.
#[tokio::test]
async fn test_concurrent_contract_calls() {
    init_test_env();
    let concurrent_callers = 50;
    let calls_per_caller = 50;
    let success_count = Arc::new(AtomicU64::new(0));

    // One task per caller; tasks are spawned in caller order while collecting.
    let handles: Vec<_> = (0..concurrent_callers)
        .map(|caller_id| {
            let tally = Arc::clone(&success_count);
            task::spawn(async move {
                let caller = Address::from_index(caller_id as u8);
                let contract = Address::from_index(100);
                (0..calls_per_caller)
                    .map(|nonce| TestTransaction::new(caller.clone(), contract.clone(), 0, nonce))
                    // Only zero-amount transactions are counted as successful calls.
                    .filter(|tx| tx.amount == 0)
                    .for_each(|_| {
                        tally.fetch_add(1, Ordering::Relaxed);
                    });
            })
        })
        .collect();

    // Join every caller before reading the tally.
    for pending in handles {
        pending.await.expect("mainnet: handle error");
    }

    let expected_calls = (concurrent_callers * calls_per_caller) as u64;
    let total_calls = success_count.load(Ordering::Relaxed);
    assert_eq!(total_calls, expected_calls);
    log::info!("Concurrent contract calls test passed");
}
/// Performance test: mixed simulated read and write load.
///
/// Spawns 80 reader tasks followed by 20 writer tasks. Each task performs 100
/// simulated operations and increments its group's atomic counter once per
/// operation. After all tasks are joined, both totals are logged and compared
/// with the expected number of operations.
///
/// # Panics
///
/// Panics if a spawned task cannot be joined, or if either total differs from
/// the number of tasks in its group multiplied by 100.
#[tokio::test]
async fn test_read_write_contention() {
    init_test_env();
    let readers = 80;
    let writers = 20;
    let read_count = Arc::new(AtomicU64::new(0));
    let write_count = Arc::new(AtomicU64::new(0));

    // Reader tasks (lazy here; spawned when the chain below is collected).
    let reader_tasks = (0..readers).map(|_| {
        let reads = Arc::clone(&read_count);
        task::spawn(async move {
            for _ in 0..100 {
                // Simulated read operation.
                let _balance = 1000u64;
                reads.fetch_add(1, Ordering::Relaxed);
            }
        })
    });

    // Writer tasks, spawned after all readers.
    let writer_tasks = (0..writers).map(|_| {
        let writes = Arc::clone(&write_count);
        task::spawn(async move {
            for _ in 0..100 {
                // Simulated write operation.
                let _new_balance = 900u64;
                writes.fetch_add(1, Ordering::Relaxed);
            }
        })
    });

    // Chaining keeps the spawn order: every reader first, then every writer.
    let handles: Vec<_> = reader_tasks.chain(writer_tasks).collect();
    for pending in handles {
        pending.await.expect("mainnet: handle error");
    }

    let total_reads = read_count.load(Ordering::Relaxed);
    let total_writes = write_count.load(Ordering::Relaxed);
    log::info!("Total reads: {}", total_reads);
    log::info!("Total writes: {}", total_writes);

    let expected_reads = (readers * 100) as u64;
    let expected_writes = (writers * 100) as u64;
    assert_eq!(total_reads, expected_reads);
    assert_eq!(total_writes, expected_writes);
    log::info!("Read-write contention test passed");
}
#[tokio::test]
async fn test_connection_pool_performance() {
init_test_env();
let max_connections = 100;
let concurrent_requests = 200;
let active_connections = Arc::new(AtomicU64::new(0));
let completed_requests = Arc::new(AtomicU64::new(0));
let mut handles = Vec::new();
for _ in 0..concurrent_requests {
let active_connections = Arc::clone(&active_connections);
let completed_requests = Arc::clone(&completed_requests);
let handle = task::spawn(async move {
// 获取连接
let current = active_connections.fetch_add(1, Ordering::Relaxed);
// 验证不超过最大连接数
assert!(current < max_connections as u64);
// 模拟请求处理
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
// 释放连接
active_connections.fetch_sub(1, Ordering::Relaxed);
completed_requests.fetch_add(1, Ordering::Relaxed);
});
handles.push(handle);
}
for handle in handles {
handle.await.expect("mainnet: handle error");
}
let completed = completed_requests.load(Ordering::Relaxed);
assert_eq!(completed, concurrent_requests as u64);
log::info!("Connection pool performance test passed");
}
/// Performance test: many tasks incrementing one shared counter.
///
/// Fifty tasks each add one to the same `AtomicU64` a hundred times. After
/// all tasks are joined, the counter must equal the total number of
/// increments issued.
///
/// # Panics
///
/// Panics if a spawned task cannot be joined, or if the final count differs
/// from `threads * operations_per_thread`.
#[tokio::test]
async fn test_lock_contention() {
    init_test_env();
    let threads = 50;
    let operations_per_thread = 100;
    let counter = Arc::new(AtomicU64::new(0));

    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let shared = Arc::clone(&counter);
            task::spawn(async move {
                // Atomic increments, so no lock is taken and nothing can contend on one.
                (0..operations_per_thread).for_each(|_| {
                    shared.fetch_add(1, Ordering::Relaxed);
                });
            })
        })
        .collect();

    for worker in handles {
        worker.await.expect("mainnet: handle error");
    }

    let expected_total = (threads * operations_per_thread) as u64;
    let final_count = counter.load(Ordering::Relaxed);
    assert_eq!(final_count, expected_total);
    log::info!("Lock contention test passed");
}