diff --git a/README.md b/README.md index 8807e33..b63a550 100644 --- a/README.md +++ b/README.md @@ -139,7 +139,7 @@ claude mcp add wechat -- python C:\Users\你的用户名\wechat-decrypt\mcp_serv 前置条件:需要先运行 `python main.py` 或 `python find_all_keys.py` 完成密钥提取。 -说明:`get_chat_history` 和 `search_messages` 的 `limit` 最大为 `500`。 +说明:`search_messages` 的 `limit` 最大为 `500`;`get_chat_history` 支持更大的 `limit`,但消息很多时仍建议配合 `offset` 分页读取。 **[查看使用案例 →](USAGE.md)** diff --git a/USAGE.md b/USAGE.md index c193191..0d8717c 100644 --- a/USAGE.md +++ b/USAGE.md @@ -191,7 +191,7 @@ Claude 可以获取大量消息后自动分析活跃度、话题分布、关键 > 帮我分析一下██群最近一周的情况 ``` -Claude 会调用 `get_chat_history(chat_name="██群", limit=500)` 获取消息,然后输出: +Claude 会调用 `get_chat_history(chat_name="██群", limit=500)` 获取消息,然后输出。消息很多时,也可以把 `limit` 设得更大,或配合 `offset` 分页读取: ``` ## ██群最近一周分析 diff --git a/mcp_server.py b/mcp_server.py index f1c1547..8ef1f05 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -224,6 +224,7 @@ _self_username = None _XML_UNSAFE_RE = re.compile(r' _QUERY_LIMIT_MAX: - raise ValueError(f"limit 不能大于 {_QUERY_LIMIT_MAX}") + if limit_max is not None and limit > limit_max: + raise ValueError(f"limit 不能大于 {limit_max}") if offset < 0: raise ValueError("offset 不能小于 0") @@ -841,8 +842,6 @@ def _build_history_line(row, ctx, names, id_to_username): sender, text = _format_message_text( local_id, local_type, content, ctx['is_group'], ctx['username'], ctx['display_name'], names ) - if text and len(text) > 500: - text = text[:500] + '...' sender_label = _resolve_sender_label( real_sender_id, sender, ctx['is_group'], ctx['username'], ctx['display_name'], names, id_to_username @@ -880,6 +879,10 @@ def _message_query_batch_size(candidate_limit): return candidate_limit +def _history_query_batch_size(candidate_limit): + return min(candidate_limit, _HISTORY_QUERY_BATCH_SIZE) + + def _page_ranked_entries(entries, limit, offset): ordered = sorted(entries, key=lambda item: item[0], reverse=True) paged = ordered[offset:offset + limit] @@ -891,22 +894,40 @@ def _collect_chat_history_lines(ctx, names, start_ts=None, end_ts=None, limit=20 collected = [] failures = [] candidate_limit = _candidate_page_size(limit, offset) + batch_size = _history_query_batch_size(candidate_limit) for table_ctx in _iter_table_contexts(ctx): try: with closing(sqlite3.connect(table_ctx['db_path'])) as conn: id_to_username = _load_name2id_maps(conn) + fetch_offset = 0 + collected_before_table = len(collected) # 当前页上的消息一定落在各分表最近的 offset+limit 条记录内。 - rows = _query_messages( - conn, - table_ctx['table_name'], - start_ts=start_ts, - end_ts=end_ts, - limit=candidate_limit, - offset=0, - ) - for row in rows: - collected.append(_build_history_line(row, table_ctx, names, id_to_username)) + while len(collected) - collected_before_table < candidate_limit: + rows = _query_messages( + conn, + table_ctx['table_name'], + start_ts=start_ts, + end_ts=end_ts, + limit=batch_size, + offset=fetch_offset, + ) + if not rows: + break + fetch_offset += len(rows) + + for row in rows: + try: + collected.append(_build_history_line(row, table_ctx, names, id_to_username)) + except Exception as e: + failures.append( + f"{table_ctx['display_name']} local_id={row[0]} create_time={row[2]}: {e}" + ) + if len(collected) - collected_before_table >= candidate_limit: + break + + if len(rows) < batch_size: + break except Exception as e: failures.append(f"{table_ctx['db_path']}: {e}") @@ -1221,16 +1242,16 @@ def get_chat_history(chat_name: str, limit: int = 50, offset: int = 0, start_tim Args: chat_name: 聊天对象的名字、备注名或wxid,自动模糊匹配 - limit: 返回的消息数量,默认50,最大500 + limit: 返回的消息数量,默认50;支持较大的值,建议配合 offset 分页使用 offset: 分页偏移量,默认0 start_time: 起始时间,支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS end_time: 结束时间,支持 YYYY-MM-DD / YYYY-MM-DD HH:MM / YYYY-MM-DD HH:MM:SS - """ - try: - _validate_pagination(limit, offset) - start_ts, end_ts = _parse_time_range(start_time, end_time) - except ValueError as e: - return f"错误: {e}" + """ + try: + _validate_pagination(limit, offset, limit_max=None) + start_ts, end_ts = _parse_time_range(start_time, end_time) + except ValueError as e: + return f"错误: {e}" ctx = _resolve_chat_context(chat_name) if not ctx: diff --git a/tests/test_mcp_server_search.py b/tests/test_mcp_server_search.py index 859d996..49b9ec7 100644 --- a/tests/test_mcp_server_search.py +++ b/tests/test_mcp_server_search.py @@ -72,6 +72,10 @@ class SearchMessagesTests(unittest.TestCase): with self.assertRaisesRegex(ValueError, "limit 不能大于 500"): mcp_server._validate_pagination(501, 0) + def test_validate_pagination_allows_large_limit_when_limit_is_unbounded(self): + # get_chat_history 允许更大的 limit,只校验正数和 offset。 + mcp_server._validate_pagination(999999, 0, limit_max=None) + def test_page_search_entries_returns_chronological_results_with_offset(self): # 结果应先按最新时间分页,再把当前页恢复成时间正序输出。 entries = [(1, "a"), (5, "e"), (3, "c"), (4, "d"), (2, "b")] @@ -377,6 +381,43 @@ class SearchMessagesTests(unittest.TestCase): self.assertIn("new message", result) self.assertNotIn("old message", result) + def test_get_chat_history_large_limit_reads_all_rows_across_shards(self): + # 大 limit 下,跨分片历史查询不能只返回较旧分片里的少量消息。 + older_messages = [ + (index, 1000 + index, f"old shard message {index}") + for index in range(1, 18) + ] + newer_messages = [ + (index, 2000 + index, f"new shard message {index}") + for index in range(1, 296) + ] + older_db = self.create_db("history_cross_shard_older.db", {"alice": older_messages}) + newer_db = self.create_db("history_cross_shard_newer.db", {"alice": newer_messages}) + ctx = { + "query": "Alice", + "username": "alice", + "display_name": "Alice", + "db_path": newer_db, + "table_name": _msg_table_name("alice"), + "message_tables": [ + {"db_path": newer_db, "table_name": _msg_table_name("alice")}, + {"db_path": older_db, "table_name": _msg_table_name("alice")}, + ], + "is_group": False, + } + + with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object( + mcp_server, "_resolve_chat_context", return_value=ctx + ): + result = mcp_server.get_chat_history("Alice", limit=500, offset=0) + + self.assertIn("Alice 的消息记录(返回 312 条,offset=0, limit=500)", result) + self.assertIn("new shard message 295", result) + self.assertIn("old shard message 17", result) + + body = result.split(":\n\n", 1)[1] + self.assertEqual(len(body.splitlines()), 312) + def test_get_chat_history_uses_bounded_sql_pagination(self): # 历史查询应把 offset+limit 下推到 SQL,避免把整张消息表读出来后再切片。 db_path = self.create_db( @@ -419,6 +460,36 @@ class SearchMessagesTests(unittest.TestCase): self.assertNotIn("oldest", result) self.assertEqual(calls, [(_msg_table_name("alice"), 3, 0)]) + def test_get_chat_history_allows_large_limit_values(self): + # 历史查询不应再把大 limit 直接拒绝掉。 + db_path = self.create_db( + "history_large_limit.db", + { + "alice": [ + (1, 200, "message 1"), + (2, 100, "message 2"), + ] + }, + ) + ctx = { + "query": "Alice", + "username": "alice", + "display_name": "Alice", + "db_path": db_path, + "table_name": _msg_table_name("alice"), + "message_tables": [{"db_path": db_path, "table_name": _msg_table_name("alice")}], + "is_group": False, + } + + with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object( + mcp_server, "_resolve_chat_context", return_value=ctx + ): + result = mcp_server.get_chat_history("Alice", limit=999999, offset=0) + + self.assertNotIn("错误:", result) + self.assertIn("message 1", result) + self.assertIn("message 2", result) + def test_get_chat_history_keeps_partial_results_when_formatting_fails(self): # 单条坏消息不应让整个历史查询失败,已有结果仍应返回并附带失败说明。 db_path = self.create_db( @@ -452,6 +523,31 @@ class SearchMessagesTests(unittest.TestCase): self.assertIn("查询失败:", result) self.assertIn("bad row", result) + def test_get_chat_history_does_not_truncate_long_messages(self): + # 历史记录应返回完整消息内容,而不是固定截断到 500 字符。 + long_message = "x" * 600 + db_path = self.create_db( + "history_long_message.db", + {"alice": [(1, 200, long_message)]}, + ) + ctx = { + "query": "Alice", + "username": "alice", + "display_name": "Alice", + "db_path": db_path, + "table_name": _msg_table_name("alice"), + "message_tables": [{"db_path": db_path, "table_name": _msg_table_name("alice")}], + "is_group": False, + } + + with patch.object(mcp_server, "get_contact_names", return_value={"alice": "Alice"}), patch.object( + mcp_server, "_resolve_chat_context", return_value=ctx + ): + result = mcp_server.get_chat_history("Alice", limit=1, offset=0) + + self.assertIn(long_message, result) + self.assertNotIn(("x" * 500) + "...", result) + def test_search_messages_single_chat_merges_sharded_message_tables(self): # 单聊搜索也要跨分片合并,否则最近消息可能查不到。 older_db = self.create_db("search_older.db", {"alice": [(1, 100, "foo old")]})