feat: optimize dataset conversion efficiency, add on-demand training start/stop script

This commit is contained in:
2026-05-06 22:32:18 +08:00
parent 056df3b6ca
commit 0008288964
6 changed files with 607 additions and 114 deletions

View File

@@ -1,130 +1,240 @@
import argparse
import json
import os
import subprocess
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Pool
from pathlib import Path
from tqdm import tqdm
from types import SimpleNamespace
import pyarrow.parquet as pq
from tqdm import tqdm
"""
Convert Pyarrow parquets to megatron format, use jsonl as intermediate format.
Convert Kaiyuan parquet files directly to Megatron indexed dataset format.
Takes in parquet schema:
Expected parquet schema:
text: <string>
The previous implementation used parquet -> JSONL -> Megatron preprocess_data.py.
This implementation removes the JSONL intermediate file and writes .bin/.idx with
Megatron's IndexedDatasetBuilder directly.
Usage:
python /apps/yi/model_training/scripts/convert_phase_to_megatron.py \
--input-dir /apps/yi/model_training/data/phase1 \
--output-dir /ssd/yi/converted_data/megatron_phase1 \
--tmp-dir /ssd/yi/converted_data/tmp_jsonl \
--megatron-dir /apps/yi/model_training/Megatron-LM \
--tokenizer-model /apps/yi/model_training/data/tokenizer \
--text-key text \
--num-shards 4 \
--workers-per-shard 16 \
--start 100 \
--end 220 # 1 of total 220 parquets
--end 220
"""
_TOKENIZER = None
_APPEND_EOD = True
def parquet_to_jsonl(parquet_path: Path, jsonl_path: Path, text_key: str):
    """Stream one parquet column into a JSONL file; return the number of rows written.

    Rows that are not non-empty strings are silently skipped.
    """
    jsonl_path.parent.mkdir(parents=True, exist_ok=True)
    written = 0
    with jsonl_path.open("w", encoding="utf-8") as out:
        reader = pq.ParquetFile(parquet_path)
        for record_batch in reader.iter_batches(columns=[text_key], batch_size=8192):
            for value in record_batch.column(0).to_pylist():
                if not (isinstance(value, str) and value.strip()):
                    continue
                out.write(json.dumps({text_key: value}, ensure_ascii=False) + "\n")
                written += 1
    return written
def run_one(args_tuple):
    """Legacy shard worker: parquet -> JSONL -> Megatron preprocess_data.py subprocess.

    Kept for reference/fallback; the direct path is convert_one_parquet().
    Returns a human-readable status string ([SKIP]/[FAIL]/[OK]).
    """
    (
        parquet_path,
        output_dir,
        tmp_dir,
        text_key,
        megatron_dir,
        tokenizer_type,
        tokenizer_model,
        workers_per_shard,
        keep_jsonl,
        overwrite,
    ) = args_tuple
    parquet_path = Path(parquet_path)
    # Strip both ".zstd.parquet" and plain ".parquet" suffixes to get the shard stem.
    stem = parquet_path.name.replace(".zstd.parquet", "").replace(".parquet", "")
    jsonl_path = Path(tmp_dir) / f"{stem}.jsonl"
    output_prefix = Path(output_dir) / f"phase1_{stem}"
    bin_file = Path(str(output_prefix) + f"_{text_key}_document.bin")
    idx_file = Path(str(output_prefix) + f"_{text_key}_document.idx")
    if not overwrite and bin_file.exists() and idx_file.exists():
        return f"[SKIP] {parquet_path.name}: existing bin/idx"
    print(f"[START] {parquet_path.name}", flush=True)
    rows = parquet_to_jsonl(parquet_path, jsonl_path, text_key)
    print(f"[JSONL DONE] {parquet_path.name}: rows={rows}, jsonl={jsonl_path}", flush=True)
    print(f"[MEGATRON START] {parquet_path.name}", flush=True)
    cmd = [
        "python",
        str(Path(megatron_dir) / "tools/preprocess_data.py"),
        "--input", str(jsonl_path),
        "--output-prefix", str(output_prefix),
        "--tokenizer-type", tokenizer_type,
        "--tokenizer-model", tokenizer_model,
        "--json-keys", text_key,
        "--workers", str(workers_per_shard),
        "--append-eod",
    ]
    env = os.environ.copy()
    proc = subprocess.run(
        cmd,
        cwd=megatron_dir,
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    if proc.returncode != 0:
        # stderr is merged into stdout above; keep only the tail to bound message size.
        return f"[FAIL] {parquet_path.name}\n{proc.stdout[-4000:]}"
    if not keep_jsonl:
        jsonl_path.unlink(missing_ok=True)
    return f"[OK] {parquet_path.name}: rows={rows}, output_prefix={output_prefix}"


def make_tokenizer_args(args):
    """Build the argparse-like namespace Megatron's build_tokenizer() expects.

    Fields not exposed by this script's CLI are pinned to single-process
    defaults (rank 0, TP size 1, no prompt formatting).
    """
    return SimpleNamespace(
        rank=0,
        make_vocab_size_divisible_by=128,
        tensor_model_parallel_size=1,
        padded_vocab_size=None,
        vocab_size=args.vocab_size,
        vocab_file=args.vocab_file,
        merge_file=args.merge_file,
        vocab_extra_ids=0,
        tokenizer_type=args.tokenizer_type,
        tokenizer_model=args.tokenizer_model,
        metadata_path=args.tokenizer_metadata,
        special_tokens=args.tokenizer_special_tokens,
        tokenizer_sentencepiece_legacy=args.tokenizer_sentencepiece_legacy,
        tokenizer_hf_no_use_fast=args.tokenizer_hf_no_use_fast,
        tokenizer_hf_no_include_special_tokens=args.tokenizer_hf_no_include_special_tokens,
        trust_remote_code=args.trust_remote_code,
        tiktoken_pattern=args.tiktoken_pattern,
        tiktoken_num_special_tokens=args.tiktoken_num_special_tokens,
        null_tokenizer_eod_id=args.null_tokenizer_eod_id,
        null_tokenizer_pad_id=args.null_tokenizer_pad_id,
        tokenizer_prompt_format=None,
        image_tag_type=None,
        force_system_message=False,
        sft_tokenizer_prompt_format=None,
    )
def add_megatron_to_path(megatron_dir):
    """Make the Megatron-LM checkout importable, without duplicating sys.path entries."""
    resolved = str(Path(megatron_dir).resolve())
    if resolved in sys.path:
        return
    sys.path.insert(0, resolved)
def build_megatron_tokenizer(args):
    """Import Megatron from args.megatron_dir and build its tokenizer.

    The import is deliberately local so each worker process resolves the
    megatron package only after add_megatron_to_path() has run.
    """
    add_megatron_to_path(args.megatron_dir)
    from megatron.core.tokenizers.utils.build_tokenizer import build_tokenizer

    return build_tokenizer(make_tokenizer_args(args))
def init_worker(args):
    """Pool initializer: build the tokenizer once per worker and stash it in globals."""
    global _TOKENIZER, _APPEND_EOD
    _APPEND_EOD = args.append_eod
    tokenizer = build_megatron_tokenizer(args)
    _TOKENIZER = tokenizer
    if _APPEND_EOD and tokenizer.eod is None:
        raise ValueError("Tokenizer has no EOD/EOS token, but --append-eod is enabled.")
def encode_text(text):
    """Tokenize one document.

    Returns (token_ids, sentence_lens) with the EOD token appended when
    _APPEND_EOD is set, or None when the input should be skipped
    (non-string, blank, or tokenizes to nothing).
    """
    if not isinstance(text, str):
        return None
    stripped = text.strip()
    if not stripped:
        return None
    token_ids = _TOKENIZER.tokenize(stripped)
    if not token_ids:
        return None
    lengths = [len(token_ids)]
    if _APPEND_EOD:
        token_ids.append(_TOKENIZER.eod)
        lengths[-1] = lengths[-1] + 1
    return token_ids, lengths
def output_paths(output_prefix, text_key):
    """Return the (bin_path, idx_path) pair for a given output prefix and json key."""
    base = str(Path(output_prefix)) + f"_{text_key}_document"
    return Path(f"{base}.bin"), Path(f"{base}.idx")
def remove_partial_outputs(output_prefix, text_key):
    """Delete any half-written .bin/.idx pair so a retried conversion starts clean."""
    for stale in output_paths(output_prefix, text_key):
        stale.unlink(missing_ok=True)
def convert_one_parquet(args_tuple):
    """Convert a single parquet file straight to a Megatron .bin/.idx pair.

    args_tuple is (parquet_path, parsed_cli_args). Returns a status string
    ([SKIP] or [OK] with throughput stats). Tokenization runs in-process when
    workers_per_shard == 1, otherwise in a multiprocessing Pool; only this
    process ever writes to the IndexedDatasetBuilder.
    """
    parquet_path, args = args_tuple
    parquet_path = Path(parquet_path)
    # Strip both ".zstd.parquet" and plain ".parquet" to get the shard stem.
    stem = parquet_path.name.replace(".zstd.parquet", "").replace(".parquet", "")
    output_prefix = Path(args.output_dir) / f"{args.output_prefix_prefix}_{stem}"
    bin_file, idx_file = output_paths(output_prefix, args.text_key)
    if not args.overwrite and bin_file.exists() and idx_file.exists():
        return f"[SKIP] {parquet_path.name}: existing bin/idx"
    # A previously failed run may have left a partial pair; remove it first.
    remove_partial_outputs(output_prefix, args.text_key)
    add_megatron_to_path(args.megatron_dir)
    from megatron.core.datasets import indexed_dataset

    # A tokenizer is built here (in addition to the pool workers) to pick the
    # smallest index dtype that can hold the vocabulary.
    tokenizer = build_megatron_tokenizer(args)
    dtype = indexed_dataset.DType.optimal_dtype(tokenizer.vocab_size)
    builder = indexed_dataset.IndexedDatasetBuilder(str(bin_file), dtype=dtype)
    start_time = time.time()
    rows = 0
    docs = 0
    tokens = 0

    def consume_encoded(encoded):
        # Append one encoded document to the builder and update progress counters.
        nonlocal docs, tokens
        if encoded is None:
            return
        token_ids, sentence_lens = encoded
        builder.add_document(token_ids, sentence_lens)
        docs += 1
        tokens += len(token_ids)
        if args.log_interval and docs % args.log_interval == 0:
            elapsed = max(time.time() - start_time, 1e-6)
            print(
                f"[{parquet_path.name}] docs={docs} "
                f"tokens={tokens} docs/s={docs / elapsed:.2f}",
                flush=True,
            )

    pf = pq.ParquetFile(parquet_path)
    if args.workers_per_shard == 1:
        # Single-worker path: initialize the tokenizer globals in this process.
        init_worker(args)
        for batch in pf.iter_batches(columns=[args.text_key], batch_size=args.batch_size):
            texts = batch.column(0).to_pylist()
            rows += len(texts)
            for text in texts:
                consume_encoded(encode_text(text))
    else:
        # Pool workers tokenize; ordered imap keeps document order deterministic
        # while this process alone feeds the builder.
        with Pool(processes=args.workers_per_shard, initializer=init_worker, initargs=(args,)) as pool:
            for batch in pf.iter_batches(columns=[args.text_key], batch_size=args.batch_size):
                texts = batch.column(0).to_pylist()
                rows += len(texts)
                for encoded in pool.imap(encode_text, texts, chunksize=args.chunksize):
                    consume_encoded(encoded)
    builder.finalize(str(idx_file))
    elapsed = max(time.time() - start_time, 1e-6)
    return (
        f"[OK] {parquet_path.name}: rows={rows}, docs={docs}, tokens={tokens}, "
        f"elapsed={elapsed:.1f}s, docs/s={docs / elapsed:.2f}, output_prefix={output_prefix}"
    )
def parse_args():
    """CLI for the parquet -> Megatron indexed-dataset converter.

    The rendered history registered several options twice (--tmp-dir,
    --num-shards, --workers-per-shard, --keep-jsonl), which makes argparse
    raise ArgumentError before any parsing happens; each option is registered
    exactly once here, keeping the newer defaults/help texts.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-dir", required=True)
    parser.add_argument("--output-dir", required=True)
    parser.add_argument("--tmp-dir", default=None, help="Deprecated; kept for CLI compatibility.")
    parser.add_argument("--megatron-dir", default="/apps/model_training/Megatron-LM")
    parser.add_argument("--tokenizer-type", default="HuggingFaceTokenizer")
    parser.add_argument("--tokenizer-model", required=True)
    parser.add_argument("--tokenizer-metadata", default=None)
    parser.add_argument("--tokenizer-special-tokens", nargs="*", default=None)
    parser.add_argument("--tokenizer-sentencepiece-legacy", action="store_true")
    parser.add_argument("--tokenizer-hf-no-use-fast", action="store_true")
    parser.add_argument("--tokenizer-hf-no-include-special-tokens", action="store_true")
    parser.add_argument("--trust-remote-code", action="store_true")
    parser.add_argument("--vocab-file", default=None)
    parser.add_argument("--merge-file", default=None)
    parser.add_argument("--vocab-size", type=int, default=None)
    parser.add_argument("--tiktoken-pattern", default=None)
    parser.add_argument("--tiktoken-num-special-tokens", type=int, default=1000)
    parser.add_argument("--null-tokenizer-eod-id", type=int, default=None)
    parser.add_argument("--null-tokenizer-pad-id", type=int, default=-1)
    parser.add_argument("--text-key", default="text")
    parser.add_argument("--output-prefix-prefix", default="phase1")
    parser.add_argument("--num-shards", type=int, default=1, help="Parallel parquet files.")
    # Default: half the visible cores per parquet, at least one.
    parser.add_argument("--workers-per-shard", type=int, default=max((os.cpu_count() or 8) // 2, 1))
    parser.add_argument("--batch-size", type=int, default=8192, help="Parquet record batch size.")
    parser.add_argument("--chunksize", type=int, default=64, help="Tokenizer pool imap chunk size.")
    parser.add_argument("--log-interval", type=int, default=10000)
    parser.add_argument("--start", type=int, default=0)
    parser.add_argument("--end", type=int, default=None)
    parser.add_argument("--append-eod", action=argparse.BooleanOptionalAction, default=True)
    parser.add_argument("--keep-jsonl", action="store_true", help="Deprecated; no JSONL is written.")
    parser.add_argument("--overwrite", action="store_true")
    return parser.parse_args()
def main():
    """Select parquet files from --input-dir and convert them, optionally in parallel.

    Validates CLI bounds, slices the sorted file list by --start/--end, then
    either converts inline (num_shards == 1, simpler logs/tracebacks) or fans
    out one process per parquet with ProcessPoolExecutor.
    """
    args = parse_args()
    if args.num_shards < 1:
        raise ValueError("--num-shards must be >= 1")
    if args.workers_per_shard < 1:
        raise ValueError("--workers-per-shard must be >= 1")
    if args.batch_size < 1:
        raise ValueError("--batch-size must be >= 1")
    if args.chunksize < 1:
        raise ValueError("--chunksize must be >= 1")
    files = sorted(Path(args.input_dir).glob("*.zstd.parquet"))
    if not files:
        # NOTE(review): the original empty-list branch body was lost in history;
        # failing loudly here is the conservative reconstruction.
        raise SystemExit(f"No *.zstd.parquet files found in {args.input_dir}")
    files = files[args.start:args.end]
    print(f"Converting {len(files)} files")
    print(f"Parallel parquet files: {args.num_shards}")
    print(f"Tokenizer workers per parquet: {args.workers_per_shard}")
    print(f"Total tokenizer workers: {args.num_shards * args.workers_per_shard}")
    Path(args.output_dir).mkdir(parents=True, exist_ok=True)
    # --tmp-dir is deprecated (no JSONL is written); honor it only so existing
    # wrapper scripts that pass it keep working.
    if args.tmp_dir:
        Path(args.tmp_dir).mkdir(parents=True, exist_ok=True)
    tasks = [(str(f), args) for f in files]
    if args.num_shards == 1:
        for task in tqdm(tasks):
            print(convert_one_parquet(task), flush=True)
    else:
        with ProcessPoolExecutor(max_workers=args.num_shards) as ex:
            futs = [ex.submit(convert_one_parquet, task) for task in tasks]
            for fut in tqdm(as_completed(futs), total=len(futs)):
                print(fut.result(), flush=True)
# Script entry point; the rendered diff showed main() twice, which would run
# the full conversion two times — call it exactly once.
if __name__ == "__main__":
    main()