Commit 1b1096ae768a14efef8d211141864cd01e76eb4a by cnb.bofCdSsphPA

Make Phase-1 extraction jobs executable through PostgreSQL workers

Constraint: Phase-1 must stay encoder-only and use PostgreSQL as the orchestration/state plane before real extractor inference lands.
Rejected: implement real MERT/MuQ inference first | rejected because planner/job/state contracts were not yet executable or verified end-to-end.
Confidence: high
Scope-risk: moderate
Directive: preserve the worker job contract and replace dry-run incrementally with real fingerprint/embedding writes.
Tested: py_compile for new workers and planner; live PostgreSQL dry-run for chromaprint job 1 and mert job 2; planner report regeneration; bootstrap restore to pending; git diff --check.
Not-tested: real chromaprint extraction, real MERT/MuQ/ECAPA embedding writes, failed-job retry handling.
1 parent 06794812
......@@ -12,7 +12,7 @@
"hop_sec": 2.5,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
-"operation": "inserted"
+"operation": "reused"
},
{
"extraction_job_id": 2,
......@@ -24,7 +24,7 @@
"hop_sec": 2.5,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
-"operation": "inserted"
+"operation": "reused"
},
{
"extraction_job_id": 3,
......@@ -36,7 +36,7 @@
"hop_sec": 5.0,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
-"operation": "inserted"
+"operation": "reused"
},
{
"extraction_job_id": 4,
......@@ -48,7 +48,7 @@
"hop_sec": 2.5,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
-"operation": "inserted"
+"operation": "reused"
},
{
"extraction_job_id": 5,
......@@ -60,7 +60,7 @@
"hop_sec": 2.5,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
-"operation": "inserted"
+"operation": "reused"
}
],
"counts": {
......
......@@ -51,8 +51,8 @@
"target scope: reference_set:phase1_hot_reference_v1"
],
"command_suggestions": [
"EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test OUTPUT_TARGET=audio_fingerprint \\\npython workers/run_chromaprint_job.py",
"EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test OUTPUT_TARGET=audio_fingerprint \\\npython workers/run_chromaprint_job.py",
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
]
},
{
......@@ -96,8 +96,8 @@
"target scope: reference_set:phase1_hot_reference_v1"
],
"command_suggestions": [
"EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
]
},
{
......@@ -141,8 +141,8 @@
"target scope: reference_set:phase1_hot_reference_v1"
],
"command_suggestions": [
"EXTRACTION_JOB_ID=3 FEATURE_SET_ID=4 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"EXTRACTION_JOB_ID=3 FEATURE_SET_ID=4 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=3 FEATURE_SET_ID=4 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=3 FEATURE_SET_ID=4 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
]
},
{
......@@ -186,8 +186,8 @@
"target scope: reference_set:phase1_hot_reference_v1"
],
"command_suggestions": [
"EXTRACTION_JOB_ID=4 FEATURE_SET_ID=5 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=muq MODEL_VERSION=large-msd-iter VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"EXTRACTION_JOB_ID=4 FEATURE_SET_ID=5 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=4 FEATURE_SET_ID=5 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=muq MODEL_VERSION=large-msd-iter VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=4 FEATURE_SET_ID=5 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
]
},
{
......@@ -231,8 +231,8 @@
"target scope: reference_set:phase1_hot_reference_v1"
],
"command_suggestions": [
"EXTRACTION_JOB_ID=5 FEATURE_SET_ID=6 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=ecapa MODEL_VERSION=acr-baseline-v1 VECTOR_TABLE=audio_embedding_vector_192 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"EXTRACTION_JOB_ID=5 FEATURE_SET_ID=6 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=5 FEATURE_SET_ID=6 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=ecapa MODEL_VERSION=acr-baseline-v1 VECTOR_TABLE=audio_embedding_vector_192 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=5 FEATURE_SET_ID=6 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
]
}
],
......@@ -279,8 +279,8 @@
"target scope: reference_set:phase1_hot_reference_v1"
],
"command_suggestions": [
"EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test OUTPUT_TARGET=audio_fingerprint \\\npython workers/run_chromaprint_job.py",
"EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test OUTPUT_TARGET=audio_fingerprint \\\npython workers/run_chromaprint_job.py",
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
]
}
],
......@@ -326,8 +326,8 @@
"target scope: reference_set:phase1_hot_reference_v1"
],
"command_suggestions": [
"EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
]
},
{
......@@ -371,8 +371,8 @@
"target scope: reference_set:phase1_hot_reference_v1"
],
"command_suggestions": [
"EXTRACTION_JOB_ID=3 FEATURE_SET_ID=4 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"EXTRACTION_JOB_ID=3 FEATURE_SET_ID=4 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=3 FEATURE_SET_ID=4 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=3 FEATURE_SET_ID=4 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
]
},
{
......@@ -416,8 +416,8 @@
"target scope: reference_set:phase1_hot_reference_v1"
],
"command_suggestions": [
"EXTRACTION_JOB_ID=4 FEATURE_SET_ID=5 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=muq MODEL_VERSION=large-msd-iter VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"EXTRACTION_JOB_ID=4 FEATURE_SET_ID=5 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=4 FEATURE_SET_ID=5 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=muq MODEL_VERSION=large-msd-iter VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=4 FEATURE_SET_ID=5 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
]
},
{
......@@ -461,8 +461,8 @@
"target scope: reference_set:phase1_hot_reference_v1"
],
"command_suggestions": [
"EXTRACTION_JOB_ID=5 FEATURE_SET_ID=6 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=ecapa MODEL_VERSION=acr-baseline-v1 VECTOR_TABLE=audio_embedding_vector_192 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"EXTRACTION_JOB_ID=5 FEATURE_SET_ID=6 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=5 FEATURE_SET_ID=6 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=ecapa MODEL_VERSION=acr-baseline-v1 VECTOR_TABLE=audio_embedding_vector_192 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py",
"PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=5 FEATURE_SET_ID=6 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test \\\npython workers/mark_job_status.py --status running"
]
}
]
......@@ -477,7 +477,7 @@
"window_sec": 5.0,
"hop_sec": 2.5,
"physical_target": "audio_fingerprint",
"primary_command": "EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test OUTPUT_TARGET=audio_fingerprint \\\npython workers/run_chromaprint_job.py"
"primary_command": "PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=1 FEATURE_SET_ID=2 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test OUTPUT_TARGET=audio_fingerprint \\\npython workers/run_chromaprint_job.py"
},
{
"order": 2,
......@@ -488,7 +488,7 @@
"window_sec": 5.0,
"hop_sec": 2.5,
"physical_target": "audio_embedding",
"primary_command": "EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py"
"primary_command": "PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=2 FEATURE_SET_ID=3 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py"
},
{
"order": 3,
......@@ -499,7 +499,7 @@
"window_sec": 10.0,
"hop_sec": 5.0,
"physical_target": "audio_embedding",
"primary_command": "EXTRACTION_JOB_ID=3 FEATURE_SET_ID=4 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py"
"primary_command": "PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=3 FEATURE_SET_ID=4 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=mert MODEL_VERSION=v1-95m VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py"
},
{
"order": 4,
......@@ -510,7 +510,7 @@
"window_sec": 5.0,
"hop_sec": 2.5,
"physical_target": "audio_embedding",
"primary_command": "EXTRACTION_JOB_ID=4 FEATURE_SET_ID=5 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=muq MODEL_VERSION=large-msd-iter VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py"
"primary_command": "PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=4 FEATURE_SET_ID=5 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=muq MODEL_VERSION=large-msd-iter VECTOR_TABLE=audio_embedding_vector_768 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py"
},
{
"order": 5,
......@@ -521,7 +521,7 @@
"window_sec": 5.0,
"hop_sec": 2.5,
"physical_target": "audio_embedding",
"primary_command": "EXTRACTION_JOB_ID=5 FEATURE_SET_ID=6 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=ecapa MODEL_VERSION=acr-baseline-v1 VECTOR_TABLE=audio_embedding_vector_192 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py"
"primary_command": "PG_DSN=\"${PG_DSN:?set PG_DSN}\" EXTRACTION_JOB_ID=5 FEATURE_SET_ID=6 TARGET_SCOPE='reference_set:phase1_hot_reference_v1' PG_SCHEMA=acr_test MODEL_NAME=ecapa MODEL_VERSION=acr-baseline-v1 VECTOR_TABLE=audio_embedding_vector_192 OUTPUT_TARGET=audio_embedding \\\npython workers/run_embedding_job.py"
}
]
}
\ No newline at end of file
......
{
"worker": "run_chromaprint_job",
"schema": "acr_test",
"job": {
"extraction_job_id": 1,
"feature_set_id": 2,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
"shard_key": "phase1/reference/chromaprint/v1",
"job_metadata": {
"lane": "exact",
"phase": "phase1",
"priority": "p0"
},
"feature_name": "fingerprint_asset",
"feature_level": "asset",
"extraction_granularity": "full_asset",
"window_sec": 5.0,
"hop_sec": 2.5,
"embedding_dim": null,
"distance_metric": "hamming",
"feature_config": {
"lane": "exact",
"index_target": "audio_fingerprint"
},
"model_id": 2,
"model_name": "chromaprint",
"model_version": "v1",
"model_family": "fingerprint",
"input_sample_rate": 16000,
"output_embedding_dim": null,
"model_metadata": {
"lane": "exact",
"note": "exact fingerprint lane baseline",
"phase": "phase1"
}
},
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"reference_set_id": 2,
"reference_set_name": "phase1_hot_reference_v1",
"recording_count": 0,
"ready_asset_count": 0,
"active_window_count": 0
},
"status_after_start": {
"extraction_job_id": 1,
"job_status": "running",
"input_count": 0,
"output_count": null,
"started_at": "2026-06-04T13:02:56.589356+08:00",
"finished_at": null,
"log_uri": null,
"metadata_json": {
"lane": "exact",
"phase": "phase1",
"worker": "run_chromaprint_job",
"dry_run": true,
"priority": "p0",
"output_target": "audio_fingerprint",
"execution_mode": "dry_run",
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 0,
"reference_set_id": 2,
"ready_asset_count": 0,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 0
}
}
},
"status_after_complete": {
"extraction_job_id": 1,
"job_status": "completed",
"input_count": 0,
"output_count": 0,
"started_at": "2026-06-04T13:02:56.589356+08:00",
"finished_at": "2026-06-04T13:02:56.591597+08:00",
"log_uri": null,
"metadata_json": {
"lane": "exact",
"phase": "phase1",
"worker": "run_chromaprint_job",
"dry_run": true,
"priority": "p0",
"output_target": "audio_fingerprint",
"dry_run_result": "completed_without_feature_write",
"execution_mode": "dry_run",
"write_target_table": "audio_fingerprint",
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 0,
"reference_set_id": 2,
"ready_asset_count": 0,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 0
}
}
},
"next_write_target": "audio_fingerprint",
"notes": [
"this worker currently validates planner -> job -> PostgreSQL state flow",
"real chromaprint extraction can replace dry_run while preserving the same job contract"
]
}
\ No newline at end of file
{
"worker": "run_embedding_job",
"schema": "acr_test",
"job": {
"extraction_job_id": 2,
"feature_set_id": 3,
"target_scope": "reference_set:phase1_hot_reference_v1",
"job_status": "pending",
"shard_key": "phase1/reference/mert/v1-95m/5s_2.5s",
"job_metadata": {
"lane": "semantic",
"role": "primary_baseline",
"phase": "phase1"
},
"feature_name": "semantic_embedding",
"feature_level": "window",
"extraction_granularity": "sliding_window",
"window_sec": 5.0,
"hop_sec": 2.5,
"embedding_dim": 768,
"distance_metric": "cosine",
"feature_config": {
"role": "primary_semantic_baseline"
},
"model_id": 3,
"model_name": "mert",
"model_version": "v1-95m",
"model_family": "music_ssl",
"input_sample_rate": 24000,
"output_embedding_dim": 768,
"model_metadata": {
"lane": "semantic",
"role": "primary_baseline",
"phase": "phase1"
}
},
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"reference_set_id": 2,
"reference_set_name": "phase1_hot_reference_v1",
"recording_count": 0,
"ready_asset_count": 0,
"active_window_count": 0
},
"status_after_start": {
"extraction_job_id": 2,
"job_status": "running",
"input_count": 0,
"output_count": null,
"started_at": "2026-06-04T13:02:56.714882+08:00",
"finished_at": null,
"log_uri": null,
"metadata_json": {
"lane": "semantic",
"role": "primary_baseline",
"phase": "phase1",
"worker": "run_embedding_job",
"dry_run": true,
"vector_table": "audio_embedding_vector_768",
"output_target": "audio_embedding",
"execution_mode": "dry_run",
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 0,
"reference_set_id": 2,
"ready_asset_count": 0,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 0
}
}
},
"status_after_complete": {
"extraction_job_id": 2,
"job_status": "completed",
"input_count": 0,
"output_count": 0,
"started_at": "2026-06-04T13:02:56.714882+08:00",
"finished_at": "2026-06-04T13:02:56.715469+08:00",
"log_uri": null,
"metadata_json": {
"lane": "semantic",
"role": "primary_baseline",
"phase": "phase1",
"worker": "run_embedding_job",
"dry_run": true,
"vector_table": "audio_embedding_vector_768",
"output_target": "audio_embedding",
"dry_run_result": "completed_without_feature_write",
"execution_mode": "dry_run",
"write_target_table": "audio_embedding",
"target_scope_summary": {
"scope_type": "reference_set",
"scope_value": "phase1_hot_reference_v1",
"recording_count": 0,
"reference_set_id": 2,
"ready_asset_count": 0,
"reference_set_name": "phase1_hot_reference_v1",
"active_window_count": 0
}
}
},
"resolved_vector_table": "audio_embedding_vector_768",
"notes": [
"this worker currently validates planner -> job -> PostgreSQL state flow",
"real encoder inference can replace dry_run while preserving the same job contract"
]
}
\ No newline at end of file
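The two dry-run payloads above share a shape that can be checked mechanically. The following is a minimal sketch and not part of this commit: `check_dry_run_payload` is a hypothetical helper, and the rules it enforces are only the ones visible in the recorded payloads (the start snapshot is `running` and flagged `dry_run`, the completion snapshot is `completed` with `output_count` 0, and `started_at` is identical in both snapshots).

```python
from typing import Any


def check_dry_run_payload(payload: dict[str, Any]) -> list[str]:
    """Return the problems found in a worker dry-run payload (hypothetical helper)."""
    problems: list[str] = []
    start = payload.get('status_after_start') or {}
    done = payload.get('status_after_complete')

    if start.get('job_status') != 'running':
        problems.append('start snapshot is not running')
    if not (start.get('metadata_json') or {}).get('dry_run'):
        problems.append('start snapshot is not flagged dry_run')

    # The completion snapshot is None when the worker ran without --complete-dry-run.
    if done is not None:
        if done.get('job_status') != 'completed':
            problems.append('completion snapshot is not completed')
        if done.get('output_count') != 0:
            problems.append('dry run reported feature writes')
        if done.get('started_at') != start.get('started_at'):
            problems.append('started_at changed between snapshots')
    return problems


# Values below are taken from the chromaprint payload for job 1.
sample = {
    'status_after_start': {
        'job_status': 'running',
        'started_at': '2026-06-04T13:02:56.589356+08:00',
        'metadata_json': {'dry_run': True},
    },
    'status_after_complete': {
        'job_status': 'completed',
        'output_count': 0,
        'started_at': '2026-06-04T13:02:56.589356+08:00',
        'metadata_json': {'dry_run': True},
    },
}
print(check_dry_run_payload(sample))
```

A check like this could run against the files written via `--output`, so that a later switch from dry-run to real writes fails loudly if a payload still claims `dry_run` semantics.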
{
"worker": "mark_job_status",
"schema": "acr_test",
"update": {
"extraction_job_id": 1,
"job_status": "pending",
"input_count": 0,
"output_count": 0,
"started_at": "2026-06-04T13:02:56.589356+08:00",
"finished_at": "2026-06-04T13:02:56.591597+08:00",
"log_uri": null,
"metadata_json": {
"lane": "exact",
"phase": "phase1",
"priority": "p0"
}
}
}
\ No newline at end of file
......@@ -26,7 +26,13 @@ def parse_target_scope(target_scope: str) -> dict[str, Any]:
 def build_command_suggestions(job: dict[str, Any], schema: str) -> list[str]:
-    base_env = f"EXTRACTION_JOB_ID={job['extraction_job_id']} FEATURE_SET_ID={job['feature_set_id']} TARGET_SCOPE='{job['target_scope']}' PG_SCHEMA={schema}"
+    base_env = (
+        'PG_DSN="${PG_DSN:?set PG_DSN}" '
+        f"EXTRACTION_JOB_ID={job['extraction_job_id']} "
+        f"FEATURE_SET_ID={job['feature_set_id']} "
+        f"TARGET_SCOPE='{job['target_scope']}' "
+        f"PG_SCHEMA={schema}"
+    )
     commands = []
     if job['lane'] == 'exact':
         commands.append(
......
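The hunk above wraps `TARGET_SCOPE` in hand-written single quotes, which holds as long as the scope never contains a quote character. As a hedged alternative, the sketch below builds the same environment prefix with `shlex.quote` from the standard library. `build_base_env` is a hypothetical name and this is not the planner's code; the `PG_DSN` fragment is kept literal on purpose so that the shell, not Python, expands it and aborts when the variable is unset.

```python
import shlex
from typing import Any


def build_base_env(job: dict[str, Any], schema: str) -> str:
    """Build the shell environment prefix for a worker command (hypothetical sketch)."""
    parts = [
        # Left unexpanded: the shell fails with "set PG_DSN" if PG_DSN is missing.
        'PG_DSN="${PG_DSN:?set PG_DSN}"',
        f"EXTRACTION_JOB_ID={int(job['extraction_job_id'])}",
        f"FEATURE_SET_ID={int(job['feature_set_id'])}",
        # shlex.quote only adds quoting when the value needs it.
        f"TARGET_SCOPE={shlex.quote(job['target_scope'])}",
        f"PG_SCHEMA={shlex.quote(schema)}",
    ]
    return ' '.join(parts)


job = {
    'extraction_job_id': 1,
    'feature_set_id': 2,
    'target_scope': 'reference_set:phase1_hot_reference_v1',
}
print(build_base_env(job, 'acr_test'))
```

Because `:` and `_` are already shell-safe, the scope used in this commit comes out unquoted, while a value containing spaces or quotes would be escaped automatically.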
from __future__ import annotations

import json
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any

import psycopg

SCHEMA_RE = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')


@dataclass
class JobContext:
    extraction_job_id: int
    feature_set_id: int
    target_scope: str
    job_status: str
    shard_key: str | None
    job_metadata: dict[str, Any]
    feature_name: str
    feature_level: str
    extraction_granularity: str
    window_sec: float | None
    hop_sec: float | None
    embedding_dim: int | None
    distance_metric: str
    feature_config: dict[str, Any]
    model_id: int
    model_name: str
    model_version: str
    model_family: str
    input_sample_rate: int | None
    output_embedding_dim: int | None
    model_metadata: dict[str, Any]


def require_env(name: str, default: str | None = None) -> str:
    value = os.environ.get(name, default)
    if value is None or value == '':
        raise SystemExit(f'missing required env: {name}')
    return value


def validate_schema(schema: str) -> str:
    if not SCHEMA_RE.match(schema):
        raise SystemExit(f'invalid schema name: {schema}')
    return schema


def ensure_output_parent(path: str | None) -> Path | None:
    if not path:
        return None
    output = Path(path)
    output.parent.mkdir(parents=True, exist_ok=True)
    return output


def connect(dsn: str, schema: str, *, autocommit: bool = True) -> psycopg.Connection:
    conn = psycopg.connect(dsn, autocommit=autocommit)
    conn.execute(f'SET search_path TO {validate_schema(schema)}, public;')
    return conn


def fetch_job_context(conn: psycopg.Connection, extraction_job_id: int) -> JobContext:
    row = conn.execute(
        """
        SELECT
            fej.extraction_job_id,
            fej.feature_set_id,
            fej.target_scope,
            fej.job_status,
            fej.shard_key,
            fej.metadata_json,
            fs.feature_name,
            fs.feature_level,
            fs.extraction_granularity,
            fs.window_sec,
            fs.hop_sec,
            fs.embedding_dim,
            fs.distance_metric,
            fs.config_json,
            mr.model_id,
            mr.model_name,
            mr.model_version,
            mr.model_family,
            mr.input_sample_rate,
            mr.output_embedding_dim,
            mr.metadata_json
        FROM feature_extraction_job fej
        JOIN feature_set_registry fs ON fs.feature_set_id = fej.feature_set_id
        JOIN model_registry mr ON mr.model_id = fs.model_id
        WHERE fej.extraction_job_id = %s
        LIMIT 1;
        """,
        (extraction_job_id,),
    ).fetchone()
    if not row:
        raise SystemExit(f'feature_extraction_job not found: {extraction_job_id}')
    return JobContext(
        extraction_job_id=int(row[0]),
        feature_set_id=int(row[1]),
        target_scope=row[2],
        job_status=row[3],
        shard_key=row[4],
        job_metadata=row[5] or {},
        feature_name=row[6],
        feature_level=row[7],
        extraction_granularity=row[8],
        window_sec=float(row[9]) if row[9] is not None else None,
        hop_sec=float(row[10]) if row[10] is not None else None,
        embedding_dim=int(row[11]) if row[11] is not None else None,
        distance_metric=row[12],
        feature_config=row[13] or {},
        model_id=int(row[14]),
        model_name=row[15],
        model_version=row[16],
        model_family=row[17],
        input_sample_rate=int(row[18]) if row[18] is not None else None,
        output_embedding_dim=int(row[19]) if row[19] is not None else None,
        model_metadata=row[20] or {},
    )


def parse_target_scope(target_scope: str) -> tuple[str, str]:
    if ':' in target_scope:
        scope_type, scope_value = target_scope.split(':', 1)
        return scope_type, scope_value
    return 'unknown', target_scope


def resolve_scope_summary(conn: psycopg.Connection, target_scope: str) -> dict[str, Any]:
    scope_type, scope_value = parse_target_scope(target_scope)
    if scope_type == 'reference_set':
        row = conn.execute(
            """
            SELECT
                rs.reference_set_id,
                rs.set_name,
                count(DISTINCT rsm.recording_id) AS recording_count,
                count(DISTINCT ra.asset_id) FILTER (WHERE ra.ingest_status = 'ready') AS ready_asset_count,
                count(DISTINCT aw.window_id) FILTER (WHERE aw.active_for_index) AS active_window_count
            FROM reference_set_registry rs
            LEFT JOIN reference_set_member rsm ON rsm.reference_set_id = rs.reference_set_id
            LEFT JOIN recording_asset ra ON ra.recording_id = rsm.recording_id
            LEFT JOIN audio_window aw ON aw.recording_id = rsm.recording_id
            WHERE rs.set_name = %s
            GROUP BY rs.reference_set_id, rs.set_name
            LIMIT 1;
            """,
            (scope_value,),
        ).fetchone()
        if not row:
            raise SystemExit(f'reference set not found for target_scope={target_scope}')
        return {
            'scope_type': scope_type,
            'scope_value': scope_value,
            'reference_set_id': int(row[0]),
            'reference_set_name': row[1],
            'recording_count': int(row[2]),
            'ready_asset_count': int(row[3]),
            'active_window_count': int(row[4]),
        }
    return {
        'scope_type': scope_type,
        'scope_value': scope_value,
        'recording_count': 0,
        'ready_asset_count': 0,
        'active_window_count': 0,
    }


def update_job_status(
    conn: psycopg.Connection,
    extraction_job_id: int,
    *,
    status: str,
    input_count: int | None = None,
    output_count: int | None = None,
    log_uri: str | None = None,
    metadata_patch: dict[str, Any] | None = None,
    set_started_at: bool = False,
    set_finished_at: bool = False,
) -> dict[str, Any]:
    patch = json.dumps(metadata_patch or {}, ensure_ascii=False)
    row = conn.execute(
        """
        UPDATE feature_extraction_job
        SET job_status = %s,
            input_count = COALESCE(%s, input_count),
            output_count = COALESCE(%s, output_count),
            log_uri = COALESCE(%s, log_uri),
            started_at = CASE
                WHEN %s THEN COALESCE(started_at, NOW())
                ELSE started_at
            END,
            finished_at = CASE
                WHEN %s THEN NOW()
                ELSE finished_at
            END,
            metadata_json = COALESCE(metadata_json, '{}'::jsonb) || %s::jsonb
        WHERE extraction_job_id = %s
        RETURNING extraction_job_id, job_status, input_count, output_count, started_at, finished_at, log_uri, metadata_json;
        """,
        (
            status,
            input_count,
            output_count,
            log_uri,
            set_started_at,
            set_finished_at,
            patch,
            extraction_job_id,
        ),
    ).fetchone()
    if not row:
        raise SystemExit(f'failed to update feature_extraction_job={extraction_job_id}')
    return {
        'extraction_job_id': int(row[0]),
        'job_status': row[1],
        'input_count': int(row[2]) if row[2] is not None else None,
        'output_count': int(row[3]) if row[3] is not None else None,
        'started_at': row[4].isoformat() if row[4] is not None else None,
        'finished_at': row[5].isoformat() if row[5] is not None else None,
        'log_uri': row[6],
        'metadata_json': row[7] or {},
    }


def emit_payload(payload: dict[str, Any], output: str | None) -> None:
    text = json.dumps(payload, ensure_ascii=False, indent=2)
    if output:
        target = ensure_output_parent(output)
        assert target is not None
        target.write_text(text, encoding='utf-8')
    print(text)
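Two helpers in this module are pure functions and can be exercised without a database. The sketch below copies `SCHEMA_RE`, `validate_schema`, and `parse_target_scope` as they appear above so that it runs on its own (importing the module itself would require `psycopg`); the sample inputs other than the Phase-1 scope string are illustrative.

```python
import re

SCHEMA_RE = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')


def validate_schema(schema: str) -> str:
    # Reject anything that is not a plain identifier before it reaches SET search_path.
    if not SCHEMA_RE.match(schema):
        raise SystemExit(f'invalid schema name: {schema}')
    return schema


def parse_target_scope(target_scope: str) -> tuple[str, str]:
    # Split on the first colon only, so the value part may itself contain colons.
    if ':' in target_scope:
        scope_type, scope_value = target_scope.split(':', 1)
        return scope_type, scope_value
    return 'unknown', target_scope


print(parse_target_scope('reference_set:phase1_hot_reference_v1'))
print(parse_target_scope('no_prefix_here'))
print(validate_schema('acr_test'))

try:
    validate_schema('acr_test; DROP SCHEMA public')
except SystemExit as exc:
    print(f'rejected: {exc}')
```

A scope without a recognized prefix is reported as `unknown` rather than rejected, which is why `resolve_scope_summary` falls through to zeroed counts for anything other than `reference_set`.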
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations

import argparse
import json
import os

from _job_common import connect, emit_payload, require_env, update_job_status


def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', default=os.environ.get('PG_DSN'))
    ap.add_argument('--schema', default=os.environ.get('PG_SCHEMA', 'acr_test'))
    ap.add_argument('--job-id', type=int, default=int(require_env('EXTRACTION_JOB_ID', '0')))
    ap.add_argument('--status', required=True)
    ap.add_argument('--input-count', type=int)
    ap.add_argument('--output-count', type=int)
    ap.add_argument('--log-uri')
    ap.add_argument('--metadata-json')
    ap.add_argument('--set-started-at', action='store_true')
    ap.add_argument('--set-finished-at', action='store_true')
    ap.add_argument('--output')
    args = ap.parse_args()

    if not args.dsn:
        raise SystemExit('missing --dsn or PG_DSN')
    if not args.job_id:
        raise SystemExit('missing --job-id or EXTRACTION_JOB_ID')

    metadata_patch = json.loads(args.metadata_json) if args.metadata_json else {}
    with connect(args.dsn, args.schema) as conn:
        updated = update_job_status(
            conn,
            args.job_id,
            status=args.status,
            input_count=args.input_count,
            output_count=args.output_count,
            log_uri=args.log_uri,
            metadata_patch=metadata_patch,
            set_started_at=args.set_started_at,
            set_finished_at=args.set_finished_at,
        )
    emit_payload(
        {
            'worker': 'mark_job_status',
            'schema': args.schema,
            'update': updated,
        },
        args.output,
    )


if __name__ == '__main__':
    main()
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations

import argparse
import os

from _job_common import connect, emit_payload, fetch_job_context, resolve_scope_summary, update_job_status


def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', default=os.environ.get('PG_DSN'))
    ap.add_argument('--schema', default=os.environ.get('PG_SCHEMA', 'acr_test'))
    ap.add_argument('--job-id', type=int, default=int(os.environ.get('EXTRACTION_JOB_ID', '0')))
    ap.add_argument('--output-target', default=os.environ.get('OUTPUT_TARGET', 'audio_fingerprint'))
    ap.add_argument('--complete-dry-run', action='store_true')
    ap.add_argument('--output')
    args = ap.parse_args()

    if not args.dsn:
        raise SystemExit('missing --dsn or PG_DSN')
    if not args.job_id:
        raise SystemExit('missing --job-id or EXTRACTION_JOB_ID')

    with connect(args.dsn, args.schema) as conn:
        job = fetch_job_context(conn, args.job_id)
        if job.model_name != 'chromaprint':
            raise SystemExit(f'feature_extraction_job={args.job_id} is not a chromaprint job')
        scope = resolve_scope_summary(conn, job.target_scope)
        running = update_job_status(
            conn,
            job.extraction_job_id,
            status='running',
            input_count=scope['ready_asset_count'],
            metadata_patch={
                'worker': 'run_chromaprint_job',
                'output_target': args.output_target,
                'dry_run': True,
                'target_scope_summary': scope,
                'execution_mode': 'dry_run',
            },
            set_started_at=True,
        )
        completed = None
        if args.complete_dry_run:
            completed = update_job_status(
                conn,
                job.extraction_job_id,
                status='completed',
                output_count=0,
                metadata_patch={
                    'worker': 'run_chromaprint_job',
                    'output_target': args.output_target,
                    'dry_run': True,
                    'dry_run_result': 'completed_without_feature_write',
                    'write_target_table': 'audio_fingerprint',
                },
                set_finished_at=True,
            )
    emit_payload(
        {
            'worker': 'run_chromaprint_job',
            'schema': args.schema,
            'job': job.__dict__,
            'target_scope_summary': scope,
            'status_after_start': running,
            'status_after_complete': completed,
            'next_write_target': 'audio_fingerprint',
            'notes': [
                'this worker currently validates planner -> job -> PostgreSQL state flow',
                'real chromaprint extraction can replace dry_run while preserving the same job contract',
            ],
        },
        args.output,
    )


if __name__ == '__main__':
    main()
#!/usr/bin/env /usr/local/miniconda3/bin/python
from __future__ import annotations

import argparse
import os

from _job_common import connect, emit_payload, fetch_job_context, resolve_scope_summary, update_job_status

VECTOR_TABLE_BY_DIM = {
    192: 'audio_embedding_vector_192',
    768: 'audio_embedding_vector_768',
}


def main() -> None:
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', default=os.environ.get('PG_DSN'))
    ap.add_argument('--schema', default=os.environ.get('PG_SCHEMA', 'acr_test'))
    ap.add_argument('--job-id', type=int, default=int(os.environ.get('EXTRACTION_JOB_ID', '0')))
    ap.add_argument('--model-name', default=os.environ.get('MODEL_NAME'))
    ap.add_argument('--model-version', default=os.environ.get('MODEL_VERSION'))
    ap.add_argument('--vector-table', default=os.environ.get('VECTOR_TABLE'))
    ap.add_argument('--output-target', default=os.environ.get('OUTPUT_TARGET', 'audio_embedding'))
    ap.add_argument('--complete-dry-run', action='store_true')
    ap.add_argument('--output')
    args = ap.parse_args()
    if not args.dsn:
        raise SystemExit('missing --dsn or PG_DSN')
    if not args.job_id:
        raise SystemExit('missing --job-id or EXTRACTION_JOB_ID')
    with connect(args.dsn, args.schema) as conn:
        job = fetch_job_context(conn, args.job_id)
        if job.model_name == 'chromaprint':
            raise SystemExit(f'feature_extraction_job={args.job_id} is not an embedding job')
        if args.model_name and job.model_name != args.model_name:
            raise SystemExit(f'model mismatch: job={job.model_name} cli={args.model_name}')
        if args.model_version and job.model_version != args.model_version:
            raise SystemExit(f'model version mismatch: job={job.model_version} cli={args.model_version}')
        resolved_vector_table = args.vector_table or VECTOR_TABLE_BY_DIM.get(job.embedding_dim or job.output_embedding_dim or -1)
        scope = resolve_scope_summary(conn, job.target_scope)
        running = update_job_status(
            conn,
            job.extraction_job_id,
            status='running',
            input_count=scope['active_window_count'] or scope['ready_asset_count'],
            metadata_patch={
                'worker': 'run_embedding_job',
                'output_target': args.output_target,
                'vector_table': resolved_vector_table,
                'dry_run': True,
                'target_scope_summary': scope,
                'execution_mode': 'dry_run',
            },
            set_started_at=True,
        )
        completed = None
        if args.complete_dry_run:
            completed = update_job_status(
                conn,
                job.extraction_job_id,
                status='completed',
                output_count=0,
                metadata_patch={
                    'worker': 'run_embedding_job',
                    'output_target': args.output_target,
                    'vector_table': resolved_vector_table,
                    'dry_run': True,
                    'dry_run_result': 'completed_without_feature_write',
                    'write_target_table': args.output_target,
                },
                set_finished_at=True,
            )
    emit_payload(
        {
            'worker': 'run_embedding_job',
            'schema': args.schema,
            'job': job.__dict__,
            'target_scope_summary': scope,
            'status_after_start': running,
            'status_after_complete': completed,
            'resolved_vector_table': resolved_vector_table,
            'notes': [
                'this worker currently validates planner -> job -> PostgreSQL state flow',
                'real encoder inference can replace dry_run while preserving the same job contract',
            ],
        },
        args.output,
    )


if __name__ == '__main__':
    main()
## 2026-06-04
- Added [Phase-1 Worker Contract](./phase1-worker-contract.md) and `acr-engine/workers/_job_common.py`, `mark_job_status.py`, `run_chromaprint_job.py`, `run_embedding_job.py`, moving Phase-1 from "planner command templates only" to a stage where workers actually consume `feature_extraction_job` rows from PostgreSQL and run the `pending -> running -> completed` dry-run status flow.
- Added `phase1_worker_chromaprint_dry_run.json`, `phase1_worker_embedding_dry_run.json`, and `phase1_worker_mark_pending_report.json`, and verified the worker status flow against the live PostgreSQL `acr_test` schema. Also confirmed that `phase1_hot_reference_v1` has no actual members yet, so the scope counts are `0`; this reflects data that has not been loaded, not a worker failure.
- Fixed the command template in `plan_phase1_extraction_jobs_live.py` so that `PG_DSN=\"${PG_DSN:?set PG_DSN}\"` is written explicitly into `command_suggestions / primary_command`, so planner output no longer looks runnable while actually lacking the database connection string.
- Updated `plan_phase1_extraction_jobs_live.py` and `phase1_extraction_plan_report.json`, moving the Phase-1 execution plan from "ordering plan only" to "copy-pasteable command templates carried in `command_suggestions / primary_command`".
- Added `acr-engine/scripts/plan_phase1_extraction_jobs_live.py` and `acr-engine/data/pgvector_eval/music20/phase1_extraction_plan_report.json`, which read the actual pending jobs from `feature_extraction_job` in PostgreSQL and join related tables to produce a Phase-1 execution plan ordered by lane / priority.
- Added `acr-engine/scripts/bootstrap_phase1_extraction_jobs_live.py` and `acr-engine/data/pgvector_eval/music20/phase1_extraction_jobs_report.json`, turning Phase-1 `feature_extraction_job` initialization into a live script that connects directly to PostgreSQL; it has already created 5 pending jobs in the `acr_test` schema.
......
......@@ -219,6 +219,36 @@ flowchart TD
---
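## 10. Phase-1 worker contract (new execution layer)
Beyond registry/bootstrap, the repository now has a minimal real worker execution surface:
- `acr-engine/workers/mark_job_status.py`
- `acr-engine/workers/run_chromaprint_job.py`
- `acr-engine/workers/run_embedding_job.py`
The purpose of this layer is not to run real feature extraction right away, but to first get the following chain working end to end: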
```text
planner -> feature_extraction_job -> worker -> PostgreSQL status update
```
### Current capabilities
1. Read `feature_extraction_job`
2. Resolve `feature_set_registry + model_registry` via joins
3. Resolve `target_scope`
4. Write back `pending -> running -> completed`
5. Keep a stable contract for later real model inference
### Recommended reading
For the detailed contract and flow diagrams, see:
- [docs/phase1-worker-contract.md](./phase1-worker-contract.md)
---
## 8. Live PostgreSQL bootstrap script
To avoid running SQL by hand each time, this repository now provides a live bootstrap script that connects directly to PostgreSQL:
......
# Phase-1 Worker Contract (Job Executor Contract)
> Updated: 2026-06-04
> Goal: move Phase-1 from "registry / plan only" to "workers actually consume PostgreSQL jobs and update their status".
---
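## One-page summary
Phase-1 now has a minimal real execution chain:
1. The planner reads pending jobs from `feature_extraction_job`
2. The worker reads `extraction_job_id`
3. The worker resolves `feature_set_registry + model_registry` via joins
4. The worker resolves `target_scope`
5. The worker writes back `feature_extraction_job.job_status / input_count / output_count / metadata_json`
In other words, PostgreSQL is no longer just a "data dictionary"; it now also serves as:
- the job orchestration plane
- the state machine plane
- the execution evidence plane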
---
## 1. Workers currently in place
Located at:
- `acr-engine/workers/mark_job_status.py`
- `acr-engine/workers/run_chromaprint_job.py`
- `acr-engine/workers/run_embedding_job.py`
- `acr-engine/workers/_job_common.py`
### Role breakdown
| worker | Role |
|---|---|
| `mark_job_status.py` | Generic status advancer |
| `run_chromaprint_job.py` | exact lane worker |
| `run_embedding_job.py` | semantic lane worker |
| `_job_common.py` | Shared job loading, scope resolution, and status write-back logic |
---
## 2. Current state machine
```mermaid
flowchart LR
A[pending] --> B[running]
B --> C[completed]
B --> D[failed]
```
### Verified status transitions
- `pending -> running`
- `running -> completed` (dry-run mode)
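The edges in the diagram above can be written down as a small lookup table. The following is a minimal sketch only: `ALLOWED_TRANSITIONS` and `assert_transition` are hypothetical names for illustration and are not taken from `_job_common.py`. The restore-to-`pending` step performed by the bootstrap script is deliberately not modeled here.

```python
# Hypothetical sketch: job_status edges copied from the state diagram.
ALLOWED_TRANSITIONS = {
    'pending': {'running'},
    'running': {'completed', 'failed'},
    'completed': set(),
    'failed': set(),
}


def assert_transition(current: str, target: str) -> None:
    """Raise ValueError if current -> target is not an edge in the diagram."""
    if target not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f'illegal job_status transition: {current} -> {target}')


# The two transitions verified so far both pass the check.
assert_transition('pending', 'running')
assert_transition('running', 'completed')
```

A guard like this could run before a status write-back so that an out-of-order update fails loudly instead of silently overwriting the row.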
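### Design intent
First lock down the **job contract and status transitions**, then plug in the actual model inference.
That way, whichever extractor is swapped in later:
- `Chromaprint`
- `MERT`
- `MuQ`
- `CoverHunter encoder`
the orchestration data structures do not need to be rebuilt.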
---
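## 3. Worker input contract
### Environment variables
| Variable | Description |
|---|---|
| `PG_DSN` | PostgreSQL connection string |
| `PG_SCHEMA` | Target schema |
| `EXTRACTION_JOB_ID` | ID of the job to execute |
| `FEATURE_SET_ID` | Attached at planning time; the worker can use it for consistency checks |
| `TARGET_SCOPE` | Attached at planning time; the worker currently treats the job record in the DB as authoritative |
| `MODEL_NAME` | Used by the embedding worker as a guard against mismatches |
| `MODEL_VERSION` | Used by the embedding worker as a guard against mismatches |
| `VECTOR_TABLE` | Target vector table for the embedding worker |
| `OUTPUT_TARGET` | `audio_fingerprint` or `audio_embedding` |
### CLI arguments
All three workers accept explicit CLI arguments that override the env values.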
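The override rule can be illustrated with the same `argparse` pattern the workers use, where each flag's default is read from the environment. This is a sketch, not the workers' actual entry point: `parse_worker_args` and its injectable `env` parameter are assumptions added so the example runs without touching the real environment.

```python
import argparse
import os


def parse_worker_args(argv, env=None):
    """CLI flags win; otherwise fall back to the environment variable."""
    env = os.environ if env is None else env
    ap = argparse.ArgumentParser()
    ap.add_argument('--dsn', default=env.get('PG_DSN'))
    ap.add_argument('--schema', default=env.get('PG_SCHEMA', 'acr_test'))
    ap.add_argument('--job-id', type=int, default=int(env.get('EXTRACTION_JOB_ID', '0')))
    return ap.parse_args(argv)


# Placeholder values for illustration only.
env = {'PG_DSN': 'postgresql://example', 'EXTRACTION_JOB_ID': '1'}
args = parse_worker_args(['--job-id', '2'], env)
print(args.job_id, args.schema)  # prints: 2 acr_test
```

Here `--job-id 2` overrides `EXTRACTION_JOB_ID=1`, while `--schema` falls back to the built-in default because neither the flag nor `PG_SCHEMA` is set.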
### Current convention for planner command templates
`plan_phase1_extraction_jobs_live.py` now explicitly generates:
```bash
PG_DSN="${PG_DSN:?set PG_DSN}" ...
```
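This way, if the caller forgets to provide the database connection string when copying the command, it fails immediately instead of silently running against nothing.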
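How a planner might render such a template can be sketched as follows. This is an assumption-laden illustration: `render_command` is a hypothetical helper, not the function used in `plan_phase1_extraction_jobs_live.py`, and the real planner may quote values differently.

```python
import shlex


def render_command(env_vars: dict, script: str) -> str:
    """Render a copy-pasteable shell command with a fail-fast PG_DSN guard."""
    # ${PG_DSN:?...} makes the shell abort when PG_DSN is unset or empty.
    parts = ['PG_DSN="${PG_DSN:?set PG_DSN}"']
    # shlex.quote only adds quotes when a value contains shell-special characters.
    parts += [f'{name}={shlex.quote(str(value))}' for name, value in env_vars.items()]
    return ' '.join(parts) + ' \\\n' + f'python {shlex.quote(script)}'


cmd = render_command(
    {'EXTRACTION_JOB_ID': 1, 'PG_SCHEMA': 'acr_test', 'OUTPUT_TARGET': 'audio_fingerprint'},
    'workers/run_chromaprint_job.py',
)
print(cmd)
```

The guard is emitted as a literal string, so expansion happens in the caller's shell at run time rather than when the plan report is generated.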
---
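## 4. PostgreSQL read contract
The worker currently reads from:
1. `feature_extraction_job`
2. `feature_set_registry`
3. `model_registry`
4. `reference_set_registry` / `reference_set_member`
5. `recording_asset`
6. `audio_window`
### Why read the scope summary
Because the core of the first Phase-1 stage is not to "produce embeddings right away" but to first establish:
- which reference set this job targets
- how many recordings are involved
- how many ready assets are involved
- how many active windows are involved
This gives the following later work a stable baseline:
- sharding
- parallelism
- retries
- SLA estimation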
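As one example of how the scope summary could feed sharding, the sketch below splits `active_window_count` into contiguous `(offset, limit)` ranges. `plan_shards` and the default of 1000 windows per shard are hypothetical; the job shard executor does not exist yet, so this is only an illustration of the idea.

```python
import math


def plan_shards(scope: dict, windows_per_shard: int = 1000) -> list:
    """Split the active-window range into contiguous (offset, limit) shards."""
    total = scope.get('active_window_count') or 0
    shard_count = math.ceil(total / windows_per_shard)
    return [
        (i * windows_per_shard, min(windows_per_shard, total - i * windows_per_shard))
        for i in range(shard_count)
    ]


print(plan_shards({'active_window_count': 2500}))
# prints: [(0, 1000), (1000, 1000), (2000, 500)]
print(plan_shards({'active_window_count': 0}))
# prints: []
```

An empty scope yields no shards, which matches the current live state where the reference set has no members.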
---
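## 5. What the current dry-run actually means
The workers do not yet call a model to extract features. What they do is:
1. Verify that planner command templates can actually be consumed
2. Verify the job -> feature_set -> model join contract
3. Verify target scope resolution
4. Verify PostgreSQL job status write-back
5. Keep a stable entry point for the upcoming real inference
So this is not paper documentation; rather:
> **First get the skeleton of the production execution surface working end to end, then fill in model inference.**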
---
## 6. Recommended execution order
```mermaid
flowchart TD
A[bootstrap model/feature/reference registry] --> B[bootstrap feature_extraction_job]
B --> C[plan pending jobs]
C --> D[run worker dry-run]
D --> E[validate status transitions]
E --> F[replace dry-run with real extractor]
```
---
## 7. Later replacement points for the exact lane and semantic lane
### 7.1 Chromaprint worker
Later, put the following logic into `run_chromaprint_job.py`:
1. Read `recording_asset`
2. Call the chromaprint CLI / library
3. Write `audio_fingerprint`
4. Update `output_count`
5. Mark `completed`
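Step 2 could look roughly like the sketch below, assuming the Chromaprint `fpcalc` CLI is installed and supports `-json` output with `duration` and `fingerprint` fields. `parse_fpcalc_json` and `fingerprint_file` are hypothetical helpers; reading `recording_asset` and writing `audio_fingerprint` are left out because their column layout is not shown here.

```python
import json
import subprocess


def parse_fpcalc_json(stdout: str) -> dict:
    """Extract duration and fingerprint from fpcalc's JSON output (assumed shape)."""
    payload = json.loads(stdout)
    return {
        'duration_sec': float(payload['duration']),
        'fingerprint': payload['fingerprint'],
    }


def fingerprint_file(path: str) -> dict:
    """Run `fpcalc -json <path>`; requires the Chromaprint CLI on PATH."""
    result = subprocess.run(
        ['fpcalc', '-json', path],
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero exit code
    )
    return parse_fpcalc_json(result.stdout)
```

Keeping the parsing separate from the subprocess call makes it possible to test the worker logic without audio files or the CLI present.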
### 7.2 Embedding worker
Later, put the following logic into `run_embedding_job.py`:
1. Read `audio_window`
2. Load `MERT` / `MuQ` / `ECAPA`
3. Extract vectors
4. Write `audio_embedding`
5. Write `audio_embedding_vector_<dim>`
6. Update `output_count`
7. Mark `completed`
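Two pieces of this flow can be sketched without a model: choosing `audio_embedding_vector_<dim>` from the embedding dimension, and batching window IDs for inference. `VECTOR_TABLE_BY_DIM` mirrors the mapping in `run_embedding_job.py`; `resolve_vector_table` and `batched` are hypothetical helpers, and failing fast on an unknown dimension is an assumption (the current dry-run worker simply records `None`).

```python
from typing import Iterator, List, Optional, Sequence

# Same mapping as in run_embedding_job.py.
VECTOR_TABLE_BY_DIM = {
    192: 'audio_embedding_vector_192',
    768: 'audio_embedding_vector_768',
}


def resolve_vector_table(embedding_dim: Optional[int], override: Optional[str] = None) -> str:
    """Prefer an explicit override, else look up the table by dimension."""
    table = override or VECTOR_TABLE_BY_DIM.get(embedding_dim or -1)
    if table is None:
        # Assumption: a real write path should refuse to guess a target table.
        raise ValueError(f'no vector table registered for dim={embedding_dim}')
    return table


def batched(window_ids: Sequence[int], batch_size: int) -> Iterator[List[int]]:
    """Yield consecutive batches of window IDs for model inference."""
    for start in range(0, len(window_ids), batch_size):
        yield list(window_ids[start:start + batch_size])


print(resolve_vector_table(768))         # prints: audio_embedding_vector_768
print(list(batched([1, 2, 3, 4, 5], 2)))  # prints: [[1, 2], [3, 4], [5]]
```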
---
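## 8. What problems this solves
Landing the worker contract mainly solves four problems:
1. **The planner is no longer just a plan on paper**
2. **Job status now has a real advancer**
3. **Swapping models later does not require redoing orchestration**
4. **The execution chain can be verified with a dry-run before heavy models are connected**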
---
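## 9. Current boundaries
Not yet done:
- Real chromaprint feature writes
- Real MERT / MuQ / ECAPA embedding writes
- `failed` retry strategy
- Job shard executor
- Idempotent, deduplicated write strategy
But this is already enough to support the next stage:
> **Connect the real extractor to the already verified PostgreSQL worker contract.**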
......@@ -69,6 +69,9 @@
| registry bootstrap idempotency report | `acr-engine/data/pgvector_eval/music20/phase1_registry_bootstrap_idempotency_report.json` |
| extraction job bootstrap report | `acr-engine/data/pgvector_eval/music20/phase1_extraction_jobs_report.json` |
| extraction plan report | `acr-engine/data/pgvector_eval/music20/phase1_extraction_plan_report.json` |
| chromaprint worker dry-run report | `acr-engine/data/pgvector_eval/music20/phase1_worker_chromaprint_dry_run.json` |
| embedding worker dry-run report | `acr-engine/data/pgvector_eval/music20/phase1_worker_embedding_dry_run.json` |
| manual job status write-back report | `acr-engine/data/pgvector_eval/music20/phase1_worker_mark_pending_report.json` |
| historical comparison report | `acr-engine/data/pgvector_eval/music20/songid_eval_report.json` |
---
......@@ -258,6 +261,84 @@ flowchart LR
The current live sample actually contains only:
- `type_1`
---
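## Phase-1 worker dry-run test chain (new)
This step addresses:
> The planner could already output copy-pasteable commands, but the repository previously had no real worker that could consume them.
A minimal set of real workers is now in place:
- `acr-engine/workers/mark_job_status.py`
- `acr-engine/workers/run_chromaprint_job.py`
- `acr-engine/workers/run_embedding_job.py`
### Test goal
Verify that the following chain really works end to end: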
```mermaid
flowchart TD
A[feature_extraction_job pending] --> B[planner generates command templates]
B --> C[worker reads extraction_job_id]
C --> D[worker resolves feature/model/scope]
D --> E[worker writes back running/completed]
E --> F[bootstrap script can restore pending again]
```
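### Current verification scope
This round does not run real model inference; it first verifies the production execution surface:
1. `run_chromaprint_job.py`
   - Actually connects to PostgreSQL
   - Reads `feature_extraction_job=1`
   - Resolves `reference_set:phase1_hot_reference_v1`
   - Writes back `running -> completed`
2. `run_embedding_job.py`
   - Actually connects to PostgreSQL
   - Reads `feature_extraction_job=2`
   - Resolves `mert v1-95m`
   - Writes back `running -> completed`
3. Run `bootstrap_phase1_extraction_jobs_live.py` again
   - Restores job status to `pending`
   - Ensures later sessions can continue from the same batch of jobs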
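### Why dry-run first
Because the top priority right now is to lock down:
- job contract
- status transitions
- scope resolution
- planner -> worker command compatibility
Once this skeleton is stable, connecting the real:
- chromaprint extraction
- MERT / MuQ embedding extraction
carries lower overall risk.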
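### A key note on the current live results
In this round of worker dry-runs, `phase1_hot_reference_v1` already exists but has **no actual members** in the `acr_test` schema, so:
- `recording_count=0`
- `ready_asset_count=0`
- `active_window_count=0`
This is not a worker error; the Phase-1 live data plane simply has not yet loaded the business reference recordings.
So what this round of verification proves is:
- planner -> worker command compatibility
- worker -> PostgreSQL status flow works
It is **not** a throughput verification of real feature extraction.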
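When reading a dry-run report, the distinction between "state flow verified on an empty scope" and "state flow verified with data" can be made explicit. The sketch below is illustrative: `classify_dry_run` and its return labels are hypothetical, while the count keys and `status_after_complete` follow the names used in the worker payload.

```python
def classify_dry_run(scope: dict, status_after_complete) -> str:
    """Label a dry-run result so zero counts are not mistaken for a failure."""
    counts = [
        scope.get(key) or 0
        for key in ('recording_count', 'ready_asset_count', 'active_window_count')
    ]
    if status_after_complete is None:
        # The worker only moved the job to running (no --complete-dry-run).
        return 'started_only'
    if not any(counts):
        # Status flow works, but the reference set has no members yet.
        return 'state_flow_verified_empty_scope'
    return 'state_flow_verified_with_data'


empty_scope = {'recording_count': 0, 'ready_asset_count': 0, 'active_window_count': 0}
print(classify_dry_run(empty_scope, {'job_status': 'completed'}))
# prints: state_flow_verified_empty_scope
```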
- `type_7`
Therefore:
......
......@@ -44,7 +44,8 @@
4. [docs/postgresql-data-model.md](./postgresql-data-model.md)
5. [docs/phase1-implementation-checklist.md](./phase1-implementation-checklist.md)
6. [docs/model-feature-registry-bootstrap.md](./model-feature-registry-bootstrap.md)
7. [docs/CHANGELOG.md](./CHANGELOG.md)
7. [docs/phase1-worker-contract.md](./phase1-worker-contract.md)
8. [docs/CHANGELOG.md](./CHANGELOG.md)
If you only want to restore context quickly, read at least the first 5.
......@@ -186,9 +187,13 @@ sed -n '1,320p' acr-engine/sql/acr_pg_schema_v2.sql
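- 5 `feature_extraction_job` rows have actually been created in the PostgreSQL `acr_test` schema; later MERT / MuQ integration can start directly from the pending jobs
- A Phase-1 extraction execution plan has actually been generated on the PostgreSQL `acr_test` schema; the current order is `chromaprint -> mert -> mert-long -> muq -> ecapa`
- The extraction plan report already includes `command_suggestions / primary_command`, so next time the worker command templates can be copied straight from the plan
- Phase-1 worker entry points are now in place: `run_chromaprint_job.py / run_embedding_job.py / mark_job_status.py`
- The next stage is no longer "filling in the planner" but replacing the dry-run workers with real extractors and making `audio_fingerprint / audio_embedding` writes idempotent
- The latest live worker evidence shows that `phase1_hot_reference_v1` currently has no actual members in `acr_test`, so the dry-run has verified the state machine but the scope counts are still `0`
### Not verified / remaining gaps
- **MERT / MuQ encoder-only feature extraction has not actually been run**
- **Workers are still primarily dry-run and do not yet write real `audio_fingerprint / audio_embedding`**
- **Real business data for the reference set has not been loaded**
- **The final online score fusion rules have not been defined**
- **type_8 / type_16 have not yet entered the PostgreSQL live test chain for the current live JSONL**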
......@@ -204,6 +209,7 @@ sed -n '1,320p' acr-engine/sql/acr_pg_schema_v2.sql
- `a549d1d` — Clarify the ACR evolution path and freeze a production-grade data model
- `e514a6c` — Keep the new ACR architecture guide clean for follow-up edits
- `4b23f54` — Make the Phase-1 ACR plan executable for each delivery role
- `0679481` — Attach runnable command templates to the extraction plan
If you need to trace documentation additions next time, start from these three commits.
......@@ -212,4 +218,4 @@ sed -n '1,320p' acr-engine/sql/acr_pg_schema_v2.sql
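## One-sentence handoff
> **Next time, do not restart the discussion from "should we switch models, should we restructure the data model".**
> Those directions are settled. Proceed directly from **landing the PostgreSQL v2 schema + the Phase-1 encoder-only execution chain**.
> Those directions are settled. Proceed directly from **landing the PostgreSQL v2 schema + the Phase-1 worker/extractor execution chain**.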
......