From a81e16511229ffd2359f5c5e27b7b7ffc86df93c Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 7 Aug 2025 16:32:37 -0700 Subject: [PATCH 1/4] Merged `_get_stats_actor` and `_get_or_create_stats_actor` Signed-off-by: Alexey Kudinkin --- python/ray/data/_internal/stats.py | 28 ++++++---------------------- 1 file changed, 6 insertions(+), 22 deletions(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index 1b129f81c5c0..3e98cb5aa382 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -633,7 +633,7 @@ def __init__(self): self._update_thread: Optional[threading.Thread] = None self._update_thread_lock: threading.Lock = threading.Lock() - def _get_stats_actor(self, skip_cache: bool = False) -> Optional[ActorHandle]: + def _get_or_create_stats_actor(self, skip_cache: bool = False) -> Optional[ActorHandle]: if ray._private.worker._global_node is None: raise RuntimeError( "Global node is not initialized. Driver might be not connected to Ray." @@ -650,27 +650,11 @@ def _get_stats_actor(self, skip_cache: bool = False) -> Optional[ActorHandle]: self._stats_actor_handle = ray.get_actor( name=STATS_ACTOR_NAME, namespace=STATS_ACTOR_NAMESPACE ) + self._stats_actor_cluster_id = current_cluster_id except ValueError: - return None - self._stats_actor_cluster_id = current_cluster_id - - return self._stats_actor_handle - - def _get_or_create_stats_actor(self) -> Optional[ActorHandle]: - if ray._private.worker._global_node is None: - raise RuntimeError( - "Global node is not initialized. Driver might be not connected to Ray." - ) - - # NOTE: In some cases (for ex, when registering dataset) actor might be gone - # (for ex, when prior driver disconnects) and therefore to avoid using - # stale handle we force looking up the actor with Ray to determine if - # we should create a new one. - actor = self._get_stats_actor(skip_cache=True) - - if actor is None: - self._stats_actor_handle = _get_or_create_stats_actor() - self._stats_actor_cluster_id = ray._private.worker._global_node.cluster_id + # Create an actor if it doesn't exist + self._stats_actor_handle = _get_or_create_stats_actor() + self._stats_actor_cluster_id = ray._private.worker._global_node.cluster_id return self._stats_actor_handle @@ -688,7 +672,7 @@ def _run_update_loop(): # this thread can be running even after the cluster is # shutdown. Creating an actor will automatically start # a new cluster. - stats_actor = self._get_stats_actor() + stats_actor = self._get_or_create_stats_actor() if stats_actor is None: continue stats_actor.update_metrics.remote( From b1aa4cd974ee3b5b5cc4fc10b728b399365c70bf Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 7 Aug 2025 16:33:13 -0700 Subject: [PATCH 2/4] Fixed dataset id fetching and dataset registration to skip potentially stale cache Signed-off-by: Alexey Kudinkin --- python/ray/data/_internal/stats.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index 3e98cb5aa382..12acee7462d1 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -790,7 +790,14 @@ def register_dataset_to_stats_actor( topology: Optional Topology representing the DAG structure to export data_context: The DataContext attached to the dataset """ - self._get_or_create_stats_actor().register_dataset.remote( + + # NOTE: In some cases (for ex, when registering dataset) actor might be gone + # (for ex, when prior driver disconnects) and therefore to avoid using + # stale handle we force looking up the actor with Ray to determine if + # we should create a new one. + stats_actor = self._get_or_create_stats_actor(skip_cache=True) + + stats_actor.register_dataset.remote( ray.get_runtime_context().get_job_id(), dataset_tag, operator_tags, @@ -800,7 +807,13 @@ def register_dataset_to_stats_actor( def get_dataset_id_from_stats_actor(self) -> str: try: - return ray.get(self._get_or_create_stats_actor().get_dataset_id.remote()) + # NOTE: In some cases (for ex, when registering dataset) actor might be gone + # (for ex, when prior driver disconnects) and therefore to avoid using + # stale handle we force looking up the actor with Ray to determine if + # we should create a new one. + stats_actor = self._get_or_create_stats_actor(skip_cache=True) + + return ray.get(stats_actor.get_dataset_id.remote()) except Exception: # Getting dataset id from _StatsActor may fail, in this case # fall back to uuid4 From d73721ccd00e649620286e8edf8518953676bb6f Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 7 Aug 2025 16:36:39 -0700 Subject: [PATCH 3/4] `lint` Signed-off-by: Alexey Kudinkin --- python/ray/data/_internal/stats.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index 12acee7462d1..097896f0a101 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -633,7 +633,9 @@ def __init__(self): self._update_thread: Optional[threading.Thread] = None self._update_thread_lock: threading.Lock = threading.Lock() - def _get_or_create_stats_actor(self, skip_cache: bool = False) -> Optional[ActorHandle]: + def _get_or_create_stats_actor( + self, skip_cache: bool = False + ) -> Optional[ActorHandle]: if ray._private.worker._global_node is None: raise RuntimeError( "Global node is not initialized. Driver might be not connected to Ray." @@ -654,7 +656,9 @@ def _get_or_create_stats_actor(self, skip_cache: bool = False) -> Optional[Actor except ValueError: # Create an actor if it doesn't exist self._stats_actor_handle = _get_or_create_stats_actor() - self._stats_actor_cluster_id = ray._private.worker._global_node.cluster_id + self._stats_actor_cluster_id = ( + ray._private.worker._global_node.cluster_id + ) return self._stats_actor_handle From 9d4a59eac1f7c60d261597878d67e4cb1659683c Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 11 Aug 2025 14:02:04 -0700 Subject: [PATCH 4/4] Deleting outdated comment Signed-off-by: Alexey Kudinkin --- python/ray/data/_internal/stats.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index 097896f0a101..869aa49b84ca 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -672,10 +672,6 @@ def _run_update_loop(): while True: if self._last_iteration_stats or self._last_execution_stats: try: - # Do not create _StatsActor if it doesn't exist because - # this thread can be running even after the cluster is - # shutdown. Creating an actor will automatically start - # a new cluster. stats_actor = self._get_or_create_stats_actor() if stats_actor is None: continue