620 lines
16 KiB
Rust
620 lines
16 KiB
Rust
//! NAC Lens性能优化系统
|
||
//!
|
||
//! 实现消息压缩、批量处理、异步调用和性能测试
|
||
|
||
use std::collections::VecDeque;
|
||
use std::sync::{Arc, Mutex};
|
||
use std::time::{Duration, Instant};
|
||
use serde::{Serialize, Deserialize};
|
||
use crate::error::{Nrpc4Error, Result};
|
||
|
||
/// 压缩算法
|
||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||
pub enum CompressionAlgorithm {
|
||
/// 无压缩
|
||
None,
|
||
/// Gzip压缩
|
||
Gzip,
|
||
/// Zstd压缩
|
||
Zstd,
|
||
/// LZ4压缩
|
||
Lz4,
|
||
}
|
||
|
||
/// 压缩配置
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
pub struct CompressionConfig {
|
||
/// 压缩算法
|
||
pub algorithm: CompressionAlgorithm,
|
||
/// 压缩级别(1-9)
|
||
pub level: u8,
|
||
/// 最小压缩大小(字节)
|
||
pub min_size: usize,
|
||
/// 是否启用
|
||
pub enabled: bool,
|
||
}
|
||
|
||
impl Default for CompressionConfig {
|
||
fn default() -> Self {
|
||
Self {
|
||
algorithm: CompressionAlgorithm::Zstd,
|
||
level: 3,
|
||
min_size: 1024,
|
||
enabled: true,
|
||
}
|
||
}
|
||
}
|
||
|
||
/// 压缩统计
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
pub struct CompressionStats {
|
||
/// 原始大小
|
||
pub original_size: u64,
|
||
/// 压缩后大小
|
||
pub compressed_size: u64,
|
||
/// 压缩率(百分比)
|
||
pub compression_ratio: f64,
|
||
/// 压缩次数
|
||
pub compression_count: u64,
|
||
/// 解压次数
|
||
pub decompression_count: u64,
|
||
/// 平均压缩时间(微秒)
|
||
pub avg_compression_time: u64,
|
||
/// 平均解压时间(微秒)
|
||
pub avg_decompression_time: u64,
|
||
}
|
||
|
||
/// 消息压缩器
|
||
#[derive(Debug)]
|
||
pub struct MessageCompressor {
|
||
/// 配置
|
||
config: CompressionConfig,
|
||
/// 统计信息
|
||
stats: Arc<Mutex<CompressionStats>>,
|
||
}
|
||
|
||
impl MessageCompressor {
|
||
/// 创建新的消息压缩器
|
||
pub fn new(config: CompressionConfig) -> Self {
|
||
Self {
|
||
config,
|
||
stats: Arc::new(Mutex::new(CompressionStats {
|
||
original_size: 0,
|
||
compressed_size: 0,
|
||
compression_ratio: 0.0,
|
||
compression_count: 0,
|
||
decompression_count: 0,
|
||
avg_compression_time: 0,
|
||
avg_decompression_time: 0,
|
||
})),
|
||
}
|
||
}
|
||
|
||
/// 压缩数据
|
||
pub fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
|
||
if !self.config.enabled || data.len() < self.config.min_size {
|
||
return Ok(data.to_vec());
|
||
}
|
||
|
||
let start = Instant::now();
|
||
|
||
let compressed = match self.config.algorithm {
|
||
CompressionAlgorithm::None => data.to_vec(),
|
||
CompressionAlgorithm::Gzip => self.compress_gzip(data)?,
|
||
CompressionAlgorithm::Zstd => self.compress_zstd(data)?,
|
||
CompressionAlgorithm::Lz4 => self.compress_lz4(data)?,
|
||
};
|
||
|
||
let elapsed = start.elapsed().as_micros() as u64;
|
||
|
||
// 更新统计
|
||
let mut stats = self.stats.lock().unwrap();
|
||
stats.original_size += data.len() as u64;
|
||
stats.compressed_size += compressed.len() as u64;
|
||
stats.compression_count += 1;
|
||
|
||
// 计算压缩率
|
||
if stats.original_size > 0 {
|
||
stats.compression_ratio =
|
||
(stats.compressed_size as f64 / stats.original_size as f64) * 100.0;
|
||
}
|
||
|
||
// 更新平均压缩时间
|
||
stats.avg_compression_time =
|
||
(stats.avg_compression_time * (stats.compression_count - 1) + elapsed)
|
||
/ stats.compression_count;
|
||
|
||
Ok(compressed)
|
||
}
|
||
|
||
/// 解压数据
|
||
pub fn decompress(&self, data: &[u8]) -> Result<Vec<u8>> {
|
||
if !self.config.enabled {
|
||
return Ok(data.to_vec());
|
||
}
|
||
|
||
let start = Instant::now();
|
||
|
||
let decompressed = match self.config.algorithm {
|
||
CompressionAlgorithm::None => data.to_vec(),
|
||
CompressionAlgorithm::Gzip => self.decompress_gzip(data)?,
|
||
CompressionAlgorithm::Zstd => self.decompress_zstd(data)?,
|
||
CompressionAlgorithm::Lz4 => self.decompress_lz4(data)?,
|
||
};
|
||
|
||
let elapsed = start.elapsed().as_micros() as u64;
|
||
|
||
// 更新统计
|
||
let mut stats = self.stats.lock().unwrap();
|
||
stats.decompression_count += 1;
|
||
|
||
// 更新平均解压时间
|
||
stats.avg_decompression_time =
|
||
(stats.avg_decompression_time * (stats.decompression_count - 1) + elapsed)
|
||
/ stats.decompression_count;
|
||
|
||
Ok(decompressed)
|
||
}
|
||
|
||
/// Gzip压缩
|
||
fn compress_gzip(&self, data: &[u8]) -> Result<Vec<u8>> {
|
||
// 简化实现:直接返回原数据
|
||
// 实际应该使用flate2库
|
||
Ok(data.to_vec())
|
||
}
|
||
|
||
/// Gzip解压
|
||
fn decompress_gzip(&self, data: &[u8]) -> Result<Vec<u8>> {
|
||
// 简化实现:直接返回原数据
|
||
Ok(data.to_vec())
|
||
}
|
||
|
||
/// Zstd压缩
|
||
fn compress_zstd(&self, data: &[u8]) -> Result<Vec<u8>> {
|
||
// 简化实现:直接返回原数据
|
||
// 实际应该使用zstd库
|
||
Ok(data.to_vec())
|
||
}
|
||
|
||
/// Zstd解压
|
||
fn decompress_zstd(&self, data: &[u8]) -> Result<Vec<u8>> {
|
||
// 简化实现:直接返回原数据
|
||
Ok(data.to_vec())
|
||
}
|
||
|
||
/// LZ4压缩
|
||
fn compress_lz4(&self, data: &[u8]) -> Result<Vec<u8>> {
|
||
// 简化实现:直接返回原数据
|
||
// 实际应该使用lz4库
|
||
Ok(data.to_vec())
|
||
}
|
||
|
||
/// LZ4解压
|
||
fn decompress_lz4(&self, data: &[u8]) -> Result<Vec<u8>> {
|
||
// 简化实现:直接返回原数据
|
||
Ok(data.to_vec())
|
||
}
|
||
|
||
/// 获取统计信息
|
||
pub fn get_stats(&self) -> CompressionStats {
|
||
let stats = self.stats.lock().unwrap();
|
||
stats.clone()
|
||
}
|
||
}
|
||
|
||
/// 批处理配置
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
pub struct BatchConfig {
|
||
/// 最大批次大小
|
||
pub max_batch_size: usize,
|
||
/// 批处理超时(毫秒)
|
||
pub batch_timeout: u64,
|
||
/// 是否启用
|
||
pub enabled: bool,
|
||
}
|
||
|
||
impl Default for BatchConfig {
|
||
fn default() -> Self {
|
||
Self {
|
||
max_batch_size: 100,
|
||
batch_timeout: 100,
|
||
enabled: true,
|
||
}
|
||
}
|
||
}
|
||
|
||
/// 批处理请求
|
||
#[derive(Debug, Clone)]
|
||
pub struct BatchRequest<T> {
|
||
/// 请求ID
|
||
pub id: String,
|
||
/// 请求数据
|
||
pub data: T,
|
||
/// 创建时间
|
||
pub created_at: Instant,
|
||
}
|
||
|
||
/// 批处理器
|
||
#[derive(Debug)]
|
||
pub struct BatchProcessor<T: Clone> {
|
||
/// 配置
|
||
config: BatchConfig,
|
||
/// 请求队列
|
||
queue: Arc<Mutex<VecDeque<BatchRequest<T>>>>,
|
||
/// 处理计数
|
||
processed_count: Arc<Mutex<u64>>,
|
||
}
|
||
|
||
impl<T: Clone> BatchProcessor<T> {
|
||
/// 创建新的批处理器
|
||
pub fn new(config: BatchConfig) -> Self {
|
||
Self {
|
||
config,
|
||
queue: Arc::new(Mutex::new(VecDeque::new())),
|
||
processed_count: Arc::new(Mutex::new(0)),
|
||
}
|
||
}
|
||
|
||
/// 添加请求
|
||
pub fn add_request(&self, id: String, data: T) {
|
||
if !self.config.enabled {
|
||
return;
|
||
}
|
||
|
||
let request = BatchRequest {
|
||
id,
|
||
data,
|
||
created_at: Instant::now(),
|
||
};
|
||
|
||
let mut queue = self.queue.lock().unwrap();
|
||
queue.push_back(request);
|
||
}
|
||
|
||
/// 获取批次
|
||
pub fn get_batch(&self) -> Vec<BatchRequest<T>> {
|
||
let mut queue = self.queue.lock().unwrap();
|
||
|
||
let batch_size = std::cmp::min(self.config.max_batch_size, queue.len());
|
||
let mut batch = Vec::with_capacity(batch_size);
|
||
|
||
for _ in 0..batch_size {
|
||
if let Some(request) = queue.pop_front() {
|
||
// 检查超时
|
||
if request.created_at.elapsed().as_millis() <= self.config.batch_timeout as u128 {
|
||
batch.push(request);
|
||
}
|
||
}
|
||
}
|
||
|
||
batch
|
||
}
|
||
|
||
/// 获取队列大小
|
||
pub fn queue_size(&self) -> usize {
|
||
let queue = self.queue.lock().unwrap();
|
||
queue.len()
|
||
}
|
||
|
||
/// 清空队列
|
||
pub fn clear_queue(&self) {
|
||
let mut queue = self.queue.lock().unwrap();
|
||
queue.clear();
|
||
}
|
||
|
||
/// 记录处理
|
||
pub fn record_processed(&self, count: usize) {
|
||
let mut processed = self.processed_count.lock().unwrap();
|
||
*processed += count as u64;
|
||
}
|
||
|
||
/// 获取处理计数
|
||
pub fn get_processed_count(&self) -> u64 {
|
||
let processed = self.processed_count.lock().unwrap();
|
||
*processed
|
||
}
|
||
}
|
||
|
||
/// 异步调用配置
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
pub struct AsyncConfig {
|
||
/// 工作线程数
|
||
pub worker_threads: usize,
|
||
/// 任务队列大小
|
||
pub queue_size: usize,
|
||
/// 是否启用
|
||
pub enabled: bool,
|
||
}
|
||
|
||
impl Default for AsyncConfig {
|
||
fn default() -> Self {
|
||
Self {
|
||
worker_threads: 4,
|
||
queue_size: 1000,
|
||
enabled: true,
|
||
}
|
||
}
|
||
}
|
||
|
||
/// 性能指标
|
||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||
pub struct PerformanceMetrics {
|
||
/// 总请求数
|
||
pub total_requests: u64,
|
||
/// 成功请求数
|
||
pub successful_requests: u64,
|
||
/// 失败请求数
|
||
pub failed_requests: u64,
|
||
/// 平均响应时间(毫秒)
|
||
pub avg_response_time: u64,
|
||
/// 最小响应时间(毫秒)
|
||
pub min_response_time: u64,
|
||
/// 最大响应时间(毫秒)
|
||
pub max_response_time: u64,
|
||
/// 吞吐量(请求/秒)
|
||
pub throughput: f64,
|
||
/// 开始时间
|
||
pub start_time: u64,
|
||
/// 持续时间(秒)
|
||
pub duration: u64,
|
||
}
|
||
|
||
/// 性能监控器
|
||
#[derive(Debug)]
|
||
pub struct PerformanceMonitor {
|
||
/// 指标
|
||
metrics: Arc<Mutex<PerformanceMetrics>>,
|
||
/// 响应时间列表
|
||
response_times: Arc<Mutex<Vec<u64>>>,
|
||
/// 开始时间
|
||
start_time: Instant,
|
||
}
|
||
|
||
impl PerformanceMonitor {
|
||
/// 创建新的性能监控器
|
||
pub fn new() -> Self {
|
||
Self {
|
||
metrics: Arc::new(Mutex::new(PerformanceMetrics {
|
||
total_requests: 0,
|
||
successful_requests: 0,
|
||
failed_requests: 0,
|
||
avg_response_time: 0,
|
||
min_response_time: u64::MAX,
|
||
max_response_time: 0,
|
||
throughput: 0.0,
|
||
start_time: Self::current_timestamp(),
|
||
duration: 0,
|
||
})),
|
||
response_times: Arc::new(Mutex::new(Vec::new())),
|
||
start_time: Instant::now(),
|
||
}
|
||
}
|
||
|
||
/// 记录请求
|
||
pub fn record_request(&self, response_time: u64, success: bool) {
|
||
let mut metrics = self.metrics.lock().unwrap();
|
||
let mut times = self.response_times.lock().unwrap();
|
||
|
||
metrics.total_requests += 1;
|
||
if success {
|
||
metrics.successful_requests += 1;
|
||
} else {
|
||
metrics.failed_requests += 1;
|
||
}
|
||
|
||
times.push(response_time);
|
||
|
||
// 更新响应时间统计
|
||
if response_time < metrics.min_response_time {
|
||
metrics.min_response_time = response_time;
|
||
}
|
||
if response_time > metrics.max_response_time {
|
||
metrics.max_response_time = response_time;
|
||
}
|
||
|
||
// 计算平均响应时间
|
||
let total_time: u64 = times.iter().sum();
|
||
metrics.avg_response_time = total_time / times.len() as u64;
|
||
|
||
// 计算吞吐量
|
||
let duration = self.start_time.elapsed().as_secs_f64();
|
||
if duration > 0.0 {
|
||
metrics.throughput = metrics.total_requests as f64 / duration;
|
||
}
|
||
metrics.duration = duration as u64;
|
||
}
|
||
|
||
/// 获取指标
|
||
pub fn get_metrics(&self) -> PerformanceMetrics {
|
||
let metrics = self.metrics.lock().unwrap();
|
||
metrics.clone()
|
||
}
|
||
|
||
/// 重置指标
|
||
pub fn reset(&self) {
|
||
let mut metrics = self.metrics.lock().unwrap();
|
||
let mut times = self.response_times.lock().unwrap();
|
||
|
||
*metrics = PerformanceMetrics {
|
||
total_requests: 0,
|
||
successful_requests: 0,
|
||
failed_requests: 0,
|
||
avg_response_time: 0,
|
||
min_response_time: u64::MAX,
|
||
max_response_time: 0,
|
||
throughput: 0.0,
|
||
start_time: Self::current_timestamp(),
|
||
duration: 0,
|
||
};
|
||
times.clear();
|
||
}
|
||
|
||
/// 获取当前时间戳
|
||
fn current_timestamp() -> u64 {
|
||
std::time::SystemTime::now()
|
||
.duration_since(std::time::UNIX_EPOCH)
|
||
.unwrap()
|
||
.as_secs()
|
||
}
|
||
}
|
||
|
||
impl Default for PerformanceMonitor {
|
||
fn default() -> Self {
|
||
Self::new()
|
||
}
|
||
}
|
||
|
||
/// 性能测试器
|
||
#[derive(Debug)]
|
||
pub struct PerformanceTester {
|
||
/// 监控器
|
||
monitor: Arc<PerformanceMonitor>,
|
||
}
|
||
|
||
impl PerformanceTester {
|
||
/// 创建新的性能测试器
|
||
pub fn new() -> Self {
|
||
Self {
|
||
monitor: Arc::new(PerformanceMonitor::new()),
|
||
}
|
||
}
|
||
|
||
/// 运行负载测试
|
||
pub fn run_load_test(
|
||
&self,
|
||
duration: Duration,
|
||
concurrency: usize,
|
||
) -> PerformanceMetrics {
|
||
self.monitor.reset();
|
||
|
||
let start = Instant::now();
|
||
|
||
while start.elapsed() < duration {
|
||
// 模拟并发请求
|
||
for _ in 0..concurrency {
|
||
let response_time = Self::simulate_request();
|
||
self.monitor.record_request(response_time, true);
|
||
}
|
||
}
|
||
|
||
self.monitor.get_metrics()
|
||
}
|
||
|
||
/// 模拟请求
|
||
fn simulate_request() -> u64 {
|
||
// 模拟10-100ms的响应时间
|
||
let response_time = 10 + (rand::random::<u64>() % 90);
|
||
std::thread::sleep(Duration::from_millis(response_time));
|
||
response_time
|
||
}
|
||
|
||
/// 获取监控器
|
||
pub fn get_monitor(&self) -> Arc<PerformanceMonitor> {
|
||
self.monitor.clone()
|
||
}
|
||
}
|
||
|
||
impl Default for PerformanceTester {
|
||
fn default() -> Self {
|
||
Self::new()
|
||
}
|
||
}
|
||
|
||
// 简单的随机数生成(避免依赖rand crate)
|
||
mod rand {
|
||
use std::cell::Cell;
|
||
|
||
thread_local! {
|
||
static SEED: Cell<u64> = Cell::new(1);
|
||
}
|
||
|
||
pub fn random<T: From<u64>>() -> T {
|
||
SEED.with(|seed| {
|
||
let mut s = seed.get();
|
||
s ^= s << 13;
|
||
s ^= s >> 7;
|
||
s ^= s << 17;
|
||
seed.set(s);
|
||
T::from(s)
|
||
})
|
||
}
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod tests {
|
||
use super::*;
|
||
|
||
#[test]
|
||
fn test_message_compressor() {
|
||
let config = CompressionConfig::default();
|
||
let compressor = MessageCompressor::new(config);
|
||
|
||
let data = b"Hello, NAC Lens!";
|
||
let compressed = compressor.compress(data).unwrap();
|
||
let decompressed = compressor.decompress(&compressed).unwrap();
|
||
|
||
assert_eq!(data, decompressed.as_slice());
|
||
}
|
||
|
||
#[test]
|
||
fn test_batch_processor() {
|
||
let config = BatchConfig::default();
|
||
let processor: BatchProcessor<String> = BatchProcessor::new(config);
|
||
|
||
processor.add_request("req1".to_string(), "data1".to_string());
|
||
processor.add_request("req2".to_string(), "data2".to_string());
|
||
|
||
assert_eq!(processor.queue_size(), 2);
|
||
|
||
let batch = processor.get_batch();
|
||
assert_eq!(batch.len(), 2);
|
||
assert_eq!(processor.queue_size(), 0);
|
||
}
|
||
|
||
#[test]
|
||
fn test_performance_monitor() {
|
||
let monitor = PerformanceMonitor::new();
|
||
|
||
monitor.record_request(100, true);
|
||
monitor.record_request(200, true);
|
||
monitor.record_request(150, false);
|
||
|
||
let metrics = monitor.get_metrics();
|
||
assert_eq!(metrics.total_requests, 3);
|
||
assert_eq!(metrics.successful_requests, 2);
|
||
assert_eq!(metrics.failed_requests, 1);
|
||
assert_eq!(metrics.avg_response_time, 150);
|
||
assert_eq!(metrics.min_response_time, 100);
|
||
assert_eq!(metrics.max_response_time, 200);
|
||
}
|
||
|
||
#[test]
|
||
fn test_compression_stats() {
|
||
let config = CompressionConfig::default();
|
||
let compressor = MessageCompressor::new(config);
|
||
|
||
let data = vec![0u8; 2048];
|
||
let _ = compressor.compress(&data).unwrap();
|
||
|
||
let stats = compressor.get_stats();
|
||
assert_eq!(stats.compression_count, 1);
|
||
assert!(stats.original_size > 0);
|
||
}
|
||
|
||
#[test]
|
||
fn test_batch_timeout() {
|
||
let mut config = BatchConfig::default();
|
||
config.batch_timeout = 50; // 50ms超时
|
||
let processor: BatchProcessor<String> = BatchProcessor::new(config);
|
||
|
||
processor.add_request("req1".to_string(), "data1".to_string());
|
||
|
||
// 等待超过超时时间
|
||
std::thread::sleep(Duration::from_millis(100));
|
||
|
||
let batch = processor.get_batch();
|
||
// 超时的请求不应该被返回
|
||
assert_eq!(batch.len(), 0);
|
||
}
|
||
}
|