Skip to content

Commit 2dffb99

Browse files
authored
Make TQueueInplace and TOneOneQueueInplace usable without an extra pointer (#21488)
1 parent 674119e commit 2dffb99

File tree

14 files changed

+381
-210
lines changed

14 files changed

+381
-210
lines changed

ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,6 @@ namespace NKikimr {
108108
std::shared_ptr<TVDiskSkeletonTrace> Trace;
109109
ui64 InternalMessageId;
110110

111-
TRecord() = default;
112-
113111
TRecord(std::unique_ptr<IEventHandle> ev, TInstant now, ui32 recByteSize, const NBackpressure::TMessageId &msgId,
114112
ui64 cost, TInstant deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId,
115113
const NBackpressure::TQueueClientId& clientId, TString name, std::shared_ptr<TVDiskSkeletonTrace> &&trace,
@@ -161,7 +159,7 @@ namespace NKikimr {
161159

162160
private:
163161
const TString VDiskLogPrefix;
164-
std::unique_ptr<TQueueType, TQueueType::TCleanDestructor> Queue;
162+
TQueueType Queue;
165163
ui64 InFlightCount;
166164
ui64 InFlightCost;
167165
ui64 InFlightBytes;
@@ -203,7 +201,6 @@ namespace NKikimr {
203201
ui64 maxInFlightCost,
204202
TIntrusivePtr<::NMonitoring::TDynamicCounters> skeletonFrontGroup)
205203
: VDiskLogPrefix(logPrefix)
206-
, Queue(new TQueueType())
207204
, InFlightCount(0)
208205
, InFlightCost(0)
209206
, InFlightBytes(0)
@@ -231,7 +228,7 @@ namespace NKikimr {
231228
}
232229

233230
ui64 GetSize() const {
234-
return Queue->GetSize();
231+
return Queue.GetSize();
235232
}
236233

237234
template<typename TFront>
@@ -240,7 +237,7 @@ namespace NKikimr {
240237
NKikimrBlobStorage::EVDiskQueueId extQueueId, TFront& /*front*/,
241238
const NBackpressure::TQueueClientId& clientId, std::shared_ptr<TVDiskSkeletonTrace> &&trace,
242239
ui64 internalMessageId) {
243-
if (!Queue->Head() && CanSendToSkeleton(cost)) {
240+
if (!Queue.Head() && CanSendToSkeleton(cost)) {
244241
// send to Skeleton for further processing
245242
ctx.Send(converted.release());
246243
++InFlightCount;
@@ -262,8 +259,8 @@ namespace NKikimr {
262259
*SkeletonFrontDelayedBytes += recByteSize;
263260

264261
TInstant now = TAppData::TimeProvider->Now();
265-
Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId,
266-
clientId, Name, std::move(trace), internalMessageId));
262+
Queue.Emplace(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId,
263+
clientId, Name, std::move(trace), internalMessageId);
267264
}
268265
}
269266

@@ -276,7 +273,7 @@ namespace NKikimr {
276273
template <class TFront>
277274
void ProcessNext(const TActorContext &ctx, TFront &front, bool forceError) {
278275
// we can send next element to Skeleton if any
279-
while (TRecord *rec = Queue->Head()) {
276+
while (TRecord *rec = Queue.Head()) {
280277
const ui64 cost = rec->Cost;
281278
if (CanSendToSkeleton(cost) || forceError) {
282279
ui32 recByteSize = rec->ByteSize;
@@ -314,7 +311,7 @@ namespace NKikimr {
314311
Msgs.emplace(rec->InternalMessageId, TMsgInfo(rec->MsgId.MsgId, ctx.Now(), std::move(rec->Trace)));
315312
UpdateState();
316313
}
317-
Queue->Pop();
314+
Queue.Pop();
318315
} else {
319316
break; // stop sending requests to skeleton
320317
}

ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,6 @@ namespace NKikimr {
5555
{}
5656

5757
~TEmergencyQueue() {
58-
while (Queue.Head()) {
59-
Queue.Pop();
60-
}
6158
}
6259

6360
void Push(TEvBlobStorage::TEvVMovedPatch::TPtr ev) {

ydb/core/tablet/tablet_pipe_client.cpp

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
#include <ydb/core/base/hive.h>
1010
#include <ydb/core/base/domain.h>
1111
#include <ydb/core/base/appdata.h>
12-
#include <ydb/library/actors/util/queue_oneone_inplace.h>
12+
#include <ydb/core/util/queue_inplace.h>
1313
#include <library/cpp/random_provider/random_provider.h>
1414

1515

@@ -40,7 +40,6 @@ namespace NTabletPipe {
4040
, TabletId(tabletId)
4141
, Config(config)
4242
, IsShutdown(false)
43-
, PayloadQueue(new TPayloadQueue())
4443
, Leader(true)
4544
{
4645
Y_ABORT_UNLESS(tabletId != 0);
@@ -148,7 +147,7 @@ namespace NTabletPipe {
148147
void HandleSendQueued(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
149148
BLOG_D("queue send");
150149
Y_ABORT_UNLESS(!IsShutdown);
151-
PayloadQueue->Push(ev.Release());
150+
PayloadQueue.Push(std::move(ev));
152151
}
153152

154153
void HandleSend(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
@@ -322,10 +321,11 @@ namespace NTabletPipe {
322321
Leader, false, Generation, std::move(versionInfo)));
323322

324323
BLOG_D("send queued");
325-
while (TAutoPtr<IEventHandle> x = PayloadQueue->Pop())
324+
while (TAutoPtr<IEventHandle> x = PayloadQueue.PopDefault())
326325
Push(ctx, x);
327326

328-
PayloadQueue.Destroy();
327+
// Free buffer memory
328+
PayloadQueue.Clear();
329329

330330
if (IsShutdown) {
331331
BLOG_D("shutdown pipe due to pending shutdown request");
@@ -719,8 +719,7 @@ namespace NTabletPipe {
719719
TActorId InterconnectProxyId;
720720
TActorId InterconnectSessionId;
721721
TActorId ServerId;
722-
typedef TOneOneQueueInplace<IEventHandle*, 32> TPayloadQueue;
723-
TAutoPtr<TPayloadQueue, TPayloadQueue::TPtrCleanDestructor> PayloadQueue;
722+
TQueueInplace<TAutoPtr<IEventHandle>, 32> PayloadQueue;
724723
TClientRetryState RetryState;
725724
bool Leader;
726725
ui64 Generation = 0;

ydb/core/tablet/tablet_resolver.cpp

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#include <ydb/library/actors/core/actor_bootstrapped.h>
1212
#include <ydb/library/actors/interconnect/interconnect.h>
1313
#include <ydb/core/util/cache.h>
14-
#include <ydb/core/util/queue_oneone_inplace.h>
14+
#include <ydb/core/util/queue_inplace.h>
1515
#include <util/generic/map.h>
1616
#include <util/generic/deque.h>
1717
#include <library/cpp/random_provider/random_provider.h>
@@ -94,17 +94,15 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
9494
TInstant AddInstant;
9595
TEvTabletResolver::TEvForward::TPtr Ev;
9696

97-
TQueueEntry(TInstant instant, TEvTabletResolver::TEvForward::TPtr &ev)
97+
TQueueEntry(TInstant instant, TEvTabletResolver::TEvForward::TPtr&& ev)
9898
: AddInstant(instant)
99-
, Ev(ev)
99+
, Ev(std::move(ev))
100100
{}
101101
};
102102

103-
typedef TOneOneQueueInplace<TQueueEntry *, 64> TQueueType;
104-
105103
EState State = StInit;
106104

107-
TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue;
105+
TQueueInplace<TQueueEntry, 128> Queue;
108106
TActorId KnownLeader;
109107
TActorId KnownLeaderTablet;
110108

@@ -200,9 +198,7 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
200198
}
201199

202200
bool PushQueue(TEvTabletResolver::TEvForward::TPtr &ev, TEntry &entry, const TActorContext &ctx) {
203-
if (!entry.Queue)
204-
entry.Queue.Reset(new TEntry::TQueueType());
205-
entry.Queue->Push(new TEntry::TQueueEntry(ctx.Now(), ev));
201+
entry.Queue.Emplace(ctx.Now(), std::move(ev));
206202
return true;
207203
}
208204

@@ -324,14 +320,15 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
324320
}
325321

326322
void SendQueued(ui64 tabletId, TEntry &entry, const TActorContext &ctx) {
327-
if (TEntry::TQueueType *queue = entry.Queue.Get()) {
328-
for (TAutoPtr<TEntry::TQueueEntry> x = queue->Pop(); !!x; x.Reset(queue->Pop())) {
329-
TEvTabletResolver::TEvForward *msg = x->Ev->Get();
330-
if (!SendForward(x->Ev->Sender, entry, msg, ctx))
331-
ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
323+
while (TEntry::TQueueEntry* x = entry.Queue.Head()) {
324+
TEvTabletResolver::TEvForward *msg = x->Ev->Get();
325+
if (!SendForward(x->Ev->Sender, entry, msg, ctx)) {
326+
ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
332327
}
333-
entry.Queue.Destroy();
328+
entry.Queue.Pop();
334329
}
330+
// Free buffer memory
331+
entry.Queue.Clear();
335332
}
336333

337334
void SendPing(ui64 tabletId, TEntry &entry, const TActorContext &ctx) {
@@ -359,10 +356,9 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
359356
LOG_DEBUG(ctx, NKikimrServices::TABLET_RESOLVER,
360357
"DropEntry tabletId: %" PRIu64 " followers: %" PRIu64,
361358
tabletId, entry.KnownFollowers.size());
362-
if (TEntry::TQueueType *queue = entry.Queue.Get()) {
363-
for (TAutoPtr<TEntry::TQueueEntry> x = queue->Pop(); !!x; x.Reset(queue->Pop())) {
364-
ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
365-
}
359+
while (TEntry::TQueueEntry* x = entry.Queue.Head()) {
360+
ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
361+
entry.Queue.Pop();
366362
}
367363
ResolvedTablets.Erase(tabletId);
368364
UnresolvedTablets.Erase(tabletId);
@@ -840,10 +836,10 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
840836
if (!value)
841837
return;
842838

843-
if (TEntry::TQueueType *queue = value->Queue.Get()) {
844-
for (TAutoPtr<TEntry::TQueueEntry> x = queue->Pop(); !!x; x.Reset(queue->Pop())) {
845-
ActorSystem->Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::RACE, key));
846-
}
839+
auto& queue = value->Queue;
840+
while (TEntry::TQueueEntry* x = queue.Head()) {
841+
ActorSystem->Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::RACE, key));
842+
queue.Pop();
847843
}
848844
});
849845
}

ydb/core/tablet_flat/logic_redo_entry.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ namespace NRedo {
1111

1212
struct TEntry {
1313
template<typename ... Args>
14-
static TEntry* Create(NTable::TTxStamp stamp, TArrayRef<const ui32> affects, Args&& ... args)
14+
static std::unique_ptr<TEntry> Create(NTable::TTxStamp stamp, TArrayRef<const ui32> affects, Args&& ... args)
1515
{
16-
auto *ptr = malloc(sizeof(TEntry) + affects.size() * sizeof(ui32));
16+
void* ptr = ::operator new(sizeof(TEntry) + affects.size() * sizeof(ui32));
1717

18-
return ::new(ptr) TEntry(stamp, affects, std::forward<Args>(args)...);
18+
return std::unique_ptr<TEntry>(::new(ptr) TEntry(stamp, affects, std::forward<Args>(args)...));
1919
}
2020

21-
void operator delete (void *p)
21+
void operator delete(void* p)
2222
{
23-
free(p);
23+
::operator delete(p);
2424
}
2525

2626
void Describe(IOutputStream &out) const

ydb/core/tablet_flat/logic_redo_queue.h

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ namespace NRedo {
1919
struct TQueue {
2020
using TStamp = NTable::TTxStamp;
2121
using TAffects = TArrayRef<const ui32>;
22+
using TLog = TQueueInplace<std::unique_ptr<TEntry>, 4096>;
2223

2324
TQueue(THashMap<ui32, NTable::TSnapEdge> edges)
24-
: Log(new TLog)
25-
, Edges(std::move(edges))
25+
: Edges(std::move(edges))
2626
{
2727

2828
}
@@ -45,13 +45,14 @@ namespace NRedo {
4545
<< " (" << Memory << " mem" << ", " << LargeGlobIdsBytes << " raw)b }";
4646
}
4747

48-
void Push(TEntry *entry)
48+
void Push(std::unique_ptr<TEntry>&& entryPtr)
4949
{
50+
TEntry* entry = entryPtr.get();
5051
if (bool(entry->Embedded) == bool(entry->LargeGlobId)) {
5152
Y_TABLET_ERROR(NFmt::Do(*entry) << " has incorrect payload");
5253
}
5354

54-
Log->Push(entry);
55+
Log.Push(std::move(entryPtr));
5556

5657
Items++;
5758
Memory += entry->BytesMem();
@@ -92,15 +93,15 @@ namespace NRedo {
9293
for (auto &it : Overhead)
9394
it.second.Clear();
9495

95-
auto was = std::exchange(Log, new TLog);
96+
TLog was = std::exchange(Log, TLog{});
9697

9798
Items = 0;
9899
Memory = 0;
99100
LargeGlobIdsBytes = 0;
100101

101102
auto logos = snap.MutableNonSnapLogBodies();
102103

103-
while (TAutoPtr<TEntry> entry = was->Pop()) {
104+
while (auto entry = was.PopDefault()) {
104105
if (entry->FilterTables(Edges)) {
105106
for (const auto& blobId : entry->LargeGlobId.Blobs()) {
106107
LogoBlobIDFromLogoBlobID(blobId, logos->Add());
@@ -116,7 +117,7 @@ namespace NRedo {
116117

117118
entry->References = 0;
118119

119-
Push(entry.Release());
120+
Push(std::move(entry));
120121
} else {
121122
Y_ENSURE(entry->References == 0);
122123
}
@@ -134,9 +135,7 @@ namespace NRedo {
134135
return Usage;
135136
}
136137

137-
using TLog = TOneOneQueueInplace<NRedo::TEntry *, 4096>;
138-
139-
TAutoPtr<TLog, TLog::TPtrCleanInplaceMallocDestructor> Log;
138+
TLog Log;
140139
THashMap<ui32, NTable::TSnapEdge> Edges;
141140
THashMap<ui32, TOverhead> Overhead;
142141
TIntrusiveList<TOverhead> Changes;

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2523,7 +2523,7 @@ class TDataShard
25232523

25242524
TTxProgressIdempotentScalarQueue<TEvPrivate::TEvProgressTransaction> PlanQueue;
25252525
TTxProgressIdempotentScalarScheduleQueue<TEvPrivate::TEvCleanupTransaction> CleanupQueue;
2526-
TTxProgressQueue<ui64, TNoOpDestroy, TEvPrivate::TEvProgressResendReadSet> ResendReadSetQueue;
2526+
TTxProgressQueue<ui64, TEvPrivate::TEvProgressResendReadSet> ResendReadSetQueue;
25272527

25282528
struct TPipeServerInfoOverloadSubscribersTag {};
25292529

ydb/core/tx/datashard/progress_queue.h

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,20 @@
11
#pragma once
22
#include "defs.h"
3-
#include <ydb/core/util/queue_oneone_inplace.h>
3+
#include <ydb/core/util/queue_inplace.h>
44

55
namespace NKikimr {
66

7-
template <typename T, typename TDestruct, typename TEvent>
7+
template <typename T, typename TEvent>
88
class TTxProgressQueue {
99
bool HasInFly;
10-
TOneOneQueueInplace<T, 32> Queue;
10+
TQueueInplace<T, 32> Queue;
11+
1112
public:
1213
TTxProgressQueue()
1314
: HasInFly(false)
1415
{}
1516

1617
~TTxProgressQueue() {
17-
while (T head = Queue.Pop())
18-
TDestruct::Destroy(head);
1918
}
2019

2120
void Progress(T x, const TActorContext &ctx) {
@@ -24,13 +23,13 @@ class TTxProgressQueue {
2423
ctx.Send(ctx.SelfID, new TEvent(x));
2524
HasInFly = true;
2625
} else {
27-
Queue.Push(x);
26+
Queue.Push(std::move(x));
2827
}
2928
}
3029

3130
void Reset(const TActorContext &ctx) {
3231
Y_DEBUG_ABORT_UNLESS(HasInFly);
33-
if (T x = Queue.Pop())
32+
if (T x = Queue.PopDefault())
3433
ctx.Send(ctx.SelfID, new TEvent(x));
3534
else
3635
HasInFly = false;

0 commit comments

Comments
 (0)