Skip to content

Solve garbage collector issue #787

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions deployment/docker-build/dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ services:

ipfs:
restart: always
image: ipfs/kubo:v0.15.0
image: ipfs/kubo:v0.34.1
ports:
- "4001:4001"
- "4001:4001/udp"
Expand All @@ -126,7 +126,7 @@ services:
- IPFS_PROFILE=server
networks:
- pyaleph
command: ["daemon", "--enable-pubsub-experiment", "--migrate"]
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"]


networks:
Expand Down
4 changes: 2 additions & 2 deletions deployment/docker-build/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ services:

ipfs:
restart: always
image: ipfs/kubo:v0.15.0
image: ipfs/kubo:v0.34.1
ports:
- "4001:4001"
- "4001:4001/udp"
Expand All @@ -59,7 +59,7 @@ services:
- IPFS_PROFILE=server
networks:
- pyaleph
command: ["daemon", "--enable-pubsub-experiment", "--migrate"]
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"]

postgres:
restart: always
Expand Down
2 changes: 1 addition & 1 deletion deployment/samples/docker-compose/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ services:

ipfs:
restart: always
image: ipfs/kubo:v0.15.0
image: ipfs/kubo:v0.34.1
ports:
- "4001:4001"
- "4001:4001/udp"
Expand Down
4 changes: 2 additions & 2 deletions deployment/samples/docker-monitoring/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ services:

ipfs:
restart: always
image: ipfs/kubo:v0.15.0
image: ipfs/kubo:v0.34.1
ports:
- "4001:4001"
- "4001:4001/udp"
Expand All @@ -118,7 +118,7 @@ services:
- IPFS_PROFILE=server
networks:
- pyaleph
command: ["daemon", "--enable-pubsub-experiment", "--migrate"]
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"]

prometheus:
restart: always
Expand Down
7 changes: 5 additions & 2 deletions src/aleph/db/accessors/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ def get_unpinned_files(session: DbSession) -> Iterable[StoredFileDb]:
"""
Returns the list of files that are not pinned by a message or an on-chain transaction.
"""
select_pins = select(FilePinDb).where(FilePinDb.file_hash == StoredFileDb.hash)
select_stmt = select(StoredFileDb).where(~select_pins.exists())
select_stmt = (
select(StoredFileDb)
.join(FilePinDb, StoredFileDb.hash == FilePinDb.file_hash, isouter=True)
.where(FilePinDb.id.is_(None))
)
return session.execute(select_stmt).scalars()


Expand Down
31 changes: 29 additions & 2 deletions src/aleph/db/accessors/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,19 +437,46 @@ def make_upsert_rejected_message_statement(
exc_traceback: Optional[str] = None,
tx_hash: Optional[str] = None,
) -> Insert:
# Convert details to a dictionary that is JSON serializable
serializable_details = None
if details is not None:
try:
# First try a simple dict conversion
serializable_details = dict(details)

# Now recursively ensure all values within the dictionary are JSON serializable
def ensure_serializable(obj):
    """Recursively convert *obj* into a structure ``json.dumps`` accepts.

    Dicts, lists and tuples are walked; JSON-native scalars
    (str/int/float/bool/None) pass through unchanged; anything else
    (e.g. Decimal, datetime, custom objects) is replaced by its string
    representation so serialization can never fail.

    Fixes over the naive version:
    - tuples are converted to lists (JSON's native representation)
      instead of being collapsed into a single string;
    - dict keys that ``json.dumps`` cannot coerce (anything but
      str/int/float/bool/None) are stringified, otherwise a key such
      as a tuple would still raise ``TypeError`` at dump time.
    """
    json_scalars = (str, int, float, bool, type(None))
    if isinstance(obj, dict):
        return {
            (k if isinstance(k, json_scalars) else str(k)): ensure_serializable(v)
            for k, v in obj.items()
        }
    elif isinstance(obj, (list, tuple)):
        return [ensure_serializable(item) for item in obj]
    elif isinstance(obj, json_scalars):
        return obj
    else:
        # For non-serializable types, fall back to a string representation
        return str(obj)

serializable_details = ensure_serializable(serializable_details)

except Exception:
# If any conversion fails, create a new dict with a message
serializable_details = {
"error": "Details contained non-serializable values"
}

insert_rejected_message_stmt = insert(RejectedMessageDb).values(
item_hash=item_hash,
message=pending_message_dict,
error_code=error_code,
details=details,
details=serializable_details,
traceback=exc_traceback,
tx_hash=tx_hash,
)
upsert_rejected_message_stmt = insert_rejected_message_stmt.on_conflict_do_update(
constraint="rejected_messages_pkey",
set_={
"error_code": insert_rejected_message_stmt.excluded.error_code,
"details": details,
"details": serializable_details,
"traceback": insert_rejected_message_stmt.excluded.traceback,
"tx_hash": tx_hash,
},
Expand Down
17 changes: 8 additions & 9 deletions src/aleph/services/storage/garbage_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging

from aioipfs import NotPinnedError
from aioipfs.api import RepoAPI
from aleph_message.models import ItemHash, ItemType
from configmanager import Config

Expand All @@ -29,15 +28,13 @@ async def _delete_from_ipfs(self, file_hash: ItemHash):
result = await ipfs_client.pin.rm(file_hash)
print(result)

# Launch the IPFS garbage collector (`ipfs repo gc`)
async for _ in RepoAPI(driver=ipfs_client).gc():
pass

except NotPinnedError:
LOGGER.warning("File not pinned: %s", file_hash)
except Exception as err:
LOGGER.error("Failed to unpin file %s: %s", file_hash, str(err))

# Smaller IPFS files are cached in local storage
LOGGER.debug("Deleting %s from local storage")
LOGGER.debug("Deleting %s from local storage", file_hash)
await self._delete_from_local_storage(file_hash)

LOGGER.debug("Removed from IPFS: %s", file_hash)
Expand All @@ -55,7 +52,7 @@ async def collect(self, datetime: dt.datetime):

# Delete files without pins
files_to_delete = list(get_unpinned_files(session))
LOGGER.info("Found %d files to delete")
LOGGER.info("Found %d files to delete", len(files_to_delete))

for file_to_delete in files_to_delete:
file_hash = ItemHash(file_to_delete.hash)
Expand Down Expand Up @@ -91,5 +88,7 @@ async def garbage_collector_task(
LOGGER.info("Starting garbage collection...")
await garbage_collector.collect(datetime=utc_now())
LOGGER.info("Garbage collector ran successfully.")
except Exception:
LOGGER.exception("An unexpected error occurred during garbage collection.")
except Exception as err:
LOGGER.exception(
"An unexpected error occurred during garbage collection.", exc_info=err
)
69 changes: 59 additions & 10 deletions src/aleph/types/message_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,32 @@ def __str__(self):
return self.__class__.__name__

def details(self) -> Optional[Dict[str, Any]]:
"""
Return error details in a JSON serializable format.

Returns:
Dictionary with error details or None if no errors.
"""
errors = self.args[0]
return {"errors": errors} if errors else None

# Ensure errors are JSON serializable
if errors:
# Convert non-serializable objects to strings if needed
serializable_errors = []
for err in errors:
try:
# Test if the error is JSON serializable by attempting to convert to dict
# This will fail for custom objects
if hasattr(err, "__dict__"):
serializable_errors.append(str(err))
else:
serializable_errors.append(err)
except (TypeError, ValueError):
# If conversion fails, use string representation
serializable_errors.append(str(err))

return {"errors": serializable_errors}
return None


class InvalidMessageException(MessageProcessingException):
Expand Down Expand Up @@ -272,14 +296,21 @@ def __init__(
self.parent_size = parent_size

def details(self) -> Optional[Dict[str, Any]]:
"""
Return error details in a JSON serializable format.

Returns:
Dictionary with error details.
"""
# Ensure all values are JSON serializable
return {
"errors": [
{
"volume_name": self.volume_name,
"parent_ref": self.parent_ref,
"parent_file": self.parent_file,
"parent_size": self.parent_size,
"volume_size": self.volume_size,
"volume_name": str(self.volume_name),
"parent_ref": str(self.parent_ref),
"parent_file": str(self.parent_file),
"parent_size": int(self.parent_size),
"volume_size": int(self.volume_size),
}
]
}
Expand All @@ -299,11 +330,17 @@ def __init__(
self.aggregate_key = aggregate_key

def details(self) -> Optional[Dict[str, Any]]:
"""
Return error details in a JSON serializable format.

Returns:
Dictionary with error details.
"""
errors = []
if self.target_hash is not None:
errors.append({"message": self.target_hash})
errors.append({"message": str(self.target_hash)})
if self.aggregate_key is not None:
errors.append({"aggregate": self.aggregate_key})
errors.append({"aggregate": str(self.aggregate_key)})

return {"errors": errors}

Expand All @@ -319,7 +356,13 @@ def __init__(self, target_hash: str):
self.target_hash = target_hash

def details(self) -> Optional[Dict[str, Any]]:
return {"errors": [{"message": self.target_hash}]}
"""
Return error details in a JSON serializable format.

Returns:
Dictionary with error details.
"""
return {"errors": [{"message": str(self.target_hash)}]}


class InsufficientBalanceException(InvalidMessageException):
Expand All @@ -338,7 +381,13 @@ def __init__(
self.required_balance = required_balance

def details(self) -> Optional[Dict[str, Any]]:
# Note: cast to string to keep the precision
"""
Return error details in a JSON serializable format.

Returns:
Dictionary with error details.
"""
# Note: cast to string to keep the precision and ensure it's JSON serializable
return {
"errors": [
{
Expand Down
Loading