Skip to content

Make TQueueInplace and TOneOneQueueInplace usable without an extra pointer #21488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 23, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ namespace NKikimr {
std::shared_ptr<TVDiskSkeletonTrace> Trace;
ui64 InternalMessageId;

TRecord() = default;

TRecord(std::unique_ptr<IEventHandle> ev, TInstant now, ui32 recByteSize, const NBackpressure::TMessageId &msgId,
ui64 cost, TInstant deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId,
const NBackpressure::TQueueClientId& clientId, TString name, std::shared_ptr<TVDiskSkeletonTrace> &&trace,
Expand Down Expand Up @@ -161,7 +159,7 @@ namespace NKikimr {

private:
const TString VDiskLogPrefix;
std::unique_ptr<TQueueType, TQueueType::TCleanDestructor> Queue;
TQueueType Queue;
ui64 InFlightCount;
ui64 InFlightCost;
ui64 InFlightBytes;
Expand Down Expand Up @@ -203,7 +201,6 @@ namespace NKikimr {
ui64 maxInFlightCost,
TIntrusivePtr<::NMonitoring::TDynamicCounters> skeletonFrontGroup)
: VDiskLogPrefix(logPrefix)
, Queue(new TQueueType())
, InFlightCount(0)
, InFlightCost(0)
, InFlightBytes(0)
Expand Down Expand Up @@ -231,7 +228,7 @@ namespace NKikimr {
}

ui64 GetSize() const {
return Queue->GetSize();
return Queue.GetSize();
}

template<typename TFront>
Expand All @@ -240,7 +237,7 @@ namespace NKikimr {
NKikimrBlobStorage::EVDiskQueueId extQueueId, TFront& /*front*/,
const NBackpressure::TQueueClientId& clientId, std::shared_ptr<TVDiskSkeletonTrace> &&trace,
ui64 internalMessageId) {
if (!Queue->Head() && CanSendToSkeleton(cost)) {
if (!Queue.Head() && CanSendToSkeleton(cost)) {
// send to Skeleton for further processing
ctx.Send(converted.release());
++InFlightCount;
Expand All @@ -262,8 +259,8 @@ namespace NKikimr {
*SkeletonFrontDelayedBytes += recByteSize;

TInstant now = TAppData::TimeProvider->Now();
Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId,
clientId, Name, std::move(trace), internalMessageId));
Queue.Emplace(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId,
clientId, Name, std::move(trace), internalMessageId);
}
}

Expand All @@ -276,7 +273,7 @@ namespace NKikimr {
template <class TFront>
void ProcessNext(const TActorContext &ctx, TFront &front, bool forceError) {
// we can send next element to Skeleton if any
while (TRecord *rec = Queue->Head()) {
while (TRecord *rec = Queue.Head()) {
const ui64 cost = rec->Cost;
if (CanSendToSkeleton(cost) || forceError) {
ui32 recByteSize = rec->ByteSize;
Expand Down Expand Up @@ -314,7 +311,7 @@ namespace NKikimr {
Msgs.emplace(rec->InternalMessageId, TMsgInfo(rec->MsgId.MsgId, ctx.Now(), std::move(rec->Trace)));
UpdateState();
}
Queue->Pop();
Queue.Pop();
} else {
break; // stop sending requests to skeleton
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ namespace NKikimr {
{}

~TEmergencyQueue() {
while (Queue.Head()) {
Queue.Pop();
}
}

void Push(TEvBlobStorage::TEvVMovedPatch::TPtr ev) {
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tablet_flat/logic_redo_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ namespace NRedo {

struct TEntry {
template<typename ... Args>
static TEntry* Create(NTable::TTxStamp stamp, TArrayRef<const ui32> affects, Args&& ... args)
static std::unique_ptr<TEntry> Create(NTable::TTxStamp stamp, TArrayRef<const ui32> affects, Args&& ... args)
{
auto *ptr = malloc(sizeof(TEntry) + affects.size() * sizeof(ui32));
void* ptr = ::operator new(sizeof(TEntry) + affects.size() * sizeof(ui32));

return ::new(ptr) TEntry(stamp, affects, std::forward<Args>(args)...);
return std::unique_ptr<TEntry>(::new(ptr) TEntry(stamp, affects, std::forward<Args>(args)...));
}

void operator delete (void *p)
void operator delete(void* p)
{
free(p);
::operator delete(p);
}

void Describe(IOutputStream &out) const
Expand Down
19 changes: 9 additions & 10 deletions ydb/core/tablet_flat/logic_redo_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ namespace NRedo {
struct TQueue {
using TStamp = NTable::TTxStamp;
using TAffects = TArrayRef<const ui32>;
using TLog = TQueueInplace<std::unique_ptr<TEntry>, 4096>;

TQueue(THashMap<ui32, NTable::TSnapEdge> edges)
: Log(new TLog)
, Edges(std::move(edges))
: Edges(std::move(edges))
{

}
Expand All @@ -45,13 +45,14 @@ namespace NRedo {
<< " (" << Memory << " mem" << ", " << LargeGlobIdsBytes << " raw)b }";
}

void Push(TEntry *entry)
void Push(std::unique_ptr<TEntry>&& entryPtr)
{
TEntry* entry = entryPtr.get();
if (bool(entry->Embedded) == bool(entry->LargeGlobId)) {
Y_TABLET_ERROR(NFmt::Do(*entry) << " has incorrect payload");
}

Log->Push(entry);
Log.Push(std::move(entryPtr));

Items++;
Memory += entry->BytesMem();
Expand Down Expand Up @@ -92,15 +93,15 @@ namespace NRedo {
for (auto &it : Overhead)
it.second.Clear();

auto was = std::exchange(Log, new TLog);
TLog was = std::exchange(Log, TLog{});

Items = 0;
Memory = 0;
LargeGlobIdsBytes = 0;

auto logos = snap.MutableNonSnapLogBodies();

while (TAutoPtr<TEntry> entry = was->Pop()) {
while (auto entry = was.PopDefault()) {
if (entry->FilterTables(Edges)) {
for (const auto& blobId : entry->LargeGlobId.Blobs()) {
LogoBlobIDFromLogoBlobID(blobId, logos->Add());
Expand All @@ -116,7 +117,7 @@ namespace NRedo {

entry->References = 0;

Push(entry.Release());
Push(std::move(entry));
} else {
Y_ENSURE(entry->References == 0);
}
Expand All @@ -134,9 +135,7 @@ namespace NRedo {
return Usage;
}

using TLog = TOneOneQueueInplace<NRedo::TEntry *, 4096>;

TAutoPtr<TLog, TLog::TPtrCleanInplaceMallocDestructor> Log;
TLog Log;
THashMap<ui32, NTable::TSnapEdge> Edges;
THashMap<ui32, TOverhead> Overhead;
TIntrusiveList<TOverhead> Changes;
Expand Down
Loading
Loading