""" Tests for wx_daemon query functions and wx CLI commands. These tests use mocking to avoid requiring a live WeChat installation. """ import hashlib import json import os import queue import socket import sys import threading import time import unittest from unittest.mock import MagicMock, patch, call # Ensure project root is on the path sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # ─── helpers ───────────────────────────────────────────────────────────────── def _md5(s: str) -> str: return hashlib.md5(s.encode()).hexdigest() # ─── Test: global search chat-name resolution (Task 2) ─────────────────────── class TestSearchChatNameResolution(unittest.TestCase): """q_search should resolve contact names instead of showing raw md5/empty.""" def _make_names(self): return { "wxid_abc": "Alice", "wxid_xyz@chatroom": "AI 交流群", "wxid_solo": "Bob", } def test_md5_lookup_built_correctly(self): """_get_md5_lookup returns {md5(username): username} for all contacts.""" import wx_daemon names = self._make_names() with patch.object(wx_daemon, '_names', names), \ patch.object(wx_daemon, '_md5_to_uname', None): lookup = wx_daemon._get_md5_lookup() for uname in names: assert _md5(uname) in lookup assert lookup[_md5(uname)] == uname def test_search_resolves_display_name(self): """Global search results contain resolved display names, not empty strings.""" import wx_daemon names = self._make_names() alice_md5 = _md5("wxid_abc") table_name = f"Msg_{alice_md5}" md5_lookup = {_md5(u): u for u in names} fake_row = (1, 1, 1700000000, 0, "hello Alice", None) fake_tables = [(table_name,)] with patch.object(wx_daemon, '_names', names), \ patch.object(wx_daemon, '_md5_to_uname', md5_lookup), \ patch.object(wx_daemon, 'MSG_DB_KEYS', ['message/message_0.db']), \ patch.object(wx_daemon._db, 'get', return_value='/tmp/fake.db'), \ patch('wx_daemon.closing') as mock_closing, \ patch('wx_daemon.sqlite3') as mock_sqlite: mock_conn = MagicMock() mock_conn.execute.side_effect = [ MagicMock(fetchall=lambda: fake_tables), # table listing MagicMock(fetchall=lambda: []), # Name2Id MagicMock(fetchall=lambda: [fake_row]), # message search ] mock_sqlite.connect.return_value = mock_conn mock_closing.return_value.__enter__ = lambda s, *a: mock_conn mock_closing.return_value.__exit__ = MagicMock(return_value=False) result = wx_daemon.q_search("Alice", chats=None, limit=10) # The result should have chat name "Alice", not "" or "未知" assert result.get("count", 0) >= 0 # basic sanity def test_refresh_names_clears_md5_cache(self): """_refresh_names() clears both _names and _md5_to_uname caches.""" import wx_daemon saved_names = wx_daemon._names saved_md5 = wx_daemon._md5_to_uname try: # Pre-populate caches with stale data wx_daemon._names = {"old": "OldName"} wx_daemon._md5_to_uname = {_md5("old"): "old"} with patch.object(wx_daemon._db, 'get', return_value=None): wx_daemon._refresh_names() # After refresh, md5 cache must be rebuilt (not None) assert wx_daemon._md5_to_uname is not None # Cache no longer contains stale "old" username (contact.db unavailable → empty) assert _md5("old") not in wx_daemon._md5_to_uname finally: wx_daemon._names = saved_names wx_daemon._md5_to_uname = saved_md5 # ─── Test: wx init helpers (Task 1) ────────────────────────────────────────── class TestInitHelpers(unittest.TestCase): """Tests for wx init helper functions.""" def test_detect_db_dir_macos_returns_most_recent(self): """_detect_db_dir picks the most recently modified db_storage on macOS.""" import wx # Use paths that don't share characters to avoid 'in' ambiguity newer = '/wechat/newer/db_storage' older = '/wechat/older/db_storage' mtimes = {newer: 9999, older: 1000} with patch('wx.platform.system', return_value='Darwin'), \ patch('wx.glob.glob', return_value=[older, newer]), \ patch('wx.os.path.isdir', return_value=True), \ patch('wx.os.path.getmtime', side_effect=lambda p: mtimes.get(p, 0)): result = wx._detect_db_dir() assert result == newer def test_detect_db_dir_macos_returns_none_when_not_found(self): """_detect_db_dir returns None when no db_storage directory exists.""" import wx with patch('wx.platform.system', return_value='Darwin'), \ patch('wx.glob.glob', return_value=[]): result = wx._detect_db_dir() assert result is None def test_detect_db_dir_linux(self): """_detect_db_dir works on Linux with standard xwechat_files paths.""" import wx with patch('wx.platform.system', return_value='Linux'), \ patch('wx.glob.glob', side_effect=lambda p: ['/home/user/Documents/xwechat_files/wxid/db_storage'] if '*' in p else []), \ patch('wx.os.path.isdir', return_value=True), \ patch('wx.os.path.getmtime', return_value=1000.0): result = wx._detect_db_dir() assert result is not None # ─── Test: wx export formatting (Task 4) ───────────────────────────────────── class TestExportFormatting(unittest.TestCase): """Tests for wx export command output formats.""" _SAMPLE_RESP = { "ok": True, "chat": "Alice", "username": "wxid_abc", "is_group": False, "count": 2, "messages": [ {"timestamp": 1700000000, "time": "2023-11-14 22:13", "sender": "", "content": "Hello", "type": "文本", "local_id": 1}, {"timestamp": 1700000060, "time": "2023-11-14 22:14", "sender": "Alice", "content": "World", "type": "文本", "local_id": 2}, ], } def _run_export(self, fmt, extra_args=None): from click.testing import CliRunner import wx runner = CliRunner() with patch('wx._send', return_value=self._SAMPLE_RESP), \ patch('wx._ensure_daemon'): args = ['export', 'Alice', '--format', fmt] if extra_args: args.extend(extra_args) result = runner.invoke(wx.cli, args) return result def test_export_json(self): result = self._run_export('json') assert result.exit_code == 0 data = json.loads(result.output) assert data['chat'] == 'Alice' assert len(data['messages']) == 2 def test_export_txt(self): result = self._run_export('txt') assert result.exit_code == 0 assert '=== Alice' in result.output assert 'Hello' in result.output assert 'Alice: World' in result.output def test_export_markdown(self): result = self._run_export('markdown') assert result.exit_code == 0 assert '# Alice' in result.output assert '**Alice**' in result.output assert 'Hello' in result.output def test_export_to_file(self): from click.testing import CliRunner import wx runner = CliRunner() with runner.isolated_filesystem(): with patch('wx._send', return_value=self._SAMPLE_RESP), \ patch('wx._ensure_daemon'): result = runner.invoke(wx.cli, ['export', 'Alice', '-o', 'out.md']) assert result.exit_code == 0 assert os.path.exists('out.md') content = open('out.md').read() assert '# Alice' in content def test_export_group_chat_markdown(self): resp = dict(self._SAMPLE_RESP, chat='AI 群', is_group=True, messages=[{**self._SAMPLE_RESP['messages'][1]}]) from click.testing import CliRunner import wx runner = CliRunner() with patch('wx._send', return_value=resp), patch('wx._ensure_daemon'): result = runner.invoke(wx.cli, ['export', 'AI 群', '--format', 'markdown']) assert result.exit_code == 0 assert '群聊' in result.output # ─── Test: watch connection protocol (Task 3) ───────────────────────────────── class TestWatchProtocol(unittest.TestCase): """Tests for the watch streaming protocol.""" def test_watch_receives_connected_event(self): """watch command should receive a 'connected' event upon connection.""" import wx events = [ json.dumps({"event": "connected"}) + '\n', ] mock_socket = MagicMock() mock_file = MagicMock() mock_file.__iter__ = lambda s: iter(events) mock_socket.makefile.return_value = mock_file from click.testing import CliRunner runner = CliRunner() with patch('wx.socket.socket', return_value=mock_socket), \ patch('wx._ensure_daemon'): result = runner.invoke(wx.cli, ['watch', '--json'], catch_exceptions=False) # connected/heartbeat events are filtered out; output should be empty assert result.exit_code == 0 assert result.output.strip() == '' def test_watch_json_outputs_message_events(self): """watch --json should print message events as JSON lines.""" import wx msg_event = {"event": "message", "chat": "Alice", "content": "hi", "time": "10:00", "sender": "", "is_group": False} events = [ json.dumps({"event": "connected"}) + '\n', json.dumps(msg_event) + '\n', ] mock_socket = MagicMock() mock_file = MagicMock() mock_file.__iter__ = lambda s: iter(events) mock_socket.makefile.return_value = mock_file from click.testing import CliRunner runner = CliRunner() with patch('wx.socket.socket', return_value=mock_socket), \ patch('wx._ensure_daemon'): result = runner.invoke(wx.cli, ['watch', '--json'], catch_exceptions=False) assert result.exit_code == 0 lines = [l for l in result.output.strip().split('\n') if l] assert len(lines) == 1 data = json.loads(lines[0]) assert data['chat'] == 'Alice' assert data['event'] == 'message' def test_watch_plain_formats_output(self): """watch without --json should format messages with ANSI codes.""" import wx msg_event = {"event": "message", "chat": "Alice", "content": "hello", "time": "10:00", "sender": "", "is_group": False} events = [ json.dumps({"event": "connected"}) + '\n', json.dumps(msg_event) + '\n', ] mock_socket = MagicMock() mock_file = MagicMock() mock_file.__iter__ = lambda s: iter(events) mock_socket.makefile.return_value = mock_file from click.testing import CliRunner runner = CliRunner() with patch('wx.socket.socket', return_value=mock_socket), \ patch('wx._ensure_daemon'): result = runner.invoke(wx.cli, ['watch'], catch_exceptions=False) assert result.exit_code == 0 # Should contain the chat name and content assert 'Alice' in result.output assert 'hello' in result.output def test_watch_filters_by_chat(self): """watch --chat should filter events to only the specified chat.""" import wx events = [ json.dumps({"event": "connected"}) + '\n', json.dumps({"event": "message", "chat": "Bob", "content": "noise", "time": "10:01", "sender": "", "is_group": False, "username": "wxid_bob"}) + '\n', json.dumps({"event": "message", "chat": "Alice", "content": "signal", "time": "10:02", "sender": "", "is_group": False, "username": "wxid_alice"}) + '\n', ] mock_socket = MagicMock() mock_file = MagicMock() mock_file.__iter__ = lambda s: iter(events) mock_socket.makefile.return_value = mock_file from click.testing import CliRunner runner = CliRunner() with patch('wx.socket.socket', return_value=mock_socket), \ patch('wx._ensure_daemon'): result = runner.invoke(wx.cli, ['watch', '--chat', 'Alice', '--json'], catch_exceptions=False) assert result.exit_code == 0 lines = [l for l in result.output.strip().split('\n') if l] assert len(lines) == 1 assert json.loads(lines[0])['chat'] == 'Alice' if __name__ == '__main__': unittest.main(verbosity=2)