Commit 5be68c1d 5be68c1d9db63369538b4ae82dc0aecb72ff6a6c by cnb.bofCdSsphPA

Create live Phase-1 extraction jobs in PostgreSQL

Constraint: Continue Phase-1 industrialization without waiting on missing audio mounts, and keep every Ralph step documented and pushed
Rejected: Leave extraction scheduling as an implicit next step after registry bootstrap | It forces future sessions to reconstruct pending jobs by hand
Confidence: high
Scope-risk: narrow
Directive: Use feature_extraction_job as the canonical handoff between registry bootstrap and actual encoder extraction runs
Tested: /usr/local/miniconda3/bin/python scripts/bootstrap_phase1_extraction_jobs_live.py --dsn 'postgres://d2:d2pass@127.0.0.1:5432/d2' --schema acr_test --output data/pgvector_eval/music20/phase1_extraction_jobs_report.json; /usr/local/miniconda3/bin/python -m py_compile scripts/bootstrap_phase1_extraction_jobs_live.py; git diff --check -- acr-engine/scripts/bootstrap_phase1_extraction_jobs_live.py acr-engine/data/pgvector_eval/music20/phase1_extraction_jobs_report.json docs/model-feature-registry-bootstrap.md docs/postgres_db_schema_samples.md docs/session-handoff.md docs/CHANGELOG.md
Not-tested: Downstream worker that consumes these pending jobs to run real MERT/MuQ extraction
1 parent f0c82687
{
"schema": "acr_test",
"dsn_redacted": "postgres://d2:***@127.0.0.1:5432/d2",
"jobs": [
{
"extraction_job_id": 1,
"feature_set_id": 2,
"model_name": "chromaprint",
"model_version": "v1",
"feature_name": "fingerprint_asset",
"window_sec": 5.0,
"hop_sec": 2.5,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
"operation": "inserted"
},
{
"extraction_job_id": 2,
"feature_set_id": 3,
"model_name": "mert",
"model_version": "v1-95m",
"feature_name": "semantic_embedding",
"window_sec": 5.0,
"hop_sec": 2.5,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
"operation": "inserted"
},
{
"extraction_job_id": 3,
"feature_set_id": 4,
"model_name": "mert",
"model_version": "v1-95m",
"feature_name": "semantic_embedding",
"window_sec": 10.0,
"hop_sec": 5.0,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
"operation": "inserted"
},
{
"extraction_job_id": 4,
"feature_set_id": 5,
"model_name": "muq",
"model_version": "large-msd-iter",
"feature_name": "semantic_embedding",
"window_sec": 5.0,
"hop_sec": 2.5,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
"operation": "inserted"
},
{
"extraction_job_id": 5,
"feature_set_id": 6,
"model_name": "ecapa",
"model_version": "acr-baseline-v1",
"feature_name": "semantic_embedding",
"window_sec": 5.0,
"hop_sec": 2.5,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
"operation": "inserted"
}
],
"counts": {
"feature_extraction_job": 5,
"pending_jobs": 5
}
}
\ No newline at end of file
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
import psycopg
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_OUTPUT = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_extraction_jobs_report.json'
JOB_SPECS = [
{
'model_name': 'chromaprint',
'model_version': 'v1',
'feature_name': 'fingerprint_asset',
'window_sec': 5.0,
'hop_sec': 2.5,
'target_scope': 'reference_set:phase1_hot_reference_v1',
'job_status': 'pending',
'shard_key': 'phase1/reference/chromaprint/v1',
'metadata_json': {'lane': 'exact', 'phase': 'phase1', 'priority': 'p0'},
},
{
'model_name': 'mert',
'model_version': 'v1-95m',
'feature_name': 'semantic_embedding',
'window_sec': 5.0,
'hop_sec': 2.5,
'target_scope': 'reference_set:phase1_hot_reference_v1',
'job_status': 'pending',
'shard_key': 'phase1/reference/mert/v1-95m/5s_2.5s',
'metadata_json': {'lane': 'semantic', 'role': 'primary_baseline', 'phase': 'phase1'},
},
{
'model_name': 'mert',
'model_version': 'v1-95m',
'feature_name': 'semantic_embedding',
'window_sec': 10.0,
'hop_sec': 5.0,
'target_scope': 'reference_set:phase1_hot_reference_v1',
'job_status': 'pending',
'shard_key': 'phase1/reference/mert/v1-95m/10s_5s',
'metadata_json': {'lane': 'semantic', 'role': 'long_context_validation', 'phase': 'phase1'},
},
{
'model_name': 'muq',
'model_version': 'large-msd-iter',
'feature_name': 'semantic_embedding',
'window_sec': 5.0,
'hop_sec': 2.5,
'target_scope': 'reference_set:phase1_hot_reference_v1',
'job_status': 'pending',
'shard_key': 'phase1/reference/muq/large-msd-iter/5s_2.5s',
'metadata_json': {'lane': 'semantic', 'role': 'challenger', 'phase': 'phase1'},
},
{
'model_name': 'ecapa',
'model_version': 'acr-baseline-v1',
'feature_name': 'semantic_embedding',
'window_sec': 5.0,
'hop_sec': 2.5,
'target_scope': 'reference_set:phase1_hot_reference_v1',
'job_status': 'pending',
'shard_key': 'phase1/reference/ecapa/acr-baseline-v1/5s_2.5s',
'metadata_json': {'lane': 'semantic', 'role': 'historical_baseline', 'phase': 'phase1'},
},
]
def resolve_feature_set_id(conn: psycopg.Connection, job: dict[str, Any]) -> int:
row = conn.execute(
"""
SELECT fs.feature_set_id
FROM feature_set_registry fs
JOIN model_registry mr ON mr.model_id = fs.model_id
WHERE mr.model_name = %s
AND mr.model_version = %s
AND fs.feature_name = %s
AND coalesce(fs.window_sec, -1) = coalesce(%s, -1)
AND coalesce(fs.hop_sec, -1) = coalesce(%s, -1)
ORDER BY fs.feature_set_id
LIMIT 1;
""",
(
job['model_name'],
job['model_version'],
job['feature_name'],
job['window_sec'],
job['hop_sec'],
),
).fetchone()
if not row:
raise RuntimeError(
f"Feature set not found for {job['model_name']} {job['model_version']} {job['feature_name']} {job['window_sec']}/{job['hop_sec']}"
)
return int(row[0])
def ensure_job(conn: psycopg.Connection, feature_set_id: int, job: dict[str, Any]) -> tuple[int, str]:
existing = conn.execute(
"""
SELECT extraction_job_id
FROM feature_extraction_job
WHERE feature_set_id = %s
AND target_scope = %s
AND coalesce(shard_key, '') = coalesce(%s, '')
ORDER BY extraction_job_id
LIMIT 1;
""",
(feature_set_id, job['target_scope'], job['shard_key']),
).fetchone()
if existing:
conn.execute(
"""
UPDATE feature_extraction_job
SET job_status = %s,
metadata_json = %s::jsonb
WHERE extraction_job_id = %s;
""",
(job['job_status'], json.dumps(job['metadata_json']), existing[0]),
)
return int(existing[0]), 'reused'
row = conn.execute(
"""
INSERT INTO feature_extraction_job (
feature_set_id, target_scope, job_status, shard_key,
input_count, output_count, started_at, finished_at,
log_uri, metadata_json
) VALUES (
%s, %s, %s, %s,
NULL, NULL, NULL, NULL,
NULL, %s::jsonb
)
RETURNING extraction_job_id;
""",
(feature_set_id, job['target_scope'], job['job_status'], job['shard_key'], json.dumps(job['metadata_json'])),
).fetchone()
return int(row[0]), 'inserted'
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument('--dsn', required=True)
ap.add_argument('--schema', default='acr_test')
ap.add_argument('--output', default=str(DEFAULT_OUTPUT))
args = ap.parse_args()
summary: dict[str, Any] = {
'schema': args.schema,
'dsn_redacted': 'postgres://d2:***@127.0.0.1:5432/d2',
'jobs': [],
}
with psycopg.connect(args.dsn, autocommit=True) as conn:
conn.execute(f'SET search_path TO {args.schema}, public;')
for job in JOB_SPECS:
feature_set_id = resolve_feature_set_id(conn, job)
extraction_job_id, operation = ensure_job(conn, feature_set_id, job)
summary['jobs'].append({
'extraction_job_id': extraction_job_id,
'feature_set_id': feature_set_id,
'model_name': job['model_name'],
'model_version': job['model_version'],
'feature_name': job['feature_name'],
'window_sec': job['window_sec'],
'hop_sec': job['hop_sec'],
'target_scope': job['target_scope'],
'job_status': job['job_status'],
'operation': operation,
})
summary['counts'] = {
'feature_extraction_job': int(conn.execute('SELECT count(*) FROM feature_extraction_job;').fetchone()[0]),
'pending_jobs': int(conn.execute("SELECT count(*) FROM feature_extraction_job WHERE job_status = 'pending';").fetchone()[0]),
}
out = Path(args.output)
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding='utf-8')
print(json.dumps(summary, ensure_ascii=False, indent=2))
if __name__ == '__main__':
main()
## 2026-06-04
- 新增 `acr-engine/scripts/bootstrap_phase1_extraction_jobs_live.py``acr-engine/data/pgvector_eval/music20/phase1_extraction_jobs_report.json`,把 Phase-1 的 `feature_extraction_job` 初始化做成可直接连 PostgreSQL 的 live 脚本,并已在 `acr_test` schema 真实创建 5 条 pending jobs。
- 补充 `phase1_registry_bootstrap_idempotency_report.json` 与文档说明,验证 `bootstrap_phase1_model_registry_live.py``acr_test` schema 上连续执行两次后表计数保持稳定,证明 Phase-1 registry bootstrap 具备可重复执行的幂等性。
- 新增 `acr-engine/scripts/bootstrap_phase1_model_registry_live.py``acr-engine/data/pgvector_eval/music20/phase1_registry_bootstrap_report.json`,把 Phase-1 的 `chromaprint / mert / muq / ecapa` 与对应 `feature_set_registry / reference_set_registry` 初始化做成可直接连 PostgreSQL 的 live bootstrap 脚本,并已在 `acr_test` schema 验证通过。
- 补充文档阻塞事实:当前容器里缺少 `/workspace/downloads`,因此本轮无法直接从业务样本目录继续生成 `type_8 / type_16` 的 live PostgreSQL query JSONL;已把该环境前提写入 handoff 与 PostgreSQL 样例文档。
......
......@@ -300,3 +300,47 @@ cd /workspace/acr-engine
结论:
> 当前 bootstrap 脚本可重复执行,不会把 Phase-1 registry 数据重复灌爆。
---
## 9. Phase-1 extraction job bootstrap
`model_registry / feature_set_registry / reference_set_registry` 都已经存在后,下一步不是立刻手工跑抽特征,而是先把 **待执行 job** 写到 `feature_extraction_job`
本仓库现在已经提供:
- `acr-engine/scripts/bootstrap_phase1_extraction_jobs_live.py`
用途:
- 根据已存在的 `feature_set_registry`
-`phase1_hot_reference_v1` 生成待执行 extraction jobs
- 把 Phase-1 的 exact / semantic lanes 统一放进 PostgreSQL job 表
### 9.1 执行命令
```bash
cd /workspace/acr-engine
/usr/local/miniconda3/bin/python scripts/bootstrap_phase1_extraction_jobs_live.py \
--dsn 'postgres://d2:d2pass@127.0.0.1:5432/d2' \
--schema acr_test \
--output data/pgvector_eval/music20/phase1_extraction_jobs_report.json
```
### 9.2 当前已验证结果(acr_test)
本轮已真实创建 5 条待执行 job:
| lane | model | feature | target_scope | status |
|---|---|---|---|---|
| exact | `chromaprint` | `fingerprint_asset` | `reference_set:phase1_hot_reference_v1` | `pending` |
| semantic | `mert` | `semantic_embedding` 5s/2.5s | `reference_set:phase1_hot_reference_v1` | `pending` |
| semantic | `mert` | `semantic_embedding` 10s/5s | `reference_set:phase1_hot_reference_v1` | `pending` |
| semantic | `muq` | `semantic_embedding` 5s/2.5s | `reference_set:phase1_hot_reference_v1` | `pending` |
| semantic | `ecapa` | `semantic_embedding` 5s/2.5s | `reference_set:phase1_hot_reference_v1` | `pending` |
对应 live 报告:
- `acr-engine/data/pgvector_eval/music20/phase1_extraction_jobs_report.json`
这意味着:
> 现在 PostgreSQL 里已经不只是“模型定义”和“特征定义”,而是连 **下一步该跑哪些抽特征任务** 都已经具备结构化入口了。
......
......@@ -67,6 +67,7 @@
| FAISS 对照报告 | `acr-engine/data/pgvector_eval/music20/songid_eval_report_fresh.json` |
| registry bootstrap 报告 | `acr-engine/data/pgvector_eval/music20/phase1_registry_bootstrap_report.json` |
| registry bootstrap 幂等性报告 | `acr-engine/data/pgvector_eval/music20/phase1_registry_bootstrap_idempotency_report.json` |
| extraction job bootstrap 报告 | `acr-engine/data/pgvector_eval/music20/phase1_extraction_jobs_report.json` |
| 历史对照报告 | `acr-engine/data/pgvector_eval/music20/songid_eval_report.json` |
---
......@@ -399,6 +400,22 @@ flowchart LR
对应 live 报告:
- `acr-engine/data/pgvector_eval/music20/phase1_registry_bootstrap_report.json`
### 本轮继续新增:Phase-1 extraction jobs 已可 live bootstrap
在 registry bootstrap 之后,本轮又新增:
- `acr-engine/scripts/bootstrap_phase1_extraction_jobs_live.py`
它已经在 `acr_test` schema 上真实创建了 5 条 `feature_extraction_job`
- `chromaprint`
- `mert 5s/2.5s`
- `mert 10s/5s`
- `muq 5s/2.5s`
- `ecapa 5s/2.5s`
对应 live 报告:
- `acr-engine/data/pgvector_eval/music20/phase1_extraction_jobs_report.json`
### 路线 1:继续做 PostgreSQL 工程化
1.`live_pgvector_music20_eval.py` 泛化成:
......
......@@ -183,6 +183,7 @@ sed -n '1,320p' acr-engine/sql/acr_pg_schema_v2.sql
- 机械校验已补齐:`live_pgvector_music20_eval.py``py_compile` 通过,相关变更 `diff --check` 通过
- PostgreSQL `acr_test` schema 上已真实写入 Phase-1 registry bootstrap:`chromaprint / mert / muq / ecapa` + 5 组 feature set + `phase1_hot_reference_v1`
- Phase-1 registry bootstrap 已有幂等性证据:同 schema 连续执行两次后,`model_registry=5 / feature_set_registry=6 / reference_set_registry=2` 保持不变
- PostgreSQL `acr_test` schema 上已真实创建 5 条 `feature_extraction_job`,后续 MERT / MuQ 接入可直接从 pending jobs 启动
### 未验证 / 仍是缺口
- **未实际跑 MERT / MuQ encoder-only 特征抽取**
......