Skip to content

Commit c382a67

Browse files
authored
Fixed releasing of non locked partitions (#21650)
2 parents fc3aa0d + 4ead9a6 commit c382a67

File tree

1 file changed

+24
-4
lines changed

1 file changed

+24
-4
lines changed

ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1125,14 +1125,22 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvLockPartition::TPtr& ev, const T
11251125
ctx.Send(actorId, new TEvPQProxy::TEvLockPartition(0, 0, false, !ClientsideLocksAllowed));
11261126
}
11271127

1128-
void TReadSessionActor::Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const TActorContext&) {
1128+
void TReadSessionActor::Handle(TEvPQProxy::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) {
11291129
if (!ActualPartitionActor(ev->Sender))
11301130
return;
11311131

11321132
auto& evTopic = ev->Get()->Topic;
11331133
auto it = Partitions.find(std::make_pair(evTopic->GetClientsideName(), ev->Get()->Partition));
11341134
Y_ABORT_UNLESS(it != Partitions.end());
1135-
Y_ABORT_UNLESS(it->second.LockGeneration);
1135+
if (!it->second.LockGeneration) {
1136+
LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY,
1137+
PQ_LOG_PREFIX << " the unlocked partition " << ev->Get()->Partition << " status has been requested");
1138+
CloseSession(
1139+
TStringBuilder() << "Internal server error, the unlocked partition " << ev->Get()->Partition << " status has been requested",
1140+
NPersQueue::NErrorCode::ERROR, ctx
1141+
);
1142+
return;
1143+
}
11361144

11371145
if (it->second.Releasing) //lock request for already released partition - ignore
11381146
return;
@@ -1219,7 +1227,17 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, cons
12191227
return;
12201228
}
12211229

1222-
Y_ABORT_UNLESS(!Partitions.empty());
1230+
auto onUnknownPartition = [&](auto& marker) {
1231+
LOG_ALERT_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Releasing unknown partition: " << record.ShortDebugString() << " " << marker);
1232+
CloseSession(
1233+
TStringBuilder() << "Internal server error, releasing unknown partition: " << record.ShortDebugString() << " " << marker,
1234+
NPersQueue::NErrorCode::ERROR, ctx
1235+
);
1236+
};
1237+
1238+
if (Partitions.empty()) {
1239+
return onUnknownPartition("#PQv0.01");
1240+
}
12231241

12241242
TActorId actorId = TActorId{};
12251243
auto jt = Partitions.begin();
@@ -1233,7 +1251,9 @@ void TReadSessionActor::Handle(TEvPersQueue::TEvReleasePartition::TPtr& ev, cons
12331251
}
12341252
}
12351253
}
1236-
Y_ABORT_UNLESS(actorId);
1254+
if (!actorId) {
1255+
return onUnknownPartition("#PQv0.02");
1256+
}
12371257

12381258
{
12391259
auto it = TopicCounters.find(name);

0 commit comments

Comments
 (0)