Skip to content

Commit 208bbc8

Browse files
committed
Make TOneOneQueueInplace usable without an extra pointer layer
1 parent cc78f3e commit 208bbc8

File tree

3 files changed

+66
-55
lines changed

3 files changed

+66
-55
lines changed

ydb/core/tablet_flat/logic_redo_entry.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,16 @@ namespace NRedo {
1111

1212
struct TEntry {
1313
template<typename ... Args>
14-
static TEntry* Create(NTable::TTxStamp stamp, TArrayRef<const ui32> affects, Args&& ... args)
14+
static std::unique_ptr<TEntry> Create(NTable::TTxStamp stamp, TArrayRef<const ui32> affects, Args&& ... args)
1515
{
16-
auto *ptr = malloc(sizeof(TEntry) + affects.size() * sizeof(ui32));
16+
void* ptr = ::operator new(sizeof(TEntry) + affects.size() * sizeof(ui32));
1717

18-
return ::new(ptr) TEntry(stamp, affects, std::forward<Args>(args)...);
18+
return std::unique_ptr<TEntry>(::new(ptr) TEntry(stamp, affects, std::forward<Args>(args)...));
1919
}
2020

21-
void operator delete (void *p)
21+
void operator delete(void* p)
2222
{
23-
free(p);
23+
::operator delete(p);
2424
}
2525

2626
void Describe(IOutputStream &out) const

ydb/core/tablet_flat/logic_redo_queue.h

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ namespace NRedo {
1919
struct TQueue {
2020
using TStamp = NTable::TTxStamp;
2121
using TAffects = TArrayRef<const ui32>;
22+
using TLog = TOneOneQueueInplace<TEntry*, 4096, TDelete>;
2223

2324
TQueue(THashMap<ui32, NTable::TSnapEdge> edges)
24-
: Log(new TLog)
25-
, Edges(std::move(edges))
25+
: Edges(std::move(edges))
2626
{
2727

2828
}
@@ -45,13 +45,14 @@ namespace NRedo {
4545
<< " (" << Memory << " mem" << ", " << LargeGlobIdsBytes << " raw)b }";
4646
}
4747

48-
void Push(TEntry *entry)
48+
void Push(std::unique_ptr<TEntry>&& entryPtr)
4949
{
50+
TEntry* entry = entryPtr.get();
5051
if (bool(entry->Embedded) == bool(entry->LargeGlobId)) {
5152
Y_TABLET_ERROR(NFmt::Do(*entry) << " has incorrect payload");
5253
}
5354

54-
Log->Push(entry);
55+
Log.Push(entryPtr.release());
5556

5657
Items++;
5758
Memory += entry->BytesMem();
@@ -92,15 +93,16 @@ namespace NRedo {
9293
for (auto &it : Overhead)
9394
it.second.Clear();
9495

95-
auto was = std::exchange(Log, new TLog);
96+
TLog was;
97+
was.Swap(Log);
9698

9799
Items = 0;
98100
Memory = 0;
99101
LargeGlobIdsBytes = 0;
100102

101103
auto logos = snap.MutableNonSnapLogBodies();
102104

103-
while (TAutoPtr<TEntry> entry = was->Pop()) {
105+
while (auto entry = std::unique_ptr<TEntry>(was.Pop())) {
104106
if (entry->FilterTables(Edges)) {
105107
for (const auto& blobId : entry->LargeGlobId.Blobs()) {
106108
LogoBlobIDFromLogoBlobID(blobId, logos->Add());
@@ -116,7 +118,7 @@ namespace NRedo {
116118

117119
entry->References = 0;
118120

119-
Push(entry.Release());
121+
Push(std::move(entry));
120122
} else {
121123
Y_ENSURE(entry->References == 0);
122124
}
@@ -134,9 +136,7 @@ namespace NRedo {
134136
return Usage;
135137
}
136138

137-
using TLog = TOneOneQueueInplace<NRedo::TEntry *, 4096>;
138-
139-
TAutoPtr<TLog, TLog::TPtrCleanInplaceMallocDestructor> Log;
139+
TLog Log;
140140
THashMap<ui32, NTable::TSnapEdge> Edges;
141141
THashMap<ui32, TOverhead> Overhead;
142142
TIntrusiveList<TOverhead> Changes;

ydb/library/actors/util/queue_oneone_inplace.h

Lines changed: 51 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,15 @@
33
#include "defs.h"
44
#include "queue_chunk.h"
55

6-
template <typename T, ui32 TSize, typename TChunk = TQueueChunk<T, TSize>>
6+
template <typename T, ui32 TSize, class D = TNoAction, typename TChunk = TQueueChunk<T, TSize>>
77
class TOneOneQueueInplace : TNonCopyable {
8-
static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::valuer");
8+
static_assert(std::is_integral<T>::value || std::is_pointer<T>::value, "expect std::is_integral<T>::value || std::is_pointer<T>::value");
99

1010
TChunk* ReadFrom;
1111
ui32 ReadPosition;
1212
ui32 WritePosition;
1313
TChunk* WriteTo;
1414

15-
friend class TReadIterator;
16-
1715
public:
1816
class TReadIterator {
1917
TChunk* ReadFrom;
@@ -28,14 +26,15 @@ class TOneOneQueueInplace : TNonCopyable {
2826

2927
inline T Next() {
3028
TChunk* head = ReadFrom;
31-
if (ReadPosition != TChunk::EntriesCount) {
32-
return AtomicLoad(&head->Entries[ReadPosition++]);
33-
} else if (TChunk* next = AtomicLoad(&head->Next)) {
34-
ReadFrom = next;
29+
if (ReadPosition == TChunk::EntriesCount) [[unlikely]] {
30+
head = AtomicLoad(&head->Next);
31+
if (!head) {
32+
return T{};
33+
}
34+
ReadFrom = head;
3535
ReadPosition = 0;
36-
return Next();
3736
}
38-
return T{};
37+
return AtomicLoad(&head->Entries[ReadPosition++]);
3938
}
4039
};
4140

@@ -48,71 +47,83 @@ class TOneOneQueueInplace : TNonCopyable {
4847
}
4948

5049
~TOneOneQueueInplace() {
51-
Y_DEBUG_ABORT_UNLESS(Head() == 0);
52-
delete ReadFrom;
53-
}
54-
55-
struct TPtrCleanDestructor {
56-
static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept {
57-
while (T head = x->Pop())
50+
if constexpr (!std::is_same_v<D, TNoAction>) {
51+
while (T x = Pop()) {
52+
D::Destroy(x);
53+
}
54+
delete ReadFrom;
55+
} else {
56+
TChunk* next = ReadFrom;
57+
do {
58+
TChunk* head = next;
59+
next = AtomicLoad(&head->Next);
5860
delete head;
59-
delete x;
61+
} while (next);
6062
}
61-
};
63+
}
6264

6365
struct TCleanDestructor {
6466
static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept {
65-
while (x->Pop() != nullptr)
66-
continue;
6767
delete x;
6868
}
6969
};
7070

71-
struct TPtrCleanInplaceMallocDestructor {
72-
template <typename TPtrVal>
73-
static inline void Destroy(TOneOneQueueInplace<TPtrVal*, TSize>* x) noexcept {
74-
while (TPtrVal* head = x->Pop()) {
75-
head->~TPtrVal();
76-
free(head);
71+
struct TPtrCleanDestructor {
72+
static inline void Destroy(TOneOneQueueInplace<T, TSize>* x) noexcept {
73+
while (T head = x->Pop()) {
74+
::CheckedDelete(head);
7775
}
7876
delete x;
7977
}
8078
};
8179

82-
void Push(T x) noexcept {
83-
if (WritePosition != TChunk::EntriesCount) {
84-
AtomicStore(&WriteTo->Entries[WritePosition], x);
85-
++WritePosition;
86-
} else {
80+
void Push(T x) {
81+
if (WritePosition == TChunk::EntriesCount) [[unlikely]] {
8782
TChunk* next = new TChunk();
88-
next->Entries[0] = x;
83+
AtomicStore(&next->Entries[0], x);
8984
AtomicStore(&WriteTo->Next, next);
9085
WriteTo = next;
9186
WritePosition = 1;
87+
} else {
88+
AtomicStore(&WriteTo->Entries[WritePosition++], x);
9289
}
9390
}
9491

9592
T Head() {
9693
TChunk* head = ReadFrom;
97-
if (ReadPosition != TChunk::EntriesCount) {
98-
return AtomicLoad(&head->Entries[ReadPosition]);
99-
} else if (TChunk* next = AtomicLoad(&head->Next)) {
100-
ReadFrom = next;
94+
if (ReadPosition == TChunk::EntriesCount) [[unlikely]] {
95+
TChunk* next = AtomicLoad(&head->Next);
96+
if (!next) {
97+
return T{};
98+
}
10199
delete head;
100+
head = next;
101+
ReadFrom = next;
102102
ReadPosition = 0;
103-
return Head();
104103
}
105-
return T{};
104+
return AtomicLoad(&head->Entries[ReadPosition]);
106105
}
107106

108107
T Pop() {
109108
T ret = Head();
110-
if (ret)
109+
if (ret) {
111110
++ReadPosition;
111+
}
112112
return ret;
113113
}
114114

115115
TReadIterator Iterator() {
116116
return TReadIterator(ReadFrom, ReadPosition);
117117
}
118+
119+
/**
120+
* Swap must synchronize with both reader and writer
121+
*/
122+
void Swap(TOneOneQueueInplace& other) {
123+
using std::swap;
124+
swap(ReadFrom, other.ReadFrom);
125+
swap(ReadPosition, other.ReadPosition);
126+
swap(WritePosition, other.WritePosition);
127+
swap(WriteTo, other.WriteTo);
128+
}
118129
};

0 commit comments

Comments
 (0)