Commit f13caa3e f13caa3e163e38b3dc795507128a084e42d2edc6 by cnb.bofCdSsphPA

Generate a live execution plan from pending extraction jobs

Constraint: Ralph must keep turning PostgreSQL state into concrete next-step artifacts rather than leaving implied manual steps
Rejected: Stop at creating pending jobs only | It still leaves future sessions to infer ordering and physical targets by hand
Confidence: high
Scope-risk: narrow
Directive: Treat the planner report as the canonical bridge between pending jobs and real extraction workers
Tested: /usr/local/miniconda3/bin/python scripts/plan_phase1_extraction_jobs_live.py --dsn 'postgres://d2:d2pass@127.0.0.1:5432/d2' --schema acr_test --job-status pending --output data/pgvector_eval/music20/phase1_extraction_plan_report.json; /usr/local/miniconda3/bin/python -m py_compile scripts/plan_phase1_extraction_jobs_live.py; git diff --check -- acr-engine/scripts/plan_phase1_extraction_jobs_live.py acr-engine/data/pgvector_eval/music20/phase1_extraction_plan_report.json docs/model-feature-registry-bootstrap.md docs/postgres_db_schema_samples.md docs/session-handoff.md docs/CHANGELOG.md
Not-tested: Actual worker that consumes the plan to run MERT/MuQ/Chromaprint extraction end-to-end
1 parent 5be68c1d
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
import psycopg
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_OUTPUT = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_extraction_plan_report.json'
LANE_PRIORITY = {
'exact': 0,
'semantic': 1,
'cover': 2,
}
def parse_target_scope(target_scope: str) -> dict[str, Any]:
if ':' in target_scope:
scope_type, scope_value = target_scope.split(':', 1)
return {'scope_type': scope_type, 'scope_value': scope_value}
return {'scope_type': 'unknown', 'scope_value': target_scope}
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument('--dsn', required=True)
ap.add_argument('--schema', default='acr_test')
ap.add_argument('--job-status', default='pending')
ap.add_argument('--output', default=str(DEFAULT_OUTPUT))
args = ap.parse_args()
with psycopg.connect(args.dsn) as conn:
conn.execute(f'SET search_path TO {args.schema}, public;')
rows = conn.execute(
"""
SELECT
fej.extraction_job_id,
fej.feature_set_id,
fej.target_scope,
fej.job_status,
fej.shard_key,
fej.metadata_json,
fs.feature_name,
fs.feature_level,
fs.extraction_granularity,
fs.window_sec,
fs.hop_sec,
fs.embedding_dim,
fs.distance_metric,
mr.model_name,
mr.model_version,
mr.model_family,
mr.output_embedding_dim,
mr.input_sample_rate,
mr.default_window_sec,
mr.default_hop_sec,
mr.metadata_json
FROM feature_extraction_job fej
JOIN feature_set_registry fs ON fs.feature_set_id = fej.feature_set_id
JOIN model_registry mr ON mr.model_id = fs.model_id
WHERE fej.job_status = %s
ORDER BY fej.extraction_job_id;
""",
(args.job_status,),
).fetchall()
jobs = []
by_lane: dict[str, list[dict[str, Any]]] = {}
for row in rows:
job_meta = row[5] or {}
model_meta = row[20] or {}
lane = job_meta.get('lane') or model_meta.get('lane') or 'unknown'
scope = parse_target_scope(row[2])
physical_target = 'audio_fingerprint' if row[6] == 'fingerprint_asset' else 'audio_embedding'
vector_table = None
if row[11] == 192:
vector_table = 'audio_embedding_vector_192'
elif row[11] == 768:
vector_table = 'audio_embedding_vector_768'
item = {
'priority_rank': LANE_PRIORITY.get(lane, 99),
'lane': lane,
'extraction_job_id': row[0],
'feature_set_id': row[1],
'target_scope': row[2],
'scope': scope,
'job_status': row[3],
'shard_key': row[4],
'model_name': row[13],
'model_version': row[14],
'model_family': row[15],
'input_sample_rate': row[17],
'feature_name': row[6],
'feature_level': row[7],
'extraction_granularity': row[8],
'window_sec': float(row[9]) if row[9] is not None else None,
'hop_sec': float(row[10]) if row[10] is not None else None,
'embedding_dim': row[11],
'distance_metric': row[12],
'physical_target': physical_target,
'vector_table': vector_table,
'job_metadata': job_meta,
'model_metadata': model_meta,
'execution_notes': [
f"run feature extraction for {row[13]} {row[14]}",
f"write to {physical_target}" + (f" + {vector_table}" if vector_table else ''),
f"target scope: {row[2]}",
],
}
jobs.append(item)
by_lane.setdefault(lane, []).append(item)
jobs.sort(key=lambda x: (x['priority_rank'], x['extraction_job_id']))
for lane_jobs in by_lane.values():
lane_jobs.sort(key=lambda x: x['extraction_job_id'])
payload = {
'schema': args.schema,
'dsn_redacted': 'postgres://d2:***@127.0.0.1:5432/d2',
'job_status_filter': args.job_status,
'counts': {
'jobs': len(jobs),
'lanes': {lane: len(items) for lane, items in sorted(by_lane.items())},
},
'ordered_jobs': jobs,
'by_lane': by_lane,
'execution_order_summary': [
{
'order': idx + 1,
'extraction_job_id': job['extraction_job_id'],
'lane': job['lane'],
'model_name': job['model_name'],
'feature_name': job['feature_name'],
'window_sec': job['window_sec'],
'hop_sec': job['hop_sec'],
'physical_target': job['physical_target'],
}
for idx, job in enumerate(jobs)
],
}
out = Path(args.output)
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding='utf-8')
print(json.dumps(payload, ensure_ascii=False, indent=2))
if __name__ == '__main__':
main()
## 2026-06-04
- 新增 `acr-engine/scripts/plan_phase1_extraction_jobs_live.py``acr-engine/data/pgvector_eval/music20/phase1_extraction_plan_report.json`,支持从 PostgreSQL 的 `feature_extraction_job` 真实读取 pending jobs,并联表生成按 lane / priority 排序的 Phase-1 execution plan。
- 新增 `acr-engine/scripts/bootstrap_phase1_extraction_jobs_live.py``acr-engine/data/pgvector_eval/music20/phase1_extraction_jobs_report.json`,把 Phase-1 的 `feature_extraction_job` 初始化做成可直接连 PostgreSQL 的 live 脚本,并已在 `acr_test` schema 真实创建 5 条 pending jobs。
- 补充 `phase1_registry_bootstrap_idempotency_report.json` 与文档说明,验证 `bootstrap_phase1_model_registry_live.py``acr_test` schema 上连续执行两次后表计数保持稳定,证明 Phase-1 registry bootstrap 具备可重复执行的幂等性。
- 新增 `acr-engine/scripts/bootstrap_phase1_model_registry_live.py``acr-engine/data/pgvector_eval/music20/phase1_registry_bootstrap_report.json`,把 Phase-1 的 `chromaprint / mert / muq / ecapa` 与对应 `feature_set_registry / reference_set_registry` 初始化做成可直接连 PostgreSQL 的 live bootstrap 脚本,并已在 `acr_test` schema 验证通过。
......
......@@ -344,3 +344,56 @@ cd /workspace/acr-engine
这意味着:
> 现在 PostgreSQL 里已经不只是“模型定义”和“特征定义”,而是连 **下一步该跑哪些抽特征任务** 都已经具备结构化入口了。
---
## 10. Phase-1 extraction plan(从 pending jobs 生成)
`feature_extraction_job` 已经存在后,下一步通常不是马上手敲命令,而是先从 PostgreSQL 生成一个**统一执行计划**
本仓库现在已经提供:
- `acr-engine/scripts/plan_phase1_extraction_jobs_live.py`
用途:
- 读取 `feature_extraction_job`
- 过滤 `job_status=pending`
- 联表 `feature_set_registry + model_registry`
- 生成按 lane / priority 排序的 execution plan
### 10.1 执行命令
```bash
cd /workspace/acr-engine
/usr/local/miniconda3/bin/python scripts/plan_phase1_extraction_jobs_live.py \
--dsn 'postgres://d2:d2pass@127.0.0.1:5432/d2' \
--schema acr_test \
--job-status pending \
--output data/pgvector_eval/music20/phase1_extraction_plan_report.json
```
### 10.2 当前已验证结果(acr_test)
本轮已真实生成一份 ordered execution plan:
| order | lane | model | feature | physical_target |
|---|---|---|---|---|
| 1 | `exact` | `chromaprint` | `fingerprint_asset` | `audio_fingerprint` |
| 2 | `semantic` | `mert` | `semantic_embedding 5s/2.5s` | `audio_embedding` |
| 3 | `semantic` | `mert` | `semantic_embedding 10s/5s` | `audio_embedding` |
| 4 | `semantic` | `muq` | `semantic_embedding 5s/2.5s` | `audio_embedding` |
| 5 | `semantic` | `ecapa` | `semantic_embedding 5s/2.5s` | `audio_embedding` |
其中 planner 还会自动给出:
- `vector_table`
- `audio_embedding_vector_768`
- `audio_embedding_vector_192`
- `target_scope`
- `execution_notes`
当前产物:
- `acr-engine/data/pgvector_eval/music20/phase1_extraction_plan_report.json`
结论:
> 现在 PostgreSQL 里已经不仅能描述“有哪些 job”,还可以直接生成**按执行顺序排好的抽特征计划**。
......
......@@ -68,6 +68,7 @@
| registry bootstrap 报告 | `acr-engine/data/pgvector_eval/music20/phase1_registry_bootstrap_report.json` |
| registry bootstrap 幂等性报告 | `acr-engine/data/pgvector_eval/music20/phase1_registry_bootstrap_idempotency_report.json` |
| extraction job bootstrap 报告 | `acr-engine/data/pgvector_eval/music20/phase1_extraction_jobs_report.json` |
| extraction plan 报告 | `acr-engine/data/pgvector_eval/music20/phase1_extraction_plan_report.json` |
| 历史对照报告 | `acr-engine/data/pgvector_eval/music20/songid_eval_report.json` |
---
......@@ -416,6 +417,19 @@ flowchart LR
对应 live 报告:
- `acr-engine/data/pgvector_eval/music20/phase1_extraction_jobs_report.json`
### 本轮继续新增:pending jobs 已可生成 live execution plan
在 extraction jobs 之后,本轮又新增:
- `acr-engine/scripts/plan_phase1_extraction_jobs_live.py`
它已经在 `acr_test` schema 上真实读取 5 条 `pending` jobs,并生成按执行顺序排列的 plan:
- `chromaprint exact lane` 优先
- 然后是 `mert / muq / ecapa` 的 semantic lanes
对应 live 报告:
- `acr-engine/data/pgvector_eval/music20/phase1_extraction_plan_report.json`
### 路线 1:继续做 PostgreSQL 工程化
1.`live_pgvector_music20_eval.py` 泛化成:
......
......@@ -184,6 +184,7 @@ sed -n '1,320p' acr-engine/sql/acr_pg_schema_v2.sql
- PostgreSQL `acr_test` schema 上已真实写入 Phase-1 registry bootstrap:`chromaprint / mert / muq / ecapa` + 5 组 feature set + `phase1_hot_reference_v1`
- Phase-1 registry bootstrap 已有幂等性证据:同 schema 连续执行两次后,`model_registry=5 / feature_set_registry=6 / reference_set_registry=2` 保持不变
- PostgreSQL `acr_test` schema 上已真实创建 5 条 `feature_extraction_job`,后续 MERT / MuQ 接入可直接从 pending jobs 启动
- PostgreSQL `acr_test` schema 上已真实生成 Phase-1 extraction execution plan,当前顺序是 `chromaprint -> mert -> mert-long -> muq -> ecapa`
### 未验证 / 仍是缺口
- **未实际跑 MERT / MuQ encoder-only 特征抽取**
......