[WIP] Remote I/O in SemDedup #621

Draft · wants to merge 9 commits into main
33 changes: 26 additions & 7 deletions nemo_curator/datasets/doc_dataset.py
@@ -14,12 +14,12 @@

import os
from functools import wraps
from typing import Any, Callable, List, Literal, Optional, Union
from typing import Any, Callable, Dict, List, Literal, Optional, Union

import dask.dataframe as dd

from nemo_curator.utils.distributed_utils import read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.file_utils import get_all_files_paths_under, get_fs


class DocumentDataset:
@@ -55,6 +55,7 @@ def read_json(
add_filename: Union[bool, str] = False,
input_meta: Optional[Union[str, dict]] = None,
columns: Optional[List[str]] = None,
storage_options: Optional[Dict[str, str]] = None,
**kwargs,
) -> "DocumentDataset":
"""
@@ -82,6 +83,7 @@ def read_json(
blocksize=blocksize,
input_meta=input_meta,
columns=columns,
storage_options=storage_options,
**kwargs,
)
)
@@ -95,6 +97,7 @@ def read_parquet(
blocksize: Optional[str] = "1gb",
add_filename: Union[bool, str] = False,
columns: Optional[List[str]] = None,
storage_options: Optional[Dict[str, str]] = None,
**kwargs,
) -> "DocumentDataset":
"""
@@ -120,6 +123,7 @@
files_per_partition=files_per_partition,
blocksize=blocksize,
columns=columns,
storage_options=storage_options,
**kwargs,
)
)
@@ -130,6 +134,7 @@ def read_pickle(
input_files: Union[str, List[str]],
backend: Literal["pandas", "cudf"] = "pandas",
columns: Optional[List[str]] = None,
storage_options: Optional[Dict[str, str]] = None,
**kwargs,
) -> "DocumentDataset":
"""
@@ -151,6 +156,7 @@
file_type="pickle",
backend=backend,
columns=columns,
storage_options=storage_options,
**kwargs,
)
)
@@ -169,6 +175,7 @@ def read_custom(
add_filename: Union[bool, str] = False,
columns: Optional[List[str]] = None,
input_meta: Union[str, dict] = None,
storage_options: Optional[Dict[str, str]] = None,
**kwargs,
) -> "DocumentDataset":
"""
@@ -207,6 +214,7 @@ def read_custom(
root=input_files,
recurse_subdirectories=False,
keep_extensions=[file_type],
storage_options=storage_options,
)
elif isinstance(input_files, list):
files = input_files
@@ -222,6 +230,7 @@
columns=columns,
input_meta=input_meta,
read_func_single_partition=read_func_single_partition,
storage_options=storage_options,
**kwargs,
)
)
@@ -232,6 +241,7 @@ def to_json(
write_to_filename: Union[bool, str] = False,
keep_filename_column: bool = False,
partition_on: Optional[str] = None,
storage_options: Optional[Dict[str, str]] = None,
):
"""
Writes the dataset to the specified path in JSONL format.
@@ -262,6 +272,7 @@
keep_filename_column=keep_filename_column,
partition_on=partition_on,
output_type="jsonl",
storage_options=storage_options,
)

def to_parquet(
@@ -270,6 +281,7 @@ def to_parquet(
write_to_filename: Union[bool, str] = False,
keep_filename_column: bool = False,
partition_on: Optional[str] = None,
storage_options: Optional[Dict[str, str]] = None,
):
"""
Writes the dataset to the specified path in Parquet format.
@@ -300,6 +312,7 @@
keep_filename_column=keep_filename_column,
partition_on=partition_on,
output_type="parquet",
storage_options=storage_options,
)

def to_pickle(
@@ -337,7 +350,7 @@ def from_pandas(
)
)

def to_pandas(self):
def to_pandas(self) -> "pd.DataFrame":
"""
Creates a pandas dataframe from a DocumentDataset

@@ -356,6 +369,7 @@ def _read_json_or_parquet(
blocksize: Optional[str] = None,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
storage_options: Optional[Dict[str, str]] = None,
**kwargs,
):
"""
@@ -376,10 +390,10 @@

"""
file_ext = "." + file_type

if isinstance(input_files, list):
fs = get_fs(input_files[0], storage_options)
# List of files
if all(os.path.isfile(f) for f in input_files):
if all(fs.isfile(f) for f in input_files):
raw_data = read_data(
input_files,
file_type=file_type,
@@ -389,6 +403,7 @@
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
storage_options=storage_options,
**kwargs,
)

@@ -398,7 +413,7 @@

for data_path in input_files:
files = get_all_files_paths_under(
root=data_path, recurse_subdirectories=False
root=data_path, recurse_subdirectories=False, fs=fs
)
df = read_data(
files,
@@ -409,6 +424,7 @@
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
storage_options=storage_options,
**kwargs,
)
dfs.append(df)
@@ -423,7 +439,9 @@
# Directory of jsonl or parquet files
else:
files = get_all_files_paths_under(
root=input_files, recurse_subdirectories=False
root=input_files,
recurse_subdirectories=False,
storage_options=storage_options,
)

raw_data = read_data(
@@ -435,6 +453,7 @@
add_filename=add_filename,
input_meta=input_meta,
columns=columns,
storage_options=storage_options,
**kwargs,
)

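For context, a minimal sketch of how the new storage_options parameter is intended to be used once this lands. The bucket, paths, and credential keys below are placeholders, and the exact option keys depend on the fsspec backend installed (e.g. s3fs); treat this as an illustration, not part of the diff.

# Hypothetical usage (not part of this PR): read JSONL from an S3 bucket and
# write Parquet back, passing fsspec credentials via storage_options.
from nemo_curator.datasets import DocumentDataset

s3_options = {"key": "<access-key>", "secret": "<secret-key>"}  # placeholder credentials

dataset = DocumentDataset.read_json(
    "s3://example-bucket/raw-jsonl/",  # placeholder remote input path
    blocksize="1gb",
    storage_options=s3_options,        # forwarded through read_data to fsspec
)

dataset.to_parquet(
    "s3://example-bucket/curated/",    # placeholder remote output path
    storage_options=s3_options,        # forwarded through write_to_disk
)
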
53 changes: 33 additions & 20 deletions nemo_curator/modules/semantic_dedup/clusteringmodel.py
@@ -15,9 +15,8 @@

import logging
import os
import shutil
import time
from typing import Optional, Union
from typing import Dict, Optional, Union

import cudf
import cupy as cp
@@ -28,7 +27,7 @@
from nemo_curator.datasets import DocumentDataset
from nemo_curator.log import create_logger
from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix
from nemo_curator.utils.file_utils import expand_outdir_and_mkdir
from nemo_curator.utils.file_utils import expand_outdir_and_mkdir, get_fs
from nemo_curator.utils.semdedup_utils import (
COSINE_DIST_TO_CENT_COL,
L2_DIST_TO_CENT_COL,
@@ -51,6 +50,7 @@ def __init__(
clustering_input_partition_size: Optional[str] = "2gb",
logger: Union[logging.Logger, str] = "./",
profile_dir: Optional[str] = None,
storage_options: Optional[Dict[str, str]] = None,
):
"""
Initializes the ClusteringModel with the provided settings for semantic clustering to help semantic deduplication.
@@ -73,7 +73,8 @@
Default is "./".
profile_dir (Optional[str]): If specified, directory to write Dask profile.
Default is None.

storage_options (Optional[Dict[str, str]]): Storage options for the file system.
Default is None.
"""
self.id_col = id_column
self.max_iter = max_iter
@@ -84,9 +85,13 @@
self.clustering_input_partition_size = clustering_input_partition_size
self.logger = self._setup_logger(logger)
self.profile_dir = profile_dir
self.storage_options = storage_options
self.fs = get_fs(self.clustering_output_dir, self.storage_options)

if not os.path.exists(self.clustering_output_dir):
expand_outdir_and_mkdir(self.clustering_output_dir)
if not self.fs.exists(self.clustering_output_dir):
expand_outdir_and_mkdir(
self.clustering_output_dir, self.storage_options, self.fs
)
else:
self.logger.warning(
f"Clustering output directory {self.clustering_output_dir} already exists and will be overwritten"
@@ -187,37 +192,45 @@ def __call__(self, embeddings_dataset: DocumentDataset):
kmeans_centroids_file = os.path.join(
self.clustering_output_dir, "kmeans_centroids.npy"
)
np.save(kmeans_centroids_file, centroids)
with self.fs.open(kmeans_centroids_file, "wb") as f:
np.save(f, centroids)
self.logger.info("Saving centroids complete")
del kmeans, cupy_normalized_darr, centroids

# Save embeddings by nearest center to a file
clustering_output_dir = os.path.join(
embeddings_by_nearest_center_dir = os.path.join(
self.clustering_output_dir, "embs_by_nearest_center"
)
if os.path.exists(clustering_output_dir):
if self.fs.exists(embeddings_by_nearest_center_dir):
self.logger.warning(
f"Output directory {clustering_output_dir} already exists and will be overwritten"
f"Output directory {embeddings_by_nearest_center_dir} already exists and will be overwritten"
)
shutil.rmtree(clustering_output_dir)
self.fs.rm(embeddings_by_nearest_center_dir, recursive=True)

embeddings_df.map_partitions(
lambda df: df.to_parquet(
embeddings_by_nearest_center_dir,
index=False,
partition_cols=["nearest_cent"],
storage_options=self.storage_options,
),
# we specify meta to allow skipping _meta_nonempty
meta=dict(),
).compute()

embeddings_df.to_parquet(
clustering_output_dir,
index=False,
partition_on="nearest_cent",
write_index=False,
)
self.logger.info(
f"Time taken for assigning distance to each embedding: {time.time() - t0}s"
f" and output written at {clustering_output_dir}"
f" and output written at {embeddings_by_nearest_center_dir}"
)

del embeddings_df
# We read this way to ensure each cluster is read in a single partition
# This allows us to perform pairwise similarity within the cluster
fps = [
os.path.join(clustering_output_dir, f"nearest_cent={i}")
os.path.join(embeddings_by_nearest_center_dir, f"nearest_cent={i}")
for i in range(self.n_clusters)
]
embeddings_df = dd.from_map(cudf.read_parquet, fps)
embeddings_df = dd.from_map(
cudf.read_parquet, fps, storage_options=self.storage_options
)
return DocumentDataset(embeddings_df)
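
The get_fs helper imported in both files is not shown in this excerpt. The calls made on its result (fs.exists, fs.isfile, fs.open, fs.rm) suggest it resolves an fsspec filesystem from a path plus optional storage options; the sketch below is an assumption about its shape, not the actual implementation in nemo_curator/utils/file_utils.py.

# Assumed shape of get_fs; the real helper may differ in signature and defaults.
from typing import Dict, Optional

import fsspec


def get_fs(path: str, storage_options: Optional[Dict[str, str]] = None) -> fsspec.AbstractFileSystem:
    # url_to_fs infers the protocol from the path ("s3://", "gs://", plain local
    # paths, ...) and forwards storage_options (credentials, endpoints) to the
    # matching backend. It returns (filesystem, stripped_path); only the
    # filesystem object is needed by the callers above.
    fs, _ = fsspec.core.url_to_fs(path, **(storage_options or {}))
    return fs

With a helper of this shape, the existence checks, the centroid save through fs.open, and the recursive fs.rm in ClusteringModel behave the same for local directories and object stores, which is the point of the remote I/O change.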