# Source listing metadata: 50 lines, 1.4 KiB, Python.
import os
from datetime import datetime, timezone

from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import MongoClient
|
||
|
||
# Connection settings. Each value can be overridden through the environment so
# a deployment does not have to edit this file; the original literals are kept
# as defaults so existing setups keep working unchanged.
# SECURITY NOTE(review): the default URI embeds a username and password in
# source control. Rotate that credential and supply GNACS_MONGO_URI instead.
MONGO_URI = os.environ.get(
    "GNACS_MONGO_URI",
    "mongodb://gnacs_user:GnacsDB2026!@127.0.0.1:27017/gnacs_db?authSource=admin",
)
MONGO_DB = os.environ.get("GNACS_MONGO_DB", "gnacs_db")
|
||
|
||
def now_utc():
    """Return the current time as a timezone-aware ``datetime`` in UTC."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now
|
||
|
||
# Async client and its database handle (used by FastAPI); both stay None
# until connect_db() assigns them.
motor_client = db = None

# Collection handles, initialised during the startup event.
asset_classes_col = jurisdictions_col = compliance_rules_col = None
tax_treaties_col = gnacs_codes_col = None
jurisdiction_col = None  # alias of jurisdictions_col, kept for older code
|
||
|
||
async def connect_db():
    """Open the async MongoDB connection and publish the module-level handles.

    Creates an ``AsyncIOMotorClient`` for ``MONGO_URI``, checks that the server
    answers a ``ping`` command, and only then stores the client, the
    ``MONGO_DB`` database and the collection handles in this module's globals.

    Returns:
        The database handle for ``MONGO_DB``.

    Raises:
        Whatever the driver raises when the ping fails. In that case the newly
        created client is closed and the module globals are left untouched, so
        ``get_db()`` never hands out a handle that was not verified.
    """
    global motor_client, db
    global asset_classes_col, jurisdictions_col, jurisdiction_col
    global compliance_rules_col, tax_treaties_col, gnacs_codes_col

    client = AsyncIOMotorClient(MONGO_URI)
    database = client[MONGO_DB]
    try:
        await database.command("ping")
    except BaseException:
        # BaseException so the client is also released when the startup task
        # is cancelled (asyncio.CancelledError); the error is always re-raised.
        client.close()
        raise

    # The connection is verified: publish the handles.
    motor_client = client
    db = database
    asset_classes_col = database["asset_classes"]
    jurisdictions_col = database["jurisdictions"]
    jurisdiction_col = jurisdictions_col  # alias of the same collection
    compliance_rules_col = database["compliance_rules"]
    tax_treaties_col = database["tax_treaties"]
    gnacs_codes_col = database["gnacs_codes"]
    print(f"GNACS MongoDB connected: {MONGO_DB}")
    return database
|
||
|
||
async def close_db():
    """Close the async MongoDB client, if one is open, and clear the handles.

    After this call ``motor_client``, ``db`` and every ``*_col`` global are
    ``None`` again, so the module no longer exposes handles that belong to a
    closed client. Calling it when nothing is connected is a no-op.
    """
    global motor_client, db
    global asset_classes_col, jurisdictions_col, jurisdiction_col
    global compliance_rules_col, tax_treaties_col, gnacs_codes_col

    if motor_client is None:
        return

    # Detach the handles first so the module state is reset even if close()
    # itself raises.
    client, motor_client = motor_client, None
    db = None
    asset_classes_col = jurisdictions_col = jurisdiction_col = None
    compliance_rules_col = tax_treaties_col = gnacs_codes_col = None
    client.close()
|
||
|
||
def get_db():
    """Return the module-level async database handle.

    This is the value currently stored in the ``db`` global: ``None`` until
    ``connect_db()`` has assigned a database to it.
    """
    current_db = db
    return current_db
|
||
|
||
# Lazily created PyMongo client, shared by every get_sync_db() call.
_sync_client = None


def get_sync_db():
    """Return a synchronous (PyMongo) handle to the ``MONGO_DB`` database.

    The underlying ``MongoClient`` is created on first use and reused on later
    calls. Building a new client per call, as before, opened a new connection
    pool each time and never closed it.

    NOTE(review): the cached client is per process. If this is first called
    before a fork, confirm that the driver version in use copes with that.
    """
    global _sync_client
    if _sync_client is None:
        _sync_client = MongoClient(MONGO_URI)
    return _sync_client[MONGO_DB]
|