Skip to content

fix(issue summary): Add locking mechanism #90347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 72 additions & 26 deletions src/sentry/seer/issue_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from sentry.autofix.utils import get_autofix_state
from sentry.constants import ObjectStatus
from sentry.eventstore.models import Event, GroupEvent
from sentry.locks import locks
from sentry.models.group import Group
from sentry.models.project import Project
from sentry.seer.autofix import trigger_autofix
Expand All @@ -29,6 +30,7 @@
from sentry.users.services.user.model import RpcUser
from sentry.users.services.user.service import user_service
from sentry.utils.cache import cache
from sentry.utils.locking import UnableToAcquireLock

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -230,33 +232,14 @@ def _get_trace_connected_issues(event: GroupEvent) -> list[Group]:
return connected_issues


def get_issue_summary(
def _generate_summary(
group: Group,
user: User | RpcUser | AnonymousUser | None = None,
force_event_id: str | None = None,
source: str = "issue_details",
user: User | RpcUser | AnonymousUser,
force_event_id: str | None,
source: str,
cache_key: str,
) -> tuple[dict[str, Any], int]:
"""
Generate an AI summary for an issue.

Args:
group: The issue group
user: The user requesting the summary
force_event_id: Optional event ID to force summarizing a specific event
source: The source triggering the summary generation

Returns:
A tuple containing (summary_data, status_code)
"""
if user is None:
user = AnonymousUser()
if not features.has("organizations:gen-ai-features", group.organization, actor=user):
return {"detail": "Feature flag not enabled"}, 400

cache_key = "ai-group-summary-v2:" + str(group.id)
if not force_event_id and (cached_summary := cache.get(cache_key)):
return convert_dict_key_case(cached_summary, snake_to_camel_case), 200

"""Core logic to generate and cache the issue summary."""
serialized_event, event = _get_event(group, user, provided_event_id=force_event_id)

if not serialized_event or not event:
Expand Down Expand Up @@ -314,4 +297,67 @@ def get_issue_summary(

cache.set(cache_key, summary_dict, timeout=int(timedelta(days=7).total_seconds()))

return convert_dict_key_case(summary_dict, snake_to_camel_case), 200
return summary_dict, 200


def get_issue_summary(
    group: Group,
    user: User | RpcUser | AnonymousUser | None = None,
    force_event_id: str | None = None,
    source: str = "issue_details",
) -> tuple[dict[str, Any], int]:
    """
    Return an AI-generated summary for *group*, serving from cache when possible.

    A distributed lock keyed on the group id ensures that only one concurrent
    caller generates the summary; other callers wait briefly and then read the
    cached result.

    Args:
        group: The issue group
        user: The user requesting the summary (anonymous when omitted)
        force_event_id: Optional event ID to force summarizing a specific event
        source: The source triggering the summary generation

    Returns:
        A tuple containing (summary_data, status_code)
    """
    user = user if user is not None else AnonymousUser()

    if not features.has("organizations:gen-ai-features", group.organization, actor=user):
        return {"detail": "Feature flag not enabled"}, 400

    cache_key = f"ai-group-summary-v2:{group.id}"
    lock_key = f"ai-group-summary-v2-lock:{group.id}"
    lock_duration = 10  # How long the lock is held if acquired (seconds)
    wait_timeout = 4.5  # How long to wait for the lock (seconds)

    def _camelized(payload: dict[str, Any]) -> dict[str, Any]:
        # All successful payloads leave this function with camelCase keys.
        return convert_dict_key_case(payload, snake_to_camel_case)

    # Forcing a specific event always regenerates, bypassing cache and lock.
    if force_event_id:
        summary_dict, status_code = _generate_summary(
            group, user, force_event_id, source, cache_key
        )
        return _camelized(summary_dict), status_code

    # 1. Fast path: serve a previously generated summary from cache.
    cached_summary = cache.get(cache_key)
    if cached_summary:
        return _camelized(cached_summary), 200

    # 2. Contend for the generation lock; blocking_acquire polls until timeout.
    lock = locks.get(key=lock_key, duration=lock_duration, name="get_issue_summary")
    try:
        with lock.blocking_acquire(initial_delay=0.25, timeout=wait_timeout):
            # Another process may have populated the cache while we waited
            # for the lock, so re-check before generating.
            cached_summary = cache.get(cache_key)
            if cached_summary:
                return _camelized(cached_summary), 200

            # Lock held and cache still empty: we are the generator.
            summary_dict, status_code = _generate_summary(
                group, user, force_event_id, source, cache_key
            )
            return _camelized(summary_dict), status_code
    except UnableToAcquireLock:
        # Lock never became available; the winner may have finished meanwhile,
        # so check the cache one final time before giving up.
        cached_summary = cache.get(cache_key)
        if cached_summary:
            return _camelized(cached_summary), 200
        return {"detail": "Timeout waiting for summary generation lock"}, 503
114 changes: 114 additions & 0 deletions tests/sentry/seer/test_issue_summary.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import datetime
import threading
import time
from typing import Any
from unittest.mock import ANY, Mock, call, patch

import orjson

from sentry.api.serializers.rest_framework.base import convert_dict_key_case, snake_to_camel_case
from sentry.locks import locks
from sentry.models.group import Group
from sentry.seer.issue_summary import (
_call_seer,
Expand All @@ -17,6 +20,7 @@
from sentry.testutils.helpers.features import apply_feature_flag_on_cls
from sentry.testutils.skips import requires_snuba
from sentry.utils.cache import cache
from sentry.utils.locking import UnableToAcquireLock

pytestmark = [requires_snuba]

Expand Down Expand Up @@ -407,3 +411,113 @@ def test_get_event_provided(self, mock_serialize, mock_get_event_by_id):
]
)
mock_serialize.assert_called_once()

@patch("sentry.seer.issue_summary._generate_summary")
def test_get_issue_summary_concurrent_wait_for_lock(self, mock_generate_summary):
    """Test that a second request waits for the lock and reads from cache.

    Two threads call get_issue_summary concurrently; only the lock holder
    should invoke _generate_summary, and both should receive the same
    (camelCased) summary with status 200.
    """
    # NOTE(fix): cache_key was previously assigned twice; the duplicate
    # assignment has been removed.
    cache_key = f"ai-group-summary-v2:{self.group.id}"

    # Mock summary generation to take time and cache the result.
    generated_summary = {"headline": "Generated Summary", "event_id": "gen_event"}

    def side_effect_generate(*args, **kwargs):
        # Simulate slow generation so the second request must wait on the lock.
        time.sleep(0.3)
        # Write to cache before returning (simulates behavior after lock release).
        cache.set(cache_key, generated_summary, timeout=60)
        return generated_summary, 200

    mock_generate_summary.side_effect = side_effect_generate

    results = {}
    exceptions = {}

    def target(req_id):
        try:
            summary_data, status_code = get_issue_summary(self.group, self.user)
            results[req_id] = (summary_data, status_code)
        except Exception as e:
            exceptions[req_id] = e

    # Start two threads concurrently.
    thread1 = threading.Thread(target=target, args=(1,))
    thread2 = threading.Thread(target=target, args=(2,))

    thread1.start()
    # Give thread1 a slight head start, but the lock should handle the race.
    time.sleep(0.01)
    thread2.start()

    # Wait for both threads to complete.
    thread1.join(timeout=5)
    thread2.join(timeout=5)

    # Assertions
    if exceptions:
        raise AssertionError(f"Threads raised exceptions: {exceptions}")

    assert 1 in results, "Thread 1 did not complete in time"
    assert 2 in results, "Thread 2 did not complete in time"

    # Both should succeed and get the same summary.
    assert results[1][1] == 200, f"Thread 1 failed with status {results[1][1]}"
    assert results[2][1] == 200, f"Thread 2 failed with status {results[2][1]}"
    expected_result = convert_dict_key_case(generated_summary, snake_to_camel_case)
    assert results[1][0] == expected_result, "Thread 1 returned wrong summary"
    assert results[2][0] == expected_result, "Thread 2 returned wrong summary"

    # Only the thread that acquired the lock should have generated a summary.
    mock_generate_summary.assert_called_once()

    # Ensure the cache contains the final result.
    assert cache.get(cache_key) == generated_summary

@patch("sentry.seer.issue_summary._generate_summary")
def test_get_issue_summary_concurrent_force_event_id_bypasses_lock(self, mock_generate_summary):
    """Test that force_event_id bypasses lock waiting."""
    # Mock summary generation
    forced_summary = {"headline": "Forced Summary", "event_id": "force_event"}
    mock_generate_summary.return_value = (forced_summary, 200)

    # Ensure cache is empty and lock *could* be acquired if attempted
    cache_key = f"ai-group-summary-v2:{self.group.id}"
    lock_key = f"ai-group-summary-v2-lock:{self.group.id}"
    cache.delete(cache_key)

    locks.get(lock_key, duration=1).release()  # Ensure lock isn't held

    # Passing a force_event_id should skip the cache/lock path entirely
    summary_data, status_code = get_issue_summary(
        self.group, self.user, force_event_id="some_event"
    )

    assert status_code == 200
    assert summary_data == convert_dict_key_case(forced_summary, snake_to_camel_case)

    # Ensure generation was called directly
    mock_generate_summary.assert_called_once()

@patch("sentry.seer.issue_summary.cache.get")
@patch("sentry.seer.issue_summary._generate_summary")
@patch("sentry.utils.locking.lock.Lock.blocking_acquire")
def test_get_issue_summary_lock_timeout(
    self, mock_blocking_acquire, mock_generate_summary_core, mock_cache_get
):
    """Test that a timeout waiting for the lock returns 503."""
    # Every lock acquisition attempt fails with the timeout exception...
    mock_blocking_acquire.side_effect = UnableToAcquireLock
    # ...and the cache remains empty for the whole request.
    mock_cache_get.return_value = None

    summary_data, status_code = get_issue_summary(self.group, self.user)

    assert (status_code, summary_data) == (
        503,
        {"detail": "Timeout waiting for summary generation lock"},
    )
    # The lock was attempted exactly once and generation never ran.
    mock_blocking_acquire.assert_called_once()
    mock_generate_summary_core.assert_not_called()
    # Cache consulted twice: before locking and again after the lock failure.
    assert mock_cache_get.call_count == 2
Loading