From 25d6a44082620da97681d25e8870e8cb9e9b3470 Mon Sep 17 00:00:00 2001 From: wangzhuc Date: Thu, 2 Apr 2026 00:34:02 +0800 Subject: [PATCH] feat: add article content extraction with anti-scraping fallback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - New `scripts/fetch_article.py`: extract WeChat article content as Markdown with three-level fetch strategy (requests → Playwright → manual HTML) - Refactor `learn_theme.py` to reuse `fetch_article.fetch_html()`, removing duplicate fetch logic - Update SKILL.md: add "学习这篇文章/导入范文" auxiliary function - Update README.md: add article extraction to feature table and directory tree Co-Authored-By: Claude Opus 4.6 (1M context) --- README.md | 4 +- SKILL.md | 1 + dist/openclaw/SKILL.md | 9 +- dist/openclaw/config.example.yaml | 72 ++- dist/openclaw/references/visual-prompts.md | 159 ++++- dist/openclaw/scripts/fetch_article.py | 323 ++++++++++ dist/openclaw/scripts/learn_theme.py | 31 +- dist/openclaw/toolkit/image_gen.py | 676 ++++++++++++++++----- scripts/fetch_article.py | 323 ++++++++++ scripts/learn_theme.py | 31 +- 10 files changed, 1407 insertions(+), 222 deletions(-) create mode 100644 dist/openclaw/scripts/fetch_article.py create mode 100644 scripts/fetch_article.py diff --git a/README.md b/README.md index dd98afb..389dba4 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ | 范文风格库 | SICO 式 few-shot:从你的文章提取风格指纹,写作时注入 | `scripts/extract_exemplar.py` | | 风格飞轮 | 学习你的修改,越用越像你 | `references/learn-edits.md` | | 排版学习 | 从任意公众号文章 URL 提取排版主题 | `scripts/learn_theme.py` | +| 文章采集 | 从公众号 URL 提取正文为 Markdown,可导入范文库 | `scripts/fetch_article.py` | ## 写作人格 @@ -183,6 +184,7 @@ wewrite/ │ ├── humanness_score.py # 文章质量打分(11 项检测,供自检和 Step 5 使用) │ ├── extract_exemplar.py # 范文风格提取(SICO 式 few-shot 建库) │ ├── learn_theme.py # 从公众号文章 URL 提取排版主题 +│ ├── fetch_article.py # 从公众号 URL 提取正文为 Markdown │ ├── diagnose.py # 配置完备度检查 │ └── build_openclaw.py # SKILL.md → OpenClaw 格式转换 │ @@ -192,7 +194,7 @@ wewrite/ │ ├── theme.py # YAML 主题引擎 │ ├── publisher.py # 微信草稿箱 API + 小绿书图片帖 │ ├── wechat_api.py # access_token / 图片上传 -│ ├── image_gen.py # AI 图片生成(doubao / OpenAI / Gemini) +│ ├── image_gen.py # AI 图片生成(9 provider,自动 fallback) │ └── themes/ # 16+ 排版主题(含暗黑模式,可从文章学习新增) │ ├── personas/ # 5 套写作人格预设(含朱雀实测数据) diff --git a/SKILL.md b/SKILL.md index 15f241a..27a2c3f 100644 --- a/SKILL.md +++ b/SKILL.md @@ -49,6 +49,7 @@ allowed-tools: - **本地修改**(默认):用户在 `output/` 的 markdown 文件中修改 - **微信草稿箱同步**:`python3 {skill_dir}/scripts/learn_edits.py --from-wechat`,自动从草稿箱拉回最新内容,与本地原文做纯文本 diff - 用户说"学习排版"/"学排版" → `python3 {skill_dir}/scripts/learn_theme.py --name `,用户需提供一个公众号文章 URL 和主题名称。提取完成后提示用户设置 `style.yaml` 的 `theme` 字段。 +- 用户说"学习这篇文章"/"导入范文" + URL → `python3 {skill_dir}/scripts/fetch_article.py -o /tmp/article.md && python3 {skill_dir}/scripts/extract_exemplar.py /tmp/article.md -s <账号名>`,从公众号文章 URL 提取正文并导入范文库。支持三级降级(requests → Playwright → 手动 HTML)。 - 用户说"看看文章数据" → `读取: {skill_dir}/references/effect-review.md` - 用户说"检查一下"/"自检"/"这篇文章怎么样" → 对最近一篇生成的文章(或用户指定的文章)执行自检,输出生成报告: diff --git a/dist/openclaw/SKILL.md b/dist/openclaw/SKILL.md index 81a774b..5a87ab6 100644 --- a/dist/openclaw/SKILL.md +++ b/dist/openclaw/SKILL.md @@ -40,6 +40,7 @@ description: | - **本地修改**(默认):用户在 `output/` 的 markdown 文件中修改 - **微信草稿箱同步**:`python3 {baseDir}/scripts/learn_edits.py --from-wechat`,自动从草稿箱拉回最新内容,与本地原文做纯文本 diff - 用户说"学习排版"/"学排版" → `python3 {baseDir}/scripts/learn_theme.py --name `,用户需提供一个公众号文章 URL 和主题名称。提取完成后提示用户设置 `style.yaml` 的 `theme` 字段。 +- 用户说"学习这篇文章"/"导入范文" + URL → `python3 {baseDir}/scripts/fetch_article.py -o /tmp/article.md && python3 {baseDir}/scripts/extract_exemplar.py /tmp/article.md -s <账号名>`,从公众号文章 URL 提取正文并导入范文库。支持三级降级(requests → Playwright → 手动 HTML)。 - 用户说"看看文章数据" → `读取: {baseDir}/references/effect-review.md` - 用户说"检查一下"/"自检"/"这篇文章怎么样" → 对最近一篇生成的文章(或用户指定的文章)执行自检,输出生成报告: @@ -98,7 +99,7 @@ python3 -c "import markdown, bs4, cssutils, requests, yaml, pygments, PIL" 2>&1 | `config.yaml` 存在 | 静默 | 引导创建,或设 `skip_publish = true` | | Python 依赖 | 静默 | 提供 `pip install -r requirements.txt` | | `wechat.appid` + `secret` | 静默 | 设 `skip_publish = true` | -| `image.api_key` | 静默 | 设 `skip_image_gen = true` | +| `image.api_key` 或 `image.providers` 至少一项有效 | 静默 | 设 `skip_image_gen = true` | | `references/exemplars/index.yaml` | 静默 | 提示:"范文库为空。如果你有已发布的文章(markdown),可以说**'导入范文'**建立风格库,写出来的文章会更像你。没有也不影响使用。" | **1.2 版本检查**(静默通过或提醒): @@ -377,9 +378,11 @@ python3 {baseDir}/scripts/humanness_score.py {article_path} --json --tier3 {agen - **交互模式**:展示封面,问用户"封面效果如何?"。用户 OK → 继续;不满意 → 调整提示词重新生成。 - **全自动模式**:agent 自检——提示词中的实体是否在画面描述中可识别?如果提示词过于泛化(仅含"科技感""未来感"等抽象词,无具体实体),换一组提示词重试 1 次。 -**6.4 内文配图**:分析文章结构,生成 3-6 张内文配图提示词(按 visual-prompts.md)。风格、色调、画风沿用封面,保持视觉一致。批量调用 image_gen.py,替换 Markdown 占位符。 +**6.3b 风格锚定**:封面确认后,提取视觉锚点(色板 hex、风格关键词、画面调性),后续所有内文配图的提示词必须引用这组锚点,保证全文视觉一致。 -**降级**:生图失败 → 输出提示词 + 备选图库关键词,继续。 +**6.4 内文配图**:分析文章结构,为每个需要配图的段落选择图片类型(infographic/scene/flowchart/comparison/framework/timeline),使用对应的结构化提示词模板生成 3-6 张配图提示词(按 visual-prompts.md)。批量调用 image_gen.py,替换 Markdown 占位符。 + +**降级**:image_gen.py 支持多 provider 自动 fallback(按 config.yaml 中 providers 列表顺序尝试)。全部失败 → 输出提示词 + 备选图库关键词,继续。 --- diff --git a/dist/openclaw/config.example.yaml b/dist/openclaw/config.example.yaml index 9562d9f..194c979 100644 --- a/dist/openclaw/config.example.yaml +++ b/dist/openclaw/config.example.yaml @@ -8,27 +8,63 @@ wechat: author: "" # 默认署名(可选) # AI 图片生成 +# 支持 9 个 provider,配一个就能用,配多个自动 fallback。 +# +# ┌─────────────────┬────────────────────────────────────────────────┬────────────────────┐ +# │ Provider │ 获取 API Key │ 特点 │ +# ├─────────────────┼────────────────────────────────────────────────┼────────────────────┤ +# │ doubao │ https://console.volcengine.com/ark │ 中文提示词最优 │ +# │ dashscope │ https://dashscope.console.aliyun.com/ │ 阿里通义万相 │ +# │ jimeng │ https://console.volcengine.com/iam │ 字节即梦,中文强 │ +# │ minimax │ https://platform.minimaxi.com/ │ 国内 provider │ +# │ openai │ https://platform.openai.com/api-keys │ DALL-E,通用性强 │ +# │ azure_openai │ Azure Portal │ 国内可访问的 OpenAI│ +# │ gemini │ https://aistudio.google.com/apikey │ 免费额度较多 │ +# │ openrouter │ https://openrouter.ai/settings/keys │ 多模型代理 │ +# │ replicate │ https://replicate.com/account/api-tokens │ 开源模型丰富 │ +# └─────────────────┴────────────────────────────────────────────────┴────────────────────┘ +# +# 支持两种配置方式: + +# 方式一:单 provider(简单用法,填一个就行) image: - # 可选 provider: doubao | openai | gemini - provider: "doubao" + provider: "doubao" # 见上表 Provider 列 api_key: "your_api_key" + # model: "doubao-seedream-5-0-260128" # 可选,各 provider 有默认值 + # base_url: "https://ark.cn-beijing.volces.com/api/v3" # 可选 - # doubao-seedream(默认) - # 获取 API key: https://console.volcengine.com/ark - # model: "doubao-seedream-5-0-260128" - # base_url: "https://ark.cn-beijing.volces.com/api/v3" - - # OpenAI DALL-E 3 - # provider: "openai" - # api_key: "sk-..." - # model: "dall-e-3" - # base_url: "https://api.openai.com/v1" - - # Google Gemini Imagen - # provider: "gemini" - # api_key: "AIza..." - # 获取 API key: https://aistudio.google.com/apikey - # model: "gemini-3.1-flash-image-preview" +# 方式二:多 provider 自动 fallback(推荐) +# 按顺序尝试,第一个失败自动切换下一个,不需要全部填写 +# image: +# providers: +# - provider: doubao +# api_key: "your_volcengine_key" +# - provider: dashscope +# api_key: "your_dashscope_key" +# # model: "qwen-image-2.0-pro" +# - provider: jimeng +# api_key: "your_access_key_id" # 即梦需要 access_key_id + secret_key +# secret_key: "your_secret_access_key" +# # model: "jimeng_t2i_v40" +# - provider: minimax +# api_key: "your_minimax_key" +# # model: "image-01" +# - provider: openai +# api_key: "sk-..." +# # model: "dall-e-3" +# - provider: azure_openai +# api_key: "your_azure_key" +# base_url: "https://YOUR-RESOURCE.openai.azure.com/openai" # 必填 +# # deployment: "dall-e-3" +# - provider: gemini +# api_key: "AIza..." +# # model: "gemini-3.1-flash-image-preview" +# - provider: openrouter +# api_key: "sk-or-..." +# # model: "google/gemini-3.1-flash-image-preview" +# - provider: replicate +# api_key: "r8_..." +# # model: "google/nano-banana-pro" # 默认排版主题 theme: "professional-clean" diff --git a/dist/openclaw/references/visual-prompts.md b/dist/openclaw/references/visual-prompts.md index f212b57..8431f11 100644 --- a/dist/openclaw/references/visual-prompts.md +++ b/dist/openclaw/references/visual-prompts.md @@ -73,6 +73,24 @@ --- +## 风格锚定 + +封面确认后,**立即提取视觉锚点**,后续所有内文配图必须复用: + +``` +视觉锚点: +- 色板:{封面的主色 hex + 辅色 hex,如 #2563EB + #F97316} +- 风格关键词:{封面的风格描述,如 "flat illustration, minimalist, bold outlines"} +- 画面调性:{冷调/暖调/中性} +``` + +**规则**: +- 每条内文配图提示词的末尾,必须附加视觉锚点中的色板和风格关键词 +- 如果封面是暖调,内文配图不能突然切换为冷调科技风(反之亦然) +- 视觉锚点在整篇文章的所有配图中保持一致 + +--- + ## 二、内文配图(3-6 张) ### 分析流程 @@ -94,7 +112,20 @@ | 转折/高潮处 → 视觉冲击 | 紧接着另一张配图(间距不足300字) | | 长段落后(>400字无图) → 节奏调节 | 结尾 CTA 段落 | -**第三步:确定位置** +**第三步:确定图片类型** + +根据段落内容,为每张配图选择最匹配的类型: + +| 类型 | 适用内容 | 核心构图 | +|------|---------|---------| +| infographic | 数据、统计、指标对比 | 区域分块 + 标签标注 | +| scene | 叙事场景、情绪渲染、人物故事 | 焦点主体 + 氛围光影 | +| flowchart | 流程、步骤、工作流 | 步骤节点 + 连接箭头 | +| comparison | 两个方案/观点对比 | 左右分栏 + 分隔线 | +| framework | 概念模型、架构关系 | 层级节点 + 关系连线 | +| timeline | 时间线、发展历程 | 时间轴 + 里程碑标记 | + +**第四步:确定位置** - 配图插入在对应段落**之后**(不是之前) - 具体到"H2 XX 下的第 N 段之后" @@ -104,24 +135,132 @@ - 不要在文章第一段之前放配图 - 不要在结尾 CTA 段落放配图 -### 提示词格式 +### 结构化提示词模板 -每张输出: +根据图片类型,使用对应的结构化模板生成提示词。**禁止自由文本描述**——所有提示词必须填写模板的每个字段。 + +#### infographic(信息图) ``` ### 配图 {序号}: 位于「{H2标题}」第{N}段后 -- 配图目的:{信息强化/场景还原/节奏调节} -- 对应内容:{这段讲了什么,1句话概括} -- 画面描述:{具体的画面内容,80-120字} -- AI 绘图提示词: - "{中文提示词,给 doubao-seedream 用}" +- 类型:infographic +- 对应内容:{1句话概括} + +Layout: {grid / radial / hierarchical} +Zones: + - Zone 1: {具体数据点,用文章真实数字} + - Zone 2: {对比/趋势,用文章真实数字} + - Zone 3: {结论/要点} +Labels: {文章中的真实数字、术语、指标名} +Colors: {视觉锚点色板} +Style: {视觉锚点风格关键词}, clean infographic, no text +Aspect: 16:9 + - 备选方案:{Unsplash/Pexels 搜索关键词} ``` -### 内文配图的特殊要求 +#### scene(场景) + +``` +### 配图 {序号}: 位于「{H2标题}」第{N}段后 +- 类型:scene +- 对应内容:{1句话概括} + +Focal Point: {画面主体,必须是文章实体} +Atmosphere: {光影、环境、时间} +Mood: {情绪基调} +Color Temperature: {warm / cool / neutral,与视觉锚点一致} +Style: {视觉锚点风格关键词}, no text no letters +Aspect: 16:9 + +- 备选方案:{Unsplash/Pexels 搜索关键词} +``` + +#### flowchart(流程图) + +``` +### 配图 {序号}: 位于「{H2标题}」第{N}段后 +- 类型:flowchart +- 对应内容:{1句话概括} + +Layout: {left-right / top-down / circular} +Steps: + 1. {步骤名} — {简述} + 2. {步骤名} — {简述} + 3. {步骤名} — {简述} +Connections: {箭头方向、决策分支} +Colors: {视觉锚点色板} +Style: {视觉锚点风格关键词}, clean diagram, no text +Aspect: 16:9 + +- 备选方案:{Unsplash/Pexels 搜索关键词} +``` + +#### comparison(对比图) + +``` +### 配图 {序号}: 位于「{H2标题}」第{N}段后 +- 类型:comparison +- 对应内容:{1句话概括} + +Left Side — {选项A名称}: + - {要点1} + - {要点2} +Right Side — {选项B名称}: + - {要点1} + - {要点2} +Divider: {分隔线样式} +Colors: {视觉锚点色板,左右各用一个主色} +Style: {视觉锚点风格关键词}, split layout, no text +Aspect: 16:9 + +- 备选方案:{Unsplash/Pexels 搜索关键词} +``` + +#### framework(架构图) + +``` +### 配图 {序号}: 位于「{H2标题}」第{N}段后 +- 类型:framework +- 对应内容:{1句话概括} + +Structure: {hierarchical / network / matrix} +Nodes: + - {概念1} — {角色} + - {概念2} — {角色} + - {概念3} — {角色} +Relationships: {节点间如何连接} +Colors: {视觉锚点色板} +Style: {视觉锚点风格关键词}, clean diagram, no text +Aspect: 16:9 + +- 备选方案:{Unsplash/Pexels 搜索关键词} +``` + +#### timeline(时间线) + +``` +### 配图 {序号}: 位于「{H2标题}」第{N}段后 +- 类型:timeline +- 对应内容:{1句话概括} + +Direction: {horizontal / vertical} +Events: + - {时间点1}: {里程碑} + - {时间点2}: {里程碑} + - {时间点3}: {里程碑} +Markers: {视觉标记样式} +Colors: {视觉锚点色板} +Style: {视觉锚点风格关键词}, clean timeline, no text +Aspect: 16:9 + +- 备选方案:{Unsplash/Pexels 搜索关键词} +``` + +### 内文配图通用要求 - 尺寸统一 **16:9 横版**(image_gen.py --size article) -- **风格一致性**:沿用封面确定的色调、画风、视觉语言。在每条提示词中显式复用封面的风格描述(如 "flat illustration, blue-orange palette, minimalist") +- **视觉锚定**:每条提示词的 Colors 和 Style 字段必须引用封面提取的视觉锚点 - 实体锚定规则同封面——每条提示词至少包含 2 个文章实体 - 不要太复杂——手机屏幕上看,简洁的图比复杂的图好 - 提示词用中文(seedream 中文理解强) diff --git a/dist/openclaw/scripts/fetch_article.py b/dist/openclaw/scripts/fetch_article.py new file mode 100644 index 0000000..811cf3a --- /dev/null +++ b/dist/openclaw/scripts/fetch_article.py @@ -0,0 +1,323 @@ +#!/usr/bin/env python3 +"""fetch_article.py — extract WeChat article content as Markdown. + +Three-level fetching strategy: + Level 1: requests (fast, zero overhead, works for most articles) + Level 2: Playwright headless Chrome (bypasses anti-scraping JS checks) + Level 3: Prompt user to save HTML manually and pass via --file + +Usage: + python3 scripts/fetch_article.py # auto fetch + python3 scripts/fetch_article.py -o article.md # save to file + python3 scripts/fetch_article.py --file saved.html # from local HTML + python3 scripts/fetch_article.py --json # JSON output for agent +""" + +import argparse +import json +import re +import sys +from pathlib import Path + +import requests +from bs4 import BeautifulSoup, NavigableString + +_BROWSER_UA = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/124.0.0.0 Safari/537.36" +) + + +# --------------------------------------------------------------------------- +# Fetching: three-level strategy +# --------------------------------------------------------------------------- + +def _fetch_requests(url: str, timeout: int = 20) -> str | None: + """Level 1: plain requests. Returns HTML string or None on failure.""" + try: + resp = requests.get(url, headers={"User-Agent": _BROWSER_UA}, timeout=timeout) + resp.raise_for_status() + resp.encoding = "utf-8" + return resp.text + except requests.exceptions.RequestException: + return None + + +def _fetch_playwright(url: str, timeout: int = 30000) -> str | None: + """Level 2: Playwright headless Chrome. Returns HTML or None.""" + try: + from playwright.sync_api import sync_playwright + except ImportError: + return None + + try: + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + page = browser.new_page(user_agent=_BROWSER_UA) + page.goto(url, wait_until="networkidle", timeout=timeout) + # Wait for WeChat content to render + page.wait_for_selector("#js_content", timeout=10000) + html = page.content() + browser.close() + return html + except Exception: + return None + + +def fetch_html(url: str) -> str: + """Fetch article HTML with automatic fallback. + + Returns HTML string. Exits with error if all levels fail. + """ + # Level 1 + html = _fetch_requests(url) + if html and _has_content(html): + return html + + # Level 2 + print("requests 未获取到正文,尝试 Playwright...", file=sys.stderr) + html = _fetch_playwright(url) + if html and _has_content(html): + return html + + # Level 3 + print( + "Error: 无法获取文章内容。请在浏览器中打开文章 → 右键另存为 HTML → 使用 --file 参数传入。", + file=sys.stderr, + ) + sys.exit(1) + + +def _has_content(html: str) -> bool: + """Check if HTML contains non-empty #js_content.""" + soup = BeautifulSoup(html, "html.parser") + content = soup.find(id="js_content") + if content is None: + return False + text = content.get_text(strip=True) + return len(text) > 50 # must have real content, not just whitespace + + +# --------------------------------------------------------------------------- +# HTML → Markdown conversion +# --------------------------------------------------------------------------- + +def _extract_metadata(soup: BeautifulSoup) -> dict: + """Extract article metadata from WeChat page.""" + title_tag = soup.find("h1", class_="rich_media_title") or soup.find( + "h1", id="activity-name" + ) + title = title_tag.get_text(strip=True) if title_tag else "" + + author_tag = soup.find("a", id="js_name") or soup.find( + "span", class_="rich_media_meta_nickname" + ) + author = author_tag.get_text(strip=True) if author_tag else "" + + # Publish time + pub_tag = soup.find("em", id="publish_time") + pub_time = pub_tag.get_text(strip=True) if pub_tag else "" + + return {"title": title, "author": author, "publish_time": pub_time} + + +def _elem_to_md(elem, depth: int = 0) -> str: + """Convert a single HTML element to Markdown.""" + tag = elem.name if hasattr(elem, "name") else None + + if isinstance(elem, NavigableString): + text = str(elem).strip() + return text if text else "" + + if tag is None: + return "" + + # Skip hidden/empty elements + style = elem.get("style", "") + if "display:none" in style.replace(" ", "").lower(): + return "" + if "visibility:hidden" in style.replace(" ", "").lower(): + return "" + + # Get inner content recursively + inner = "" + for child in elem.children: + inner += _elem_to_md(child, depth + 1) + + inner = inner.strip() + if not inner: + return "" + + # Headings + if tag in ("h1", "h2", "h3", "h4"): + level = int(tag[1]) + return f"\n\n{'#' * level} {inner}\n\n" + + # Paragraphs + if tag == "p": + return f"\n\n{inner}\n\n" + + # Line breaks + if tag == "br": + return "\n" + + # Bold + if tag in ("strong", "b"): + return f"**{inner}**" + + # Italic + if tag in ("em", "i"): + return f"*{inner}*" + + # Links + if tag == "a": + href = elem.get("href", "") + if href and not href.startswith("javascript:"): + return f"[{inner}]({href})" + return inner + + # Images + if tag == "img": + src = elem.get("data-src") or elem.get("src") or "" + alt = elem.get("alt", "") + if src: + return f"\n\n![{alt}]({src})\n\n" + return "" + + # Blockquotes + if tag == "blockquote": + lines = inner.split("\n") + quoted = "\n".join(f"> {line}" for line in lines if line.strip()) + return f"\n\n{quoted}\n\n" + + # Lists + if tag in ("ul", "ol"): + return f"\n\n{inner}\n\n" + if tag == "li": + parent = elem.parent + if parent and parent.name == "ol": + # Ordered list — position tracking is imperfect but functional + return f"1. {inner}\n" + return f"- {inner}\n" + + # Code + if tag == "code": + if elem.parent and elem.parent.name == "pre": + return inner + return f"`{inner}`" + if tag == "pre": + return f"\n\n```\n{inner}\n```\n\n" + + # Horizontal rule + if tag == "hr": + return "\n\n---\n\n" + + # Section / div / span — pass through + if tag in ("section", "div", "span", "article", "main", "figure", + "figcaption", "table", "thead", "tbody", "tr"): + return inner + + # Table cells + if tag in ("td", "th"): + return f" {inner} |" + + return inner + + +def html_to_markdown(soup: BeautifulSoup) -> str: + """Convert WeChat article HTML to clean Markdown.""" + content = soup.find(id="js_content") + if content is None: + return "" + + raw = _elem_to_md(content) + + # Clean up excessive whitespace + md = re.sub(r"\n{3,}", "\n\n", raw) + md = md.strip() + return md + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def fetch_article(url: str = None, file_path: str = None) -> dict: + """Fetch and parse a WeChat article. + + Args: + url: WeChat article URL. + file_path: Path to saved HTML file (alternative to URL). + + Returns: + dict with keys: title, author, publish_time, markdown, url + """ + if file_path: + html = Path(file_path).read_text(encoding="utf-8") + elif url: + html = fetch_html(url) + else: + raise ValueError("Either url or file_path must be provided") + + soup = BeautifulSoup(html, "html.parser") + meta = _extract_metadata(soup) + md = html_to_markdown(soup) + + return { + "title": meta["title"], + "author": meta["author"], + "publish_time": meta["publish_time"], + "markdown": md, + "url": url or "", + } + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def main(): + ap = argparse.ArgumentParser( + description="Extract WeChat article content as Markdown." + ) + ap.add_argument("url", nargs="?", help="WeChat article URL") + ap.add_argument("--file", dest="file_path", + help="Local HTML file instead of URL") + ap.add_argument("-o", "--output", help="Save Markdown to file") + ap.add_argument("--json", dest="as_json", action="store_true", + help="Output as JSON (for agent use)") + args = ap.parse_args() + + if not args.url and not args.file_path: + ap.error("Provide a URL or --file path") + + result = fetch_article(url=args.url, file_path=args.file_path) + + if args.as_json: + print(json.dumps(result, ensure_ascii=False, indent=2)) + elif args.output: + # Write Markdown with YAML frontmatter + out = Path(args.output) + frontmatter = f"---\ntitle: \"{result['title']}\"\nauthor: \"{result['author']}\"\n" + if result["publish_time"]: + frontmatter += f"date: \"{result['publish_time']}\"\n" + if result["url"]: + frontmatter += f"source: \"{result['url']}\"\n" + frontmatter += "---\n\n" + out.write_text(frontmatter + result["markdown"], encoding="utf-8") + print(f"Saved: {out}") + else: + if result["title"]: + print(f"# {result['title']}\n") + if result["author"]: + print(f"> {result['author']}") + if result["publish_time"]: + print(f"> {result['publish_time']}") + if result["author"] or result["publish_time"]: + print() + print(result["markdown"]) + + +if __name__ == "__main__": + main() diff --git a/dist/openclaw/scripts/learn_theme.py b/dist/openclaw/scripts/learn_theme.py index e0ef85d..ce32978 100644 --- a/dist/openclaw/scripts/learn_theme.py +++ b/dist/openclaw/scripts/learn_theme.py @@ -12,7 +12,6 @@ import sys from collections import Counter from pathlib import Path -import requests import yaml from bs4 import BeautifulSoup @@ -154,12 +153,6 @@ _TARGET_TAGS = { "blockquote", "code", "pre", "img", "a", } -_BROWSER_UA = ( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/124.0.0.0 Safari/537.36" -) - TEMPLATE_THEME = "professional-clean" THEMES_DIR = Path(__file__).resolve().parent.parent / "toolkit" / "themes" @@ -175,26 +168,20 @@ def _attach_title(soup, content) -> None: def fetch_article(url: str, timeout: int = 20) -> "BeautifulSoup tag": """Fetch a WeChat article, return the ``#js_content`` element. - The article title is attached as ``content._wewrite_title`` (empty string - if not found). Exits with code 1 on network errors or missing content. + Delegates to fetch_article.fetch_html() for three-level fetching + (requests → Playwright → manual fallback). - Parameters - ---------- - url: WeChat article URL (mp.weixin.qq.com/…) - timeout: HTTP request timeout in seconds (default 20). + The article title is attached as ``content._wewrite_title`` (empty string + if not found). """ - try: - resp = requests.get(url, headers={"User-Agent": _BROWSER_UA}, timeout=timeout) - resp.raise_for_status() - except requests.exceptions.RequestException as exc: - print(f"Error: failed to fetch URL: {exc}", file=sys.stderr) - sys.exit(1) - resp.encoding = "utf-8" - soup = BeautifulSoup(resp.text, "html.parser") + from scripts.fetch_article import fetch_html + + html = fetch_html(url) + soup = BeautifulSoup(html, "html.parser") content = soup.find(id="js_content") if content is None: - print("Error: #js_content not found — the page may require verification.", file=sys.stderr) + print("Error: #js_content not found.", file=sys.stderr) sys.exit(1) _attach_title(soup, content) diff --git a/dist/openclaw/toolkit/image_gen.py b/dist/openclaw/toolkit/image_gen.py index a59b317..7b3a69e 100644 --- a/dist/openclaw/toolkit/image_gen.py +++ b/dist/openclaw/toolkit/image_gen.py @@ -6,6 +6,12 @@ Supports multiple providers via a simple abstraction: - doubao-seedream (Volcengine Ark) — default, good for Chinese prompts - openai (DALL-E 3) — broad availability - gemini (Google Gemini Imagen) — multimodal image generation + - dashscope (Alibaba Tongyi Wanxiang) — good for Chinese prompts + - minimax — Chinese provider + - replicate — open-source models + - azure_openai — Azure-hosted DALL-E + - openrouter — multi-model proxy + - jimeng (ByteDance) — good for Chinese prompts - Custom providers via ImageProvider base class Usage as CLI: @@ -21,8 +27,12 @@ Usage as module: import abc import argparse import base64 +import hashlib +import hmac import json import sys +import time +from datetime import datetime, timezone from pathlib import Path import requests @@ -51,11 +61,31 @@ def _load_config() -> dict: # Cover: 2.35:1 微信封面比例 # Article: 16:9 横版内文配图 # Vertical: 9:16 竖版 +_DEFAULT = "1792x1024" +_DEFAULT_V = "1024x1792" +_DEFAULT_SQ = "1024x1024" + SIZE_PRESETS = { - "cover": {"doubao": "2952x1256", "openai": "1792x1024", "gemini": "1792x1024"}, - "article": {"doubao": "2560x1440", "openai": "1792x1024", "gemini": "1792x1024"}, - "vertical": {"doubao": "1088x2560", "openai": "1024x1792", "gemini": "1024x1792"}, - "square": {"doubao": "2048x2048", "openai": "1024x1024", "gemini": "1024x1024"}, + "cover": { + "doubao": "2952x1256", "openai": _DEFAULT, "gemini": _DEFAULT, + "dashscope": _DEFAULT, "minimax": _DEFAULT, "replicate": _DEFAULT, + "azure_openai": _DEFAULT, "openrouter": _DEFAULT, "jimeng": _DEFAULT, + }, + "article": { + "doubao": "2560x1440", "openai": _DEFAULT, "gemini": _DEFAULT, + "dashscope": _DEFAULT, "minimax": _DEFAULT, "replicate": _DEFAULT, + "azure_openai": _DEFAULT, "openrouter": _DEFAULT, "jimeng": _DEFAULT, + }, + "vertical": { + "doubao": "1088x2560", "openai": _DEFAULT_V, "gemini": _DEFAULT_V, + "dashscope": _DEFAULT_V, "minimax": _DEFAULT_V, "replicate": _DEFAULT_V, + "azure_openai": _DEFAULT_V, "openrouter": _DEFAULT_V, "jimeng": _DEFAULT_V, + }, + "square": { + "doubao": "2048x2048", "openai": _DEFAULT_SQ, "gemini": _DEFAULT_SQ, + "dashscope": _DEFAULT_SQ, "minimax": _DEFAULT_SQ, "replicate": _DEFAULT_SQ, + "azure_openai": _DEFAULT_SQ, "openrouter": _DEFAULT_SQ, "jimeng": _DEFAULT_SQ, + }, } MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB @@ -79,6 +109,29 @@ def _compress_image(raw_bytes: bytes, max_size: int) -> bytes: return buf.getvalue() +def _size_to_aspect(size: str) -> str: + """Convert 'WxH' to nearest standard aspect ratio string.""" + if ":" in size: + return size + try: + w, h = (int(x) for x in size.split("x", 1)) + except ValueError: + return "16:9" + ratio = w / h + for ar, val in [("1:1", 1.0), ("16:9", 16/9), ("9:16", 9/16), + ("4:3", 4/3), ("3:4", 3/4), ("3:2", 3/2), ("2:3", 2/3)]: + if abs(ratio - val) < 0.15: + return ar + return "16:9" + + +def _download_image(url: str) -> bytes: + """Download image bytes from URL.""" + resp = requests.get(url, timeout=60) + resp.raise_for_status() + return resp.content + + # --- Provider abstraction --- class ImageProvider(abc.ABC): @@ -86,15 +139,7 @@ class ImageProvider(abc.ABC): @abc.abstractmethod def generate(self, prompt: str, size: str) -> bytes: - """Generate an image and return raw bytes. - - Args: - prompt: Image description (Chinese or English). - size: Resolved size string (e.g. "1792x1024"). - - Returns: - Raw image bytes. - """ + """Generate an image and return raw bytes.""" ... def resolve_size(self, preset: str) -> str: @@ -102,63 +147,45 @@ class ImageProvider(abc.ABC): provider_key = self.provider_key if preset in SIZE_PRESETS: return SIZE_PRESETS[preset].get(provider_key, list(SIZE_PRESETS[preset].values())[0]) - return preset # assume explicit WxH + return preset @property @abc.abstractmethod def provider_key(self) -> str: - """Short identifier used for size preset lookup.""" ... +# --- Providers --- + class DoubaoProvider(ImageProvider): """doubao-seedream via Volcengine Ark API.""" provider_key = "doubao" def __init__(self, api_key: str, model: str = "doubao-seedream-5-0-260128", - base_url: str = "https://ark.cn-beijing.volces.com/api/v3"): + base_url: str = "https://ark.cn-beijing.volces.com/api/v3", **_kw): self._api_key = api_key self._model = model self._base_url = base_url def generate(self, prompt: str, size: str) -> bytes: - body = { - "model": self._model, - "prompt": prompt, - "response_format": "url", - "size": size, - "stream": False, - "watermark": False, - } - resp = requests.post( f"{self._base_url}/images/generations", - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {self._api_key}", - }, - json=body, + headers={"Content-Type": "application/json", + "Authorization": f"Bearer {self._api_key}"}, + json={"model": self._model, "prompt": prompt, + "response_format": "url", "size": size, + "stream": False, "watermark": False}, timeout=120, ) - data = resp.json() if resp.status_code != 200: - error = data.get("error", {}) - msg = error.get("message", json.dumps(data, ensure_ascii=False)) - raise ValueError(f"Doubao API error ({resp.status_code}): {msg}") - - image_data = data.get("data", []) - if not image_data: - raise ValueError(f"No image returned: {json.dumps(data, ensure_ascii=False)}") - - image_url = image_data[0].get("url") - if not image_url: - raise ValueError(f"No image URL in response: {json.dumps(data, ensure_ascii=False)}") - - img_resp = requests.get(image_url, timeout=60) - img_resp.raise_for_status() - return img_resp.content + raise ValueError(f"Doubao error ({resp.status_code}): " + f"{data.get('error', {}).get('message', str(data))}") + url = data.get("data", [{}])[0].get("url") + if not url: + raise ValueError(f"No image URL: {data}") + return _download_image(url) class OpenAIProvider(ImageProvider): @@ -167,50 +194,28 @@ class OpenAIProvider(ImageProvider): provider_key = "openai" def __init__(self, api_key: str, model: str = "dall-e-3", - base_url: str = "https://api.openai.com/v1"): + base_url: str = "https://api.openai.com/v1", **_kw): self._api_key = api_key self._model = model self._base_url = base_url def generate(self, prompt: str, size: str) -> bytes: - # DALL-E 3 expects size as "WxH" format - dall_e_size = size.replace("x", "x") # normalize - - body = { - "model": self._model, - "prompt": prompt, - "n": 1, - "size": dall_e_size, - "response_format": "url", - } - resp = requests.post( f"{self._base_url}/images/generations", - headers={ - "Content-Type": "application/json", - "Authorization": f"Bearer {self._api_key}", - }, - json=body, + headers={"Content-Type": "application/json", + "Authorization": f"Bearer {self._api_key}"}, + json={"model": self._model, "prompt": prompt, + "n": 1, "size": size, "response_format": "url"}, timeout=120, ) - data = resp.json() if resp.status_code != 200: - error = data.get("error", {}) - msg = error.get("message", json.dumps(data, ensure_ascii=False)) - raise ValueError(f"OpenAI API error ({resp.status_code}): {msg}") - - image_data = data.get("data", []) - if not image_data: - raise ValueError(f"No image returned: {json.dumps(data, ensure_ascii=False)}") - - image_url = image_data[0].get("url") - if not image_url: - raise ValueError(f"No image URL in response: {json.dumps(data, ensure_ascii=False)}") - - img_resp = requests.get(image_url, timeout=60) - img_resp.raise_for_status() - return img_resp.content + raise ValueError(f"OpenAI error ({resp.status_code}): " + f"{data.get('error', {}).get('message', str(data))}") + url = data.get("data", [{}])[0].get("url") + if not url: + raise ValueError(f"No image URL: {data}") + return _download_image(url) class GeminiProvider(ImageProvider): @@ -219,47 +224,371 @@ class GeminiProvider(ImageProvider): provider_key = "gemini" def __init__(self, api_key: str, model: str = "gemini-3.1-flash-image-preview", - base_url: str = "https://generativelanguage.googleapis.com/v1beta"): + base_url: str = "https://generativelanguage.googleapis.com/v1beta", **_kw): self._api_key = api_key self._model = model self._base_url = base_url def generate(self, prompt: str, size: str) -> bytes: - # Append size instruction to prompt (Gemini doesn't have a native size param) if "x" in size: w, h = size.split("x", 1) prompt = f"{prompt}\n\nGenerate this image at {w}x{h} resolution." - - body = { - "contents": [{"parts": [{"text": prompt}]}], - "generationConfig": {"responseModalities": ["TEXT", "IMAGE"]}, - } resp = requests.post( f"{self._base_url}/models/{self._model}:generateContent", - headers={ - "Content-Type": "application/json", - "x-goog-api-key": self._api_key, - }, - json=body, + headers={"Content-Type": "application/json", + "x-goog-api-key": self._api_key}, + json={"contents": [{"parts": [{"text": prompt}]}], + "generationConfig": {"responseModalities": ["TEXT", "IMAGE"]}}, timeout=120, ) if resp.status_code != 200: + msg = resp.text[:200] try: - error = resp.json().get("error", {}) - msg = error.get("message", resp.text[:200]) - except (ValueError, KeyError): - msg = resp.text[:200] - raise ValueError(f"Gemini API error ({resp.status_code}): {msg}") + msg = resp.json().get("error", {}).get("message", msg) + except Exception: + pass + raise ValueError(f"Gemini error ({resp.status_code}): {msg}") + for part in resp.json().get("candidates", [{}])[0].get("content", {}).get("parts", []): + inline = part.get("inlineData") + if inline and inline.get("mimeType", "").startswith("image/"): + return base64.b64decode(inline["data"]) + raise ValueError("No image in Gemini response") + + +class DashScopeProvider(ImageProvider): + """Alibaba Tongyi Wanxiang (通义万相) via DashScope API.""" + + provider_key = "dashscope" + + def __init__(self, api_key: str, model: str = "qwen-image-2.0-pro", + base_url: str = "https://dashscope.aliyuncs.com/api/v1", **_kw): + self._api_key = api_key + self._model = model + self._base_url = base_url + + def generate(self, prompt: str, size: str) -> bytes: + ds_size = size.replace("x", "*") # DashScope uses "W*H" + resp = requests.post( + f"{self._base_url}/services/aigc/multimodal-generation/generation", + headers={"Content-Type": "application/json", + "Authorization": f"Bearer {self._api_key}"}, + json={ + "model": self._model, + "input": {"messages": [{"role": "user", "content": [{"text": prompt}]}]}, + "parameters": {"prompt_extend": False, "size": ds_size, "watermark": False}, + }, + timeout=120, + ) data = resp.json() - candidates = data.get("candidates", []) - if not candidates: - raise ValueError("No candidates in Gemini response") - parts = candidates[0].get("content", {}).get("parts", []) - for part in parts: - inline_data = part.get("inlineData") - if inline_data and inline_data.get("mimeType", "").startswith("image/"): - return base64.b64decode(inline_data["data"]) - raise ValueError("No image found in Gemini response parts") + if resp.status_code != 200: + raise ValueError(f"DashScope error ({resp.status_code}): " + f"{data.get('message', str(data))}") + # Try output.result_image first, then output.choices + output = data.get("output", {}) + img = output.get("result_image") + if not img: + choices = output.get("choices", []) + if choices: + for c in choices[0].get("message", {}).get("content", []): + if "image" in c: + img = c["image"] + break + if not img: + raise ValueError(f"No image in DashScope response: {data}") + if img.startswith("http"): + return _download_image(img) + return base64.b64decode(img) + + +class MiniMaxProvider(ImageProvider): + """MiniMax image generation.""" + + provider_key = "minimax" + + def __init__(self, api_key: str, model: str = "image-01", + base_url: str = "https://api.minimax.io/v1", **_kw): + self._api_key = api_key + self._model = model + self._base_url = base_url + + def generate(self, prompt: str, size: str) -> bytes: + w, h = 1024, 1024 + try: + w, h = (int(x) for x in size.split("x", 1)) + except ValueError: + pass + resp = requests.post( + f"{self._base_url}/image_generation", + headers={"Content-Type": "application/json", + "Authorization": f"Bearer {self._api_key}"}, + json={"model": self._model, "prompt": prompt, + "response_format": "base64", + "width": w, "height": h, "n": 1}, + timeout=120, + ) + data = resp.json() + if resp.status_code != 200: + raise ValueError(f"MiniMax error ({resp.status_code}): {data}") + b64_list = data.get("data", {}).get("image_base64", []) + if not b64_list: + raise ValueError(f"No image in MiniMax response: {data}") + return base64.b64decode(b64_list[0]) + + +class ReplicateProvider(ImageProvider): + """Replicate API — supports many open-source image models.""" + + provider_key = "replicate" + _POLL_INTERVAL = 2 + _POLL_TIMEOUT = 300 + + def __init__(self, api_key: str, model: str = "google/nano-banana-pro", + base_url: str = "https://api.replicate.com/v1", **_kw): + self._api_key = api_key + self._model = model + self._base_url = base_url + + def generate(self, prompt: str, size: str) -> bytes: + aspect = _size_to_aspect(size) + headers = {"Content-Type": "application/json", + "Authorization": f"Bearer {self._api_key}", + "Prefer": "wait=60"} + resp = requests.post( + f"{self._base_url}/models/{self._model}/predictions", + headers=headers, + json={"input": {"prompt": prompt, "aspect_ratio": aspect, + "number_of_images": 1, "output_format": "png"}}, + timeout=120, + ) + data = resp.json() + if resp.status_code not in (200, 201): + raise ValueError(f"Replicate error ({resp.status_code}): {data}") + + # Poll if not completed yet + poll_url = data.get("urls", {}).get("get") + deadline = time.monotonic() + self._POLL_TIMEOUT + while data.get("status") not in ("succeeded", "failed", "canceled"): + if time.monotonic() > deadline: + raise ValueError("Replicate polling timeout") + time.sleep(self._POLL_INTERVAL) + data = requests.get(poll_url, headers=headers, timeout=30).json() + + if data.get("status") != "succeeded": + raise ValueError(f"Replicate failed: {data.get('error')}") + + output = data.get("output") + if isinstance(output, list): + output = output[0] + if isinstance(output, dict): + output = output.get("url", output.get("uri")) + if not output or not isinstance(output, str): + raise ValueError(f"No image URL in Replicate output: {data}") + return _download_image(output) + + +class AzureOpenAIProvider(ImageProvider): + """Azure-hosted OpenAI DALL-E.""" + + provider_key = "azure_openai" + + def __init__(self, api_key: str, model: str = "dall-e-3", + base_url: str = "", deployment: str = "", **_kw): + self._api_key = api_key + self._deployment = deployment or model + self._base_url = base_url.rstrip("/") + + def generate(self, prompt: str, size: str) -> bytes: + if not self._base_url: + raise ValueError("Azure OpenAI requires base_url " + "(e.g. https://YOUR-RESOURCE.openai.azure.com/openai)") + resp = requests.post( + f"{self._base_url}/deployments/{self._deployment}" + f"/images/generations?api-version=2025-04-01-preview", + headers={"Content-Type": "application/json", + "api-key": self._api_key}, + json={"prompt": prompt, "size": size, "n": 1, "quality": "medium"}, + timeout=120, + ) + data = resp.json() + if resp.status_code != 200: + raise ValueError(f"Azure OpenAI error ({resp.status_code}): {data}") + item = data.get("data", [{}])[0] + if item.get("url"): + return _download_image(item["url"]) + if item.get("b64_json"): + return base64.b64decode(item["b64_json"]) + raise ValueError(f"No image in Azure response: {data}") + + +class OpenRouterProvider(ImageProvider): + """OpenRouter — multi-model proxy using chat completions format.""" + + provider_key = "openrouter" + + def __init__(self, api_key: str, model: str = "google/gemini-3.1-flash-image-preview", + base_url: str = "https://openrouter.ai/api/v1", **_kw): + self._api_key = api_key + self._model = model + self._base_url = base_url + + def generate(self, prompt: str, size: str) -> bytes: + aspect = _size_to_aspect(size) + resp = requests.post( + f"{self._base_url}/chat/completions", + headers={"Content-Type": "application/json", + "Authorization": f"Bearer {self._api_key}"}, + json={ + "model": self._model, + "messages": [{"role": "user", "content": prompt}], + "modalities": ["image"], + "stream": False, + "image_config": {"aspect_ratio": aspect}, + "provider": {"require_parameters": True}, + }, + timeout=120, + ) + data = resp.json() + if resp.status_code != 200: + raise ValueError(f"OpenRouter error ({resp.status_code}): {data}") + + # Extract image from multiple possible locations + choice = data.get("choices", [{}])[0].get("message", {}) + # Path 1: images array + images = choice.get("images", []) + if images: + img = images[0] + if img.startswith("http"): + return _download_image(img) + if img.startswith("data:"): + _, b64 = img.split(",", 1) + return base64.b64decode(b64) + # Path 2: content array with image items + content = choice.get("content", []) + if isinstance(content, list): + for item in content: + if isinstance(item, dict) and item.get("type") == "image": + url = item.get("url") or item.get("image_url", {}).get("url") + if url: + if url.startswith("data:"): + _, b64 = url.split(",", 1) + return base64.b64decode(b64) + return _download_image(url) + raise ValueError(f"No image in OpenRouter response: {data}") + + +class JimengProvider(ImageProvider): + """ByteDance Jimeng (即梦) — async submit + poll with HMAC-SHA256 auth.""" + + provider_key = "jimeng" + _POLL_INTERVAL = 2 + _POLL_MAX_ATTEMPTS = 60 + + def __init__(self, api_key: str, secret_key: str = "", + model: str = "jimeng_t2i_v40", + base_url: str = "https://visual.volcengineapi.com", **_kw): + self._access_key = api_key + self._secret_key = secret_key + self._model = model + self._base_url = base_url + + def _sign(self, method: str, path: str, query: str, + headers: dict, payload: bytes) -> dict: + """Generate Volcengine HMAC-SHA256 signed headers.""" + now = datetime.now(timezone.utc) + date_stamp = now.strftime("%Y%m%d") + amz_date = now.strftime("%Y%m%dT%H%M%SZ") + + signed_headers_list = sorted(k.lower() for k in headers) + signed_headers_str = ";".join(signed_headers_list) + + canonical = "\n".join([ + method, path, query, + "".join(f"{k.lower()}:{headers[k]}\n" for k in sorted(headers)), + signed_headers_str, + hashlib.sha256(payload).hexdigest(), + ]) + + region = "cn-north-1" + service = "cv" + scope = f"{date_stamp}/{region}/{service}/request" + string_to_sign = "\n".join([ + "HMAC-SHA256", amz_date, scope, + hashlib.sha256(canonical.encode()).hexdigest(), + ]) + + def _hmac(key: bytes, msg: str) -> bytes: + return hmac.new(key, msg.encode(), hashlib.sha256).digest() + + k_date = _hmac(self._secret_key.encode(), date_stamp) + k_region = _hmac(k_date, region) + k_service = _hmac(k_region, service) + k_signing = _hmac(k_service, "request") + signature = hmac.new(k_signing, string_to_sign.encode(), + hashlib.sha256).hexdigest() + + auth = (f"HMAC-SHA256 Credential={self._access_key}/{scope}, " + f"SignedHeaders={signed_headers_str}, Signature={signature}") + return {**headers, "Authorization": auth, "X-Date": amz_date} + + def _request(self, action: str, body: dict) -> dict: + payload = json.dumps(body).encode() + path = "/" + query = f"Action={action}&Version=2022-08-31" + headers = { + "Content-Type": "application/json", + "Host": self._base_url.replace("https://", "").replace("http://", ""), + } + signed = self._sign("POST", path, query, headers, payload) + resp = requests.post( + f"{self._base_url}/?{query}", + headers=signed, data=payload, timeout=120, + ) + data = resp.json() + if resp.status_code != 200: + raise ValueError(f"Jimeng error ({resp.status_code}): {data}") + return data + + def generate(self, prompt: str, size: str) -> bytes: + if not self._secret_key: + raise ValueError("Jimeng requires both api_key (access_key_id) " + "and secret_key (secret_access_key)") + w, h = 1024, 1024 + try: + w, h = (int(x) for x in size.split("x", 1)) + except ValueError: + pass + + # Submit task + submit = self._request("CVSync2AsyncSubmitTask", { + "req_key": self._model, "prompt": prompt, + "width": w, "height": h, + }) + task_id = submit.get("data", {}).get("task_id") + if not task_id: + raise ValueError(f"No task_id from Jimeng: {submit}") + + # Poll for result + for _ in range(self._POLL_MAX_ATTEMPTS): + time.sleep(self._POLL_INTERVAL) + result = self._request("CVSync2AsyncGetResult", { + "req_key": self._model, "task_id": task_id, + }) + code = result.get("code") + if code == 10000: + data = result.get("data", {}) + b64_list = data.get("binary_data_base64", []) + if b64_list: + return base64.b64decode(b64_list[0]) + urls = data.get("image_urls", []) + if urls: + return _download_image(urls[0]) + raise ValueError(f"No image data in Jimeng result: {result}") + if code and code != 10000: + status = result.get("data", {}).get("status") + if status in ("failed", "canceled"): + raise ValueError(f"Jimeng task failed: {result}") + + raise ValueError("Jimeng polling timeout") # --- Provider registry --- @@ -268,37 +597,82 @@ PROVIDERS = { "doubao": DoubaoProvider, "openai": OpenAIProvider, "gemini": GeminiProvider, + "dashscope": DashScopeProvider, + "minimax": MiniMaxProvider, + "replicate": ReplicateProvider, + "azure_openai": AzureOpenAIProvider, + "openrouter": OpenRouterProvider, + "jimeng": JimengProvider, } -def _build_provider(config: dict) -> ImageProvider: - """Build an ImageProvider from config.yaml's image section.""" - img_cfg = config.get("image", {}) - provider_name = img_cfg.get("provider", "doubao") - api_key = img_cfg.get("api_key") +def _build_provider_from_entry(entry: dict) -> ImageProvider: + """Build a single ImageProvider from a provider config entry.""" + provider_name = entry.get("provider", "doubao") + api_key = entry.get("api_key") if not api_key: - raise ValueError( - f"image.api_key not set in config.yaml. " - f"Configure your {provider_name} API key to enable image generation." - ) + raise ValueError(f"No api_key for provider '{provider_name}'") provider_cls = PROVIDERS.get(provider_name) if not provider_cls: raise ValueError( - f"Unknown image provider: '{provider_name}'. " + f"Unknown provider: '{provider_name}'. " f"Available: {', '.join(PROVIDERS.keys())}" ) kwargs = {"api_key": api_key} - if img_cfg.get("model"): - kwargs["model"] = img_cfg["model"] - if img_cfg.get("base_url"): - kwargs["base_url"] = img_cfg["base_url"] + if entry.get("model"): + kwargs["model"] = entry["model"] + if entry.get("base_url"): + kwargs["base_url"] = entry["base_url"] + if entry.get("secret_key"): + kwargs["secret_key"] = entry["secret_key"] + if entry.get("deployment"): + kwargs["deployment"] = entry["deployment"] return provider_cls(**kwargs) +def _build_provider_chain(config: dict) -> list[ImageProvider]: + """Build an ordered list of providers to try. + + Supports two config formats: + - Legacy: image.provider + image.api_key (single provider) + - New: image.providers (list, tried in order with auto-fallback) + """ + img_cfg = config.get("image", {}) + providers_list = img_cfg.get("providers") + + if providers_list and isinstance(providers_list, list): + chain = [] + for entry in providers_list: + try: + chain.append(_build_provider_from_entry(entry)) + except ValueError: + continue # skip misconfigured entries + if not chain: + raise ValueError( + "No valid providers in image.providers list. " + "Each entry needs 'provider' and 'api_key'." + ) + return chain + + # Legacy single-provider format + api_key = img_cfg.get("api_key") + if not api_key: + raise ValueError( + "image.api_key not set in config.yaml. " + "Configure your API key to enable image generation." + ) + return [_build_provider_from_entry(img_cfg)] + + +def _build_provider(config: dict) -> ImageProvider: + """Build an ImageProvider from config.yaml (backward-compatible entry point).""" + return _build_provider_chain(config)[0] + + # --- Public API --- def generate_image( @@ -308,7 +682,10 @@ def generate_image( config: dict = None, ) -> str: """ - Generate an image using the configured provider. + Generate an image using configured providers with auto-fallback. + + Tries each provider in order. If one fails, falls back to the next. + Supports both single-provider (legacy) and multi-provider config. Args: prompt: Image generation prompt (Chinese or English). @@ -322,38 +699,45 @@ def generate_image( if config is None: config = _load_config() - provider = _build_provider(config) - resolved_size = provider.resolve_size(size) + chain = _build_provider_chain(config) + last_error = None - raw_bytes = provider.generate(prompt, resolved_size) + for provider in chain: + resolved_size = provider.resolve_size(size) + try: + raw_bytes = provider.generate(prompt, resolved_size) + except Exception as e: + last_error = e + print( + f"Provider '{provider.provider_key}' failed: {e}. " + f"Trying next...", + file=sys.stderr, + ) + continue - # Compress if over 5MB (WeChat upload limit) - if len(raw_bytes) > MAX_FILE_SIZE: - raw_bytes = _compress_image(raw_bytes, MAX_FILE_SIZE) + # Compress if over 5MB (WeChat upload limit) + if len(raw_bytes) > MAX_FILE_SIZE: + raw_bytes = _compress_image(raw_bytes, MAX_FILE_SIZE) - output = Path(output_path) - output.parent.mkdir(parents=True, exist_ok=True) - output.write_bytes(raw_bytes) - return str(output) + output = Path(output_path) + output.parent.mkdir(parents=True, exist_ok=True) + output.write_bytes(raw_bytes) + return str(output) + + raise ValueError( + f"All providers failed. Last error: {last_error}" + ) def main(): - parser = argparse.ArgumentParser( - description="Generate images using AI (doubao-seedream, OpenAI DALL-E, Gemini Imagen, etc.)" - ) - parser.add_argument("--prompt", required=True, help="Image generation prompt") - parser.add_argument("--output", required=True, help="Output file path") - parser.add_argument( - "--size", - default="cover", - help="Size: cover, article, vertical, square, or WxH", - ) - parser.add_argument( - "--provider", - default=None, - help="Override provider (doubao, openai, gemini). Default: from config.yaml", - ) - args = parser.parse_args() + ap = argparse.ArgumentParser(description="Generate images using AI") + ap.add_argument("--prompt", required=True, help="Image generation prompt") + ap.add_argument("--output", required=True, help="Output file path") + ap.add_argument("--size", default="cover", + help="Size: cover, article, vertical, square, or WxH") + ap.add_argument("--provider", default=None, + help=f"Override provider ({', '.join(PROVIDERS)})") + args = ap.parse_args() try: config = _load_config() diff --git a/scripts/fetch_article.py b/scripts/fetch_article.py new file mode 100644 index 0000000..811cf3a --- /dev/null +++ b/scripts/fetch_article.py @@ -0,0 +1,323 @@ +#!/usr/bin/env python3 +"""fetch_article.py — extract WeChat article content as Markdown. + +Three-level fetching strategy: + Level 1: requests (fast, zero overhead, works for most articles) + Level 2: Playwright headless Chrome (bypasses anti-scraping JS checks) + Level 3: Prompt user to save HTML manually and pass via --file + +Usage: + python3 scripts/fetch_article.py # auto fetch + python3 scripts/fetch_article.py -o article.md # save to file + python3 scripts/fetch_article.py --file saved.html # from local HTML + python3 scripts/fetch_article.py --json # JSON output for agent +""" + +import argparse +import json +import re +import sys +from pathlib import Path + +import requests +from bs4 import BeautifulSoup, NavigableString + +_BROWSER_UA = ( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/124.0.0.0 Safari/537.36" +) + + +# --------------------------------------------------------------------------- +# Fetching: three-level strategy +# --------------------------------------------------------------------------- + +def _fetch_requests(url: str, timeout: int = 20) -> str | None: + """Level 1: plain requests. Returns HTML string or None on failure.""" + try: + resp = requests.get(url, headers={"User-Agent": _BROWSER_UA}, timeout=timeout) + resp.raise_for_status() + resp.encoding = "utf-8" + return resp.text + except requests.exceptions.RequestException: + return None + + +def _fetch_playwright(url: str, timeout: int = 30000) -> str | None: + """Level 2: Playwright headless Chrome. Returns HTML or None.""" + try: + from playwright.sync_api import sync_playwright + except ImportError: + return None + + try: + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + page = browser.new_page(user_agent=_BROWSER_UA) + page.goto(url, wait_until="networkidle", timeout=timeout) + # Wait for WeChat content to render + page.wait_for_selector("#js_content", timeout=10000) + html = page.content() + browser.close() + return html + except Exception: + return None + + +def fetch_html(url: str) -> str: + """Fetch article HTML with automatic fallback. + + Returns HTML string. Exits with error if all levels fail. + """ + # Level 1 + html = _fetch_requests(url) + if html and _has_content(html): + return html + + # Level 2 + print("requests 未获取到正文,尝试 Playwright...", file=sys.stderr) + html = _fetch_playwright(url) + if html and _has_content(html): + return html + + # Level 3 + print( + "Error: 无法获取文章内容。请在浏览器中打开文章 → 右键另存为 HTML → 使用 --file 参数传入。", + file=sys.stderr, + ) + sys.exit(1) + + +def _has_content(html: str) -> bool: + """Check if HTML contains non-empty #js_content.""" + soup = BeautifulSoup(html, "html.parser") + content = soup.find(id="js_content") + if content is None: + return False + text = content.get_text(strip=True) + return len(text) > 50 # must have real content, not just whitespace + + +# --------------------------------------------------------------------------- +# HTML → Markdown conversion +# --------------------------------------------------------------------------- + +def _extract_metadata(soup: BeautifulSoup) -> dict: + """Extract article metadata from WeChat page.""" + title_tag = soup.find("h1", class_="rich_media_title") or soup.find( + "h1", id="activity-name" + ) + title = title_tag.get_text(strip=True) if title_tag else "" + + author_tag = soup.find("a", id="js_name") or soup.find( + "span", class_="rich_media_meta_nickname" + ) + author = author_tag.get_text(strip=True) if author_tag else "" + + # Publish time + pub_tag = soup.find("em", id="publish_time") + pub_time = pub_tag.get_text(strip=True) if pub_tag else "" + + return {"title": title, "author": author, "publish_time": pub_time} + + +def _elem_to_md(elem, depth: int = 0) -> str: + """Convert a single HTML element to Markdown.""" + tag = elem.name if hasattr(elem, "name") else None + + if isinstance(elem, NavigableString): + text = str(elem).strip() + return text if text else "" + + if tag is None: + return "" + + # Skip hidden/empty elements + style = elem.get("style", "") + if "display:none" in style.replace(" ", "").lower(): + return "" + if "visibility:hidden" in style.replace(" ", "").lower(): + return "" + + # Get inner content recursively + inner = "" + for child in elem.children: + inner += _elem_to_md(child, depth + 1) + + inner = inner.strip() + if not inner: + return "" + + # Headings + if tag in ("h1", "h2", "h3", "h4"): + level = int(tag[1]) + return f"\n\n{'#' * level} {inner}\n\n" + + # Paragraphs + if tag == "p": + return f"\n\n{inner}\n\n" + + # Line breaks + if tag == "br": + return "\n" + + # Bold + if tag in ("strong", "b"): + return f"**{inner}**" + + # Italic + if tag in ("em", "i"): + return f"*{inner}*" + + # Links + if tag == "a": + href = elem.get("href", "") + if href and not href.startswith("javascript:"): + return f"[{inner}]({href})" + return inner + + # Images + if tag == "img": + src = elem.get("data-src") or elem.get("src") or "" + alt = elem.get("alt", "") + if src: + return f"\n\n![{alt}]({src})\n\n" + return "" + + # Blockquotes + if tag == "blockquote": + lines = inner.split("\n") + quoted = "\n".join(f"> {line}" for line in lines if line.strip()) + return f"\n\n{quoted}\n\n" + + # Lists + if tag in ("ul", "ol"): + return f"\n\n{inner}\n\n" + if tag == "li": + parent = elem.parent + if parent and parent.name == "ol": + # Ordered list — position tracking is imperfect but functional + return f"1. {inner}\n" + return f"- {inner}\n" + + # Code + if tag == "code": + if elem.parent and elem.parent.name == "pre": + return inner + return f"`{inner}`" + if tag == "pre": + return f"\n\n```\n{inner}\n```\n\n" + + # Horizontal rule + if tag == "hr": + return "\n\n---\n\n" + + # Section / div / span — pass through + if tag in ("section", "div", "span", "article", "main", "figure", + "figcaption", "table", "thead", "tbody", "tr"): + return inner + + # Table cells + if tag in ("td", "th"): + return f" {inner} |" + + return inner + + +def html_to_markdown(soup: BeautifulSoup) -> str: + """Convert WeChat article HTML to clean Markdown.""" + content = soup.find(id="js_content") + if content is None: + return "" + + raw = _elem_to_md(content) + + # Clean up excessive whitespace + md = re.sub(r"\n{3,}", "\n\n", raw) + md = md.strip() + return md + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def fetch_article(url: str = None, file_path: str = None) -> dict: + """Fetch and parse a WeChat article. + + Args: + url: WeChat article URL. + file_path: Path to saved HTML file (alternative to URL). + + Returns: + dict with keys: title, author, publish_time, markdown, url + """ + if file_path: + html = Path(file_path).read_text(encoding="utf-8") + elif url: + html = fetch_html(url) + else: + raise ValueError("Either url or file_path must be provided") + + soup = BeautifulSoup(html, "html.parser") + meta = _extract_metadata(soup) + md = html_to_markdown(soup) + + return { + "title": meta["title"], + "author": meta["author"], + "publish_time": meta["publish_time"], + "markdown": md, + "url": url or "", + } + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + +def main(): + ap = argparse.ArgumentParser( + description="Extract WeChat article content as Markdown." + ) + ap.add_argument("url", nargs="?", help="WeChat article URL") + ap.add_argument("--file", dest="file_path", + help="Local HTML file instead of URL") + ap.add_argument("-o", "--output", help="Save Markdown to file") + ap.add_argument("--json", dest="as_json", action="store_true", + help="Output as JSON (for agent use)") + args = ap.parse_args() + + if not args.url and not args.file_path: + ap.error("Provide a URL or --file path") + + result = fetch_article(url=args.url, file_path=args.file_path) + + if args.as_json: + print(json.dumps(result, ensure_ascii=False, indent=2)) + elif args.output: + # Write Markdown with YAML frontmatter + out = Path(args.output) + frontmatter = f"---\ntitle: \"{result['title']}\"\nauthor: \"{result['author']}\"\n" + if result["publish_time"]: + frontmatter += f"date: \"{result['publish_time']}\"\n" + if result["url"]: + frontmatter += f"source: \"{result['url']}\"\n" + frontmatter += "---\n\n" + out.write_text(frontmatter + result["markdown"], encoding="utf-8") + print(f"Saved: {out}") + else: + if result["title"]: + print(f"# {result['title']}\n") + if result["author"]: + print(f"> {result['author']}") + if result["publish_time"]: + print(f"> {result['publish_time']}") + if result["author"] or result["publish_time"]: + print() + print(result["markdown"]) + + +if __name__ == "__main__": + main() diff --git a/scripts/learn_theme.py b/scripts/learn_theme.py index e0ef85d..ce32978 100644 --- a/scripts/learn_theme.py +++ b/scripts/learn_theme.py @@ -12,7 +12,6 @@ import sys from collections import Counter from pathlib import Path -import requests import yaml from bs4 import BeautifulSoup @@ -154,12 +153,6 @@ _TARGET_TAGS = { "blockquote", "code", "pre", "img", "a", } -_BROWSER_UA = ( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/124.0.0.0 Safari/537.36" -) - TEMPLATE_THEME = "professional-clean" THEMES_DIR = Path(__file__).resolve().parent.parent / "toolkit" / "themes" @@ -175,26 +168,20 @@ def _attach_title(soup, content) -> None: def fetch_article(url: str, timeout: int = 20) -> "BeautifulSoup tag": """Fetch a WeChat article, return the ``#js_content`` element. - The article title is attached as ``content._wewrite_title`` (empty string - if not found). Exits with code 1 on network errors or missing content. + Delegates to fetch_article.fetch_html() for three-level fetching + (requests → Playwright → manual fallback). - Parameters - ---------- - url: WeChat article URL (mp.weixin.qq.com/…) - timeout: HTTP request timeout in seconds (default 20). + The article title is attached as ``content._wewrite_title`` (empty string + if not found). """ - try: - resp = requests.get(url, headers={"User-Agent": _BROWSER_UA}, timeout=timeout) - resp.raise_for_status() - except requests.exceptions.RequestException as exc: - print(f"Error: failed to fetch URL: {exc}", file=sys.stderr) - sys.exit(1) - resp.encoding = "utf-8" - soup = BeautifulSoup(resp.text, "html.parser") + from scripts.fetch_article import fetch_html + + html = fetch_html(url) + soup = BeautifulSoup(html, "html.parser") content = soup.find(id="js_content") if content is None: - print("Error: #js_content not found — the page may require verification.", file=sys.stderr) + print("Error: #js_content not found.", file=sys.stderr) sys.exit(1) _attach_title(soup, content)