From 37d3dc9df9f5cbd0400da4724d68128506e9ce98 Mon Sep 17 00:00:00 2001 From: Praateek Date: Wed, 2 Apr 2025 16:50:57 -0700 Subject: [PATCH 1/9] fc Signed-off-by: Praateek --- nemo_curator/datasets/doc_dataset.py | 33 ++++-- .../modules/semantic_dedup/clusteringmodel.py | 54 +++++---- .../semanticclusterleveldedup.py | 25 +++- nemo_curator/utils/distributed_utils.py | 107 +++++++++++++----- nemo_curator/utils/file_utils.py | 59 +++++++--- nemo_curator/utils/semdedup_utils.py | 26 +++-- 6 files changed, 220 insertions(+), 84 deletions(-) diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py index 11dd7e51e..9a9a63b9e 100644 --- a/nemo_curator/datasets/doc_dataset.py +++ b/nemo_curator/datasets/doc_dataset.py @@ -14,12 +14,12 @@ import os from functools import wraps -from typing import Any, Callable, List, Literal, Optional, Union +from typing import Any, Callable, Dict, List, Literal, Optional, Union import dask.dataframe as dd from nemo_curator.utils.distributed_utils import read_data, write_to_disk -from nemo_curator.utils.file_utils import get_all_files_paths_under +from nemo_curator.utils.file_utils import get_all_files_paths_under, get_fs class DocumentDataset: @@ -55,6 +55,7 @@ def read_json( add_filename: Union[bool, str] = False, input_meta: Optional[Union[str, dict]] = None, columns: Optional[List[str]] = None, + storage_options: Optional[Dict[str, str]] = None, **kwargs, ) -> "DocumentDataset": """ @@ -82,6 +83,7 @@ def read_json( blocksize=blocksize, input_meta=input_meta, columns=columns, + storage_options=storage_options, **kwargs, ) ) @@ -95,6 +97,7 @@ def read_parquet( blocksize: Optional[str] = "1gb", add_filename: Union[bool, str] = False, columns: Optional[List[str]] = None, + storage_options: Optional[Dict[str, str]] = None, **kwargs, ) -> "DocumentDataset": """ @@ -120,6 +123,7 @@ def read_parquet( files_per_partition=files_per_partition, blocksize=blocksize, columns=columns, + storage_options=storage_options, **kwargs, ) ) @@ -130,6 +134,7 @@ def read_pickle( input_files: Union[str, List[str]], backend: Literal["pandas", "cudf"] = "pandas", columns: Optional[List[str]] = None, + storage_options: Optional[Dict[str, str]] = None, **kwargs, ) -> "DocumentDataset": """ @@ -151,6 +156,7 @@ def read_pickle( file_type="pickle", backend=backend, columns=columns, + storage_options=storage_options, **kwargs, ) ) @@ -169,6 +175,7 @@ def read_custom( add_filename: Union[bool, str] = False, columns: Optional[List[str]] = None, input_meta: Union[str, dict] = None, + storage_options: Optional[Dict[str, str]] = None, **kwargs, ) -> "DocumentDataset": """ @@ -207,6 +214,7 @@ def read_custom( root=input_files, recurse_subdirectories=False, keep_extensions=[file_type], + storage_options=storage_options, ) elif isinstance(input_files, list): files = input_files @@ -222,6 +230,7 @@ def read_custom( columns=columns, input_meta=input_meta, read_func_single_partition=read_func_single_partition, + storage_options=storage_options, **kwargs, ) ) @@ -232,6 +241,7 @@ def to_json( write_to_filename: Union[bool, str] = False, keep_filename_column: bool = False, partition_on: Optional[str] = None, + storage_options: Optional[Dict[str, str]] = None, ): """ Writes the dataset to the specified path in JSONL format. 
@@ -262,6 +272,7 @@ def to_json( keep_filename_column=keep_filename_column, partition_on=partition_on, output_type="jsonl", + storage_options=storage_options, ) def to_parquet( @@ -270,6 +281,7 @@ def to_parquet( write_to_filename: Union[bool, str] = False, keep_filename_column: bool = False, partition_on: Optional[str] = None, + storage_options: Optional[Dict[str, str]] = None, ): """ Writes the dataset to the specified path in Parquet format. @@ -300,6 +312,7 @@ def to_parquet( keep_filename_column=keep_filename_column, partition_on=partition_on, output_type="parquet", + storage_options=storage_options, ) def to_pickle( @@ -337,7 +350,7 @@ def from_pandas( ) ) - def to_pandas(self): + def to_pandas(self) -> "pd.DataFrame": """ Creates a pandas dataframe from a DocumentDataset @@ -356,6 +369,7 @@ def _read_json_or_parquet( blocksize: Optional[str] = None, input_meta: Union[str, dict] = None, columns: Optional[List[str]] = None, + storage_options: Optional[Dict[str, str]] = None, **kwargs, ): """ @@ -376,10 +390,10 @@ def _read_json_or_parquet( """ file_ext = "." + file_type - if isinstance(input_files, list): + fs = get_fs(input_files[0], storage_options) # List of files - if all(os.path.isfile(f) for f in input_files): + if all(fs.isfile(f) for f in input_files): raw_data = read_data( input_files, file_type=file_type, @@ -389,6 +403,7 @@ def _read_json_or_parquet( add_filename=add_filename, input_meta=input_meta, columns=columns, + storage_options=storage_options, **kwargs, ) @@ -398,7 +413,7 @@ def _read_json_or_parquet( for data_path in input_files: files = get_all_files_paths_under( - root=data_path, recurse_subdirectories=False + root=data_path, recurse_subdirectories=False, fs=fs ) df = read_data( files, @@ -409,6 +424,7 @@ def _read_json_or_parquet( add_filename=add_filename, input_meta=input_meta, columns=columns, + storage_options=storage_options, **kwargs, ) dfs.append(df) @@ -423,7 +439,9 @@ def _read_json_or_parquet( # Directory of jsonl or parquet files else: files = get_all_files_paths_under( - root=input_files, recurse_subdirectories=False + root=input_files, + recurse_subdirectories=False, + storage_options=storage_options, ) raw_data = read_data( @@ -435,6 +453,7 @@ def _read_json_or_parquet( add_filename=add_filename, input_meta=input_meta, columns=columns, + storage_options=storage_options, **kwargs, ) diff --git a/nemo_curator/modules/semantic_dedup/clusteringmodel.py b/nemo_curator/modules/semantic_dedup/clusteringmodel.py index cc7fe0369..958b5cf62 100644 --- a/nemo_curator/modules/semantic_dedup/clusteringmodel.py +++ b/nemo_curator/modules/semantic_dedup/clusteringmodel.py @@ -15,9 +15,8 @@ import logging import os -import shutil import time -from typing import Optional, Union +from typing import Dict, Optional, Union import cudf import cupy as cp @@ -28,7 +27,7 @@ from nemo_curator.datasets import DocumentDataset from nemo_curator.log import create_logger from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix -from nemo_curator.utils.file_utils import expand_outdir_and_mkdir +from nemo_curator.utils.file_utils import expand_outdir_and_mkdir, get_fs from nemo_curator.utils.semdedup_utils import ( COSINE_DIST_TO_CENT_COL, L2_DIST_TO_CENT_COL, @@ -51,6 +50,7 @@ def __init__( clustering_input_partition_size: Optional[str] = "2gb", logger: Union[logging.Logger, str] = "./", profile_dir: Optional[str] = None, + storage_options: Optional[Dict[str, str]] = None, ): """ Initializes the ClusteringModel with the provided settings for semantic 
clustering to help semantic deduplication. @@ -73,7 +73,8 @@ def __init__( Default is "./". profile_dir (Optional[str]): If specified, directory to write Dask profile. Default is None. - + storage_options (Optional[Dict[str, str]]): Storage options for the file system. + Default is None. """ self.id_col = id_column self.max_iter = max_iter @@ -84,9 +85,13 @@ def __init__( self.clustering_input_partition_size = clustering_input_partition_size self.logger = self._setup_logger(logger) self.profile_dir = profile_dir + self.storage_options = storage_options + self.fs = get_fs(self.clustering_output_dir, self.storage_options) - if not os.path.exists(self.clustering_output_dir): - expand_outdir_and_mkdir(self.clustering_output_dir) + if not self.fs.exists(self.clustering_output_dir): + expand_outdir_and_mkdir( + self.clustering_output_dir, self.storage_options, self.fs + ) else: self.logger.warning( f"Clustering output directory {self.clustering_output_dir} already exists and will be overwritten" @@ -187,37 +192,46 @@ def __call__(self, embeddings_dataset: DocumentDataset): kmeans_centroids_file = os.path.join( self.clustering_output_dir, "kmeans_centroids.npy" ) - np.save(kmeans_centroids_file, centroids) + with self.fs.open(kmeans_centroids_file, "wb") as f: + np.save(f, centroids) self.logger.info("Saving centroids complete") del kmeans, cupy_normalized_darr, centroids # Save embeddings by nearest center to a file - clustering_output_dir = os.path.join( + embeddings_by_nearest_center_dir = os.path.join( self.clustering_output_dir, "embs_by_nearest_center" ) - if os.path.exists(clustering_output_dir): + if os.path.exists(embeddings_by_nearest_center_dir): self.logger.warning( - f"Output directory {clustering_output_dir} already exists and will be overwritten" + f"Output directory {embeddings_by_nearest_center_dir} already exists and will be overwritten" ) - shutil.rmtree(clustering_output_dir) + self.fs.rm(embeddings_by_nearest_center_dir, recursive=True) + + # TODO see if we have to create the directory + embeddings_df.map_partitions( + lambda df: df.to_parquet( + embeddings_by_nearest_center_dir, + index=False, + partition_cols=["nearest_cent"], + storage_options=self.storage_options, + ), + # we specify meta to allow skipping _meta_nonempty + meta=dict(), + ).compute() - embeddings_df.to_parquet( - clustering_output_dir, - index=False, - partition_on="nearest_cent", - write_index=False, - ) self.logger.info( f"Time taken for assigning distance to each embedding: {time.time() - t0}s" - f" and output written at {clustering_output_dir}" + f" and output written at {embeddings_by_nearest_center_dir}" ) del embeddings_df # We read this way to ensure each cluster is read in a single partition # This allows us to perform pairwise similarity within the cluster fps = [ - os.path.join(clustering_output_dir, f"nearest_cent={i}") + os.path.join(self.clustering_output_dir, f"nearest_cent={i}") for i in range(self.n_clusters) ] - embeddings_df = dd.from_map(cudf.read_parquet, fps) + embeddings_df = dd.from_map( + cudf.read_parquet, fps, storage_options=self.storage_options + ) return DocumentDataset(embeddings_df) diff --git a/nemo_curator/modules/semantic_dedup/semanticclusterleveldedup.py b/nemo_curator/modules/semantic_dedup/semanticclusterleveldedup.py index 93121f7e2..aa1f49073 100644 --- a/nemo_curator/modules/semantic_dedup/semanticclusterleveldedup.py +++ b/nemo_curator/modules/semantic_dedup/semanticclusterleveldedup.py @@ -17,7 +17,7 @@ import os import shutil import time -from typing import 
Literal, Optional, Union +from typing import Dict, Literal, Optional, Union import dask.bag as db import dask.dataframe as dd @@ -25,7 +25,7 @@ from nemo_curator.datasets import DocumentDataset from nemo_curator.log import create_logger from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix -from nemo_curator.utils.file_utils import expand_outdir_and_mkdir +from nemo_curator.utils.file_utils import expand_outdir_and_mkdir, get_fs from nemo_curator.utils.semdedup_utils import ( get_semantic_matches_per_cluster, prune_single_cluster, @@ -46,6 +46,7 @@ def __init__( batched_cosine_similarity: int = 1024, logger: Union[logging.Logger, str] = "./", profile_dir: Optional[str] = None, + storage_options: Optional[Dict[str, str]] = None, ) -> None: """ Initialize the SemanticClusterLevelDedup class. @@ -73,6 +74,8 @@ def __init__( Default is "./". profile_dir (Optional[str]): If specified, directory to write Dask profile. Default is None. + storage_options (Optional[Dict[str, str]]): Storage options for the file system. + Default is None. """ self.n_clusters = n_clusters @@ -89,6 +92,8 @@ def __init__( self.batched_cosine_similarity = batched_cosine_similarity self.logger = self._setup_logger(logger) self.profile_dir = profile_dir + self.storage_options = storage_options + self.fs = get_fs(self.output_dir, self.storage_options) def _setup_logger(self, logger: Union[logging.Logger, str]) -> logging.Logger: """ @@ -112,12 +117,14 @@ def _setup_logger(self, logger: Union[logging.Logger, str]) -> logging.Logger: return logger def compute_semantic_match_dfs(self) -> None: - if os.path.exists(self.semdedup_pruning_tables_dir): + if self.fs.exists(self.semdedup_pruning_tables_dir): self.logger.info( f"Removing existing directory {self.semdedup_pruning_tables_dir}" ) - shutil.rmtree(self.semdedup_pruning_tables_dir) - expand_outdir_and_mkdir(self.semdedup_pruning_tables_dir) + self.fs.rm(self.semdedup_pruning_tables_dir, recursive=True) + expand_outdir_and_mkdir( + self.semdedup_pruning_tables_dir, self.storage_options, self.fs + ) t0 = time.time() with performance_report_if_with_ts_suffix( @@ -135,6 +142,7 @@ def compute_semantic_match_dfs(self) -> None: which_to_keep=self.which_to_keep, sim_metric=self.sim_metric, batched_cosine_similarity=self.batched_cosine_similarity, + storage_options=self.storage_options, ) ) tasks.compute() @@ -172,6 +180,7 @@ def extract_dedup_data(self, eps_to_extract: float) -> DocumentDataset: emb_by_clust_dir=self.emb_by_clust_dir, semdedup_pruning_tables_dir=self.semdedup_pruning_tables_dir, eps=eps_to_extract, + storage_options=self.storage_options, ) results_df.to_parquet(output_parquet_path, index=False, ignore_index=True) self.logger.info( @@ -188,7 +197,11 @@ def extract_dedup_data(self, eps_to_extract: float) -> DocumentDataset: filtered_unique_ids_path=output_parquet_path, output_summary_file=output_summary_file, logger=self.logger, + storage_options=self.storage_options, ) return DocumentDataset.read_parquet( - output_parquet_path, blocksize="1gb", backend="cudf" + output_parquet_path, + blocksize="1gb", + backend="cudf", + storage_options=self.storage_options, ) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 24e1ddfff..52039dda0 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -15,12 +15,12 @@ import ast import os -import shutil import subprocess import dask from nemo_curator._compat import 
DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA +from nemo_curator.utils.file_utils import get_fs os.environ["RAPIDS_NO_INITIALIZE"] = "1" import random @@ -309,6 +309,7 @@ def read_single_partition( add_filename: Union[bool, str] = False, input_meta: Union[str, dict] = None, io_columns: Optional[List[str]] = None, + storage_options: Optional[Dict[str, str]] = None, **kwargs, ) -> Union["cudf.DataFrame", pd.DataFrame]: """ @@ -385,7 +386,7 @@ def read_single_partition( concat_f = pd.concat df_ls = [] for file in files: - df = read_f(file, **read_kwargs, **kwargs) + df = read_f(file, storage_options=storage_options, **read_kwargs, **kwargs) if add_filename: df[_resolve_filename_col(add_filename)] = os.path.basename(file) df = select_columns(df, io_columns, file_type, add_filename) @@ -393,7 +394,7 @@ def read_single_partition( df = concat_f(df_ls, ignore_index=True) else: - df = read_f(files, **read_kwargs, **kwargs) + df = read_f(files, storage_options=storage_options, **read_kwargs, **kwargs) df = select_columns(df, io_columns, file_type, add_filename) return df @@ -406,6 +407,7 @@ def read_data_blocksize( add_filename: Union[bool, str] = False, input_meta: Union[str, dict] = None, columns: Optional[List[str]] = None, + storage_options: Optional[Dict[str, str]] = None, **kwargs, ) -> dd.DataFrame: read_kwargs = dict() @@ -440,9 +442,10 @@ def read_data_blocksize( # only returns those columns, we explicitly set `columns` here columns = list(read_kwargs["dtype"].keys()) if add_filename: + fs = get_fs(input_files[0], storage_options) def extract_filename(path: str) -> str: - return os.path.basename(path) + return fs.basename(path) read_kwargs["include_path_column"] = _resolve_filename_col(add_filename) read_kwargs["path_converter"] = extract_filename @@ -472,7 +475,13 @@ def extract_filename(path: str) -> str: raise ValueError(msg) with dask.config.set({"dataframe.backend": backend}): - df = read_func(input_files, blocksize=blocksize, **read_kwargs, **kwargs) + df = read_func( + input_files, + blocksize=blocksize, + storage_options=storage_options, + **read_kwargs, + **kwargs, + ) output = select_columns(df, columns, file_type, add_filename) return output[sorted(output.columns)] @@ -492,6 +501,7 @@ def read_data_files_per_partition( Union[dd.DataFrame, pd.DataFrame], # Return type ] ] = None, + storage_options: Optional[Dict[str, str]] = None, **kwargs, ) -> dd.DataFrame: if read_func_single_partition is None: @@ -519,6 +529,7 @@ def read_data_files_per_partition( backend=backend, add_filename=add_filename, input_meta=input_meta, + storage_options=storage_options, **read_func_single_partition_kwargs, ) output = output[sorted(output.columns)] @@ -529,6 +540,7 @@ def read_pandas_pickle( file: str, add_filename: Union[bool, str] = False, columns: Optional[List[str]] = None, + storage_options: Optional[Dict[str, str]] = None, **kwargs, ) -> pd.DataFrame: """ @@ -546,9 +558,9 @@ def read_pandas_pickle( warnings.warn("add_filename is not supported for pickle files") if columns is not None: - return pd.read_pickle(file, **kwargs)[columns] + return pd.read_pickle(file, storage_options=storage_options, **kwargs)[columns] else: - return pd.read_pickle(file, **kwargs) + return pd.read_pickle(file, storage_options=storage_options, **kwargs) def read_data( @@ -566,6 +578,7 @@ def read_data( Union[dd.DataFrame, pd.DataFrame], # Return type ] ] = None, + storage_options: Optional[Dict[str, str]] = None, **kwargs, ) -> dd.DataFrame: """ @@ -591,6 +604,8 @@ def read_data( - add_filename: Read below - columns: 
Read below - input_meta: Read below + storage_options (Optional[Dict[str, str]]): Storage options for the file system. + Default is None. Returns: A Dask-cuDF or a Dask-pandas DataFrame. @@ -611,11 +626,16 @@ def read_data( input_meta=input_meta, columns=columns, read_func_single_partition=read_func_single_partition, + storage_options=storage_options, **kwargs, ) elif file_type == "pickle": df = read_pandas_pickle( - input_files[0], add_filename=add_filename, columns=columns, **kwargs + input_files[0], + add_filename=add_filename, + columns=columns, + storage_options=storage_options, + **kwargs, ) df = dd.from_pandas(df, npartitions=16) if backend == "cudf": @@ -653,6 +673,7 @@ def read_data( add_filename=add_filename, input_meta=input_meta, columns=columns, + storage_options=storage_options, **kwargs, ) else: @@ -671,6 +692,7 @@ def read_data( input_meta=input_meta, columns=columns, read_func_single_partition=read_func_single_partition, + storage_options=storage_options, **kwargs, ) else: @@ -822,12 +844,15 @@ def single_partition_write_with_filename( def _single_partition_write_to_simple_bitext( - out_df, output_file_path, partition_info=None + out_df, + output_file_path, + partition_info=None, + storage_options: Optional[Dict[str, str]] = None, ): if len(out_df) > 0: empty_partition = False else: - warnings.warn(f"Empty partition found") + warnings.warn("Empty partition found") empty_partition = True if is_cudf_type(out_df): @@ -841,12 +866,14 @@ def _single_partition_write_to_simple_bitext( if empty_partition: return success_ser + fs = get_fs(output_file_path, storage_options) + src_output_file_path = output_file_path + f".{out_df['src_lang'].iloc[0]}" tgt_output_file_path = output_file_path + f".{out_df['tgt_lang'].iloc[0]}" partition_id = partition_info["number"] if partition_info else 0 with ( - open(f"{src_output_file_path}.{partition_id}", "w") as src_out, - open(f"{tgt_output_file_path}.{partition_id}", "w") as tgt_out, + fs.open(f"{src_output_file_path}.{partition_id}", "w") as src_out, + fs.open(f"{tgt_output_file_path}.{partition_id}", "w") as tgt_out, ): # Handle cuDF Series which are not directly iterable if is_cudf_type(out_df): @@ -863,7 +890,11 @@ def _single_partition_write_to_simple_bitext( return success_ser -def _merge_tmp_simple_bitext_partitions(tmp_output_dir: str, output_dir: str): +def _merge_tmp_simple_bitext_partitions( + tmp_output_dir: str, + output_dir: str, + storage_options: Optional[Dict[str, str]] = None, +): """Merge partitions of simple bitext files in `tmp_output_dir` into files at `output_dir`. 
Args: @@ -871,9 +902,9 @@ def _merge_tmp_simple_bitext_partitions(tmp_output_dir: str, output_dir: str): with suffixes that looks like "file.1", "file.2" that shows the merging order output_file_path (str): dir to write output files """ - + fs = get_fs(tmp_output_dir, storage_options) sorted_tmp_files = sorted( - os.listdir(tmp_output_dir), key=lambda x: int(x.split(".")[-1]) + fs.listdir(tmp_output_dir), key=lambda x: int(x.split(".")[-1]) ) unique_file_handles = {} # Loop through the sorted files and concatenate their contents @@ -886,9 +917,9 @@ def _merge_tmp_simple_bitext_partitions(tmp_output_dir: str, output_dir: str): # create the output file if we haven't yet if output_file_path not in unique_file_handles: - unique_file_handles[output_file_path] = open(output_file_path, "w") + unique_file_handles[output_file_path] = fs.open(output_file_path, "w") - with open(input_file_path, "r") as infile: + with fs.open(input_file_path, "r") as infile: unique_file_handles[output_file_path].write(infile.read()) # close all dangling file handles @@ -903,6 +934,7 @@ def write_to_disk( keep_filename_column: bool = False, output_type: str = "jsonl", partition_on: Optional[str] = None, + storage_options: Optional[Dict[str, str]] = None, ): """ This function writes a Dask DataFrame to the specified file path. @@ -921,13 +953,18 @@ def write_to_disk( If specified, the data will be partitioned based on the unique values in this column, and each partition will be written to a separate directory """ + if storage_options is not None: + warnings.warn("storage_options is not tested for write_to_disk") filename_col = _resolve_filename_col(write_to_filename) # output_path is a file name if isinstance(output_path, str) and output_path.endswith(".jsonl"): if df.npartitions == 1: df.map_partitions( - _write_to_jsonl_or_parquet, output_path, output_type + _write_to_jsonl_or_parquet, + output_path, + output_type, + storage_options=storage_options, ).compute() return else: @@ -956,7 +993,8 @@ def write_to_disk( # output_path is a directory if write_to_filename and output_type != "bitext": - os.makedirs(output_path, exist_ok=True) + fs = get_fs(output_path, storage_options) + fs.makedirs(output_path, exist_ok=True) output = df.map_partitions( single_partition_write_with_filename, output_path, @@ -976,30 +1014,35 @@ def write_to_disk( output_path=output_path, output_type=output_type, partition_on=partition_on, + storage_options=storage_options, ) elif output_type == "bitext": + fs = get_fs(output_path, storage_options) + if write_to_filename: - os.makedirs(output_path, exist_ok=True) + fs.makedirs(output_path, exist_ok=True) tmp_output_file_dir = os.path.join(output_path, ".tmp") - os.makedirs(tmp_output_file_dir, exist_ok=True) - file_name = os.path.basename(list(df[filename_col].unique())[0]) + fs.makedirs(tmp_output_file_dir, exist_ok=True) + file_name = fs.basename(list(df[filename_col].unique())[0]) else: tmp_output_file_dir = os.path.join(output_path, ".tmp") - os.makedirs(tmp_output_file_dir, exist_ok=True) - file_name = os.path.basename(output_path) + fs.makedirs(tmp_output_file_dir, exist_ok=True) + file_name = fs.basename(output_path) output = df.map_partitions( _single_partition_write_to_simple_bitext, os.path.join(tmp_output_file_dir, file_name), meta=output_meta, enforce_metadata=False, + storage_options=storage_options, ) output = output.compute() _merge_tmp_simple_bitext_partitions( tmp_output_file_dir, (output_path if write_to_filename else os.path.dirname(output_path)), + 
storage_options=storage_options, ) - shutil.rmtree(tmp_output_file_dir) + fs.rm(tmp_output_file_dir, recursive=True) else: raise ValueError(f"Unknown output type: {output_type}") @@ -1011,7 +1054,9 @@ def _write_to_jsonl_or_parquet( output_path: str, output_type: Literal["jsonl", "parquet"] = "jsonl", partition_on: Optional[str] = None, + storage_options: Optional[Dict[str, str]] = None, ): + fs = get_fs(output_path, storage_options) if output_type == "jsonl": if partition_on is not None: unique_values = ( @@ -1022,7 +1067,7 @@ def _write_to_jsonl_or_parquet( .to_list() ) for value in unique_values: - os.makedirs(output_path, exist_ok=True) + fs.makedirs(output_path, exist_ok=True) partition_output_path = os.path.join( output_path, f"{partition_on}={value}" ) @@ -1032,6 +1077,7 @@ def _write_to_jsonl_or_parquet( lines=True, force_ascii=False, index=False, # Only index=False is supported for orient="records" + storage_options=storage_options, ) else: if is_cudf_type(df): @@ -1043,6 +1089,7 @@ def _write_to_jsonl_or_parquet( lines=True, force_ascii=False, index=False, + storage_options=storage_options, ) # Only index=False is supported for orient="records" else: df.to_json( @@ -1051,9 +1098,15 @@ def _write_to_jsonl_or_parquet( lines=True, force_ascii=False, index=False, + storage_options=storage_options, ) # Only index=False is supported for orient="records" elif output_type == "parquet": - df.to_parquet(output_path, write_index=False, partition_on=partition_on) + df.to_parquet( + output_path, + write_index=False, + partition_on=partition_on, + storage_options=storage_options, + ) else: raise ValueError(f"Unknown output type: {output_type}") diff --git a/nemo_curator/utils/file_utils.py b/nemo_curator/utils/file_utils.py index 7aff6c240..07997d776 100644 --- a/nemo_curator/utils/file_utils.py +++ b/nemo_curator/utils/file_utils.py @@ -18,13 +18,15 @@ import pathlib import warnings from functools import partial, reduce -from typing import List, Optional, Union +from typing import Dict, List, Optional, Union import dask.bag as db import dask.dataframe as dd +import fsspec import numpy as np import pandas as pd from dask import delayed +from fsspec.core import get_filesystem_class, split_protocol from nemo_curator.utils.distributed_utils import ( read_data, @@ -36,13 +38,36 @@ ) -def mkdir(d): - pathlib.Path(d).mkdir(parents=True, exist_ok=True) +def get_fs( + path: str, storage_options: Optional[Dict[str, str]] = None +) -> fsspec.AbstractFileSystem: + if not storage_options: + storage_options = dict() + protocol, path = split_protocol(path) + fs = get_filesystem_class(protocol)(**storage_options) + return fs -def expand_outdir_and_mkdir(outdir): - outdir = os.path.abspath(os.path.expanduser(outdir)) - mkdir(outdir) +def expand_outdir_and_mkdir( + outdir: str, + storage_options: Optional[Dict[str, str]] = None, + fs: Optional[fsspec.AbstractFileSystem] = None, +): + if fs is None: + fs = get_fs(outdir, storage_options) + + if isinstance(fs, fsspec.implementations.local.LocalFileSystem): + outdir = os.path.abspath(os.path.expanduser(outdir)) + + try: + from s3fs import S3FileSystem + + if isinstance(fs, S3FileSystem): + fs.touch(os.path.join(outdir, ".empty")) + except (ImportError, ModuleNotFoundError): + pass + finally: + fs.makedirs(outdir, exist_ok=True) return outdir @@ -71,7 +96,7 @@ def filter_files_by_extension( filtered_files.append(file) if len(files_list) != len(filtered_files): - warnings.warn(f"Skipped at least one file due to unmatched file extension(s).") + 
warnings.warn("Skipped at least one file due to unmatched file extension(s).") return filtered_files @@ -81,6 +106,8 @@ def get_all_files_paths_under( recurse_subdirectories: bool = True, followlinks: bool = False, keep_extensions: Optional[Union[str, List[str]]] = None, + storage_options: Optional[Dict[str, str]] = None, + fs: Optional[fsspec.AbstractFileSystem] = None, ) -> List[str]: """ This function returns a list of all the files under a specified directory. @@ -94,17 +121,17 @@ def get_all_files_paths_under( or multiple file types to include in the output, e.g., "jsonl" or ["jsonl", "parquet"]. """ - if recurse_subdirectories: - file_ls = [ - os.path.join(r, f) - for r, subdirs, files in os.walk(root, followlinks=followlinks) - for f in files - ] - else: - # Only include files, not directories - file_ls = [entry.path for entry in os.scandir(root) if entry.is_file()] + if followlinks: + warnings.warn("followlinks is an untested codepath for now") + if not fs: + fs = get_fs(root, storage_options) + file_ls = fs.find(root, maxdepth=None if recurse_subdirectories else 1) file_ls.sort() + # Add protocol back if it exists in original path + if "://" in root: + protocol = root.split("://")[0] + file_ls = [f"{protocol}://{f}" for f in file_ls] if keep_extensions is not None: file_ls = filter_files_by_extension(file_ls, keep_extensions) diff --git a/nemo_curator/utils/semdedup_utils.py b/nemo_curator/utils/semdedup_utils.py index f8c88cd99..76ce70efe 100644 --- a/nemo_curator/utils/semdedup_utils.py +++ b/nemo_curator/utils/semdedup_utils.py @@ -15,7 +15,7 @@ import logging import os -from typing import Literal, Tuple +from typing import Dict, Literal, Optional, Tuple import cudf import cupy as cp @@ -128,6 +128,7 @@ def get_semantic_matches_per_cluster( which_to_keep: Literal["hard", "easy", "random"], sim_metric: Literal["cosine", "l2"], batched_cosine_similarity: int = 1024, + storage_options: Optional[Dict[str, str]] = None, ) -> None: """ Get the semantic matches for a single cluster. @@ -144,6 +145,7 @@ def get_semantic_matches_per_cluster( cluster_df = cudf.read_parquet( os.path.join(emb_by_clust_dir, f"nearest_cent={cluster_id}"), columns=[embedding_col, id_col, distance_col], + storage_options=storage_options, ) output_df_file_path = os.path.join(output_dir, f"cluster_{cluster_id}.parquet") if len(cluster_df) == 1: @@ -151,7 +153,7 @@ def get_semantic_matches_per_cluster( cluster_df["max_id"] = cluster_df[id_col] cluster_df["cosine_sim_score"] = [0] cluster_df = cluster_df[["id", "max_id", "cosine_sim_score"]] - cluster_df.to_parquet(output_df_file_path) + cluster_df.to_parquet(output_df_file_path, storage_options=storage_options) return if which_to_keep == "hard": @@ -187,7 +189,7 @@ def get_semantic_matches_per_cluster( "cosine_sim_score": max_similarity, } ) - points_to_remove_df.to_parquet(output_df_file_path) + points_to_remove_df.to_parquet(output_df_file_path, storage_options=storage_options) def prune_single_cluster( @@ -196,6 +198,7 @@ def prune_single_cluster( emb_by_clust_dir: str, semdedup_pruning_tables_dir: str, eps: float, + storage_options: Optional[Dict[str, str]] = None, ) -> cudf.DataFrame: """ Processes data for a single cluster, applying pruning based on specified epsilon. 
@@ -213,7 +216,9 @@ def prune_single_cluster( cluster_dir = os.path.join(emb_by_clust_dir, f"nearest_cent={cluster_id}") # For the output we only return id, cosine_dist_to_cent, and cluster df_cluster = cudf.read_parquet( - cluster_dir, columns=[id_col, COSINE_DIST_TO_CENT_COL] + cluster_dir, + columns=[id_col, COSINE_DIST_TO_CENT_COL], + storage_options=storage_options, ).assign(cluster=cluster_id) pruning_table_fname = os.path.join( @@ -221,7 +226,9 @@ def prune_single_cluster( ) # In future we can add more columns to the pruning table like max_id etc. pruning_table = cudf.read_parquet( - pruning_table_fname, columns=["id", "cosine_sim_score"] + pruning_table_fname, + columns=["id", "cosine_sim_score"], + storage_options=storage_options, ) # If the pruning table only has one row, we don't need to remove any records if len(pruning_table) == 1: @@ -243,12 +250,15 @@ def write_pruned_summary_file( filtered_unique_ids_path: str, output_summary_file: str, logger: logging.Logger, + storage_options: Optional[Dict[str, str]] = None, ): """ Writes a summary file for the pruned data. """ - removed = len(dd.read_parquet(filtered_unique_ids_path)) - total = len(dd.read_parquet(emb_by_clust_dir)) + removed = len( + dd.read_parquet(filtered_unique_ids_path, storage_options=storage_options) + ) + total = len(dd.read_parquet(emb_by_clust_dir, storage_options=storage_options)) kept = total - removed logger.info( @@ -261,4 +271,4 @@ def write_pruned_summary_file( "total": [total], } df = pd.DataFrame(result_dict) - df.to_csv(output_summary_file, index=False) + df.to_csv(output_summary_file, index=False, storage_options=storage_options) From ac99f9f814a47e7c46b23635e7ab00c6fa093724 Mon Sep 17 00:00:00 2001 From: Praateek Date: Wed, 2 Apr 2025 17:42:00 -0700 Subject: [PATCH 2/9] bug fixes Signed-off-by: Praateek --- nemo_curator/modules/semantic_dedup/clusteringmodel.py | 6 ++++-- nemo_curator/utils/distributed_utils.py | 8 +++++++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/nemo_curator/modules/semantic_dedup/clusteringmodel.py b/nemo_curator/modules/semantic_dedup/clusteringmodel.py index 958b5cf62..865cd3b1b 100644 --- a/nemo_curator/modules/semantic_dedup/clusteringmodel.py +++ b/nemo_curator/modules/semantic_dedup/clusteringmodel.py @@ -207,7 +207,9 @@ def __call__(self, embeddings_dataset: DocumentDataset): ) self.fs.rm(embeddings_by_nearest_center_dir, recursive=True) - # TODO see if we have to create the directory + expand_outdir_and_mkdir( + embeddings_by_nearest_center_dir, self.storage_options, self.fs + ) embeddings_df.map_partitions( lambda df: df.to_parquet( embeddings_by_nearest_center_dir, @@ -228,7 +230,7 @@ def __call__(self, embeddings_dataset: DocumentDataset): # We read this way to ensure each cluster is read in a single partition # This allows us to perform pairwise similarity within the cluster fps = [ - os.path.join(self.clustering_output_dir, f"nearest_cent={i}") + os.path.join(embeddings_by_nearest_center_dir, f"nearest_cent={i}") for i in range(self.n_clusters) ] embeddings_df = dd.from_map( diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 52039dda0..2aa6cc5ab 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -20,7 +20,6 @@ import dask from nemo_curator._compat import DASK_CUDF_PARQUET_READ_INCONSISTENT_SCHEMA -from nemo_curator.utils.file_utils import get_fs os.environ["RAPIDS_NO_INITIALIZE"] = "1" import random @@ -442,6 +441,7 @@ def read_data_blocksize( # 
only returns those columns, we explicitly set `columns` here columns = list(read_kwargs["dtype"].keys()) if add_filename: + from nemo_curator.utils.file_utils import get_fs fs = get_fs(input_files[0], storage_options) def extract_filename(path: str) -> str: @@ -865,6 +865,7 @@ def _single_partition_write_to_simple_bitext( # Skip file creation for empty partitions if empty_partition: return success_ser + from nemo_curator.utils.file_utils import get_fs fs = get_fs(output_file_path, storage_options) @@ -902,6 +903,8 @@ def _merge_tmp_simple_bitext_partitions( with suffixes that looks like "file.1", "file.2" that shows the merging order output_file_path (str): dir to write output files """ + from nemo_curator.utils.file_utils import get_fs + fs = get_fs(tmp_output_dir, storage_options) sorted_tmp_files = sorted( fs.listdir(tmp_output_dir), key=lambda x: int(x.split(".")[-1]) @@ -992,6 +995,8 @@ def write_to_disk( output_meta = pd.Series([True], dtype="bool") # output_path is a directory + from nemo_curator.utils.file_utils import get_fs + if write_to_filename and output_type != "bitext": fs = get_fs(output_path, storage_options) fs.makedirs(output_path, exist_ok=True) @@ -1056,6 +1061,7 @@ def _write_to_jsonl_or_parquet( partition_on: Optional[str] = None, storage_options: Optional[Dict[str, str]] = None, ): + from nemo_curator.utils.file_utils import get_fs fs = get_fs(output_path, storage_options) if output_type == "jsonl": if partition_on is not None: From 9b86cb22cbace63c7b5e3d82fc0766fae2fa321f Mon Sep 17 00:00:00 2001 From: Praateek Date: Wed, 2 Apr 2025 17:43:40 -0700 Subject: [PATCH 3/9] pc Signed-off-by: Praateek --- nemo_curator/utils/distributed_utils.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 2aa6cc5ab..ad36192b3 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -442,6 +442,7 @@ def read_data_blocksize( columns = list(read_kwargs["dtype"].keys()) if add_filename: from nemo_curator.utils.file_utils import get_fs + fs = get_fs(input_files[0], storage_options) def extract_filename(path: str) -> str: @@ -1062,6 +1063,7 @@ def _write_to_jsonl_or_parquet( storage_options: Optional[Dict[str, str]] = None, ): from nemo_curator.utils.file_utils import get_fs + fs = get_fs(output_path, storage_options) if output_type == "jsonl": if partition_on is not None: From 485dc37d8e367a13083bbd6ee59c401eecc32c75 Mon Sep 17 00:00:00 2001 From: Praateek Date: Wed, 2 Apr 2025 17:47:20 -0700 Subject: [PATCH 4/9] remove os.path.exists Signed-off-by: Praateek --- nemo_curator/modules/semantic_dedup/clusteringmodel.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/nemo_curator/modules/semantic_dedup/clusteringmodel.py b/nemo_curator/modules/semantic_dedup/clusteringmodel.py index 865cd3b1b..d3607b281 100644 --- a/nemo_curator/modules/semantic_dedup/clusteringmodel.py +++ b/nemo_curator/modules/semantic_dedup/clusteringmodel.py @@ -201,15 +201,12 @@ def __call__(self, embeddings_dataset: DocumentDataset): embeddings_by_nearest_center_dir = os.path.join( self.clustering_output_dir, "embs_by_nearest_center" ) - if os.path.exists(embeddings_by_nearest_center_dir): + if self.fs.exists(embeddings_by_nearest_center_dir): self.logger.warning( f"Output directory {embeddings_by_nearest_center_dir} already exists and will be overwritten" ) self.fs.rm(embeddings_by_nearest_center_dir, recursive=True) - expand_outdir_and_mkdir( - 
embeddings_by_nearest_center_dir, self.storage_options, self.fs - ) embeddings_df.map_partitions( lambda df: df.to_parquet( embeddings_by_nearest_center_dir, From d5f800512cf0e1b1ce4302c50b52a3a53464d16a Mon Sep 17 00:00:00 2001 From: Praateek Date: Thu, 3 Apr 2025 10:46:56 -0700 Subject: [PATCH 5/9] add s3/azure Signed-off-by: Praateek --- pyproject.toml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 419f468f0..74f3b307a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -94,6 +94,13 @@ cuda12x_nightly = [ "dask-cudf-cu12>=25.04.0a0,<=25.04", "spacy[cuda12x]>=3.6.0, <3.8.0", ] +s3 = [ + "s3fs", + "aiobotocore>=2.4.2", +] +azure = [ + "adlfs", +] # Installs CPU + GPU text and image curation modules image = [ "nvidia-dali-cuda120", From f9e7e815ded83f85fac4e9f174b77fe5c0ff693c Mon Sep 17 00:00:00 2001 From: Praateek Date: Thu, 3 Apr 2025 10:52:24 -0700 Subject: [PATCH 6/9] pc Signed-off-by: Praateek --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 74f3b307a..784ed5189 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,7 +96,7 @@ cuda12x_nightly = [ ] s3 = [ "s3fs", - "aiobotocore>=2.4.2", + "aiobotocore>=2.4.2", ] azure = [ "adlfs", From e5298103191834b0e69add1934df9af7b261d90d Mon Sep 17 00:00:00 2001 From: Praateek Date: Thu, 3 Apr 2025 16:22:33 -0700 Subject: [PATCH 7/9] use storage options Signed-off-by: Praateek --- nemo_curator/utils/semdedup_utils.py | 2 +- tests/utils/test_file_utils.py | 21 --------------------- 2 files changed, 1 insertion(+), 22 deletions(-) diff --git a/nemo_curator/utils/semdedup_utils.py b/nemo_curator/utils/semdedup_utils.py index 76ce70efe..82e9738e2 100644 --- a/nemo_curator/utils/semdedup_utils.py +++ b/nemo_curator/utils/semdedup_utils.py @@ -270,5 +270,5 @@ def write_pruned_summary_file( "removed": [removed], "total": [total], } - df = pd.DataFrame(result_dict) + df = cudf.DataFrame(result_dict) df.to_csv(output_summary_file, index=False, storage_options=storage_options) diff --git a/tests/utils/test_file_utils.py b/tests/utils/test_file_utils.py index a3fd75ee2..8c4aa4962 100644 --- a/tests/utils/test_file_utils.py +++ b/tests/utils/test_file_utils.py @@ -12,24 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import json import os -import pathlib import shutil import tempfile import warnings -from functools import reduce from unittest.mock import MagicMock, mock_open, patch -import dask.bag as db import dask.dataframe as dd -import numpy as np import pandas as pd import pytest -from dask import delayed from nemo_curator.utils.file_utils import ( - NEMO_CURATOR_HOME, _save_jsonl, _update_filetype, expand_outdir_and_mkdir, @@ -38,7 +31,6 @@ get_batched_files, get_remaining_files, merge_counts, - mkdir, parse_str_of_num_bytes, remove_path_extension, reshard_jsonl, @@ -59,19 +51,6 @@ def temp_dir(): class TestBasicFileUtils: """Tests for basic file utility functions.""" - def test_mkdir(self, temp_dir): - """Test that mkdir creates directories.""" - test_dir = os.path.join(temp_dir, "test_directory") - mkdir(test_dir) - assert os.path.exists(test_dir) - assert os.path.isdir(test_dir) - - # Test with nested directory - nested_dir = os.path.join(test_dir, "nested", "directory") - mkdir(nested_dir) - assert os.path.exists(nested_dir) - assert os.path.isdir(nested_dir) - def test_expand_outdir_and_mkdir(self, temp_dir): """Test that expand_outdir_and_mkdir expands paths and creates directories.""" # Test with a normal path From 060d702d0e5289abb1e94a92f9b8296b36c04fb7 Mon Sep 17 00:00:00 2001 From: Praateek Date: Thu, 3 Apr 2025 17:30:29 -0700 Subject: [PATCH 8/9] missing storage Signed-off-by: Praateek --- .../modules/semantic_dedup/semanticclusterleveldedup.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/nemo_curator/modules/semantic_dedup/semanticclusterleveldedup.py b/nemo_curator/modules/semantic_dedup/semanticclusterleveldedup.py index aa1f49073..21ee35368 100644 --- a/nemo_curator/modules/semantic_dedup/semanticclusterleveldedup.py +++ b/nemo_curator/modules/semantic_dedup/semanticclusterleveldedup.py @@ -182,7 +182,12 @@ def extract_dedup_data(self, eps_to_extract: float) -> DocumentDataset: eps=eps_to_extract, storage_options=self.storage_options, ) - results_df.to_parquet(output_parquet_path, index=False, ignore_index=True) + results_df.to_parquet( + output_parquet_path, + index=False, + ignore_index=True, + storage_options=self.storage_options, + ) self.logger.info( f"Time taken for Extracting Pruned Data : {time.time() - t0} and output written at {output_parquet_path}" ) From afe8f1a0bf599f21d535e76986504abae1c0f684 Mon Sep 17 00:00:00 2001 From: Praateek Date: Wed, 23 Apr 2025 09:24:16 -0700 Subject: [PATCH 9/9] try removing aiobotocore Signed-off-by: Praateek --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 784ed5189..ba1141368 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,7 +96,6 @@ cuda12x_nightly = [ ] s3 = [ "s3fs", - "aiobotocore>=2.4.2", ] azure = [ "adlfs",
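Usage note (not part of the patch series above): a minimal sketch of how the `storage_options` plumbing introduced in these commits might be exercised end to end. The bucket name, prefixes, and credential keys below are placeholders; the dictionary is forwarded as-is to fsspec (s3fs, adlfs, etc.), so any options accepted by the target filesystem apply. Only APIs touched by the patch are used here (`DocumentDataset.read_parquet`, `DocumentDataset.to_parquet`, `get_fs`).

    # Sketch only: paths and credential options below are illustrative placeholders.
    from nemo_curator.datasets import DocumentDataset
    from nemo_curator.utils.file_utils import get_fs

    # Passed through to the fsspec backend (for s3fs, keys such as "anon",
    # "key", or "secret" are accepted); exact contents depend on your setup.
    storage_options = {"anon": False}

    # Read a directory of Parquet files directly from object storage.
    dataset = DocumentDataset.read_parquet(
        "s3://my-bucket/curated/input/",   # placeholder path
        backend="pandas",
        blocksize="1gb",
        storage_options=storage_options,
    )

    # Write the result back without touching the local filesystem.
    dataset.to_parquet(
        "s3://my-bucket/curated/output/",  # placeholder path
        storage_options=storage_options,
    )

    # The same options construct an fsspec filesystem for ad-hoc checks,
    # mirroring how the patched modules call get_fs internally.
    fs = get_fs("s3://my-bucket/curated/output/", storage_options)
    print(fs.exists("s3://my-bucket/curated/output/"))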