"""
Convert Kaiyuan parquet files directly to Megatron indexed dataset format.

Expected parquet schema:
    text: document text column (the column name is configurable via --text-key)

The previous implementation used parquet -> JSONL -> Megatron preprocess_data.py.
This implementation removes the JSONL intermediate file and writes .bin/.idx with
Megatron's IndexedDatasetBuilder directly. A commented read-back sketch at the end
of this file shows how to spot-check the converted output.

Usage:
    python /ssd1/yi/pretrain_kaiyuan2b/scripts/convert_phase_to_megatron.py \
        --input-dir /ssd1/yi/data/phase1 \
        --output-dir /ssd1/yi/converted_data/phase1 \
        --megatron-dir /ssd1/yi/pretrain_kaiyuan2b/Megatron-LM \
        --tokenizer-model /ssd1/yi/data/tokenizer \
        --text-key text \
        --num-shards 16 \
        --workers-per-shard 12 \
        --batch-size 16384 \
        --chunksize 128 \
        --start 0 \
        --end 210
"""

import argparse
import os
import sys
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Pool
from pathlib import Path
from types import SimpleNamespace

import pyarrow.parquet as pq
from tqdm import tqdm

# Per-process globals, populated by init_worker() in each tokenizer worker.
_TOKENIZER = None
_APPEND_EOD = True


def make_tokenizer_args(args):
    """Build the namespace of arguments that Megatron's build_tokenizer() expects."""
    return SimpleNamespace(
        rank=0,
        make_vocab_size_divisible_by=128,
        tensor_model_parallel_size=1,
        padded_vocab_size=None,
        vocab_size=args.vocab_size,
        vocab_file=args.vocab_file,
        merge_file=args.merge_file,
        vocab_extra_ids=0,
        tokenizer_type=args.tokenizer_type,
        tokenizer_model=args.tokenizer_model,
        metadata_path=args.tokenizer_metadata,
        special_tokens=args.tokenizer_special_tokens,
        tokenizer_sentencepiece_legacy=args.tokenizer_sentencepiece_legacy,
        tokenizer_hf_no_use_fast=args.tokenizer_hf_no_use_fast,
        tokenizer_hf_no_include_special_tokens=args.tokenizer_hf_no_include_special_tokens,
        trust_remote_code=args.trust_remote_code,
        tiktoken_pattern=args.tiktoken_pattern,
        tiktoken_num_special_tokens=args.tiktoken_num_special_tokens,
        null_tokenizer_eod_id=args.null_tokenizer_eod_id,
        null_tokenizer_pad_id=args.null_tokenizer_pad_id,
        tokenizer_prompt_format=None,
        image_tag_type=None,
        force_system_message=False,
        sft_tokenizer_prompt_format=None,
    )


def add_megatron_to_path(megatron_dir):
    megatron_dir = str(Path(megatron_dir).resolve())
    if megatron_dir not in sys.path:
        sys.path.insert(0, megatron_dir)


def build_megatron_tokenizer(args):
    add_megatron_to_path(args.megatron_dir)
    from megatron.core.tokenizers.utils.build_tokenizer import build_tokenizer

    return build_tokenizer(make_tokenizer_args(args))


def init_worker(args):
    """Initializer for tokenizer worker processes: build one tokenizer per process."""
    global _TOKENIZER, _APPEND_EOD
    _APPEND_EOD = args.append_eod
    _TOKENIZER = build_megatron_tokenizer(args)
    if _APPEND_EOD and _TOKENIZER.eod is None:
        raise ValueError("Tokenizer has no EOD/EOS token, but --append-eod is enabled.")


def encode_text(text):
    """Tokenize one document; return (token_ids, sentence_lens) or None for empty input."""
    if not isinstance(text, str):
        return None
    text = text.strip()
    if not text:
        return None
    token_ids = _TOKENIZER.tokenize(text)
    if not token_ids:
        return None
    sentence_lens = [len(token_ids)]
    if _APPEND_EOD:
        token_ids.append(_TOKENIZER.eod)
        sentence_lens[-1] += 1
    return token_ids, sentence_lens
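# Illustrative encode_text() contract (the token ids below are made up, not from a
# real tokenizer): a 5-token document with --append-eod enabled yields
#     token_ids     == [t0, t1, t2, t3, t4, <eod>]
#     sentence_lens == [6]
# which is exactly the (tokens, lengths) pair that
# IndexedDatasetBuilder.add_document() consumes in convert_one_parquet() below.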
def output_paths(output_prefix, text_key):
    prefix = Path(output_prefix)
    return (
        Path(str(prefix) + f"_{text_key}_document.bin"),
        Path(str(prefix) + f"_{text_key}_document.idx"),
    )


def remove_partial_outputs(output_prefix, text_key):
    bin_file, idx_file = output_paths(output_prefix, text_key)
    bin_file.unlink(missing_ok=True)
    idx_file.unlink(missing_ok=True)


def convert_one_parquet(args_tuple):
    parquet_path, args = args_tuple
    parquet_path = Path(parquet_path)
    stem = parquet_path.name.replace(".zstd.parquet", "").replace(".parquet", "")
    output_prefix = Path(args.output_dir) / f"{args.output_prefix_prefix}_{stem}"
    bin_file, idx_file = output_paths(output_prefix, args.text_key)

    if not args.overwrite and bin_file.exists() and idx_file.exists():
        return f"[SKIP] {parquet_path.name}: existing bin/idx"
    remove_partial_outputs(output_prefix, args.text_key)

    add_megatron_to_path(args.megatron_dir)
    from megatron.core.datasets import indexed_dataset

    tokenizer = build_megatron_tokenizer(args)
    dtype = indexed_dataset.DType.optimal_dtype(tokenizer.vocab_size)
    builder = indexed_dataset.IndexedDatasetBuilder(str(bin_file), dtype=dtype)

    start_time = time.time()
    rows = 0
    docs = 0
    tokens = 0

    def consume_encoded(encoded):
        nonlocal docs, tokens
        if encoded is None:
            return
        token_ids, sentence_lens = encoded
        builder.add_document(token_ids, sentence_lens)
        docs += 1
        tokens += len(token_ids)
        if args.log_interval and docs % args.log_interval == 0:
            elapsed = max(time.time() - start_time, 1e-6)
            print(
                f"[{parquet_path.name}] docs={docs} "
                f"tokens={tokens} docs/s={docs / elapsed:.2f}",
                flush=True,
            )

    pf = pq.ParquetFile(parquet_path)
    if args.workers_per_shard == 1:
        # Tokenize in-process when a single worker is requested.
        init_worker(args)
        for batch in pf.iter_batches(columns=[args.text_key], batch_size=args.batch_size):
            texts = batch.column(0).to_pylist()
            rows += len(texts)
            for text in texts:
                consume_encoded(encode_text(text))
    else:
        # Fan tokenization out to a worker pool; writing stays in this process.
        with Pool(processes=args.workers_per_shard, initializer=init_worker, initargs=(args,)) as pool:
            for batch in pf.iter_batches(columns=[args.text_key], batch_size=args.batch_size):
                texts = batch.column(0).to_pylist()
                rows += len(texts)
                for encoded in pool.imap(encode_text, texts, chunksize=args.chunksize):
                    consume_encoded(encoded)

    builder.finalize(str(idx_file))
    elapsed = max(time.time() - start_time, 1e-6)
    return (
        f"[OK] {parquet_path.name}: rows={rows}, docs={docs}, tokens={tokens}, "
        f"elapsed={elapsed:.1f}s, docs/s={docs / elapsed:.2f}, output_prefix={output_prefix}"
    )
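# Output naming example (hypothetical shard name): with the defaults
# --output-prefix-prefix phase1 and --text-key text, an input file
# "chunk_0007.zstd.parquet" is written to
#     <output-dir>/phase1_chunk_0007_text_document.bin
#     <output-dir>/phase1_chunk_0007_text_document.idx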
parser.add_argument("--chunksize", type=int, default=128, help="Tokenizer pool imap chunk size.") parser.add_argument("--log-interval", type=int, default=10000) parser.add_argument("--start", type=int, default=0) parser.add_argument("--end", type=int, default=None) parser.add_argument("--append-eod", action=argparse.BooleanOptionalAction, default=True) parser.add_argument("--keep-jsonl", action="store_true", help="Deprecated; no JSONL is written.") parser.add_argument("--overwrite", action="store_true") return parser.parse_args() def main(): args = parse_args() if args.num_shards < 1: raise ValueError("--num-shards must be >= 1") if args.workers_per_shard < 1: raise ValueError("--workers-per-shard must be >= 1") if args.batch_size < 1: raise ValueError("--batch-size must be >= 1") if args.chunksize < 1: raise ValueError("--chunksize must be >= 1") files = sorted(Path(args.input_dir).glob("*.zstd.parquet")) if not files: files = sorted(Path(args.input_dir).glob("*.parquet")) files = files[args.start:args.end] print(f"Converting {len(files)} files") print(f"Parallel parquet files: {args.num_shards}") print(f"Tokenizer workers per parquet: {args.workers_per_shard}") print(f"Total tokenizer workers: {args.num_shards * args.workers_per_shard}") Path(args.output_dir).mkdir(parents=True, exist_ok=True) if args.tmp_dir: Path(args.tmp_dir).mkdir(parents=True, exist_ok=True) tasks = [(str(f), args) for f in files] if args.num_shards == 1: for task in tqdm(tasks): print(convert_one_parquet(task), flush=True) else: with ProcessPoolExecutor(max_workers=args.num_shards) as ex: futs = [ex.submit(convert_one_parquet, task) for task in tasks] for fut in tqdm(as_completed(futs), total=len(futs)): print(fut.result(), flush=True) if __name__ == "__main__": main()