Skip to content

Make TQueueInplace and TOneOneQueueInplace usable without an extra pointer #21488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 7 additions & 10 deletions ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ namespace NKikimr {
std::shared_ptr<TVDiskSkeletonTrace> Trace;
ui64 InternalMessageId;

TRecord() = default;

TRecord(std::unique_ptr<IEventHandle> ev, TInstant now, ui32 recByteSize, const NBackpressure::TMessageId &msgId,
ui64 cost, TInstant deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId,
const NBackpressure::TQueueClientId& clientId, TString name, std::shared_ptr<TVDiskSkeletonTrace> &&trace,
Expand Down Expand Up @@ -161,7 +159,7 @@ namespace NKikimr {

private:
const TString VDiskLogPrefix;
std::unique_ptr<TQueueType, TQueueType::TCleanDestructor> Queue;
TQueueType Queue;
ui64 InFlightCount;
ui64 InFlightCost;
ui64 InFlightBytes;
Expand Down Expand Up @@ -203,7 +201,6 @@ namespace NKikimr {
ui64 maxInFlightCost,
TIntrusivePtr<::NMonitoring::TDynamicCounters> skeletonFrontGroup)
: VDiskLogPrefix(logPrefix)
, Queue(new TQueueType())
, InFlightCount(0)
, InFlightCost(0)
, InFlightBytes(0)
Expand Down Expand Up @@ -231,7 +228,7 @@ namespace NKikimr {
}

ui64 GetSize() const {
return Queue->GetSize();
return Queue.GetSize();
}

template<typename TFront>
Expand All @@ -240,7 +237,7 @@ namespace NKikimr {
NKikimrBlobStorage::EVDiskQueueId extQueueId, TFront& /*front*/,
const NBackpressure::TQueueClientId& clientId, std::shared_ptr<TVDiskSkeletonTrace> &&trace,
ui64 internalMessageId) {
if (!Queue->Head() && CanSendToSkeleton(cost)) {
if (!Queue.Head() && CanSendToSkeleton(cost)) {
// send to Skeleton for further processing
ctx.Send(converted.release());
++InFlightCount;
Expand All @@ -262,8 +259,8 @@ namespace NKikimr {
*SkeletonFrontDelayedBytes += recByteSize;

TInstant now = TAppData::TimeProvider->Now();
Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId,
clientId, Name, std::move(trace), internalMessageId));
Queue.Emplace(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId,
clientId, Name, std::move(trace), internalMessageId);
}
}

Expand All @@ -276,7 +273,7 @@ namespace NKikimr {
template <class TFront>
void ProcessNext(const TActorContext &ctx, TFront &front, bool forceError) {
// we can send next element to Skeleton if any
while (TRecord *rec = Queue->Head()) {
while (TRecord *rec = Queue.Head()) {
const ui64 cost = rec->Cost;
if (CanSendToSkeleton(cost) || forceError) {
ui32 recByteSize = rec->ByteSize;
Expand Down Expand Up @@ -314,7 +311,7 @@ namespace NKikimr {
Msgs.emplace(rec->InternalMessageId, TMsgInfo(rec->MsgId.MsgId, ctx.Now(), std::move(rec->Trace)));
UpdateState();
}
Queue->Pop();
Queue.Pop();
} else {
break; // stop sending requests to skeleton
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ namespace NKikimr {
{}

~TEmergencyQueue() {
while (Queue.Head()) {
Queue.Pop();
}
}

void Push(TEvBlobStorage::TEvVMovedPatch::TPtr ev) {
Expand Down
13 changes: 6 additions & 7 deletions ydb/core/tablet/tablet_pipe_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <ydb/core/base/hive.h>
#include <ydb/core/base/domain.h>
#include <ydb/core/base/appdata.h>
#include <ydb/library/actors/util/queue_oneone_inplace.h>
#include <ydb/core/util/queue_inplace.h>
#include <library/cpp/random_provider/random_provider.h>


Expand Down Expand Up @@ -40,7 +40,6 @@ namespace NTabletPipe {
, TabletId(tabletId)
, Config(config)
, IsShutdown(false)
, PayloadQueue(new TPayloadQueue())
, Leader(true)
{
Y_ABORT_UNLESS(tabletId != 0);
Expand Down Expand Up @@ -148,7 +147,7 @@ namespace NTabletPipe {
void HandleSendQueued(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
BLOG_D("queue send");
Y_ABORT_UNLESS(!IsShutdown);
PayloadQueue->Push(ev.Release());
PayloadQueue.Push(std::move(ev));
}

void HandleSend(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) {
Expand Down Expand Up @@ -322,10 +321,11 @@ namespace NTabletPipe {
Leader, false, Generation, std::move(versionInfo)));

BLOG_D("send queued");
while (TAutoPtr<IEventHandle> x = PayloadQueue->Pop())
while (TAutoPtr<IEventHandle> x = PayloadQueue.PopDefault())
Push(ctx, x);

PayloadQueue.Destroy();
// Free buffer memory
PayloadQueue.Clear();

if (IsShutdown) {
BLOG_D("shutdown pipe due to pending shutdown request");
Expand Down Expand Up @@ -719,8 +719,7 @@ namespace NTabletPipe {
TActorId InterconnectProxyId;
TActorId InterconnectSessionId;
TActorId ServerId;
typedef TOneOneQueueInplace<IEventHandle*, 32> TPayloadQueue;
TAutoPtr<TPayloadQueue, TPayloadQueue::TPtrCleanDestructor> PayloadQueue;
TQueueInplace<TAutoPtr<IEventHandle>, 32> PayloadQueue;
TClientRetryState RetryState;
bool Leader;
ui64 Generation = 0;
Expand Down
42 changes: 19 additions & 23 deletions ydb/core/tablet/tablet_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/interconnect/interconnect.h>
#include <ydb/core/util/cache.h>
#include <ydb/core/util/queue_oneone_inplace.h>
#include <ydb/core/util/queue_inplace.h>
#include <util/generic/map.h>
#include <util/generic/deque.h>
#include <library/cpp/random_provider/random_provider.h>
Expand Down Expand Up @@ -94,17 +94,15 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
TInstant AddInstant;
TEvTabletResolver::TEvForward::TPtr Ev;

TQueueEntry(TInstant instant, TEvTabletResolver::TEvForward::TPtr &ev)
TQueueEntry(TInstant instant, TEvTabletResolver::TEvForward::TPtr&& ev)
: AddInstant(instant)
, Ev(ev)
, Ev(std::move(ev))
{}
};

typedef TOneOneQueueInplace<TQueueEntry *, 64> TQueueType;

EState State = StInit;

TAutoPtr<TQueueType, TQueueType::TPtrCleanDestructor> Queue;
TQueueInplace<TQueueEntry, 128> Queue;
TActorId KnownLeader;
TActorId KnownLeaderTablet;

Expand Down Expand Up @@ -200,9 +198,7 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
}

bool PushQueue(TEvTabletResolver::TEvForward::TPtr &ev, TEntry &entry, const TActorContext &ctx) {
if (!entry.Queue)
entry.Queue.Reset(new TEntry::TQueueType());
entry.Queue->Push(new TEntry::TQueueEntry(ctx.Now(), ev));
entry.Queue.Emplace(ctx.Now(), std::move(ev));
return true;
}

Expand Down Expand Up @@ -324,14 +320,15 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
}

void SendQueued(ui64 tabletId, TEntry &entry, const TActorContext &ctx) {
if (TEntry::TQueueType *queue = entry.Queue.Get()) {
for (TAutoPtr<TEntry::TQueueEntry> x = queue->Pop(); !!x; x.Reset(queue->Pop())) {
TEvTabletResolver::TEvForward *msg = x->Ev->Get();
if (!SendForward(x->Ev->Sender, entry, msg, ctx))
ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
while (TEntry::TQueueEntry* x = entry.Queue.Head()) {
TEvTabletResolver::TEvForward *msg = x->Ev->Get();
if (!SendForward(x->Ev->Sender, entry, msg, ctx)) {
ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
}
entry.Queue.Destroy();
entry.Queue.Pop();
}
// Free buffer memory
entry.Queue.Clear();
}

void SendPing(ui64 tabletId, TEntry &entry, const TActorContext &ctx) {
Expand Down Expand Up @@ -359,10 +356,9 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
LOG_DEBUG(ctx, NKikimrServices::TABLET_RESOLVER,
"DropEntry tabletId: %" PRIu64 " followers: %" PRIu64,
tabletId, entry.KnownFollowers.size());
if (TEntry::TQueueType *queue = entry.Queue.Get()) {
for (TAutoPtr<TEntry::TQueueEntry> x = queue->Pop(); !!x; x.Reset(queue->Pop())) {
ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
}
while (TEntry::TQueueEntry* x = entry.Queue.Head()) {
ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId));
entry.Queue.Pop();
}
ResolvedTablets.Erase(tabletId);
UnresolvedTablets.Erase(tabletId);
Expand Down Expand Up @@ -840,10 +836,10 @@ class TTabletResolver : public TActorBootstrapped<TTabletResolver> {
if (!value)
return;

if (TEntry::TQueueType *queue = value->Queue.Get()) {
for (TAutoPtr<TEntry::TQueueEntry> x = queue->Pop(); !!x; x.Reset(queue->Pop())) {
ActorSystem->Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::RACE, key));
}
auto& queue = value->Queue;
while (TEntry::TQueueEntry* x = queue.Head()) {
ActorSystem->Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::RACE, key));
queue.Pop();
}
});
}
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tablet_flat/logic_redo_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ namespace NRedo {

struct TEntry {
template<typename ... Args>
static TEntry* Create(NTable::TTxStamp stamp, TArrayRef<const ui32> affects, Args&& ... args)
static std::unique_ptr<TEntry> Create(NTable::TTxStamp stamp, TArrayRef<const ui32> affects, Args&& ... args)
{
auto *ptr = malloc(sizeof(TEntry) + affects.size() * sizeof(ui32));
void* ptr = ::operator new(sizeof(TEntry) + affects.size() * sizeof(ui32));

return ::new(ptr) TEntry(stamp, affects, std::forward<Args>(args)...);
return std::unique_ptr<TEntry>(::new(ptr) TEntry(stamp, affects, std::forward<Args>(args)...));
}

void operator delete (void *p)
void operator delete(void* p)
{
free(p);
::operator delete(p);
}

void Describe(IOutputStream &out) const
Expand Down
19 changes: 9 additions & 10 deletions ydb/core/tablet_flat/logic_redo_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ namespace NRedo {
struct TQueue {
using TStamp = NTable::TTxStamp;
using TAffects = TArrayRef<const ui32>;
using TLog = TQueueInplace<std::unique_ptr<TEntry>, 4096>;

TQueue(THashMap<ui32, NTable::TSnapEdge> edges)
: Log(new TLog)
, Edges(std::move(edges))
: Edges(std::move(edges))
{

}
Expand All @@ -45,13 +45,14 @@ namespace NRedo {
<< " (" << Memory << " mem" << ", " << LargeGlobIdsBytes << " raw)b }";
}

void Push(TEntry *entry)
void Push(std::unique_ptr<TEntry>&& entryPtr)
{
TEntry* entry = entryPtr.get();
if (bool(entry->Embedded) == bool(entry->LargeGlobId)) {
Y_TABLET_ERROR(NFmt::Do(*entry) << " has incorrect payload");
}

Log->Push(entry);
Log.Push(std::move(entryPtr));

Items++;
Memory += entry->BytesMem();
Expand Down Expand Up @@ -92,15 +93,15 @@ namespace NRedo {
for (auto &it : Overhead)
it.second.Clear();

auto was = std::exchange(Log, new TLog);
TLog was = std::exchange(Log, TLog{});

Items = 0;
Memory = 0;
LargeGlobIdsBytes = 0;

auto logos = snap.MutableNonSnapLogBodies();

while (TAutoPtr<TEntry> entry = was->Pop()) {
while (auto entry = was.PopDefault()) {
if (entry->FilterTables(Edges)) {
for (const auto& blobId : entry->LargeGlobId.Blobs()) {
LogoBlobIDFromLogoBlobID(blobId, logos->Add());
Expand All @@ -116,7 +117,7 @@ namespace NRedo {

entry->References = 0;

Push(entry.Release());
Push(std::move(entry));
} else {
Y_ENSURE(entry->References == 0);
}
Expand All @@ -134,9 +135,7 @@ namespace NRedo {
return Usage;
}

using TLog = TOneOneQueueInplace<NRedo::TEntry *, 4096>;

TAutoPtr<TLog, TLog::TPtrCleanInplaceMallocDestructor> Log;
TLog Log;
THashMap<ui32, NTable::TSnapEdge> Edges;
THashMap<ui32, TOverhead> Overhead;
TIntrusiveList<TOverhead> Changes;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2523,7 +2523,7 @@ class TDataShard

TTxProgressIdempotentScalarQueue<TEvPrivate::TEvProgressTransaction> PlanQueue;
TTxProgressIdempotentScalarScheduleQueue<TEvPrivate::TEvCleanupTransaction> CleanupQueue;
TTxProgressQueue<ui64, TNoOpDestroy, TEvPrivate::TEvProgressResendReadSet> ResendReadSetQueue;
TTxProgressQueue<ui64, TEvPrivate::TEvProgressResendReadSet> ResendReadSetQueue;

struct TPipeServerInfoOverloadSubscribersTag {};

Expand Down
13 changes: 6 additions & 7 deletions ydb/core/tx/datashard/progress_queue.h
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
#pragma once
#include "defs.h"
#include <ydb/core/util/queue_oneone_inplace.h>
#include <ydb/core/util/queue_inplace.h>

namespace NKikimr {

template <typename T, typename TDestruct, typename TEvent>
template <typename T, typename TEvent>
class TTxProgressQueue {
bool HasInFly;
TOneOneQueueInplace<T, 32> Queue;
TQueueInplace<T, 32> Queue;

public:
TTxProgressQueue()
: HasInFly(false)
{}

~TTxProgressQueue() {
while (T head = Queue.Pop())
TDestruct::Destroy(head);
}

void Progress(T x, const TActorContext &ctx) {
Expand All @@ -24,13 +23,13 @@ class TTxProgressQueue {
ctx.Send(ctx.SelfID, new TEvent(x));
HasInFly = true;
} else {
Queue.Push(x);
Queue.Push(std::move(x));
}
}

void Reset(const TActorContext &ctx) {
Y_DEBUG_ABORT_UNLESS(HasInFly);
if (T x = Queue.Pop())
if (T x = Queue.PopDefault())
ctx.Send(ctx.SelfID, new TEvent(x));
else
HasInFly = false;
Expand Down
Loading
Loading