chore: 删除 Python 遗留文件,仓库全部改为 Rust 实现

pull/2/head
jackwener 2026-04-16 15:22:29 +08:00
parent 6b7285c730
commit 02f6c4a748
8 changed files with 0 additions and 2809 deletions

220
config.py
View File

@ -1,220 +0,0 @@
"""
配置加载器 - config.json 读取路径配置
首次运行时自动检测微信数据目录检测失败则提示手动配置
"""
import glob
import json
import os
import platform
import sys
CONFIG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
_SYSTEM = platform.system().lower()
if _SYSTEM == "linux":
_DEFAULT_TEMPLATE_DIR = os.path.expanduser("~/Documents/xwechat_files/your_wxid/db_storage")
_DEFAULT_PROCESS = "wechat"
elif _SYSTEM == "darwin":
# macOS 使用独立的 C 扫描器 (find_all_keys_macos.c),此处仅提供 config 默认值
_DEFAULT_TEMPLATE_DIR = os.path.expanduser("~/Documents/xwechat_files/your_wxid/db_storage")
_DEFAULT_PROCESS = "WeChat"
else:
_DEFAULT_TEMPLATE_DIR = r"D:\xwechat_files\your_wxid\db_storage"
_DEFAULT_PROCESS = "Weixin.exe"
_DEFAULT = {
"db_dir": _DEFAULT_TEMPLATE_DIR,
"keys_file": "all_keys.json",
"decrypted_dir": "decrypted",
"decoded_image_dir": "decoded_images",
"wechat_process": _DEFAULT_PROCESS,
}
def _choose_candidate(candidates):
"""在多个候选目录中选择一个。"""
if len(candidates) == 1:
return candidates[0]
if len(candidates) > 1:
if not sys.stdin.isatty():
return candidates[0]
print("[!] 检测到多个微信数据目录(请选择当前正在运行的微信账号):")
for i, c in enumerate(candidates, 1):
print(f" {i}. {c}")
print(" 0. 跳过,稍后手动配置")
try:
while True:
choice = input("请选择 [0-{}]: ".format(len(candidates))).strip()
if choice == "0":
return None
if choice.isdigit() and 1 <= int(choice) <= len(candidates):
return candidates[int(choice) - 1]
print(" 无效输入,请重新选择")
except (EOFError, KeyboardInterrupt):
print()
return None
return None
def _auto_detect_db_dir_windows():
"""从微信本地配置自动检测 Windows db_storage 路径。
读取 %APPDATA%\\Tencent\\xwechat\\config\\*.ini
找到数据存储根目录然后匹配 xwechat_files\\*\\db_storage
"""
appdata = os.environ.get("APPDATA", "")
config_dir = os.path.join(appdata, "Tencent", "xwechat", "config")
if not os.path.isdir(config_dir):
return None
# 从 ini 文件中找到有效的目录路径
data_roots = []
for ini_file in glob.glob(os.path.join(config_dir, "*.ini")):
try:
# 微信 ini 可能是 utf-8 或 gbk 编码(中文路径)
content = None
for enc in ("utf-8", "gbk"):
try:
with open(ini_file, "r", encoding=enc) as f:
content = f.read(1024).strip()
break
except UnicodeDecodeError:
continue
if not content or any(c in content for c in "\n\r\x00"):
continue
if os.path.isdir(content):
data_roots.append(content)
except OSError:
continue
# 在每个根目录下搜索 xwechat_files\*\db_storage
seen = set()
candidates = []
for root in data_roots:
pattern = os.path.join(root, "xwechat_files", "*", "db_storage")
for match in glob.glob(pattern):
normalized = os.path.normcase(os.path.normpath(match))
if os.path.isdir(match) and normalized not in seen:
seen.add(normalized)
candidates.append(match)
return _choose_candidate(candidates)
def _auto_detect_db_dir_linux():
"""自动检测 Linux 微信 db_storage 路径。
优先搜索当前用户的 home 目录 sudo 运行时通过 SUDO_USER 回退到
实际用户的 home避免只搜索 /root 而遗漏真实数据目录
"""
seen = set()
candidates = []
search_roots = [
os.path.expanduser("~/Documents/xwechat_files"),
]
# sudo 运行时,~ 展开为 /root回退到实际用户的 home
sudo_user = os.environ.get("SUDO_USER")
if sudo_user:
# 验证 SUDO_USER 是合法系统用户,防止路径注入
import pwd
try:
sudo_home = pwd.getpwnam(sudo_user).pw_dir
except KeyError:
sudo_home = None
if sudo_home:
fallback = os.path.join(sudo_home, "Documents", "xwechat_files")
if fallback not in search_roots:
search_roots.append(fallback)
for root in search_roots:
if not os.path.isdir(root):
continue
pattern = os.path.join(root, "*", "db_storage")
for match in glob.glob(pattern):
normalized = os.path.normcase(os.path.normpath(match))
if os.path.isdir(match) and normalized not in seen:
seen.add(normalized)
candidates.append(match)
# 早期 Linux 微信版本wine/容器方案)使用的数据路径
old_path = os.path.expanduser("~/.local/share/weixin/data/db_storage")
if os.path.isdir(old_path):
normalized = os.path.normcase(os.path.normpath(old_path))
if normalized not in seen:
candidates.append(old_path)
# 优先使用最近活跃账号:按 message 目录 mtime 降序近似排序best-effort
def _mtime(path):
msg_dir = os.path.join(path, "message")
target = msg_dir if os.path.isdir(msg_dir) else path
try:
return os.path.getmtime(target)
except OSError:
return 0
candidates.sort(key=_mtime, reverse=True)
return _choose_candidate(candidates)
def auto_detect_db_dir():
if _SYSTEM == "windows":
return _auto_detect_db_dir_windows()
if _SYSTEM == "linux":
return _auto_detect_db_dir_linux()
return None
def load_config():
cfg = {}
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, encoding="utf-8") as f:
cfg = json.load(f)
except json.JSONDecodeError:
print(f"[!] {CONFIG_FILE} 格式损坏,将使用默认配置")
cfg = {}
# db_dir 缺失或仍为模板值时,尝试自动检测
db_dir = cfg.get("db_dir", "")
if not db_dir or db_dir == _DEFAULT_TEMPLATE_DIR or "your_wxid" in db_dir:
detected = auto_detect_db_dir()
if detected:
print(f"[+] 自动检测到微信数据目录: {detected}")
cfg = {**_DEFAULT, **cfg, "db_dir": detected}
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=4, ensure_ascii=False)
print(f"[+] 已保存到: {CONFIG_FILE}")
else:
if not os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(_DEFAULT, f, indent=4, ensure_ascii=False)
print(f"[!] 未能自动检测微信数据目录")
print(f" 请手动编辑 {CONFIG_FILE} 中的 db_dir 字段")
if _SYSTEM == "linux":
print(" Linux 默认路径类似: ~/Documents/xwechat_files/<wxid>/db_storage")
else:
print(f" 路径可在 微信设置 → 文件管理 中找到")
sys.exit(1)
else:
cfg = {**_DEFAULT, **cfg}
# 将相对路径转为绝对路径
base = os.path.dirname(os.path.abspath(__file__))
for key in ("keys_file", "decrypted_dir", "decoded_image_dir"):
if key in cfg and not os.path.isabs(cfg[key]):
cfg[key] = os.path.join(base, cfg[key])
# 自动推导微信数据根目录db_dir 的上级目录)
# db_dir 格式: D:\xwechat_files\<wxid>\db_storage
# base_dir 格式: D:\xwechat_files\<wxid>
db_dir = cfg.get("db_dir", "")
if db_dir and os.path.basename(db_dir) == "db_storage":
cfg["wechat_base_dir"] = os.path.dirname(db_dir)
else:
cfg["wechat_base_dir"] = db_dir
# decoded_image_dir 默认值
if "decoded_image_dir" not in cfg:
cfg["decoded_image_dir"] = os.path.join(base, "decoded_images")
return cfg

View File

@ -1,38 +0,0 @@
import os
import posixpath
def strip_key_metadata(keys):
"""移除 all_keys.json 中以下划线开头的元数据字段,返回新 dict。"""
return {k: v for k, v in keys.items() if not k.startswith("_")}
def _is_safe_rel_path(path):
"""检查路径不包含 .. 等遍历组件。"""
normalized = path.replace("\\", "/")
return ".." not in posixpath.normpath(normalized).split("/")
def key_path_variants(rel_path):
"""生成同一路径的多种分隔符表示,兼容 Windows/Linux JSON key。"""
normalized = rel_path.replace("\\", "/")
variants = []
for candidate in (
rel_path,
normalized,
normalized.replace("/", "\\"),
normalized.replace("/", os.sep),
):
if candidate not in variants:
variants.append(candidate)
return variants
def get_key_info(keys, rel_path):
"""按相对路径查找数据库密钥,自动兼容不同平台分隔符。"""
if not _is_safe_rel_path(rel_path):
return None
for candidate in key_path_variants(rel_path):
if candidate in keys and not candidate.startswith("_"):
return keys[candidate]
return None

View File

@ -1,9 +0,0 @@
[project]
name = "wechat-decrypt"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
"pycryptodome>=3.19,<4",
"zstandard>=0.22,<1",
"click>=8.1,<9",
]

View File

@ -1,342 +0,0 @@
"""
Tests for wx_daemon query functions and wx CLI commands.
These tests use mocking to avoid requiring a live WeChat installation.
"""
import hashlib
import json
import os
import queue
import socket
import sys
import threading
import time
import unittest
from unittest.mock import MagicMock, patch, call
# Ensure project root is on the path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# ─── helpers ─────────────────────────────────────────────────────────────────
def _md5(s: str) -> str:
return hashlib.md5(s.encode()).hexdigest()
# ─── Test: global search chat-name resolution (Task 2) ───────────────────────
class TestSearchChatNameResolution(unittest.TestCase):
"""q_search should resolve contact names instead of showing raw md5/empty."""
def _make_names(self):
return {
"wxid_abc": "Alice",
"wxid_xyz@chatroom": "AI 交流群",
"wxid_solo": "Bob",
}
def test_md5_lookup_built_correctly(self):
"""_get_md5_lookup returns {md5(username): username} for all contacts."""
import wx_daemon
names = self._make_names()
with patch.object(wx_daemon, '_names', names), \
patch.object(wx_daemon, '_md5_to_uname', None):
lookup = wx_daemon._get_md5_lookup()
for uname in names:
assert _md5(uname) in lookup
assert lookup[_md5(uname)] == uname
def test_search_resolves_display_name(self):
"""Global search results contain resolved display names, not empty strings."""
import wx_daemon
names = self._make_names()
alice_md5 = _md5("wxid_abc")
table_name = f"Msg_{alice_md5}"
md5_lookup = {_md5(u): u for u in names}
fake_row = (1, 1, 1700000000, 0, "hello Alice", None)
fake_tables = [(table_name,)]
with patch.object(wx_daemon, '_names', names), \
patch.object(wx_daemon, '_md5_to_uname', md5_lookup), \
patch.object(wx_daemon, 'MSG_DB_KEYS', ['message/message_0.db']), \
patch.object(wx_daemon._db, 'get', return_value='/tmp/fake.db'), \
patch('wx_daemon.closing') as mock_closing, \
patch('wx_daemon.sqlite3') as mock_sqlite:
mock_conn = MagicMock()
mock_conn.execute.side_effect = [
MagicMock(fetchall=lambda: fake_tables), # table listing
MagicMock(fetchall=lambda: []), # Name2Id
MagicMock(fetchall=lambda: [fake_row]), # message search
]
mock_sqlite.connect.return_value = mock_conn
mock_closing.return_value.__enter__ = lambda s, *a: mock_conn
mock_closing.return_value.__exit__ = MagicMock(return_value=False)
result = wx_daemon.q_search("Alice", chats=None, limit=10)
# The result should have chat name "Alice", not "" or "未知"
assert result.get("count", 0) >= 0 # basic sanity
def test_refresh_names_clears_md5_cache(self):
"""_refresh_names() clears both _names and _md5_to_uname caches."""
import wx_daemon
saved_names = wx_daemon._names
saved_md5 = wx_daemon._md5_to_uname
try:
# Pre-populate caches with stale data
wx_daemon._names = {"old": "OldName"}
wx_daemon._md5_to_uname = {_md5("old"): "old"}
with patch.object(wx_daemon._db, 'get', return_value=None):
wx_daemon._refresh_names()
# After refresh, md5 cache must be rebuilt (not None)
assert wx_daemon._md5_to_uname is not None
# Cache no longer contains stale "old" username (contact.db unavailable → empty)
assert _md5("old") not in wx_daemon._md5_to_uname
finally:
wx_daemon._names = saved_names
wx_daemon._md5_to_uname = saved_md5
# ─── Test: wx init helpers (Task 1) ──────────────────────────────────────────
class TestInitHelpers(unittest.TestCase):
"""Tests for wx init helper functions."""
def test_detect_db_dir_macos_returns_most_recent(self):
"""_detect_db_dir picks the most recently modified db_storage on macOS."""
import wx
# Use paths that don't share characters to avoid 'in' ambiguity
newer = '/wechat/newer/db_storage'
older = '/wechat/older/db_storage'
mtimes = {newer: 9999, older: 1000}
with patch('wx.platform.system', return_value='Darwin'), \
patch('wx.glob.glob', return_value=[older, newer]), \
patch('wx.os.path.isdir', return_value=True), \
patch('wx.os.path.getmtime', side_effect=lambda p: mtimes.get(p, 0)):
result = wx._detect_db_dir()
assert result == newer
def test_detect_db_dir_macos_returns_none_when_not_found(self):
"""_detect_db_dir returns None when no db_storage directory exists."""
import wx
with patch('wx.platform.system', return_value='Darwin'), \
patch('wx.glob.glob', return_value=[]):
result = wx._detect_db_dir()
assert result is None
def test_detect_db_dir_linux(self):
"""_detect_db_dir works on Linux with standard xwechat_files paths."""
import wx
with patch('wx.platform.system', return_value='Linux'), \
patch('wx.glob.glob', side_effect=lambda p: ['/home/user/Documents/xwechat_files/wxid/db_storage'] if '*' in p else []), \
patch('wx.os.path.isdir', return_value=True), \
patch('wx.os.path.getmtime', return_value=1000.0):
result = wx._detect_db_dir()
assert result is not None
# ─── Test: wx export formatting (Task 4) ─────────────────────────────────────
class TestExportFormatting(unittest.TestCase):
"""Tests for wx export command output formats."""
_SAMPLE_RESP = {
"ok": True,
"chat": "Alice",
"username": "wxid_abc",
"is_group": False,
"count": 2,
"messages": [
{"timestamp": 1700000000, "time": "2023-11-14 22:13", "sender": "", "content": "Hello", "type": "文本", "local_id": 1},
{"timestamp": 1700000060, "time": "2023-11-14 22:14", "sender": "Alice", "content": "World", "type": "文本", "local_id": 2},
],
}
def _run_export(self, fmt, extra_args=None):
from click.testing import CliRunner
import wx
runner = CliRunner()
with patch('wx._send', return_value=self._SAMPLE_RESP), \
patch('wx._ensure_daemon'):
args = ['export', 'Alice', '--format', fmt]
if extra_args:
args.extend(extra_args)
result = runner.invoke(wx.cli, args)
return result
def test_export_json(self):
result = self._run_export('json')
assert result.exit_code == 0
data = json.loads(result.output)
assert data['chat'] == 'Alice'
assert len(data['messages']) == 2
def test_export_txt(self):
result = self._run_export('txt')
assert result.exit_code == 0
assert '=== Alice' in result.output
assert 'Hello' in result.output
assert 'Alice: World' in result.output
def test_export_markdown(self):
result = self._run_export('markdown')
assert result.exit_code == 0
assert '# Alice' in result.output
assert '**Alice**' in result.output
assert 'Hello' in result.output
def test_export_to_file(self):
from click.testing import CliRunner
import wx
runner = CliRunner()
with runner.isolated_filesystem():
with patch('wx._send', return_value=self._SAMPLE_RESP), \
patch('wx._ensure_daemon'):
result = runner.invoke(wx.cli, ['export', 'Alice', '-o', 'out.md'])
assert result.exit_code == 0
assert os.path.exists('out.md')
content = open('out.md').read()
assert '# Alice' in content
def test_export_group_chat_markdown(self):
resp = dict(self._SAMPLE_RESP, chat='AI 群', is_group=True,
messages=[{**self._SAMPLE_RESP['messages'][1]}])
from click.testing import CliRunner
import wx
runner = CliRunner()
with patch('wx._send', return_value=resp), patch('wx._ensure_daemon'):
result = runner.invoke(wx.cli, ['export', 'AI 群', '--format', 'markdown'])
assert result.exit_code == 0
assert '群聊' in result.output
# ─── Test: watch connection protocol (Task 3) ─────────────────────────────────
class TestWatchProtocol(unittest.TestCase):
"""Tests for the watch streaming protocol."""
def test_watch_receives_connected_event(self):
"""watch command should receive a 'connected' event upon connection."""
import wx
events = [
json.dumps({"event": "connected"}) + '\n',
]
mock_socket = MagicMock()
mock_file = MagicMock()
mock_file.__iter__ = lambda s: iter(events)
mock_socket.makefile.return_value = mock_file
from click.testing import CliRunner
runner = CliRunner()
with patch('wx.socket.socket', return_value=mock_socket), \
patch('wx._ensure_daemon'):
result = runner.invoke(wx.cli, ['watch', '--json'],
catch_exceptions=False)
# connected/heartbeat events are filtered out; output should be empty
assert result.exit_code == 0
assert result.output.strip() == ''
def test_watch_json_outputs_message_events(self):
"""watch --json should print message events as JSON lines."""
import wx
msg_event = {"event": "message", "chat": "Alice", "content": "hi",
"time": "10:00", "sender": "", "is_group": False}
events = [
json.dumps({"event": "connected"}) + '\n',
json.dumps(msg_event) + '\n',
]
mock_socket = MagicMock()
mock_file = MagicMock()
mock_file.__iter__ = lambda s: iter(events)
mock_socket.makefile.return_value = mock_file
from click.testing import CliRunner
runner = CliRunner()
with patch('wx.socket.socket', return_value=mock_socket), \
patch('wx._ensure_daemon'):
result = runner.invoke(wx.cli, ['watch', '--json'],
catch_exceptions=False)
assert result.exit_code == 0
lines = [l for l in result.output.strip().split('\n') if l]
assert len(lines) == 1
data = json.loads(lines[0])
assert data['chat'] == 'Alice'
assert data['event'] == 'message'
def test_watch_plain_formats_output(self):
"""watch without --json should format messages with ANSI codes."""
import wx
msg_event = {"event": "message", "chat": "Alice", "content": "hello",
"time": "10:00", "sender": "", "is_group": False}
events = [
json.dumps({"event": "connected"}) + '\n',
json.dumps(msg_event) + '\n',
]
mock_socket = MagicMock()
mock_file = MagicMock()
mock_file.__iter__ = lambda s: iter(events)
mock_socket.makefile.return_value = mock_file
from click.testing import CliRunner
runner = CliRunner()
with patch('wx.socket.socket', return_value=mock_socket), \
patch('wx._ensure_daemon'):
result = runner.invoke(wx.cli, ['watch'],
catch_exceptions=False)
assert result.exit_code == 0
# Should contain the chat name and content
assert 'Alice' in result.output
assert 'hello' in result.output
def test_watch_filters_by_chat(self):
"""watch --chat should filter events to only the specified chat."""
import wx
events = [
json.dumps({"event": "connected"}) + '\n',
json.dumps({"event": "message", "chat": "Bob", "content": "noise",
"time": "10:01", "sender": "", "is_group": False,
"username": "wxid_bob"}) + '\n',
json.dumps({"event": "message", "chat": "Alice", "content": "signal",
"time": "10:02", "sender": "", "is_group": False,
"username": "wxid_alice"}) + '\n',
]
mock_socket = MagicMock()
mock_file = MagicMock()
mock_file.__iter__ = lambda s: iter(events)
mock_socket.makefile.return_value = mock_file
from click.testing import CliRunner
runner = CliRunner()
with patch('wx.socket.socket', return_value=mock_socket), \
patch('wx._ensure_daemon'):
result = runner.invoke(wx.cli, ['watch', '--chat', 'Alice', '--json'],
catch_exceptions=False)
assert result.exit_code == 0
lines = [l for l in result.output.strip().split('\n') if l]
assert len(lines) == 1
assert json.loads(lines[0])['chat'] == 'Alice'
if __name__ == '__main__':
unittest.main(verbosity=2)

View File

@ -1,686 +0,0 @@
import hashlib
import os
import sqlite3
import tempfile
import unittest
from unittest.mock import patch
import mcp_server
class _FakeCache:
# 用最小缓存桩替代真实解密缓存,避免单元测试依赖本地微信环境。
def __init__(self, mapping):
self._mapping = mapping
def get(self, rel_key):
return self._mapping.get(rel_key)
def _msg_table_name(username):
# 生产代码使用 username 的 md5 作为消息表名,测试里保持一致。
return f"Msg_{hashlib.md5(username.encode()).hexdigest()}"
def _create_message_db(path, chats):
# 构造最小可用消息库,只包含搜索/历史查询依赖的字段。
conn = sqlite3.connect(path)
try:
conn.execute("CREATE TABLE Name2Id (user_name TEXT)")
for username, messages in chats.items():
conn.execute("INSERT INTO Name2Id(user_name) VALUES (?)", (username,))
table_name = _msg_table_name(username)
conn.execute(
f"""
CREATE TABLE [{table_name}] (
local_id INTEGER,
local_type INTEGER,
create_time INTEGER,
real_sender_id INTEGER,
message_content TEXT,
WCDB_CT_message_content INTEGER
)
"""
)
for local_id, create_time, content in messages:
conn.execute(
f"""
INSERT INTO [{table_name}] (
local_id, local_type, create_time, real_sender_id,
message_content, WCDB_CT_message_content
) VALUES (?, ?, ?, ?, ?, ?)
""",
(local_id, 1, create_time, 0, content, 0),
)
conn.commit()
finally:
conn.close()
class SearchMessagesTests(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.TemporaryDirectory()
self.addCleanup(self.temp_dir.cleanup)
def create_db(self, filename, chats):
path = os.path.join(self.temp_dir.name, filename)
_create_message_db(path, chats)
return path
def test_validate_pagination_rejects_large_limit(self):
# 防止单次查询过大,保证 limit 上限校验存在。
with self.assertRaisesRegex(ValueError, "limit 不能大于 500"):
mcp_server._validate_pagination(501, 0)
def test_validate_pagination_allows_large_limit_when_limit_is_unbounded(self):
# get_chat_history 允许更大的 limit只校验正数和 offset。
mcp_server._validate_pagination(999999, 0, limit_max=None)
def test_page_search_entries_returns_chronological_results_with_offset(self):
# 结果应先按最新时间分页,再把当前页恢复成时间正序输出。
entries = [(1, "a"), (5, "e"), (3, "c"), (4, "d"), (2, "b")]
paged = mcp_server._page_search_entries(entries, limit=2, offset=1)
self.assertEqual(paged, [(3, "c"), (4, "d")])
def test_search_messages_single_chat_uses_offset_and_returns_page(self):
# 单聊分页应只返回当前页,并按聊天阅读顺序展示。
db_path = self.create_db(
"single.db",
{
"alice": [
(1, 100, "foo newest"),
(2, 90, "foo middle"),
(3, 80, "foo oldest"),
]
},
)
ctx = {
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": db_path,
"table_name": _msg_table_name("alice"),
"message_tables": [{"db_path": db_path, "table_name": _msg_table_name("alice")}],
"is_group": False,
}
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object(
mcp_server, "_resolve_chat_context", return_value=ctx
):
result = mcp_server.search_messages("foo", chat_name="Alice", limit=2, offset=1)
self.assertIn('在 Alice 中搜索 "foo" 找到 2 条结果offset=1, limit=2', result)
self.assertLess(result.index("foo oldest"), result.index("foo middle"))
self.assertNotIn("foo newest", result)
def test_search_messages_multiple_chats_applies_global_pagination(self):
# 多个聊天联合搜索时,分页必须基于合并后的全局结果。
db_path = self.create_db(
"multi.db",
{
"alice": [
(1, 110, "foo a1"),
(2, 90, "foo a2"),
],
"bob": [
(1, 100, "foo b1"),
(2, 80, "foo b2"),
],
},
)
contexts = [
{
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": db_path,
"table_name": _msg_table_name("alice"),
"message_tables": [{"db_path": db_path, "table_name": _msg_table_name("alice")}],
"is_group": False,
},
{
"query": "Bob",
"username": "bob",
"display_name": "Bob",
"db_path": db_path,
"table_name": _msg_table_name("bob"),
"message_tables": [{"db_path": db_path, "table_name": _msg_table_name("bob")}],
"is_group": False,
},
]
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice", "bob": "Bob"}), patch.object(
mcp_server, "_resolve_chat_contexts", return_value=(contexts, [], [])
):
result = mcp_server.search_messages("foo", chat_name=["Alice", "Bob"], limit=2, offset=1)
self.assertIn('在 2 个聊天对象中搜索 "foo" 找到 2 条结果offset=1, limit=2', result)
self.assertLess(result.index("foo a2"), result.index("foo b1"))
self.assertNotIn("foo a1", result)
self.assertNotIn("foo b2", result)
def test_search_messages_all_messages_merges_global_results_before_paging(self):
# 全库搜索要基于跨库合并后的全局时间线分页,不能被单个分库提前截断。
older_db = self.create_db(
"older.db",
{"older_user": [(1, 10, "foo older 1"), (2, 9, "foo older 2"), (3, 8, "foo older 3")]},
)
newer_db = self.create_db(
"newer.db",
{"newer_user": [(1, 30, "foo newer 1"), (2, 20, "foo newer 2")]},
)
fake_cache = _FakeCache({"older": older_db, "newer": newer_db})
with patch.object(mcp_server, "MSG_DB_KEYS", ["older", "newer"]), patch.object(
mcp_server, "_cache", fake_cache
), patch.object(
mcp_server,
"get_contact_names",
return_value={"older_user": "Older", "newer_user": "Newer"},
):
result = mcp_server.search_messages("foo", limit=2, offset=0)
self.assertIn('搜索 "foo" 找到 2 条结果offset=0, limit=2', result)
self.assertLess(result.index("foo newer 2"), result.index("foo newer 1"))
self.assertNotIn("foo older 1", result)
def test_search_messages_all_messages_uses_bounded_sql_pagination(self):
# 每个消息表都只应查询当前页所需的候选窗口,不能回退到 limit=None 的全量扫描。
older_db = self.create_db(
"older_paged.db",
{"older_user": [(1, 10, "foo older 1"), (2, 9, "foo older 2"), (3, 8, "foo older 3")]},
)
newer_db = self.create_db(
"newer_paged.db",
{"newer_user": [(1, 30, "foo newer 1"), (2, 20, "foo newer 2"), (3, 19, "foo newer 3")]},
)
fake_cache = _FakeCache({"older": older_db, "newer": newer_db})
original_query_messages = mcp_server._query_messages
calls = []
def recording_query_messages(*args, **kwargs):
calls.append((args[1], kwargs.get("limit"), kwargs.get("offset", 0)))
return original_query_messages(*args, **kwargs)
with patch.object(mcp_server, "MSG_DB_KEYS", ["older", "newer"]), patch.object(
mcp_server, "_cache", fake_cache
), patch.object(
mcp_server,
"get_contact_names",
return_value={"older_user": "Older", "newer_user": "Newer"},
), patch.object(
mcp_server, "_query_messages", side_effect=recording_query_messages
):
result = mcp_server.search_messages("foo", limit=2, offset=1)
self.assertIn('搜索 "foo" 找到 2 条结果offset=1, limit=2', result)
self.assertEqual(
calls,
[
(_msg_table_name("older_user"), 3, 0),
(_msg_table_name("newer_user"), 3, 0),
],
)
def test_search_messages_single_chat_respects_time_range(self):
# 单聊搜索的开始/结束时间都必须严格生效。
db_path = self.create_db(
"single_time.db",
{
"alice": [
(1, 300, "foo in range"),
(2, 200, "foo too early"),
(3, 400, "foo too late"),
]
},
)
ctx = {
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": db_path,
"table_name": _msg_table_name("alice"),
"message_tables": [{"db_path": db_path, "table_name": _msg_table_name("alice")}],
"is_group": False,
}
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object(
mcp_server, "_resolve_chat_context", return_value=ctx
), patch.object(
mcp_server, "_parse_time_range", return_value=(250, 350)
):
result = mcp_server.search_messages(
"foo",
chat_name="Alice",
start_time="custom-start",
end_time="custom-end",
limit=20,
offset=0,
)
self.assertIn("时间范围: custom-start ~ custom-end", result)
self.assertIn("foo in range", result)
self.assertNotIn("foo too early", result)
self.assertNotIn("foo too late", result)
def test_search_messages_multiple_chats_respects_time_range(self):
# 多聊联合搜索时,每个聊天对象都要套用同一时间范围。
db_path = self.create_db(
"multi_time.db",
{
"alice": [(1, 300, "foo alice in range"), (2, 150, "foo alice too early")],
"bob": [(1, 320, "foo bob in range"), (2, 500, "foo bob too late")],
},
)
contexts = [
{
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": db_path,
"table_name": _msg_table_name("alice"),
"message_tables": [{"db_path": db_path, "table_name": _msg_table_name("alice")}],
"is_group": False,
},
{
"query": "Bob",
"username": "bob",
"display_name": "Bob",
"db_path": db_path,
"table_name": _msg_table_name("bob"),
"message_tables": [{"db_path": db_path, "table_name": _msg_table_name("bob")}],
"is_group": False,
},
]
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice", "bob": "Bob"}), patch.object(
mcp_server, "_resolve_chat_contexts", return_value=(contexts, [], [])
), patch.object(
mcp_server, "_parse_time_range", return_value=(250, 400)
):
result = mcp_server.search_messages(
"foo",
chat_name=["Alice", "Bob"],
start_time="range-start",
end_time="range-end",
limit=20,
offset=0,
)
self.assertIn("时间范围: range-start ~ range-end", result)
self.assertIn("foo alice in range", result)
self.assertIn("foo bob in range", result)
self.assertNotIn("foo alice too early", result)
self.assertNotIn("foo bob too late", result)
def test_search_messages_all_messages_respects_time_range(self):
# 全库搜索也不能返回时间范围外的消息。
db_path = self.create_db(
"all_time.db",
{
"alice": [
(1, 100, "foo too early"),
(2, 300, "foo in range"),
(3, 500, "foo too late"),
]
},
)
fake_cache = _FakeCache({"all": db_path})
with patch.object(mcp_server, "MSG_DB_KEYS", ["all"]), patch.object(
mcp_server, "_cache", fake_cache
), patch.object(
mcp_server,
"get_contact_names",
return_value={"alice": "Alice"},
), patch.object(
mcp_server, "_parse_time_range", return_value=(250, 350)
):
result = mcp_server.search_messages(
"foo",
start_time="range-start",
end_time="range-end",
limit=20,
offset=0,
)
self.assertIn("时间范围: range-start ~ range-end", result)
self.assertIn("foo in range", result)
self.assertNotIn("foo too early", result)
self.assertNotIn("foo too late", result)
def test_get_chat_history_merges_sharded_message_tables(self):
# 同一联系人跨多个 message_N.db 分片时,历史查询要先合并再分页。
older_db = self.create_db("history_older.db", {"alice": [(1, 100, "old message")]})
newer_db = self.create_db(
"history_newer.db",
{"alice": [(1, 300, "new message"), (2, 250, "middle message")]},
)
ctx = {
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": newer_db,
"table_name": _msg_table_name("alice"),
"message_tables": [
{"db_path": older_db, "table_name": _msg_table_name("alice")},
{"db_path": newer_db, "table_name": _msg_table_name("alice")},
],
"is_group": False,
}
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object(
mcp_server, "_resolve_chat_context", return_value=ctx
):
result = mcp_server.get_chat_history("Alice", limit=2, offset=0)
self.assertIn("Alice 的消息记录(返回 2 条offset=0, limit=2", result)
self.assertIn("middle message", result)
self.assertIn("new message", result)
self.assertNotIn("old message", result)
def test_get_chat_history_large_limit_reads_all_rows_across_shards(self):
# 大 limit 下,跨分片历史查询不能只返回较旧分片里的少量消息。
older_messages = [
(index, 1000 + index, f"old shard message {index}")
for index in range(1, 18)
]
newer_messages = [
(index, 2000 + index, f"new shard message {index}")
for index in range(1, 296)
]
older_db = self.create_db("history_cross_shard_older.db", {"alice": older_messages})
newer_db = self.create_db("history_cross_shard_newer.db", {"alice": newer_messages})
ctx = {
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": newer_db,
"table_name": _msg_table_name("alice"),
"message_tables": [
{"db_path": newer_db, "table_name": _msg_table_name("alice")},
{"db_path": older_db, "table_name": _msg_table_name("alice")},
],
"is_group": False,
}
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object(
mcp_server, "_resolve_chat_context", return_value=ctx
):
result = mcp_server.get_chat_history("Alice", limit=500, offset=0)
self.assertIn("Alice 的消息记录(返回 312 条offset=0, limit=500", result)
self.assertIn("new shard message 295", result)
self.assertIn("old shard message 17", result)
body = result.split(":\n\n", 1)[1]
self.assertEqual(len(body.splitlines()), 312)
def test_get_chat_history_uses_bounded_sql_pagination(self):
# 历史查询应把 offset+limit 下推到 SQL避免把整张消息表读出来后再切片。
db_path = self.create_db(
"history_paged.db",
{
"alice": [
(1, 400, "newest"),
(2, 300, "middle"),
(3, 200, "older"),
(4, 100, "oldest"),
]
},
)
ctx = {
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": db_path,
"table_name": _msg_table_name("alice"),
"message_tables": [{"db_path": db_path, "table_name": _msg_table_name("alice")}],
"is_group": False,
}
original_query_messages = mcp_server._query_messages
calls = []
def recording_query_messages(*args, **kwargs):
calls.append((args[1], kwargs.get("limit"), kwargs.get("offset", 0)))
return original_query_messages(*args, **kwargs)
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object(
mcp_server, "_resolve_chat_context", return_value=ctx
), patch.object(
mcp_server, "_query_messages", side_effect=recording_query_messages
):
result = mcp_server.get_chat_history("Alice", limit=2, offset=1)
self.assertIn("middle", result)
self.assertIn("older", result)
self.assertNotIn("newest", result)
self.assertNotIn("oldest", result)
self.assertEqual(calls, [(_msg_table_name("alice"), 3, 0)])
def test_get_chat_history_allows_large_limit_values(self):
# 历史查询不应再把大 limit 直接拒绝掉。
db_path = self.create_db(
"history_large_limit.db",
{
"alice": [
(1, 200, "message 1"),
(2, 100, "message 2"),
]
},
)
ctx = {
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": db_path,
"table_name": _msg_table_name("alice"),
"message_tables": [{"db_path": db_path, "table_name": _msg_table_name("alice")}],
"is_group": False,
}
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object(
mcp_server, "_resolve_chat_context", return_value=ctx
):
result = mcp_server.get_chat_history("Alice", limit=999999, offset=0)
self.assertNotIn("错误:", result)
self.assertIn("message 1", result)
self.assertIn("message 2", result)
def test_get_chat_history_keeps_partial_results_when_formatting_fails(self):
# 单条坏消息不应让整个历史查询失败,已有结果仍应返回并附带失败说明。
db_path = self.create_db(
"history_partial_failure.db",
{"alice": [(1, 200, "good message"), (2, 100, "bad message")]},
)
ctx = {
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": db_path,
"table_name": _msg_table_name("alice"),
"message_tables": [{"db_path": db_path, "table_name": _msg_table_name("alice")}],
"is_group": False,
}
original_build_history_line = mcp_server._build_history_line
def flaky_build_history_line(row, *args, **kwargs):
if row[2] == 100:
raise ValueError("bad row")
return original_build_history_line(row, *args, **kwargs)
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object(
mcp_server, "_resolve_chat_context", return_value=ctx
), patch.object(
mcp_server, "_build_history_line", side_effect=flaky_build_history_line
):
result = mcp_server.get_chat_history("Alice", limit=2, offset=0)
self.assertIn("good message", result)
self.assertIn("查询失败:", result)
self.assertIn("bad row", result)
def test_get_chat_history_does_not_truncate_long_messages(self):
# 历史记录应返回完整消息内容,而不是固定截断到 500 字符。
long_message = "x" * 600
db_path = self.create_db(
"history_long_message.db",
{"alice": [(1, 200, long_message)]},
)
ctx = {
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": db_path,
"table_name": _msg_table_name("alice"),
"message_tables": [{"db_path": db_path, "table_name": _msg_table_name("alice")}],
"is_group": False,
}
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object(
mcp_server, "_resolve_chat_context", return_value=ctx
):
result = mcp_server.get_chat_history("Alice", limit=1, offset=0)
self.assertIn(long_message, result)
self.assertNotIn(("x" * 500) + "...", result)
def test_search_messages_single_chat_merges_sharded_message_tables(self):
# 单聊搜索也要跨分片合并,否则最近消息可能查不到。
older_db = self.create_db("search_older.db", {"alice": [(1, 100, "foo old")]})
newer_db = self.create_db(
"search_newer.db",
{"alice": [(1, 300, "foo new"), (2, 200, "foo middle")]},
)
ctx = {
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": newer_db,
"table_name": _msg_table_name("alice"),
"message_tables": [
{"db_path": older_db, "table_name": _msg_table_name("alice")},
{"db_path": newer_db, "table_name": _msg_table_name("alice")},
],
"is_group": False,
}
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object(
mcp_server, "_resolve_chat_context", return_value=ctx
), patch.object(
mcp_server, "_parse_time_range", return_value=(150, 350)
):
result = mcp_server.search_messages(
"foo",
chat_name="Alice",
start_time="range-start",
end_time="range-end",
limit=20,
offset=0,
)
self.assertIn("foo middle", result)
self.assertIn("foo new", result)
self.assertNotIn("foo old", result)
def test_search_messages_keeps_partial_results_when_later_batch_fails(self):
# 后续批次失败时,前面已经拿到的有效结果不应被丢弃。
db_path = self.create_db(
"search_partial_failure.db",
{
"alice": [
(1, 400, "foo newest"),
(2, 300, "foo skipped"),
(3, 200, "foo older"),
(4, 100, "foo bad"),
]
},
)
ctx = {
"query": "Alice",
"username": "alice",
"display_name": "Alice",
"db_path": db_path,
"table_name": _msg_table_name("alice"),
"message_tables": [{"db_path": db_path, "table_name": _msg_table_name("alice")}],
"is_group": False,
}
original_build_search_entry = mcp_server._build_search_entry
def flaky_build_search_entry(row, *args, **kwargs):
if row[2] == 300:
return None
if row[2] == 100:
raise ValueError("bad row")
return original_build_search_entry(row, *args, **kwargs)
with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object(
mcp_server, "_resolve_chat_context", return_value=ctx
), patch.object(
mcp_server, "_build_search_entry", side_effect=flaky_build_search_entry
):
result = mcp_server.search_messages("foo", chat_name="Alice", limit=3, offset=0)
self.assertIn("foo newest", result)
self.assertIn("foo older", result)
self.assertIn("查询失败:", result)
self.assertIn("bad row", result)
def test_get_recent_sessions_closes_connection_when_query_fails(self):
# 会话查询抛异常时也必须关闭 sqlite3 连接,避免资源泄漏。
fake_cache = _FakeCache({os.path.join("session", "session.db"): "session.db"})
class _FakeConn:
def __init__(self):
self.closed = False
def execute(self, *args, **kwargs):
raise sqlite3.OperationalError("boom")
def close(self):
self.closed = True
fake_conn = _FakeConn()
with patch.object(mcp_server, "_cache", fake_cache), patch.object(
mcp_server, "get_contact_names", return_value={}
), patch.object(
mcp_server.sqlite3, "connect", return_value=fake_conn
):
with self.assertRaisesRegex(sqlite3.OperationalError, "boom"):
mcp_server.get_recent_sessions()
self.assertTrue(fake_conn.closed)
def test_get_new_messages_closes_connection_when_query_fails(self):
# 新消息轮询失败时也要释放 sqlite3 连接。
fake_cache = _FakeCache({os.path.join("session", "session.db"): "session.db"})
class _FakeConn:
def __init__(self):
self.closed = False
def execute(self, *args, **kwargs):
raise sqlite3.OperationalError("boom")
def close(self):
self.closed = True
fake_conn = _FakeConn()
with patch.object(mcp_server, "_cache", fake_cache), patch.object(
mcp_server, "get_contact_names", return_value={}
), patch.object(
mcp_server.sqlite3, "connect", return_value=fake_conn
):
with self.assertRaisesRegex(sqlite3.OperationalError, "boom"):
mcp_server.get_new_messages()
self.assertTrue(fake_conn.closed)
if __name__ == "__main__":
unittest.main()

128
uv.lock
View File

@ -1,128 +0,0 @@
version = 1
revision = 3
requires-python = ">=3.12"
[[package]]
name = "click"
version = "8.3.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/57/75/31212c6bf2503fdf920d87fee5d7a86a2e3bcf444984126f13d8e4016804/click-8.3.2.tar.gz", hash = "sha256:14162b8b3b3550a7d479eafa77dfd3c38d9dc8951f6f69c78913a8f9a7540fd5", size = 302856, upload-time = "2026-04-03T19:14:45.118Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e4/20/71885d8b97d4f3dde17b1fdb92dbd4908b00541c5a3379787137285f602e/click-8.3.2-py3-none-any.whl", hash = "sha256:1924d2c27c5653561cd2cae4548d1406039cb79b858b747cfea24924bbc1616d", size = 108379, upload-time = "2026-04-03T19:14:43.505Z" },
]
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "pycryptodome"
version = "3.23.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/8e/a6/8452177684d5e906854776276ddd34eca30d1b1e15aa1ee9cefc289a33f5/pycryptodome-3.23.0.tar.gz", hash = "sha256:447700a657182d60338bab09fdb27518f8856aecd80ae4c6bdddb67ff5da44ef", size = 4921276, upload-time = "2025-05-17T17:21:45.242Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/5d/bdb09489b63cd34a976cc9e2a8d938114f7a53a74d3dd4f125ffa49dce82/pycryptodome-3.23.0-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:0011f7f00cdb74879142011f95133274741778abba114ceca229adbf8e62c3e4", size = 2495152, upload-time = "2025-05-17T17:20:20.833Z" },
{ url = "https://files.pythonhosted.org/packages/a7/ce/7840250ed4cc0039c433cd41715536f926d6e86ce84e904068eb3244b6a6/pycryptodome-3.23.0-cp313-cp313t-macosx_10_13_x86_64.whl", hash = "sha256:90460fc9e088ce095f9ee8356722d4f10f86e5be06e2354230a9880b9c549aae", size = 1639348, upload-time = "2025-05-17T17:20:23.171Z" },
{ url = "https://files.pythonhosted.org/packages/ee/f0/991da24c55c1f688d6a3b5a11940567353f74590734ee4a64294834ae472/pycryptodome-3.23.0-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4764e64b269fc83b00f682c47443c2e6e85b18273712b98aa43bcb77f8570477", size = 2184033, upload-time = "2025-05-17T17:20:25.424Z" },
{ url = "https://files.pythonhosted.org/packages/54/16/0e11882deddf00f68b68dd4e8e442ddc30641f31afeb2bc25588124ac8de/pycryptodome-3.23.0-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eb8f24adb74984aa0e5d07a2368ad95276cf38051fe2dc6605cbcf482e04f2a7", size = 2270142, upload-time = "2025-05-17T17:20:27.808Z" },
{ url = "https://files.pythonhosted.org/packages/d5/fc/4347fea23a3f95ffb931f383ff28b3f7b1fe868739182cb76718c0da86a1/pycryptodome-3.23.0-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d97618c9c6684a97ef7637ba43bdf6663a2e2e77efe0f863cce97a76af396446", size = 2309384, upload-time = "2025-05-17T17:20:30.765Z" },
{ url = "https://files.pythonhosted.org/packages/6e/d9/c5261780b69ce66d8cfab25d2797bd6e82ba0241804694cd48be41add5eb/pycryptodome-3.23.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:9a53a4fe5cb075075d515797d6ce2f56772ea7e6a1e5e4b96cf78a14bac3d265", size = 2183237, upload-time = "2025-05-17T17:20:33.736Z" },
{ url = "https://files.pythonhosted.org/packages/5a/6f/3af2ffedd5cfa08c631f89452c6648c4d779e7772dfc388c77c920ca6bbf/pycryptodome-3.23.0-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:763d1d74f56f031788e5d307029caef067febf890cd1f8bf61183ae142f1a77b", size = 2343898, upload-time = "2025-05-17T17:20:36.086Z" },
{ url = "https://files.pythonhosted.org/packages/9a/dc/9060d807039ee5de6e2f260f72f3d70ac213993a804f5e67e0a73a56dd2f/pycryptodome-3.23.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:954af0e2bd7cea83ce72243b14e4fb518b18f0c1649b576d114973e2073b273d", size = 2269197, upload-time = "2025-05-17T17:20:38.414Z" },
{ url = "https://files.pythonhosted.org/packages/f9/34/e6c8ca177cb29dcc4967fef73f5de445912f93bd0343c9c33c8e5bf8cde8/pycryptodome-3.23.0-cp313-cp313t-win32.whl", hash = "sha256:257bb3572c63ad8ba40b89f6fc9d63a2a628e9f9708d31ee26560925ebe0210a", size = 1768600, upload-time = "2025-05-17T17:20:40.688Z" },
{ url = "https://files.pythonhosted.org/packages/e4/1d/89756b8d7ff623ad0160f4539da571d1f594d21ee6d68be130a6eccb39a4/pycryptodome-3.23.0-cp313-cp313t-win_amd64.whl", hash = "sha256:6501790c5b62a29fcb227bd6b62012181d886a767ce9ed03b303d1f22eb5c625", size = 1799740, upload-time = "2025-05-17T17:20:42.413Z" },
{ url = "https://files.pythonhosted.org/packages/5d/61/35a64f0feaea9fd07f0d91209e7be91726eb48c0f1bfc6720647194071e4/pycryptodome-3.23.0-cp313-cp313t-win_arm64.whl", hash = "sha256:9a77627a330ab23ca43b48b130e202582e91cc69619947840ea4d2d1be21eb39", size = 1703685, upload-time = "2025-05-17T17:20:44.388Z" },
{ url = "https://files.pythonhosted.org/packages/db/6c/a1f71542c969912bb0e106f64f60a56cc1f0fabecf9396f45accbe63fa68/pycryptodome-3.23.0-cp37-abi3-macosx_10_9_universal2.whl", hash = "sha256:187058ab80b3281b1de11c2e6842a357a1f71b42cb1e15bce373f3d238135c27", size = 2495627, upload-time = "2025-05-17T17:20:47.139Z" },
{ url = "https://files.pythonhosted.org/packages/6e/4e/a066527e079fc5002390c8acdd3aca431e6ea0a50ffd7201551175b47323/pycryptodome-3.23.0-cp37-abi3-macosx_10_9_x86_64.whl", hash = "sha256:cfb5cd445280c5b0a4e6187a7ce8de5a07b5f3f897f235caa11f1f435f182843", size = 1640362, upload-time = "2025-05-17T17:20:50.392Z" },
{ url = "https://files.pythonhosted.org/packages/50/52/adaf4c8c100a8c49d2bd058e5b551f73dfd8cb89eb4911e25a0c469b6b4e/pycryptodome-3.23.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:67bd81fcbe34f43ad9422ee8fd4843c8e7198dd88dd3d40e6de42ee65fbe1490", size = 2182625, upload-time = "2025-05-17T17:20:52.866Z" },
{ url = "https://files.pythonhosted.org/packages/5f/e9/a09476d436d0ff1402ac3867d933c61805ec2326c6ea557aeeac3825604e/pycryptodome-3.23.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c8987bd3307a39bc03df5c8e0e3d8be0c4c3518b7f044b0f4c15d1aa78f52575", size = 2268954, upload-time = "2025-05-17T17:20:55.027Z" },
{ url = "https://files.pythonhosted.org/packages/f9/c5/ffe6474e0c551d54cab931918127c46d70cab8f114e0c2b5a3c071c2f484/pycryptodome-3.23.0-cp37-abi3-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:aa0698f65e5b570426fc31b8162ed4603b0c2841cbb9088e2b01641e3065915b", size = 2308534, upload-time = "2025-05-17T17:20:57.279Z" },
{ url = "https://files.pythonhosted.org/packages/18/28/e199677fc15ecf43010f2463fde4c1a53015d1fe95fb03bca2890836603a/pycryptodome-3.23.0-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:53ecbafc2b55353edcebd64bf5da94a2a2cdf5090a6915bcca6eca6cc452585a", size = 2181853, upload-time = "2025-05-17T17:20:59.322Z" },
{ url = "https://files.pythonhosted.org/packages/ce/ea/4fdb09f2165ce1365c9eaefef36625583371ee514db58dc9b65d3a255c4c/pycryptodome-3.23.0-cp37-abi3-musllinux_1_2_i686.whl", hash = "sha256:156df9667ad9f2ad26255926524e1c136d6664b741547deb0a86a9acf5ea631f", size = 2342465, upload-time = "2025-05-17T17:21:03.83Z" },
{ url = "https://files.pythonhosted.org/packages/22/82/6edc3fc42fe9284aead511394bac167693fb2b0e0395b28b8bedaa07ef04/pycryptodome-3.23.0-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:dea827b4d55ee390dc89b2afe5927d4308a8b538ae91d9c6f7a5090f397af1aa", size = 2267414, upload-time = "2025-05-17T17:21:06.72Z" },
{ url = "https://files.pythonhosted.org/packages/59/fe/aae679b64363eb78326c7fdc9d06ec3de18bac68be4b612fc1fe8902693c/pycryptodome-3.23.0-cp37-abi3-win32.whl", hash = "sha256:507dbead45474b62b2bbe318eb1c4c8ee641077532067fec9c1aa82c31f84886", size = 1768484, upload-time = "2025-05-17T17:21:08.535Z" },
{ url = "https://files.pythonhosted.org/packages/54/2f/e97a1b8294db0daaa87012c24a7bb714147c7ade7656973fd6c736b484ff/pycryptodome-3.23.0-cp37-abi3-win_amd64.whl", hash = "sha256:c75b52aacc6c0c260f204cbdd834f76edc9fb0d8e0da9fbf8352ef58202564e2", size = 1799636, upload-time = "2025-05-17T17:21:10.393Z" },
{ url = "https://files.pythonhosted.org/packages/18/3d/f9441a0d798bf2b1e645adc3265e55706aead1255ccdad3856dbdcffec14/pycryptodome-3.23.0-cp37-abi3-win_arm64.whl", hash = "sha256:11eeeb6917903876f134b56ba11abe95c0b0fd5e3330def218083c7d98bbcb3c", size = 1703675, upload-time = "2025-05-17T17:21:13.146Z" },
]
[[package]]
name = "wechat-decrypt"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "click" },
{ name = "pycryptodome" },
{ name = "zstandard" },
]
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.1,<9" },
{ name = "pycryptodome", specifier = ">=3.19,<4" },
{ name = "zstandard", specifier = ">=0.22,<1" },
]
[[package]]
name = "zstandard"
version = "0.25.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/fd/aa/3e0508d5a5dd96529cdc5a97011299056e14c6505b678fd58938792794b1/zstandard-0.25.0.tar.gz", hash = "sha256:7713e1179d162cf5c7906da876ec2ccb9c3a9dcbdffef0cc7f70c3667a205f0b", size = 711513, upload-time = "2025-09-14T22:15:54.002Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/82/fc/f26eb6ef91ae723a03e16eddb198abcfce2bc5a42e224d44cc8b6765e57e/zstandard-0.25.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:7b3c3a3ab9daa3eed242d6ecceead93aebbb8f5f84318d82cee643e019c4b73b", size = 795738, upload-time = "2025-09-14T22:16:56.237Z" },
{ url = "https://files.pythonhosted.org/packages/aa/1c/d920d64b22f8dd028a8b90e2d756e431a5d86194caa78e3819c7bf53b4b3/zstandard-0.25.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:913cbd31a400febff93b564a23e17c3ed2d56c064006f54efec210d586171c00", size = 640436, upload-time = "2025-09-14T22:16:57.774Z" },
{ url = "https://files.pythonhosted.org/packages/53/6c/288c3f0bd9fcfe9ca41e2c2fbfd17b2097f6af57b62a81161941f09afa76/zstandard-0.25.0-cp312-cp312-manylinux2010_i686.manylinux2014_i686.manylinux_2_12_i686.manylinux_2_17_i686.whl", hash = "sha256:011d388c76b11a0c165374ce660ce2c8efa8e5d87f34996aa80f9c0816698b64", size = 5343019, upload-time = "2025-09-14T22:16:59.302Z" },
{ url = "https://files.pythonhosted.org/packages/1e/15/efef5a2f204a64bdb5571e6161d49f7ef0fffdbca953a615efbec045f60f/zstandard-0.25.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:6dffecc361d079bb48d7caef5d673c88c8988d3d33fb74ab95b7ee6da42652ea", size = 5063012, upload-time = "2025-09-14T22:17:01.156Z" },
{ url = "https://files.pythonhosted.org/packages/b7/37/a6ce629ffdb43959e92e87ebdaeebb5ac81c944b6a75c9c47e300f85abdf/zstandard-0.25.0-cp312-cp312-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:7149623bba7fdf7e7f24312953bcf73cae103db8cae49f8154dd1eadc8a29ecb", size = 5394148, upload-time = "2025-09-14T22:17:03.091Z" },
{ url = "https://files.pythonhosted.org/packages/e3/79/2bf870b3abeb5c070fe2d670a5a8d1057a8270f125ef7676d29ea900f496/zstandard-0.25.0-cp312-cp312-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:6a573a35693e03cf1d67799fd01b50ff578515a8aeadd4595d2a7fa9f3ec002a", size = 5451652, upload-time = "2025-09-14T22:17:04.979Z" },
{ url = "https://files.pythonhosted.org/packages/53/60/7be26e610767316c028a2cbedb9a3beabdbe33e2182c373f71a1c0b88f36/zstandard-0.25.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:5a56ba0db2d244117ed744dfa8f6f5b366e14148e00de44723413b2f3938a902", size = 5546993, upload-time = "2025-09-14T22:17:06.781Z" },
{ url = "https://files.pythonhosted.org/packages/85/c7/3483ad9ff0662623f3648479b0380d2de5510abf00990468c286c6b04017/zstandard-0.25.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:10ef2a79ab8e2974e2075fb984e5b9806c64134810fac21576f0668e7ea19f8f", size = 5046806, upload-time = "2025-09-14T22:17:08.415Z" },
{ url = "https://files.pythonhosted.org/packages/08/b3/206883dd25b8d1591a1caa44b54c2aad84badccf2f1de9e2d60a446f9a25/zstandard-0.25.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:aaf21ba8fb76d102b696781bddaa0954b782536446083ae3fdaa6f16b25a1c4b", size = 5576659, upload-time = "2025-09-14T22:17:10.164Z" },
{ url = "https://files.pythonhosted.org/packages/9d/31/76c0779101453e6c117b0ff22565865c54f48f8bd807df2b00c2c404b8e0/zstandard-0.25.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:1869da9571d5e94a85a5e8d57e4e8807b175c9e4a6294e3b66fa4efb074d90f6", size = 4953933, upload-time = "2025-09-14T22:17:11.857Z" },
{ url = "https://files.pythonhosted.org/packages/18/e1/97680c664a1bf9a247a280a053d98e251424af51f1b196c6d52f117c9720/zstandard-0.25.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:809c5bcb2c67cd0ed81e9229d227d4ca28f82d0f778fc5fea624a9def3963f91", size = 5268008, upload-time = "2025-09-14T22:17:13.627Z" },
{ url = "https://files.pythonhosted.org/packages/1e/73/316e4010de585ac798e154e88fd81bb16afc5c5cb1a72eeb16dd37e8024a/zstandard-0.25.0-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:f27662e4f7dbf9f9c12391cb37b4c4c3cb90ffbd3b1fb9284dadbbb8935fa708", size = 5433517, upload-time = "2025-09-14T22:17:16.103Z" },
{ url = "https://files.pythonhosted.org/packages/5b/60/dd0f8cfa8129c5a0ce3ea6b7f70be5b33d2618013a161e1ff26c2b39787c/zstandard-0.25.0-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:99c0c846e6e61718715a3c9437ccc625de26593fea60189567f0118dc9db7512", size = 5814292, upload-time = "2025-09-14T22:17:17.827Z" },
{ url = "https://files.pythonhosted.org/packages/fc/5f/75aafd4b9d11b5407b641b8e41a57864097663699f23e9ad4dbb91dc6bfe/zstandard-0.25.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:474d2596a2dbc241a556e965fb76002c1ce655445e4e3bf38e5477d413165ffa", size = 5360237, upload-time = "2025-09-14T22:17:19.954Z" },
{ url = "https://files.pythonhosted.org/packages/ff/8d/0309daffea4fcac7981021dbf21cdb2e3427a9e76bafbcdbdf5392ff99a4/zstandard-0.25.0-cp312-cp312-win32.whl", hash = "sha256:23ebc8f17a03133b4426bcc04aabd68f8236eb78c3760f12783385171b0fd8bd", size = 436922, upload-time = "2025-09-14T22:17:24.398Z" },
{ url = "https://files.pythonhosted.org/packages/79/3b/fa54d9015f945330510cb5d0b0501e8253c127cca7ebe8ba46a965df18c5/zstandard-0.25.0-cp312-cp312-win_amd64.whl", hash = "sha256:ffef5a74088f1e09947aecf91011136665152e0b4b359c42be3373897fb39b01", size = 506276, upload-time = "2025-09-14T22:17:21.429Z" },
{ url = "https://files.pythonhosted.org/packages/ea/6b/8b51697e5319b1f9ac71087b0af9a40d8a6288ff8025c36486e0c12abcc4/zstandard-0.25.0-cp312-cp312-win_arm64.whl", hash = "sha256:181eb40e0b6a29b3cd2849f825e0fa34397f649170673d385f3598ae17cca2e9", size = 462679, upload-time = "2025-09-14T22:17:23.147Z" },
{ url = "https://files.pythonhosted.org/packages/35/0b/8df9c4ad06af91d39e94fa96cc010a24ac4ef1378d3efab9223cc8593d40/zstandard-0.25.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:ec996f12524f88e151c339688c3897194821d7f03081ab35d31d1e12ec975e94", size = 795735, upload-time = "2025-09-14T22:17:26.042Z" },
{ url = "https://files.pythonhosted.org/packages/3f/06/9ae96a3e5dcfd119377ba33d4c42a7d89da1efabd5cb3e366b156c45ff4d/zstandard-0.25.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:a1a4ae2dec3993a32247995bdfe367fc3266da832d82f8438c8570f989753de1", size = 640440, upload-time = "2025-09-14T22:17:27.366Z" },
{ url = "https://files.pythonhosted.org/packages/d9/14/933d27204c2bd404229c69f445862454dcc101cd69ef8c6068f15aaec12c/zstandard-0.25.0-cp313-cp313-manylinux2010_i686.manylinux2014_i686.manylinux_2_12_i686.manylinux_2_17_i686.whl", hash = "sha256:e96594a5537722fdfb79951672a2a63aec5ebfb823e7560586f7484819f2a08f", size = 5343070, upload-time = "2025-09-14T22:17:28.896Z" },
{ url = "https://files.pythonhosted.org/packages/6d/db/ddb11011826ed7db9d0e485d13df79b58586bfdec56e5c84a928a9a78c1c/zstandard-0.25.0-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:bfc4e20784722098822e3eee42b8e576b379ed72cca4a7cb856ae733e62192ea", size = 5063001, upload-time = "2025-09-14T22:17:31.044Z" },
{ url = "https://files.pythonhosted.org/packages/db/00/87466ea3f99599d02a5238498b87bf84a6348290c19571051839ca943777/zstandard-0.25.0-cp313-cp313-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:457ed498fc58cdc12fc48f7950e02740d4f7ae9493dd4ab2168a47c93c31298e", size = 5394120, upload-time = "2025-09-14T22:17:32.711Z" },
{ url = "https://files.pythonhosted.org/packages/2b/95/fc5531d9c618a679a20ff6c29e2b3ef1d1f4ad66c5e161ae6ff847d102a9/zstandard-0.25.0-cp313-cp313-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:fd7a5004eb1980d3cefe26b2685bcb0b17989901a70a1040d1ac86f1d898c551", size = 5451230, upload-time = "2025-09-14T22:17:34.41Z" },
{ url = "https://files.pythonhosted.org/packages/63/4b/e3678b4e776db00f9f7b2fe58e547e8928ef32727d7a1ff01dea010f3f13/zstandard-0.25.0-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:8e735494da3db08694d26480f1493ad2cf86e99bdd53e8e9771b2752a5c0246a", size = 5547173, upload-time = "2025-09-14T22:17:36.084Z" },
{ url = "https://files.pythonhosted.org/packages/4e/d5/ba05ed95c6b8ec30bd468dfeab20589f2cf709b5c940483e31d991f2ca58/zstandard-0.25.0-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:3a39c94ad7866160a4a46d772e43311a743c316942037671beb264e395bdd611", size = 5046736, upload-time = "2025-09-14T22:17:37.891Z" },
{ url = "https://files.pythonhosted.org/packages/50/d5/870aa06b3a76c73eced65c044b92286a3c4e00554005ff51962deef28e28/zstandard-0.25.0-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:172de1f06947577d3a3005416977cce6168f2261284c02080e7ad0185faeced3", size = 5576368, upload-time = "2025-09-14T22:17:40.206Z" },
{ url = "https://files.pythonhosted.org/packages/5d/35/398dc2ffc89d304d59bc12f0fdd931b4ce455bddf7038a0a67733a25f550/zstandard-0.25.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:3c83b0188c852a47cd13ef3bf9209fb0a77fa5374958b8c53aaa699398c6bd7b", size = 4954022, upload-time = "2025-09-14T22:17:41.879Z" },
{ url = "https://files.pythonhosted.org/packages/9a/5c/36ba1e5507d56d2213202ec2b05e8541734af5f2ce378c5d1ceaf4d88dc4/zstandard-0.25.0-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:1673b7199bbe763365b81a4f3252b8e80f44c9e323fc42940dc8843bfeaf9851", size = 5267889, upload-time = "2025-09-14T22:17:43.577Z" },
{ url = "https://files.pythonhosted.org/packages/70/e8/2ec6b6fb7358b2ec0113ae202647ca7c0e9d15b61c005ae5225ad0995df5/zstandard-0.25.0-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:0be7622c37c183406f3dbf0cba104118eb16a4ea7359eeb5752f0794882fc250", size = 5433952, upload-time = "2025-09-14T22:17:45.271Z" },
{ url = "https://files.pythonhosted.org/packages/7b/01/b5f4d4dbc59ef193e870495c6f1275f5b2928e01ff5a81fecb22a06e22fb/zstandard-0.25.0-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:5f5e4c2a23ca271c218ac025bd7d635597048b366d6f31f420aaeb715239fc98", size = 5814054, upload-time = "2025-09-14T22:17:47.08Z" },
{ url = "https://files.pythonhosted.org/packages/b2/e5/fbd822d5c6f427cf158316d012c5a12f233473c2f9c5fe5ab1ae5d21f3d8/zstandard-0.25.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:4f187a0bb61b35119d1926aee039524d1f93aaf38a9916b8c4b78ac8514a0aaf", size = 5360113, upload-time = "2025-09-14T22:17:48.893Z" },
{ url = "https://files.pythonhosted.org/packages/8e/e0/69a553d2047f9a2c7347caa225bb3a63b6d7704ad74610cb7823baa08ed7/zstandard-0.25.0-cp313-cp313-win32.whl", hash = "sha256:7030defa83eef3e51ff26f0b7bfb229f0204b66fe18e04359ce3474ac33cbc09", size = 436936, upload-time = "2025-09-14T22:17:52.658Z" },
{ url = "https://files.pythonhosted.org/packages/d9/82/b9c06c870f3bd8767c201f1edbdf9e8dc34be5b0fbc5682c4f80fe948475/zstandard-0.25.0-cp313-cp313-win_amd64.whl", hash = "sha256:1f830a0dac88719af0ae43b8b2d6aef487d437036468ef3c2ea59c51f9d55fd5", size = 506232, upload-time = "2025-09-14T22:17:50.402Z" },
{ url = "https://files.pythonhosted.org/packages/d4/57/60c3c01243bb81d381c9916e2a6d9e149ab8627c0c7d7abb2d73384b3c0c/zstandard-0.25.0-cp313-cp313-win_arm64.whl", hash = "sha256:85304a43f4d513f5464ceb938aa02c1e78c2943b29f44a750b48b25ac999a049", size = 462671, upload-time = "2025-09-14T22:17:51.533Z" },
{ url = "https://files.pythonhosted.org/packages/3d/5c/f8923b595b55fe49e30612987ad8bf053aef555c14f05bb659dd5dbe3e8a/zstandard-0.25.0-cp314-cp314-macosx_10_13_x86_64.whl", hash = "sha256:e29f0cf06974c899b2c188ef7f783607dbef36da4c242eb6c82dcd8b512855e3", size = 795887, upload-time = "2025-09-14T22:17:54.198Z" },
{ url = "https://files.pythonhosted.org/packages/8d/09/d0a2a14fc3439c5f874042dca72a79c70a532090b7ba0003be73fee37ae2/zstandard-0.25.0-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:05df5136bc5a011f33cd25bc9f506e7426c0c9b3f9954f056831ce68f3b6689f", size = 640658, upload-time = "2025-09-14T22:17:55.423Z" },
{ url = "https://files.pythonhosted.org/packages/5d/7c/8b6b71b1ddd517f68ffb55e10834388d4f793c49c6b83effaaa05785b0b4/zstandard-0.25.0-cp314-cp314-manylinux2010_i686.manylinux_2_12_i686.manylinux_2_28_i686.whl", hash = "sha256:f604efd28f239cc21b3adb53eb061e2a205dc164be408e553b41ba2ffe0ca15c", size = 5379849, upload-time = "2025-09-14T22:17:57.372Z" },
{ url = "https://files.pythonhosted.org/packages/a4/86/a48e56320d0a17189ab7a42645387334fba2200e904ee47fc5a26c1fd8ca/zstandard-0.25.0-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:223415140608d0f0da010499eaa8ccdb9af210a543fac54bce15babbcfc78439", size = 5058095, upload-time = "2025-09-14T22:17:59.498Z" },
{ url = "https://files.pythonhosted.org/packages/f8/ad/eb659984ee2c0a779f9d06dbfe45e2dc39d99ff40a319895df2d3d9a48e5/zstandard-0.25.0-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:2e54296a283f3ab5a26fc9b8b5d4978ea0532f37b231644f367aa588930aa043", size = 5551751, upload-time = "2025-09-14T22:18:01.618Z" },
{ url = "https://files.pythonhosted.org/packages/61/b3/b637faea43677eb7bd42ab204dfb7053bd5c4582bfe6b1baefa80ac0c47b/zstandard-0.25.0-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:ca54090275939dc8ec5dea2d2afb400e0f83444b2fc24e07df7fdef677110859", size = 6364818, upload-time = "2025-09-14T22:18:03.769Z" },
{ url = "https://files.pythonhosted.org/packages/31/dc/cc50210e11e465c975462439a492516a73300ab8caa8f5e0902544fd748b/zstandard-0.25.0-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e09bb6252b6476d8d56100e8147b803befa9a12cea144bbe629dd508800d1ad0", size = 5560402, upload-time = "2025-09-14T22:18:05.954Z" },
{ url = "https://files.pythonhosted.org/packages/c9/ae/56523ae9c142f0c08efd5e868a6da613ae76614eca1305259c3bf6a0ed43/zstandard-0.25.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:a9ec8c642d1ec73287ae3e726792dd86c96f5681eb8df274a757bf62b750eae7", size = 4955108, upload-time = "2025-09-14T22:18:07.68Z" },
{ url = "https://files.pythonhosted.org/packages/98/cf/c899f2d6df0840d5e384cf4c4121458c72802e8bda19691f3b16619f51e9/zstandard-0.25.0-cp314-cp314-musllinux_1_2_i686.whl", hash = "sha256:a4089a10e598eae6393756b036e0f419e8c1d60f44a831520f9af41c14216cf2", size = 5269248, upload-time = "2025-09-14T22:18:09.753Z" },
{ url = "https://files.pythonhosted.org/packages/1b/c0/59e912a531d91e1c192d3085fc0f6fb2852753c301a812d856d857ea03c6/zstandard-0.25.0-cp314-cp314-musllinux_1_2_ppc64le.whl", hash = "sha256:f67e8f1a324a900e75b5e28ffb152bcac9fbed1cc7b43f99cd90f395c4375344", size = 5430330, upload-time = "2025-09-14T22:18:11.966Z" },
{ url = "https://files.pythonhosted.org/packages/a0/1d/7e31db1240de2df22a58e2ea9a93fc6e38cc29353e660c0272b6735d6669/zstandard-0.25.0-cp314-cp314-musllinux_1_2_s390x.whl", hash = "sha256:9654dbc012d8b06fc3d19cc825af3f7bf8ae242226df5f83936cb39f5fdc846c", size = 5811123, upload-time = "2025-09-14T22:18:13.907Z" },
{ url = "https://files.pythonhosted.org/packages/f6/49/fac46df5ad353d50535e118d6983069df68ca5908d4d65b8c466150a4ff1/zstandard-0.25.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:4203ce3b31aec23012d3a4cf4a2ed64d12fea5269c49aed5e4c3611b938e4088", size = 5359591, upload-time = "2025-09-14T22:18:16.465Z" },
{ url = "https://files.pythonhosted.org/packages/c2/38/f249a2050ad1eea0bb364046153942e34abba95dd5520af199aed86fbb49/zstandard-0.25.0-cp314-cp314-win32.whl", hash = "sha256:da469dc041701583e34de852d8634703550348d5822e66a0c827d39b05365b12", size = 444513, upload-time = "2025-09-14T22:18:20.61Z" },
{ url = "https://files.pythonhosted.org/packages/3a/43/241f9615bcf8ba8903b3f0432da069e857fc4fd1783bd26183db53c4804b/zstandard-0.25.0-cp314-cp314-win_amd64.whl", hash = "sha256:c19bcdd826e95671065f8692b5a4aa95c52dc7a02a4c5a0cac46deb879a017a2", size = 516118, upload-time = "2025-09-14T22:18:17.849Z" },
{ url = "https://files.pythonhosted.org/packages/f0/ef/da163ce2450ed4febf6467d77ccb4cd52c4c30ab45624bad26ca0a27260c/zstandard-0.25.0-cp314-cp314-win_arm64.whl", hash = "sha256:d7541afd73985c630bafcd6338d2518ae96060075f9463d7dc14cfb33514383d", size = 476940, upload-time = "2025-09-14T22:18:19.088Z" },
]

554
wx.py
View File

@ -1,554 +0,0 @@
"""
wx - 微信本地数据 CLI
自动管理 daemon 生命周期无需用户手动启动
用法:
wx sessions 最近会话
wx history "张三" 聊天记录
wx search "关键词" 搜索消息
wx contacts 联系人列表
wx watch 实时监听新消息
wx daemon status/stop/logs daemon 管理
"""
import glob
import json
import os
import platform
import socket
import subprocess
import sys
import time
import click
CLI_DIR = os.path.join(os.path.expanduser("~"), ".wechat-cli")
SOCK_PATH = os.path.join(CLI_DIR, "daemon.sock")
PID_PATH = os.path.join(CLI_DIR, "daemon.pid")
LOG_PATH = os.path.join(CLI_DIR, "daemon.log")
DAEMON_SCRIPT = os.path.join(os.path.dirname(os.path.abspath(__file__)), "wx_daemon.py")
STARTUP_TIMEOUT = 15 # 等待 daemon 启动的最长秒数
# ─── daemon 管理 ─────────────────────────────────────────────────────────────
def _is_alive() -> bool:
if not os.path.exists(SOCK_PATH):
return False
try:
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.settimeout(2)
s.connect(SOCK_PATH)
s.sendall(b'{"cmd":"ping"}\n')
resp = json.loads(s.makefile().readline())
s.close()
return resp.get("pong") is True
except Exception:
return False
def _start_daemon() -> None:
subprocess.Popen(
[sys.executable, DAEMON_SCRIPT],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
deadline = time.time() + STARTUP_TIMEOUT
while time.time() < deadline:
time.sleep(0.3)
if _is_alive():
return
raise click.ClickException(
f"wx-daemon 启动超时(>{STARTUP_TIMEOUT}s\n"
f"请查看日志: {LOG_PATH}"
)
def _ensure_daemon() -> None:
if not _is_alive():
click.echo("⏳ 启动 wx-daemon...", err=True)
_start_daemon()
# ─── 通信 ────────────────────────────────────────────────────────────────────
def _send(req: dict, timeout: int = 30) -> dict:
_ensure_daemon()
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.settimeout(timeout)
s.connect(SOCK_PATH)
s.sendall((json.dumps(req, ensure_ascii=False) + '\n').encode())
resp = json.loads(s.makefile().readline())
s.close()
if not resp.get("ok"):
raise click.ClickException(resp.get("error", "未知错误"))
return resp
# ─── 时间解析 ────────────────────────────────────────────────────────────────
def _parse_time(value: str, is_end: bool = False) -> int:
from datetime import datetime
for fmt in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M', '%Y-%m-%d'):
try:
dt = datetime.strptime(value, fmt)
if fmt == '%Y-%m-%d' and is_end:
dt = dt.replace(hour=23, minute=59, second=59)
return int(dt.timestamp())
except ValueError:
continue
raise click.BadParameter(
f"无法解析时间 '{value}',支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS"
)
# ─── CLI ─────────────────────────────────────────────────────────────────────
@click.group(context_settings={"help_option_names": ["-h", "--help"]})
@click.version_option("0.1.0", prog_name="wx")
def cli():
"""wx — 微信本地数据 CLI"""
# ─── init ────────────────────────────────────────────────────────────────────
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
CONFIG_FILE = os.path.join(SCRIPT_DIR, "config.json")
def _detect_db_dir() -> str | None:
"""自动检测微信数据库目录(支持 macOS/Linux"""
if platform.system() == "Darwin":
pattern = os.path.expanduser(
"~/Library/Containers/com.tencent.xinWeChat/Data/Documents"
"/xwechat_files/*/db_storage"
)
candidates = sorted(
(p for p in glob.glob(pattern) if os.path.isdir(p)),
key=os.path.getmtime,
reverse=True,
)
return candidates[0] if candidates else None
if platform.system() == "Linux":
patterns = [
os.path.expanduser("~/Documents/xwechat_files/*/db_storage"),
os.path.expanduser("~/.local/share/weixin/data/db_storage"),
]
candidates = []
for pat in patterns:
candidates.extend(p for p in glob.glob(pat) if os.path.isdir(p))
candidates.sort(key=os.path.getmtime, reverse=True)
return candidates[0] if candidates else None
return None
def _ensure_scanner() -> str:
"""确保 macOS C 扫描器已编译,返回二进制路径。"""
binary = os.path.join(SCRIPT_DIR, "find_all_keys_macos")
if os.path.exists(binary):
return binary
src = os.path.join(SCRIPT_DIR, "find_all_keys_macos.c")
if not os.path.exists(src):
raise click.ClickException(f"找不到扫描器源文件: {src}")
click.echo("编译密钥扫描器...", err=True)
# Try with Xcode SDK first, then fallback to plain clang
sdk_path = (
"/Applications/Xcode.app/Contents/Developer/Platforms"
"/MacOSX.platform/Developer/SDKs/MacOSX.sdk"
)
cmds = []
if os.path.isdir(sdk_path):
cmds.append(["clang", "-O2", "-isysroot", sdk_path, "-o", binary, src])
cmds.append(["clang", "-O2", "-o", binary, src])
for cmd in cmds:
ret = subprocess.run(cmd, capture_output=True, text=True)
if ret.returncode == 0:
click.echo("编译完成", err=True)
return binary
raise click.ClickException(f"编译失败: {ret.stderr.strip()}")
@cli.command()
@click.option('--force', is_flag=True, help='强制重新扫描(覆盖已有配置)')
def init(force):
"""初始化:检测数据目录并扫描加密密钥
\b
首次使用前运行WeChat 需正在运行
wx init
重新扫描密钥例如微信更新后
wx init --force
"""
# Check if already initialized
if not force and os.path.exists(CONFIG_FILE):
try:
cfg = json.load(open(CONFIG_FILE, encoding='utf-8'))
db_dir = cfg.get("db_dir", "")
keys_file = cfg.get("keys_file", "all_keys.json")
if not os.path.isabs(keys_file):
keys_file = os.path.join(SCRIPT_DIR, keys_file)
if (db_dir and "your_wxid" not in db_dir
and os.path.isdir(db_dir)
and os.path.exists(keys_file)):
click.echo(f"已初始化,数据目录: {db_dir}")
click.echo("如需重新扫描密钥,使用 --force")
return
except Exception:
pass
# Step 1: Detect db_dir
click.echo("检测微信数据目录...")
db_dir = _detect_db_dir()
if not db_dir:
raise click.ClickException(
"未能自动检测到微信数据目录\n"
"请手动编辑 config.json 中的 db_dir 字段\n"
"路径格式macOS: ~/Library/Containers/com.tencent.xinWeChat/..."
"/xwechat_files/<wxid>/db_storage"
)
click.echo(f"找到数据目录: {db_dir}")
# Step 2: Compile scanner (macOS only)
if platform.system() == "Darwin":
scanner = _ensure_scanner()
# Step 3: Run key extraction
keys_file = os.path.join(SCRIPT_DIR, "all_keys.json")
click.echo("扫描加密密钥(需要 sudo 权限)...")
ret = subprocess.run(
["sudo", scanner],
capture_output=False, # let stdout/stderr pass through
cwd=SCRIPT_DIR,
)
if ret.returncode != 0:
raise click.ClickException("密钥扫描失败,请确认微信正在运行")
if not os.path.exists(keys_file):
raise click.ClickException(f"扫描完成但未找到输出文件: {keys_file}")
with open(keys_file, encoding='utf-8') as f:
keys = json.load(f)
real_keys = {k: v for k, v in keys.items() if not k.startswith('_')}
click.echo(f"成功提取 {len(real_keys)} 个数据库密钥")
else:
click.echo("非 macOS 系统,请手动运行密钥提取脚本")
# Step 4: Update config.json
cfg = {}
if os.path.exists(CONFIG_FILE):
try:
cfg = json.load(open(CONFIG_FILE, encoding='utf-8'))
except Exception:
pass
cfg["db_dir"] = db_dir
if "keys_file" not in cfg:
cfg["keys_file"] = "all_keys.json"
if "decrypted_dir" not in cfg:
cfg["decrypted_dir"] = "decrypted"
with open(CONFIG_FILE, "w", encoding='utf-8') as f:
json.dump(cfg, f, indent=4, ensure_ascii=False)
click.echo(f"配置已保存: {CONFIG_FILE}")
click.echo("初始化完成,可以使用 wx sessions / wx history 等命令了")
# ─── sessions ────────────────────────────────────────────────────────────────
@cli.command()
@click.option('-n', '--limit', default=20, show_default=True, help='会话数量')
@click.option('--json', 'as_json', is_flag=True, help='输出原始 JSON')
def sessions(limit, as_json):
"""列出最近会话"""
resp = _send({"cmd": "sessions", "limit": limit})
data = resp.get("sessions", [])
if as_json:
click.echo(json.dumps(data, ensure_ascii=False, indent=2))
return
for s in data:
unread = f" \033[31m({s['unread']}未读)\033[0m" if s.get('unread', 0) > 0 else ''
group = ' [群]' if s['is_group'] else ''
sender = f"{s['last_sender']}: " if s.get('last_sender') else ''
click.echo(f"\033[90m[{s['time']}]\033[0m \033[1m{s['chat']}\033[0m{group}{unread}")
click.echo(f" {s['last_msg_type']}: {sender}{s['summary']}")
click.echo()
# ─── history ─────────────────────────────────────────────────────────────────
@cli.command()
@click.argument('chat')
@click.option('-n', '--limit', default=50, show_default=True, help='消息数量')
@click.option('--offset', default=0, help='分页偏移')
@click.option('--since', default=None, metavar='DATE', help='起始时间 YYYY-MM-DD')
@click.option('--until', default=None, metavar='DATE', help='结束时间 YYYY-MM-DD')
@click.option('--json', 'as_json', is_flag=True, help='输出原始 JSON')
def history(chat, limit, offset, since, until, as_json):
"""查看聊天记录
\b
示例:
wx history "张三"
wx history "AI群" --since 2026-04-01 --until 2026-04-15
wx history "张三" -n 100 --offset 50
"""
req = {"cmd": "history", "chat": chat, "limit": limit, "offset": offset}
if since:
req["since"] = _parse_time(since)
if until:
req["until"] = _parse_time(until, is_end=True)
resp = _send(req)
if as_json:
click.echo(json.dumps(resp.get("messages", []), ensure_ascii=False, indent=2))
return
group = ' [群]' if resp.get('is_group') else ''
click.echo(f"=== {resp['chat']}{group} ({resp['count']} 条) ===\n")
for m in resp.get("messages", []):
sender = f"\033[33m{m['sender']}\033[0m: " if m.get('sender') else ''
click.echo(f"\033[90m[{m['time']}]\033[0m {sender}{m['content']}")
# ─── search ──────────────────────────────────────────────────────────────────
@cli.command()
@click.argument('keyword')
@click.option('--in', 'chats', multiple=True, metavar='CHAT', help='限定聊天(可多次指定)')
@click.option('-n', '--limit', default=20, show_default=True)
@click.option('--since', default=None, metavar='DATE')
@click.option('--until', default=None, metavar='DATE')
@click.option('--json', 'as_json', is_flag=True)
def search(keyword, chats, limit, since, until, as_json):
"""搜索消息
\b
示例:
wx search "Claude"
wx search "deadline" --in "TeamA" --in "TeamB"
wx search "会议" --since 2026-04-01
"""
req = {"cmd": "search", "keyword": keyword, "limit": limit}
if chats:
req["chats"] = list(chats)
if since:
req["since"] = _parse_time(since)
if until:
req["until"] = _parse_time(until, is_end=True)
resp = _send(req)
results = resp.get("results", [])
if as_json:
click.echo(json.dumps(results, ensure_ascii=False, indent=2))
return
click.echo(f'搜索 "{keyword}",找到 {resp["count"]} 条:\n')
for r in results:
sender = f"\033[33m{r['sender']}\033[0m: " if r.get('sender') else ''
chat = f"\033[36m[{r['chat']}]\033[0m " if r.get('chat') else ''
click.echo(f"\033[90m[{r['time']}]\033[0m {chat}{sender}{r['content']}")
# ─── contacts ────────────────────────────────────────────────────────────────
@cli.command()
@click.option('-q', '--query', default=None, help='按名字过滤')
@click.option('-n', '--limit', default=50, show_default=True)
@click.option('--json', 'as_json', is_flag=True)
def contacts(query, limit, as_json):
"""查看联系人
\b
示例:
wx contacts
wx contacts -q ""
"""
resp = _send({"cmd": "contacts", "query": query, "limit": limit})
data = resp.get("contacts", [])
if as_json:
click.echo(json.dumps(data, ensure_ascii=False, indent=2))
return
click.echo(f"{resp.get('total', len(data))} 个联系人(显示 {len(data)} 个):\n")
for c in data:
click.echo(f" {c['display']:<20} {c['username']}")
# ─── export ──────────────────────────────────────────────────────────────────
@cli.command()
@click.argument('chat')
@click.option('--since', default=None, metavar='DATE', help='起始时间 YYYY-MM-DD')
@click.option('--until', default=None, metavar='DATE', help='结束时间 YYYY-MM-DD')
@click.option('-n', '--limit', default=500, show_default=True, help='最多导出条数')
@click.option('-f', '--format', 'fmt', type=click.Choice(['markdown', 'txt', 'json']),
default='markdown', show_default=True, help='输出格式')
@click.option('-o', '--output', default=None, metavar='FILE', help='输出文件(默认 stdout')
def export(chat, since, until, limit, fmt, output):
"""导出聊天记录到文件
\b
示例:
wx export "张三"
wx export "AI群" --since 2026-01-01 --format markdown -o chat.md
wx export "张三" --format json -o chat.json
"""
req = {"cmd": "history", "chat": chat, "limit": limit, "offset": 0}
if since:
req["since"] = _parse_time(since)
if until:
req["until"] = _parse_time(until, is_end=True)
resp = _send(req, timeout=60)
messages = resp.get("messages", [])
chat_name = resp.get("chat", chat)
is_group = resp.get("is_group", False)
count = len(messages)
if fmt == 'json':
text = json.dumps(resp, ensure_ascii=False, indent=2)
elif fmt == 'txt':
lines = [f"=== {chat_name}{'[群]' if is_group else ''} ({count} 条) ===\n"]
for m in messages:
sender = f"{m['sender']}: " if m.get('sender') else ''
lines.append(f"[{m['time']}] {sender}{m['content']}")
text = '\n'.join(lines)
else: # markdown
lines = [
f"# {chat_name}{'(群聊)' if is_group else ''}",
f"\n> 导出 {count} 条消息\n",
]
for m in messages:
sender_md = f"**{m['sender']}**: " if m.get('sender') else ''
content = m['content'].replace('\n', '\n> ')
lines.append(f"### {m['time']}\n\n{sender_md}{content}\n")
text = '\n'.join(lines)
if output:
with open(output, 'w', encoding='utf-8') as f:
f.write(text)
click.echo(f"已导出 {count} 条消息到 {output}")
else:
click.echo(text)
# ─── watch ───────────────────────────────────────────────────────────────────
@cli.command()
@click.option('--chat', default=None, help='只显示指定聊天的消息')
@click.option('--json', 'as_json', is_flag=True, help='输出 JSON lines方便 jq 处理)')
def watch(chat, as_json):
"""实时监听新消息Ctrl+C 退出)
\b
示例:
wx watch
wx watch --chat "AI交流群"
wx watch --json | jq .content
"""
_ensure_daemon()
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(SOCK_PATH)
s.sendall((json.dumps({"cmd": "watch"}) + '\n').encode())
if not as_json:
click.echo("监听中Ctrl+C 退出)...\n", err=True)
try:
for line in s.makefile():
line = line.strip()
if not line:
continue
try:
event = json.loads(line)
except Exception:
continue
evt = event.get("event", "")
if evt in ("connected", "heartbeat"):
continue
# 过滤指定聊天
if chat and event.get("chat") != chat and event.get("username") != chat:
continue
if as_json:
click.echo(line)
continue
time_s = event.get('time', '')
chat_s = event.get('chat', '')
is_group = event.get('is_group', False)
sender = event.get('sender', '')
content = event.get('content', '')
chat_part = f"\033[36m[{chat_s}]\033[0m " if is_group else f"\033[1m{chat_s}\033[0m "
sender_part = f"\033[33m{sender}\033[0m: " if sender else ''
click.echo(f"\033[90m[{time_s}]\033[0m {chat_part}{sender_part}{content}")
except KeyboardInterrupt:
pass
finally:
try:
s.close()
except Exception:
pass
# ─── daemon 子命令组 ──────────────────────────────────────────────────────────
@cli.group()
def daemon():
"""管理 wx-daemon"""
@daemon.command()
def status():
"""查看 daemon 运行状态"""
if _is_alive():
pid = open(PID_PATH).read().strip() if os.path.exists(PID_PATH) else '?'
click.echo(f"✓ wx-daemon 运行中 (PID {pid})")
else:
click.echo("✗ wx-daemon 未运行")
@daemon.command()
def stop():
"""停止 daemon"""
if not os.path.exists(PID_PATH):
click.echo("daemon 未运行")
return
try:
pid = int(open(PID_PATH).read().strip())
import signal
os.kill(pid, signal.SIGTERM)
click.echo(f"✓ 已停止 wx-daemon (PID {pid})")
except (ValueError, ProcessLookupError):
click.echo("daemon 进程不存在,清理残留文件")
for p in (SOCK_PATH, PID_PATH):
try:
os.unlink(p)
except OSError:
pass
@daemon.command()
@click.option('-f', '--follow', is_flag=True, help='持续输出tail -f')
@click.option('-n', '--lines', default=50, show_default=True, help='显示最近 N 行')
def logs(follow, lines):
"""查看 daemon 日志"""
if not os.path.exists(LOG_PATH):
click.echo("暂无日志")
return
if follow:
import subprocess as sp
sp.run(['tail', f'-{lines}', '-f', LOG_PATH])
else:
with open(LOG_PATH) as f:
all_lines = f.readlines()
click.echo(''.join(all_lines[-lines:]), nl=False)
# ─── 入口 ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
cli()

View File

@ -1,832 +0,0 @@
"""
wx-daemon: 微信数据访问守护进程
启动后常驻后台通过 Unix socket 响应 CLI 查询持续监听 WAL 变化推送实时消息
Socket : ~/.wechat-cli/daemon.sock
PID : ~/.wechat-cli/daemon.pid
Log : ~/.wechat-cli/daemon.log
Cache : ~/.wechat-cli/cache/
"""
import hashlib
import hmac as hmac_mod
import json
import os
import queue
import re
import signal
import socket
import sqlite3
import struct
import subprocess
import sys
import threading
import time
from contextlib import closing
from datetime import datetime
from Crypto.Cipher import AES
import zstandard as zstd
# ─── 路径常量 ─────────────────────────────────────────────────────────────────
CLI_DIR = os.path.join(os.path.expanduser("~"), ".wechat-cli")
SOCK_PATH = os.path.join(CLI_DIR, "daemon.sock")
PID_PATH = os.path.join(CLI_DIR, "daemon.pid")
LOG_PATH = os.path.join(CLI_DIR, "daemon.log")
CACHE_DIR = os.path.join(CLI_DIR, "cache")
MTIME_FILE = os.path.join(CACHE_DIR, "_mtimes.json")
os.makedirs(CLI_DIR, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)
# ─── 加密常量 ─────────────────────────────────────────────────────────────────
PAGE_SZ = 4096
SALT_SZ = 16
RESERVE_SZ = 80
SQLITE_HDR = b'SQLite format 3\x00'
WAL_HDR_SZ = 32
WAL_FRAME_HDR = 24
# ─── 配置加载 ─────────────────────────────────────────────────────────────────
_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, _SCRIPT_DIR)
from config import load_config
from key_utils import get_key_info, strip_key_metadata
_cfg = load_config()
DB_DIR = _cfg["db_dir"]
KEYS_FILE = _cfg["keys_file"]
with open(KEYS_FILE, encoding="utf-8") as _f:
ALL_KEYS = strip_key_metadata(json.load(_f))
_zstd = zstd.ZstdDecompressor()
# ─── 日志 ─────────────────────────────────────────────────────────────────────
def _log(msg: str) -> None:
ts = datetime.now().strftime('%H:%M:%S')
print(f"[{ts}] {msg}", flush=True)
# ─── 解密 ─────────────────────────────────────────────────────────────────────
def _decrypt_page(enc_key: bytes, page_data: bytes, pgno: int) -> bytes:
iv = page_data[PAGE_SZ - RESERVE_SZ: PAGE_SZ - RESERVE_SZ + 16]
if pgno == 1:
enc = page_data[SALT_SZ: PAGE_SZ - RESERVE_SZ]
dec = AES.new(enc_key, AES.MODE_CBC, iv).decrypt(enc)
return bytes(SQLITE_HDR + dec + b'\x00' * RESERVE_SZ)
enc = page_data[:PAGE_SZ - RESERVE_SZ]
dec = AES.new(enc_key, AES.MODE_CBC, iv).decrypt(enc)
return dec + b'\x00' * RESERVE_SZ
def _full_decrypt(db_path: str, out_path: str, enc_key: bytes) -> None:
size = os.path.getsize(db_path)
os.makedirs(os.path.dirname(out_path), exist_ok=True)
with open(db_path, 'rb') as fin, open(out_path, 'wb') as fout:
for pgno in range(1, size // PAGE_SZ + 1):
page = fin.read(PAGE_SZ)
if not page:
break
if len(page) < PAGE_SZ:
page = page + b'\x00' * (PAGE_SZ - len(page))
fout.write(_decrypt_page(enc_key, page, pgno))
def _apply_wal(wal_path: str, out_path: str, enc_key: bytes) -> None:
if not os.path.exists(wal_path):
return
wal_size = os.path.getsize(wal_path)
if wal_size <= WAL_HDR_SZ:
return
frame_size = WAL_FRAME_HDR + PAGE_SZ
with open(wal_path, 'rb') as wf, open(out_path, 'r+b') as df:
hdr = wf.read(WAL_HDR_SZ)
s1 = struct.unpack('>I', hdr[16:20])[0]
s2 = struct.unpack('>I', hdr[20:24])[0]
while wf.tell() + frame_size <= wal_size:
fh = wf.read(WAL_FRAME_HDR)
if len(fh) < WAL_FRAME_HDR:
break
pgno = struct.unpack('>I', fh[0:4])[0]
fs1 = struct.unpack('>I', fh[8:12])[0]
fs2 = struct.unpack('>I', fh[12:16])[0]
ep = wf.read(PAGE_SZ)
if len(ep) < PAGE_SZ:
break
if pgno == 0 or pgno > 1_000_000:
continue
if fs1 != s1 or fs2 != s2:
continue
dec = _decrypt_page(enc_key, ep, pgno)
df.seek((pgno - 1) * PAGE_SZ)
df.write(dec)
# ─── DB 缓存mtime 感知,跨进程重启可复用)────────────────────────────────────
class DBCache:
def __init__(self):
self._cache: dict[str, tuple[float, float, str]] = {} # rel -> (db_mt, wal_mt, path)
self._lock = threading.Lock()
self._load_persistent()
def _cache_path(self, rel_key: str) -> str:
h = hashlib.md5(rel_key.encode()).hexdigest()[:12]
return os.path.join(CACHE_DIR, f"{h}.db")
def _load_persistent(self) -> None:
if not os.path.exists(MTIME_FILE):
return
try:
saved = json.loads(open(MTIME_FILE, encoding='utf-8').read())
except Exception:
return
reused = 0
for rel_key, info in saved.items():
path = info.get("path", "")
if not os.path.exists(path):
continue
db_path = os.path.join(DB_DIR, rel_key.replace('\\', os.sep).replace('/', os.sep))
wal_path = db_path + "-wal"
try:
db_mt = os.path.getmtime(db_path)
wal_mt = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0.0
except OSError:
continue
if db_mt == info.get("db_mt") and wal_mt == info.get("wal_mt"):
self._cache[rel_key] = (db_mt, wal_mt, path)
reused += 1
if reused:
_log(f"DBCache: 复用 {reused} 个已解密 DB")
def _save_persistent(self) -> None:
data = {k: {"db_mt": v[0], "wal_mt": v[1], "path": v[2]} for k, v in self._cache.items()}
try:
with open(MTIME_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f)
except OSError:
pass
def get(self, rel_key: str) -> str | None:
key_info = get_key_info(ALL_KEYS, rel_key)
if not key_info:
return None
db_path = os.path.join(DB_DIR, rel_key.replace('\\', os.sep).replace('/', os.sep))
wal_path = db_path + "-wal"
if not os.path.exists(db_path):
return None
try:
db_mt = os.path.getmtime(db_path)
wal_mt = os.path.getmtime(wal_path) if os.path.exists(wal_path) else 0.0
except OSError:
return None
with self._lock:
cached = self._cache.get(rel_key)
if cached and cached[0] == db_mt and cached[1] == wal_mt and os.path.exists(cached[2]):
return cached[2]
out = self._cache_path(rel_key)
enc_key = bytes.fromhex(key_info["enc_key"])
t0 = time.perf_counter()
_full_decrypt(db_path, out, enc_key)
_apply_wal(wal_path, out, enc_key)
ms = (time.perf_counter() - t0) * 1000
_log(f"解密 {rel_key} ({ms:.0f}ms)")
self._cache[rel_key] = (db_mt, wal_mt, out)
self._save_persistent()
return out
_db = DBCache()
# ─── 消息 DB 列表 ─────────────────────────────────────────────────────────────
MSG_DB_KEYS = sorted([
k for k in ALL_KEYS
if re.search(r'message[/\\]message_\d+\.db$', k)
])
# ─── 联系人缓存 ───────────────────────────────────────────────────────────────
_names: dict[str, str] | None = None
_names_lock = threading.Lock()
_md5_to_uname: dict[str, str] | None = None
_md5_lock = threading.Lock()
def _load_names() -> dict[str, str]:
global _names
with _names_lock:
if _names is not None:
return _names
path = _db.get(os.path.join("contact", "contact.db"))
if not path:
_names = {}
return _names
try:
with closing(sqlite3.connect(path)) as conn:
rows = conn.execute(
"SELECT username, nick_name, remark FROM contact"
).fetchall()
_names = {u: (r if r else (n if n else u)) for u, n, r in rows}
except Exception:
_names = {}
return _names
def _get_md5_lookup() -> dict[str, str]:
"""返回 {md5(username): username},用于全局搜索时从表名反推联系人。"""
global _md5_to_uname
with _md5_lock:
if _md5_to_uname is not None:
return _md5_to_uname
names = _load_names()
_md5_to_uname = {hashlib.md5(u.encode()).hexdigest(): u for u in names}
return _md5_to_uname
def _refresh_names() -> None:
"""强制刷新联系人缓存(新联系人/新群加入时调用)"""
global _names, _md5_to_uname
with _names_lock:
_names = None
with _md5_lock:
_md5_to_uname = None
_load_names()
_get_md5_lookup()
# ─── 辅助 ─────────────────────────────────────────────────────────────────────
_XML_BAD = re.compile(r'<!DOCTYPE|<!ENTITY', re.IGNORECASE)
def _fmt_type(t) -> str:
try:
base = int(t) & 0xFFFFFFFF if int(t) > 0xFFFFFFFF else int(t)
except (TypeError, ValueError):
return f'type={t}'
return {
1: '文本', 3: '图片', 34: '语音', 42: '名片', 43: '视频',
47: '表情', 48: '位置', 49: '链接/文件', 50: '通话',
10000: '系统', 10002: '撤回',
}.get(base, f'type={base}')
def _decompress(content, ct) -> str | None:
if ct == 4 and isinstance(content, bytes):
try:
return _zstd.decompress(content).decode('utf-8', errors='replace')
except Exception:
return None
if isinstance(content, bytes):
return content.decode('utf-8', errors='replace')
return content
def _fmt_content(local_id: int, local_type, content: str | None, is_group: bool) -> str:
try:
base = int(local_type) & 0xFFFFFFFF if int(local_type) > 0xFFFFFFFF else int(local_type)
except (TypeError, ValueError):
base = 0
if base == 3:
return f"[图片] local_id={local_id}"
if base == 47:
return "[表情]"
if base == 50:
return "[通话]"
# 群聊消息内容带 "sender:\n" 前缀,解析 XML 前先剥离
text = content or ''
if is_group and ':\n' in text:
text = text.split(':\n', 1)[1]
if base == 49 and text and '<appmsg' in text and not _XML_BAD.search(text):
try:
import xml.etree.ElementTree as ET
root = ET.fromstring(text)
appmsg = root.find('.//appmsg')
if appmsg is not None:
title = (appmsg.findtext('title') or '').strip()
atype = (appmsg.findtext('type') or '').strip()
if atype == '6':
return f"[文件] {title}" if title else "[文件]"
if atype == '57':
ref = appmsg.find('.//refermsg')
ref_content = ''
if ref is not None:
ref_content = re.sub(r'\s+', ' ', (ref.findtext('content') or '')).strip()
if len(ref_content) > 80:
ref_content = ref_content[:80] + '...'
quote = f"[引用] {title}" if title else "[引用]"
return f"{quote}\n{ref_content}" if ref_content else quote
if atype in ('33', '36', '44'):
return f"[小程序] {title}" if title else "[小程序]"
return f"[链接] {title}" if title else "[链接/文件]"
except Exception:
pass
return text
def _resolve_username(chat_name: str) -> str | None:
names = _load_names()
if chat_name in names or '@chatroom' in chat_name or chat_name.startswith('wxid_'):
return chat_name
low = chat_name.lower()
for uname, display in names.items():
if low == display.lower():
return uname
for uname, display in names.items():
if low in display.lower():
return uname
return None
def _load_id2u(conn: sqlite3.Connection) -> dict[int, str]:
try:
return {r: u for r, u in conn.execute("SELECT rowid, user_name FROM Name2Id").fetchall() if u}
except Exception:
return {}
def _sender_label(real_sender_id, content, is_group, chat_username, id2u, names) -> str:
sender_uname = id2u.get(real_sender_id, '')
if is_group:
if sender_uname and sender_uname != chat_username:
return names.get(sender_uname, sender_uname)
if content and ':\n' in content:
raw = content.split(':\n', 1)[0]
return names.get(raw, raw)
return ''
return names.get(sender_uname, '') if sender_uname and sender_uname != chat_username else ''
def _find_msg_tables(username: str) -> list[dict]:
table_name = f"Msg_{hashlib.md5(username.encode()).hexdigest()}"
if not re.fullmatch(r'Msg_[0-9a-f]{32}', table_name):
return []
results = []
for rel_key in MSG_DB_KEYS:
path = _db.get(rel_key)
if not path:
continue
try:
with closing(sqlite3.connect(path)) as conn:
exists = conn.execute(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?", (table_name,)
).fetchone()
if not exists:
continue
max_ts = conn.execute(f"SELECT MAX(create_time) FROM [{table_name}]").fetchone()[0] or 0
results.append({'path': path, 'table': table_name, 'max_ts': max_ts})
except Exception:
continue
results.sort(key=lambda x: x['max_ts'], reverse=True)
return results
# ─── 查询函数 ─────────────────────────────────────────────────────────────────
def q_sessions(limit: int = 20) -> dict:
path = _db.get(os.path.join("session", "session.db"))
if not path:
return {"error": "无法解密 session.db"}
names = _load_names()
with closing(sqlite3.connect(path)) as conn:
rows = conn.execute("""
SELECT username, unread_count, summary, last_timestamp,
last_msg_type, last_msg_sender, last_sender_display_name
FROM SessionTable
WHERE last_timestamp > 0
ORDER BY last_timestamp DESC LIMIT ?
""", (limit,)).fetchall()
results = []
for username, unread, summary, ts, msg_type, sender, sender_name in rows:
display = names.get(username, username)
is_group = '@chatroom' in username
if isinstance(summary, bytes):
try:
summary = _zstd.decompress(summary).decode('utf-8', errors='replace')
except Exception:
summary = '(压缩内容)'
if isinstance(summary, str) and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
sender_display = ''
if is_group and sender:
sender_display = names.get(sender, sender_name or sender)
results.append({
"chat": display,
"username": username,
"is_group": is_group,
"unread": unread or 0,
"last_msg_type": _fmt_type(msg_type),
"last_sender": sender_display,
"summary": str(summary or ''),
"timestamp": ts,
"time": datetime.fromtimestamp(ts).strftime('%m-%d %H:%M'),
})
return {"sessions": results}
def q_history(chat_name: str, limit: int = 50, offset: int = 0,
since: int | None = None, until: int | None = None) -> dict:
username = _resolve_username(chat_name)
if not username:
return {"error": f"找不到联系人: {chat_name}"}
names = _load_names()
display = names.get(username, username)
is_group = '@chatroom' in username
tables = _find_msg_tables(username)
if not tables:
return {"error": f"找不到 {display} 的消息记录"}
all_msgs: list[dict] = []
for tbl in tables:
try:
with closing(sqlite3.connect(tbl['path'])) as conn:
id2u = _load_id2u(conn)
clauses, params = [], []
if since:
clauses.append('create_time >= ?'); params.append(since)
if until:
clauses.append('create_time <= ?'); params.append(until)
where = f"WHERE {' AND '.join(clauses)}" if clauses else ''
rows = conn.execute(
f"SELECT local_id, local_type, create_time, real_sender_id,"
f" message_content, WCDB_CT_message_content"
f" FROM [{tbl['table']}] {where}"
f" ORDER BY create_time DESC LIMIT ? OFFSET ?",
(*params, limit + offset, 0)
).fetchall()
for local_id, local_type, ts, real_sender_id, content, ct in rows:
content = _decompress(content, ct)
if content is None:
content = '(无法解压)'
sender = _sender_label(real_sender_id, content, is_group, username, id2u, names)
text = _fmt_content(local_id, local_type, content, is_group)
all_msgs.append({
"timestamp": ts,
"time": datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M'),
"sender": sender,
"content": text,
"type": _fmt_type(local_type),
"local_id": local_id,
})
except Exception:
continue
all_msgs.sort(key=lambda m: m['timestamp'], reverse=True)
paged = all_msgs[offset: offset + limit]
paged.sort(key=lambda m: m['timestamp'])
return {
"chat": display,
"username": username,
"is_group": is_group,
"count": len(paged),
"messages": paged,
}
def q_search(keyword: str, chats: list[str] | None = None,
limit: int = 20, since: int | None = None, until: int | None = None) -> dict:
names = _load_names()
results: list[dict] = []
# 构建搜索目标 (db_path, table_name, chat_display, username)
targets: list[tuple[str, str, str, str]] = []
if chats:
for chat_name in chats:
uname = _resolve_username(chat_name)
if not uname:
continue
for tbl in _find_msg_tables(uname):
targets.append((tbl['path'], tbl['table'], names.get(uname, uname), uname))
else:
md5_lookup = _get_md5_lookup()
for rel_key in MSG_DB_KEYS:
path = _db.get(rel_key)
if not path:
continue
try:
with closing(sqlite3.connect(path)) as conn:
table_rows = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'Msg_%'"
).fetchall()
for (tname,) in table_rows:
if not re.fullmatch(r'Msg_[0-9a-f]{32}', tname):
continue
uname = md5_lookup.get(tname[4:], '')
display = names.get(uname, uname) if uname else ''
targets.append((path, tname, display, uname))
except Exception:
continue
# 按 db_path 分组,减少重复打开
by_path: dict[str, list[tuple[str, str, str]]] = {}
for db_path, table, display, uname in targets:
by_path.setdefault(db_path, []).append((table, display, uname))
for db_path, table_list in by_path.items():
try:
with closing(sqlite3.connect(db_path)) as conn:
id2u = _load_id2u(conn)
for table, display, uname in table_list:
clauses = ['message_content LIKE ?']
params = [f'%{keyword}%']
if since:
clauses.append('create_time >= ?'); params.append(since)
if until:
clauses.append('create_time <= ?'); params.append(until)
where = f"WHERE {' AND '.join(clauses)}"
rows = conn.execute(
f"SELECT local_id, local_type, create_time, real_sender_id,"
f" message_content, WCDB_CT_message_content"
f" FROM [{table}] {where}"
f" ORDER BY create_time DESC LIMIT ?",
(*params, limit * 3)
).fetchall()
is_group = uname and '@chatroom' in uname
for local_id, local_type, ts, real_sender_id, content, ct in rows:
content = _decompress(content, ct)
if content is None:
continue
sender = _sender_label(real_sender_id, content, is_group or False,
uname or '', id2u, names)
text = _fmt_content(local_id, local_type, content, is_group or False)
chat_display = display or uname or table
results.append({
"timestamp": ts,
"time": datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M'),
"chat": chat_display,
"sender": sender,
"content": text,
"type": _fmt_type(local_type),
})
except Exception:
continue
results.sort(key=lambda r: r['timestamp'], reverse=True)
paged = results[:limit]
return {"keyword": keyword, "count": len(paged), "results": paged}
def q_contacts(query: str | None = None, limit: int = 50) -> dict:
names = _load_names()
contacts = [
{"username": u, "display": d}
for u, d in names.items()
if not u.startswith('gh_') # 排除公众号
and not u.startswith('biz_') # 排除服务号
]
if query:
low = query.lower()
contacts = [c for c in contacts
if low in c['display'].lower() or low in c['username'].lower()]
contacts.sort(key=lambda c: c['display'])
return {"contacts": contacts[:limit], "total": len(contacts)}
# ─── 实时推送watch────────────────────────────────────────────────────────
_watch_clients: list[queue.Queue] = []
_watch_lock = threading.Lock()
def _broadcast(event: dict) -> None:
line = json.dumps(event, ensure_ascii=False)
with _watch_lock:
dead = []
for q in _watch_clients:
try:
q.put_nowait(line)
except queue.Full:
dead.append(q)
for q in dead:
_watch_clients.remove(q)
def _wal_watcher() -> None:
"""后台线程:每 500ms 检测 session.db-wal 的 mtime有变化时推送新消息"""
last_mtime: dict[str, float] = {}
last_ts: dict[str, int] = {} # username -> last pushed timestamp
initialized = False
while True:
time.sleep(0.5)
with _watch_lock:
if not _watch_clients:
continue
session_wal = os.path.join(DB_DIR, "session", "session.db-wal")
try:
mtime = os.path.getmtime(session_wal)
except OSError:
continue
prev = last_mtime.get(session_wal, 0.0)
if mtime == prev:
continue
last_mtime[session_wal] = mtime
# 解密 session.db缓存会处理 mtime只有真的变了才重新解密
path = _db.get(os.path.join("session", "session.db"))
if not path:
continue
names = _load_names()
try:
with closing(sqlite3.connect(path)) as conn:
rows = conn.execute("""
SELECT username, summary, last_timestamp, last_msg_type, last_msg_sender
FROM SessionTable WHERE last_timestamp > 0
ORDER BY last_timestamp DESC LIMIT 50
""").fetchall()
except Exception:
continue
for username, summary, ts, msg_type, sender in rows:
if not initialized:
# 第一轮只建立基线,不推送
last_ts[username] = ts
continue
prev_ts = last_ts.get(username, 0)
if ts <= prev_ts:
continue
last_ts[username] = ts
display = names.get(username, username)
is_group = '@chatroom' in username
if isinstance(summary, bytes):
try:
summary = _zstd.decompress(summary).decode('utf-8', errors='replace')
except Exception:
summary = '(压缩内容)'
if isinstance(summary, str) and ':\n' in summary:
summary = summary.split(':\n', 1)[1]
sender_display = names.get(sender, sender) if sender else ''
_broadcast({
"event": "message",
"time": datetime.fromtimestamp(ts).strftime('%H:%M'),
"chat": display,
"username": username,
"is_group": is_group,
"sender": sender_display,
"content": str(summary or ''),
"type": _fmt_type(msg_type),
"timestamp": ts,
})
if not initialized:
initialized = True
# ─── 命令路由 ─────────────────────────────────────────────────────────────────
def _dispatch(req: dict) -> dict:
cmd = req.get("cmd", "")
try:
if cmd == "ping":
return {"ok": True, "pong": True}
if cmd == "sessions":
return {"ok": True, **q_sessions(int(req.get("limit", 20)))}
if cmd == "history":
return {"ok": True, **q_history(
req["chat"],
limit=int(req.get("limit", 50)),
offset=int(req.get("offset", 0)),
since=req.get("since"),
until=req.get("until"),
)}
if cmd == "search":
return {"ok": True, **q_search(
req["keyword"],
chats=req.get("chats"),
limit=int(req.get("limit", 20)),
since=req.get("since"),
until=req.get("until"),
)}
if cmd == "contacts":
return {"ok": True, **q_contacts(req.get("query"), int(req.get("limit", 50)))}
return {"ok": False, "error": f"未知命令: {cmd}"}
except KeyError as e:
return {"ok": False, "error": f"缺少参数: {e}"}
except Exception as e:
return {"ok": False, "error": str(e)}
# ─── Unix Socket Server ───────────────────────────────────────────────────────
def _handle_client(conn: socket.socket) -> None:
try:
f = conn.makefile('rwb', buffering=0)
line = f.readline()
if not line:
return
req = json.loads(line.decode('utf-8'))
if req.get("cmd") == "watch":
# 流式模式daemon 持续推事件,直到客户端断开
q: queue.Queue = queue.Queue(maxsize=500)
with _watch_lock:
_watch_clients.append(q)
_write_line(f, {"event": "connected"})
try:
while True:
try:
event_line = q.get(timeout=30)
f.write((event_line + '\n').encode())
f.flush()
except queue.Empty:
_write_line(f, {"event": "heartbeat"})
except (BrokenPipeError, ConnectionResetError, OSError):
pass
finally:
with _watch_lock:
try:
_watch_clients.remove(q)
except ValueError:
pass
else:
resp = _dispatch(req)
_write_line(f, resp)
except Exception as e:
try:
_write_line(conn.makefile('rwb', buffering=0), {"ok": False, "error": str(e)})
except Exception:
pass
finally:
try:
conn.close()
except Exception:
pass
def _write_line(f, obj: dict) -> None:
f.write((json.dumps(obj, ensure_ascii=False) + '\n').encode())
f.flush()
def _serve() -> None:
if os.path.exists(SOCK_PATH):
os.unlink(SOCK_PATH)
server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(SOCK_PATH)
os.chmod(SOCK_PATH, 0o600)
server.listen(64)
_log(f"监听 {SOCK_PATH}")
while True:
try:
conn, _ = server.accept()
threading.Thread(target=_handle_client, args=(conn,), daemon=True).start()
except Exception:
pass
# ─── 守护进程化 ───────────────────────────────────────────────────────────────
def _daemonize() -> None:
if os.fork() > 0:
sys.exit(0)
os.setsid()
if os.fork() > 0:
sys.exit(0)
sys.stdin = open(os.devnull, 'r')
log_file = open(LOG_PATH, 'a', buffering=1)
sys.stdout = log_file
sys.stderr = log_file
# ─── 入口 ─────────────────────────────────────────────────────────────────────
def main(foreground: bool = False) -> None:
if not foreground:
_daemonize()
with open(PID_PATH, 'w') as f:
f.write(str(os.getpid()))
def _cleanup(sig=None, frame=None):
for p in (SOCK_PATH, PID_PATH):
try:
os.unlink(p)
except OSError:
pass
sys.exit(0)
signal.signal(signal.SIGTERM, _cleanup)
signal.signal(signal.SIGINT, _cleanup)
_log("wx-daemon 启动")
_log(f"DB_DIR: {DB_DIR}")
_log(f"密钥数量: {len(ALL_KEYS)}")
# 预热:加载联系人 + 解密 session.db最常用的两个
_load_names()
_db.get(os.path.join("session", "session.db"))
_log(f"预热完成,联系人 {len(_names or {})}")
# WAL 监听线程
threading.Thread(target=_wal_watcher, daemon=True, name='wal-watcher').start()
# Socket server阻塞
_serve()
if __name__ == "__main__":
main(foreground='--fg' in sys.argv)