#!/usr/bin/env python3
"""
NAC Memory System - Query Tool

Query the documents, problems, decisions, principles and terminology
mappings stored as JSON files under the memory system root directory.
"""

import argparse
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Tuple

# Root directory of the memory system: the parent of the directory that
# contains this script.
MEMORY_ROOT = Path(__file__).parent.parent

# Horizontal rule printed above and below every record title.
_RULE = "=" * 80


def load_json(file_path: Path) -> Dict[str, Any]:
    """Read *file_path* as UTF-8 and return the parsed JSON content."""
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)


def search_in_text(text: str, keyword: str) -> bool:
    """Return True if *keyword* occurs in *text*, ignoring case."""
    return keyword.lower() in text.lower()


def search_in_dict(data: Any, keyword: str) -> bool:
    """Recursively search a JSON-like structure for *keyword*.

    Dict keys (stringified), dict values, list items and strings are
    searched case-insensitively. Any other value (numbers, booleans,
    None, ...) never matches.
    """
    if isinstance(data, dict):
        return any(
            search_in_text(str(key), keyword) or search_in_dict(value, keyword)
            for key, value in data.items()
        )
    if isinstance(data, list):
        return any(search_in_dict(item, keyword) for item in data)
    if isinstance(data, str):
        return search_in_text(data, keyword)
    return False


def _iter_index_entries(
    section: str, index_key: str
) -> Iterator[Tuple[Dict[str, Any], Path]]:
    """Yield ``(index_entry, record_path)`` for one section of the memory store.

    Reads ``MEMORY_ROOT/<section>/index.json`` and walks the list stored
    under *index_key*. Yields nothing when the index file is absent.
    Entries without a "file" value, or whose file does not exist on disk,
    are skipped.
    """
    section_dir = MEMORY_ROOT / section
    index_file = section_dir / "index.json"
    if not index_file.exists():
        return
    for entry in load_json(index_file).get(index_key, []):
        file_name = entry.get("file")
        if not file_name:
            # Incomplete index entry: skip it like a missing file instead
            # of aborting the whole query with a KeyError.
            continue
        record_path = section_dir / file_name
        if record_path.exists():
            yield entry, record_path


def _matches(record: Dict[str, Any], keyword: Optional[str],
             **expected: Optional[str]) -> bool:
    """Return True if *record* passes the keyword and field filters.

    A falsy *keyword* or a falsy expected value disables that filter.
    Field filters compare ``record.get(field)`` for equality.
    """
    if keyword and not search_in_dict(record, keyword):
        return False
    return all(
        not wanted or record.get(field) == wanted
        for field, wanted in expected.items()
    )


def _as_list(value: Any) -> List[Any]:
    """Return *value* as a list, wrapping anything that is not a list/tuple.

    Keeps a plain string from being iterated character by character when
    a record stores a single value where a list is expected.
    """
    if isinstance(value, (list, tuple)):
        return list(value)
    return [value]


def _print_header(title_line: str) -> None:
    """Print *title_line* framed by two horizontal rules, after a blank line."""
    print(f"\n{_RULE}")
    print(title_line)
    print(_RULE)


def query_documents(keyword: Optional[str] = None,
                    doc_type: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the documents listed in ``documents/index.json``.

    Args:
        keyword: If given, keep only documents containing it anywhere.
        doc_type: If given, keep only documents whose "document_type"
            equals it.
    """
    results = []
    for _, path in _iter_index_entries("documents", "documents"):
        doc = load_json(path)
        if _matches(doc, keyword, document_type=doc_type):
            results.append(doc)
    return results


def query_problems(keyword: Optional[str] = None,
                   status: Optional[str] = None,
                   severity: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the problems listed in ``problems/index.json``.

    Args:
        keyword: If given, keep only problems containing it anywhere.
        status: If given, keep only problems whose "status" equals it.
        severity: If given, keep only problems whose "severity" equals it.
    """
    results = []
    for _, path in _iter_index_entries("problems", "problems"):
        problem = load_json(path)
        if _matches(problem, keyword, status=status, severity=severity):
            results.append(problem)
    return results


def query_decisions(keyword: Optional[str] = None,
                    status: Optional[str] = None,
                    category: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the decisions listed in ``decisions/index.json``.

    Args:
        keyword: If given, keep only decisions containing it anywhere.
        status: If given, keep only decisions whose "status" equals it.
        category: If given, keep only decisions whose "category" equals it.
    """
    results = []
    for _, path in _iter_index_entries("decisions", "decisions"):
        decision = load_json(path)
        if _matches(decision, keyword, status=status, category=category):
            results.append(decision)
    return results


def query_principles(keyword: Optional[str] = None,
                     category: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return the principle files listed in ``principles/index.json``.

    Args:
        keyword: If given, keep only files containing it anywhere.
        category: If given, keep only index entries whose "category"
            equals it. The filter is applied to the index entry, so
            non-matching files are never loaded.
    """
    results = []
    for entry, path in _iter_index_entries("principles", "categories"):
        if category and entry.get("category") != category:
            continue
        principles = load_json(path)
        if keyword and not search_in_dict(principles, keyword):
            continue
        results.append(principles)
    return results


def query_terminology(term: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return terminology mappings from ``principles/terminology.json``.

    With no *term*, every mapping is returned. Otherwise a mapping is kept
    when *term* occurs (case-insensitively) in its "wrong_term",
    "correct_term" or "chinese" field. Returns an empty list when the
    terminology file does not exist.
    """
    terminology_file = MEMORY_ROOT / "principles" / "terminology.json"
    if not terminology_file.exists():
        return []

    mappings = load_json(terminology_file).get("mappings", [])
    if not term:
        return mappings

    searched_fields = ("wrong_term", "correct_term", "chinese")
    return [
        mapping for mapping in mappings
        # `or ""` guards against a field that is present but null.
        if any(search_in_text(str(mapping.get(field) or ""), term)
               for field in searched_fields)
    ]


def print_document(doc: Dict[str, Any]):
    """Print a human-readable summary of one document record.

    Shows at most the first three core concepts and the first five key
    features.
    """
    _print_header(f"📄 {doc.get('title', 'Untitled')}")
    print(f"ID: {doc.get('doc_id', 'N/A')}")
    print(f"类型: {doc.get('document_type', 'N/A')}")
    print(f"重要性: {doc.get('importance', 'N/A')}")
    print(f"阅读日期: {doc.get('read_date', 'N/A')}")
    print(f"路径: {doc.get('path', 'N/A')}")
    print("\n摘要:")
    print(f" {doc.get('summary', 'N/A')}")

    if doc.get('core_concepts'):
        concepts = _as_list(doc['core_concepts'])
        print(f"\n核心概念 ({len(concepts)}个):")
        for i, concept in enumerate(concepts[:3], 1):
            if isinstance(concept, dict):
                print(f" {i}. {concept.get('concept', 'N/A')}")
                print(f" {concept.get('definition', 'N/A')}")
            else:
                # A concept stored as a bare value has no definition part.
                print(f" {i}. {concept}")

    if doc.get('key_features'):
        features = _as_list(doc['key_features'])
        print(f"\n关键特性 ({len(features)}个):")
        for i, feature in enumerate(features[:5], 1):
            print(f" {i}. {feature}")


def print_problem(problem: Dict[str, Any]):
    """Print a human-readable summary of one problem record.

    "root_cause" and "solution" may each be a dict or a plain value.
    """
    _print_header(f"⚠️ {problem.get('title', 'Untitled')}")
    print(f"ID: {problem.get('problem_id', 'N/A')}")
    print(f"日期: {problem.get('date', 'N/A')}")
    print(f"类别: {problem.get('category', 'N/A')}")
    print(f"严重性: {problem.get('severity', 'N/A')}")
    print(f"状态: {problem.get('status', 'N/A')}")
    print("\n问题描述:")
    print(f" {problem.get('description', 'N/A')}")

    cause = problem.get('root_cause')
    if cause:
        print("\n根本原因:")
        if isinstance(cause, dict):
            print(f" 主要: {cause.get('primary', 'N/A')}")
            print(f" 次要: {cause.get('secondary', 'N/A')}")
        else:
            print(f" {cause}")

    solution = problem.get('solution')
    if solution:
        print("\n解决方案:")
        if isinstance(solution, dict):
            for key, value in solution.items():
                print(f" {key}: {value}")
        else:
            print(f" {solution}")


def print_decision(decision: Dict[str, Any]):
    """Print a human-readable summary of one decision record.

    "decision" may be a dict (with "statement" and optional "rules") or a
    plain value. "rules" and "rationale" may be a list or a single value.
    """
    _print_header(f"📋 {decision.get('title', 'Untitled')}")
    print(f"ID: {decision.get('decision_id', 'N/A')}")
    print(f"日期: {decision.get('date', 'N/A')}")
    print(f"类别: {decision.get('category', 'N/A')}")
    print(f"状态: {decision.get('status', 'N/A')}")

    dec = decision.get('decision')
    if dec:
        print("\n决策内容:")
        if isinstance(dec, dict):
            print(f" {dec.get('statement', 'N/A')}")
            if dec.get('rules'):
                print("\n 规则:")
                for rule in _as_list(dec['rules']):
                    print(f" - {rule}")
        else:
            print(f" {dec}")

    if decision.get('rationale'):
        print("\n决策理由:")
        for reason in _as_list(decision['rationale']):
            print(f" - {reason}")


def print_terminology(mapping: Dict[str, Any]):
    """Print a human-readable summary of one terminology mapping.

    "examples" and "must_not_use" may be a list or a single value.
    """
    _print_header(
        f"🔤 {mapping.get('wrong_term', 'N/A')} → {mapping.get('correct_term', 'N/A')}"
    )
    print(f"ID: {mapping.get('mapping_id', 'N/A')}")
    print(f"中文: {mapping.get('chinese', 'N/A')}")
    print(f"类别: {mapping.get('category', 'N/A')}")
    print("\n说明:")
    print(f" {mapping.get('explanation', 'N/A')}")

    if mapping.get('examples'):
        print("\n示例:")
        for example in _as_list(mapping['examples']):
            print(f" - {example}")

    if mapping.get('must_not_use'):
        print("\n禁止使用:")
        for term in _as_list(mapping['must_not_use']):
            print(f" ✗ {term}")


def print_principle(principle: Dict[str, Any]):
    """Print the title, category and item counts of one principle file."""
    _print_header(f"🎯 {principle.get('title', 'Untitled')}")
    print(f"类别: {principle.get('category', 'N/A')}")
    if principle.get('principles'):
        print(f"原则数量: {len(principle['principles'])}")
    if principle.get('mappings'):
        print(f"映射数量: {len(principle['mappings'])}")


def main(argv: Optional[List[str]] = None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes
            argparse read ``sys.argv[1:]``.

    Behaviour:
        * ``--term`` or ``--type term`` prints terminology mappings only.
        * ``--type X`` prints records of that type.
        * ``--list``, or ``--keyword`` without ``--type``, prints every
          record type (documents, problems, decisions, principles), as the
          help text and usage examples describe.
        * With none of keyword/type/term/list, prints help and examples.
    """
    parser = argparse.ArgumentParser(description='NAC Memory System - Query Tool')
    parser.add_argument('--keyword', '-k', help='搜索关键词')
    parser.add_argument('--type', '-t',
                        choices=['document', 'problem', 'decision', 'principle', 'term'],
                        help='记录类型')
    parser.add_argument('--status', '-s', help='状态过滤(问题/决策)')
    parser.add_argument('--severity', help='严重性过滤(问题)')
    parser.add_argument('--category', '-c', help='类别过滤')
    parser.add_argument('--term', help='查询术语映射')
    parser.add_argument('--list', '-l', action='store_true', help='列出所有记录')

    args = parser.parse_args(argv)

    # Terminology lookups are exclusive: nothing else is printed.
    if args.term or args.type == 'term':
        mappings = query_terminology(args.term)
        print(f"\n找到 {len(mappings)} 个术语映射:")
        for mapping in mappings:
            print_terminology(mapping)
        return

    # "All records" mode: an explicit --list, or a keyword search that is
    # not narrowed to a single record type.
    search_all = bool(args.list or (args.keyword and not args.type))

    if args.type == 'document' or search_all:
        docs = query_documents(args.keyword)
        print(f"\n找到 {len(docs)} 个文档:")
        for doc in docs:
            print_document(doc)

    if args.type == 'problem' or search_all:
        problems = query_problems(args.keyword, args.status, args.severity)
        print(f"\n找到 {len(problems)} 个问题:")
        for problem in problems:
            print_problem(problem)

    if args.type == 'decision' or search_all:
        decisions = query_decisions(args.keyword, args.status, args.category)
        print(f"\n找到 {len(decisions)} 个决策:")
        for decision in decisions:
            print_decision(decision)

    if args.type == 'principle' or search_all:
        principles = query_principles(args.keyword, args.category)
        print(f"\n找到 {len(principles)} 个原则类别:")
        for principle in principles:
            print_principle(principle)

    # No selector given at all: show usage help with examples.
    if not any([args.keyword, args.type, args.term, args.list]):
        parser.print_help()
        print("\n示例:")
        print(" python query.py --keyword CBPP # 搜索包含CBPP的所有记录")
        print(" python query.py --type problem # 列出所有问题")
        print(" python query.py --type decision # 列出所有决策")
        print(" python query.py --term Contract # 查询术语映射")
        print(" python query.py --list # 列出所有记录")


if __name__ == "__main__":
    main()