Skip to content

Commit 81c13d4

Browse files
authored
feat(taskworker): Make ingest errors taskworker compatible (#90340)
This work is required to migrate tasks from celery to the new taskbroker system. The sentry option will be used to control the rollout of these tasks. The full migration plan is describe in this [document](https://www.notion.so/sentry/Rollout-Planning-1bd8b10e4b5d80aeaaa7dba0efca83bc).
1 parent b4d35ea commit 81c13d4

File tree

6 files changed

+53
-17
lines changed

6 files changed

+53
-17
lines changed

src/sentry/conf/server.py

+15-7
Original file line numberDiff line numberDiff line change
@@ -1406,6 +1406,11 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
14061406
"sentry.deletions.tasks.hybrid_cloud",
14071407
"sentry.deletions.tasks.scheduled",
14081408
"sentry.demo_mode.tasks",
1409+
"sentry.dynamic_sampling.tasks.boost_low_volume_projects",
1410+
"sentry.dynamic_sampling.tasks.boost_low_volume_transactions",
1411+
"sentry.dynamic_sampling.tasks.custom_rule_notifications",
1412+
"sentry.dynamic_sampling.tasks.recalibrate_orgs",
1413+
"sentry.dynamic_sampling.tasks.sliding_window_org",
14091414
"sentry.hybridcloud.tasks.deliver_from_outbox",
14101415
"sentry.hybridcloud.tasks.deliver_webhooks",
14111416
"sentry.incidents.tasks",
@@ -1453,6 +1458,7 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
14531458
"sentry.tasks.auto_source_code_config",
14541459
"sentry.tasks.autofix",
14551460
"sentry.tasks.beacon",
1461+
"sentry.tasks.check_am2_compatibility",
14561462
"sentry.tasks.check_new_issue_threshold_met",
14571463
"sentry.tasks.clear_expired_resolutions",
14581464
"sentry.tasks.clear_expired_rulesnoozes",
@@ -1465,14 +1471,15 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
14651471
"sentry.tasks.delete_seer_grouping_records",
14661472
"sentry.tasks.digests",
14671473
"sentry.tasks.email",
1468-
"sentry.tasks.relay",
14691474
"sentry.tasks.embeddings_grouping.backfill_seer_grouping_records_for_project",
14701475
"sentry.tasks.groupowner",
14711476
"sentry.tasks.merge",
14721477
"sentry.tasks.on_demand_metrics",
14731478
"sentry.tasks.options",
14741479
"sentry.tasks.ping",
1480+
"sentry.tasks.post_process",
14751481
"sentry.tasks.process_buffer",
1482+
"sentry.tasks.relay",
14761483
"sentry.tasks.release_registry",
14771484
"sentry.tasks.repository",
14781485
"sentry.tasks.reprocessing2",
@@ -1490,12 +1497,6 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
14901497
"sentry.uptime.rdap.tasks",
14911498
"sentry.uptime.subscriptions.tasks",
14921499
"sentry.workflow_engine.processors.delayed_workflow",
1493-
"sentry.dynamic_sampling.tasks.boost_low_volume_projects",
1494-
"sentry.dynamic_sampling.tasks.recalibrate_orgs",
1495-
"sentry.dynamic_sampling.tasks.custom_rule_notifications",
1496-
"sentry.dynamic_sampling.tasks.sliding_window_org",
1497-
"sentry.dynamic_sampling.tasks.boost_low_volume_transactions",
1498-
"sentry.tasks.check_am2_compatibility",
14991500
# Used for tests
15001501
"sentry.taskworker.tasks.examples",
15011502
)
@@ -1508,6 +1509,13 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
15081509
},
15091510
}
15101511

1512+
TASKWORKER_ENABLE_HIGH_THROUGHPUT_NAMESPACES = False
1513+
TASKWORKER_HIGH_THROUGHPUT_NAMESPACES = {
1514+
"ingest.profiling",
1515+
"ingest.transactions",
1516+
"ingest.errors",
1517+
}
1518+
15111519
# Sentry logs to two major places: stdout, and its internal project.
15121520
# To disable logging to the internal project, add a logger whose only
15131521
# handler is 'console' and disable propagating upwards.

src/sentry/options/defaults.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -3291,7 +3291,11 @@
32913291
default={},
32923292
flags=FLAG_AUTOMATOR_MODIFIABLE,
32933293
)
3294-
3294+
register(
3295+
"taskworker.ingest.errors.rollout",
3296+
default={},
3297+
flags=FLAG_AUTOMATOR_MODIFIABLE,
3298+
)
32953299
# Orgs for which compression should be disabled in the chunk upload endpoint.
32963300
# This is intended to circumvent sporadic 503 errors reported by some customers.
32973301
register("chunk-upload.no-compression", default=[], flags=FLAG_AUTOMATOR_MODIFIABLE)

src/sentry/tasks/base.py

+12-8
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,18 @@ def taskworker_override(
7575
def override(*args: P.args, **kwargs: P.kwargs) -> R:
7676
rollout_rate = 0
7777
option_flag = f"taskworker.{namespace}.rollout"
78-
rollout_map = options.get(option_flag)
79-
if rollout_map:
80-
if task_name in rollout_map:
81-
rollout_rate = rollout_map.get(task_name, 0)
82-
elif "*" in rollout_map:
83-
rollout_rate = rollout_map.get("*", 0)
84-
85-
random.seed(datetime.now().timestamp())
78+
check_option = True
79+
if namespace in settings.TASKWORKER_HIGH_THROUGHPUT_NAMESPACES:
80+
check_option = settings.TASKWORKER_ENABLE_HIGH_THROUGHPUT_NAMESPACES
81+
82+
if check_option:
83+
rollout_map = options.get(option_flag)
84+
if rollout_map:
85+
if task_name in rollout_map:
86+
rollout_rate = rollout_map.get(task_name, 0)
87+
elif "*" in rollout_map:
88+
rollout_rate = rollout_map.get("*", 0)
89+
8690
if rollout_rate > random.random():
8791
return taskworker_attr(*args, **kwargs)
8892

src/sentry/tasks/post_process.py

+6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
from sentry.signals import event_processed, issue_unignored
2626
from sentry.silo.base import SiloMode
2727
from sentry.tasks.base import instrumented_task
28+
from sentry.taskworker.config import TaskworkerConfig
29+
from sentry.taskworker.namespaces import ingest_errors_tasks
2830
from sentry.types.group import GroupSubStatus
2931
from sentry.utils import json, metrics
3032
from sentry.utils.cache import cache
@@ -483,6 +485,10 @@ def should_update_escalating_metrics(event: Event) -> bool:
483485
time_limit=120,
484486
soft_time_limit=110,
485487
silo_mode=SiloMode.REGION,
488+
taskworker_config=TaskworkerConfig(
489+
namespace=ingest_errors_tasks,
490+
processing_deadline_duration=120,
491+
),
486492
)
487493
def post_process_group(
488494
is_new,

src/sentry/tasks/store.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from sentry.stacktraces.processing import process_stacktraces, should_process_for_stacktraces
2727
from sentry.tasks.base import instrumented_task
2828
from sentry.taskworker.config import TaskworkerConfig
29-
from sentry.taskworker.namespaces import issues_tasks
29+
from sentry.taskworker.namespaces import ingest_errors_tasks, issues_tasks
3030
from sentry.utils import metrics
3131
from sentry.utils.event_tracker import TransactionStageStatus, track_sampled_event
3232
from sentry.utils.safe import safe_execute
@@ -228,6 +228,10 @@ def _do_preprocess_event(
228228
time_limit=65,
229229
soft_time_limit=60,
230230
silo_mode=SiloMode.REGION,
231+
taskworker_config=TaskworkerConfig(
232+
namespace=ingest_errors_tasks,
233+
processing_deadline_duration=65,
234+
),
231235
)
232236
def preprocess_event(
233237
cache_key: str,
@@ -438,6 +442,10 @@ def _continue_to_save_event() -> None:
438442
time_limit=65,
439443
soft_time_limit=60,
440444
silo_mode=SiloMode.REGION,
445+
taskworker_config=TaskworkerConfig(
446+
namespace=ingest_errors_tasks,
447+
processing_deadline_duration=65,
448+
),
441449
)
442450
def process_event(
443451
cache_key: str,
@@ -632,6 +640,10 @@ def _do_save_event(
632640
time_limit=65,
633641
soft_time_limit=60,
634642
silo_mode=SiloMode.REGION,
643+
taskworker_config=TaskworkerConfig(
644+
namespace=ingest_errors_tasks,
645+
processing_deadline_duration=65,
646+
),
635647
)
636648
def save_event(
637649
cache_key: str | None = None,

src/sentry/taskworker/namespaces.py

+2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
app_feature="profiles",
5858
)
5959

60+
ingest_errors_tasks = taskregistry.create_namespace("ingest.errors", app_feature="errors")
61+
6062
issues_tasks = taskregistry.create_namespace("issues", app_feature="issueplatform")
6163

6264
integrations_tasks = taskregistry.create_namespace("integrations", app_feature="integrations")

0 commit comments

Comments
 (0)