Skip to content

feat: Implement comprehensive credit-based cost calculation and lifecycle management #836

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: angel-credit_balances
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""add cost_credit column to account_costs table

Revision ID: b7c8d9e0f1a2
Revises: a1b2c3d4e5f6
Create Date: 2025-01-11 00:00:00.000000

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'b7c8d9e0f1a2'
down_revision = 'a1b2c3d4e5f6'
branch_labels = None
depends_on = None


def upgrade() -> None:
# Add cost_credit column to account_costs table
op.add_column('account_costs', sa.Column('cost_credit', sa.DECIMAL(), nullable=False, server_default='0'))

# Add missing CREDIT_INSUFFICIENT error code
op.execute(
"""
INSERT INTO error_codes(code, description) VALUES
(6, 'Insufficient credit')
ON CONFLICT (code) DO NOTHING
"""
)


def downgrade() -> None:
# Remove CREDIT_INSUFFICIENT error code
op.execute("DELETE FROM error_codes WHERE code = 6")

# Remove cost_credit column from account_costs table
op.drop_column('account_costs', 'cost_credit')
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""add credit_balance cron job

Revision ID: c8d9e0f1a2b3
Revises: b7c8d9e0f1a2
Create Date: 2025-01-11 00:00:00.000000

"""
from alembic import op


# revision identifiers, used by Alembic.
revision = 'c8d9e0f1a2b3'
down_revision = 'b7c8d9e0f1a2'
branch_labels = None
depends_on = None


def upgrade() -> None:
# Add credit_balance cron job to run every hour (3600 seconds)
op.execute(
"""
INSERT INTO cron_jobs(id, interval, last_run)
VALUES ('credit_balance', 3600, '2025-01-01 00:00:00')
"""
)


def downgrade() -> None:
# Remove credit_balance cron job
op.execute(
"""
DELETE FROM cron_jobs WHERE id = 'credit_balance'
"""
)
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ dependencies = [
"aiohttp-jinja2==1.6",
"aioipfs~=0.7.1",
"alembic==1.15.1",
"aleph-message~=1.0.2",
# "aleph-message~=1.0.3",
"aleph-message @ git+https://github.com/aleph-im/aleph-message@andres-feature-implement_credits_payment",
"aleph-nuls2==0.1",
"aleph-p2p-client @ git+https://github.com/aleph-im/p2p-service-client-python@cbfebb871db94b2ca580e66104a67cd730c5020c",
"asyncpg==0.30",
Expand Down
6 changes: 5 additions & 1 deletion src/aleph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from aleph.exceptions import InvalidConfigException, KeyNotFoundException
from aleph.jobs import start_jobs
from aleph.jobs.cron.balance_job import BalanceCronJob
from aleph.jobs.cron.credit_balance_job import CreditBalanceCronJob
from aleph.jobs.cron.cron_job import CronJob, cron_job_task
from aleph.network import listener_tasks
from aleph.services import p2p
Expand Down Expand Up @@ -151,7 +152,10 @@ async def main(args: List[str]) -> None:
)
cron_job = CronJob(
session_factory=session_factory,
jobs={"balance": BalanceCronJob(session_factory=session_factory)},
jobs={
"balance": BalanceCronJob(session_factory=session_factory),
"credit_balance": CreditBalanceCronJob(session_factory=session_factory),
},
)
chain_data_service = ChainDataService(
session_factory=session_factory,
Expand Down
12 changes: 12 additions & 0 deletions src/aleph/db/accessors/balances.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,3 +385,15 @@ def update_credit_balances_airdrop(

# Drop the temporary table
session.execute("DROP TABLE temp_credit_balances") # type: ignore[arg-type]


def get_updated_credit_balance_accounts(session: DbSession, last_update: dt.datetime):
"""
Get addresses that have had their credit balances updated since the given timestamp.
"""
select_stmt = (
select(AlephCreditBalanceDb.address)
.where(AlephCreditBalanceDb.last_update >= last_update)
.distinct()
)
return session.execute(select_stmt).scalars().all()
27 changes: 17 additions & 10 deletions src/aleph/db/accessors/cost.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ def get_total_cost_for_address(
address: str,
payment_type: Optional[PaymentType] = PaymentType.hold,
) -> Decimal:
total_prop = (
AccountCostsDb.cost_hold
if payment_type == PaymentType.hold
else AccountCostsDb.cost_stream
)
if payment_type == PaymentType.hold:
total_prop = AccountCostsDb.cost_hold
elif payment_type == PaymentType.superfluid:
total_prop = AccountCostsDb.cost_stream
elif payment_type == PaymentType.credit:
total_prop = AccountCostsDb.cost_credit
else:
total_prop = AccountCostsDb.cost_hold

select_stmt = (
select(func.sum(total_prop))
Expand All @@ -43,11 +46,14 @@ def get_total_costs_for_address_grouped_by_message(
address: str,
payment_type: Optional[PaymentType] = PaymentType.hold,
):
total_prop = (
AccountCostsDb.cost_hold
if payment_type == PaymentType.hold
else AccountCostsDb.cost_stream
)
if payment_type == PaymentType.hold:
total_prop = AccountCostsDb.cost_hold
elif payment_type == PaymentType.superfluid:
total_prop = AccountCostsDb.cost_stream
elif payment_type == PaymentType.credit:
total_prop = AccountCostsDb.cost_credit
else:
total_prop = AccountCostsDb.cost_hold

id_field = func.min(AccountCostsDb.id)

Expand Down Expand Up @@ -94,6 +100,7 @@ def make_costs_upsert_query(costs: List[AccountCostsDb]) -> Insert:
set_={
"cost_hold": upsert_stmt.excluded.cost_hold,
"cost_stream": upsert_stmt.excluded.cost_stream,
"cost_credit": upsert_stmt.excluded.cost_credit,
},
)

Expand Down
1 change: 1 addition & 0 deletions src/aleph/db/models/account_costs.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ class AccountCostsDb(Base):
payment_type: PaymentType = Column(ChoiceType(PaymentType), nullable=False)
cost_hold: Decimal = Column(DECIMAL, nullable=False)
cost_stream: Decimal = Column(DECIMAL, nullable=False)
cost_credit: Decimal = Column(DECIMAL, nullable=False, default=0)

__table_args__ = (UniqueConstraint("owner", "item_hash", "type", "name"),)
49 changes: 22 additions & 27 deletions src/aleph/handlers/content/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
from aleph_message.models import ItemHash, ItemType, StoreContent

from aleph.config import get_config
from aleph.db.accessors.balances import get_total_balance
from aleph.db.accessors.cost import get_total_cost_for_address
from aleph.db.accessors.files import (
delete_file_pin,
get_file_tag,
Expand All @@ -33,7 +31,12 @@
from aleph.exceptions import AlephStorageException, UnknownHashError
from aleph.handlers.content.content_handler import ContentHandler
from aleph.schemas.cost_estimation_messages import CostEstimationStoreContent
from aleph.services.cost import calculate_storage_size, get_total_and_detailed_costs
from aleph.services.cost import (
calculate_storage_size,
get_payment_type,
get_total_and_detailed_costs,
)
from aleph.services.cost_validation import validate_balance_for_payment
from aleph.storage import StorageService
from aleph.toolkit.constants import MAX_UNAUTHENTICATED_UPLOAD_FILE_SIZE, MiB
from aleph.toolkit.costs import are_store_and_program_free
Expand All @@ -42,7 +45,6 @@
from aleph.types.files import FileType
from aleph.types.message_status import (
FileUnavailable,
InsufficientBalanceException,
InvalidMessageFormat,
PermissionDenied,
StoreCannotUpdateStoreWithRef,
Expand Down Expand Up @@ -190,11 +192,6 @@ async def pre_check_balance(self, session: DbSession, message: MessageDb):
config = get_config()
ipfs_enabled = config.ipfs.enabled.value

current_balance = get_total_balance(session=session, address=content.address)
current_cost = get_total_cost_for_address(
session=session, address=content.address
)

engine = content.item_type
# Initially only do that balance pre-check for ipfs files.
if engine == ItemType.ipfs and ipfs_enabled:
Expand Down Expand Up @@ -229,13 +226,14 @@ async def pre_check_balance(self, session: DbSession, message: MessageDb):
else:
message_cost = Decimal(0)

required_balance = current_cost + message_cost

if current_balance < required_balance:
raise InsufficientBalanceException(
balance=current_balance,
required_balance=required_balance,
)
# Use reusable validation for all payment types including credit
payment_type = get_payment_type(content)
validate_balance_for_payment(
session=session,
address=content.address,
message_cost=message_cost,
payment_type=payment_type,
)

return None

Expand All @@ -258,18 +256,15 @@ async def check_balance(
):
return costs

current_balance = get_total_balance(address=content.address, session=session)
current_cost = get_total_cost_for_address(
session=session, address=content.address
)

required_balance = current_cost + message_cost
payment_type = get_payment_type(content)

if current_balance < required_balance:
raise InsufficientBalanceException(
balance=current_balance,
required_balance=required_balance,
)
# Use reusable validation for all payment types
validate_balance_for_payment(
session=session,
address=content.address,
message_cost=message_cost,
payment_type=payment_type,
)

return costs

Expand Down
31 changes: 13 additions & 18 deletions src/aleph/handlers/content/vm.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
PersistentVolume,
)

from aleph.db.accessors.balances import get_total_balance
from aleph.db.accessors.cost import get_total_cost_for_address
from aleph.db.accessors.files import (
find_file_pins,
find_file_tags,
Expand Down Expand Up @@ -51,13 +49,13 @@
)
from aleph.db.models.account_costs import AccountCostsDb
from aleph.handlers.content.content_handler import ContentHandler
from aleph.services.cost import get_total_and_detailed_costs
from aleph.services.cost import get_payment_type, get_total_and_detailed_costs
from aleph.services.cost_validation import validate_balance_for_payment
from aleph.toolkit.costs import are_store_and_program_free
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.types.db_session import DbSession
from aleph.types.files import FileTag
from aleph.types.message_status import (
InsufficientBalanceException,
InternalError,
InvalidMessageFormat,
VmCannotUpdateUpdate,
Expand Down Expand Up @@ -352,22 +350,19 @@ async def check_balance(
):
return costs

# NOTE: For now allow to create anything that is being paid with STREAM for free, but generate a cost depending on the content.payment prop (HOLD / STREAM)
if content.payment and content.payment.is_stream:
return costs

# NOTE: Instances and persistent Programs being paid by HOLD are the only ones being checked for now
current_balance = get_total_balance(address=content.address, session=session)
current_cost = get_total_cost_for_address(
session=session, address=content.address
)
payment_type = get_payment_type(content)

required_balance = current_cost + message_cost
# NOTE: For now allow to create anything that is being paid with STREAM for free, but generate a cost depending on the content.payment prop (HOLD / STREAM / CREDIT)
if payment_type == PaymentType.superfluid:
return costs

if current_balance < required_balance:
raise InsufficientBalanceException(
balance=current_balance,
required_balance=required_balance,
# Handle credit and other payment types using reusable validation
if payment_type in [PaymentType.credit, PaymentType.hold]:
validate_balance_for_payment(
session=session,
address=content.address,
message_cost=message_cost,
payment_type=payment_type,
)

return costs
Expand Down
Loading
Loading