prepare_fma_archive.py
4.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
"""Manage download/extract workflow for the official FMA Small archive."""
from __future__ import annotations
import argparse
import json
import subprocess
from pathlib import Path
# URL the archive is fetched from (a copy of fma_small.zip hosted on modelscope.cn).
FMA_SMALL_URL = "https://modelscope.cn/datasets/pengzhendong/fma/resolve/master/fma_small.zip"
# Expected size of the complete archive in bytes; inspect() divides the on-disk
# size by this value to report download progress.
FMA_SMALL_BYTES = 7679594875
# Destination file for the downloaded archive. The path is relative, so it is
# interpreted against the current working directory of the process.
ARCHIVE_PATH = Path("data/raw/fma_small.zip")
# Directory the archive is unpacked into (also relative to the working directory).
EXTRACT_DIR = Path("data/raw/fma_small_audio")
def run(cmd: list[str]) -> subprocess.CompletedProcess:
    """Run *cmd* without a shell, capturing stdout and stderr as text.

    Args:
        cmd: Program name followed by its arguments.

    Returns:
        The completed process. When the program itself cannot be found, a
        synthetic result with return code 127 (the shell's "command not
        found" status), empty stdout and an explanatory stderr is returned
        instead of raising, so callers can still emit their JSON report.
    """
    try:
        return subprocess.run(cmd, text=True, capture_output=True)
    except FileNotFoundError as exc:
        # The external tool (e.g. curl or unzip) is not installed or not on PATH.
        return subprocess.CompletedProcess(
            cmd,
            127,
            stdout="",
            stderr=f"{cmd[0]}: command not found ({exc})",
        )
def download(resume: bool = True) -> dict:
    """Download the FMA Small archive with curl and report the outcome.

    The call blocks until curl exits. The parent directory of
    ``ARCHIVE_PATH`` is created if it does not exist yet.

    Args:
        resume: When true, pass ``--continue-at -`` so curl continues a
            partially downloaded file instead of starting over.

    Returns:
        A JSON-serialisable dict with the curl command line, its return
        code, the resolved archive path, whether the archive exists and its
        size in bytes after the run, and the last 1200 characters of curl's
        stdout and stderr.
    """
    ARCHIVE_PATH.parent.mkdir(parents=True, exist_ok=True)

    # --fail makes curl exit non-zero on an HTTP error response. Without it
    # curl exits 0 and stores the server's error page as the archive (or
    # appends it to a partial file when resuming).
    cmd = ["curl", "-L", "--fail"]
    if resume:
        cmd += ["--continue-at", "-"]
    cmd += ["--output", str(ARCHIVE_PATH), FMA_SMALL_URL]

    proc = run(cmd)

    archive_exists = ARCHIVE_PATH.exists()
    archive_size = ARCHIVE_PATH.stat().st_size if archive_exists else 0
    return {
        "action": "download",
        "command": cmd,
        "returncode": proc.returncode,
        "archive_path": str(ARCHIVE_PATH.resolve()),
        "archive_exists": archive_exists,
        "archive_size": archive_size,
        "stdout_tail": proc.stdout[-1200:],
        "stderr_tail": proc.stderr[-1200:],
    }
def bg_download(log_path: Path, resume: bool = True) -> dict:
    """Start the curl download in the background and return immediately.

    curl is launched under ``nohup`` through ``bash -lc`` with its output
    appended to *log_path*; the shell prints the background job's PID, which
    is returned in the report. Parent directories of both the archive and the
    log file are created if missing.

    Args:
        log_path: File that receives curl's combined stdout and stderr.
        resume: When true, pass ``--continue-at -`` so curl continues a
            partially downloaded file instead of starting over.

    Returns:
        A JSON-serialisable dict with the launching shell's return code, the
        background PID (as the string printed by the shell), the resolved log
        and archive paths, the archive's existence and size at the moment of
        launch, and the last 1200 characters of the shell's stderr.
    """
    # Local import: shlex is needed only to build this one shell command line.
    import shlex

    ARCHIVE_PATH.parent.mkdir(parents=True, exist_ok=True)
    log_path.parent.mkdir(parents=True, exist_ok=True)

    # --fail makes curl exit non-zero on an HTTP error response instead of
    # saving the server's error page as the archive.
    cmd = ["nohup", "curl", "-L", "--fail"]
    if resume:
        cmd += ["--continue-at", "-"]
    cmd += ["--output", str(ARCHIVE_PATH), FMA_SMALL_URL]

    # Quote every word: log_path comes from the command line and may contain
    # spaces or shell metacharacters that would otherwise split or inject
    # into the command.
    shell_cmd = f"{shlex.join(cmd)} >> {shlex.quote(str(log_path))} 2>&1 & echo $!"
    proc = subprocess.run(["bash", "-lc", shell_cmd], text=True, capture_output=True)
    pid = proc.stdout.strip()

    archive_exists = ARCHIVE_PATH.exists()
    archive_size = ARCHIVE_PATH.stat().st_size if archive_exists else 0
    return {
        "action": "bg-download",
        "returncode": proc.returncode,
        "pid": pid,
        "log_path": str(log_path.resolve()),
        "archive_path": str(ARCHIVE_PATH.resolve()),
        "archive_exists": archive_exists,
        "archive_size": archive_size,
        "stderr_tail": proc.stderr[-1200:],
    }
def inspect() -> dict:
    """Report the current state of the archive and the extraction directory.

    Nothing is downloaded or modified; the function only looks at the
    filesystem.

    Returns:
        A JSON-serialisable dict with the source URL, the expected and actual
        archive size, the download progress as a ratio and as a percentage,
        and the number of audio files (.mp3, .wav, .flac, .ogg) found
        anywhere under the extraction directory.
    """
    audio_suffixes = {'.mp3', '.wav', '.flac', '.ogg'}

    has_archive = ARCHIVE_PATH.exists()
    has_extract_dir = EXTRACT_DIR.exists()

    audio_count = 0
    if has_extract_dir:
        audio_count = sum(
            1
            for entry in EXTRACT_DIR.rglob('*')
            if entry.suffix.lower() in audio_suffixes
        )

    size_on_disk = ARCHIVE_PATH.stat().st_size if has_archive else 0

    # Guard against a zero expected size so the division cannot fail.
    if has_archive and FMA_SMALL_BYTES:
        ratio = size_on_disk / FMA_SMALL_BYTES
    else:
        ratio = 0.0

    report = {
        "action": "inspect",
        "archive_url": FMA_SMALL_URL,
        "archive_bytes_expected": FMA_SMALL_BYTES,
        "archive_path": str(ARCHIVE_PATH.resolve()),
        "archive_exists": has_archive,
        "archive_size": size_on_disk,
        "archive_progress_ratio": round(ratio, 6),
        "archive_progress_percent": round(ratio * 100, 4),
        "extract_dir": str(EXTRACT_DIR.resolve()),
        "extract_exists": has_extract_dir,
        "num_audio_files": audio_count,
    }
    return report
def extract(overwrite: bool = False) -> dict:
    """Unpack the downloaded archive into ``EXTRACT_DIR`` using ``unzip``.

    Args:
        overwrite: When true, run ``unzip -o`` (overwrite existing files);
            otherwise run ``unzip -n`` (never overwrite existing files).

    Returns:
        A JSON-serialisable dict with the unzip command line, its return
        code, the resolved extraction directory, the number of audio files
        (.mp3, .wav, .flac, .ogg) found under it afterwards, and the last
        1200 characters of unzip's stdout and stderr.

    Raises:
        SystemExit: If the archive file does not exist. The exception carries
            a JSON document describing the problem, which the interpreter
            writes to stderr before exiting with status 1.
    """
    archive_present = ARCHIVE_PATH.exists()
    if not archive_present:
        blocked_report = {
            "status": "blocked",
            "reason": "archive_missing",
            "archive_path": str(ARCHIVE_PATH.resolve()),
            "recommendation": f"Run download first from {FMA_SMALL_URL}",
        }
        raise SystemExit(json.dumps(blocked_report, indent=2, ensure_ascii=False))

    EXTRACT_DIR.mkdir(parents=True, exist_ok=True)

    conflict_flag = "-o" if overwrite else "-n"
    cmd = ["unzip", conflict_flag, str(ARCHIVE_PATH), "-d", str(EXTRACT_DIR)]
    proc = run(cmd)

    # Count audio files after extraction, whatever unzip's return code was.
    audio_suffixes = {'.mp3', '.wav', '.flac', '.ogg'}
    audio_count = sum(
        1
        for entry in EXTRACT_DIR.rglob('*')
        if entry.suffix.lower() in audio_suffixes
    )

    return {
        "action": "extract",
        "command": cmd,
        "returncode": proc.returncode,
        "extract_dir": str(EXTRACT_DIR.resolve()),
        "num_audio_files": audio_count,
        "stdout_tail": proc.stdout[-1200:],
        "stderr_tail": proc.stderr[-1200:],
    }
def main():
    """Parse the command line, run the chosen sub-command and print its report.

    Sub-commands:
        download     Blocking download (``--no-resume`` restarts from scratch).
        bg-download  Background download (``--no-resume``, ``--log-path``).
        inspect      Show archive and extraction status.
        extract      Unpack the archive (``--overwrite`` replaces files).

    The report is printed to stdout as indented JSON.

    Raises:
        SystemExit: With status 1 when the report carries a non-zero
            ``returncode``, so shell callers can detect a failed download or
            extraction from the exit status.
    """
    parser = argparse.ArgumentParser()
    sub = parser.add_subparsers(dest="cmd", required=True)

    p = sub.add_parser("download")
    p.add_argument("--no-resume", action="store_true")

    p = sub.add_parser("bg-download")
    p.add_argument("--no-resume", action="store_true")
    p.add_argument("--log-path", default="/tmp/fma_modelscope_download.log")

    sub.add_parser("inspect")

    p = sub.add_parser("extract")
    p.add_argument("--overwrite", action="store_true")

    args = parser.parse_args()

    # The subparsers are required, so argparse has already rejected anything
    # that is not one of these four names; no fallback branch is needed.
    handlers = {
        "download": lambda: download(resume=not args.no_resume),
        "bg-download": lambda: bg_download(Path(args.log_path), resume=not args.no_resume),
        "inspect": inspect,
        "extract": lambda: extract(overwrite=args.overwrite),
    }
    result = handlers[args.cmd]()

    print(json.dumps(result, indent=2, ensure_ascii=False))

    # Reports without a "returncode" key (inspect) count as success.
    if result.get("returncode", 0):
        raise SystemExit(1)


if __name__ == "__main__":
    main()