Skip to content

Commit ad19c70

Browse files
nesitorAndres D. Molins
andauthored
Solve garbage collector issue (#787)
* Fix: Solve garbage collector issue. * Fix: Solve error on JSON serialization of rejected messages. * Fix: Solve debug line issue. * Fix: Try to run the garbage collector in another thread. * Fix: Undo the thread split and put exception handler to see ipfs errors. * Fix: Move IPFS Garbage collection call to final step. * Fix: Remove IPFS Garbage collection call and force the daemon to have it enabled. --------- Co-authored-by: Andres D. Molins <[email protected]>
1 parent c4a949c commit ad19c70

File tree

8 files changed

+108
-30
lines changed

8 files changed

+108
-30
lines changed

deployment/docker-build/dev/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ services:
115115

116116
ipfs:
117117
restart: always
118-
image: ipfs/kubo:v0.15.0
118+
image: ipfs/kubo:v0.34.1
119119
ports:
120120
- "4001:4001"
121121
- "4001:4001/udp"
@@ -126,7 +126,7 @@ services:
126126
- IPFS_PROFILE=server
127127
networks:
128128
- pyaleph
129-
command: ["daemon", "--enable-pubsub-experiment", "--migrate"]
129+
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"]
130130

131131

132132
networks:

deployment/docker-build/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ services:
4747

4848
ipfs:
4949
restart: always
50-
image: ipfs/kubo:v0.15.0
50+
image: ipfs/kubo:v0.34.1
5151
ports:
5252
- "4001:4001"
5353
- "4001:4001/udp"
@@ -59,7 +59,7 @@ services:
5959
- IPFS_PROFILE=server
6060
networks:
6161
- pyaleph
62-
command: ["daemon", "--enable-pubsub-experiment", "--migrate"]
62+
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"]
6363

6464
postgres:
6565
restart: always

deployment/samples/docker-compose/docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ services:
105105

106106
ipfs:
107107
restart: always
108-
image: ipfs/kubo:v0.15.0
108+
image: ipfs/kubo:v0.34.1
109109
ports:
110110
- "4001:4001"
111111
- "4001:4001/udp"

deployment/samples/docker-monitoring/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ services:
107107

108108
ipfs:
109109
restart: always
110-
image: ipfs/kubo:v0.15.0
110+
image: ipfs/kubo:v0.34.1
111111
ports:
112112
- "4001:4001"
113113
- "4001:4001/udp"
@@ -118,7 +118,7 @@ services:
118118
- IPFS_PROFILE=server
119119
networks:
120120
- pyaleph
121-
command: ["daemon", "--enable-pubsub-experiment", "--migrate"]
121+
command: ["daemon", "--enable-pubsub-experiment", "--enable-gc", "--migrate"]
122122

123123
prometheus:
124124
restart: always

src/aleph/db/accessors/files.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,11 @@ def get_unpinned_files(session: DbSession) -> Iterable[StoredFileDb]:
2929
"""
3030
Returns the list of files that are not pinned by a message or an on-chain transaction.
3131
"""
32-
select_pins = select(FilePinDb).where(FilePinDb.file_hash == StoredFileDb.hash)
33-
select_stmt = select(StoredFileDb).where(~select_pins.exists())
32+
select_stmt = (
33+
select(StoredFileDb)
34+
.join(FilePinDb, StoredFileDb.hash == FilePinDb.file_hash, isouter=True)
35+
.where(FilePinDb.id.is_(None))
36+
)
3437
return session.execute(select_stmt).scalars()
3538

3639

src/aleph/db/accessors/messages.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -437,19 +437,46 @@ def make_upsert_rejected_message_statement(
437437
exc_traceback: Optional[str] = None,
438438
tx_hash: Optional[str] = None,
439439
) -> Insert:
440+
# Convert details to a dictionary that is JSON serializable
441+
serializable_details = None
442+
if details is not None:
443+
try:
444+
# First try a simple dict conversion
445+
serializable_details = dict(details)
446+
447+
# Now recursively ensure all values within the dictionary are JSON serializable
448+
def ensure_serializable(obj):
449+
if isinstance(obj, dict):
450+
return {k: ensure_serializable(v) for k, v in obj.items()}
451+
elif isinstance(obj, list):
452+
return [ensure_serializable(item) for item in obj]
453+
elif isinstance(obj, (str, int, float, bool, type(None))):
454+
return obj
455+
else:
456+
# For non-serializable types, convert to string
457+
return str(obj)
458+
459+
serializable_details = ensure_serializable(serializable_details)
460+
461+
except Exception:
462+
# If any conversion fails, create a new dict with a message
463+
serializable_details = {
464+
"error": "Details contained non-serializable values"
465+
}
466+
440467
insert_rejected_message_stmt = insert(RejectedMessageDb).values(
441468
item_hash=item_hash,
442469
message=pending_message_dict,
443470
error_code=error_code,
444-
details=details,
471+
details=serializable_details,
445472
traceback=exc_traceback,
446473
tx_hash=tx_hash,
447474
)
448475
upsert_rejected_message_stmt = insert_rejected_message_stmt.on_conflict_do_update(
449476
constraint="rejected_messages_pkey",
450477
set_={
451478
"error_code": insert_rejected_message_stmt.excluded.error_code,
452-
"details": details,
479+
"details": serializable_details,
453480
"traceback": insert_rejected_message_stmt.excluded.traceback,
454481
"tx_hash": tx_hash,
455482
},

src/aleph/services/storage/garbage_collector.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import logging
44

55
from aioipfs import NotPinnedError
6-
from aioipfs.api import RepoAPI
76
from aleph_message.models import ItemHash, ItemType
87
from configmanager import Config
98

@@ -29,15 +28,13 @@ async def _delete_from_ipfs(self, file_hash: ItemHash):
2928
result = await ipfs_client.pin.rm(file_hash)
3029
print(result)
3130

32-
# Launch the IPFS garbage collector (`ipfs repo gc`)
33-
async for _ in RepoAPI(driver=ipfs_client).gc():
34-
pass
35-
3631
except NotPinnedError:
3732
LOGGER.warning("File not pinned: %s", file_hash)
33+
except Exception as err:
34+
LOGGER.error("Failed to unpin file %s: %s", file_hash, str(err))
3835

3936
# Smaller IPFS files are cached in local storage
40-
LOGGER.debug("Deleting %s from local storage")
37+
LOGGER.debug("Deleting %s from local storage", file_hash)
4138
await self._delete_from_local_storage(file_hash)
4239

4340
LOGGER.debug("Removed from IPFS: %s", file_hash)
@@ -55,7 +52,7 @@ async def collect(self, datetime: dt.datetime):
5552

5653
# Delete files without pins
5754
files_to_delete = list(get_unpinned_files(session))
58-
LOGGER.info("Found %d files to delete")
55+
LOGGER.info("Found %d files to delete", len(files_to_delete))
5956

6057
for file_to_delete in files_to_delete:
6158
file_hash = ItemHash(file_to_delete.hash)
@@ -91,5 +88,7 @@ async def garbage_collector_task(
9188
LOGGER.info("Starting garbage collection...")
9289
await garbage_collector.collect(datetime=utc_now())
9390
LOGGER.info("Garbage collector ran successfully.")
94-
except Exception:
95-
LOGGER.exception("An unexpected error occurred during garbage collection.")
91+
except Exception as err:
92+
LOGGER.exception(
93+
"An unexpected error occurred during garbage collection.", exc_info=err
94+
)

src/aleph/types/message_status.py

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,32 @@ def __str__(self):
7474
return self.__class__.__name__
7575

7676
def details(self) -> Optional[Dict[str, Any]]:
77+
"""
78+
Return error details in a JSON serializable format.
79+
80+
Returns:
81+
Dictionary with error details or None if no errors.
82+
"""
7783
errors = self.args[0]
78-
return {"errors": errors} if errors else None
84+
85+
# Ensure errors are JSON serializable
86+
if errors:
87+
# Convert non-serializable objects to strings if needed
88+
serializable_errors = []
89+
for err in errors:
90+
try:
91+
# Test if the error is JSON serializable by attempting to convert to dict
92+
# This will fail for custom objects
93+
if hasattr(err, "__dict__"):
94+
serializable_errors.append(str(err))
95+
else:
96+
serializable_errors.append(err)
97+
except (TypeError, ValueError):
98+
# If conversion fails, use string representation
99+
serializable_errors.append(str(err))
100+
101+
return {"errors": serializable_errors}
102+
return None
79103

80104

81105
class InvalidMessageException(MessageProcessingException):
@@ -272,14 +296,21 @@ def __init__(
272296
self.parent_size = parent_size
273297

274298
def details(self) -> Optional[Dict[str, Any]]:
299+
"""
300+
Return error details in a JSON serializable format.
301+
302+
Returns:
303+
Dictionary with error details.
304+
"""
305+
# Ensure all values are JSON serializable
275306
return {
276307
"errors": [
277308
{
278-
"volume_name": self.volume_name,
279-
"parent_ref": self.parent_ref,
280-
"parent_file": self.parent_file,
281-
"parent_size": self.parent_size,
282-
"volume_size": self.volume_size,
309+
"volume_name": str(self.volume_name),
310+
"parent_ref": str(self.parent_ref),
311+
"parent_file": str(self.parent_file),
312+
"parent_size": int(self.parent_size),
313+
"volume_size": int(self.volume_size),
283314
}
284315
]
285316
}
@@ -299,11 +330,17 @@ def __init__(
299330
self.aggregate_key = aggregate_key
300331

301332
def details(self) -> Optional[Dict[str, Any]]:
333+
"""
334+
Return error details in a JSON serializable format.
335+
336+
Returns:
337+
Dictionary with error details.
338+
"""
302339
errors = []
303340
if self.target_hash is not None:
304-
errors.append({"message": self.target_hash})
341+
errors.append({"message": str(self.target_hash)})
305342
if self.aggregate_key is not None:
306-
errors.append({"aggregate": self.aggregate_key})
343+
errors.append({"aggregate": str(self.aggregate_key)})
307344

308345
return {"errors": errors}
309346

@@ -319,7 +356,13 @@ def __init__(self, target_hash: str):
319356
self.target_hash = target_hash
320357

321358
def details(self) -> Optional[Dict[str, Any]]:
322-
return {"errors": [{"message": self.target_hash}]}
359+
"""
360+
Return error details in a JSON serializable format.
361+
362+
Returns:
363+
Dictionary with error details.
364+
"""
365+
return {"errors": [{"message": str(self.target_hash)}]}
323366

324367

325368
class InsufficientBalanceException(InvalidMessageException):
@@ -338,7 +381,13 @@ def __init__(
338381
self.required_balance = required_balance
339382

340383
def details(self) -> Optional[Dict[str, Any]]:
341-
# Note: cast to string to keep the precision
384+
"""
385+
Return error details in a JSON serializable format.
386+
387+
Returns:
388+
Dictionary with error details.
389+
"""
390+
# Note: cast to string to keep the precision and ensure it's JSON serializable
342391
return {
343392
"errors": [
344393
{

0 commit comments

Comments
 (0)