// NAC_Blockchain/nac-lens/src/performance.rs
// (Removed non-source residue from a web viewer: "620 lines / 16 KiB /
// Raw Blame History" and the ambiguous-Unicode banner were scraping
// artifacts, not part of the file.)

//! NAC Lens性能优化系统
//!
//! 实现消息压缩、批量处理、异步调用和性能测试
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use serde::{Serialize, Deserialize};
use crate::error::{NacLensError, Result};
/// Compression algorithm selection for message payloads.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
    /// No compression; payloads pass through unchanged.
    None,
    /// Gzip (DEFLATE-based) compression.
    Gzip,
    /// Zstandard compression.
    Zstd,
    /// LZ4 compression.
    Lz4,
}
/// Configuration for [`MessageCompressor`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionConfig {
    /// Which algorithm to apply.
    pub algorithm: CompressionAlgorithm,
    /// Compression level, 1-9 (interpretation depends on the algorithm).
    pub level: u8,
    /// Minimum payload size in bytes; smaller payloads are passed through
    /// uncompressed by `compress`.
    pub min_size: usize,
    /// Master switch; when false, compress/decompress are identity no-ops.
    pub enabled: bool,
}
impl Default for CompressionConfig {
fn default() -> Self {
Self {
algorithm: CompressionAlgorithm::Zstd,
level: 3,
min_size: 1024,
enabled: true,
}
}
}
/// Cumulative compression statistics, shared behind a mutex by the compressor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressionStats {
    /// Total bytes fed into `compress` (pass-throughs are not counted).
    pub original_size: u64,
    /// Total bytes produced by `compress`.
    pub compressed_size: u64,
    /// `compressed_size / original_size` as a percentage (lower is better).
    pub compression_ratio: f64,
    /// Number of compress operations recorded.
    pub compression_count: u64,
    /// Number of decompress operations recorded.
    pub decompression_count: u64,
    /// Rolling average compression time in microseconds.
    pub avg_compression_time: u64,
    /// Rolling average decompression time in microseconds.
    pub avg_decompression_time: u64,
}
/// Message compressor: applies the configured algorithm and records stats.
#[derive(Debug)]
pub struct MessageCompressor {
    /// Compression settings (algorithm, level, size threshold, on/off switch).
    config: CompressionConfig,
    /// Mutex-guarded cumulative statistics, shared with snapshot readers.
    stats: Arc<Mutex<CompressionStats>>,
}
impl MessageCompressor {
    /// Creates a compressor with the given configuration and zeroed stats.
    pub fn new(config: CompressionConfig) -> Self {
        Self {
            config,
            stats: Arc::new(Mutex::new(CompressionStats {
                original_size: 0,
                compressed_size: 0,
                compression_ratio: 0.0,
                compression_count: 0,
                decompression_count: 0,
                avg_compression_time: 0,
                avg_decompression_time: 0,
            })),
        }
    }

    /// Folds `sample` into a rolling integer average.
    ///
    /// `count` is the sample count *including* the new sample and must be
    /// >= 1. Extracted so the compress and decompress paths share exactly
    /// one copy of the formula (previously duplicated).
    fn rolling_avg(avg: u64, count: u64, sample: u64) -> u64 {
        (avg * (count - 1) + sample) / count
    }

    /// Compresses `data` with the configured algorithm.
    ///
    /// Payloads are passed through unchanged when compression is disabled or
    /// the payload is smaller than `min_size`; pass-throughs do not touch
    /// the statistics.
    ///
    /// NOTE(review): a pass-through result is indistinguishable from a
    /// compressed one — `decompress` has no framing to tell them apart.
    /// Harmless while the codecs below are identity stubs, but a real codec
    /// integration needs a header byte or similar.
    pub fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        if !self.config.enabled || data.len() < self.config.min_size {
            return Ok(data.to_vec());
        }
        let start = Instant::now();
        let compressed = match self.config.algorithm {
            CompressionAlgorithm::None => data.to_vec(),
            CompressionAlgorithm::Gzip => self.compress_gzip(data)?,
            CompressionAlgorithm::Zstd => self.compress_zstd(data)?,
            CompressionAlgorithm::Lz4 => self.compress_lz4(data)?,
        };
        let elapsed = start.elapsed().as_micros() as u64;
        // Update all shared statistics under a single lock acquisition.
        let mut stats = self.stats.lock().unwrap();
        stats.original_size += data.len() as u64;
        stats.compressed_size += compressed.len() as u64;
        stats.compression_count += 1;
        // Ratio is expressed as a percentage of the original size.
        if stats.original_size > 0 {
            stats.compression_ratio =
                (stats.compressed_size as f64 / stats.original_size as f64) * 100.0;
        }
        stats.avg_compression_time =
            Self::rolling_avg(stats.avg_compression_time, stats.compression_count, elapsed);
        Ok(compressed)
    }

    /// Decompresses `data` with the configured algorithm; pass-through when
    /// compression is disabled.
    pub fn decompress(&self, data: &[u8]) -> Result<Vec<u8>> {
        if !self.config.enabled {
            return Ok(data.to_vec());
        }
        let start = Instant::now();
        let decompressed = match self.config.algorithm {
            CompressionAlgorithm::None => data.to_vec(),
            CompressionAlgorithm::Gzip => self.decompress_gzip(data)?,
            CompressionAlgorithm::Zstd => self.decompress_zstd(data)?,
            CompressionAlgorithm::Lz4 => self.decompress_lz4(data)?,
        };
        let elapsed = start.elapsed().as_micros() as u64;
        let mut stats = self.stats.lock().unwrap();
        stats.decompression_count += 1;
        stats.avg_decompression_time =
            Self::rolling_avg(stats.avg_decompression_time, stats.decompression_count, elapsed);
        Ok(decompressed)
    }

    /// Gzip compression stub — returns the input unchanged.
    /// A real implementation should use the `flate2` crate.
    fn compress_gzip(&self, data: &[u8]) -> Result<Vec<u8>> {
        Ok(data.to_vec())
    }

    /// Gzip decompression stub — returns the input unchanged.
    fn decompress_gzip(&self, data: &[u8]) -> Result<Vec<u8>> {
        Ok(data.to_vec())
    }

    /// Zstd compression stub — returns the input unchanged.
    /// A real implementation should use the `zstd` crate.
    fn compress_zstd(&self, data: &[u8]) -> Result<Vec<u8>> {
        Ok(data.to_vec())
    }

    /// Zstd decompression stub — returns the input unchanged.
    fn decompress_zstd(&self, data: &[u8]) -> Result<Vec<u8>> {
        Ok(data.to_vec())
    }

    /// LZ4 compression stub — returns the input unchanged.
    /// A real implementation should use the `lz4` crate.
    fn compress_lz4(&self, data: &[u8]) -> Result<Vec<u8>> {
        Ok(data.to_vec())
    }

    /// LZ4 decompression stub — returns the input unchanged.
    fn decompress_lz4(&self, data: &[u8]) -> Result<Vec<u8>> {
        Ok(data.to_vec())
    }

    /// Returns a snapshot of the cumulative statistics.
    pub fn get_stats(&self) -> CompressionStats {
        self.stats.lock().unwrap().clone()
    }
}
/// Batch-processing configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchConfig {
    /// Maximum number of requests returned per batch.
    pub max_batch_size: usize,
    /// Per-request age limit in milliseconds; older requests are dropped
    /// when a batch is drawn.
    pub batch_timeout: u64,
    /// When false, `add_request` is a no-op.
    pub enabled: bool,
}
impl Default for BatchConfig {
fn default() -> Self {
Self {
max_batch_size: 100,
batch_timeout: 100,
enabled: true,
}
}
}
/// A single queued request awaiting batch processing.
#[derive(Debug, Clone)]
pub struct BatchRequest<T> {
    /// Caller-supplied request identifier.
    pub id: String,
    /// Request payload.
    pub data: T,
    /// Enqueue time; used for timeout-based expiry in `get_batch`.
    pub created_at: Instant,
}
/// Collects requests into a FIFO queue and hands them out in batches.
#[derive(Debug)]
pub struct BatchProcessor<T: Clone> {
    /// Batching settings.
    config: BatchConfig,
    /// Pending requests, oldest at the front.
    queue: Arc<Mutex<VecDeque<BatchRequest<T>>>>,
    /// Running total reported via `record_processed`.
    processed_count: Arc<Mutex<u64>>,
}
impl<T: Clone> BatchProcessor<T> {
    /// Creates an empty batch processor with the given settings.
    pub fn new(config: BatchConfig) -> Self {
        Self {
            config,
            queue: Arc::new(Mutex::new(VecDeque::new())),
            processed_count: Arc::new(Mutex::new(0)),
        }
    }

    /// Enqueues a request, stamping it with the current time.
    /// Silently ignored when batching is disabled.
    pub fn add_request(&self, id: String, data: T) {
        if !self.config.enabled {
            return;
        }
        let request = BatchRequest {
            id,
            data,
            created_at: Instant::now(),
        };
        self.queue.lock().unwrap().push_back(request);
    }

    /// Pops up to `max_batch_size` non-expired requests from the front of
    /// the queue. Requests older than `batch_timeout` ms are discarded and
    /// never returned.
    ///
    /// Fix: previously the pop count was fixed at
    /// `min(max_batch_size, queue.len())`, so every expired request also
    /// consumed a batch slot and fresh requests were left behind. Expired
    /// entries are now skipped and popping continues until the batch is
    /// full or the queue is empty.
    pub fn get_batch(&self) -> Vec<BatchRequest<T>> {
        let mut queue = self.queue.lock().unwrap();
        let timeout_ms = self.config.batch_timeout as u128;
        let mut batch =
            Vec::with_capacity(std::cmp::min(self.config.max_batch_size, queue.len()));
        while batch.len() < self.config.max_batch_size {
            match queue.pop_front() {
                Some(request) => {
                    if request.created_at.elapsed().as_millis() <= timeout_ms {
                        batch.push(request);
                    }
                    // Expired requests fall through and are dropped.
                }
                None => break,
            }
        }
        batch
    }

    /// Number of requests currently queued.
    pub fn queue_size(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Discards all queued requests.
    pub fn clear_queue(&self) {
        self.queue.lock().unwrap().clear();
    }

    /// Adds `count` to the processed-request counter.
    pub fn record_processed(&self, count: usize) {
        let mut processed = self.processed_count.lock().unwrap();
        *processed += count as u64;
    }

    /// Total number of requests reported processed so far.
    pub fn get_processed_count(&self) -> u64 {
        *self.processed_count.lock().unwrap()
    }
}
/// Asynchronous-call configuration.
///
/// NOTE(review): not referenced anywhere in this file — presumably consumed
/// by an executor elsewhere in the crate; confirm before relying on it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AsyncConfig {
    /// Number of worker threads.
    pub worker_threads: usize,
    /// Task queue capacity.
    pub queue_size: usize,
    /// Whether asynchronous dispatch is enabled.
    pub enabled: bool,
}
impl Default for AsyncConfig {
fn default() -> Self {
Self {
worker_threads: 4,
queue_size: 1000,
enabled: true,
}
}
}
/// Aggregated request-level performance metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Total number of recorded requests.
    pub total_requests: u64,
    /// Requests recorded as successful.
    pub successful_requests: u64,
    /// Requests recorded as failed.
    pub failed_requests: u64,
    /// Mean response time in milliseconds (integer-truncated).
    pub avg_response_time: u64,
    /// Smallest observed response time (ms); `u64::MAX` until the first sample.
    pub min_response_time: u64,
    /// Largest observed response time (ms).
    pub max_response_time: u64,
    /// Requests per second since the monitor was created.
    pub throughput: f64,
    /// Unix timestamp (seconds) when the metrics were (re)initialized.
    pub start_time: u64,
    /// Elapsed monitoring time in whole seconds.
    pub duration: u64,
}
/// Thread-safe request performance monitor.
#[derive(Debug)]
pub struct PerformanceMonitor {
    /// Aggregated metrics, updated on every recorded request.
    metrics: Arc<Mutex<PerformanceMetrics>>,
    /// Every recorded response time (ms); grows without bound.
    response_times: Arc<Mutex<Vec<u64>>>,
    /// Monitor creation instant; basis for throughput/duration.
    /// Note: not reset by `reset()`.
    start_time: Instant,
}
impl PerformanceMonitor {
    /// Creates a monitor with zeroed metrics anchored at "now".
    pub fn new() -> Self {
        Self {
            metrics: Arc::new(Mutex::new(Self::initial_metrics())),
            response_times: Arc::new(Mutex::new(Vec::new())),
            start_time: Instant::now(),
        }
    }

    /// Zeroed metrics snapshot. Extracted so `new` and `reset` share one
    /// copy of the initialization (previously duplicated verbatim).
    fn initial_metrics() -> PerformanceMetrics {
        PerformanceMetrics {
            total_requests: 0,
            successful_requests: 0,
            failed_requests: 0,
            avg_response_time: 0,
            // MAX so the first real sample always becomes the minimum.
            min_response_time: u64::MAX,
            max_response_time: 0,
            throughput: 0.0,
            start_time: Self::current_timestamp(),
            duration: 0,
        }
    }

    /// Records one request outcome and refreshes the derived statistics.
    ///
    /// `response_time` is in milliseconds.
    pub fn record_request(&self, response_time: u64, success: bool) {
        // Lock order (metrics, then times) matches reset(); keep it that way.
        let mut metrics = self.metrics.lock().unwrap();
        let mut times = self.response_times.lock().unwrap();
        metrics.total_requests += 1;
        if success {
            metrics.successful_requests += 1;
        } else {
            metrics.failed_requests += 1;
        }
        times.push(response_time);
        metrics.min_response_time = metrics.min_response_time.min(response_time);
        metrics.max_response_time = metrics.max_response_time.max(response_time);
        // NOTE(review): O(len) re-sum per request and unbounded growth of
        // `response_times` — fine for tests; a long-lived monitor should keep
        // a running sum instead.
        let total_time: u64 = times.iter().sum();
        metrics.avg_response_time = total_time / times.len() as u64;
        // Throughput is measured against monitor creation time, which is
        // NOT reset by reset() (Instant lives outside the mutexes).
        let elapsed_secs = self.start_time.elapsed().as_secs_f64();
        if elapsed_secs > 0.0 {
            metrics.throughput = metrics.total_requests as f64 / elapsed_secs;
        }
        metrics.duration = elapsed_secs as u64;
    }

    /// Returns a snapshot of the current metrics.
    pub fn get_metrics(&self) -> PerformanceMetrics {
        self.metrics.lock().unwrap().clone()
    }

    /// Clears all counters and recorded response times.
    pub fn reset(&self) {
        let mut metrics = self.metrics.lock().unwrap();
        let mut times = self.response_times.lock().unwrap();
        *metrics = Self::initial_metrics();
        times.clear();
    }

    /// Current Unix timestamp in whole seconds.
    fn current_timestamp() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs()
    }
}
impl Default for PerformanceMonitor {
fn default() -> Self {
Self::new()
}
}
/// Simple synchronous load tester built on [`PerformanceMonitor`].
#[derive(Debug)]
pub struct PerformanceTester {
    /// Monitor accumulating results for the current run.
    monitor: Arc<PerformanceMonitor>,
}
impl PerformanceTester {
    /// Builds a tester around a fresh monitor.
    pub fn new() -> Self {
        Self {
            monitor: Arc::new(PerformanceMonitor::new()),
        }
    }

    /// Runs a blocking load test for roughly `duration`, then returns the
    /// collected metrics. The monitor is reset at the start of each run.
    ///
    /// "Concurrency" is simulated sequentially: each outer iteration records
    /// one batch of `concurrency` fake requests.
    pub fn run_load_test(
        &self,
        duration: Duration,
        concurrency: usize,
    ) -> PerformanceMetrics {
        self.monitor.reset();
        let deadline = Instant::now() + duration;
        while Instant::now() < deadline {
            for _ in 0..concurrency {
                let elapsed_ms = Self::simulate_request();
                self.monitor.record_request(elapsed_ms, true);
            }
        }
        self.monitor.get_metrics()
    }

    /// Simulates one request by sleeping 10-99 ms; returns the sleep length.
    fn simulate_request() -> u64 {
        let millis = 10 + rand::random::<u64>() % 90;
        std::thread::sleep(Duration::from_millis(millis));
        millis
    }

    /// Shared handle to the underlying monitor.
    pub fn get_monitor(&self) -> Arc<PerformanceMonitor> {
        Arc::clone(&self.monitor)
    }
}
impl Default for PerformanceTester {
fn default() -> Self {
Self::new()
}
}
/// Minimal thread-local xorshift64 generator so the crate avoids a `rand`
/// dependency. Deterministic per thread (seed always starts at 1) — adequate
/// for load-test jitter, NOT suitable for anything security-sensitive.
mod rand {
    use std::cell::Cell;
    thread_local! {
        static SEED: Cell<u64> = Cell::new(1);
    }
    /// Advances the thread-local xorshift64 state and returns the next
    /// value, converted through `From<u64>`.
    pub fn random<T: From<u64>>() -> T {
        SEED.with(|cell| {
            let mut state = cell.get();
            state ^= state << 13;
            state ^= state >> 7;
            state ^= state << 17;
            cell.set(state);
            T::from(state)
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_message_compressor() {
        let compressor = MessageCompressor::new(CompressionConfig::default());
        let payload = b"Hello, NAC Lens!";
        let packed = compressor.compress(payload).unwrap();
        let unpacked = compressor.decompress(&packed).unwrap();
        // Round trip must be lossless.
        assert_eq!(payload, unpacked.as_slice());
    }

    #[test]
    fn test_batch_processor() {
        let processor: BatchProcessor<String> = BatchProcessor::new(BatchConfig::default());
        processor.add_request("req1".to_string(), "data1".to_string());
        processor.add_request("req2".to_string(), "data2".to_string());
        assert_eq!(processor.queue_size(), 2);
        let batch = processor.get_batch();
        assert_eq!(batch.len(), 2);
        // Drawing a batch drains the queue.
        assert_eq!(processor.queue_size(), 0);
    }

    #[test]
    fn test_performance_monitor() {
        let monitor = PerformanceMonitor::new();
        monitor.record_request(100, true);
        monitor.record_request(200, true);
        monitor.record_request(150, false);
        let snapshot = monitor.get_metrics();
        assert_eq!(snapshot.total_requests, 3);
        assert_eq!(snapshot.successful_requests, 2);
        assert_eq!(snapshot.failed_requests, 1);
        assert_eq!(snapshot.avg_response_time, 150);
        assert_eq!(snapshot.min_response_time, 100);
        assert_eq!(snapshot.max_response_time, 200);
    }

    #[test]
    fn test_compression_stats() {
        let compressor = MessageCompressor::new(CompressionConfig::default());
        // 2048 bytes is above the default min_size, so stats are recorded.
        let payload = vec![0u8; 2048];
        let _ = compressor.compress(&payload).unwrap();
        let stats = compressor.get_stats();
        assert_eq!(stats.compression_count, 1);
        assert!(stats.original_size > 0);
    }

    #[test]
    fn test_batch_timeout() {
        // 50 ms request age limit.
        let config = BatchConfig {
            batch_timeout: 50,
            ..BatchConfig::default()
        };
        let processor: BatchProcessor<String> = BatchProcessor::new(config);
        processor.add_request("req1".to_string(), "data1".to_string());
        // Sleep past the timeout so the queued request expires.
        std::thread::sleep(Duration::from_millis(100));
        // Expired requests are dropped, never returned.
        assert_eq!(processor.get_batch().len(), 0);
    }
}