diff --git a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp index f6aa09ebf0d4..3fc739644067 100644 --- a/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp +++ b/ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp @@ -1125,14 +1125,22 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T ctx.Send(actorId, new TEvPQProxy::TEvLockPartition(0, 0, false, !ClientsideLocksAllowed)); } -void TReadSessionActor::Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const TActorContext&) { +void TReadSessionActor::Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) { if (!ActualPartitionActor(ev->Sender)) return; auto& evTopic = ev->Get()->Topic; auto it = Partitions.find(std::make_pair(evTopic->GetClientsideName(), ev->Get()->Partition)); Y_ABORT_UNLESS(it != Partitions.end()); - Y_ABORT_UNLESS(it->second.LockGeneration); + if (!it->second.LockGeneration) { + LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, + PQ_LOG_PREFIX << " the unlocked partition " << ev->Get()->Partition << " status has been requested"); + CloseSession( + TStringBuilder() << "Internal server error, the unlocked partition " << ev->Get()->Partition << " status has been requested", + NPersQueue::NErrorCode::ERROR, ctx + ); + return; + } if (it->second.Releasing) //lock request for already released partition - ignore return; @@ -1219,7 +1227,17 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, cons return; } - Y_ABORT_UNLESS(!Partitions.empty()); + auto onUnknownPartition = [&](auto& marker) { + LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Releasing unknown partition: " << record.ShortDebugString() << " " << marker); + CloseSession( + TStringBuilder() << "Internal server error, releasing unknown partition: " << record.ShortDebugString() << " " << marker, + NPersQueue::NErrorCode::ERROR, ctx + ); + }; + + if (Partitions.empty()) { + return onUnknownPartition("#PQv0.01"); + } TActorId actorId = TActorId{}; auto jt = Partitions.begin(); @@ -1233,7 +1251,9 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, cons } } } - Y_ABORT_UNLESS(actorId); + if (!actorId) { + return onUnknownPartition("#PQv0.02"); + } { auto it = TopicCounters.find(name);