diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp index b080dbb9c79d..eb3871877b05 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp @@ -108,8 +108,6 @@ namespace NKikimr { std::shared_ptr Trace; ui64 InternalMessageId; - TRecord() = default; - TRecord(std::unique_ptr ev, TInstant now, ui32 recByteSize, const NBackpressure::TMessageId &msgId, ui64 cost, TInstant deadline, NKikimrBlobStorage::EVDiskQueueId extQueueId, const NBackpressure::TQueueClientId& clientId, TString name, std::shared_ptr &&trace, @@ -161,7 +159,7 @@ namespace NKikimr { private: const TString VDiskLogPrefix; - std::unique_ptr Queue; + TQueueType Queue; ui64 InFlightCount; ui64 InFlightCost; ui64 InFlightBytes; @@ -203,7 +201,6 @@ namespace NKikimr { ui64 maxInFlightCost, TIntrusivePtr<::NMonitoring::TDynamicCounters> skeletonFrontGroup) : VDiskLogPrefix(logPrefix) - , Queue(new TQueueType()) , InFlightCount(0) , InFlightCost(0) , InFlightBytes(0) @@ -231,7 +228,7 @@ namespace NKikimr { } ui64 GetSize() const { - return Queue->GetSize(); + return Queue.GetSize(); } template @@ -240,7 +237,7 @@ namespace NKikimr { NKikimrBlobStorage::EVDiskQueueId extQueueId, TFront& /*front*/, const NBackpressure::TQueueClientId& clientId, std::shared_ptr &&trace, ui64 internalMessageId) { - if (!Queue->Head() && CanSendToSkeleton(cost)) { + if (!Queue.Head() && CanSendToSkeleton(cost)) { // send to Skeleton for further processing ctx.Send(converted.release()); ++InFlightCount; @@ -262,8 +259,8 @@ namespace NKikimr { *SkeletonFrontDelayedBytes += recByteSize; TInstant now = TAppData::TimeProvider->Now(); - Queue->Push(TRecord(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId, - clientId, Name, std::move(trace), internalMessageId)); + Queue.Emplace(std::move(converted), now, recByteSize, msgId, cost, deadline, extQueueId, + clientId, Name, std::move(trace), internalMessageId); } } @@ -276,7 +273,7 @@ namespace NKikimr { template void ProcessNext(const TActorContext &ctx, TFront &front, bool forceError) { // we can send next element to Skeleton if any - while (TRecord *rec = Queue->Head()) { + while (TRecord *rec = Queue.Head()) { const ui64 cost = rec->Cost; if (CanSendToSkeleton(cost) || forceError) { ui32 recByteSize = rec->ByteSize; @@ -314,7 +311,7 @@ namespace NKikimr { Msgs.emplace(rec->InternalMessageId, TMsgInfo(rec->MsgId.MsgId, ctx.Now(), std::move(rec->Trace))); UpdateState(); } - Queue->Pop(); + Queue.Pop(); } else { break; // stop sending requests to skeleton } diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp index c338f1a239e9..772d39b9f7a2 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_overload_handler.cpp @@ -55,9 +55,6 @@ namespace NKikimr { {} ~TEmergencyQueue() { - while (Queue.Head()) { - Queue.Pop(); - } } void Push(TEvBlobStorage::TEvVMovedPatch::TPtr ev) { diff --git a/ydb/core/tablet/tablet_pipe_client.cpp b/ydb/core/tablet/tablet_pipe_client.cpp index 879c574da4e8..8eabef158af9 100644 --- a/ydb/core/tablet/tablet_pipe_client.cpp +++ b/ydb/core/tablet/tablet_pipe_client.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include @@ -40,7 +40,6 @@ namespace NTabletPipe { , TabletId(tabletId) , Config(config) , IsShutdown(false) - , PayloadQueue(new TPayloadQueue()) , Leader(true) { Y_ABORT_UNLESS(tabletId != 0); @@ -148,7 +147,7 @@ namespace NTabletPipe { void HandleSendQueued(TAutoPtr& ev, const TActorContext& ctx) { BLOG_D("queue send"); Y_ABORT_UNLESS(!IsShutdown); - PayloadQueue->Push(ev.Release()); + PayloadQueue.Push(std::move(ev)); } void HandleSend(TAutoPtr& ev, const TActorContext& ctx) { @@ -322,10 +321,11 @@ namespace NTabletPipe { Leader, false, Generation, std::move(versionInfo))); BLOG_D("send queued"); - while (TAutoPtr x = PayloadQueue->Pop()) + while (TAutoPtr x = PayloadQueue.PopDefault()) Push(ctx, x); - PayloadQueue.Destroy(); + // Free buffer memory + PayloadQueue.Clear(); if (IsShutdown) { BLOG_D("shutdown pipe due to pending shutdown request"); @@ -719,8 +719,7 @@ namespace NTabletPipe { TActorId InterconnectProxyId; TActorId InterconnectSessionId; TActorId ServerId; - typedef TOneOneQueueInplace TPayloadQueue; - TAutoPtr PayloadQueue; + TQueueInplace, 32> PayloadQueue; TClientRetryState RetryState; bool Leader; ui64 Generation = 0; diff --git a/ydb/core/tablet/tablet_resolver.cpp b/ydb/core/tablet/tablet_resolver.cpp index d3eb050c22f7..15d0aa9c269d 100644 --- a/ydb/core/tablet/tablet_resolver.cpp +++ b/ydb/core/tablet/tablet_resolver.cpp @@ -11,7 +11,7 @@ #include #include #include -#include +#include #include #include #include @@ -94,17 +94,15 @@ class TTabletResolver : public TActorBootstrapped { TInstant AddInstant; TEvTabletResolver::TEvForward::TPtr Ev; - TQueueEntry(TInstant instant, TEvTabletResolver::TEvForward::TPtr &ev) + TQueueEntry(TInstant instant, TEvTabletResolver::TEvForward::TPtr&& ev) : AddInstant(instant) - , Ev(ev) + , Ev(std::move(ev)) {} }; - typedef TOneOneQueueInplace TQueueType; - EState State = StInit; - TAutoPtr Queue; + TQueueInplace Queue; TActorId KnownLeader; TActorId KnownLeaderTablet; @@ -200,9 +198,7 @@ class TTabletResolver : public TActorBootstrapped { } bool PushQueue(TEvTabletResolver::TEvForward::TPtr &ev, TEntry &entry, const TActorContext &ctx) { - if (!entry.Queue) - entry.Queue.Reset(new TEntry::TQueueType()); - entry.Queue->Push(new TEntry::TQueueEntry(ctx.Now(), ev)); + entry.Queue.Emplace(ctx.Now(), std::move(ev)); return true; } @@ -324,14 +320,15 @@ class TTabletResolver : public TActorBootstrapped { } void SendQueued(ui64 tabletId, TEntry &entry, const TActorContext &ctx) { - if (TEntry::TQueueType *queue = entry.Queue.Get()) { - for (TAutoPtr x = queue->Pop(); !!x; x.Reset(queue->Pop())) { - TEvTabletResolver::TEvForward *msg = x->Ev->Get(); - if (!SendForward(x->Ev->Sender, entry, msg, ctx)) - ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId)); + while (TEntry::TQueueEntry* x = entry.Queue.Head()) { + TEvTabletResolver::TEvForward *msg = x->Ev->Get(); + if (!SendForward(x->Ev->Sender, entry, msg, ctx)) { + ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId)); } - entry.Queue.Destroy(); + entry.Queue.Pop(); } + // Free buffer memory + entry.Queue.Clear(); } void SendPing(ui64 tabletId, TEntry &entry, const TActorContext &ctx) { @@ -359,10 +356,9 @@ class TTabletResolver : public TActorBootstrapped { LOG_DEBUG(ctx, NKikimrServices::TABLET_RESOLVER, "DropEntry tabletId: %" PRIu64 " followers: %" PRIu64, tabletId, entry.KnownFollowers.size()); - if (TEntry::TQueueType *queue = entry.Queue.Get()) { - for (TAutoPtr x = queue->Pop(); !!x; x.Reset(queue->Pop())) { - ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId)); - } + while (TEntry::TQueueEntry* x = entry.Queue.Head()) { + ctx.Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::ERROR, tabletId)); + entry.Queue.Pop(); } ResolvedTablets.Erase(tabletId); UnresolvedTablets.Erase(tabletId); @@ -840,10 +836,10 @@ class TTabletResolver : public TActorBootstrapped { if (!value) return; - if (TEntry::TQueueType *queue = value->Queue.Get()) { - for (TAutoPtr x = queue->Pop(); !!x; x.Reset(queue->Pop())) { - ActorSystem->Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::RACE, key)); - } + auto& queue = value->Queue; + while (TEntry::TQueueEntry* x = queue.Head()) { + ActorSystem->Send(x->Ev->Sender, new TEvTabletResolver::TEvForwardResult(NKikimrProto::RACE, key)); + queue.Pop(); } }); } diff --git a/ydb/core/tablet_flat/logic_redo_entry.h b/ydb/core/tablet_flat/logic_redo_entry.h index ec947f04d326..c8d7741cc71b 100644 --- a/ydb/core/tablet_flat/logic_redo_entry.h +++ b/ydb/core/tablet_flat/logic_redo_entry.h @@ -11,16 +11,16 @@ namespace NRedo { struct TEntry { template - static TEntry* Create(NTable::TTxStamp stamp, TArrayRef affects, Args&& ... args) + static std::unique_ptr Create(NTable::TTxStamp stamp, TArrayRef affects, Args&& ... args) { - auto *ptr = malloc(sizeof(TEntry) + affects.size() * sizeof(ui32)); + void* ptr = ::operator new(sizeof(TEntry) + affects.size() * sizeof(ui32)); - return ::new(ptr) TEntry(stamp, affects, std::forward(args)...); + return std::unique_ptr(::new(ptr) TEntry(stamp, affects, std::forward(args)...)); } - void operator delete (void *p) + void operator delete(void* p) { - free(p); + ::operator delete(p); } void Describe(IOutputStream &out) const diff --git a/ydb/core/tablet_flat/logic_redo_queue.h b/ydb/core/tablet_flat/logic_redo_queue.h index 648047c560ba..529dd5a64543 100644 --- a/ydb/core/tablet_flat/logic_redo_queue.h +++ b/ydb/core/tablet_flat/logic_redo_queue.h @@ -19,10 +19,10 @@ namespace NRedo { struct TQueue { using TStamp = NTable::TTxStamp; using TAffects = TArrayRef; + using TLog = TQueueInplace, 4096>; TQueue(THashMap edges) - : Log(new TLog) - , Edges(std::move(edges)) + : Edges(std::move(edges)) { } @@ -45,13 +45,14 @@ namespace NRedo { << " (" << Memory << " mem" << ", " << LargeGlobIdsBytes << " raw)b }"; } - void Push(TEntry *entry) + void Push(std::unique_ptr&& entryPtr) { + TEntry* entry = entryPtr.get(); if (bool(entry->Embedded) == bool(entry->LargeGlobId)) { Y_TABLET_ERROR(NFmt::Do(*entry) << " has incorrect payload"); } - Log->Push(entry); + Log.Push(std::move(entryPtr)); Items++; Memory += entry->BytesMem(); @@ -92,7 +93,7 @@ namespace NRedo { for (auto &it : Overhead) it.second.Clear(); - auto was = std::exchange(Log, new TLog); + TLog was = std::exchange(Log, TLog{}); Items = 0; Memory = 0; @@ -100,7 +101,7 @@ namespace NRedo { auto logos = snap.MutableNonSnapLogBodies(); - while (TAutoPtr entry = was->Pop()) { + while (auto entry = was.PopDefault()) { if (entry->FilterTables(Edges)) { for (const auto& blobId : entry->LargeGlobId.Blobs()) { LogoBlobIDFromLogoBlobID(blobId, logos->Add()); @@ -116,7 +117,7 @@ namespace NRedo { entry->References = 0; - Push(entry.Release()); + Push(std::move(entry)); } else { Y_ENSURE(entry->References == 0); } @@ -134,9 +135,7 @@ namespace NRedo { return Usage; } - using TLog = TOneOneQueueInplace; - - TAutoPtr Log; + TLog Log; THashMap Edges; THashMap Overhead; TIntrusiveList Changes; diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 587ecd5927d1..1b1a235adec1 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -2523,7 +2523,7 @@ class TDataShard TTxProgressIdempotentScalarQueue PlanQueue; TTxProgressIdempotentScalarScheduleQueue CleanupQueue; - TTxProgressQueue ResendReadSetQueue; + TTxProgressQueue ResendReadSetQueue; struct TPipeServerInfoOverloadSubscribersTag {}; diff --git a/ydb/core/tx/datashard/progress_queue.h b/ydb/core/tx/datashard/progress_queue.h index 0e99651853af..a02ecb663a00 100644 --- a/ydb/core/tx/datashard/progress_queue.h +++ b/ydb/core/tx/datashard/progress_queue.h @@ -1,21 +1,20 @@ #pragma once #include "defs.h" -#include +#include namespace NKikimr { -template +template class TTxProgressQueue { bool HasInFly; - TOneOneQueueInplace Queue; + TQueueInplace Queue; + public: TTxProgressQueue() : HasInFly(false) {} ~TTxProgressQueue() { - while (T head = Queue.Pop()) - TDestruct::Destroy(head); } void Progress(T x, const TActorContext &ctx) { @@ -24,13 +23,13 @@ class TTxProgressQueue { ctx.Send(ctx.SelfID, new TEvent(x)); HasInFly = true; } else { - Queue.Push(x); + Queue.Push(std::move(x)); } } void Reset(const TActorContext &ctx) { Y_DEBUG_ABORT_UNLESS(HasInFly); - if (T x = Queue.Pop()) + if (T x = Queue.PopDefault()) ctx.Send(ctx.SelfID, new TEvent(x)); else HasInFly = false; diff --git a/ydb/core/tx/tx_proxy/proxy_impl.cpp b/ydb/core/tx/tx_proxy/proxy_impl.cpp index 26be5bd05775..c430fe132ae1 100644 --- a/ydb/core/tx/tx_proxy/proxy_impl.cpp +++ b/ydb/core/tx/tx_proxy/proxy_impl.cpp @@ -12,6 +12,7 @@ #include #include #include +#include namespace NKikimr { using namespace NTabletFlatExecutor; @@ -34,12 +35,10 @@ struct TDelayedQueue { } }; typedef TAutoPtr TRequestPtr; - typedef TOneOneQueueInplace TQueueType; - typedef TAutoPtr TSafeQueue; - TSafeQueue Queue; + typedef TQueueInplace TQueueType; + TQueueType Queue; TDelayedQueue() - : Queue(new TQueueType()) {} }; @@ -129,30 +128,30 @@ class TTxProxy : public TActorBootstrapped { void DelayRequest(TEvTxUserProxy::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) { auto request = new TDelayedProposal::TRequest(ev, ctx.Now() + TimeoutDelayedRequest); - DelayedProposal.Queue->Push(request); + DelayedProposal.Queue.Emplace(request); } void DelayRequest(TEvTxUserProxy::TEvProposeKqpTransaction::TPtr &ev, const TActorContext &ctx) { auto request = new TDelayedKqpProposal::TRequest(ev, ctx.Now() + TimeoutDelayedRequest); - DelayedKqpProposal.Queue->Push(request); + DelayedKqpProposal.Queue.Emplace(request); } void DelayRequest(TEvTxUserProxy::TEvAllocateTxId::TPtr &ev, const TActorContext &ctx) { auto request = new TDelayedAllocateTxId::TRequest(ev, ctx.Now() + TimeoutDelayedRequest); - DelayedAllocateTxId.Queue->Push(request); + DelayedAllocateTxId.Queue.Emplace(request); } template void PlayQueue(TDelayedQueue &delayed, const TActorContext &ctx) { typedef typename TDelayedQueue::TRequestPtr TRequestPtr; - while (delayed.Queue->Head()) { + while (delayed.Queue.Head()) { TVector txIds = TxAllocatorClient.AllocateTxIds(1, ctx); if (!txIds) { return; } - TRequestPtr extracted = delayed.Queue->Pop(); + TRequestPtr extracted = delayed.Queue.PopDefault(); ProcessRequest(extracted->GetRequest(), ctx, txIds.front()); } } @@ -167,12 +166,12 @@ class TTxProxy : public TActorBootstrapped { void CheckTimeout(TDelayedQueue &delayed, const TActorContext &ctx) { typedef typename TDelayedQueue::TRequestPtr TRequestPtr; - while (const auto head = delayed.Queue->Head()) { - const TInstant &expireAt = head->GetExpireMoment(); + while (const auto head = delayed.Queue.Head()) { + const TInstant &expireAt = (*head)->GetExpireMoment(); if (expireAt > ctx.Now()) { break; } - TRequestPtr extracted = delayed.Queue->Pop(); + TRequestPtr extracted = delayed.Queue.PopDefault(); Decline(extracted->GetRequest(), ctx); } } diff --git a/ydb/core/util/queue_inplace.h b/ydb/core/util/queue_inplace.h index 3d2cd472ffcc..03aaab9f7ba2 100644 --- a/ydb/core/util/queue_inplace.h +++ b/ydb/core/util/queue_inplace.h @@ -1,117 +1,186 @@ #pragma once #include "defs.h" +#include +#include template struct TSimpleQueueChunk { static const ui32 EntriesCount = (TSize - sizeof(TSimpleQueueChunk*)) / sizeof(T); static_assert(EntriesCount > 0, "expect EntriesCount > 0"); - T Entries[EntriesCount]; - TSimpleQueueChunk * volatile Next; + union { + T Entries[EntriesCount]; + char Data[EntriesCount * sizeof(T)]; + }; + TSimpleQueueChunk* Next = nullptr; - TSimpleQueueChunk() { - } + TSimpleQueueChunk() {} + ~TSimpleQueueChunk() {} }; - template> -class TQueueInplace : TNonCopyable { - TChunk * ReadFrom; +class TQueueInplace { + TChunk* ReadFrom; ui32 ReadPosition; ui32 WritePosition; - TChunk * WriteTo; + TChunk* WriteTo; size_t Size; public: - TQueueInplace() - : ReadFrom(new TChunk()) + TQueueInplace() noexcept + : ReadFrom(nullptr) , ReadPosition(0) , WritePosition(0) - , WriteTo(ReadFrom) + , WriteTo(nullptr) , Size(0) {} - ~TQueueInplace() { - Y_DEBUG_ABORT_UNLESS(Head() == nullptr && Size == 0); - delete ReadFrom; + TQueueInplace(TQueueInplace&& rhs) noexcept + : ReadFrom(rhs.ReadFrom) + , ReadPosition(rhs.ReadPosition) + , WritePosition(rhs.WritePosition) + , WriteTo(rhs.WriteTo) + , Size(rhs.Size) + { + rhs.ReadFrom = nullptr; + rhs.ReadPosition = 0; + rhs.WritePosition = 0; + rhs.WriteTo = nullptr; + rhs.Size = 0; } - struct TPtrCleanDestructor { - static inline void Destroy(TQueueInplace *x) noexcept { - while (const T *head = x->Head()) { - delete *head; - x->Pop(); - } - delete x; + TQueueInplace& operator=(TQueueInplace&& rhs) noexcept { + if (this != &rhs) [[likely]] { + Clear(); + ReadFrom = rhs.ReadFrom; + ReadPosition = rhs.ReadPosition; + WritePosition = rhs.WritePosition; + WriteTo = rhs.WriteTo; + Size = rhs.Size; + rhs.ReadFrom = nullptr; + rhs.ReadPosition = 0; + rhs.WritePosition = 0; + rhs.WriteTo = nullptr; + rhs.Size = 0; } - }; + return *this; + } + + ~TQueueInplace() { + Clear(); + } - struct TCleanDestructor { - static inline void Destroy(TQueueInplace *x) noexcept { - while (x->Head()) { - x->Pop(); + void Clear() noexcept { + TChunk* head = ReadFrom; + if (head) { + if constexpr (std::is_trivially_destructible_v) { + do { + TChunk* next = head->Next; + delete head; + head = next; + } while (head); + } else { + ui32 start = ReadPosition; + do { + TChunk* next = head->Next; + ui32 end = next ? TChunk::EntriesCount : WritePosition; + for (ui32 index = start; index != end; ++index) { + std::destroy_at(&head->Entries[index]); + } + delete head; + head = next; + start = 0; + } while (head); } - delete x; + ReadFrom = nullptr; + ReadPosition = 0; + WritePosition = 0; + WriteTo = nullptr; + Size = 0; } + } - void operator ()(TQueueInplace *x) const noexcept { - Destroy(x); - } - }; + void Push(const T& x) { + ::new (NewEntry()) T(x); + ++WritePosition; + ++Size; + } - void Push(const T &x) noexcept { + void Push(T&& x) { + ::new (NewEntry()) T(std::move(x)); + ++WritePosition; ++Size; - if (WritePosition != TChunk::EntriesCount) { - WriteTo->Entries[WritePosition] = x; - ++WritePosition; - } else { - TChunk *next = new TChunk(); - next->Entries[0] = x; - WriteTo->Next = next; - WriteTo = next; - WritePosition = 1; - } } - void Push(T &&x) noexcept { + template + T& Emplace(TArgs&&... args) { + T& result = *::new (NewEntry()) T(std::forward(args)...); + ++WritePosition; ++Size; - if (WritePosition != TChunk::EntriesCount) { - WriteTo->Entries[WritePosition] = std::move(x); - ++WritePosition; - } else { - TChunk *next = new TChunk(); - next->Entries[0] = std::move(x); - WriteTo->Next = next; - WriteTo = next; - WritePosition = 1; - } + return result; } - T *Head() { - TChunk *head = ReadFrom; - if (ReadFrom == WriteTo && ReadPosition == WritePosition) { + T* Head() noexcept { + TChunk* head = ReadFrom; + if (head == WriteTo && ReadPosition == WritePosition) { + // Note: this also handles ReadFrom == WriteTo == nullptr return nullptr; - } else if (ReadPosition != TChunk::EntriesCount) { - return &(head->Entries[ReadPosition]); - } else if (TChunk *next = head->Next) { - ReadFrom = next; + } + if (ReadPosition == TChunk::EntriesCount) [[unlikely]] { + TChunk* next = head->Next; + if (!next) { + return nullptr; + } delete head; + head = next; + ReadFrom = next; ReadPosition = 0; - return Head(); } - return nullptr; + return &head->Entries[ReadPosition]; } void Pop() { - const T *ret = Head(); - if (ret) { + if (T* x = Head()) [[likely]] { + std::destroy_at(x); + ++ReadPosition; + --Size; + } + } + + T PopDefault() { + if (T* x = Head()) [[likely]] { + T result(std::move(*x)); + std::destroy_at(x); ++ReadPosition; --Size; + return result; + } else { + return T{}; } } + explicit operator bool() const { + return Size > 0; + } + size_t GetSize() const { return Size; } + +private: + void* NewEntry() { + if (WriteTo) [[likely]] { + if (WritePosition == TChunk::EntriesCount) [[unlikely]] { + TChunk* next = new TChunk; + WriteTo->Next = next; + WriteTo = next; + WritePosition = 0; + } + } else { + // Note: ReadPosition == WritePosition == 0 + ReadFrom = WriteTo = new TChunk; + } + return &WriteTo->Entries[WritePosition]; + } }; diff --git a/ydb/core/util/queue_inplace_ut.cpp b/ydb/core/util/queue_inplace_ut.cpp index fb2d511d0487..2cc9d08eb91a 100644 --- a/ydb/core/util/queue_inplace_ut.cpp +++ b/ydb/core/util/queue_inplace_ut.cpp @@ -11,16 +11,18 @@ Y_UNIT_TEST_SUITE(TQueueInplaceTests) { struct TStruct { ui32 X; ui32 Y; - TStruct(ui32 i = 0) + TStruct(ui32 i) : X(i) , Y(i) {} + TStruct(TStruct&& s) + : X(s.X) + , Y(s.Y) + {} + TStruct(const TStruct&) = delete; - TStruct &operator = (const TStruct &s) { - X = s.X; - Y = s.Y; - return *this; - } + TStruct& operator=(TStruct&& s) = delete; + TStruct& operator=(const TStruct&) = delete; bool operator == (ui32 i) const { return X == i && Y == i; @@ -53,23 +55,119 @@ Y_UNIT_TEST_SUITE(TQueueInplaceTests) { UNIT_ASSERT(queue.Head() == nullptr); } - Y_UNIT_TEST(CleanInDestructor) { - using TQueueType = TQueueInplace *, 32>; + Y_UNIT_TEST(DestroyInDestructor) { + using TQueueType = TQueueInplace, 32>; std::shared_ptr p(new bool(true)); UNIT_ASSERT_VALUES_EQUAL(1u, p.use_count()); { - TAutoPtr queue(new TQueueType()); - queue->Push(new std::shared_ptr(p)); - queue->Push(new std::shared_ptr(p)); - queue->Push(new std::shared_ptr(p)); - queue->Push(new std::shared_ptr(p)); + TQueueType queue; + queue.Push(p); + queue.Push(p); + queue.Push(p); + queue.Push(p); UNIT_ASSERT_VALUES_EQUAL(5u, p.use_count()); + + queue.Pop(); + + UNIT_ASSERT_VALUES_EQUAL(4u, p.use_count()); } - UNIT_ASSERT_VALUES_EQUAL(1, p.use_count()); + UNIT_ASSERT_VALUES_EQUAL(1u, p.use_count()); + } + + Y_UNIT_TEST(EmplacePopDefault) { + using TQueueType = TQueueInplace, 32>; + + TQueueType queue; + queue.Push(std::make_unique(10)); + queue.Emplace(new int(11)); + queue.Emplace(new int(12)); + queue.Emplace(std::make_unique(13)); + + auto a = queue.PopDefault(); + UNIT_ASSERT(a && *a == 10); + auto b = queue.PopDefault(); + UNIT_ASSERT(b && *b == 11); + auto c = queue.PopDefault(); + UNIT_ASSERT(c && *c == 12); + auto d = queue.PopDefault(); + UNIT_ASSERT(d && *d == 13); + auto e = queue.PopDefault(); + UNIT_ASSERT(!e); + } + + Y_UNIT_TEST(PopTooManyTimes) { + using TQueueType = TQueueInplace, 32>; + + TQueueType queue; + queue.Push(std::make_unique(10)); + queue.Push(std::make_unique(11)); + queue.Push(std::make_unique(12)); + queue.Push(std::make_unique(13)); + UNIT_ASSERT(queue.GetSize() == 4); + + queue.Pop(); + queue.Pop(); + queue.Pop(); + queue.Pop(); + queue.Pop(); + UNIT_ASSERT(queue.GetSize() == 0); + } + + Y_UNIT_TEST(MoveConstructor) { + using TQueueType = TQueueInplace; + + TQueueType a; + a.Push(10); + a.Push(11); + a.Push(12); + a.Push(13); + UNIT_ASSERT(a.GetSize() == 4); + + TQueueType b(std::move(a)); + + UNIT_ASSERT(a.GetSize() == 0); + UNIT_ASSERT(a.PopDefault() == 0u); + + UNIT_ASSERT(b.GetSize() == 4); + UNIT_ASSERT(b.PopDefault() == 10u); + UNIT_ASSERT(b.PopDefault() == 11u); + UNIT_ASSERT(b.PopDefault() == 12u); + UNIT_ASSERT(b.PopDefault() == 13u); + UNIT_ASSERT(b.PopDefault() == 0u); + } + + Y_UNIT_TEST(MoveAssignment) { + using TQueueType = TQueueInplace; + + TQueueType a; + a.Push(10); + a.Push(11); + a.Push(12); + a.Push(13); + UNIT_ASSERT(a.GetSize() == 4); + + TQueueType b; + b.Push(20); + b.Push(21); + b.Push(22); + b.Push(23); + UNIT_ASSERT(b.GetSize() == 4); + + a = std::move(b); + + UNIT_ASSERT(a.GetSize() == 4); + UNIT_ASSERT(a.PopDefault() == 20u); + UNIT_ASSERT(a.PopDefault() == 21u); + UNIT_ASSERT(a.PopDefault() == 22u); + UNIT_ASSERT(a.PopDefault() == 23u); + UNIT_ASSERT(a.PopDefault() == 0u); + + UNIT_ASSERT(b.GetSize() == 0); + UNIT_ASSERT(b.PopDefault() == 0u); } } diff --git a/ydb/core/util/queue_oneone_inplace_ut.cpp b/ydb/core/util/queue_oneone_inplace_ut.cpp index ac60a69f3dd4..0ec514ecd967 100644 --- a/ydb/core/util/queue_oneone_inplace_ut.cpp +++ b/ydb/core/util/queue_oneone_inplace_ut.cpp @@ -52,7 +52,26 @@ Y_UNIT_TEST_SUITE(TOneOneQueueTests) { UNIT_ASSERT_VALUES_EQUAL(5u, p.use_count()); } - UNIT_ASSERT_VALUES_EQUAL(1, p.use_count()); + UNIT_ASSERT_VALUES_EQUAL(1u, p.use_count()); + } + + Y_UNIT_TEST(DeleteInDestructor) { + using TQueueType = TOneOneQueueInplace *, 32, TDelete>; + + std::shared_ptr p(new bool(true)); + UNIT_ASSERT_VALUES_EQUAL(1u, p.use_count()); + + { + TQueueType queue; + queue.Push(new std::shared_ptr(p)); + queue.Push(new std::shared_ptr(p)); + queue.Push(new std::shared_ptr(p)); + queue.Push(new std::shared_ptr(p)); + + UNIT_ASSERT_VALUES_EQUAL(5u, p.use_count()); + } + + UNIT_ASSERT_VALUES_EQUAL(1u, p.use_count()); } Y_UNIT_TEST(ReadIterator) { diff --git a/ydb/library/actors/testlib/test_runtime.cpp b/ydb/library/actors/testlib/test_runtime.cpp index b57c92e8aac1..951b35910a28 100644 --- a/ydb/library/actors/testlib/test_runtime.cpp +++ b/ydb/library/actors/testlib/test_runtime.cpp @@ -1896,12 +1896,11 @@ namespace NActors { struct TStrandingActorDecoratorContext : public TThrRefBase { TStrandingActorDecoratorContext() - : Queue(new TQueueType) { } - typedef TOneOneQueueInplace TQueueType; - TAutoPtr Queue; + typedef TOneOneQueueInplace TQueueType; + TQueueType Queue; }; class TStrandingActorDecorator : public TActorBootstrapped { @@ -1958,8 +1957,8 @@ namespace NActors { } STFUNC(StateFunc) { - bool wasEmpty = !Context->Queue->Head(); - Context->Queue->Push(ev.Release()); + bool wasEmpty = !Context->Queue.Head(); + Context->Queue.Push(ev.Release()); if (wasEmpty) { SendHead(ActorContext()); } @@ -1967,15 +1966,15 @@ namespace NActors { STFUNC(Reply) { Y_ABORT_UNLESS(!HasReply); - IEventHandle *requestEv = Context->Queue->Head(); + IEventHandle *requestEv = Context->Queue.Head(); TActorId originalSender = requestEv->Sender; HasReply = !ReplyChecker->IsWaitingForMoreResponses(ev.Get()); if (HasReply) { - delete Context->Queue->Pop(); + delete Context->Queue.Pop(); } auto ctx(ActorContext()); ctx.Send(IEventHandle::Forward(ev, originalSender)); - if (!IsSync && Context->Queue->Head()) { + if (!IsSync && Context->Queue.Head()) { SendHead(ctx); } } @@ -1985,7 +1984,7 @@ namespace NActors { if (!IsSync) { ctx.Send(GetForwardedEvent().Release()); } else { - while (Context->Queue->Head()) { + while (Context->Queue.Head()) { ctx.Send(GetForwardedEvent().Release()); int count = 100; while (!HasReply && count > 0) { @@ -2003,7 +2002,7 @@ namespace NActors { } TAutoPtr GetForwardedEvent() { - IEventHandle* ev = Context->Queue->Head(); + IEventHandle* ev = Context->Queue.Head(); RequestType = ev->GetTypeRewrite(); HasReply = !ReplyChecker->OnRequest(ev); TAutoPtr forwardedEv = ev->HasEvent() @@ -2011,7 +2010,7 @@ namespace NActors { : new IEventHandle(ev->GetTypeRewrite(), ev->Flags, Delegatee, ReplyId, ev->ReleaseChainBuffer(), ev->Cookie); if (HasReply) { - delete Context->Queue->Pop(); + delete Context->Queue.Pop(); } return forwardedEv; } diff --git a/ydb/library/actors/util/queue_oneone_inplace.h b/ydb/library/actors/util/queue_oneone_inplace.h index 288011955a8b..00708cad8b25 100644 --- a/ydb/library/actors/util/queue_oneone_inplace.h +++ b/ydb/library/actors/util/queue_oneone_inplace.h @@ -3,17 +3,15 @@ #include "defs.h" #include "queue_chunk.h" -template > +template > class TOneOneQueueInplace : TNonCopyable { - static_assert(std::is_integral::value || std::is_pointer::value, "expect std::is_integral::value || std::is_pointer::valuer"); + static_assert(std::is_integral::value || std::is_pointer::value, "expect std::is_integral::value || std::is_pointer::value"); TChunk* ReadFrom; ui32 ReadPosition; ui32 WritePosition; TChunk* WriteTo; - friend class TReadIterator; - public: class TReadIterator { TChunk* ReadFrom; @@ -28,14 +26,15 @@ class TOneOneQueueInplace : TNonCopyable { inline T Next() { TChunk* head = ReadFrom; - if (ReadPosition != TChunk::EntriesCount) { - return AtomicLoad(&head->Entries[ReadPosition++]); - } else if (TChunk* next = AtomicLoad(&head->Next)) { - ReadFrom = next; + if (ReadPosition == TChunk::EntriesCount) [[unlikely]] { + head = AtomicLoad(&head->Next); + if (!head) { + return T{}; + } + ReadFrom = head; ReadPosition = 0; - return Next(); } - return T{}; + return AtomicLoad(&head->Entries[ReadPosition++]); } }; @@ -48,67 +47,68 @@ class TOneOneQueueInplace : TNonCopyable { } ~TOneOneQueueInplace() { - Y_DEBUG_ABORT_UNLESS(Head() == 0); - delete ReadFrom; + if constexpr (!std::is_same_v) { + while (T x = Pop()) { + D::Destroy(x); + } + delete ReadFrom; + } else { + TChunk* next = ReadFrom; + do { + TChunk* head = next; + next = AtomicLoad(&head->Next); + delete head; + } while (next); + } } struct TPtrCleanDestructor { - static inline void Destroy(TOneOneQueueInplace* x) noexcept { - while (T head = x->Pop()) - delete head; + static inline void Destroy(TOneOneQueueInplace* x) noexcept { + while (T head = x->Pop()) { + ::CheckedDelete(head); + } delete x; } }; struct TCleanDestructor { - static inline void Destroy(TOneOneQueueInplace* x) noexcept { - while (x->Pop() != nullptr) - continue; - delete x; - } - }; - - struct TPtrCleanInplaceMallocDestructor { - template - static inline void Destroy(TOneOneQueueInplace* x) noexcept { - while (TPtrVal* head = x->Pop()) { - head->~TPtrVal(); - free(head); - } + static inline void Destroy(TOneOneQueueInplace* x) noexcept { delete x; } }; - void Push(T x) noexcept { - if (WritePosition != TChunk::EntriesCount) { - AtomicStore(&WriteTo->Entries[WritePosition], x); - ++WritePosition; - } else { + void Push(T x) { + if (WritePosition == TChunk::EntriesCount) [[unlikely]] { TChunk* next = new TChunk(); - next->Entries[0] = x; + AtomicStore(&next->Entries[0], x); AtomicStore(&WriteTo->Next, next); WriteTo = next; WritePosition = 1; + } else { + AtomicStore(&WriteTo->Entries[WritePosition++], x); } } T Head() { TChunk* head = ReadFrom; - if (ReadPosition != TChunk::EntriesCount) { - return AtomicLoad(&head->Entries[ReadPosition]); - } else if (TChunk* next = AtomicLoad(&head->Next)) { - ReadFrom = next; + if (ReadPosition == TChunk::EntriesCount) [[unlikely]] { + TChunk* next = AtomicLoad(&head->Next); + if (!next) { + return T{}; + } delete head; + head = next; + ReadFrom = next; ReadPosition = 0; - return Head(); } - return T{}; + return AtomicLoad(&head->Entries[ReadPosition]); } T Pop() { T ret = Head(); - if (ret) + if (ret) { ++ReadPosition; + } return ret; }