phase1-worker-contract.md 10.1 KB

Phase-1 Worker Contract / 作业执行器契约

更新:2026-06-04
目标:把 Phase-1 从“只有 registry / plan”推进到“worker 可以真实消费 PostgreSQL 作业并更新状态”。


一页结论

当前 Phase-1 已经具备一条最小真实执行链:

  1. planner 从 feature_extraction_job 读 pending jobs
  2. worker 读取 extraction_job_id
  3. worker 联表解析 feature_set_registry + model_registry
  4. worker 解析 target_scope
  5. worker 回写 feature_extraction_job.job_status / input_count / output_count / metadata_json

也就是说,现在 PostgreSQL 不只是“数据字典”,已经开始承担:

  • 作业编排面
  • 状态机面
  • 执行证据面

1. 当前落地的 worker

位于:

  • acr-engine/scripts/bootstrap_phase1_reference_members_live.py
  • acr-engine/workers/mark_job_status.py
  • acr-engine/workers/run_chromaprint_job.py
  • acr-engine/workers/run_embedding_job.py
  • acr-engine/workers/_job_common.py

角色划分

worker 作用
mark_job_status.py 通用状态推进器
run_chromaprint_job.py exact lane worker
run_embedding_job.py semantic lane worker
_job_common.py 共享的 job 读取、scope 解析、状态回写逻辑

配套 bootstrap

为了让 worker 不再面对空 scope,这轮还补上了:

  • acr-engine/scripts/bootstrap_phase1_reference_members_live.py

它会把当前 recording.is_reference = true 的录音挂到:

  • phase1_hot_reference_v1

这样 worker 可以真实看到:

  • recording_count
  • ready_asset_count
  • active_window_count

2. 当前状态机

flowchart LR
    A[pending] --> B[running]
    B --> C[completed]
    B --> D[failed]

当前已验证的状态流转

  • pending -> running
  • running -> completed(dry-run 模式)

当前状态保护

  • worker 认领 job 时要求前置状态为 pending
  • worker 完成 job 时要求前置状态为 running
  • mark_job_status.py 只接受:
    • pending
    • running
    • completed
    • failed
  • finished_at 只在首次完成时落值,不再被重复覆盖

已验证的 guard 行为

当前已真实验证:

  1. 同一 chromaprint job 第一次 dry-run:
    • 成功 pending -> running -> completed
  2. 不做 reset,直接第二次执行同一 job:
    • 被前置状态保护拒绝

对应证据:

  • acr-engine/data/pgvector_eval/music20/phase1_worker_double_claim_guard_report.json

设计意图

先把 作业契约与状态流转 固定住,再把真正的模型推理塞进去。
这样后续不管换成:

  • Chromaprint
  • MERT
  • MuQ
  • CoverHunter encoder

都不需要重做 orchestration 数据结构。


3. worker 输入契约

环境变量

变量 说明
PG_DSN PostgreSQL 连接串
PG_SCHEMA 目标 schema
EXTRACTION_JOB_ID 要执行的作业 id
FEATURE_SET_ID 规划时附带,worker 可用于一致性检查
TARGET_SCOPE 规划时附带,worker 当前以 DB 中 job 记录为准
MODEL_NAME embedding worker 用于防错
MODEL_VERSION embedding worker 用于防错
VECTOR_TABLE embedding worker 目标向量表
OUTPUT_TARGET audio_fingerprintaudio_embedding

CLI 参数

三个 worker 都支持显式 CLI 参数覆盖 env。

planner 命令模板的当前约定

plan_phase1_extraction_jobs_live.py 现在会显式生成:

cd /workspace/acr-engine && PG_DSN="${PG_DSN:?set PG_DSN}" ...

这样复制命令时,如果调用方忘了提供数据库连接串,会立刻失败,而不是静默跑空。

当前 planner 还会显式使用:

/usr/local/miniconda3/bin/python

原因是当前环境里 python 不在 PATH 上,但这个解释器路径已被验证可用。

对于当前 dry-run worker,planner 的主命令模板也会显式带上:

--complete-dry-run

这样 primary_command 就能直接复现:

pending -> running -> completed

4. PostgreSQL 读取契约

worker 当前真实读取:

  1. feature_extraction_job
  2. feature_set_registry
  3. model_registry
  4. reference_set_registry / reference_set_member
  5. recording_asset
  6. audio_window

为什么要读 scope summary

因为 Phase-1 第一阶段的核心不是“立刻抽出 embedding”,而是先确定:

  • 这次 job 面向哪个 reference set
  • 涉及多少 recording
  • 涉及多少 ready asset
  • 涉及多少 active window

这样后续做:

  • 分片
  • 并行
  • 重试
  • SLA 估算

才有稳定基线。


5. 当前 dry-run 的真实意义

当前 worker 还没有真正调用模型做特征提取;它做的是:

  1. 验证 planner 命令模板可被真实消费
  2. 验证 job -> feature_set -> model 的 join 契约
  3. 验证 target scope 解析
  4. 验证 PostgreSQL 作业状态回写
  5. 为下一步真推理保留稳定入口

所以它不是假文档,而是:

先把工业执行面的骨架打通,再把模型推理填进去。


6. 推荐执行顺序

flowchart TD
    A[bootstrap model/feature/reference registry] --> B[bootstrap feature_extraction_job]
    B --> C[plan pending jobs]
    C --> D[run worker dry-run]
    D --> E[validate status transitions]
    E --> F[replace dry-run with real extractor]

7. exact lane 与 semantic lane 的后续替换点

7.1 Chromaprint worker

后续把下面逻辑塞进 run_chromaprint_job.py

  1. 读取 recording_asset
  2. 读取可用音频并提取 exact-lane hash
  3. 写 artifact JSON
  4. audio_fingerprint
  5. 更新 output_count
  6. 标记 completed

当前 exact lane 的真实状态

这轮已经把 run_chromaprint_job.py 从“只有 dry-run”推进到:

  • 如果 source audio 可读:
    • 生成 repo-local chromaprint-style hash artifact
    • 写入 audio_fingerprint
  • 如果 source audio 不可读:
    • 明确把 job 标记为 failed
    • failure_reasonmissing_asset_countmissing_asset_samples 写回 PostgreSQL

当前失败语义

当前 exact lane 采用的是 全量成功 / 否则失败

  • 只要 scope 内任意 asset:
    • 缺文件
    • 解码失败
    • hash 提取失败

就整体标记:

  • job_status = failed
  • failure_reason = unreadable_audio_assets

这样不会把“部分成功”伪装成 completed

当前依赖策略

当前 exact lane 不再强依赖 librosa

  • 优先使用 librosa(如果环境里存在)
  • 否则回退到:
    • Python wave
    • numpy 线性重采样
    • numpy FFT spectrogram

这使得 worker contract 能在更瘦的运行环境里继续工作。

当前幂等保护

audio_fingerprint 现在补了:

  • UNIQUE(feature_set_id, asset_id)

对应 worker 写入改成:

  • INSERT ... ON CONFLICT DO UPDATE

因此 exact lane 对同一 (feature_set_id, asset_id) 的重复写入不再依赖应用层先查再写。

7.2 Embedding worker

run_embedding_job.py 现在已经不再只是简单 dry-run。当前它已经具备:

  1. 真实读取 reference_set -> audio_window -> recording_asset scope
  2. 真实检查目标向量表是否存在且与维度匹配
  3. 真实检查模型 runtime 依赖是否齐全
  4. 真实检查 source audio 是否存在
  5. 把 blocker 明确写回 feature_extraction_job.metadata_json
  6. 在 blocker 存在时把 job 诚实标记为 failed

当前失败语义

semantic lane 当前采用的是 preflight all-or-nothing

  • 只要 scope 内音频路径不可达 / 文件不存在,记为:
    • unreadable_audio_assets
  • 只要模型 runtime 依赖导入不满足,记为:
    • model_runtime_unavailable
  • 只要目标向量表非法 / 缺失 / 维度不匹配,记为对应 blocker

worker 会把这些 blocker 聚合到:

  • failure_reason = preflight_failed
  • preflight_blockers = [...]

这样不会把“模型没法跑”误写成 completed,也不会只暴露第一个错误。

当前 live 证据

MERT 5s/2.5s job (extraction_job_id=2) 在 acr_test 上已经真实验证:

  • scope_window_count = 20
  • job_status = failed
  • output_count = 0
  • preflight_blockers = ['unreadable_audio_assets', 'model_runtime_unavailable']
  • runtime_report.missing_dependencies = ['torch', 'torchaudio', 'transformers']
  • audio_embedding_vector_768 已通过存在性与维度校验

对应产物:

  • acr-engine/data/pgvector_eval/music20/phase1_worker_embedding_write_attempt.json
  • acr-engine/data/pgvector_eval/music20/phase1_worker_embedding_write_guard_report.json
  • acr-engine/data/pgvector_eval/music20/phase1_worker_embedding_post_state.json

当前幂等保护

为了服务后续真正的 window embedding upsert,audio_embedding 现在补了两条唯一键:

  • UNIQUE(feature_set_id, window_id) WHERE window_id IS NOT NULL
  • UNIQUE(feature_set_id, asset_id) WHERE window_id IS NULL AND asset_id IS NOT NULL

这让后续真实 encoder 接入后可以直接做:

  • window 级 embedding upsert
  • asset 级 embedding upsert

而不需要先查再写。

下一步替换点

当 runtime 与音频挂载到位后,只需要把 guarded failure path 替换成真实 inference:

  1. 加载 MERT / MuQ / ECAPA
  2. 提取向量
  3. audio_embedding
  4. audio_embedding_vector_<dim>
  5. 更新 output_count
  6. 标记 completed

也就是说,PostgreSQL worker contract 已经固定,下一步换的是 encoder adapter,不是 orchestration 结构。


8. 解决了什么问题

这次 worker contract 落地,主要解决了 4 个问题:

  1. planner 不再只是纸面计划
  2. job status 有了真实推进器
  3. 后续换模型不用重做 orchestration
  4. 可以先 dry-run 验证执行链,再接入重模型

9. 当前边界

当前还没有完成的部分:

  • exact lane 虽已有真实写入路径,但当前 live 环境仍被 /workspace/downloads 缺失阻塞
  • semantic lane 已有真实 preflight failure contract,但还没有接上真正的 MERT / MuQ / ECAPA inference adapter
  • failed 重试策略
  • job 分片执行器
  • 更完整的 embedding artifact / checksum 治理策略

但现在已经足够支撑下一阶段:

把真实 extractor 接到已经验证过的 PostgreSQL worker contract 上。