Skip to content

Commit c6ecbfa

Browse files
author
Andres D. Molins
committed
Fix: Undo the thread split and put exception handler to see ipfs errors.
1 parent 66b27eb commit c6ecbfa

File tree

2 files changed

+6
-34
lines changed

2 files changed

+6
-34
lines changed

src/aleph/commands.py

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -61,23 +61,6 @@ def run_db_migrations(config: Config):
6161
alembic.command.upgrade(alembic_cfg, "head", tag=db_url)
6262

6363

64-
def run_coroutine_in_new_thread_loop(coro: Coroutine):
65-
"""
66-
Executes a given coroutine in a new event loop in the current thread.
67-
This thread is typically one managed by an executor.
68-
"""
69-
new_loop = asyncio.new_event_loop()
70-
asyncio.set_event_loop(new_loop)
71-
try:
72-
LOGGER.debug(f"Running coroutine {getattr(coro, '__name__', str(coro))} in thread event loop: {new_loop}")
73-
result = new_loop.run_until_complete(coro)
74-
LOGGER.debug(f"Coroutine {getattr(coro, '__name__', str(coro))} completed in thread event loop: {new_loop}")
75-
return result
76-
finally:
77-
LOGGER.debug(f"Closing thread event loop: {new_loop}")
78-
new_loop.close()
79-
80-
8164
async def init_node_cache(config: Config) -> NodeCache:
8265
node_cache = NodeCache(
8366
redis_host=config.redis.host.value, redis_port=config.redis.port.value
@@ -214,24 +197,11 @@ async def main(args: List[str]) -> None:
214197
tasks.append(refresh_cache_materialized_views(session_factory))
215198
LOGGER.debug("Initialized cache tasks")
216199

217-
LOGGER.debug("Preparing garbage collector task to run in a separate thread")
218-
# 1. Create the coroutine object as you normally would
219-
gc_coroutine_object = garbage_collector_task(
220-
config=config, garbage_collector=garbage_collector
221-
)
222-
223-
# 2. Get the current event loop to use run_in_executor
224-
current_loop = asyncio.get_running_loop()
225-
226-
# 3. Schedule the execution of `run_coroutine_in_new_thread_loop` with your coroutine in the executor
227-
# This will return a Future that `asyncio.gather` can await.
228-
gc_future = current_loop.run_in_executor(
229-
None, # Use the default ThreadPoolExecutor
230-
run_coroutine_in_new_thread_loop,
231-
gc_coroutine_object # The coroutine to execute
200+
LOGGER.debug("Initializing garbage collector task")
201+
tasks.append(
202+
garbage_collector_task(config=config, garbage_collector=garbage_collector)
232203
)
233-
tasks.append(gc_future) # Add the Future to the list of tasks
234-
LOGGER.debug("Garbage collector task submitted to run in its own thread event loop.")
204+
LOGGER.debug("Initialized garbage collector task")
235205

236206
LOGGER.debug("Running event loop")
237207
await asyncio.gather(*tasks)

src/aleph/services/storage/garbage_collector.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ async def _delete_from_ipfs(self, file_hash: ItemHash):
3535

3636
except NotPinnedError:
3737
LOGGER.warning("File not pinned: %s", file_hash)
38+
except Exception as err:
39+
LOGGER.error("Failed to unpin file %s: %s", file_hash, str(err))
3840

3941
# Smaller IPFS files are cached in local storage
4042
LOGGER.debug("Deleting %s from local storage", file_hash)

0 commit comments

Comments
 (0)