Skip to content

Commit c58bed2

Browse files
alexeykudinkin authored and dioptre committed
[Data] Cleaning up StatsManager (ray-project#55400)
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. -->

## Why are these changes needed?

Just cleaning up, merging 2 overlapping metrics

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Alexey Kudinkin <[email protected]>
Signed-off-by: Andrew Grosser <[email protected]>
1 parent af94574 commit c58bed2

File tree

1 file changed

+25
-28
lines changed

1 file changed

+25
-28
lines changed

python/ray/data/_internal/stats.py

Lines changed: 25 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -633,7 +633,9 @@ def __init__(self):
633633
self._update_thread: Optional[threading.Thread] = None
634634
self._update_thread_lock: threading.Lock = threading.Lock()
635635

636-
def _get_stats_actor(self, skip_cache: bool = False) -> Optional[ActorHandle]:
636+
def _get_or_create_stats_actor(
637+
self, skip_cache: bool = False
638+
) -> Optional[ActorHandle]:
637639
if ray._private.worker._global_node is None:
638640
raise RuntimeError(
639641
"Global node is not initialized. Driver might be not connected to Ray."
@@ -650,27 +652,13 @@ def _get_stats_actor(self, skip_cache: bool = False) -> Optional[ActorHandle]:
650652
self._stats_actor_handle = ray.get_actor(
651653
name=STATS_ACTOR_NAME, namespace=STATS_ACTOR_NAMESPACE
652654
)
655+
self._stats_actor_cluster_id = current_cluster_id
653656
except ValueError:
654-
return None
655-
self._stats_actor_cluster_id = current_cluster_id
656-
657-
return self._stats_actor_handle
658-
659-
def _get_or_create_stats_actor(self) -> Optional[ActorHandle]:
660-
if ray._private.worker._global_node is None:
661-
raise RuntimeError(
662-
"Global node is not initialized. Driver might be not connected to Ray."
663-
)
664-
665-
# NOTE: In some cases (for ex, when registering dataset) actor might be gone
666-
# (for ex, when prior driver disconnects) and therefore to avoid using
667-
# stale handle we force looking up the actor with Ray to determine if
668-
# we should create a new one.
669-
actor = self._get_stats_actor(skip_cache=True)
670-
671-
if actor is None:
672-
self._stats_actor_handle = _get_or_create_stats_actor()
673-
self._stats_actor_cluster_id = ray._private.worker._global_node.cluster_id
657+
# Create an actor if it doesn't exist
658+
self._stats_actor_handle = _get_or_create_stats_actor()
659+
self._stats_actor_cluster_id = (
660+
ray._private.worker._global_node.cluster_id
661+
)
674662

675663
return self._stats_actor_handle
676664

@@ -684,11 +672,7 @@ def _run_update_loop():
684672
while True:
685673
if self._last_iteration_stats or self._last_execution_stats:
686674
try:
687-
# Do not create _StatsActor if it doesn't exist because
688-
# this thread can be running even after the cluster is
689-
# shutdown. Creating an actor will automatically start
690-
# a new cluster.
691-
stats_actor = self._get_stats_actor()
675+
stats_actor = self._get_or_create_stats_actor()
692676
if stats_actor is None:
693677
continue
694678
stats_actor.update_metrics.remote(
@@ -806,7 +790,14 @@ def register_dataset_to_stats_actor(
806790
topology: Optional Topology representing the DAG structure to export
807791
data_context: The DataContext attached to the dataset
808792
"""
809-
self._get_or_create_stats_actor().register_dataset.remote(
793+
794+
# NOTE: In some cases (for ex, when registering dataset) actor might be gone
795+
# (for ex, when prior driver disconnects) and therefore to avoid using
796+
# stale handle we force looking up the actor with Ray to determine if
797+
# we should create a new one.
798+
stats_actor = self._get_or_create_stats_actor(skip_cache=True)
799+
800+
stats_actor.register_dataset.remote(
810801
ray.get_runtime_context().get_job_id(),
811802
dataset_tag,
812803
operator_tags,
@@ -816,7 +807,13 @@ def register_dataset_to_stats_actor(
816807

817808
def get_dataset_id_from_stats_actor(self) -> str:
818809
try:
819-
return ray.get(self._get_or_create_stats_actor().get_dataset_id.remote())
810+
# NOTE: In some cases (for ex, when registering dataset) actor might be gone
811+
# (for ex, when prior driver disconnects) and therefore to avoid using
812+
# stale handle we force looking up the actor with Ray to determine if
813+
# we should create a new one.
814+
stats_actor = self._get_or_create_stats_actor(skip_cache=True)
815+
816+
return ray.get(stats_actor.get_dataset_id.remote())
820817
except Exception:
821818
# Getting dataset id from _StatsActor may fail, in this case
822819
# fall back to uuid4

0 commit comments

Comments
 (0)