phase1-worker-contract.md 7.72 KB

Phase-1 Worker Contract / 作业执行器契约

更新:2026-06-04
目标:把 Phase-1 从“只有 registry / plan”推进到“worker 可以真实消费 PostgreSQL 作业并更新状态”。


一页结论

当前 Phase-1 已经具备一条最小真实执行链:

  1. planner 从 feature_extraction_job 读 pending jobs
  2. worker 读取 extraction_job_id
  3. worker 联表解析 feature_set_registry + model_registry
  4. worker 解析 target_scope
  5. worker 回写 feature_extraction_job.job_status / input_count / output_count / metadata_json

也就是说,现在 PostgreSQL 不只是“数据字典”,已经开始承担:

  • 作业编排面
  • 状态机面
  • 执行证据面

1. 当前落地的 worker

位于:

  • acr-engine/scripts/bootstrap_phase1_reference_members_live.py
  • acr-engine/workers/mark_job_status.py
  • acr-engine/workers/run_chromaprint_job.py
  • acr-engine/workers/run_embedding_job.py
  • acr-engine/workers/_job_common.py

角色划分

worker 作用
mark_job_status.py 通用状态推进器
run_chromaprint_job.py exact lane worker
run_embedding_job.py semantic lane worker
_job_common.py 共享的 job 读取、scope 解析、状态回写逻辑

配套 bootstrap

为了让 worker 不再面对空 scope,这轮还补上了:

  • acr-engine/scripts/bootstrap_phase1_reference_members_live.py

它会把当前 recording.is_reference = true 的录音挂到:

  • phase1_hot_reference_v1

这样 worker 可以真实看到:

  • recording_count
  • ready_asset_count
  • active_window_count

2. 当前状态机

flowchart LR
    A[pending] --> B[running]
    B --> C[completed]
    B --> D[failed]

当前已验证的状态流转

  • pending -> running
  • running -> completed(dry-run 模式)

当前状态保护

  • worker 认领 job 时要求前置状态为 pending
  • worker 完成 job 时要求前置状态为 running
  • mark_job_status.py 只接受:
    • pending
    • running
    • completed
    • failed
  • finished_at 只在首次完成时落值,不再被重复覆盖

已验证的 guard 行为

当前已真实验证:

  1. 同一 chromaprint job 第一次 dry-run:
    • 成功 pending -> running -> completed
  2. 不做 reset,直接第二次执行同一 job:
    • 被前置状态保护拒绝

对应证据:

  • acr-engine/data/pgvector_eval/music20/phase1_worker_double_claim_guard_report.json

设计意图

先把 作业契约与状态流转 固定住,再把真正的模型推理塞进去。
这样后续不管换成:

  • Chromaprint
  • MERT
  • MuQ
  • CoverHunter encoder

都不需要重做 orchestration 数据结构。


3. worker 输入契约

环境变量

变量 说明
PG_DSN PostgreSQL 连接串
PG_SCHEMA 目标 schema
EXTRACTION_JOB_ID 要执行的作业 id
FEATURE_SET_ID 规划时附带,worker 可用于一致性检查
TARGET_SCOPE 规划时附带,worker 当前以 DB 中 job 记录为准
MODEL_NAME embedding worker 用于防错
MODEL_VERSION embedding worker 用于防错
VECTOR_TABLE embedding worker 目标向量表
OUTPUT_TARGET audio_fingerprintaudio_embedding

CLI 参数

三个 worker 都支持显式 CLI 参数覆盖 env。

planner 命令模板的当前约定

plan_phase1_extraction_jobs_live.py 现在会显式生成:

cd /workspace/acr-engine && PG_DSN="${PG_DSN:?set PG_DSN}" ...

这样复制命令时,如果调用方忘了提供数据库连接串,会立刻失败,而不是静默跑空。

当前 planner 还会显式使用:

/usr/local/miniconda3/bin/python

原因是当前环境里 python 不在 PATH 上,但这个解释器路径已被验证可用。

对于当前 dry-run worker,planner 的主命令模板也会显式带上:

--complete-dry-run

这样 primary_command 就能直接复现:

pending -> running -> completed

4. PostgreSQL 读取契约

worker 当前真实读取:

  1. feature_extraction_job
  2. feature_set_registry
  3. model_registry
  4. reference_set_registry / reference_set_member
  5. recording_asset
  6. audio_window

为什么要读 scope summary

因为 Phase-1 第一阶段的核心不是“立刻抽出 embedding”,而是先确定:

  • 这次 job 面向哪个 reference set
  • 涉及多少 recording
  • 涉及多少 ready asset
  • 涉及多少 active window

这样后续做:

  • 分片
  • 并行
  • 重试
  • SLA 估算

才有稳定基线。


5. 当前 dry-run 的真实意义

当前 worker 还没有真正调用模型做特征提取;它做的是:

  1. 验证 planner 命令模板可被真实消费
  2. 验证 job -> feature_set -> model 的 join 契约
  3. 验证 target scope 解析
  4. 验证 PostgreSQL 作业状态回写
  5. 为下一步真推理保留稳定入口

所以它不是假文档,而是:

先把工业执行面的骨架打通,再把模型推理填进去。


6. 推荐执行顺序

flowchart TD
    A[bootstrap model/feature/reference registry] --> B[bootstrap feature_extraction_job]
    B --> C[plan pending jobs]
    C --> D[run worker dry-run]
    D --> E[validate status transitions]
    E --> F[replace dry-run with real extractor]

7. exact lane 与 semantic lane 的后续替换点

7.1 Chromaprint worker

后续把下面逻辑塞进 run_chromaprint_job.py

  1. 读取 recording_asset
  2. 读取可用音频并提取 exact-lane hash
  3. 写 artifact JSON
  4. audio_fingerprint
  5. 更新 output_count
  6. 标记 completed

当前 exact lane 的真实状态

这轮已经把 run_chromaprint_job.py 从“只有 dry-run”推进到:

  • 如果 source audio 可读:
    • 生成 repo-local chromaprint-style hash artifact
    • 写入 audio_fingerprint
  • 如果 source audio 不可读:
    • 明确把 job 标记为 failed
    • failure_reasonmissing_asset_countmissing_asset_samples 写回 PostgreSQL

当前失败语义

当前 exact lane 采用的是 全量成功 / 否则失败

  • 只要 scope 内任意 asset:
    • 缺文件
    • 解码失败
    • hash 提取失败

就整体标记:

  • job_status = failed
  • failure_reason = unreadable_audio_assets

这样不会把“部分成功”伪装成 completed

当前依赖策略

当前 exact lane 不再强依赖 librosa

  • 优先使用 librosa(如果环境里存在)
  • 否则回退到:
    • Python wave
    • numpy 线性重采样
    • numpy FFT spectrogram

这使得 worker contract 能在更瘦的运行环境里继续工作。

当前幂等保护

audio_fingerprint 现在补了:

  • UNIQUE(feature_set_id, asset_id)

对应 worker 写入改成:

  • INSERT ... ON CONFLICT DO UPDATE

因此 exact lane 对同一 (feature_set_id, asset_id) 的重复写入不再依赖应用层先查再写。

7.2 Embedding worker

后续把下面逻辑塞进 run_embedding_job.py

  1. 读取 audio_window
  2. 加载 MERT / MuQ / ECAPA
  3. 提取向量
  4. audio_embedding
  5. audio_embedding_vector_<dim>
  6. 更新 output_count
  7. 标记 completed

8. 解决了什么问题

这次 worker contract 落地,主要解决了 4 个问题:

  1. planner 不再只是纸面计划
  2. job status 有了真实推进器
  3. 后续换模型不用重做 orchestration
  4. 可以先 dry-run 验证执行链,再接入重模型

9. 当前边界

当前还没有完成的部分:

  • 真实 chromaprint 特征写入
  • 真实 MERT / MuQ / ECAPA embedding 写入
  • failed 重试策略
  • job 分片执行器
  • 幂等去重写入策略

但现在已经足够支撑下一阶段:

把真实 extractor 接到已经验证过的 PostgreSQL worker contract 上。