diff --git a/deployment/docker-build/dev/docker-compose.yml b/deployment/docker-build/dev/docker-compose.yml index ecce78286..40aa95d22 100644 --- a/deployment/docker-build/dev/docker-compose.yml +++ b/deployment/docker-build/dev/docker-compose.yml @@ -115,7 +115,7 @@ services: ipfs: restart: always - image: ipfs/kubo:v0.15.0 + image: ipfs/kubo:v0.34.1 ports: - "4001:4001" - "4001:4001/udp" @@ -126,7 +126,7 @@ services: - IPFS_PROFILE=server networks: - pyaleph - command: ["daemon", "--enable-pubsub-experiment", "--migrate"] + command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"] networks: diff --git a/deployment/docker-build/docker-compose.yml b/deployment/docker-build/docker-compose.yml index 486457c06..b2e8a0fb8 100644 --- a/deployment/docker-build/docker-compose.yml +++ b/deployment/docker-build/docker-compose.yml @@ -47,7 +47,7 @@ services: ipfs: restart: always - image: ipfs/kubo:v0.15.0 + image: ipfs/kubo:v0.34.1 ports: - "4001:4001" - "4001:4001/udp" @@ -59,7 +59,7 @@ services: - IPFS_PROFILE=server networks: - pyaleph - command: ["daemon", "--enable-pubsub-experiment", "--migrate"] + command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"] postgres: restart: always diff --git a/deployment/samples/docker-compose/docker-compose.yml b/deployment/samples/docker-compose/docker-compose.yml index b61406398..11f13cc05 100644 --- a/deployment/samples/docker-compose/docker-compose.yml +++ b/deployment/samples/docker-compose/docker-compose.yml @@ -105,7 +105,7 @@ services: ipfs: restart: always - image: ipfs/kubo:v0.15.0 + image: ipfs/kubo:v0.34.1 ports: - "4001:4001" - "4001:4001/udp" diff --git a/deployment/samples/docker-monitoring/docker-compose.yml b/deployment/samples/docker-monitoring/docker-compose.yml index 7c08e31e7..bf53bee9f 100644 --- a/deployment/samples/docker-monitoring/docker-compose.yml +++ b/deployment/samples/docker-monitoring/docker-compose.yml @@ -107,7 +107,7 @@ services: ipfs: restart: always - image: ipfs/kubo:v0.15.0 + image: ipfs/kubo:v0.34.1 ports: - "4001:4001" - "4001:4001/udp" @@ -118,7 +118,7 @@ services: - IPFS_PROFILE=server networks: - pyaleph - command: ["daemon", "--enable-pubsub-experiment", "--migrate"] + command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"] prometheus: restart: always diff --git a/src/aleph/db/accessors/files.py b/src/aleph/db/accessors/files.py index da7f51156..a073098e2 100644 --- a/src/aleph/db/accessors/files.py +++ b/src/aleph/db/accessors/files.py @@ -29,8 +29,11 @@ def get_unpinned_files(session: DbSession) -> Iterable[StoredFileDb]: """ Returns the list of files that are not pinned by a message or an on-chain transaction. """ - select_pins = select(FilePinDb).where(FilePinDb.file_hash == StoredFileDb.hash) - select_stmt = select(StoredFileDb).where(~select_pins.exists()) + select_stmt = ( + select(StoredFileDb) + .join(FilePinDb, StoredFileDb.hash == FilePinDb.file_hash, isouter=True) + .where(FilePinDb.id.is_(None)) + ) return session.execute(select_stmt).scalars() diff --git a/src/aleph/db/accessors/messages.py b/src/aleph/db/accessors/messages.py index 2003dd96d..6ab4a6175 100644 --- a/src/aleph/db/accessors/messages.py +++ b/src/aleph/db/accessors/messages.py @@ -437,11 +437,38 @@ def make_upsert_rejected_message_statement( exc_traceback: Optional[str] = None, tx_hash: Optional[str] = None, ) -> Insert: + # Convert details to a dictionary that is JSON serializable + serializable_details = None + if details is not None: + try: + # First try a simple dict conversion + serializable_details = dict(details) + + # Now recursively ensure all values within the dictionary are JSON serializable + def ensure_serializable(obj): + if isinstance(obj, dict): + return {k: ensure_serializable(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [ensure_serializable(item) for item in obj] + elif isinstance(obj, (str, int, float, bool, type(None))): + return obj + else: + # For non-serializable types, convert to string + return str(obj) + + serializable_details = ensure_serializable(serializable_details) + + except Exception: + # If any conversion fails, create a new dict with a message + serializable_details = { + "error": "Details contained non-serializable values" + } + insert_rejected_message_stmt = insert(RejectedMessageDb).values( item_hash=item_hash, message=pending_message_dict, error_code=error_code, - details=details, + details=serializable_details, traceback=exc_traceback, tx_hash=tx_hash, ) @@ -449,7 +476,7 @@ def make_upsert_rejected_message_statement( constraint="rejected_messages_pkey", set_={ "error_code": insert_rejected_message_stmt.excluded.error_code, - "details": details, + "details": serializable_details, "traceback": insert_rejected_message_stmt.excluded.traceback, "tx_hash": tx_hash, }, diff --git a/src/aleph/services/storage/garbage_collector.py b/src/aleph/services/storage/garbage_collector.py index eae578e0c..51e622155 100644 --- a/src/aleph/services/storage/garbage_collector.py +++ b/src/aleph/services/storage/garbage_collector.py @@ -3,7 +3,6 @@ import logging from aioipfs import NotPinnedError -from aioipfs.api import RepoAPI from aleph_message.models import ItemHash, ItemType from configmanager import Config @@ -29,15 +28,13 @@ async def _delete_from_ipfs(self, file_hash: ItemHash): result = await ipfs_client.pin.rm(file_hash) print(result) - # Launch the IPFS garbage collector (`ipfs repo gc`) - async for _ in RepoAPI(driver=ipfs_client).gc(): - pass - except NotPinnedError: LOGGER.warning("File not pinned: %s", file_hash) + except Exception as err: + LOGGER.error("Failed to unpin file %s: %s", file_hash, str(err)) # Smaller IPFS files are cached in local storage - LOGGER.debug("Deleting %s from local storage") + LOGGER.debug("Deleting %s from local storage", file_hash) await self._delete_from_local_storage(file_hash) LOGGER.debug("Removed from IPFS: %s", file_hash) @@ -55,7 +52,7 @@ async def collect(self, datetime: dt.datetime): # Delete files without pins files_to_delete = list(get_unpinned_files(session)) - LOGGER.info("Found %d files to delete") + LOGGER.info("Found %d files to delete", len(files_to_delete)) for file_to_delete in files_to_delete: file_hash = ItemHash(file_to_delete.hash) @@ -91,5 +88,7 @@ async def garbage_collector_task( LOGGER.info("Starting garbage collection...") await garbage_collector.collect(datetime=utc_now()) LOGGER.info("Garbage collector ran successfully.") - except Exception: - LOGGER.exception("An unexpected error occurred during garbage collection.") + except Exception as err: + LOGGER.exception( + "An unexpected error occurred during garbage collection.", exc_info=err + ) diff --git a/src/aleph/types/message_status.py b/src/aleph/types/message_status.py index 2e1403638..406696e79 100644 --- a/src/aleph/types/message_status.py +++ b/src/aleph/types/message_status.py @@ -74,8 +74,32 @@ def __str__(self): return self.__class__.__name__ def details(self) -> Optional[Dict[str, Any]]: + """ + Return error details in a JSON serializable format. + + Returns: + Dictionary with error details or None if no errors. + """ errors = self.args[0] - return {"errors": errors} if errors else None + + # Ensure errors are JSON serializable + if errors: + # Convert non-serializable objects to strings if needed + serializable_errors = [] + for err in errors: + try: + # Test if the error is JSON serializable by attempting to convert to dict + # This will fail for custom objects + if hasattr(err, "__dict__"): + serializable_errors.append(str(err)) + else: + serializable_errors.append(err) + except (TypeError, ValueError): + # If conversion fails, use string representation + serializable_errors.append(str(err)) + + return {"errors": serializable_errors} + return None class InvalidMessageException(MessageProcessingException): @@ -272,14 +296,21 @@ def __init__( self.parent_size = parent_size def details(self) -> Optional[Dict[str, Any]]: + """ + Return error details in a JSON serializable format. + + Returns: + Dictionary with error details. + """ + # Ensure all values are JSON serializable return { "errors": [ { - "volume_name": self.volume_name, - "parent_ref": self.parent_ref, - "parent_file": self.parent_file, - "parent_size": self.parent_size, - "volume_size": self.volume_size, + "volume_name": str(self.volume_name), + "parent_ref": str(self.parent_ref), + "parent_file": str(self.parent_file), + "parent_size": int(self.parent_size), + "volume_size": int(self.volume_size), } ] } @@ -299,11 +330,17 @@ def __init__( self.aggregate_key = aggregate_key def details(self) -> Optional[Dict[str, Any]]: + """ + Return error details in a JSON serializable format. + + Returns: + Dictionary with error details. + """ errors = [] if self.target_hash is not None: - errors.append({"message": self.target_hash}) + errors.append({"message": str(self.target_hash)}) if self.aggregate_key is not None: - errors.append({"aggregate": self.aggregate_key}) + errors.append({"aggregate": str(self.aggregate_key)}) return {"errors": errors} @@ -319,7 +356,13 @@ def __init__(self, target_hash: str): self.target_hash = target_hash def details(self) -> Optional[Dict[str, Any]]: - return {"errors": [{"message": self.target_hash}]} + """ + Return error details in a JSON serializable format. + + Returns: + Dictionary with error details. + """ + return {"errors": [{"message": str(self.target_hash)}]} class InsufficientBalanceException(InvalidMessageException): @@ -338,7 +381,13 @@ def __init__( self.required_balance = required_balance def details(self) -> Optional[Dict[str, Any]]: - # Note: cast to string to keep the precision + """ + Return error details in a JSON serializable format. + + Returns: + Dictionary with error details. + """ + # Note: cast to string to keep the precision and ensure it's JSON serializable return { "errors": [ {