#!/usr/bin/env python3
"""
NAC Memory System - Query Tool
Query the documents, problems, decisions and principles stored in the memory system.
"""
|
|
|
|
import json
|
|
import sys
|
|
import os
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any
|
|
|
|
# Root directory of the memory system: the parent of the directory that
# contains this script (two levels up from this file).
MEMORY_ROOT = Path(__file__).parent.parent
|
|
|
|
def load_json(file_path: Path) -> Dict[str, Any]:
    """Read the UTF-8 encoded JSON file at *file_path* and return its parsed content.

    Errors from opening the file or from decoding the JSON are not caught
    here; they propagate to the caller.
    """
    with open(file_path, mode='r', encoding='utf-8') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
|
|
|
|
def search_in_text(text: str, keyword: str) -> bool:
    """Return True when *keyword* occurs in *text*, ignoring letter case.

    Both strings are lower-cased before the substring test, so an empty
    keyword matches every text.
    """
    needle = keyword.lower()
    haystack = text.lower()
    return haystack.find(needle) != -1
|
|
|
|
def search_in_dict(data: Dict[str, Any], keyword: str) -> bool:
    """Recursively look for *keyword* inside a nested JSON-like structure.

    Despite the annotation, *data* may be any value, because the function
    calls itself on nested values:

    * str  -> matched with ``search_in_text``.
    * list -> True if any element matches.
    * dict -> True if any key (converted with ``str``) matches, or any value
      matches recursively.
    * anything else (numbers, booleans, ``None``) never matches.
    """
    if isinstance(data, str):
        return search_in_text(data, keyword)
    if isinstance(data, list):
        return any(search_in_dict(element, keyword) for element in data)
    if isinstance(data, dict):
        return any(
            search_in_text(str(name), keyword) or search_in_dict(child, keyword)
            for name, child in data.items()
        )
    return False
|
|
|
|
def query_documents(keyword: str = None, doc_type: str = None) -> List[Dict[str, Any]]:
    """Return the stored documents that pass the given filters.

    The list of documents is read from ``documents/index.json`` under
    ``MEMORY_ROOT``; each index entry names its JSON file via the ``"file"``
    key. Entries whose file does not exist are skipped.

    Args:
        keyword: When truthy, keep only documents in which
            ``search_in_dict`` finds this keyword.
        doc_type: When truthy, keep only documents whose
            ``"document_type"`` equals this value.

    Returns:
        The loaded documents in index order, or an empty list when the
        index file is missing.
    """
    documents_dir = MEMORY_ROOT / "documents"
    index_file = documents_dir / "index.json"
    if not index_file.exists():
        return []

    matches = []
    for entry in load_json(index_file).get("documents", []):
        doc_file = documents_dir / entry["file"]
        if not doc_file.exists():
            continue

        doc = load_json(doc_file)
        # Short-circuit order matters: the type filter is only evaluated
        # once the keyword filter has passed.
        if (not keyword or search_in_dict(doc, keyword)) and (
            not doc_type or doc.get("document_type") == doc_type
        ):
            matches.append(doc)

    return matches
|
|
|
|
def query_problems(keyword: str = None, status: str = None, severity: str = None) -> List[Dict[str, Any]]:
    """Return the stored problem records that pass the given filters.

    The list of problems is read from ``problems/index.json`` under
    ``MEMORY_ROOT``; each index entry names its JSON file via the ``"file"``
    key. Entries whose file does not exist are skipped.

    Args:
        keyword: When truthy, keep only problems in which
            ``search_in_dict`` finds this keyword.
        status: When truthy, keep only problems whose ``"status"`` equals it.
        severity: When truthy, keep only problems whose ``"severity"``
            equals it.

    Returns:
        The loaded problems in index order, or an empty list when the index
        file is missing.
    """
    problems_dir = MEMORY_ROOT / "problems"
    index_file = problems_dir / "index.json"
    if not index_file.exists():
        return []

    def _is_rejected(problem):
        # Filters are evaluated left to right and stop at the first one
        # that rejects the record.
        return bool(
            (keyword and not search_in_dict(problem, keyword))
            or (status and problem.get("status") != status)
            or (severity and problem.get("severity") != severity)
        )

    selected = []
    for entry in load_json(index_file).get("problems", []):
        problem_file = problems_dir / entry["file"]
        if problem_file.exists():
            problem = load_json(problem_file)
            if not _is_rejected(problem):
                selected.append(problem)

    return selected
|
|
|
|
def query_decisions(keyword: str = None, status: str = None, category: str = None) -> List[Dict[str, Any]]:
    """Return the stored decision records that pass the given filters.

    The list of decisions is read from ``decisions/index.json`` under
    ``MEMORY_ROOT``; each index entry names its JSON file via the ``"file"``
    key. Entries whose file does not exist are skipped.

    Args:
        keyword: When truthy, keep only decisions in which
            ``search_in_dict`` finds this keyword.
        status: When truthy, keep only decisions whose ``"status"`` equals it.
        category: When truthy, keep only decisions whose ``"category"``
            equals it.

    Returns:
        The loaded decisions in index order, or an empty list when the
        index file is missing.
    """
    decisions_dir = MEMORY_ROOT / "decisions"
    index_file = decisions_dir / "index.json"
    if not index_file.exists():
        return []

    kept = []
    for entry in load_json(index_file).get("decisions", []):
        decision_file = decisions_dir / entry["file"]
        if not decision_file.exists():
            continue

        decision = load_json(decision_file)

        # Each filter applies only when its argument is truthy; evaluation
        # stops at the first filter that rejects the record.
        rejected = (
            (keyword and not search_in_dict(decision, keyword))
            or (status and decision.get("status") != status)
            or (category and decision.get("category") != category)
        )
        if not rejected:
            kept.append(decision)

    return kept
|
|
|
|
def query_principles(keyword: str = None, category: str = None) -> List[Dict[str, Any]]:
    """Return the principle category files that pass the given filters.

    Categories are listed in ``principles/index.json`` under ``MEMORY_ROOT``
    (key ``"categories"``); each entry carries a ``"category"`` name and the
    ``"file"`` holding that category's content.

    Args:
        keyword: When truthy, keep only category files in which
            ``search_in_dict`` finds this keyword.
        category: When truthy, only index entries whose ``"category"``
            equals this value are loaded at all.

    Returns:
        The loaded category files in index order (one item per category,
        not per principle), or an empty list when the index is missing.
    """
    principles_dir = MEMORY_ROOT / "principles"
    index_file = principles_dir / "index.json"
    if not index_file.exists():
        return []

    collected = []
    for entry in load_json(index_file).get("categories", []):
        # The category filter runs on the index entry, before touching disk.
        if category and entry["category"] != category:
            continue

        principle_file = principles_dir / entry["file"]
        if not principle_file.exists():
            continue

        content = load_json(principle_file)
        if not keyword or search_in_dict(content, keyword):
            collected.append(content)

    return collected
|
|
|
|
def query_terminology(term: str = None) -> List[Dict[str, Any]]:
    """Look up terminology mappings from ``principles/terminology.json``.

    Args:
        term: When falsy, every mapping is returned. Otherwise a mapping is
            kept when *term* is found (via ``search_in_text``) in its
            ``"wrong_term"``, ``"correct_term"`` or ``"chinese"`` field;
            a missing field is treated as an empty string.

    Returns:
        The matching entries of the file's ``"mappings"`` list, or an empty
        list when the terminology file does not exist.
    """
    terminology_file = MEMORY_ROOT / "principles" / "terminology.json"
    if not terminology_file.exists():
        return []

    all_mappings = load_json(terminology_file).get("mappings", [])
    if not term:
        return all_mappings

    # Fields are probed in this order; probing stops at the first hit.
    searched_fields = ("wrong_term", "correct_term", "chinese")
    return [
        mapping
        for mapping in all_mappings
        if any(search_in_text(mapping.get(field, ""), term) for field in searched_fields)
    ]
|
|
|
|
def print_document(doc: Dict[str, Any]):
    """Print a human-readable summary of one document record to stdout.

    Missing scalar fields are shown as ``N/A`` (``Untitled`` for the title).
    When present and non-empty, at most the first three ``core_concepts``
    and the first five ``key_features`` are listed, while the headings
    report the full counts.
    """
    print(f"\n{'='*80}")
    print(f"📄 {doc.get('title', 'Untitled')}")
    print(f"{'='*80}")
    print(f"ID: {doc.get('doc_id', 'N/A')}")
    print(f"类型: {doc.get('document_type', 'N/A')}")
    print(f"重要性: {doc.get('importance', 'N/A')}")
    print(f"阅读日期: {doc.get('read_date', 'N/A')}")
    print(f"路径: {doc.get('path', 'N/A')}")
    print(f"\n摘要:")
    print(f" {doc.get('summary', 'N/A')}")

    concepts = doc.get('core_concepts')
    if concepts:
        print(f"\n核心概念 ({len(concepts)}个):")
        shown_concepts = concepts[:3]
        for number, concept in enumerate(shown_concepts, start=1):
            print(f" {number}. {concept.get('concept', 'N/A')}")
            print(f" {concept.get('definition', 'N/A')}")

    features = doc.get('key_features')
    if features:
        print(f"\n关键特性 ({len(features)}个):")
        shown_features = features[:5]
        for number, feature in enumerate(shown_features, start=1):
            print(f" {number}. {feature}")
|
|
|
|
def print_problem(problem: Dict[str, Any]):
    """Print a human-readable summary of one problem record to stdout.

    Missing scalar fields are shown as ``N/A`` (``Untitled`` for the title).
    ``root_cause`` may be a dict (its ``primary`` / ``secondary`` entries
    are printed) or any other value (printed as is). ``solution`` may be a
    dict (one ``key: value`` line per item) or any other value (printed as
    is). Both sections are omitted when the field is missing or falsy.
    """
    print(f"\n{'='*80}")
    print(f"⚠️ {problem.get('title', 'Untitled')}")
    print(f"{'='*80}")
    print(f"ID: {problem.get('problem_id', 'N/A')}")
    print(f"日期: {problem.get('date', 'N/A')}")
    print(f"类别: {problem.get('category', 'N/A')}")
    print(f"严重性: {problem.get('severity', 'N/A')}")
    print(f"状态: {problem.get('status', 'N/A')}")
    print(f"\n问题描述:")
    print(f" {problem.get('description', 'N/A')}")

    root_cause = problem.get('root_cause')
    if root_cause:
        print(f"\n根本原因:")
        if not isinstance(root_cause, dict):
            print(f" {root_cause}")
        else:
            print(f" 主要: {root_cause.get('primary', 'N/A')}")
            print(f" 次要: {root_cause.get('secondary', 'N/A')}")

    solution = problem.get('solution')
    if solution:
        print(f"\n解决方案:")
        if not isinstance(solution, dict):
            print(f" {solution}")
        else:
            for step, detail in solution.items():
                print(f" {step}: {detail}")
|
|
|
|
def print_decision(decision: Dict[str, Any]):
    """Print a human-readable summary of one decision record to stdout.

    Missing scalar fields are shown as ``N/A`` (``Untitled`` for the title).
    ``decision`` may be a dict (its ``statement`` and optional ``rules`` are
    printed) or any other value (printed as is). ``rationale`` and ``rules``
    are printed as bullet lists; each may be an iterable of entries or a
    single plain string, which is printed as one bullet instead of being
    split into characters.
    """
    def _entries(value):
        # Iterating a bare string would yield one bullet per character,
        # so treat it as a single entry.
        return [value] if isinstance(value, str) else value

    print(f"\n{'='*80}")
    print(f"📋 {decision.get('title', 'Untitled')}")
    print(f"{'='*80}")
    print(f"ID: {decision.get('decision_id', 'N/A')}")
    print(f"日期: {decision.get('date', 'N/A')}")
    print(f"类别: {decision.get('category', 'N/A')}")
    print(f"状态: {decision.get('status', 'N/A')}")

    dec = decision.get('decision')
    if dec:
        print(f"\n决策内容:")
        if isinstance(dec, dict):
            print(f" {dec.get('statement', 'N/A')}")
            rules = dec.get('rules')
            if rules:
                print(f"\n 规则:")
                for rule in _entries(rules):
                    print(f" - {rule}")
        else:
            print(f" {dec}")

    rationale = decision.get('rationale')
    if rationale:
        print(f"\n决策理由:")
        for reason in _entries(rationale):
            print(f" - {reason}")
|
|
|
|
def print_terminology(mapping: Dict[str, Any]):
    """Print one terminology mapping (wrong term -> correct term) to stdout.

    Missing scalar fields are shown as ``N/A``. The ``examples`` and
    ``must_not_use`` lists are printed one entry per line and are omitted
    when the field is missing or empty.
    """
    print(f"\n{'='*80}")
    print(f"🔤 {mapping.get('wrong_term', 'N/A')} → {mapping.get('correct_term', 'N/A')}")
    print(f"{'='*80}")
    print(f"ID: {mapping.get('mapping_id', 'N/A')}")
    print(f"中文: {mapping.get('chinese', 'N/A')}")
    print(f"类别: {mapping.get('category', 'N/A')}")
    print(f"\n说明:")
    print(f" {mapping.get('explanation', 'N/A')}")

    examples = mapping.get('examples')
    if examples:
        print(f"\n示例:")
        for example in examples:
            print(f" - {example}")

    forbidden_terms = mapping.get('must_not_use')
    if forbidden_terms:
        print(f"\n禁止使用:")
        for term in forbidden_terms:
            print(f" ✗ {term}")
|
|
|
|
def main():
    """Command-line entry point for the memory-system query tool.

    Parses the command-line options, runs the matching queries and prints
    the results to stdout:

    * no ``--keyword``, ``--type``, ``--term`` or ``--list``: print the help
      text and usage examples.
    * ``--term`` or ``--type term``: print terminology mappings only.
    * ``--type document|problem|decision|principle``: query that record type.
    * ``--list``, or ``--keyword`` without ``--type``: query documents,
      problems, decisions and principles together, matching the "all
      records" wording of the built-in usage examples.

    ``--status``, ``--severity`` and ``--category`` are forwarded to the
    queries that accept them.
    """
    import argparse

    parser = argparse.ArgumentParser(description='NAC Memory System - Query Tool')
    parser.add_argument('--keyword', '-k', help='搜索关键词')
    parser.add_argument('--type', '-t', choices=['document', 'problem', 'decision', 'principle', 'term'],
                        help='记录类型')
    parser.add_argument('--status', '-s', help='状态过滤(问题/决策)')
    parser.add_argument('--severity', help='严重性过滤(问题)')
    parser.add_argument('--category', '-c', help='类别过滤')
    parser.add_argument('--term', help='查询术语映射')
    parser.add_argument('--list', '-l', action='store_true', help='列出所有记录')

    args = parser.parse_args()

    # Nothing selects a query: show the help text and usage examples.
    if not any([args.keyword, args.type, args.term, args.list]):
        parser.print_help()
        print("\n示例:")
        print(" python query.py --keyword CBPP # 搜索包含CBPP的所有记录")
        print(" python query.py --type problem # 列出所有问题")
        print(" python query.py --type decision # 列出所有决策")
        print(" python query.py --term Contract # 查询术语映射")
        print(" python query.py --list # 列出所有记录")
        return

    # A terminology lookup is exclusive: nothing else is queried afterwards.
    if args.term or (args.type == 'term'):
        mappings = query_terminology(args.term)
        print(f"\n找到 {len(mappings)} 个术语映射:")
        for mapping in mappings:
            print_terminology(mapping)
        return

    # --list, or a keyword given without an explicit type, spans every
    # record type. Previously a bare keyword searched documents only and
    # --list skipped documents and principles.
    search_all = bool(args.list or (args.keyword and not args.type))

    # Documents
    if args.type == 'document' or search_all:
        docs = query_documents(args.keyword)
        print(f"\n找到 {len(docs)} 个文档:")
        for doc in docs:
            print_document(doc)

    # Problems
    if args.type == 'problem' or search_all:
        problems = query_problems(args.keyword, args.status, args.severity)
        print(f"\n找到 {len(problems)} 个问题:")
        for problem in problems:
            print_problem(problem)

    # Decisions
    if args.type == 'decision' or search_all:
        decisions = query_decisions(args.keyword, args.status, args.category)
        print(f"\n找到 {len(decisions)} 个决策:")
        for decision in decisions:
            print_decision(decision)

    # Principles: one summary block per category file.
    if args.type == 'principle' or search_all:
        principles = query_principles(args.keyword, args.category)
        print(f"\n找到 {len(principles)} 个原则类别:")
        for principle in principles:
            print(f"\n{'='*80}")
            print(f"🎯 {principle.get('title', 'Untitled')}")
            print(f"{'='*80}")
            print(f"类别: {principle.get('category', 'N/A')}")
            if principle.get('principles'):
                print(f"原则数量: {len(principle['principles'])}")
            if principle.get('mappings'):
                print(f"映射数量: {len(principle['mappings'])}")
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|