NAC_Blockchain/nac-lens/src/connection.rs

562 lines
16 KiB
Rust

//! NAC Lens连接管理系统
//!
//! 实现连接池、心跳机制、超时处理和连接复用
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use serde::{Serialize, Deserialize};
use crate::error::{NacLensError, Result};
/// 连接状态
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConnectionState {
/// 未连接
Disconnected,
/// 正在连接
Connecting,
/// 已连接
Connected,
/// 空闲
Idle,
/// 繁忙
Busy,
/// 正在关闭
Closing,
/// 已关闭
Closed,
/// 错误
Error,
}
/// 连接信息
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionInfo {
/// 连接ID
pub id: String,
/// 远程地址
pub remote_addr: String,
/// 连接状态
pub state: ConnectionState,
/// 创建时间
pub created_at: u64,
/// 最后活跃时间
pub last_active: u64,
/// 最后心跳时间
pub last_heartbeat: u64,
/// 请求计数
pub request_count: u64,
/// 错误计数
pub error_count: u64,
/// 是否可复用
pub reusable: bool,
}
/// 连接配置
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionConfig {
/// 最大连接数
pub max_connections: usize,
/// 最小连接数
pub min_connections: usize,
/// 连接超时(秒)
pub connect_timeout: u64,
/// 空闲超时(秒)
pub idle_timeout: u64,
/// 心跳间隔(秒)
pub heartbeat_interval: u64,
/// 心跳超时(秒)
pub heartbeat_timeout: u64,
/// 最大重试次数
pub max_retries: u32,
/// 重试延迟(秒)
pub retry_delay: u64,
/// 是否启用连接复用
pub enable_reuse: bool,
}
impl Default for ConnectionConfig {
fn default() -> Self {
Self {
max_connections: 100,
min_connections: 10,
connect_timeout: 30,
idle_timeout: 300,
heartbeat_interval: 30,
heartbeat_timeout: 10,
max_retries: 3,
retry_delay: 5,
enable_reuse: true,
}
}
}
/// 连接池统计
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoolStats {
/// 总连接数
pub total_connections: usize,
/// 活跃连接数
pub active_connections: usize,
/// 空闲连接数
pub idle_connections: usize,
/// 等待连接数
pub waiting_connections: usize,
/// 总请求数
pub total_requests: u64,
/// 总错误数
pub total_errors: u64,
/// 平均响应时间(毫秒)
pub avg_response_time: u64,
}
/// 连接
#[derive(Debug)]
struct Connection {
/// 连接信息
info: ConnectionInfo,
/// 最后使用时间
last_used: Instant,
/// 是否正在使用
in_use: bool,
}
/// 连接池
#[derive(Debug)]
pub struct ConnectionPool {
/// 配置
config: ConnectionConfig,
/// 连接映射
connections: Arc<Mutex<HashMap<String, Connection>>>,
/// 下一个连接ID
next_id: Arc<Mutex<u64>>,
/// 统计信息
stats: Arc<Mutex<PoolStats>>,
}
impl ConnectionPool {
/// 创建新的连接池
pub fn new(config: ConnectionConfig) -> Self {
Self {
config,
connections: Arc::new(Mutex::new(HashMap::new())),
next_id: Arc::new(Mutex::new(1)),
stats: Arc::new(Mutex::new(PoolStats {
total_connections: 0,
active_connections: 0,
idle_connections: 0,
waiting_connections: 0,
total_requests: 0,
total_errors: 0,
avg_response_time: 0,
})),
}
}
/// 获取连接
pub fn get_connection(&self, remote_addr: &str) -> Result<String> {
let mut connections = self.connections.lock().unwrap();
// 查找可复用的空闲连接
if self.config.enable_reuse {
for (id, conn) in connections.iter_mut() {
if conn.info.remote_addr == remote_addr
&& conn.info.state == ConnectionState::Idle
&& !conn.in_use
&& conn.info.reusable
{
// 检查连接是否过期
if conn.last_used.elapsed().as_secs() < self.config.idle_timeout {
conn.in_use = true;
conn.info.state = ConnectionState::Busy;
conn.info.last_active = Self::current_timestamp();
conn.last_used = Instant::now();
return Ok(id.clone());
}
}
}
}
// 检查是否达到最大连接数
if connections.len() >= self.config.max_connections {
return Err(NacLensError::NetworkError(
"Connection pool is full".to_string(),
));
}
// 创建新连接
let conn_id = self.create_connection(remote_addr)?;
// 标记为使用中
if let Some(conn) = connections.get_mut(&conn_id) {
conn.in_use = true;
conn.info.state = ConnectionState::Busy;
}
Ok(conn_id)
}
/// 创建连接
fn create_connection(&self, remote_addr: &str) -> Result<String> {
let mut next_id = self.next_id.lock().unwrap();
let conn_id = format!("CONN-{:08}", *next_id);
*next_id += 1;
drop(next_id);
let current_time = Self::current_timestamp();
let info = ConnectionInfo {
id: conn_id.clone(),
remote_addr: remote_addr.to_string(),
state: ConnectionState::Connected,
created_at: current_time,
last_active: current_time,
last_heartbeat: current_time,
request_count: 0,
error_count: 0,
reusable: self.config.enable_reuse,
};
let connection = Connection {
info,
last_used: Instant::now(),
in_use: false,
};
let mut connections = self.connections.lock().unwrap();
connections.insert(conn_id.clone(), connection);
// 更新统计
let mut stats = self.stats.lock().unwrap();
stats.total_connections += 1;
stats.active_connections += 1;
Ok(conn_id)
}
/// 释放连接
pub fn release_connection(&self, conn_id: &str) -> Result<()> {
let mut connections = self.connections.lock().unwrap();
if let Some(conn) = connections.get_mut(conn_id) {
conn.in_use = false;
conn.info.state = ConnectionState::Idle;
conn.info.last_active = Self::current_timestamp();
conn.last_used = Instant::now();
// 更新统计
let mut stats = self.stats.lock().unwrap();
stats.active_connections = stats.active_connections.saturating_sub(1);
stats.idle_connections += 1;
Ok(())
} else {
Err(NacLensError::NetworkError(format!(
"Connection {} not found",
conn_id
)))
}
}
/// 关闭连接
pub fn close_connection(&self, conn_id: &str) -> Result<()> {
let mut connections = self.connections.lock().unwrap();
if let Some(mut conn) = connections.remove(conn_id) {
conn.info.state = ConnectionState::Closed;
// 更新统计
let mut stats = self.stats.lock().unwrap();
stats.total_connections = stats.total_connections.saturating_sub(1);
if conn.in_use {
stats.active_connections = stats.active_connections.saturating_sub(1);
} else {
stats.idle_connections = stats.idle_connections.saturating_sub(1);
}
Ok(())
} else {
Err(NacLensError::NetworkError(format!(
"Connection {} not found",
conn_id
)))
}
}
/// 发送心跳
pub fn send_heartbeat(&self, conn_id: &str) -> Result<()> {
let mut connections = self.connections.lock().unwrap();
if let Some(conn) = connections.get_mut(conn_id) {
conn.info.last_heartbeat = Self::current_timestamp();
conn.info.last_active = Self::current_timestamp();
Ok(())
} else {
Err(NacLensError::NetworkError(format!(
"Connection {} not found",
conn_id
)))
}
}
/// 检查心跳超时
pub fn check_heartbeat_timeout(&self) -> Vec<String> {
let mut connections = self.connections.lock().unwrap();
let current_time = Self::current_timestamp();
let timeout = self.config.heartbeat_timeout;
let mut timeout_connections = Vec::new();
for (id, conn) in connections.iter_mut() {
let elapsed = current_time - conn.info.last_heartbeat;
if elapsed > timeout && conn.info.state == ConnectionState::Connected {
conn.info.state = ConnectionState::Error;
timeout_connections.push(id.clone());
}
}
timeout_connections
}
/// 清理空闲连接
pub fn cleanup_idle_connections(&self) -> usize {
let mut connections = self.connections.lock().unwrap();
let idle_timeout = self.config.idle_timeout;
let mut to_remove = Vec::new();
for (id, conn) in connections.iter() {
if !conn.in_use
&& conn.info.state == ConnectionState::Idle
&& conn.last_used.elapsed().as_secs() > idle_timeout
{
to_remove.push(id.clone());
}
}
let count = to_remove.len();
for id in to_remove {
connections.remove(&id);
}
// 更新统计
let mut stats = self.stats.lock().unwrap();
stats.total_connections = stats.total_connections.saturating_sub(count);
stats.idle_connections = stats.idle_connections.saturating_sub(count);
count
}
/// 记录请求
pub fn record_request(&self, conn_id: &str, success: bool) -> Result<()> {
let mut connections = self.connections.lock().unwrap();
if let Some(conn) = connections.get_mut(conn_id) {
conn.info.request_count += 1;
if !success {
conn.info.error_count += 1;
}
conn.info.last_active = Self::current_timestamp();
// 更新统计
let mut stats = self.stats.lock().unwrap();
stats.total_requests += 1;
if !success {
stats.total_errors += 1;
}
Ok(())
} else {
Err(NacLensError::NetworkError(format!(
"Connection {} not found",
conn_id
)))
}
}
/// 获取连接信息
pub fn get_connection_info(&self, conn_id: &str) -> Option<ConnectionInfo> {
let connections = self.connections.lock().unwrap();
connections.get(conn_id).map(|c| c.info.clone())
}
/// 获取所有连接信息
pub fn get_all_connections(&self) -> Vec<ConnectionInfo> {
let connections = self.connections.lock().unwrap();
connections.values().map(|c| c.info.clone()).collect()
}
/// 获取统计信息
pub fn get_stats(&self) -> PoolStats {
let stats = self.stats.lock().unwrap();
stats.clone()
}
/// 获取配置
pub fn get_config(&self) -> &ConnectionConfig {
&self.config
}
/// 获取当前时间戳
fn current_timestamp() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
}
}
/// 心跳管理器
#[derive(Debug)]
pub struct HeartbeatManager {
/// 连接池
pool: Arc<ConnectionPool>,
/// 心跳间隔
pub interval: Duration,
/// 是否运行
running: Arc<Mutex<bool>>,
}
impl HeartbeatManager {
/// 创建新的心跳管理器
pub fn new(pool: Arc<ConnectionPool>, interval: Duration) -> Self {
Self {
pool,
interval,
running: Arc::new(Mutex::new(false)),
}
}
/// 启动心跳
pub fn start(&self) {
let mut running = self.running.lock().unwrap();
*running = true;
}
/// 停止心跳
pub fn stop(&self) {
let mut running = self.running.lock().unwrap();
*running = false;
}
/// 执行心跳检查
pub fn check(&self) {
let running = self.running.lock().unwrap();
if !*running {
return;
}
drop(running);
// 检查心跳超时
let timeout_connections = self.pool.check_heartbeat_timeout();
for conn_id in timeout_connections {
// 尝试重新连接或关闭
let _ = self.pool.close_connection(&conn_id);
}
// 清理空闲连接
let _ = self.pool.cleanup_idle_connections();
}
/// 是否正在运行
pub fn is_running(&self) -> bool {
let running = self.running.lock().unwrap();
*running
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_connection_pool_create() {
let config = ConnectionConfig::default();
let pool = ConnectionPool::new(config);
let stats = pool.get_stats();
assert_eq!(stats.total_connections, 0);
}
#[test]
fn test_get_connection() {
let config = ConnectionConfig::default();
let pool = ConnectionPool::new(config);
let conn_id = pool.get_connection("127.0.0.1:8080").unwrap();
assert!(!conn_id.is_empty());
let info = pool.get_connection_info(&conn_id).unwrap();
assert_eq!(info.remote_addr, "127.0.0.1:8080");
assert_eq!(info.state, ConnectionState::Busy);
}
#[test]
fn test_release_connection() {
let config = ConnectionConfig::default();
let pool = ConnectionPool::new(config);
let conn_id = pool.get_connection("127.0.0.1:8080").unwrap();
pool.release_connection(&conn_id).unwrap();
let info = pool.get_connection_info(&conn_id).unwrap();
assert_eq!(info.state, ConnectionState::Idle);
}
#[test]
fn test_connection_reuse() {
let mut config = ConnectionConfig::default();
config.enable_reuse = true;
let pool = ConnectionPool::new(config);
let conn_id1 = pool.get_connection("127.0.0.1:8080").unwrap();
pool.release_connection(&conn_id1).unwrap();
let conn_id2 = pool.get_connection("127.0.0.1:8080").unwrap();
assert_eq!(conn_id1, conn_id2);
}
#[test]
fn test_close_connection() {
let config = ConnectionConfig::default();
let pool = ConnectionPool::new(config);
let conn_id = pool.get_connection("127.0.0.1:8080").unwrap();
pool.close_connection(&conn_id).unwrap();
assert!(pool.get_connection_info(&conn_id).is_none());
}
#[test]
fn test_record_request() {
let config = ConnectionConfig::default();
let pool = ConnectionPool::new(config);
let conn_id = pool.get_connection("127.0.0.1:8080").unwrap();
pool.record_request(&conn_id, true).unwrap();
pool.record_request(&conn_id, false).unwrap();
let info = pool.get_connection_info(&conn_id).unwrap();
assert_eq!(info.request_count, 2);
assert_eq!(info.error_count, 1);
let stats = pool.get_stats();
assert_eq!(stats.total_requests, 2);
assert_eq!(stats.total_errors, 1);
}
#[test]
fn test_heartbeat_manager() {
let config = ConnectionConfig::default();
let pool = Arc::new(ConnectionPool::new(config));
let manager = HeartbeatManager::new(pool, Duration::from_secs(30));
assert!(!manager.is_running());
manager.start();
assert!(manager.is_running());
manager.stop();
assert!(!manager.is_running());
}
}