
Batch Processing

Parallel multi-replay parsing via ProcessPoolExecutor. Each worker process parses one replay independently, so performance scales with CPU cores.

Memory

parse_many_to_parquet writes and discards each replay immediately, keeping memory usage flat regardless of batch size. parse_many_to_dataframe holds all results in memory until concatenation — prefer parse_many_to_parquet for large batches.

Parquet dependency

Parquet output requires an optional engine; pandas supports pyarrow (recommended) or fastparquet. To install pyarrow:

pip install pyarrow


gem.batch

Bulk replay parsing — process many .dem files in parallel.

Provides three public functions:

  • parse_many — parse a list/folder of replays, return list[ParseResult].
  • parse_many_to_dataframe — same, but concatenate all successful results into a dict[str, DataFrame] (one row-set per table, with a match_path column added for provenance).
  • parse_many_to_parquet — parse and write each replay into its own subdirectory under output_dir, one .parquet file per table. Replays are processed and discarded one at a time to keep memory bounded.

All three functions use ProcessPoolExecutor for true parallelism (CPU-bound work) and display a Rich progress bar by default.
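The underlying pattern — submit one task per replay, collect results as they complete — can be sketched with the standard library alone. This sketch uses a thread pool and a toy `slow_square` worker (both stand-ins, chosen so the snippet stays self-contained; the real module uses ProcessPoolExecutor because parsing is CPU-bound):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_square(n: int) -> int:
    # Stand-in for the CPU-bound parse of one replay.
    return n * n

def run_batch(items: list[int], workers: int = 2) -> list[int]:
    results: list[int] = []
    # gem.batch uses ProcessPoolExecutor here; the submit/as_completed
    # choreography is identical for either executor.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(slow_square, n): n for n in items}
        for future in as_completed(futures):
            results.append(future.result())
    return sorted(results)  # completion order is nondeterministic

print(run_batch([1, 2, 3, 4]))
```

Note that `as_completed` yields results in whatever order workers finish, which is why the docs below describe the return value as being "in completion order".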

ParseResult dataclass

Outcome of parsing a single replay.

Parameters:

path (Path, required): Absolute path to the .dem file.
match (ParsedMatch | None, required): Populated gem.models.ParsedMatch, or None on failure.
error (Exception | None, required): Exception raised during parsing, or None on success.
Source code in src/gem/batch.py
@dataclass
class ParseResult:
    """Outcome of parsing a single replay.

    Args:
        path: Absolute path to the ``.dem`` file.
        match: Populated :class:`~gem.models.ParsedMatch`, or ``None`` on failure.
        error: Exception raised during parsing, or ``None`` on success.
    """

    path: Path
    match: ParsedMatch | None
    error: Exception | None

    @property
    def ok(self) -> bool:
        """Return ``True`` when parsing succeeded."""
        return self.error is None

ok: bool property

Return True when parsing succeeded.

parse_many(source: str | Path | Sequence[str | Path], *, workers: int | None = None, recursive: bool = False, progress: bool = True, timeout: float | None = None) -> list[ParseResult]

Parse multiple replays in parallel and return a result per replay.

Parameters:

source (str | Path | Sequence[str | Path], required): Either a directory path (all .dem files inside) or an explicit list of replay paths.
workers (int | None, default None): Number of worker processes. Defaults to os.cpu_count(), capped at the number of replays.
recursive (bool, default False): When source is a directory, scan subdirectories too.
progress (bool, default True): Show a Rich progress bar while parsing.
timeout (float | None, default None): Per-replay timeout in seconds. None means no limit.

Returns:

list[ParseResult]: List of ParseResult in completion order. Failed replays have result.ok == False and carry the exception in result.error.

Source code in src/gem/batch.py
def parse_many(
    source: str | Path | Sequence[str | Path],
    *,
    workers: int | None = None,
    recursive: bool = False,
    progress: bool = True,
    timeout: float | None = None,
) -> list[ParseResult]:
    """Parse multiple replays in parallel and return a result per replay.

    Args:
        source: Either a directory path (all ``.dem`` files inside) or an
            explicit list of replay paths.
        workers: Number of worker processes.  Defaults to ``os.cpu_count()``,
            capped at the number of replays.
        recursive: When *source* is a directory, scan subdirectories too.
        progress: Show a Rich progress bar while parsing.
        timeout: Per-replay timeout in seconds.  ``None`` means no limit.

    Returns:
        List of :class:`ParseResult` in completion order.  Failed replays have
        ``result.ok == False`` and carry the exception in ``result.error``.
    """
    paths = _collect_paths(source, recursive=recursive)
    n_workers = min(workers or os.cpu_count() or 1, len(paths))

    results: list[ParseResult] = []

    from rich.progress import (
        BarColumn,
        MofNCompleteColumn,
        Progress,
        TextColumn,
        TimeElapsedColumn,
    )

    rich_progress: Progress | None = (
        Progress(
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
        )
        if progress
        else None
    )

    def _run(executor: ProcessPoolExecutor) -> None:
        future_to_path: dict[Future[tuple], Path] = {
            executor.submit(_parse_one, p): p for p in paths
        }
        for future in as_completed(future_to_path, timeout=timeout):
            path, match, error = future.result()
            results.append(ParseResult(path=path, match=match, error=error))
            if rich_progress is not None:
                rich_progress.advance(task_id)

    def _execute() -> None:
        with ProcessPoolExecutor(max_workers=n_workers) as pool:
            _run(pool)

    if rich_progress is not None:
        with rich_progress:
            task_id = rich_progress.add_task(
                f"[cyan]Parsing {len(paths)} replay(s)…[/cyan]", total=len(paths)
            )
            _execute()
    else:
        _execute()

    return results

parse_many_to_dataframe(source: str | Path | Sequence[str | Path], *, workers: int | None = None, recursive: bool = False, progress: bool = True, timeout: float | None = None) -> dict[str, pd.DataFrame]

Parse multiple replays and concatenate results into per-table DataFrames.

Each DataFrame gets a match_path column added so rows can be traced back to their source replay.

Parameters:

source (str | Path | Sequence[str | Path], required): Directory path or explicit list of replay paths.
workers (int | None, default None): Number of worker processes (default: os.cpu_count()).
recursive (bool, default False): Scan subdirectories when source is a directory.
progress (bool, default True): Show a Rich progress bar while parsing.
timeout (float | None, default None): Per-replay timeout in seconds.

Returns:

dict[str, DataFrame]: Same keys as gem.parse_to_dataframe, containing rows from all successful replays concatenated together.

Source code in src/gem/batch.py
def parse_many_to_dataframe(
    source: str | Path | Sequence[str | Path],
    *,
    workers: int | None = None,
    recursive: bool = False,
    progress: bool = True,
    timeout: float | None = None,
) -> dict[str, pd.DataFrame]:
    """Parse multiple replays and concatenate results into per-table DataFrames.

    Each DataFrame gets a ``match_path`` column added so rows can be traced
    back to their source replay.

    Args:
        source: Directory path or explicit list of replay paths.
        workers: Number of worker processes (default: ``os.cpu_count()``).
        recursive: Scan subdirectories when *source* is a directory.
        progress: Show a Rich progress bar while parsing.
        timeout: Per-replay timeout in seconds.

    Returns:
        ``dict[str, DataFrame]`` with the same keys as
        :func:`~gem.parse_to_dataframe`, containing rows from all successful
        replays concatenated together.
    """
    import pandas as pd

    from gem.dataframes import build_dataframes

    results = parse_many(
        source, workers=workers, recursive=recursive, progress=progress, timeout=timeout
    )

    per_table: dict[str, list[pd.DataFrame]] = {}
    for result in results:
        if not result.ok or result.match is None:
            continue
        dfs = build_dataframes(result.match)
        for key, df in dfs.items():
            df = df.copy()
            df.insert(0, "match_path", str(result.path))
            per_table.setdefault(key, []).append(df)

    return {key: pd.concat(frames, ignore_index=True) for key, frames in per_table.items()}
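The provenance step can be seen in isolation: each per-replay frame gets a match_path column inserted before concatenation, so combined rows stay traceable to their source file. The frames below are hypothetical stand-ins for build_dataframes output (pandas required):

```python
import pandas as pd

# Two hypothetical per-replay "players" tables keyed by replay path.
frames = {
    "/replays/a.dem": pd.DataFrame({"hero": ["axe", "lina"], "kills": [3, 7]}),
    "/replays/b.dem": pd.DataFrame({"hero": ["axe"], "kills": [5]}),
}

tagged = []
for path, df in frames.items():
    df = df.copy()
    # Same provenance column parse_many_to_dataframe inserts at position 0.
    df.insert(0, "match_path", path)
    tagged.append(df)

players = pd.concat(tagged, ignore_index=True)
```

Filtering back down to one replay is then a plain boolean mask, e.g. `players[players["match_path"] == "/replays/a.dem"]`.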

parse_many_to_parquet(source: str | Path | Sequence[str | Path], output_dir: str | Path, *, workers: int | None = None, recursive: bool = False, progress: bool = True, timeout: float | None = None, index: bool = False) -> list[Path]

Parse multiple replays and write each to its own parquet subdirectory.

Each replay is written and discarded immediately to keep memory usage bounded regardless of batch size. The output layout is:

output_dir/
  <replay_stem>/
    players.parquet
    combat_log.parquet
    ...

Parameters:

source (str | Path | Sequence[str | Path], required): Directory path or explicit list of replay paths.
output_dir (str | Path, required): Root directory to write parquet subdirectories into.
workers (int | None, default None): Number of worker processes (default: os.cpu_count()).
recursive (bool, default False): Scan subdirectories when source is a directory.
progress (bool, default True): Show a Rich progress bar while parsing.
timeout (float | None, default None): Per-replay timeout in seconds.
index (bool, default False): Whether to include the DataFrame index in parquet output.

Returns:

list[Path]: List of all parquet file paths written.

Source code in src/gem/batch.py
def parse_many_to_parquet(
    source: str | Path | Sequence[str | Path],
    output_dir: str | Path,
    *,
    workers: int | None = None,
    recursive: bool = False,
    progress: bool = True,
    timeout: float | None = None,
    index: bool = False,
) -> list[Path]:
    """Parse multiple replays and write each to its own parquet subdirectory.

    Each replay is written and discarded immediately to keep memory usage
    bounded regardless of batch size.  The output layout is::

        output_dir/
          <replay_stem>/
            players.parquet
            combat_log.parquet
            ...

    Args:
        source: Directory path or explicit list of replay paths.
        output_dir: Root directory to write parquet subdirectories into.
        workers: Number of worker processes (default: ``os.cpu_count()``).
        recursive: Scan subdirectories when *source* is a directory.
        progress: Show a Rich progress bar while parsing.
        timeout: Per-replay timeout in seconds.
        index: Whether to include the DataFrame index in parquet output.

    Returns:
        List of all parquet file paths written.
    """
    from gem import to_parquet

    results = parse_many(
        source, workers=workers, recursive=recursive, progress=progress, timeout=timeout
    )

    out_root = Path(output_dir)
    written: list[Path] = []

    for result in results:
        if not result.ok or result.match is None:
            continue
        subdir = out_root / result.path.stem
        written.extend(to_parquet(result.match, subdir, index=index))

    return written
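The per-replay subdirectory layout makes the written tables easy to rediscover later. A stdlib-only sketch that groups parquet files by replay stem (the files below are created empty purely to illustrate the traversal; real ones come from parse_many_to_parquet):

```python
import tempfile
from pathlib import Path

def discover_tables(output_dir: Path) -> dict[str, list[str]]:
    # Map each replay subdirectory name to the table names it contains.
    layout: dict[str, list[str]] = {}
    for pq in sorted(output_dir.glob("*/*.parquet")):
        layout.setdefault(pq.parent.name, []).append(pq.stem)
    return layout

# Build a throwaway directory tree mimicking the documented layout.
root = Path(tempfile.mkdtemp())
for replay in ("match_001", "match_002"):
    sub = root / replay
    sub.mkdir()
    for table in ("players", "combat_log"):
        (sub / f"{table}.parquet").touch()  # empty placeholder for the demo

layout = discover_tables(root)
```

From here, each table could be loaded with pandas.read_parquet, or an engine such as DuckDB could query the glob pattern directly.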