feat: implement Linux V2 image AES key extraction via kvcomm

- Extract uin from ~/.xwechat/net/kvcomm/ filenames
- Derive AES key using md5(str(uin) + normalize(wxid))[:16]
- Add memory scanning fallback
- Fix kvcomm path resolution for daemon running as root (SUDO_USER)
pull/88/head
aibii-official 2026-05-28 16:41:42 +08:00
parent 08af894594
commit 34434ec6f0
2 changed files with 328 additions and 8 deletions

View File

@ -1,11 +1,330 @@
use anyhow::{bail, Result};
//! Linux V2 image AES key 提取。
//!
//! 主路径:从 `~/.xwechat/net/kvcomm/` 目录的文件名中提取 uin
//! 然后用 `md5(str(uin) + normalize(wxid)).hex()[:16]` 派生 AES key。
//!
//! fallback扫描 WeChat 进程内存,匹配 `[A-Za-z0-9]{32}` / `[A-Za-z0-9]{16}`
//! 用 V2 模板 AES block 反验。
use super::{ImageKeyMaterial, ImageKeyProvider};
use anyhow::{bail, Context, Result};
use std::collections::{HashMap, HashSet};
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
pub struct LinuxImageKeyProvider;
use crate::config;
impl ImageKeyProvider for LinuxImageKeyProvider {
fn get_key(&self, _wxid: &str) -> Result<ImageKeyMaterial> {
bail!("Linux V2 图片 key 当前未实现;请先用 legacy/V1 图片或在 README 中标注 unsupported")
use super::{
ascii_alnum_candidates, attach_root_for_db_dir, configured_db_dir_for_wxid,
derive_xor_key_from_v2_dat, find_v2_template_ciphertexts, normalize_wxid, verify_aes_key,
wxid_from_db_dir, xwechat_files_root, ImageKeyMaterial, ImageKeyProvider,
};
const CHUNK_SIZE: usize = 2 * 1024 * 1024;
pub struct LinuxImageKeyProvider {
configured_db_dir: Result<PathBuf, String>,
cache: Mutex<HashMap<String, ImageKeyMaterial>>,
}
impl LinuxImageKeyProvider {
pub fn from_current_config() -> Self {
let configured_db_dir = config::load_config()
.map(|cfg| cfg.db_dir)
.map_err(|err| err.to_string());
Self {
configured_db_dir,
cache: Mutex::new(HashMap::new()),
}
}
}
impl ImageKeyProvider for LinuxImageKeyProvider {
fn get_key(&self, wxid: &str) -> Result<ImageKeyMaterial> {
let cache_key = normalize_wxid(wxid);
if let Some(found) = self.cache.lock().unwrap().get(&cache_key).copied() {
return Ok(found);
}
let configured_db_dir = self
.configured_db_dir
.as_ref()
.map_err(|err| anyhow::anyhow!("读取 config.db_dir 失败: {}", err))?;
let db_dir = configured_db_dir_for_wxid(configured_db_dir, wxid);
let attach_dir = attach_root_for_db_dir(&db_dir);
let key = derive_key_for_paths(&db_dir, &attach_dir)?;
self.cache.lock().unwrap().insert(cache_key, key);
Ok(key)
}
}
fn derive_key_for_paths(db_dir: &Path, attach_dir: &Path) -> Result<ImageKeyMaterial> {
let templates = find_v2_template_ciphertexts(attach_dir, 3, 64)?;
if templates.is_empty() {
bail!("在 {} 下找不到 V2 模板文件", attach_dir.display());
}
if let Some(found) = find_via_kvcomm(db_dir, &templates)? {
return Ok(found);
}
let xor_key = derive_xor_key_from_v2_dat(attach_dir, 10, 3)?
.map(|(key, _, _)| key)
.unwrap_or(0x88);
if let Some(aes_key) = scan_memory_for_key(&templates)? {
return Ok(ImageKeyMaterial { aes_key, xor_key });
}
bail!("Linux V2 图片 key 提取失败kvcomm 和内存扫描均未找到有效 key")
}
fn find_via_kvcomm(db_dir: &Path, templates: &[[u8; 16]]) -> Result<Option<ImageKeyMaterial>> {
let Some(kvcomm_dir) = find_existing_kvcomm_dir(db_dir) else {
return Ok(None);
};
let uins = collect_kvcomm_uins(&kvcomm_dir)?;
if uins.is_empty() {
return Ok(None);
}
let wxids = collect_wxid_candidates(db_dir);
if wxids.is_empty() {
return Ok(None);
}
for wxid in &wxids {
for uin in &uins {
let candidate = derive_image_key_material(*uin, wxid);
if verify_aes_key(&candidate.aes_key, templates) {
return Ok(Some(candidate));
}
}
}
Ok(None)
}
fn derive_image_key_material(uin: u32, wxid: &str) -> ImageKeyMaterial {
let xor_key = (uin & 0xFF) as u8;
let digest = format!("{:x}", md5::compute(format!("{}{}", uin, wxid)));
let mut aes_key = [0u8; 16];
aes_key.copy_from_slice(&digest.as_bytes()[..16]);
ImageKeyMaterial { aes_key, xor_key }
}
fn collect_wxid_candidates(db_dir: &Path) -> Vec<String> {
let Some(raw) = wxid_from_db_dir(db_dir) else {
return Vec::new();
};
let mut out = vec![raw.clone()];
let normalized = normalize_wxid(&raw);
if normalized != raw {
out.push(normalized);
}
out
}
fn derive_kvcomm_dir_candidates(db_dir: &Path) -> Vec<PathBuf> {
let mut candidates = Vec::new();
// 从 db_dir 推导xwechat_files_root 的父目录的父目录通常是 $HOME
// Linux 默认路径: $HOME/Documents/xwechat_files
if let Some(root) = xwechat_files_root(db_dir) {
let mut dir = root.as_path();
// 向上遍历,在每一级检查 .xwechat/net/kvcomm
while let Some(parent) = dir.parent() {
let kvcomm = parent.join(".xwechat/net/kvcomm");
if !candidates.contains(&kvcomm) {
candidates.push(kvcomm);
}
dir = parent;
}
}
// 从 config::cli_dir() 推导(已处理 SUDO_USER
if let Some(home) = config::cli_dir().parent() {
let path = home.join(".xwechat/net/kvcomm");
if !candidates.contains(&path) {
candidates.push(path);
}
}
// dirs::home_dir() 作为最后兜底
if let Some(home) = dirs::home_dir() {
let path = home.join(".xwechat/net/kvcomm");
if !candidates.contains(&path) {
candidates.push(path);
}
}
candidates
}
fn find_existing_kvcomm_dir(db_dir: &Path) -> Option<PathBuf> {
derive_kvcomm_dir_candidates(db_dir)
.into_iter()
.find(|path| path.is_dir())
}
fn collect_kvcomm_uins(kvcomm_dir: &Path) -> Result<Vec<u32>> {
let mut uins = std::collections::BTreeSet::new();
for entry in std::fs::read_dir(kvcomm_dir)? {
let entry = entry?;
let Some(name) = entry.file_name().to_str().map(|value| value.to_string()) else {
continue;
};
if let Some(uin) = extract_uin_from_filename(&name) {
uins.insert(uin);
}
}
Ok(uins.into_iter().collect())
}
fn extract_uin_from_filename(filename: &str) -> Option<u32> {
let parts: Vec<&str> = filename.split('_').collect();
for part in parts {
if let Ok(uin) = part.parse::<u32>() {
if uin > 1000000 {
return Some(uin);
}
}
}
None
}
fn scan_memory_for_key(templates: &[[u8; 16]]) -> Result<Option<[u8; 16]>> {
let Some(pid) = find_wechat_pid() else {
return Ok(None);
};
let regions = parse_maps(pid)?;
let mem_path = format!("/proc/{}/mem", pid);
let Ok(mut mem_file) = std::fs::File::open(&mem_path) else {
return Ok(None);
};
let mut seen = HashSet::<[u8; 16]>::new();
for (start, end) in &regions {
if let Some(key) = scan_region(&mut mem_file, *start, *end, templates, &mut seen)? {
return Ok(Some(key));
}
}
Ok(None)
}
fn find_wechat_pid() -> Option<u32> {
let proc_dir = std::fs::read_dir("/proc").ok()?;
for entry in proc_dir.flatten() {
let name = entry.file_name();
let name_str = name.to_string_lossy();
if !name_str.chars().all(|c| c.is_ascii_digit()) {
continue;
}
let comm_path = format!("/proc/{}/comm", name_str);
if let Ok(comm) = std::fs::read_to_string(&comm_path) {
let comm = comm.trim().to_lowercase();
if comm == "wechat" || comm == "weixin" {
if let Ok(pid) = name_str.parse::<u32>() {
return Some(pid);
}
}
}
}
None
}
fn parse_maps(pid: u32) -> Result<Vec<(u64, u64)>> {
let maps_path = format!("/proc/{}/maps", pid);
let content = std::fs::read_to_string(&maps_path)
.with_context(|| format!("读取 {} 失败", maps_path))?;
let mut regions = Vec::new();
for line in content.lines() {
let parts: Vec<&str> = line.splitn(2, ' ').collect();
if parts.len() < 2 {
continue;
}
let perms = parts[1].trim_start();
if !perms.starts_with("rw") {
continue;
}
let addr_parts: Vec<&str> = parts[0].splitn(2, '-').collect();
if addr_parts.len() != 2 {
continue;
}
if let (Ok(start), Ok(end)) = (
u64::from_str_radix(addr_parts[0], 16),
u64::from_str_radix(addr_parts[1], 16),
) {
regions.push((start, end));
}
}
Ok(regions)
}
fn scan_region(
mem: &mut std::fs::File,
start: u64,
end: u64,
templates: &[[u8; 16]],
seen: &mut HashSet<[u8; 16]>,
) -> Result<Option<[u8; 16]>> {
let total_len = (end - start) as usize;
let overlap = 31usize;
let mut offset = 0usize;
loop {
if offset >= total_len {
break;
}
let chunk_size = std::cmp::min(CHUNK_SIZE, total_len - offset);
let addr = start + offset as u64;
if mem.seek(SeekFrom::Start(addr)).is_err() {
break;
}
let mut buf = vec![0u8; chunk_size];
match mem.read(&mut buf) {
Ok(n) if n > 0 => {
buf.truncate(n);
if let Some(key) = scan_candidate_buffer(&buf, templates, seen) {
return Ok(Some(key));
}
}
_ => {}
}
if chunk_size > overlap {
offset += chunk_size - overlap;
} else {
offset += chunk_size;
}
}
Ok(None)
}
fn scan_candidate_buffer(
buf: &[u8],
templates: &[[u8; 16]],
seen: &mut HashSet<[u8; 16]>,
) -> Option<[u8; 16]> {
for candidate in ascii_alnum_candidates(buf, 32) {
let mut key = [0u8; 16];
key.copy_from_slice(&candidate[..16]);
if seen.insert(key) && verify_aes_key(&key, templates) {
return Some(key);
}
}
for candidate in ascii_alnum_candidates(buf, 16) {
let mut key = [0u8; 16];
key.copy_from_slice(candidate);
if seen.insert(key) && verify_aes_key(&key, templates) {
return Some(key);
}
}
None
}

View File

@ -5,7 +5,8 @@
//! + brute-force fallback`md5(str(uin))[:4] == wxid_suffix` 枚举 2^24
//! - Windows扫 `Weixin.exe` 内存,匹配 `[a-zA-Z0-9]{32}` 候选,按已知 AES ciphertext-block
//! 反验(`find_image_key.py` / `find_image_key.c` 已写实)
//! - Linux上游空白当前不实现遇到 V2 .dat 返回 unsupported 错误
//! - Linux从 `~/.xwechat/net/kvcomm/` 文件名提取 uin → `md5(str(uin) + wxid)[:16]` 派生,
//! + 内存扫描 fallback
#[cfg(target_os = "linux")]
pub mod linux;
@ -59,7 +60,7 @@ pub fn default_provider() -> Option<Box<dyn ImageKeyProvider + Send + Sync>> {
}
#[cfg(target_os = "linux")]
{
return Some(Box::new(linux::LinuxImageKeyProvider));
return Some(Box::new(linux::LinuxImageKeyProvider::from_current_config()));
}
#[cfg(not(any(target_os = "macos", target_os = "windows", target_os = "linux")))]
{