02 · Worker 服务与 AI 处理管道
长驻后台 HTTP Worker 协调 AI 压缩管道(Claude/Gemini/OpenRouter 多提供商),将工具调用转为结构化 observation 并持久化到 SQLite,PID 单例保障进程唯一性。
概览
WorkerService 是 claude-mem 的"大脑"——一个长驻后台的 HTTP 服务,负责:
- 接收所有生命周期 hook 发来的请求(session-init / observation / summarize)
- 调度 AI 提供商(Claude SDK / Gemini / OpenRouter)压缩工具调用为结构化 observation
- 将结果持久化到 SQLite 并通过 SSE 实时广播给 Viewer UI
源文件:src/services/worker-service.ts(993 行,是整个系统的入口)
1. WorkerService 作为 HTTP 服务器
1.1 启动链
1
2
3
4
5
6
7
8
|
bun-runner.js → worker-service.cjs start 子命令
│
└─ ensureWorkerStarted(port) // worker-spawner.ts
├─ 检查端口是否已被占用 // isPortInUse()
├─ 若未运行 → spawnDaemon() // ProcessManager.ts:408
│ ├─ Linux: setsid + bun --daemon
│ └─ Windows: PowerShell Start-Process Hidden
└─ waitForHealth(port, timeout) // 轮询 /api/health 直到就绪
|
💡 Tip 为什么用 setsid?
setsid 创建新 session 组,使守护进程脱离父进程的控制终端(TTY)。这样即使 hook 进程退出,worker 也继续运行。Windows 用 Start-Process -WindowStyle Hidden 实现同等效果。
1.2 --daemon 模式的单例保证
worker-service.ts:936-951 — 守护进程入口(--daemon 或默认 case):
1
2
3
4
5
6
7
8
9
10
11
|
// 双重保险:PID 文件 + 端口检测
const existingPidInfo = readPidFile();
if (verifyPidFileOwnership(existingPidInfo)) {
logger.info('SYSTEM', 'Worker already running (PID alive), refusing to start duplicate');
process.exit(0); // 已有存活进程,直接退出
}
if (await isPortInUse(port)) {
logger.info('SYSTEM', 'Port already in use, refusing to start duplicate', { port });
process.exit(0); // 端口被占,安静退出
}
|
verifyPidFileOwnership 会比对 PID 文件中的 startToken(进程启动时间戳),防止 PID 复用导致误判。
1.3 WorkerService 构造函数:依赖注入图
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// worker-service.ts:107-203
constructor() {
this.dbManager = new DatabaseManager();
this.sessionManager = new SessionManager(this.dbManager);
this.sseBroadcaster = new SSEBroadcaster();
this.sdkAgent = new ClaudeProvider(this.dbManager, this.sessionManager);
this.geminiAgent = new GeminiProvider(this.dbManager, this.sessionManager);
this.openRouterAgent = new OpenRouterProvider(this.dbManager, this.sessionManager);
this.completionHandler = new SessionCompletionHandler(...);
// ...
this.server = new Server({ getInitializationComplete, getAiStatus, ... });
this.registerRoutes();
}
|
💡 Tip 两阶段初始化
start() 先让 HTTP server 开始监听,再异步跑 initializeBackground()(初始化 DB、搜索索引等)。在 DB 就绪前,/api/* 端点返回 503,只有 /health、/readiness、/version 提前可用。这样 hook 可以快速收到 “worker ready” 响应,不会因为 DB 迁移而超时。
1.4 HTTP 端点全景
| 路由模块 |
路径前缀 |
核心功能 |
SessionRoutes |
/api/sessions/ |
init / observations / summarize / status |
DataRoutes |
/api/data/ |
查询历史 observations & summaries,SSE |
SearchRoutes |
/api/search/ |
语义搜索(Chroma + SQLite 混合) |
ViewerRoutes |
/ + /api/stream |
前端静态资源 + SSE 实时推送 |
SettingsRoutes |
/api/settings/ |
读写 settings.json |
MemoryRoutes |
/api/memory/ |
直接写入 memory observation |
CorpusRoutes |
/api/corpus/ |
知识语料库构建与查询 |
ChromaRoutes |
/api/chroma/ |
向量索引管理 |
LogsRoutes |
/api/logs/ |
日志查看 |
| Server 内置 |
/api/context/inject |
注入 context 到 session(UserPromptSubmit hook) |
| Server 内置 |
/api/instructions |
返回 SKILL.md 给 MCP 客户端 |
| Server 内置 |
/api/health, /api/readiness |
存活检测 |
2. AI 压缩管道:从 PostToolUse 到 observation 入库
2.1 整体数据流
flowchart TD
Hook["PostToolUse Hook\n(bun-runner.js)"]
HTTP["POST /api/sessions/observations\nSessionRoutes"]
Queue["PendingMessageStore\n(SQLite: pending_messages)"]
Gen["ClaudeProvider.startSession()\nasync generator loop"]
Prompt["buildObservationPrompt()\nsrc/sdk/prompts.ts:81"]
SDK["@anthropic-ai/claude-agent-sdk\nquery() — 调用 Claude CLI 子进程"]
Parse["parseAgentXml()\nsrc/sdk/parser.ts"]
Store["SessionStore.storeObservations()\nSQLite + content_hash 去重"]
Chroma["ChromaSync\n向量同步"]
SSE["SSEBroadcaster\n推送给 Viewer"]
Hook -->|"JSON: tool_name, tool_input, tool_response"| HTTP
HTTP -->|"ingestObservation()"| Queue
Queue -->|"getMessageIterator()"| Gen
Gen -->|"待处理 observation 消息"| Prompt
Prompt -->|"XML 模板化的工具调用"| SDK
SDK -->|"assistant message (XML)"| Parse
Parse -->|"ParsedObservation[]"| Store
Store --> Chroma
Store --> SSE2.2 Prompt 结构:buildObservationPrompt
src/sdk/prompts.ts:81-113
1
2
3
4
5
6
7
8
9
10
11
12
|
return `<observed_from_primary_session>
<what_happened>${obs.tool_name}</what_happened>
<occurred_at>${new Date(obs.created_at_epoch).toISOString()}</occurred_at>
<working_directory>${obs.cwd}</working_directory>
<parameters>${JSON.stringify(toolInput, null, 2)}</parameters>
<outcome>${JSON.stringify(toolOutput, null, 2)}</outcome>
</observed_from_primary_session>
Return either one or more <observation>...</observation> blocks, or an empty response if
this tool use should be skipped.
Concrete debugging findings ... count as durable discoveries and should be recorded.
Never reply with prose such as "Skipping" ... Non-XML text is discarded.`;
|
关键设计决策:
- 用 XML 标签而非自然语言描述:让 AI 响应也用 XML 格式,方便精确解析
- 明确说明"空响应优于解释性文字":避免 AI 输出 prose 导致解析失败
tool_name 作为 what_happened:AI 能理解这是一次工具调用事件
2.3 AI 会话的消息流(createMessageGenerator)
ClaudeProvider.ts:351-438 — 一个 async generator,向 SDK 持续 yield 消息:
1
2
3
|
第1条消息: buildInitPrompt() → 告知 AI 身份(Observer 角色)+ 用户原始请求
后续消息: buildObservationPrompt() → 逐条发送工具调用
Stop hook: buildSummaryPrompt() → 触发会话摘要
|
💡 Tip 为什么用 async generator 而不是一次性发所有消息?
工具调用是流式到来的——用户在和 Claude 主会话交互时,hook 实时发来每一次工具调用。generator 可以"暂停等待"新的 pending message,等消息到来再 yield 给 SDK。这样实现了流式处理而不需要等主会话结束才批量处理。
2.4 SDK 会话恢复机制(Resume)
1
2
3
4
5
6
7
8
9
10
11
|
// ClaudeProvider.ts:140-195
const shouldResume = hasRealMemorySessionId && session.lastPromptNumber > 1 && !session.forceInit;
const queryResult = query({
prompt: messageGenerator,
options: {
model: modelId,
...(shouldResume && session.memorySessionId ? { resume: session.memorySessionId } : {}),
// ...
}
});
|
memorySessionId 是 Claude Agent SDK 分配的内部会话 ID,通过 message.session_id 捕获并写入 SQLite。这样 Worker 重启后能恢复 AI 的对话上下文,而不是每次都从头 init。
⚠️ Warning Worker 重启的对齐问题
Worker 重启后 lastPromptNumber 可能 > 1(DB 里有记录),但 SDK 进程已死,memorySessionId 对应的会话上下文不存在。所以 promptNumber === 1 时强制走 fresh start,即使 DB 里有 memorySessionId:
1
2
3
4
5
6
|
// ClaudeProvider.ts:170-176
if (session.lastPromptNumber > 1) {
// 尝试 resume
} else {
// 第一个 prompt 永远 fresh start(防止 stale resume)
}
|
3. Stop Hook:SessionCompletionHandler 与会话摘要
3.1 触发时序
1
2
3
4
5
6
|
Claude 主会话结束
→ Stop hook (bun-runner.js)
→ POST /api/sessions/summarize
→ SessionRoutes.handleSummarizeByClaudeId()
→ sessionManager.queueSummarize(sessionDbId, lastAssistantMessage)
→ ensureGeneratorRunning(sessionDbId, 'summarize')
|
3.2 摘要 Prompt:buildSummaryPrompt
src/sdk/prompts.ts:115-147
1
2
3
4
5
6
7
8
9
10
11
12
13
|
--- MODE SWITCH: PROGRESS SUMMARY ---
⚠️ CRITICAL TAG REQUIREMENT:
• 你 MUST 用 <summary>...</summary> 包裹整个响应
• 禁止用 <observation> 标签,会被丢弃
<summary>
<request>...</request> ← 用户做了什么
<investigated>...</investigated> ← 探索了什么
<learned>...</learned> ← 学到了什么
<completed>...</completed> ← 完成了什么
<next_steps>...</next_steps> ← 下一步
<notes>...</notes> ← 备注
</summary>
|
摘要 vs Observation 的本质区别:
| 维度 |
Observation |
Summary |
| 触发时机 |
PostToolUse(每次工具调用后) |
Stop hook(会话结束时) |
| 粒度 |
单次工具调用的压缩 |
整个会话的高层总结 |
| XML 标签 |
<observation> |
<summary> |
| 字段 |
type/title/facts/narrative/concepts/files |
request/investigated/learned/completed/next_steps |
| 用途 |
细粒度历史检索 |
快速理解一次会话做了什么 |
3.3 SessionCompletionHandler
src/services/worker/session/SessionCompletionHandler.ts
1
2
3
4
5
6
7
8
9
10
|
finalizeSession(sessionDbId: number): void {
sessionStore.markSessionCompleted(sessionDbId); // 写 DB
pendingStore.clearPendingForSession(sessionDbId); // 清空队列中的遗留消息
this.eventBroadcaster.broadcastSessionCompleted(sessionDbId); // SSE 通知 UI
}
async completeByDbId(sessionDbId: number): Promise<void> {
this.finalizeSession(sessionDbId);
await this.sessionManager.deleteSession(sessionDbId); // 从内存 Map 移除
}
|
⚠️ Warning 为什么要 clearPendingForSession?
Stop hook 和最后一次 PostToolUse hook 可能竞争到达 Worker。如果 summarize 先处理完,队列里还残留着未处理的 observation messages,会触发 generator 重启陷入循环。finalizeSession 强制清理它们,防止僵尸 generator。
4. Worker 单例机制详解
4.1 PID 文件
src/services/infrastructure/ProcessManager.ts:134-168
1
2
3
4
5
6
7
8
9
10
11
12
|
// 写入(启动时调用)
export function writePidFile(info: PidInfo): void {
const resolvedToken = captureProcessStartToken(info.pid); // 用 ps/tasklist 获取进程启动时间
const payload = { ...info, startToken: resolvedToken };
writeFileSync(PID_FILE, JSON.stringify(payload, null, 2));
}
// PID_FILE = ~/.claude-mem/worker.pid(由 CLAUDE_MEM_DATA_DIR 驱动)
// 验证(防 PID 复用)
export function verifyPidFileOwnership(pidInfo): boolean {
// 对比进程实际启动时间与 startToken,两者一致才认为是"我们的"进程
}
|
4.2 Supervisor 单例
src/supervisor/index.ts:141 — 模块级单例:
1
|
const supervisorSingleton = new Supervisor(getProcessRegistry());
|
Supervisor 负责:
- 在
start() 时检查 PID 文件(validateWorkerPidFile)防止双启
- 注册 SIGTERM / SIGINT 信号处理器
- 在
--daemon 模式下忽略 SIGHUP(防止终端关闭导致退出)
- 维护
ProcessRegistry(记录所有受管子进程,如 SDK 进程、MCP server)
4.3 端口计算
1
2
|
默认端口 = 37700 + (uid % 100)
可覆盖:CLAUDE_MEM_WORKER_PORT 环境变量
|
💡 Tip 为什么用 uid 取模?
同一台机器多个 OS 用户(如 CI 服务多租户)自动获得不同端口,无需手动配置。同一 UID 的多 profile 场景再用 CLAUDE_MEM_WORKER_PORT 手动区分。
5. 多 AI 提供商抽象
5.1 提供商选择逻辑
worker-service.ts:481-488 和 SessionRoutes.ts:41-58:
1
2
3
4
5
|
private getActiveAgent(): ClaudeProvider | GeminiProvider | OpenRouterProvider {
if (isOpenRouterSelected() && isOpenRouterAvailable()) return this.openRouterAgent;
if (isGeminiSelected() && isGeminiAvailable()) return this.geminiAgent;
return this.sdkAgent; // 默认 Claude
}
|
三个提供商都实现相同接口:startSession(session, worker): Promise<void>,内部处理各自的 API 调用差异。
5.2 错误分类体系(ClassifiedProviderError)
src/services/worker/provider-errors.ts
1
2
3
4
5
6
7
|
type ProviderErrorClass =
| 'transient' // 网络抖动,可重试
| 'unrecoverable' // 上下文溢出、spawn 失败,不重试
| 'rate_limit' // 429,可重试(含 Retry-After)
| 'quota_exhausted' // 配额耗尽,停止处理
| 'auth_invalid' // API key 无效,停止处理
| (string & {}); // 开放联合类型,提供商可扩展
|
每个提供商有自己的 classifier:
| 提供商 |
函数 |
特殊判断 |
| Claude |
classifyClaudeError |
SDK OverloadedError、ENOENT spawn 失败 |
| Gemini |
classifyGeminiError |
500 + body 含 “quota exceeded”(Gemini quirk) |
| OpenRouter |
classifyOpenRouterError |
OpenRouter 特有的错误码 |
worker-service.ts:550-586 中的分发逻辑:
1
2
3
4
5
6
7
8
9
|
const classified = isClassified(error)
? error // 已在提供商边界分类
: this.reclassifyAtDispatch(error, agent); // 安全网:兜底分类
if (dispatchKind === 'unrecoverable' || dispatchKind === 'auth_invalid' || dispatchKind === 'quota_exhausted') {
hadUnrecoverableError = true;
return; // 停止重试
}
// 其余 transient / rate_limit → 会触发 generator 重启
|
💡 Tip 为什么不用原始 HTTP 状态码判断?
不同提供商的相同语义用不同状态码表达(Gemini 的 quota 可能是 500)。用 ClassifiedProviderError.kind 这层抽象,上层分发逻辑无需关心具体状态码,也方便未来添加新提供商。
5.3 重试机制(retry.ts)
src/services/worker/retry.ts:指数退避 + jitter
1
2
3
4
5
6
7
8
9
10
|
// POST 接口非幂等,最多重试 2 次
const DEFAULT_OPTIONS = {
maxRetries: 2,
perAttemptTimeoutMs: 30_000,
baseDelayMs: 100,
maxDelayMs: 30_000
};
// 退避公式:100ms * 2^attempt + random(0~50ms),上限 30s
computeBackoffMs(attempt) = min(100 * 2^attempt + jitter, 30000)
|
GeminiProvider / OpenRouterProvider 用 withRetry() 包装 fetch 调用;ClaudeProvider 走 Agent SDK,有自己的重试逻辑。
6. 关键设计亮点小结
6.1 响应先行,处理异步
1
2
3
|
HTTP 请求到来 → 同步写入 pending_messages → 立即返回 {"status": "queued"}
↓
异步:generator 消费队列,调用 AI,写结果
|
Hook 的执行时间有 timeout 限制,如果 AI 处理是同步阻塞的,hook 就会超时失败。这种"入队立即返回"的设计让 hook 响应时间 < 100ms,AI 处理可以花几秒。
6.2 三层 Session ID 分离
contentSessionId:Claude 主会话 ID(来自 Claude Code,所有 hook 用这个对齐)
memorySessionId:Agent SDK 内部会话 ID(用于 AI 对话 resume)
sessionDbId:SQLite 自增主键(本地数据库用)
三个 ID 各司其职,前者关联 hook 请求,中者管理 AI 上下文延续,后者是数据库主键。
6.3 content_hash 去重
storeObservations 对 observation 内容做 hash,相同内容只存一次。这解决了 processAgentResponse 注释中提到的"同一 parsed observation 被映射到同一 DB 行"导致 Chroma 重复同步的问题(issue #2240)。
6.4 Fallback 提供商链
当 ClaudeProvider 因 SDK 进程终止而失败时(isSessionTerminatedError),runFallbackForTerminatedSession 会自动尝试 Gemini → OpenRouter,保证 observation 不丢失(worker-service.ts:665-710)。
7. 流程图:完整 observation 压缩链路
sequenceDiagram
participant Hook as PostToolUse Hook
participant WS as WorkerService HTTP
participant Queue as PendingMessageStore
participant CP as ClaudeProvider
participant SDK as Claude Agent SDK (子进程)
participant DB as SQLite
participant Chroma as ChromaSync
Hook->>WS: POST /api/sessions/observations
{tool_name, tool_input, tool_response}
WS->>Queue: ingestObservation() → INSERT pending_messages
WS-->>Hook: {"status": "queued"} (立即返回)
Note over CP: generator loop 已在运行
Queue-->>CP: getMessageIterator() 返回 pending message
CP->>CP: buildObservationPrompt(obs)
包装成 XML 格式
CP->>SDK: yield {type:"user", content: obsPrompt}
SDK->>SDK: 调用 Claude API 生成摘要
SDK-->>CP: {type:"assistant", content:"..."}
CP->>CP: processAgentResponse(text)
parseAgentXml()
CP->>DB: storeObservations()
content_hash 去重
DB-->>CP: {observationIds: [42, 43]}
CP->>Chroma: 异步向量同步
CP->>WS: SSE broadcast 给 Viewer UI
阅读提示:重点代码路径 worker-service.ts → SessionRoutes → ClaudeProvider.createMessageGenerator → sdk/prompts.ts → ResponseProcessor → SessionStore,这条链路覆盖了 80% 的核心逻辑。