NAC_Blockchain/nac-lens/src/retry.rs

560 lines
15 KiB
Rust

//! Retry mechanism and logging system for NAC Lens.
//!
//! Implements error propagation, retry handling, and log recording.
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use serde::{Serialize, Deserialize};
use crate::error::{Nrpc4Error, Result};
/// Strategy that decides how the wait between retries evolves.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RetryStrategy {
    /// Fixed delay.
    FixedDelay,
    /// Exponential backoff.
    ExponentialBackoff,
    /// Linear backoff.
    LinearBackoff,
}
/// Retry configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Maximum number of retries.
    pub max_retries: u32,
    /// Initial delay, in milliseconds.
    pub initial_delay: u64,
    /// Maximum delay, in milliseconds.
    pub max_delay: u64,
    /// Retry strategy.
    pub strategy: RetryStrategy,
    /// Backoff factor (described as applying to exponential / linear backoff).
    ///
    /// NOTE(review): the delay computation visible in this file only reads
    /// this value for the exponential strategy; confirm the intent for
    /// linear backoff.
    pub backoff_factor: f64,
    /// Whether retrying is enabled.
    pub enabled: bool,
}
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
initial_delay: 1000,
max_delay: 30000,
strategy: RetryStrategy::ExponentialBackoff,
backoff_factor: 2.0,
enabled: true,
}
}
}
/// Retry state tracked for a single operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryState {
    /// Number of attempts recorded so far.
    pub attempt: u32,
    /// Delay before the next retry, in milliseconds.
    pub next_delay: u64,
    /// Most recent error, if any.
    pub last_error: Option<String>,
    /// Start time.
    pub start_time: u64,
}
/// Retry manager: keeps per-operation retry state under one configuration.
#[derive(Debug)]
pub struct RetryManager {
    /// Configuration applied to every tracked operation.
    config: RetryConfig,
    /// Retry state keyed by operation id.
    states: Arc<Mutex<std::collections::HashMap<String, RetryState>>>,
}
impl RetryManager {
    /// Amount, in milliseconds, that linear backoff adds for every attempt
    /// after the first.
    const LINEAR_STEP_MS: u64 = 1000;

    /// Creates a retry manager with the given configuration and no tracked
    /// operations.
    pub fn new(config: RetryConfig) -> Self {
        Self {
            config,
            states: Arc::new(Mutex::new(std::collections::HashMap::new())),
        }
    }

    /// Starts retry tracking for `operation_id`.
    ///
    /// Inserts a fresh state (attempt 0, next delay equal to the configured
    /// initial delay), replacing any state already stored under the same id.
    /// Does nothing when retrying is disabled.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned.
    pub fn start_retry(&self, operation_id: String) {
        if !self.config.enabled {
            return;
        }
        let state = RetryState {
            attempt: 0,
            next_delay: self.config.initial_delay,
            last_error: None,
            start_time: Self::current_timestamp(),
        };
        self.states
            .lock()
            .expect("retry state mutex poisoned")
            .insert(operation_id, state);
    }

    /// Records a failure for `operation_id` and reports whether it should be
    /// retried.
    ///
    /// Returns `false` when retrying is disabled, when the operation is not
    /// tracked, or when the attempt count has reached `max_retries`. In the
    /// last case the state stays in the map with the final error recorded and
    /// `next_delay` left at its previous value. Otherwise `next_delay` is
    /// updated and `true` is returned.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned.
    pub fn record_failure(&self, operation_id: &str, error: String) -> bool {
        if !self.config.enabled {
            return false;
        }
        let mut states = self.states.lock().expect("retry state mutex poisoned");
        match states.get_mut(operation_id) {
            Some(state) => {
                // Saturate so an absurdly long-lived entry cannot overflow.
                state.attempt = state.attempt.saturating_add(1);
                state.last_error = Some(error);
                // Stop once the maximum number of attempts has been reached.
                if state.attempt >= self.config.max_retries {
                    return false;
                }
                state.next_delay = self.calculate_delay(state.attempt);
                true
            }
            None => false,
        }
    }

    /// Records a success by dropping the state stored for `operation_id`.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned.
    pub fn record_success(&self, operation_id: &str) {
        self.states
            .lock()
            .expect("retry state mutex poisoned")
            .remove(operation_id);
    }

    /// Returns a copy of the state stored for `operation_id`, if any.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned.
    pub fn get_state(&self, operation_id: &str) -> Option<RetryState> {
        self.states
            .lock()
            .expect("retry state mutex poisoned")
            .get(operation_id)
            .cloned()
    }

    /// Computes the delay (milliseconds) to wait after the given 1-based
    /// `attempt`, capped at the configured maximum delay.
    ///
    /// All integer arithmetic saturates, so extreme configuration values
    /// yield the cap instead of overflowing.
    fn calculate_delay(&self, attempt: u32) -> u64 {
        // Attempts are 1-based; treating 0 like 1 keeps the subtraction from
        // underflowing.
        let steps = attempt.saturating_sub(1);
        let delay = match self.config.strategy {
            RetryStrategy::FixedDelay => self.config.initial_delay,
            RetryStrategy::ExponentialBackoff => {
                // Clamp before narrowing so a huge attempt count cannot wrap
                // to a negative exponent.
                let exponent = steps.min(i32::MAX as u32) as i32;
                let scaled =
                    self.config.initial_delay as f64 * self.config.backoff_factor.powi(exponent);
                // Float-to-integer `as` saturates (and maps NaN to 0).
                scaled as u64
            }
            RetryStrategy::LinearBackoff => u64::from(steps)
                .saturating_mul(Self::LINEAR_STEP_MS)
                .saturating_add(self.config.initial_delay),
        };
        delay.min(self.config.max_delay)
    }

    /// Returns the current Unix timestamp in whole seconds, or 0 when the
    /// system clock reports a time before the Unix epoch.
    fn current_timestamp() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .unwrap_or(0)
    }
}
/// Log level.
///
/// The derived ordering follows declaration order, so `Trace` compares
/// lowest and `Fatal` highest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum LogLevel {
    /// Trace.
    Trace,
    /// Debug.
    Debug,
    /// Informational.
    Info,
    /// Warning.
    Warning,
    /// Error.
    Error,
    /// Fatal.
    Fatal,
}
/// A single log record.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogRecord {
    /// Log id.
    pub id: String,
    /// Level.
    pub level: LogLevel,
    /// Message text.
    pub message: String,
    /// Module the record is attributed to.
    pub module: String,
    /// Timestamp.
    pub timestamp: u64,
    /// Optional extra data.
    pub data: Option<String>,
}
/// Logger configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogConfig {
    /// Minimum log level.
    pub min_level: LogLevel,
    /// Maximum number of log records.
    pub max_logs: usize,
    /// Whether console output is enabled.
    pub console_output: bool,
    /// Whether file output is enabled.
    ///
    /// NOTE(review): no code visible in this part of the file reads this
    /// flag; confirm whether file output is implemented elsewhere.
    pub file_output: bool,
    /// Path of the log file.
    pub file_path: Option<String>,
}
impl Default for LogConfig {
fn default() -> Self {
Self {
min_level: LogLevel::Info,
max_logs: 10000,
console_output: true,
file_output: false,
file_path: None,
}
}
}
/// Logger that keeps records in an in-memory queue.
#[derive(Debug)]
pub struct Logger {
    /// Configuration.
    config: LogConfig,
    /// Queue of stored log records.
    logs: Arc<Mutex<VecDeque<LogRecord>>>,
    /// Numeric part of the next log id to hand out.
    next_log_id: Arc<Mutex<u64>>,
}
impl Logger {
    /// Creates a logger with an empty record queue; ids start at 1.
    pub fn new(config: LogConfig) -> Self {
        Self {
            config,
            logs: Arc::new(Mutex::new(VecDeque::new())),
            next_log_id: Arc::new(Mutex::new(1)),
        }
    }

    /// Records a log entry and returns its id (`LOG-` followed by a
    /// zero-padded counter).
    ///
    /// Entries below the configured minimum level are dropped: nothing is
    /// stored or printed, no id is consumed, and an empty string is returned.
    /// When the queue grows past `max_logs`, the oldest record is evicted.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal mutexes is poisoned.
    pub fn log(
        &self,
        level: LogLevel,
        module: String,
        message: String,
        data: Option<String>,
    ) -> String {
        // Level filter.
        if level < self.config.min_level {
            return String::new();
        }

        // Allocate the id in its own scope so the counter lock is released
        // before any printing or queue work happens.
        let log_id = {
            let mut next_id = self.next_log_id.lock().unwrap();
            let id = format!("LOG-{:08}", *next_id);
            *next_id += 1;
            id
        };

        // `module` and `message` are owned and not used again, so they are
        // moved into the record instead of cloned.
        let record = LogRecord {
            id: log_id.clone(),
            level,
            message,
            module,
            timestamp: Self::current_timestamp(),
            data,
        };

        // Console output.
        if self.config.console_output {
            self.print_log(&record);
        }

        // Enqueue, then enforce the size limit.
        let mut logs = self.logs.lock().unwrap();
        logs.push_back(record);
        if logs.len() > self.config.max_logs {
            logs.pop_front();
        }
        log_id
    }

    /// Prints one record to stdout as `[LEVEL] [module] [timestamp] message`.
    fn print_log(&self, record: &LogRecord) {
        let level_str = match record.level {
            LogLevel::Trace => "TRACE",
            LogLevel::Debug => "DEBUG",
            LogLevel::Info => "INFO",
            LogLevel::Warning => "WARN",
            LogLevel::Error => "ERROR",
            LogLevel::Fatal => "FATAL",
        };
        println!(
            "[{}] [{}] [{}] {}",
            level_str, record.module, record.timestamp, record.message
        );
    }

    /// Logs `message` at `Trace` level with no extra data.
    pub fn trace(&self, module: String, message: String) {
        self.log(LogLevel::Trace, module, message, None);
    }

    /// Logs `message` at `Debug` level with no extra data.
    pub fn debug(&self, module: String, message: String) {
        self.log(LogLevel::Debug, module, message, None);
    }

    /// Logs `message` at `Info` level with no extra data.
    pub fn info(&self, module: String, message: String) {
        self.log(LogLevel::Info, module, message, None);
    }

    /// Logs `message` at `Warning` level with no extra data.
    pub fn warning(&self, module: String, message: String) {
        self.log(LogLevel::Warning, module, message, None);
    }

    /// Logs `message` at `Error` level with no extra data.
    pub fn error(&self, module: String, message: String) {
        self.log(LogLevel::Error, module, message, None);
    }

    /// Logs `message` at `Fatal` level with no extra data.
    pub fn fatal(&self, module: String, message: String) {
        self.log(LogLevel::Fatal, module, message, None);
    }

    /// Returns a copy of the stored record whose id equals `log_id`, if any.
    pub fn get_log(&self, log_id: &str) -> Option<LogRecord> {
        let logs = self.logs.lock().unwrap();
        logs.iter().find(|l| l.id == log_id).cloned()
    }

    /// Returns copies of all stored records, oldest first.
    pub fn get_all_logs(&self) -> Vec<LogRecord> {
        let logs = self.logs.lock().unwrap();
        logs.iter().cloned().collect()
    }

    /// Returns copies of the stored records whose level equals `level`.
    pub fn get_logs_by_level(&self, level: LogLevel) -> Vec<LogRecord> {
        let logs = self.logs.lock().unwrap();
        logs.iter().filter(|l| l.level == level).cloned().collect()
    }

    /// Returns copies of the stored records whose module equals `module`.
    pub fn get_logs_by_module(&self, module: &str) -> Vec<LogRecord> {
        let logs = self.logs.lock().unwrap();
        logs.iter().filter(|l| l.module == module).cloned().collect()
    }

    /// Removes every stored record. The id counter is not reset.
    pub fn clear_logs(&self) {
        let mut logs = self.logs.lock().unwrap();
        logs.clear();
    }

    /// Returns the number of stored records.
    pub fn get_log_count(&self) -> usize {
        let logs = self.logs.lock().unwrap();
        logs.len()
    }

    /// Returns the current Unix timestamp in whole seconds, or 0 when the
    /// system clock reports a time before the Unix epoch (logging must not
    /// panic because of a misconfigured clock).
    fn current_timestamp() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_secs())
            .unwrap_or(0)
    }
}
/// Error propagator: pairs a logger with a retry manager.
#[derive(Debug)]
pub struct ErrorPropagator {
    /// Logger that receives the failure / retry / success messages.
    logger: Arc<Logger>,
    /// Retry manager consulted for each reported outcome.
    retry_manager: Arc<RetryManager>,
}
impl ErrorPropagator {
/// 创建新的错误传播器
pub fn new(logger: Arc<Logger>, retry_manager: Arc<RetryManager>) -> Self {
Self {
logger,
retry_manager,
}
}
/// 处理错误
pub fn handle_error(
&self,
operation_id: &str,
error: &Nrpc4Error,
module: &str,
) -> bool {
// 记录错误日志
self.logger.error(
module.to_string(),
format!("Operation {} failed: {}", operation_id, error),
);
// 记录失败并检查是否应该重试
let should_retry = self.retry_manager.record_failure(
operation_id,
error.to_string(),
);
if should_retry {
if let Some(state) = self.retry_manager.get_state(operation_id) {
self.logger.info(
module.to_string(),
format!(
"Retrying operation {} (attempt {}/{})",
operation_id,
state.attempt + 1,
self.retry_manager.config.max_retries
),
);
}
} else {
self.logger.error(
module.to_string(),
format!("Operation {} failed after max retries", operation_id),
);
}
should_retry
}
/// 处理成功
pub fn handle_success(&self, operation_id: &str, module: &str) {
self.retry_manager.record_success(operation_id);
self.logger.info(
module.to_string(),
format!("Operation {} succeeded", operation_id),
);
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A first failure under the default config still allows a retry and
    /// bumps the attempt counter to 1.
    #[test]
    fn test_retry_manager() {
        let manager = RetryManager::new(RetryConfig::default());
        manager.start_retry("op1".to_string());

        let retry_allowed = manager.record_failure("op1", "Error 1".to_string());
        assert!(retry_allowed);

        let snapshot = manager.get_state("op1").unwrap();
        assert_eq!(snapshot.attempt, 1);
    }

    /// With `max_retries = 2` the second failure must refuse a retry.
    #[test]
    fn test_retry_max_attempts() {
        let manager = RetryManager::new(RetryConfig {
            max_retries: 2,
            ..RetryConfig::default()
        });
        manager.start_retry("op1".to_string());

        assert!(manager.record_failure("op1", "Error 1".to_string()));
        assert!(!manager.record_failure("op1", "Error 2".to_string()));
    }

    /// Exponential backoff with factor 2 yields 1000 ms, then 2000 ms.
    #[test]
    fn test_exponential_backoff() {
        let manager = RetryManager::new(RetryConfig {
            max_retries: 5,
            initial_delay: 1000,
            max_delay: 30000,
            strategy: RetryStrategy::ExponentialBackoff,
            backoff_factor: 2.0,
            enabled: true,
        });
        manager.start_retry("op1".to_string());

        manager.record_failure("op1", "Error 1".to_string());
        let after_first = manager.get_state("op1").unwrap();
        assert_eq!(after_first.next_delay, 1000);

        manager.record_failure("op1", "Error 2".to_string());
        let after_second = manager.get_state("op1").unwrap();
        assert_eq!(after_second.next_delay, 2000);
    }

    /// Records are stored and can be queried by level and by module.
    #[test]
    fn test_logger() {
        let logger = Logger::new(LogConfig::default());
        logger.info("test".to_string(), "Test message".to_string());
        logger.error("test".to_string(), "Error message".to_string());

        assert_eq!(logger.get_log_count(), 2);

        let info_only = logger.get_logs_by_level(LogLevel::Info);
        assert_eq!(info_only.len(), 1);

        let from_test_module = logger.get_logs_by_module("test");
        assert_eq!(from_test_module.len(), 2);
    }

    /// Records below the configured minimum level are discarded.
    #[test]
    fn test_logger_level_filter() {
        let logger = Logger::new(LogConfig {
            min_level: LogLevel::Warning,
            ..LogConfig::default()
        });
        logger.info("test".to_string(), "Info message".to_string());
        logger.warning("test".to_string(), "Warning message".to_string());
        logger.error("test".to_string(), "Error message".to_string());

        // The Info-level record must have been filtered out.
        assert_eq!(logger.get_log_count(), 2);
    }

    /// A first error on a tracked operation is retried and produces log output.
    #[test]
    fn test_error_propagator() {
        let logger = Arc::new(Logger::new(LogConfig::default()));
        let retry_manager = Arc::new(RetryManager::new(RetryConfig::default()));
        let propagator = ErrorPropagator::new(Arc::clone(&logger), Arc::clone(&retry_manager));

        retry_manager.start_retry("op1".to_string());

        let failure = Nrpc4Error::NetworkError("Connection failed".to_string());
        let retry_allowed = propagator.handle_error("op1", &failure, "test");

        assert!(retry_allowed);
        assert!(logger.get_log_count() > 0);
    }
}