NAC_Blockchain/memory/tools/query.py

341 lines
12 KiB
Python

#!/usr/bin/env python3
"""
NAC Memory System - Query Tool
查询记忆系统中的文档、问题、决策和原则
"""
import json
import sys
import os
from pathlib import Path
from typing import List, Dict, Any
# 记忆系统根目录
MEMORY_ROOT = Path(__file__).parent.parent
def load_json(file_path: Path) -> Dict[str, Any]:
"""加载JSON文件"""
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
def search_in_text(text: str, keyword: str) -> bool:
"""在文本中搜索关键词(不区分大小写)"""
return keyword.lower() in text.lower()
def search_in_dict(data: Dict[str, Any], keyword: str) -> bool:
"""递归搜索字典中的关键词"""
if isinstance(data, dict):
for key, value in data.items():
if search_in_text(str(key), keyword) or search_in_dict(value, keyword):
return True
elif isinstance(data, list):
for item in data:
if search_in_dict(item, keyword):
return True
elif isinstance(data, str):
return search_in_text(data, keyword)
return False
def query_documents(keyword: str = None, doc_type: str = None) -> List[Dict[str, Any]]:
"""查询文档"""
index_file = MEMORY_ROOT / "documents" / "index.json"
if not index_file.exists():
return []
index = load_json(index_file)
results = []
for doc_info in index.get("documents", []):
doc_file = MEMORY_ROOT / "documents" / doc_info["file"]
if not doc_file.exists():
continue
doc = load_json(doc_file)
# 过滤条件
if keyword and not search_in_dict(doc, keyword):
continue
if doc_type and doc.get("document_type") != doc_type:
continue
results.append(doc)
return results
def query_problems(keyword: str = None, status: str = None, severity: str = None) -> List[Dict[str, Any]]:
"""查询问题"""
index_file = MEMORY_ROOT / "problems" / "index.json"
if not index_file.exists():
return []
index = load_json(index_file)
results = []
for problem_info in index.get("problems", []):
problem_file = MEMORY_ROOT / "problems" / problem_info["file"]
if not problem_file.exists():
continue
problem = load_json(problem_file)
# 过滤条件
if keyword and not search_in_dict(problem, keyword):
continue
if status and problem.get("status") != status:
continue
if severity and problem.get("severity") != severity:
continue
results.append(problem)
return results
def query_decisions(keyword: str = None, status: str = None, category: str = None) -> List[Dict[str, Any]]:
"""查询决策"""
index_file = MEMORY_ROOT / "decisions" / "index.json"
if not index_file.exists():
return []
index = load_json(index_file)
results = []
for decision_info in index.get("decisions", []):
decision_file = MEMORY_ROOT / "decisions" / decision_info["file"]
if not decision_file.exists():
continue
decision = load_json(decision_file)
# 过滤条件
if keyword and not search_in_dict(decision, keyword):
continue
if status and decision.get("status") != status:
continue
if category and decision.get("category") != category:
continue
results.append(decision)
return results
def query_principles(keyword: str = None, category: str = None) -> List[Dict[str, Any]]:
"""查询原则"""
index_file = MEMORY_ROOT / "principles" / "index.json"
if not index_file.exists():
return []
index = load_json(index_file)
results = []
for cat_info in index.get("categories", []):
if category and cat_info["category"] != category:
continue
principle_file = MEMORY_ROOT / "principles" / cat_info["file"]
if not principle_file.exists():
continue
principles = load_json(principle_file)
if keyword and not search_in_dict(principles, keyword):
continue
results.append(principles)
return results
def query_terminology(term: str = None) -> List[Dict[str, Any]]:
"""查询术语映射"""
terminology_file = MEMORY_ROOT / "principles" / "terminology.json"
if not terminology_file.exists():
return []
terminology = load_json(terminology_file)
if not term:
return terminology.get("mappings", [])
results = []
for mapping in terminology.get("mappings", []):
if (search_in_text(mapping.get("wrong_term", ""), term) or
search_in_text(mapping.get("correct_term", ""), term) or
search_in_text(mapping.get("chinese", ""), term)):
results.append(mapping)
return results
def print_document(doc: Dict[str, Any]):
"""打印文档信息"""
print(f"\n{'='*80}")
print(f"📄 {doc.get('title', 'Untitled')}")
print(f"{'='*80}")
print(f"ID: {doc.get('doc_id', 'N/A')}")
print(f"类型: {doc.get('document_type', 'N/A')}")
print(f"重要性: {doc.get('importance', 'N/A')}")
print(f"阅读日期: {doc.get('read_date', 'N/A')}")
print(f"路径: {doc.get('path', 'N/A')}")
print(f"\n摘要:")
print(f" {doc.get('summary', 'N/A')}")
if doc.get('core_concepts'):
print(f"\n核心概念 ({len(doc['core_concepts'])}个):")
for i, concept in enumerate(doc['core_concepts'][:3], 1):
print(f" {i}. {concept.get('concept', 'N/A')}")
print(f" {concept.get('definition', 'N/A')}")
if doc.get('key_features'):
print(f"\n关键特性 ({len(doc['key_features'])}个):")
for i, feature in enumerate(doc['key_features'][:5], 1):
print(f" {i}. {feature}")
def print_problem(problem: Dict[str, Any]):
"""打印问题信息"""
print(f"\n{'='*80}")
print(f"⚠️ {problem.get('title', 'Untitled')}")
print(f"{'='*80}")
print(f"ID: {problem.get('problem_id', 'N/A')}")
print(f"日期: {problem.get('date', 'N/A')}")
print(f"类别: {problem.get('category', 'N/A')}")
print(f"严重性: {problem.get('severity', 'N/A')}")
print(f"状态: {problem.get('status', 'N/A')}")
print(f"\n问题描述:")
print(f" {problem.get('description', 'N/A')}")
if problem.get('root_cause'):
print(f"\n根本原因:")
cause = problem['root_cause']
if isinstance(cause, dict):
print(f" 主要: {cause.get('primary', 'N/A')}")
print(f" 次要: {cause.get('secondary', 'N/A')}")
else:
print(f" {cause}")
if problem.get('solution'):
print(f"\n解决方案:")
solution = problem['solution']
if isinstance(solution, dict):
for key, value in solution.items():
print(f" {key}: {value}")
else:
print(f" {solution}")
def print_decision(decision: Dict[str, Any]):
"""打印决策信息"""
print(f"\n{'='*80}")
print(f"📋 {decision.get('title', 'Untitled')}")
print(f"{'='*80}")
print(f"ID: {decision.get('decision_id', 'N/A')}")
print(f"日期: {decision.get('date', 'N/A')}")
print(f"类别: {decision.get('category', 'N/A')}")
print(f"状态: {decision.get('status', 'N/A')}")
if decision.get('decision'):
print(f"\n决策内容:")
dec = decision['decision']
if isinstance(dec, dict):
print(f" {dec.get('statement', 'N/A')}")
if dec.get('rules'):
print(f"\n 规则:")
for rule in dec['rules']:
print(f" - {rule}")
else:
print(f" {dec}")
if decision.get('rationale'):
print(f"\n决策理由:")
for reason in decision['rationale']:
print(f" - {reason}")
def print_terminology(mapping: Dict[str, Any]):
"""打印术语映射"""
print(f"\n{'='*80}")
print(f"🔤 {mapping.get('wrong_term', 'N/A')}{mapping.get('correct_term', 'N/A')}")
print(f"{'='*80}")
print(f"ID: {mapping.get('mapping_id', 'N/A')}")
print(f"中文: {mapping.get('chinese', 'N/A')}")
print(f"类别: {mapping.get('category', 'N/A')}")
print(f"\n说明:")
print(f" {mapping.get('explanation', 'N/A')}")
if mapping.get('examples'):
print(f"\n示例:")
for example in mapping['examples']:
print(f" - {example}")
if mapping.get('must_not_use'):
print(f"\n禁止使用:")
for term in mapping['must_not_use']:
print(f"{term}")
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='NAC Memory System - Query Tool')
parser.add_argument('--keyword', '-k', help='搜索关键词')
parser.add_argument('--type', '-t', choices=['document', 'problem', 'decision', 'principle', 'term'],
help='记录类型')
parser.add_argument('--status', '-s', help='状态过滤(问题/决策)')
parser.add_argument('--severity', help='严重性过滤(问题)')
parser.add_argument('--category', '-c', help='类别过滤')
parser.add_argument('--term', help='查询术语映射')
parser.add_argument('--list', '-l', action='store_true', help='列出所有记录')
args = parser.parse_args()
# 查询术语映射
if args.term or (args.type == 'term'):
mappings = query_terminology(args.term)
print(f"\n找到 {len(mappings)} 个术语映射:")
for mapping in mappings:
print_terminology(mapping)
return
# 查询文档
if args.type == 'document' or (not args.type and args.keyword):
docs = query_documents(args.keyword)
print(f"\n找到 {len(docs)} 个文档:")
for doc in docs:
print_document(doc)
# 查询问题
if args.type == 'problem' or args.list:
problems = query_problems(args.keyword, args.status, args.severity)
print(f"\n找到 {len(problems)} 个问题:")
for problem in problems:
print_problem(problem)
# 查询决策
if args.type == 'decision' or args.list:
decisions = query_decisions(args.keyword, args.status, args.category)
print(f"\n找到 {len(decisions)} 个决策:")
for decision in decisions:
print_decision(decision)
# 查询原则
if args.type == 'principle':
principles = query_principles(args.keyword, args.category)
print(f"\n找到 {len(principles)} 个原则类别:")
for principle in principles:
print(f"\n{'='*80}")
print(f"🎯 {principle.get('title', 'Untitled')}")
print(f"{'='*80}")
print(f"类别: {principle.get('category', 'N/A')}")
if principle.get('principles'):
print(f"原则数量: {len(principle['principles'])}")
if principle.get('mappings'):
print(f"映射数量: {len(principle['mappings'])}")
# 如果没有指定任何参数,显示帮助
if not any([args.keyword, args.type, args.term, args.list]):
parser.print_help()
print("\n示例:")
print(" python query.py --keyword CBPP # 搜索包含CBPP的所有记录")
print(" python query.py --type problem # 列出所有问题")
print(" python query.py --type decision # 列出所有决策")
print(" python query.py --term Contract # 查询术语映射")
print(" python query.py --list # 列出所有记录")
if __name__ == "__main__":
main()