Skip to content

Commit e43aa79

Browse files
Andres D. Molinsnesitor
authored andcommitted
Fix: Try to run the garbage collector in another thread.
1 parent c61b209 commit e43aa79

File tree

1 file changed

+34
-4
lines changed

1 file changed

+34
-4
lines changed

src/aleph/commands.py

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,23 @@ def run_db_migrations(config: Config):
6161
alembic.command.upgrade(alembic_cfg, "head", tag=db_url)
6262

6363

64+
def run_coroutine_in_new_thread_loop(coro: Coroutine):
65+
"""
66+
Executes a given coroutine in a new event loop in the current thread.
67+
This thread is typically one managed by an executor.
68+
"""
69+
new_loop = asyncio.new_event_loop()
70+
asyncio.set_event_loop(new_loop)
71+
try:
72+
LOGGER.debug(f"Running coroutine {getattr(coro, '__name__', str(coro))} in thread event loop: {new_loop}")
73+
result = new_loop.run_until_complete(coro)
74+
LOGGER.debug(f"Coroutine {getattr(coro, '__name__', str(coro))} completed in thread event loop: {new_loop}")
75+
return result
76+
finally:
77+
LOGGER.debug(f"Closing thread event loop: {new_loop}")
78+
new_loop.close()
79+
80+
6481
async def init_node_cache(config: Config) -> NodeCache:
6582
node_cache = NodeCache(
6683
redis_host=config.redis.host.value, redis_port=config.redis.port.value
@@ -197,11 +214,24 @@ async def main(args: List[str]) -> None:
197214
tasks.append(refresh_cache_materialized_views(session_factory))
198215
LOGGER.debug("Initialized cache tasks")
199216

200-
LOGGER.debug("Initializing garbage collector task")
201-
tasks.append(
202-
garbage_collector_task(config=config, garbage_collector=garbage_collector)
217+
LOGGER.debug("Preparing garbage collector task to run in a separate thread")
218+
# 1. Create the coroutine object as you normally would
219+
gc_coroutine_object = garbage_collector_task(
220+
config=config, garbage_collector=garbage_collector
221+
)
222+
223+
# 2. Get the current event loop to use run_in_executor
224+
current_loop = asyncio.get_running_loop()
225+
226+
# 3. Schedule the execution of `run_coroutine_in_new_thread_loop` with your coroutine in the executor
227+
# This will return a Future that `asyncio.gather` can await.
228+
gc_future = current_loop.run_in_executor(
229+
None, # Use the default ThreadPoolExecutor
230+
run_coroutine_in_new_thread_loop,
231+
gc_coroutine_object # The coroutine to execute
203232
)
204-
LOGGER.debug("Initialized garbage collector task")
233+
tasks.append(gc_future) # Add the Future to the list of tasks
234+
LOGGER.debug("Garbage collector task submitted to run in its own thread event loop.")
205235

206236
LOGGER.debug("Running event loop")
207237
await asyncio.gather(*tasks)

0 commit comments

Comments
 (0)