Source code for easylink.utilities.splitter_utils

"""

This module contains utility functions for splitting datasets into smaller datasets.
One primary use case for this is to run sections of the pipeline in an auto 
parallel manner.

Note that it is critical that all data splitting utility functions are defined
in this module; easylink will not be able to find them otherwise.

"""

import math
import os

import pandas as pd
from loguru import logger


[docs] def split_data_by_size( input_files: list[str], output_dir: str, desired_chunk_size_mb: int | float ) -> None: """Splits the data (from a single input slot) into chunks of desired size. This function takes all datasets from a single input slot, concatenates them, and then splits the resulting dataset into chunks of the desired size. Note that this will split the data as evenly as possible, but the final chunk may be smaller than the desired size if the input data does not divide evenly; it makes no effort to redistribute the lingering data. Parameters ---------- input_files A list of input file paths to be concatenated and split. output_dir The directory where the resulting chunks will be saved. desired_chunk_size_mb The desired size of each chunk, in megabytes. """ # concatenate all input files df = pd.DataFrame() input_file_size_mb = 0 for file in input_files: input_file_size_mb += os.path.getsize(file) / 1024**2 tmp = pd.read_parquet(file) df = pd.concat([df, tmp], ignore_index=True) # divide df into num_chunks and save each one out num_chunks = math.ceil(input_file_size_mb / desired_chunk_size_mb) chunk_size = math.ceil(len(df) / num_chunks) if num_chunks == 1: logger.info(f"Input data is already smaller than desired chunk size; not splitting") else: logger.info( f"Splitting a {round(input_file_size_mb, 2)} MB dataset ({len(df)} rows) into " f"into {num_chunks} chunks of size ~{desired_chunk_size_mb} MB each" ) for i in range(num_chunks): start = i * chunk_size end = (i + 1) * chunk_size chunk = df.iloc[start:end] chunk_dir = os.path.join(output_dir, f"chunk_{i}") if not os.path.exists(chunk_dir): os.makedirs(chunk_dir) logger.debug( f"Writing out chunk {i+1}/{num_chunks} (rows {chunk.index[0]} to " f"{chunk.index[-1]})" ) chunk.to_parquet(os.path.join(chunk_dir, "result.parquet"))
def split_data_in_two(input_files: list[str], output_dir: str, *args, **kwargs) -> None:
    """Splits the data (from a single input slot) into two chunks of similar size.

    This function takes all datasets from a single input slot, concatenates them,
    and then splits the resulting dataset by row count into two chunks. When the
    number of rows is odd, the first chunk receives the extra row. The chunks are
    written to ``<output_dir>/chunk_0/result.parquet`` and
    ``<output_dir>/chunk_1/result.parquet``.

    Parameters
    ----------
    input_files
        A list of input file paths to be concatenated and split.
    output_dir
        The directory where the resulting chunks will be saved.
    *args, **kwargs
        Accepted but ignored. NOTE(review): presumably this lets the function be
        called with the same arguments as other splitters (e.g. a desired chunk
        size) -- confirm against the caller.
    """
    # Read every file first and concatenate once; growing a frame inside the
    # loop re-copies all previously read rows on each iteration.
    frames = [pd.read_parquet(file) for file in input_files]
    # pd.concat rejects an empty list, so fall back to an empty frame to keep
    # writing two (empty) chunks when no input files are given.
    df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()

    # divide df into two and save each chunk out
    num_chunks = 2
    chunk_size = math.ceil(len(df) / num_chunks)
    for i in range(num_chunks):
        chunk = df.iloc[i * chunk_size : (i + 1) * chunk_size]
        chunk_dir = os.path.join(output_dir, f"chunk_{i}")
        os.makedirs(chunk_dir, exist_ok=True)
        chunk.to_parquet(os.path.join(chunk_dir, "result.parquet"))