From 30a9d554bf00b9b9e423b8068556c33530a34801 Mon Sep 17 00:00:00 2001 From: amalcaraz Date: Mon, 11 Aug 2025 19:15:32 +0200 Subject: [PATCH 1/3] feat: implement credit cost calculation and garbage collector --- ...036_b7c8d9e0f1a2_add_cost_credit_column.py | 26 +++ ...8d9e0f1a2b3_add_credit_balance_cron_job.py | 34 ++++ src/aleph/commands.py | 6 +- src/aleph/db/accessors/balances.py | 12 ++ src/aleph/db/accessors/cost.py | 27 ++-- src/aleph/db/models/account_costs.py | 1 + src/aleph/handlers/content/store.py | 49 +++--- src/aleph/handlers/content/vm.py | 31 ++-- src/aleph/jobs/cron/credit_balance_job.py | 149 ++++++++++++++++++ src/aleph/services/cost.py | 58 ++++--- src/aleph/services/cost_validation.py | 65 ++++++++ src/aleph/toolkit/constants.py | 44 ++++-- src/aleph/types/cost.py | 5 + src/aleph/types/message_status.py | 37 +++++ 14 files changed, 456 insertions(+), 88 deletions(-) create mode 100644 deployment/migrations/versions/0036_b7c8d9e0f1a2_add_cost_credit_column.py create mode 100644 deployment/migrations/versions/0037_c8d9e0f1a2b3_add_credit_balance_cron_job.py create mode 100644 src/aleph/jobs/cron/credit_balance_job.py create mode 100644 src/aleph/services/cost_validation.py diff --git a/deployment/migrations/versions/0036_b7c8d9e0f1a2_add_cost_credit_column.py b/deployment/migrations/versions/0036_b7c8d9e0f1a2_add_cost_credit_column.py new file mode 100644 index 00000000..32d0724e --- /dev/null +++ b/deployment/migrations/versions/0036_b7c8d9e0f1a2_add_cost_credit_column.py @@ -0,0 +1,26 @@ +"""add cost_credit column to account_costs table + +Revision ID: b7c8d9e0f1a2 +Revises: a1b2c3d4e5f6 +Create Date: 2025-01-11 00:00:00.000000 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'b7c8d9e0f1a2' +down_revision = 'a1b2c3d4e5f6' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Add cost_credit column to account_costs table + op.add_column('account_costs', sa.Column('cost_credit', sa.DECIMAL(), nullable=False, server_default='0')) + + +def downgrade() -> None: + # Remove cost_credit column from account_costs table + op.drop_column('account_costs', 'cost_credit') \ No newline at end of file diff --git a/deployment/migrations/versions/0037_c8d9e0f1a2b3_add_credit_balance_cron_job.py b/deployment/migrations/versions/0037_c8d9e0f1a2b3_add_credit_balance_cron_job.py new file mode 100644 index 00000000..81eb3117 --- /dev/null +++ b/deployment/migrations/versions/0037_c8d9e0f1a2b3_add_credit_balance_cron_job.py @@ -0,0 +1,34 @@ +"""add credit_balance cron job + +Revision ID: c8d9e0f1a2b3 +Revises: b7c8d9e0f1a2 +Create Date: 2025-01-11 00:00:00.000000 + +""" +from alembic import op + + +# revision identifiers, used by Alembic. +revision = 'c8d9e0f1a2b3' +down_revision = 'b7c8d9e0f1a2' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Add credit_balance cron job to run every hour (3600 seconds) + op.execute( + """ + INSERT INTO cron_jobs(id, interval, last_run) + VALUES ('credit_balance', 3600, '2025-01-01 00:00:00') + """ + ) + + +def downgrade() -> None: + # Remove credit_balance cron job + op.execute( + """ + DELETE FROM cron_jobs WHERE id = 'credit_balance' + """ + ) \ No newline at end of file diff --git a/src/aleph/commands.py b/src/aleph/commands.py index 062b65a8..a55d6c87 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -30,6 +30,7 @@ from aleph.exceptions import InvalidConfigException, KeyNotFoundException from aleph.jobs import start_jobs from aleph.jobs.cron.balance_job import BalanceCronJob +from aleph.jobs.cron.credit_balance_job import CreditBalanceCronJob from aleph.jobs.cron.cron_job import CronJob, cron_job_task from aleph.network import listener_tasks from aleph.services import p2p @@ -151,7 +152,10 @@ async def main(args: List[str]) -> None: ) cron_job = CronJob( session_factory=session_factory, - jobs={"balance": BalanceCronJob(session_factory=session_factory)}, + jobs={ + "balance": BalanceCronJob(session_factory=session_factory), + "credit_balance": CreditBalanceCronJob(session_factory=session_factory), + }, ) chain_data_service = ChainDataService( session_factory=session_factory, diff --git a/src/aleph/db/accessors/balances.py b/src/aleph/db/accessors/balances.py index 0b1d8ede..1cd3dcc8 100644 --- a/src/aleph/db/accessors/balances.py +++ b/src/aleph/db/accessors/balances.py @@ -385,3 +385,15 @@ def update_credit_balances_airdrop( # Drop the temporary table session.execute("DROP TABLE temp_credit_balances") # type: ignore[arg-type] + + +def get_updated_credit_balance_accounts(session: DbSession, last_update: dt.datetime): + """ + Get addresses that have had their credit balances updated since the given timestamp. + """ + select_stmt = ( + select(AlephCreditBalanceDb.address) + .where(AlephCreditBalanceDb.last_update >= last_update) + .distinct() + ) + return session.execute(select_stmt).scalars().all() diff --git a/src/aleph/db/accessors/cost.py b/src/aleph/db/accessors/cost.py index f7d7f8c4..a0dde333 100644 --- a/src/aleph/db/accessors/cost.py +++ b/src/aleph/db/accessors/cost.py @@ -19,11 +19,14 @@ def get_total_cost_for_address( address: str, payment_type: Optional[PaymentType] = PaymentType.hold, ) -> Decimal: - total_prop = ( - AccountCostsDb.cost_hold - if payment_type == PaymentType.hold - else AccountCostsDb.cost_stream - ) + if payment_type == PaymentType.hold: + total_prop = AccountCostsDb.cost_hold + elif payment_type == PaymentType.superfluid: + total_prop = AccountCostsDb.cost_stream + elif payment_type == PaymentType.credit: + total_prop = AccountCostsDb.cost_credit + else: + total_prop = AccountCostsDb.cost_hold select_stmt = ( select(func.sum(total_prop)) @@ -43,11 +46,14 @@ def get_total_costs_for_address_grouped_by_message( address: str, payment_type: Optional[PaymentType] = PaymentType.hold, ): - total_prop = ( - AccountCostsDb.cost_hold - if payment_type == PaymentType.hold - else AccountCostsDb.cost_stream - ) + if payment_type == PaymentType.hold: + total_prop = AccountCostsDb.cost_hold + elif payment_type == PaymentType.superfluid: + total_prop = AccountCostsDb.cost_stream + elif payment_type == PaymentType.credit: + total_prop = AccountCostsDb.cost_credit + else: + total_prop = AccountCostsDb.cost_hold id_field = func.min(AccountCostsDb.id) @@ -94,6 +100,7 @@ def make_costs_upsert_query(costs: List[AccountCostsDb]) -> Insert: set_={ "cost_hold": upsert_stmt.excluded.cost_hold, "cost_stream": upsert_stmt.excluded.cost_stream, + "cost_credit": upsert_stmt.excluded.cost_credit, }, ) diff --git a/src/aleph/db/models/account_costs.py b/src/aleph/db/models/account_costs.py index fce6a702..283255d7 100644 --- a/src/aleph/db/models/account_costs.py +++ b/src/aleph/db/models/account_costs.py @@ -25,5 +25,6 @@ class AccountCostsDb(Base): payment_type: PaymentType = Column(ChoiceType(PaymentType), nullable=False) cost_hold: Decimal = Column(DECIMAL, nullable=False) cost_stream: Decimal = Column(DECIMAL, nullable=False) + cost_credit: Decimal = Column(DECIMAL, nullable=False, default=0) __table_args__ = (UniqueConstraint("owner", "item_hash", "type", "name"),) diff --git a/src/aleph/handlers/content/store.py b/src/aleph/handlers/content/store.py index eeade47b..e4c3f148 100644 --- a/src/aleph/handlers/content/store.py +++ b/src/aleph/handlers/content/store.py @@ -15,8 +15,6 @@ from aleph_message.models import ItemHash, ItemType, StoreContent from aleph.config import get_config -from aleph.db.accessors.balances import get_total_balance -from aleph.db.accessors.cost import get_total_cost_for_address from aleph.db.accessors.files import ( delete_file_pin, get_file_tag, @@ -33,7 +31,12 @@ from aleph.exceptions import AlephStorageException, UnknownHashError from aleph.handlers.content.content_handler import ContentHandler from aleph.schemas.cost_estimation_messages import CostEstimationStoreContent -from aleph.services.cost import calculate_storage_size, get_total_and_detailed_costs +from aleph.services.cost import ( + calculate_storage_size, + get_payment_type, + get_total_and_detailed_costs, +) +from aleph.services.cost_validation import validate_balance_for_payment from aleph.storage import StorageService from aleph.toolkit.constants import MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE, MiB from aleph.toolkit.costs import are_store_and_program_free @@ -42,7 +45,6 @@ from aleph.types.files import FileType from aleph.types.message_status import ( FileUnavailable, - InsufficientBalanceException, InvalidMessageFormat, PermissionDenied, StoreCannotUpdateStoreWithRef, @@ -190,11 +192,6 @@ async def pre_check_balance(self, session: DbSession, message: MessageDb): config = get_config() ipfs_enabled = config.ipfs.enabled.value - current_balance = get_total_balance(session=session, address=content.address) - current_cost = get_total_cost_for_address( - session=session, address=content.address - ) - engine = content.item_type # Initially only do that balance pre-check for ipfs files. if engine == ItemType.ipfs and ipfs_enabled: @@ -229,13 +226,14 @@ async def pre_check_balance(self, session: DbSession, message: MessageDb): else: message_cost = Decimal(0) - required_balance = current_cost + message_cost - - if current_balance < required_balance: - raise InsufficientBalanceException( - balance=current_balance, - required_balance=required_balance, - ) + # Use reusable validation for all payment types including credit + payment_type = get_payment_type(content) + validate_balance_for_payment( + session=session, + address=content.address, + message_cost=message_cost, + payment_type=payment_type, + ) return None @@ -258,18 +256,15 @@ async def check_balance( ): return costs - current_balance = get_total_balance(address=content.address, session=session) - current_cost = get_total_cost_for_address( - session=session, address=content.address - ) - - required_balance = current_cost + message_cost + payment_type = get_payment_type(content) - if current_balance < required_balance: - raise InsufficientBalanceException( - balance=current_balance, - required_balance=required_balance, - ) + # Use reusable validation for all payment types + validate_balance_for_payment( + session=session, + address=content.address, + message_cost=message_cost, + payment_type=payment_type, + ) return costs diff --git a/src/aleph/handlers/content/vm.py b/src/aleph/handlers/content/vm.py index f7b00da7..71c534f6 100644 --- a/src/aleph/handlers/content/vm.py +++ b/src/aleph/handlers/content/vm.py @@ -17,8 +17,6 @@ PersistentVolume, ) -from aleph.db.accessors.balances import get_total_balance -from aleph.db.accessors.cost import get_total_cost_for_address from aleph.db.accessors.files import ( find_file_pins, find_file_tags, @@ -51,13 +49,13 @@ ) from aleph.db.models.account_costs import AccountCostsDb from aleph.handlers.content.content_handler import ContentHandler -from aleph.services.cost import get_total_and_detailed_costs +from aleph.services.cost import get_payment_type, get_total_and_detailed_costs +from aleph.services.cost_validation import validate_balance_for_payment from aleph.toolkit.costs import are_store_and_program_free from aleph.toolkit.timestamp import timestamp_to_datetime from aleph.types.db_session import DbSession from aleph.types.files import FileTag from aleph.types.message_status import ( - InsufficientBalanceException, InternalError, InvalidMessageFormat, VmCannotUpdateUpdate, @@ -352,22 +350,19 @@ async def check_balance( ): return costs - # NOTE: For now allow to create anything that is being paid with STREAM for free, but generate a cost depending on the content.payment prop (HOLD / STREAM) - if content.payment and content.payment.is_stream: - return costs - - # NOTE: Instances and persistent Programs being paid by HOLD are the only ones being checked for now - current_balance = get_total_balance(address=content.address, session=session) - current_cost = get_total_cost_for_address( - session=session, address=content.address - ) + payment_type = get_payment_type(content) - required_balance = current_cost + message_cost + # NOTE: For now allow to create anything that is being paid with STREAM for free, but generate a cost depending on the content.payment prop (HOLD / STREAM / CREDIT) + if payment_type == PaymentType.superfluid: + return costs - if current_balance < required_balance: - raise InsufficientBalanceException( - balance=current_balance, - required_balance=required_balance, + # Handle credit and other payment types using reusable validation + if payment_type in [PaymentType.credit, PaymentType.hold]: + validate_balance_for_payment( + session=session, + address=content.address, + message_cost=message_cost, + payment_type=payment_type, ) return costs diff --git a/src/aleph/jobs/cron/credit_balance_job.py b/src/aleph/jobs/cron/credit_balance_job.py new file mode 100644 index 00000000..70aff0a9 --- /dev/null +++ b/src/aleph/jobs/cron/credit_balance_job.py @@ -0,0 +1,149 @@ +import datetime as dt +import logging +from typing import List + +from aleph_message.models import MessageType, PaymentType + +from aleph.db.accessors.balances import ( + get_credit_balance, + get_updated_credit_balance_accounts, +) +from aleph.db.accessors.cost import get_total_costs_for_address_grouped_by_message +from aleph.db.accessors.files import update_file_pin_grace_period +from aleph.db.accessors.messages import ( + get_message_by_item_hash, + get_message_status, + make_message_status_upsert_query, +) +from aleph.db.models.cron_jobs import CronJobDb +from aleph.db.models.messages import MessageStatusDb +from aleph.jobs.cron.cron_job import BaseCronJob +from aleph.services.cost import calculate_storage_size +from aleph.toolkit.constants import MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE, MiB +from aleph.toolkit.timestamp import utc_now +from aleph.types.db_session import DbSession, DbSessionFactory +from aleph.types.message_status import MessageStatus + +LOGGER = logging.getLogger(__name__) + + +class CreditBalanceCronJob(BaseCronJob): + def __init__(self, session_factory: DbSessionFactory): + self.session_factory = session_factory + + async def run(self, now: dt.datetime, job: CronJobDb): + with self.session_factory() as session: + accounts = get_updated_credit_balance_accounts(session, job.last_run) + + LOGGER.info( + f"Checking '{len(accounts)}' updated credit balance accounts..." + ) + + for address in accounts: + remaining_credits = get_credit_balance(session, address) + + to_delete = [] + to_recover = [] + + credit_costs = get_total_costs_for_address_grouped_by_message( + session, address, PaymentType.credit + ) + + for item_hash, height, cost, _ in credit_costs: + status = get_message_status(session, item_hash) + + LOGGER.info( + f"Checking credit message {item_hash} with cost {cost} credits" + ) + + # For credits, check if balance is insufficient for minimum 1-day runtime + # Cost is per hour, so multiply by 24 for daily requirement + daily_cost = cost * 24 + should_remove = remaining_credits < daily_cost + remaining_credits = max(0, remaining_credits - cost) + + status = get_message_status(session, item_hash) + if status is None: + continue + + if should_remove: + if ( + status.status != MessageStatus.REMOVING + and status.status != MessageStatus.REMOVED + ): + to_delete.append(item_hash) + else: + if status.status == MessageStatus.REMOVING: + to_recover.append(item_hash) + + if len(to_delete) > 0: + LOGGER.info( + f"'{len(to_delete)}' credit-paid messages to delete for account '{address}'..." + ) + await self.delete_messages(session, to_delete) + + if len(to_recover) > 0: + LOGGER.info( + f"'{len(to_recover)}' credit-paid messages to recover for account '{address}'..." + ) + await self.recover_messages(session, to_recover) + + session.commit() + + async def delete_messages(self, session: DbSession, messages: List[str]): + for item_hash in messages: + message = get_message_by_item_hash(session, item_hash) + + if message is None: + continue + + if message.type == MessageType.store: + storage_size_mib = calculate_storage_size( + session, message.parsed_content + ) + + if storage_size_mib and storage_size_mib <= ( + MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE / MiB + ): + continue + + now = utc_now() + delete_by = now + dt.timedelta(hours=24 + 1) + + if message.type == MessageType.store: + update_file_pin_grace_period( + session=session, + item_hash=item_hash, + delete_by=delete_by, + ) + + session.execute( + make_message_status_upsert_query( + item_hash=item_hash, + new_status=MessageStatus.REMOVING, + reception_time=now, + where=(MessageStatusDb.status == MessageStatus.PROCESSED), + ) + ) + + async def recover_messages(self, session: DbSession, messages: List[str]): + for item_hash in messages: + message = get_message_by_item_hash(session, item_hash) + if message is None: + continue + + if message.type == MessageType.store: + update_file_pin_grace_period( + session=session, + item_hash=item_hash, + delete_by=None, + ) + + session.execute( + make_message_status_upsert_query( + item_hash=item_hash, + new_status=MessageStatus.PROCESSED, + reception_time=utc_now(), + where=(MessageStatusDb.status == MessageStatus.REMOVING), + ) + ) diff --git a/src/aleph/services/cost.py b/src/aleph/services/cost.py index 05f4ada6..b2513657 100644 --- a/src/aleph/services/cost.py +++ b/src/aleph/services/cost.py @@ -85,15 +85,12 @@ def _get_settings(session: DbSession) -> Settings: def get_payment_type(content: CostComputableContent) -> PaymentType: - return ( - PaymentType.superfluid - if ( - hasattr(content, "payment") - and content.payment - and content.payment.is_stream - ) - else PaymentType.hold - ) + if hasattr(content, "payment") and content.payment and content.payment.is_credit: + return PaymentType.credit + elif hasattr(content, "payment") and content.payment and content.payment.is_stream: + return PaymentType.superfluid + else: + return PaymentType.hold def _is_confidential_vm( @@ -241,6 +238,7 @@ def _get_volumes_costs( price_per_mib_second: Decimal, owner: str, item_hash: str, + price_per_mib_credit: Decimal, ) -> List[AccountCostsDb]: costs: List[AccountCostsDb] = [] @@ -263,6 +261,7 @@ def _get_volumes_costs( cost_stream = format_cost( storage_mib * price_per_mib_second, ) + cost_credit = format_cost(storage_mib * price_per_mib_credit) costs.append( AccountCostsDb( @@ -274,6 +273,7 @@ def _get_volumes_costs( payment_type=payment_type, cost_hold=cost_hold, cost_stream=cost_stream, + cost_credit=cost_credit, ) ) @@ -407,6 +407,7 @@ def _get_execution_volumes_costs( price_per_mib = pricing.price.storage.holding price_per_mib_second = pricing.price.storage.payg / HOUR + price_per_mib_credit = pricing.price.storage.credit / HOUR return _get_volumes_costs( session, @@ -416,6 +417,7 @@ def _get_execution_volumes_costs( price_per_mib_second, content.address, item_hash, + price_per_mib_credit, ) @@ -439,9 +441,11 @@ def _get_additional_storage_price( price_per_mib = pricing.price.storage.holding price_per_mib_second = pricing.price.storage.payg / HOUR + price_per_mib_credit = pricing.price.storage.credit / HOUR max_discount_hold = execution_volume_discount_mib * price_per_mib max_discount_stream = execution_volume_discount_mib * price_per_mib_second + max_discount_credit = execution_volume_discount_mib * price_per_mib_credit discount_holding = min( Decimal(reduce(lambda x, y: x + Decimal(y.cost_hold), costs, Decimal(0))), @@ -451,9 +455,14 @@ def _get_additional_storage_price( Decimal(reduce(lambda x, y: x + Decimal(y.cost_stream), costs, Decimal(0))), max_discount_stream, ) + discount_credit = min( + Decimal(reduce(lambda x, y: x + Decimal(y.cost_credit), costs, Decimal(0))), + max_discount_credit, + ) cost_hold = format_cost(-discount_holding) cost_stream = format_cost(-discount_stream) + cost_credit = format_cost(-discount_credit) costs.append( AccountCostsDb( @@ -464,6 +473,7 @@ def _get_additional_storage_price( payment_type=payment_type, cost_hold=cost_hold, cost_stream=cost_stream, + cost_credit=cost_credit, ) ) @@ -498,6 +508,7 @@ def _calculate_executable_costs( compute_unit_cost = pricing.price.compute_unit.holding compute_unit_cost_second = pricing.price.compute_unit.payg / HOUR + compute_unit_cost_credit = pricing.price.compute_unit.credit / HOUR compute_unit_price = ( compute_units_required * compute_unit_multiplier * compute_unit_cost @@ -505,9 +516,13 @@ def _calculate_executable_costs( compute_unit_price_stream = ( compute_units_required * compute_unit_multiplier * compute_unit_cost_second ) + compute_unit_price_credit = ( + compute_units_required * compute_unit_multiplier * compute_unit_cost_credit + ) cost_hold = format_cost(compute_unit_price) cost_stream = format_cost(compute_unit_price_stream) + cost_credit = format_cost(compute_unit_price_credit) execution_cost = AccountCostsDb( owner=content.address, @@ -517,6 +532,7 @@ def _calculate_executable_costs( payment_type=payment_type, cost_hold=cost_hold, cost_stream=cost_stream, + cost_credit=cost_credit, ) costs: List[AccountCostsDb] = [execution_cost] @@ -544,6 +560,7 @@ def _calculate_storage_costs( price_per_mib = pricing.price.storage.holding price_per_mib_second = pricing.price.storage.payg / HOUR + price_per_mib_credit = pricing.price.storage.credit / HOUR return _get_volumes_costs( session, @@ -553,6 +570,7 @@ def _calculate_storage_costs( price_per_mib_second, content.address, item_hash, + price_per_mib_credit, ) @@ -596,11 +614,12 @@ def get_total_and_detailed_costs( payment_type = get_payment_type(content) costs = get_detailed_costs(session, content, item_hash) - cost = format_cost( - reduce(lambda x, y: x + y.cost_stream, costs, Decimal(0)) - if payment_type == PaymentType.superfluid - else reduce(lambda x, y: x + y.cost_hold, costs, Decimal(0)) - ) + if payment_type == PaymentType.superfluid: + cost = format_cost(reduce(lambda x, y: x + y.cost_stream, costs, Decimal(0))) + elif payment_type == PaymentType.credit: + cost = format_cost(reduce(lambda x, y: x + y.cost_credit, costs, Decimal(0))) + else: + cost = format_cost(reduce(lambda x, y: x + y.cost_hold, costs, Decimal(0))) return Decimal(cost), list(costs) @@ -613,10 +632,11 @@ def get_total_and_detailed_costs_from_db( payment_type = get_payment_type(content) costs = get_message_costs(session, item_hash) - cost = format_cost( - reduce(lambda x, y: x + y.cost_stream, costs, Decimal(0)) - if payment_type == PaymentType.superfluid - else reduce(lambda x, y: x + y.cost_hold, costs, Decimal(0)) - ) + if payment_type == PaymentType.superfluid: + cost = format_cost(reduce(lambda x, y: x + y.cost_stream, costs, Decimal(0))) + elif payment_type == PaymentType.credit: + cost = format_cost(reduce(lambda x, y: x + y.cost_credit, costs, Decimal(0))) + else: + cost = format_cost(reduce(lambda x, y: x + y.cost_hold, costs, Decimal(0))) return Decimal(cost), list(costs) diff --git a/src/aleph/services/cost_validation.py b/src/aleph/services/cost_validation.py new file mode 100644 index 00000000..41c8673a --- /dev/null +++ b/src/aleph/services/cost_validation.py @@ -0,0 +1,65 @@ +from decimal import Decimal + +from aleph_message.models import PaymentType + +from aleph.db.accessors.balances import get_credit_balance, get_total_balance +from aleph.db.accessors.cost import get_total_cost_for_address +from aleph.types.db_session import DbSession +from aleph.types.message_status import ( + InsufficientBalanceException, + InsufficientCreditException, +) + + +def validate_balance_for_payment( + session: DbSession, + address: str, + message_cost: Decimal, + payment_type: PaymentType, +) -> None: + """ + Validates that an address has sufficient balance for a message cost based on payment type. + + For credit payments, validates minimum 1-day (24 hour) runtime requirement. + For other payment types, validates standard balance requirements. + + Args: + session: Database session + address: Account address to check + message_cost: Cost of the message (per hour for credits) + payment_type: Type of payment (hold, superfluid, credit) + + Raises: + InsufficientBalanceException: When token balance is insufficient + InsufficientCreditException: When credit balance is insufficient + """ + if payment_type == PaymentType.credit: + current_credit_balance = get_credit_balance(address=address, session=session) + current_credit_cost = get_total_cost_for_address( + session=session, address=address, payment_type=PaymentType.credit + ) + + # Calculate minimum required credits for 1-day runtime (24 hours) + daily_credit_cost = message_cost * 24 # Assuming message_cost is per hour + required_credits = current_credit_cost + daily_credit_cost + + if current_credit_balance < required_credits: + raise InsufficientCreditException( + credit_balance=current_credit_balance, + required_credits=required_credits, + min_runtime_days=1, + ) + else: + # Handle regular balance checks for hold/superfluid payments + current_balance = get_total_balance(address=address, session=session) + current_cost = get_total_cost_for_address( + session=session, address=address, payment_type=payment_type + ) + + required_balance = current_cost + message_cost + + if current_balance < required_balance: + raise InsufficientBalanceException( + balance=current_balance, + required_balance=required_balance, + ) diff --git a/src/aleph/toolkit/constants.py b/src/aleph/toolkit/constants.py index d33ccc1a..0c7312be 100644 --- a/src/aleph/toolkit/constants.py +++ b/src/aleph/toolkit/constants.py @@ -11,8 +11,12 @@ DEFAULT_PRICE_AGGREGATE = { "program": { "price": { - "storage": {"payg": "0.000000977", "holding": "0.05"}, - "compute_unit": {"payg": "0.011", "holding": "200"}, + "storage": { + "payg": "0.000000977", + "holding": "0.05", + "credit": "0.000000977", + }, + "compute_unit": {"payg": "0.011", "holding": "200", "credit": "0.011"}, }, "tiers": [ {"id": "tier-1", "compute_units": 1}, @@ -28,11 +32,17 @@ "memory_mib": 2048, }, }, - "storage": {"price": {"storage": {"holding": "0.333333333"}}}, + "storage": { + "price": {"storage": {"holding": "0.333333333", "credit": "0.333333333"}} + }, "instance": { "price": { - "storage": {"payg": "0.000000977", "holding": "0.05"}, - "compute_unit": {"payg": "0.055", "holding": "1000"}, + "storage": { + "payg": "0.000000977", + "holding": "0.05", + "credit": "0.000000977", + }, + "compute_unit": {"payg": "0.055", "holding": "1000", "credit": "0.055"}, }, "tiers": [ {"id": "tier-1", "compute_units": 1}, @@ -51,8 +61,12 @@ "web3_hosting": {"price": {"fixed": 50, "storage": {"holding": "0.333333333"}}}, "program_persistent": { "price": { - "storage": {"payg": "0.000000977", "holding": "0.05"}, - "compute_unit": {"payg": "0.055", "holding": "1000"}, + "storage": { + "payg": "0.000000977", + "holding": "0.05", + "credit": "0.000000977", + }, + "compute_unit": {"payg": "0.055", "holding": "1000", "credit": "0.055"}, }, "tiers": [ {"id": "tier-1", "compute_units": 1}, @@ -70,8 +84,8 @@ }, "instance_gpu_premium": { "price": { - "storage": {"payg": "0.000000977"}, - "compute_unit": {"payg": "0.56"}, + "storage": {"payg": "0.000000977", "credit": "0.000000977"}, + "compute_unit": {"payg": "0.56", "credit": "0.56"}, }, "tiers": [ { @@ -95,8 +109,12 @@ }, "instance_confidential": { "price": { - "storage": {"payg": "0.000000977", "holding": "0.05"}, - "compute_unit": {"payg": "0.11", "holding": "2000"}, + "storage": { + "payg": "0.000000977", + "holding": "0.05", + "credit": "0.000000977", + }, + "compute_unit": {"payg": "0.11", "holding": "2000", "credit": "0.11"}, }, "tiers": [ {"id": "tier-1", "compute_units": 1}, @@ -114,8 +132,8 @@ }, "instance_gpu_standard": { "price": { - "storage": {"payg": "0.000000977"}, - "compute_unit": {"payg": "0.28"}, + "storage": {"payg": "0.000000977", "credit": "0.000000977"}, + "compute_unit": {"payg": "0.28", "credit": "0.28"}, }, "tiers": [ { diff --git a/src/aleph/types/cost.py b/src/aleph/types/cost.py index 780bef3b..64c174f9 100644 --- a/src/aleph/types/cost.py +++ b/src/aleph/types/cost.py @@ -19,14 +19,17 @@ class ProductPriceType(str, Enum): class ProductPriceOptions: holding: Decimal payg: Decimal + credit: Decimal def __init__( self, holding: Optional[str | Decimal], payg: Optional[str | Decimal] = Decimal(0), + credit: Optional[str | Decimal] = Decimal(0), ): self.holding = Decimal(holding or 0) self.payg = Decimal(payg or 0) + self.credit = Decimal(credit or 0) class ProductComputeUnit: @@ -122,11 +125,13 @@ def from_aggregate( storage=ProductPriceOptions( price["storage"].get("holding", None), price["storage"].get("payg", None), + price["storage"].get("credit", None), ), compute_unit=( ProductPriceOptions( price["compute_unit"].get("holding", None), price["compute_unit"].get("payg", None), + price["compute_unit"].get("credit", None), ) if compute_unit else None diff --git a/src/aleph/types/message_status.py b/src/aleph/types/message_status.py index 84aef7ff..a8a44e17 100644 --- a/src/aleph/types/message_status.py +++ b/src/aleph/types/message_status.py @@ -41,6 +41,7 @@ class ErrorCode(IntEnum): CONTENT_UNAVAILABLE = 3 FILE_UNAVAILABLE = 4 BALANCE_INSUFFICIENT = 5 + CREDIT_INSUFFICIENT = 6 POST_AMEND_NO_TARGET = 100 POST_AMEND_TARGET_NOT_FOUND = 101 POST_AMEND_AMEND = 102 @@ -60,6 +61,7 @@ class ErrorCode(IntEnum): class RemovedMessageReason(str, Enum): BALANCE_INSUFFICIENT = "balance_insufficient" + CREDIT_INSUFFICIENT = "credit_insufficient" class MessageProcessingException(Exception): @@ -340,6 +342,41 @@ def details(self) -> Optional[Dict[str, Any]]: } +class InsufficientCreditException(InvalidMessageException): + """ + The user does not have enough Aleph credits to process the message. + """ + + error_code = ErrorCode.CREDIT_INSUFFICIENT + + def __init__( + self, + credit_balance: Decimal, + required_credits: Decimal, + min_runtime_days: int = 1, + ): + self.credit_balance = credit_balance + self.required_credits = required_credits + self.min_runtime_days = min_runtime_days + + def details(self) -> Optional[Dict[str, Any]]: + """ + Return error details in a JSON serializable format. + + Returns: + Dictionary with error details. + """ + return { + "errors": [ + { + "required_credits": str(self.required_credits), + "account_credits": str(self.credit_balance), + "min_runtime_days": self.min_runtime_days, + } + ] + } + + class ForgetTargetNotFound(RetryMessageException): """ A target specified in the FORGET message could not be found. From 96663a12813b04ed5b91678bf2d6695f615bc85f Mon Sep 17 00:00:00 2001 From: amalcaraz Date: Thu, 14 Aug 2025 14:51:47 +0200 Subject: [PATCH 2/3] point to tmp branch for aleph-message package --- pyproject.toml | 3 ++- src/aleph/types/message_status.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 2f319de4..2066e9ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,7 +28,8 @@ dependencies = [ "aiohttp-jinja2==1.6", "aioipfs~=0.7.1", "alembic==1.15.1", - "aleph-message~=1.0.2", + # "aleph-message~=1.0.3", + "aleph-message @ git+https://github.com/aleph-im/aleph-message@andres-feature-implement_credits_payment", "aleph-nuls2==0.1", "aleph-p2p-client @ git+https://github.com/aleph-im/p2p-service-client-python@cbfebb871db94b2ca580e66104a67cd730c5020c", "asyncpg==0.30", diff --git a/src/aleph/types/message_status.py b/src/aleph/types/message_status.py index a8a44e17..ea324f4b 100644 --- a/src/aleph/types/message_status.py +++ b/src/aleph/types/message_status.py @@ -351,7 +351,7 @@ class InsufficientCreditException(InvalidMessageException): def __init__( self, - credit_balance: Decimal, + credit_balance: int, required_credits: Decimal, min_runtime_days: int = 1, ): From 374c7993adf33443b67c62995cf79906cc511d94 Mon Sep 17 00:00:00 2001 From: amalcaraz Date: Thu, 14 Aug 2025 16:22:27 +0200 Subject: [PATCH 3/3] fix: added missing migration for error code --- .../0036_b7c8d9e0f1a2_add_cost_credit_column.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/deployment/migrations/versions/0036_b7c8d9e0f1a2_add_cost_credit_column.py b/deployment/migrations/versions/0036_b7c8d9e0f1a2_add_cost_credit_column.py index 32d0724e..899eb662 100644 --- a/deployment/migrations/versions/0036_b7c8d9e0f1a2_add_cost_credit_column.py +++ b/deployment/migrations/versions/0036_b7c8d9e0f1a2_add_cost_credit_column.py @@ -19,8 +19,20 @@ def upgrade() -> None: # Add cost_credit column to account_costs table op.add_column('account_costs', sa.Column('cost_credit', sa.DECIMAL(), nullable=False, server_default='0')) + + # Add missing CREDIT_INSUFFICIENT error code + op.execute( + """ + INSERT INTO error_codes(code, description) VALUES + (6, 'Insufficient credit') + ON CONFLICT (code) DO NOTHING + """ + ) def downgrade() -> None: + # Remove CREDIT_INSUFFICIENT error code + op.execute("DELETE FROM error_codes WHERE code = 6") + # Remove cost_credit column from account_costs table op.drop_column('account_costs', 'cost_credit') \ No newline at end of file