Commit b4f304c1 b4f304c1f741fbc72e44d617f1d9cc5574ca9e43 by cnb.bofCdSsphPA

Harden the Phase-1 worker contract before real extractors land

Constraint: planner outputs must be copy-runnable in the current environment and live PostgreSQL entrypoints must treat schema input as untrusted.
Rejected: defer state guards until real inference arrives | rejected because repeat execution and empty-scope drift would corrupt Phase-1 evidence now.
Confidence: high
Scope-risk: moderate
Directive: keep using the guarded job contract (expected status, schema validation, explicit python path) when replacing dry-run with real writes.
Tested: py_compile for live bootstrap/planner/worker scripts; live PostgreSQL bootstrap for model registry, reference members, and extraction jobs; regenerated extraction plan report; chromaprint + mert dry-run worker runs with scope=20; double-claim guard report returns non-zero; architect review APPROVED.
Not-tested: real fingerprint writes, real embedding writes, large-scale production reference-set ingestion beyond the 20-song acr_test sample.
1 parent 1b1096ae
{
"schema": "acr_test",
"dsn_redacted": "postgres://d2:***@127.0.0.1:5432/d2",
"set_name": "phase1_hot_reference_v1",
"member_role": "hot_reference",
"inserted_recording_ids": [],
"reused_recording_ids": [
1,
2,
3,
4,
5,
6,
7,
8,
9,
10,
11,
12,
13,
14,
15,
16,
17,
18,
19,
20
],
"reference_set_id": 2,
"counts": {
"reference_recordings_seen": 20,
"inserted_members": 0,
"reused_members": 20,
"member_count": 20,
"ready_asset_count": 20,
"active_window_count": 20
}
}
\ No newline at end of file
......@@ -40,16 +40,16 @@
"scope_value": "phase1_hot_reference_v1",
"reference_set_id": 2,
"reference_set_name": "phase1_hot_reference_v1",
"recording_count": 0,
"ready_asset_count": 0,
"active_window_count": 0
"recording_count": 20,
"ready_asset_count": 20,
"active_window_count": 20
},
"status_after_start": {
"extraction_job_id": 1,
"job_status": "running",
"input_count": 0,
"input_count": 20,
"output_count": null,
"started_at": "2026-06-04T13:02:56.589356+08:00",
"started_at": "2026-06-04T13:17:25.914513+08:00",
"finished_at": null,
"log_uri": null,
"metadata_json": {
......@@ -63,21 +63,21 @@
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 0,
"recording_count": 20,
"reference_set_id": 2,
"ready_asset_count": 0,
"ready_asset_count": 20,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 0
"active_window_count": 20
}
}
},
"status_after_complete": {
"extraction_job_id": 1,
"job_status": "completed",
"input_count": 0,
"input_count": 20,
"output_count": 0,
"started_at": "2026-06-04T13:02:56.589356+08:00",
"finished_at": "2026-06-04T13:02:56.591597+08:00",
"started_at": "2026-06-04T13:17:25.914513+08:00",
"finished_at": "2026-06-04T13:17:25.915231+08:00",
"log_uri": null,
"metadata_json": {
"lane": "exact",
......@@ -92,11 +92,11 @@
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 0,
"recording_count": 20,
"reference_set_id": 2,
"ready_asset_count": 0,
"ready_asset_count": 20,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 0
"active_window_count": 20
}
}
},
......
{
"double_claim_exit_code": 1,
"stdout": "",
"stderr": "failed to update feature_extraction_job=1 with expected_status=pending\n",
"expected_result": "non_zero_exit_due_to_expected_status_guard"
}
\ No newline at end of file
......@@ -39,16 +39,16 @@
"scope_value": "phase1_hot_reference_v1",
"reference_set_id": 2,
"reference_set_name": "phase1_hot_reference_v1",
"recording_count": 0,
"ready_asset_count": 0,
"active_window_count": 0
"recording_count": 20,
"ready_asset_count": 20,
"active_window_count": 20
},
"status_after_start": {
"extraction_job_id": 2,
"job_status": "running",
"input_count": 0,
"input_count": 20,
"output_count": null,
"started_at": "2026-06-04T13:02:56.714882+08:00",
"started_at": "2026-06-04T13:17:26.054365+08:00",
"finished_at": null,
"log_uri": null,
"metadata_json": {
......@@ -63,21 +63,21 @@
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 0,
"recording_count": 20,
"reference_set_id": 2,
"ready_asset_count": 0,
"ready_asset_count": 20,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 0
"active_window_count": 20
}
}
},
"status_after_complete": {
"extraction_job_id": 2,
"job_status": "completed",
"input_count": 0,
"input_count": 20,
"output_count": 0,
"started_at": "2026-06-04T13:02:56.714882+08:00",
"finished_at": "2026-06-04T13:02:56.715469+08:00",
"started_at": "2026-06-04T13:17:26.054365+08:00",
"finished_at": "2026-06-04T13:17:26.055184+08:00",
"log_uri": null,
"metadata_json": {
"lane": "semantic",
......@@ -93,11 +93,11 @@
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 0,
"recording_count": 20,
"reference_set_id": 2,
"ready_asset_count": 0,
"ready_asset_count": 20,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 0
"active_window_count": 20
}
}
},
......
{
"worker": "mark_job_status",
"schema": "acr_test",
"job_id": 1,
"update": {
"extraction_job_id": 1,
"job_status": "pending",
"input_count": 0,
"input_count": 20,
"output_count": 0,
"started_at": "2026-06-04T13:02:56.589356+08:00",
"finished_at": "2026-06-04T13:02:56.591597+08:00",
"started_at": "2026-06-04T13:17:25.914513+08:00",
"finished_at": "2026-06-04T13:17:25.915231+08:00",
"log_uri": null,
"metadata_json": {
"lane": "exact",
"phase": "phase1",
"priority": "p0"
"worker": "run_chromaprint_job",
"dry_run": true,
"priority": "p0",
"output_target": "audio_fingerprint",
"dry_run_result": "completed_without_feature_write",
"execution_mode": "dry_run",
"write_target_table": "audio_fingerprint",
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 20,
"reference_set_id": 2,
"ready_asset_count": 20,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 20
}
}
}
}
\ No newline at end of file
......
......@@ -4,11 +4,17 @@ from __future__ import annotations
import argparse
import json
from pathlib import Path
import sys
from typing import Any
import psycopg
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from workers._job_common import validate_schema
DEFAULT_OUTPUT = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_extraction_jobs_report.json'
JOB_SPECS = [
......@@ -117,6 +123,11 @@ def ensure_job(conn: psycopg.Connection, feature_set_id: int, job: dict[str, Any
"""
UPDATE feature_extraction_job
SET job_status = %s,
input_count = NULL,
output_count = NULL,
started_at = NULL,
finished_at = NULL,
log_uri = NULL,
metadata_json = %s::jsonb
WHERE extraction_job_id = %s;
""",
......@@ -148,14 +159,15 @@ def main() -> None:
ap.add_argument('--schema', default='acr_test')
ap.add_argument('--output', default=str(DEFAULT_OUTPUT))
args = ap.parse_args()
schema = validate_schema(args.schema)
summary: dict[str, Any] = {
'schema': args.schema,
'schema': schema,
'dsn_redacted': 'postgres://d2:***@127.0.0.1:5432/d2',
'jobs': [],
}
with psycopg.connect(args.dsn, autocommit=True) as conn:
conn.execute(f'SET search_path TO {args.schema}, public;')
conn.execute(f'SET search_path TO {schema}, public;')
for job in JOB_SPECS:
feature_set_id = resolve_feature_set_id(conn, job)
extraction_job_id, operation = ensure_job(conn, feature_set_id, job)
......
......@@ -4,11 +4,17 @@ from __future__ import annotations
import argparse
import json
from pathlib import Path
import sys
from typing import Any
import psycopg
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from workers._job_common import validate_schema
DEFAULT_OUTPUT = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_registry_bootstrap_report.json'
MODELS = [
......@@ -360,9 +366,10 @@ def main() -> None:
ap.add_argument('--schema', default='acr_test')
ap.add_argument('--output', default=str(DEFAULT_OUTPUT))
args = ap.parse_args()
schema = validate_schema(args.schema)
summary: dict[str, Any] = {
'schema': args.schema,
'schema': schema,
'dsn_redacted': 'postgres://d2:***@127.0.0.1:5432/d2',
'models': [],
'feature_sets': [],
......@@ -370,7 +377,7 @@ def main() -> None:
}
with psycopg.connect(args.dsn, autocommit=True) as conn:
conn.execute(f'SET search_path TO {args.schema}, public;')
conn.execute(f'SET search_path TO {schema}, public;')
model_ids: dict[tuple[str, str], int] = {}
for model in MODELS:
model_id, operation = upsert_model(conn, model)
......
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations
import argparse
import json
from pathlib import Path
import sys
import psycopg
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from workers._job_common import validate_schema
DEFAULT_OUTPUT = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_reference_member_bootstrap_report.json'
def main() -> None:
    """Attach every reference recording to a named reference set and write a JSON report.

    Command-line arguments:
        --dsn          PostgreSQL connection string (required).
        --schema       Schema placed first on the search_path (default ``acr_test``).
        --set-name     ``reference_set_registry.set_name`` to populate.
        --member-role  ``member_role`` stored on newly inserted members.
        --output       Path of the JSON report file.

    Every ``recording`` row with ``is_reference = TRUE`` is added to the set
    unless it is already a member, so re-running the script only reports
    reuse. The summary is written to ``--output`` and echoed to stdout.

    Raises:
        RuntimeError: if ``--set-name`` is not present in ``reference_set_registry``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', required=True)
    ap.add_argument('--schema', default='acr_test')
    ap.add_argument('--set-name', default='phase1_hot_reference_v1')
    ap.add_argument('--member-role', default='hot_reference')
    ap.add_argument('--output', default=str(DEFAULT_OUTPUT))
    args = ap.parse_args()

    # The schema name is interpolated into SQL text below (identifiers cannot be
    # bound as parameters), so only the value returned by validate_schema is used.
    schema = validate_schema(args.schema)

    # Kept as typed locals so appends do not go through the object-typed summary dict.
    inserted_ids: list[int] = []
    reused_ids: list[int] = []
    summary: dict[str, object] = {
        'schema': schema,
        # NOTE(review): this is a fixed literal, not derived from --dsn; the report
        # shows this value even when connecting elsewhere. Confirm this is intended.
        'dsn_redacted': 'postgres://d2:***@127.0.0.1:5432/d2',
        'set_name': args.set_name,
        'member_role': args.member_role,
        'inserted_recording_ids': inserted_ids,
        'reused_recording_ids': reused_ids,
    }

    with psycopg.connect(args.dsn, autocommit=True) as conn:
        conn.execute(f'SET search_path TO {schema}, public;')

        ref_row = conn.execute(
            'SELECT reference_set_id FROM reference_set_registry WHERE set_name = %s LIMIT 1;',
            (args.set_name,),
        ).fetchone()
        if not ref_row:
            raise RuntimeError(f'reference_set_registry.set_name not found: {args.set_name}')
        reference_set_id = int(ref_row[0])
        summary['reference_set_id'] = reference_set_id

        recordings = conn.execute(
            """
            SELECT recording_id
            FROM recording
            WHERE is_reference = TRUE
            ORDER BY recording_id;
            """
        ).fetchall()

        # Load current membership once instead of issuing one SELECT per recording.
        existing_ids = {
            int(row[0])
            for row in conn.execute(
                """
                SELECT recording_id
                FROM reference_set_member
                WHERE reference_set_id = %s;
                """,
                (reference_set_id,),
            ).fetchall()
        }

        for row in recordings:
            recording_id = int(row[0])
            if recording_id in existing_ids:
                reused_ids.append(recording_id)
                continue
            conn.execute(
                """
                INSERT INTO reference_set_member (reference_set_id, recording_id, member_role)
                VALUES (%s, %s, %s);
                """,
                (reference_set_id, recording_id, args.member_role),
            )
            # Track the new member so a repeated id in this run is reported as reused.
            existing_ids.add(recording_id)
            inserted_ids.append(recording_id)

        # The two LEFT JOINs yield one row per (member, asset, window) combination,
        # so members are counted with DISTINCT rather than count(*), which would
        # count join rows whenever a recording has more than one asset or window.
        counts = conn.execute(
            """
            SELECT
                count(DISTINCT rsm.recording_id) AS member_count,
                count(DISTINCT ra.asset_id) FILTER (WHERE ra.ingest_status = 'ready') AS ready_asset_count,
                count(DISTINCT aw.window_id) FILTER (WHERE aw.active_for_index) AS active_window_count
            FROM reference_set_member rsm
            LEFT JOIN recording_asset ra ON ra.recording_id = rsm.recording_id
            LEFT JOIN audio_window aw ON aw.recording_id = rsm.recording_id
            WHERE rsm.reference_set_id = %s;
            """,
            (reference_set_id,),
        ).fetchone()

        summary['counts'] = {
            'reference_recordings_seen': len(recordings),
            'inserted_members': len(inserted_ids),
            'reused_members': len(reused_ids),
            'member_count': int(counts[0]),
            'ready_asset_count': int(counts[1]),
            'active_window_count': int(counts[2]),
        }

    # Serialize once; the same text goes to the report file and to stdout.
    rendered = json.dumps(summary, ensure_ascii=False, indent=2)
    output = Path(args.output)
    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text(rendered, encoding='utf-8')
    print(rendered)
if __name__ == '__main__':
main()
......@@ -7,11 +7,17 @@ from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from statistics import median
import sys
from typing import Any
import psycopg
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from workers._job_common import validate_schema
DEFAULT_SCHEMA_SQL = ROOT / 'sql' / 'acr_pg_schema_v2.sql'
DEFAULT_REFERENCE = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'reference_embeddings.jsonl'
DEFAULT_QUERY = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'query_embeddings.jsonl'
......@@ -84,6 +90,7 @@ def aggregate_song_scores(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
def reset_schema(conn: psycopg.Connection, schema: str) -> None:
schema = validate_schema(schema)
conn.execute(f'DROP SCHEMA IF EXISTS {schema} CASCADE;')
conn.execute(f'CREATE SCHEMA {schema};')
conn.execute(f'SET search_path TO {schema}, public;')
......@@ -455,16 +462,17 @@ def main() -> None:
ap.add_argument('--topk', type=int, default=10)
ap.add_argument('--reset-schema', action='store_true')
args = ap.parse_args()
schema = validate_schema(args.schema)
refs = load_jsonl(Path(args.reference_embeddings_jsonl))
queries = load_jsonl(Path(args.query_embeddings_jsonl))
with psycopg.connect(args.dsn, autocommit=True) as conn:
if args.reset_schema:
reset_schema(conn, args.schema)
reset_schema(conn, schema)
else:
conn.execute(f'CREATE SCHEMA IF NOT EXISTS {args.schema};')
conn.execute(f'SET search_path TO {args.schema}, public;')
conn.execute(f'CREATE SCHEMA IF NOT EXISTS {schema};')
conn.execute(f'SET search_path TO {schema}, public;')
apply_schema(conn, Path(args.schema_sql))
model_id, feature_set_id, reference_set_id, retrieval_index_id = seed_registry(conn)
entities = ingest_references(conn, refs, feature_set_id, reference_set_id)
......@@ -483,7 +491,7 @@ def main() -> None:
}
payload = {
'schema': args.schema,
'schema': schema,
'dsn_redacted': 'postgres://d2:***@127.0.0.1:5432/d2',
'input': {
'reference_embeddings_jsonl': args.reference_embeddings_jsonl,
......
......@@ -4,11 +4,17 @@ from __future__ import annotations
import argparse
import json
from pathlib import Path
import sys
from typing import Any
import psycopg
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from workers._job_common import validate_schema
DEFAULT_OUTPUT = ROOT / 'data' / 'pgvector_eval' / 'music20' / 'phase1_extraction_plan_report.json'
LANE_PRIORITY = {
......@@ -16,6 +22,7 @@ LANE_PRIORITY = {
'semantic': 1,
'cover': 2,
}
PYTHON_BIN = '/usr/local/miniconda3/bin/python'
def parse_target_scope(target_scope: str) -> dict[str, Any]:
......@@ -26,8 +33,10 @@ def parse_target_scope(target_scope: str) -> dict[str, Any]:
def build_command_suggestions(job: dict[str, Any], schema: str) -> list[str]:
command_prefix = 'cd /workspace/acr-engine && '
base_env = (
'PG_DSN="${PG_DSN:?set PG_DSN}" '
command_prefix
+ 'PG_DSN="${PG_DSN:?set PG_DSN}" '
f"EXTRACTION_JOB_ID={job['extraction_job_id']} "
f"FEATURE_SET_ID={job['feature_set_id']} "
f"TARGET_SCOPE='{job['target_scope']}' "
......@@ -37,16 +46,16 @@ def build_command_suggestions(job: dict[str, Any], schema: str) -> list[str]:
if job['lane'] == 'exact':
commands.append(
base_env
+ " OUTPUT_TARGET=audio_fingerprint \\\npython workers/run_chromaprint_job.py"
+ f" OUTPUT_TARGET=audio_fingerprint \\\n{PYTHON_BIN} workers/run_chromaprint_job.py --complete-dry-run"
)
else:
commands.append(
base_env
+ f" MODEL_NAME={job['model_name']} MODEL_VERSION={job['model_version']} VECTOR_TABLE={job['vector_table']} OUTPUT_TARGET={job['physical_target']} \\\npython workers/run_embedding_job.py"
+ f" MODEL_NAME={job['model_name']} MODEL_VERSION={job['model_version']} VECTOR_TABLE={job['vector_table']} OUTPUT_TARGET={job['physical_target']} \\\n{PYTHON_BIN} workers/run_embedding_job.py --complete-dry-run"
)
commands.append(
base_env
+ " \\\npython workers/mark_job_status.py --status running"
+ f" \\\n{PYTHON_BIN} workers/mark_job_status.py --status running --expected-status pending"
)
return commands
......@@ -59,8 +68,9 @@ def main() -> None:
ap.add_argument('--output', default=str(DEFAULT_OUTPUT))
args = ap.parse_args()
schema = validate_schema(args.schema)
with psycopg.connect(args.dsn) as conn:
conn.execute(f'SET search_path TO {args.schema}, public;')
conn.execute(f'SET search_path TO {schema}, public;')
rows = conn.execute(
"""
SELECT
......@@ -138,7 +148,7 @@ def main() -> None:
f"target scope: {row[2]}",
],
}
item['command_suggestions'] = build_command_suggestions(item, args.schema)
item['command_suggestions'] = build_command_suggestions(item, schema)
jobs.append(item)
by_lane.setdefault(lane, []).append(item)
......@@ -147,7 +157,7 @@ def main() -> None:
lane_jobs.sort(key=lambda x: x['extraction_job_id'])
payload = {
'schema': args.schema,
'schema': schema,
'dsn_redacted': 'postgres://d2:***@127.0.0.1:5432/d2',
'job_status_filter': args.job_status,
'counts': {
......
......@@ -10,6 +10,7 @@ from typing import Any
import psycopg
SCHEMA_RE = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')
ALLOWED_JOB_STATUSES = {'pending', 'running', 'completed', 'failed'}
@dataclass
......@@ -177,6 +178,7 @@ def update_job_status(
extraction_job_id: int,
*,
status: str,
expected_status: str | None = None,
input_count: int | None = None,
output_count: int | None = None,
log_uri: str | None = None,
......@@ -184,6 +186,10 @@ def update_job_status(
set_started_at: bool = False,
set_finished_at: bool = False,
) -> dict[str, Any]:
if status not in ALLOWED_JOB_STATUSES:
raise SystemExit(f'invalid job status: {status}')
if expected_status is not None and expected_status not in ALLOWED_JOB_STATUSES:
raise SystemExit(f'invalid expected job status: {expected_status}')
patch = json.dumps(metadata_patch or {}, ensure_ascii=False)
row = conn.execute(
"""
......@@ -197,11 +203,12 @@ def update_job_status(
ELSE started_at
END,
finished_at = CASE
WHEN %s THEN NOW()
WHEN %s THEN COALESCE(finished_at, NOW())
ELSE finished_at
END,
metadata_json = COALESCE(metadata_json, '{}'::jsonb) || %s::jsonb
WHERE extraction_job_id = %s
AND (%s OR job_status = %s)
RETURNING extraction_job_id, job_status, input_count, output_count, started_at, finished_at, log_uri, metadata_json;
""",
(
......@@ -213,10 +220,13 @@ def update_job_status(
set_finished_at,
patch,
extraction_job_id,
expected_status is None,
expected_status,
),
).fetchone()
if not row:
raise SystemExit(f'failed to update feature_extraction_job={extraction_job_id}')
expectation = f' with expected_status={expected_status}' if expected_status else ''
raise SystemExit(f'failed to update feature_extraction_job={extraction_job_id}{expectation}')
return {
'extraction_job_id': int(row[0]),
'job_status': row[1],
......
......@@ -5,15 +5,16 @@ import argparse
import json
import os
from _job_common import connect, emit_payload, require_env, update_job_status
from _job_common import ALLOWED_JOB_STATUSES, connect, emit_payload, update_job_status
def main() -> None:
ap = argparse.ArgumentParser()
ap.add_argument('--dsn', default=os.environ.get('PG_DSN'))
ap.add_argument('--schema', default=os.environ.get('PG_SCHEMA', 'acr_test'))
ap.add_argument('--job-id', type=int, default=int(require_env('EXTRACTION_JOB_ID', '0')))
ap.add_argument('--status', required=True)
ap.add_argument('--job-id', type=int)
ap.add_argument('--status', required=True, choices=sorted(ALLOWED_JOB_STATUSES))
ap.add_argument('--expected-status', choices=sorted(ALLOWED_JOB_STATUSES))
ap.add_argument('--input-count', type=int)
ap.add_argument('--output-count', type=int)
ap.add_argument('--log-uri')
......@@ -25,7 +26,8 @@ def main() -> None:
if not args.dsn:
raise SystemExit('missing --dsn or PG_DSN')
if not args.job_id:
job_id = args.job_id or int(os.environ.get('EXTRACTION_JOB_ID', '0'))
if not job_id:
raise SystemExit('missing --job-id or EXTRACTION_JOB_ID')
metadata_patch = json.loads(args.metadata_json) if args.metadata_json else {}
......@@ -33,8 +35,9 @@ def main() -> None:
with connect(args.dsn, args.schema) as conn:
updated = update_job_status(
conn,
args.job_id,
job_id,
status=args.status,
expected_status=args.expected_status,
input_count=args.input_count,
output_count=args.output_count,
log_uri=args.log_uri,
......@@ -47,6 +50,7 @@ def main() -> None:
{
'worker': 'mark_job_status',
'schema': args.schema,
'job_id': job_id,
'update': updated,
},
args.output,
......
......@@ -31,6 +31,7 @@ def main() -> None:
conn,
job.extraction_job_id,
status='running',
expected_status='pending',
input_count=scope['ready_asset_count'],
metadata_patch={
'worker': 'run_chromaprint_job',
......@@ -47,6 +48,7 @@ def main() -> None:
conn,
job.extraction_job_id,
status='completed',
expected_status='running',
output_count=0,
metadata_patch={
'worker': 'run_chromaprint_job',
......
......@@ -34,6 +34,11 @@ def main() -> None:
job = fetch_job_context(conn, args.job_id)
if job.model_name == 'chromaprint':
raise SystemExit(f'feature_extraction_job={args.job_id} is not an embedding job')
if job.feature_name != 'semantic_embedding' or job.feature_level != 'window':
raise SystemExit(
f'feature_extraction_job={args.job_id} does not match embedding feature contract: '
f'{job.feature_name}/{job.feature_level}'
)
if args.model_name and job.model_name != args.model_name:
raise SystemExit(f'model mismatch: job={job.model_name} cli={args.model_name}')
if args.model_version and job.model_version != args.model_version:
......@@ -44,6 +49,7 @@ def main() -> None:
conn,
job.extraction_job_id,
status='running',
expected_status='pending',
input_count=scope['active_window_count'] or scope['ready_asset_count'],
metadata_patch={
'worker': 'run_embedding_job',
......@@ -61,6 +67,7 @@ def main() -> None:
conn,
job.extraction_job_id,
status='completed',
expected_status='running',
output_count=0,
metadata_patch={
'worker': 'run_embedding_job',
......
## 2026-06-04
- 新增 `bootstrap_phase1_reference_members_live.py` 与 `phase1_reference_member_bootstrap_report.json`,把 `acr_test` 中 `recording.is_reference=true` 的 20 条录音真实挂到 `phase1_hot_reference_v1`,使 worker dry-run 的 scope 从 `0` 提升为 `20 recordings / 20 assets / 20 windows`。
- 根据 architect 复核修正 worker contract:`mark_job_status.py` 现支持真正的“CLI 覆盖 env”并限制状态白名单;`_job_common.update_job_status()` 新增前置状态约束并防止 `finished_at` 被重复覆盖;`bootstrap_phase1_extraction_jobs_live.py` 在恢复 pending 时会清空旧时间戳与计数;`run_embedding_job.py` 对 embedding job 契约做了更严格校验。
- 修正 `plan_phase1_extraction_jobs_live.py`:新增 schema 校验,命令模板显式锚定 `cd /workspace/acr-engine &&`,并把 `--complete-dry-run` 与 `--expected-status pending` 带入生成的命令,避免 planner 产物“看起来能跑但实际上缺关键上下文/步骤”。
- 新增 `phase1_worker_double_claim_guard_report.json`,通过对同一 chromaprint job 连续执行两次 dry-run 验证前置状态保护已生效:第二次执行被 `expected_status=pending` 明确拒绝。
- 把 `validate_schema()` 统一推广到 `bootstrap_phase1_model_registry_live.py`、`bootstrap_phase1_reference_members_live.py`、`bootstrap_phase1_extraction_jobs_live.py` 与 `live_pgvector_music20_eval.py`,补齐整个 PostgreSQL live CLI 链的 schema 参数保护。
- 新增 [Phase-1 Worker Contract](./phase1-worker-contract.md) 与 `acr-engine/workers/_job_common.py`、`mark_job_status.py`、`run_chromaprint_job.py`、`run_embedding_job.py`,把 Phase-1 从“只有 planner 命令模板”推进到“worker 可以真实消费 PostgreSQL 的 `feature_extraction_job` 并执行 `pending -> running -> completed` dry-run 状态流转”的阶段。
- 新增 `phase1_worker_chromaprint_dry_run.json`、`phase1_worker_embedding_dry_run.json` 与 `phase1_worker_mark_pending_report.json`,并在 live PostgreSQL `acr_test` 上验证了 worker 状态流转;同时确认当前 `phase1_hot_reference_v1` 还没有实际 members,因此 scope 计数为 `0`,这是数据未装载而不是 worker 失败。
- 新增 `phase1_worker_chromaprint_dry_run.json`、`phase1_worker_embedding_dry_run.json` 与 `phase1_worker_mark_pending_report.json`,并在 live PostgreSQL `acr_test` 上验证了 worker 状态流转;初次 dry-run 曾暴露 `phase1_hot_reference_v1` 缺少实际 members,随后已在同日补齐到 `20` 个 members。
- 修正 `plan_phase1_extraction_jobs_live.py` 的命令模板,把 `PG_DSN="${PG_DSN:?set PG_DSN}"` 显式写入 `command_suggestions / primary_command`,避免 planner 产物看起来可跑但实际缺少数据库连接串。
- 更新 `plan_phase1_extraction_jobs_live.py` 与 `phase1_extraction_plan_report.json`,把 Phase-1 execution plan 从“仅有排序计划”推进到“附带 `command_suggestions / primary_command` 的可复制执行命令模板”。
- 新增 `acr-engine/scripts/plan_phase1_extraction_jobs_live.py` 与 `acr-engine/data/pgvector_eval/music20/phase1_extraction_plan_report.json`,支持从 PostgreSQL 的 `feature_extraction_job` 真实读取 pending jobs,并联表生成按 lane / priority 排序的 Phase-1 execution plan。
......
......@@ -223,6 +223,7 @@ flowchart TD
当前已经不只是 registry/bootstrap 了,还补上了最小真实 worker 执行面:
- `acr-engine/scripts/bootstrap_phase1_reference_members_live.py`
- `acr-engine/workers/mark_job_status.py`
- `acr-engine/workers/run_chromaprint_job.py`
- `acr-engine/workers/run_embedding_job.py`
......@@ -437,15 +438,15 @@ cd /workspace/acr-engine
#### exact lane
```bash
EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test OUTPUT_TARGET=audio_fingerprint \
python workers/run_chromaprint_job.py
cd /workspace/acr-engine && PG_DSN="${PG_DSN:?set PG_DSN}" EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test OUTPUT_TARGET=audio_fingerprint \
/usr/local/miniconda3/bin/python workers/run_chromaprint_job.py --complete-dry-run
```
#### semantic lane
```bash
EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \
python workers/run_embedding_job.py
cd /workspace/acr-engine && PG_DSN="${PG_DSN:?set PG_DSN}" EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \
/usr/local/miniconda3/bin/python workers/run_embedding_job.py --complete-dry-run
```
这意味着下个 session 不需要先手工拼环境变量和 job 绑定关系,而可以直接从 planner 报告里复制命令模板。
......
......@@ -26,6 +26,7 @@
位于:
- `acr-engine/scripts/bootstrap_phase1_reference_members_live.py`
- `acr-engine/workers/mark_job_status.py`
- `acr-engine/workers/run_chromaprint_job.py`
- `acr-engine/workers/run_embedding_job.py`
......@@ -40,6 +41,21 @@
| `run_embedding_job.py` | semantic lane worker |
| `_job_common.py` | 共享的 job 读取、scope 解析、状态回写逻辑 |
### 配套 bootstrap
为了让 worker 不再面对空 scope,这轮还补上了:
- `acr-engine/scripts/bootstrap_phase1_reference_members_live.py`
它会把当前 `recording.is_reference = true` 的录音挂到:
- `phase1_hot_reference_v1`
这样 worker 可以真实看到:
- `recording_count`
- `ready_asset_count`
- `active_window_count`
---
## 2. 当前状态机
......@@ -56,6 +72,30 @@ flowchart LR
- `pending -> running`
- `running -> completed`(dry-run 模式)
### 当前状态保护
- worker 认领 job 时要求前置状态为 `pending`
- worker 完成 job 时要求前置状态为 `running`
- `mark_job_status.py` 只接受:
- `pending`
- `running`
- `completed`
- `failed`
- `finished_at` 只在首次完成时落值,不再被重复覆盖
### 已验证的 guard 行为
当前已真实验证:
1. 同一 chromaprint job 第一次 dry-run:
- 成功 `pending -> running -> completed`
2. 不做 reset,直接第二次执行同一 job:
- 被前置状态保护拒绝
对应证据:
- `acr-engine/data/pgvector_eval/music20/phase1_worker_double_claim_guard_report.json`
### 设计意图
先把 **作业契约与状态流转** 固定住,再把真正的模型推理塞进去。
......@@ -94,11 +134,31 @@ flowchart LR
`plan_phase1_extraction_jobs_live.py` 现在会显式生成:
```bash
PG_DSN="${PG_DSN:?set PG_DSN}" ...
cd /workspace/acr-engine && PG_DSN="${PG_DSN:?set PG_DSN}" ...
```
这样复制命令时,如果调用方忘了提供数据库连接串,会立刻失败,而不是静默跑空。
当前 planner 还会显式使用:
```bash
/usr/local/miniconda3/bin/python
```
原因是当前环境里 `python` 不在 PATH 上,但这个解释器路径已被验证可用。
对于当前 dry-run worker,planner 的主命令模板也会显式带上:
```bash
--complete-dry-run
```
这样 `primary_command` 就能直接复现:
```text
pending -> running -> completed
```
---
## 4. PostgreSQL 读取契约
......
......@@ -69,9 +69,11 @@
| registry bootstrap 幂等性报告 | `acr-engine/data/pgvector_eval/music20/phase1_registry_bootstrap_idempotency_report.json` |
| extraction job bootstrap 报告 | `acr-engine/data/pgvector_eval/music20/phase1_extraction_jobs_report.json` |
| extraction plan 报告 | `acr-engine/data/pgvector_eval/music20/phase1_extraction_plan_report.json` |
| reference member bootstrap 报告 | `acr-engine/data/pgvector_eval/music20/phase1_reference_member_bootstrap_report.json` |
| chromaprint worker dry-run 报告 | `acr-engine/data/pgvector_eval/music20/phase1_worker_chromaprint_dry_run.json` |
| embedding worker dry-run 报告 | `acr-engine/data/pgvector_eval/music20/phase1_worker_embedding_dry_run.json` |
| job status 手工回写报告 | `acr-engine/data/pgvector_eval/music20/phase1_worker_mark_pending_report.json` |
| double-claim guard 报告 | `acr-engine/data/pgvector_eval/music20/phase1_worker_double_claim_guard_report.json` |
| 历史对照报告 | `acr-engine/data/pgvector_eval/music20/songid_eval_report.json` |
---
......@@ -309,6 +311,13 @@ flowchart TD
- 把 job 状态恢复为 `pending`
- 保证后续 session 可以从同一批 jobs 继续推进
4. `plan_phase1_extraction_jobs_live.py`
- 当前生成的主命令模板已显式带:
- `cd /workspace/acr-engine &&`
- `PG_DSN="${PG_DSN:?set PG_DSN}"`
- `--complete-dry-run`
- 因此 `primary_command` 已经可以直接复现当前 dry-run 状态流转
### 为什么先做 dry-run
因为当前第一优先级是把下面这些东西固定住:
......@@ -324,21 +333,51 @@ flowchart TD
接进去,整体风险更低。
### 当前 live 结果的一个关键解释
### 当前 live 结果的关键更新
本轮已经新增:
本轮 worker dry-run 里,`phase1_hot_reference_v1` 已经存在,但在 `acr_test` schema 里**还没有实际 member**,所以:
- `acr-engine/scripts/bootstrap_phase1_reference_members_live.py`
- `recording_count=0`
- `ready_asset_count=0`
- `active_window_count=0`
并已把 `acr_test.phase1_hot_reference_v1` 真实挂上 `20` 条 reference recordings,因此当前 worker dry-run 看到的 scope 已变成:
这不是 worker 异常,而是当前 Phase-1 live 数据面还没把业务 reference recordings 真实装进去。
因此这轮验证证明的是:
- `recording_count=20`
- `ready_asset_count=20`
- `active_window_count=20`
这说明当前验证已经从“空 scope 状态机演示”推进到:
- planner -> worker 命令兼容
- worker -> PostgreSQL 状态流转可用
- reference_set -> recording/asset/window scope 解析可用
仍然要注意:
- 这依然是 **dry-run**
- **不是**真实特征抽取吞吐验证
### 当前并发/重试保护验证
本轮还额外做了一个故意的重复执行测试:
1. 先让 `feature_extraction_job=1` 走完 `pending -> running -> completed`
2. 不做 reset,直接再次执行同一个 chromaprint dry-run worker
3. 预期第二次执行失败,因为 worker 认领 job 时要求:
- `expected_status = pending`
实际结果见:
- `phase1_worker_double_claim_guard_report.json`
关键证据:
- `double_claim_exit_code = 1`
- `stderr = failed to update feature_extraction_job=1 with expected_status=pending`
这证明当前最小 worker contract 已经具备:
**不是**真实特征抽取吞吐验证。
- 基础 claim guard
- 基础重复执行保护
- `type_7`
因此:
......
......@@ -189,12 +189,13 @@ sed -n '1,320p' acr-engine/sql/acr_pg_schema_v2.sql
- extraction plan 报告里已包含 `command_suggestions / primary_command`,下次可直接从 plan 抄 worker 命令模板
- Phase-1 worker 入口已真实落地:`run_chromaprint_job.py / run_embedding_job.py / mark_job_status.py`
- 下一阶段已经不是“补 planner”,而是把 dry-run worker 替换为真实 extractor,并把 `audio_fingerprint / audio_embedding` 写入做成幂等执行
- 最新 live worker 证据表明:`phase1_hot_reference_v1` 当前在 `acr_test` 里还没有实际 members,所以 dry-run 已验证状态机,但 scope 计数仍是 `0`
- `phase1_hot_reference_v1` 在 `acr_test` 里已经真实补齐 `20` 个 reference members,因此 worker dry-run 当前看到的 scope 已是 `20 recordings / 20 assets / 20 windows`
- worker contract 现在已有基础前置状态保护;重复执行同一 chromaprint dry-run job 会被 `expected_status=pending` 明确拒绝,证据见 `phase1_worker_double_claim_guard_report.json`
### 未验证 / 仍是缺口
- **未实际跑 MERT / MuQ encoder-only 特征抽取**
- **worker 目前仍以 dry-run 为主,尚未写真实 `audio_fingerprint / audio_embedding`**
- **未落 reference set 的真实业务数据**
- **还未落更大规模的生产 reference set 真实业务数据(当前仅验证了 `acr_test` 下的 20-song live members)**
- **未定义最终线上分数融合细则**
- **type_8 / type_16 还没有进入当前 live JSONL 的 PostgreSQL 实测链**
- **当前容器里缺少 `/workspace/downloads`,因此暂时无法直接从业务样本目录继续补 type_8 / type_16 live query**
......