Skip to content

Commit 11ffe93

Browse files
daiping8daiping8
authored and committed
[hotfix][core] Fix "Too many open files" caused by RuntimeEnvAgent
Signed-off-by: daiping8 <[email protected]>
1 parent 4b6dba3 commit 11ffe93

File tree

1 file changed

+116
-10
lines changed

1 file changed

+116
-10
lines changed

python/ray/_private/runtime_env/agent/runtime_env_agent.py

Lines changed: 116 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import os
44
import time
55
import traceback
6-
from collections import defaultdict
6+
from collections import OrderedDict, defaultdict
77
from dataclasses import dataclass
88
from typing import Callable, Dict, List, Set, Tuple
99

@@ -162,6 +162,102 @@ def runtime_env_refs(self) -> Dict[str, int]:
162162
return self._runtime_env_reference
163163

164164

165+
class LRULoggerCache:
    """Bounded LRU cache of per-job loggers.

    Caps the number of simultaneously open per-job log file handles so the
    agent cannot exhaust file descriptors ("Too many open files"). When the
    cache grows past ``maxsize``, the least-recently-used logger is evicted:
    its file handlers are closed and the backing log files are deleted.
    """

    def __init__(self, maxsize=128):
        self.maxsize = maxsize
        # job_id -> logger, ordered from least- to most-recently used.
        self.cache = OrderedDict()
        # Strong references to in-flight async deletion tasks. The event loop
        # holds only weak references to tasks, so without this a cleanup task
        # could be garbage-collected before it finishes.
        self._pending_tasks: Set["asyncio.Task"] = set()

    def get(
        self, job_id: str, create_func: Callable[[], logging.Logger]
    ) -> logging.Logger:
        """Retrieve or create a logger and update the access order.

        Args:
            job_id: Cache key identifying the job.
            create_func: Zero-argument factory invoked on a cache miss.

        Returns:
            The cached or newly created logger for ``job_id``.
        """
        if job_id in self.cache:
            # Mark as most-recently used.
            self.cache.move_to_end(job_id)
            return self.cache[job_id]
        # Cache miss: create a new logger.
        logger = create_func()
        self.cache[job_id] = logger
        # Evict the oldest entry if we exceeded the capacity.
        if len(self.cache) > self.maxsize:
            self._evict_oldest()
        return logger

    def _cleanup_logger(self, logger: logging.Logger) -> List[str]:
        """
        Internal helper to close logger handlers and return associated file paths.
        Args:
            logger: The logger object to be cleaned up.
        Returns:
            A list of file paths for log files that can be deleted.
        """
        pending_files = []
        # Iterate over a copy because we remove handlers while iterating.
        for handler in logger.handlers[:]:
            if isinstance(handler, logging.FileHandler):
                try:
                    filepath = handler.baseFilename
                    handler.close()
                    pending_files.append(filepath)
                except Exception as e:
                    default_logger.error(f"Failed to close handler: {str(e)}")
            logger.removeHandler(handler)
        return pending_files

    async def _async_delete_files(self, filepaths: List[str]):
        """Delete files without blocking the event loop."""
        for filepath in filepaths:
            try:
                if os.path.exists(filepath):
                    await asyncio.to_thread(os.unlink, filepath)
            except Exception as e:
                default_logger.warning(f"Async delete failed for {filepath}: {str(e)}")

    def _sync_delete_files(self, filepaths: List[str]):
        """Delete files synchronously (for non-async contexts)."""
        for filepath in filepaths:
            try:
                if os.path.exists(filepath):
                    os.unlink(filepath)
            except Exception as e:
                default_logger.warning(f"Sync delete failed for {filepath}: {str(e)}")

    def remove(self, job_id: str, async_cleanup: bool = True) -> bool:
        """Remove the specified logger from the cache and clean up its resources.
        Args:
            job_id: The ID of the job whose logger should be removed.
            async_cleanup: Whether to clean up associated files asynchronously.
                Falls back to synchronous deletion when no event loop is
                running.
        Returns:
            True if the logger was found and removed, False otherwise.
        """
        if job_id not in self.cache:
            return False

        logger = self.cache.pop(job_id)
        pending_files = self._cleanup_logger(logger)
        del logger

        if pending_files:
            loop = None
            if async_cleanup:
                # get_running_loop() is the non-deprecated way to detect an
                # active loop; get_event_loop() may create a new loop or raise
                # (version-dependent) when called outside async code.
                try:
                    loop = asyncio.get_running_loop()
                except RuntimeError:
                    loop = None
            if loop is not None:
                task = loop.create_task(self._async_delete_files(pending_files))
                # Keep a strong reference until the task completes.
                self._pending_tasks.add(task)
                task.add_done_callback(self._pending_tasks.discard)
            else:
                self._sync_delete_files(pending_files)

        return True

    def _evict_oldest(self):
        """Evict the least-recently-used logger and delete its log files."""
        if not self.cache:
            return

        _, oldest_logger = self.cache.popitem(last=False)
        pending_files = self._cleanup_logger(oldest_logger)
        del oldest_logger

        if pending_files:
            # Eviction happens inside synchronous cache operations; delete
            # synchronously by default to avoid depending on a running loop.
            self._sync_delete_files(pending_files)
259+
260+
165261
class RuntimeEnvAgent:
166262
"""An RPC server to create and delete runtime envs.
167263
@@ -194,7 +290,9 @@ def __init__(
194290
self._logger.info(f"Parent raylet pid is {os.environ.get('RAY_RAYLET_PID')}")
195291

196292
self._runtime_env_dir = runtime_env_dir
197-
self._per_job_logger_cache = dict()
293+
self._per_job_logger_cache = LRULoggerCache(
294+
maxsize=int(os.environ.get("RAY_RUNTIME_ENV_LOG_CACHE_SIZE", 128)),
295+
)
198296
# Cache the results of creating envs to avoid repeatedly calling into
199297
# conda and other slow calls.
200298
self._env_cache: Dict[str, CreatedEnvResult] = dict()
@@ -286,14 +384,22 @@ def delete_runtime_env():
286384

287385
def get_or_create_logger(self, job_id: bytes, log_files: List[str]):
    """Return the per-job logger from the LRU cache, creating it on a miss."""
    decoded_job_id = job_id.decode()

    def _factory() -> logging.Logger:
        # Deferred construction: only runs when the cache has no entry.
        return self._create_logger(decoded_job_id, log_files)

    return self._per_job_logger_cache.get(decoded_job_id, _factory)
391+
392+
def _create_logger(self, job_id: str, log_files: List[str]):
    """Build a dedicated, non-propagating component logger for one job."""
    logging_params = dict(self._logging_params)
    logging_params.update(
        filename=[f"runtime_env_setup-{job_id}.log", *log_files],
        logger_name=f"runtime_env_{job_id}",
        propagate=False,
    )
    return setup_component_logger(**logging_params)
398+
399+
# Optional: Add explicit cleanup method (for possible future Job lifecycle management)
async def handle_job_terminated(self, job_id: str):
    """Explicitly clean up job log resources.

    Removes the cached per-job logger, closing its file handlers and
    deleting the backing log files. A no-op when no logger is cached
    for ``job_id``.
    """
    self._per_job_logger_cache.remove(job_id)
297403

298404
async def GetOrCreateRuntimeEnv(self, request):
299405
self._logger.debug(

0 commit comments

Comments
 (0)