Skip to content

[NOT FOR MERGE] report memory allocated with tcmalloc to RM #21499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
29 changes: 29 additions & 0 deletions ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,35 @@ namespace NYql::NDq {
});
}

/// Requests `bytes` of quota from MemoryLimits.MemoryQuotaManager.
///
/// After the request, the manager's IsReasonableToUseSpilling() verdict is
/// mirrored into `alloc` through SetMaximumLimitValueReached(), and, when the
/// MkqlMemoryQuota counter is set, `bytes` is added to it.
///
/// @param bytes  amount of quota to request
/// @param alloc  allocator whose "maximum limit reached" flag is refreshed
/// @return       the value returned by the quota manager's AllocateQuota()
///
/// NOTE(review): the counter is incremented regardless of the manager's
/// answer — confirm that counting refused requests is intended.
bool AllocateQuota(ui64 bytes, NKikimr::NMiniKQL::TScopedAlloc* alloc) {
    const auto& quotaManager = MemoryLimits.MemoryQuotaManager;

    const bool granted = quotaManager->AllocateQuota(bytes);

    // Query the spilling hint only after the request has been processed.
    const bool spillingAdvised = quotaManager->IsReasonableToUseSpilling();
    alloc->SetMaximumLimitValueReached(spillingAdvised);

    if (MkqlMemoryQuota) {
        MkqlMemoryQuota->Add(bytes);
    }
    return granted;
}

/// Returns `bytes` of quota to MemoryLimits.MemoryQuotaManager.
///
/// Does nothing when MkqlMemoryLimit evaluates to false. Otherwise the quota
/// is released, the manager's IsReasonableToUseSpilling() verdict is mirrored
/// into `alloc` through SetMaximumLimitValueReached(), and, when the
/// MkqlMemoryQuota counter is set, `bytes` is subtracted from it.
///
/// @param bytes  amount of quota to give back
/// @param alloc  allocator whose "maximum limit reached" flag is refreshed
///
/// NOTE(review): AllocateQuota() has no matching MkqlMemoryLimit guard —
/// confirm the asymmetry is intended.
void FreeQuota(ui64 bytes, NKikimr::NMiniKQL::TScopedAlloc* alloc) {
    if (MkqlMemoryLimit) {
        const auto& quotaManager = MemoryLimits.MemoryQuotaManager;

        quotaManager->FreeQuota(bytes);

        // Query the spilling hint only after the quota has been returned.
        const bool spillingAdvised = quotaManager->IsReasonableToUseSpilling();
        alloc->SetMaximumLimitValueReached(spillingAdvised);

        if (MkqlMemoryQuota) {
            MkqlMemoryQuota->Sub(bytes);
        }
    }
}

void TryShrinkMemory(NKikimr::NMiniKQL::TScopedAlloc* alloc) {
if (alloc->GetAllocated() - alloc->GetUsed() > MemoryLimits.MinMemFreeSize) {
alloc->ReleaseFreePages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,12 @@ class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActo
}

if (this->Task.GetEnableSpilling()) {
TaskRunner->SetSpillerFactory(std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()));
auto spillerFactory = std::make_shared<TDqSpillerFactory>(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback());
spillerFactory->SetMemoryUsageReporter(std::make_shared<NKikimr::NMiniKQL::TMemoryUsageReporter>(
[this, alloc](ui64 bytes){ return this->MemoryQuota->AllocateQuota(bytes, alloc); },
[this, alloc](ui64 bytes){ this->MemoryQuota->FreeQuota(bytes, alloc); }
));
TaskRunner->SetSpillerFactory(spillerFactory);
}

TaskRunner->Prepare(this->Task, limits, execCtx);
Expand Down
9 changes: 9 additions & 0 deletions ydb/library/yql/dq/actors/spilling/spiller_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ class TDqSpillerFactory : public NKikimr::NMiniKQL::ISpillerFactory
SpillingTaskCounters_ = spillingTaskCounters;
}

/// Stores the reporter that GetMemoryUsageReporter() will subsequently return.
///
/// @param memoryUsageReporter  shared reporter handle; may be empty
void SetMemoryUsageReporter(NKikimr::NMiniKQL::TMemoryUsageReporter::TPtr memoryUsageReporter) override {
    // The parameter is already a private copy, so exchange it with the member;
    // the previously stored handle is dropped when the parameter is destroyed.
    MemoryUsageReporter_.swap(memoryUsageReporter);
}

// Returns the reporter handle stored in MemoryUsageReporter_
// (a copy of the shared pointer; empty if none was stored).
NKikimr::NMiniKQL::TMemoryUsageReporter::TPtr GetMemoryUsageReporter() const override {
return MemoryUsageReporter_;
}

/// Builds a new TDqComputeStorage from the factory's stored transaction id,
/// callbacks, spilling counters and actor system, and returns it through the
/// ISpiller interface pointer.
NKikimr::NMiniKQL::ISpiller::TPtr CreateSpiller() override {
    NKikimr::NMiniKQL::ISpiller::TPtr storage = std::make_shared<TDqComputeStorage>(
        TxId_,
        WakeUpCallback_,
        ErrorCallback_,
        SpillingTaskCounters_,
        ActorSystem_);
    return storage;
}
Expand All @@ -34,6 +42,7 @@ class TDqSpillerFactory : public NKikimr::NMiniKQL::ISpillerFactory
TWakeUpCallback WakeUpCallback_;
TErrorCallback ErrorCallback_;
TIntrusivePtr<TSpillingTaskCounters> SpillingTaskCounters_;
NKikimr::NMiniKQL::TMemoryUsageReporter::TPtr MemoryUsageReporter_;
};

} // namespace NYql::NDq
2 changes: 1 addition & 1 deletion ydb/library/yql/dq/common/dq_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ class TSpillingSettings {

private:
const ui64 Mask = 0;
};

};
} // namespace NYql::NDq

IOutputStream& operator<<(IOutputStream& stream, const NYql::NDq::TTxId& txId);
4 changes: 2 additions & 2 deletions ydb/library/yql/dq/runtime/dq_output_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TDqOutputChannel : public IDqOutputChannel {
const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc,
NDqProto::EDataTransportVersion transportVersion)
: OutputType(outputType)
, Packer(OutputType, settings.BufferPageAllocSize)
, Packer(OutputType, 4_KB)
, Width(OutputType->IsMulti() ? static_cast<NMiniKQL::TMultiType*>(OutputType)->GetElementsCount() : 1u)
, Storage(settings.ChannelStorage)
, HolderFactory(holderFactory)
Expand Down Expand Up @@ -391,7 +391,7 @@ class TDqOutputChannel : public IDqOutputChannel {
}

// Marks this output channel as finished by setting the Finished flag.
void Finish() override {
// The "Finish request" log line is currently disabled.
// LOG("Finish request");
Finished = true;
}

Expand Down
4 changes: 3 additions & 1 deletion ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,8 @@ class TDqTaskRunner : public IDqTaskRunner {
SpillerFactory = spillerFactory;
}



TString GetOutputDebugString() override {
if (AllocatedHolder->Output) {
switch (AllocatedHolder->Output->GetFillLevel()) {
Expand Down Expand Up @@ -760,7 +762,7 @@ class TDqTaskRunner : public IDqTaskRunner {
}

ERunStatus Run() final {
LOG(TStringBuilder() << "Run task: " << TaskId);
// LOG(TStringBuilder() << "Run task: " << TaskId);
if (!AllocatedHolder->ResultStream) {
auto guard = BindAllocator();
TBindTerminator term(AllocatedHolder->ProgramParsed.CompGraph->GetTerminator());
Expand Down
109 changes: 104 additions & 5 deletions yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

#include "mkql_counters.h"
#include "mkql_rh_hash.h"
#include "mkql_wide_combine.h"
Expand Down Expand Up @@ -407,6 +408,18 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {

EBucketState BucketState = EBucketState::InMemory;
ui64 LineCount = 0;

/// Get total memory size used by spilling adapters in this bucket
/// Sum of GetBufferSize() over the bucket's spilling adapters
/// (SpilledState and SpilledData); an adapter that is not set contributes 0.
size_t GetTotalSpillerMemorySize() const {
    const size_t stateBytes = SpilledState ? SpilledState->GetBufferSize() : 0;
    const size_t dataBytes = SpilledData ? SpilledData->GetBufferSize() : 0;
    return stateBytes + dataBytes;
}
};

enum class EOperatingMode {
Expand Down Expand Up @@ -609,15 +622,33 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
ui32 largestInMemoryBucketNum = (ui32)-1;
for (ui64 i = 0; i < SpilledBucketCount; ++i) {
if (SpilledBuckets[i].BucketState == TSpilledBucket::EBucketState::InMemory) {
if (SpilledBuckets[i].LineCount >= maxSize) {
// Consider both line count and spiller buffer memory usage
ui64 bucketSize = SpilledBuckets[i].LineCount + (SpilledBuckets[i].GetTotalSpillerMemorySize() / 1024); // Convert bytes to approx. line equivalents
if (bucketSize >= maxSize) {
largestInMemoryBucketNum = i;
maxSize = SpilledBuckets[i].LineCount;
maxSize = bucketSize;
}
}
}
return largestInMemoryBucketNum;
}

/// Finds the bucket whose spilling adapters hold the most buffered bytes.
///
/// Only buckets in the InMemory or SpillingData state are considered, and a
/// bucket must have a strictly positive buffer size to be selected; on ties
/// the bucket with the lowest index wins.
///
/// @return index of the selected bucket, or (ui32)-1 when no bucket qualifies
ui32 GetBucketWithLargestSpillerBuffer() const {
    using EState = TSpilledBucket::EBucketState;

    ui32 bestBucket = static_cast<ui32>(-1);
    size_t bestSize = 0;

    for (ui64 idx = 0; idx < SpilledBucketCount; ++idx) {
        const auto& candidate = SpilledBuckets[idx];

        const bool eligible = candidate.BucketState == EState::InMemory
            || candidate.BucketState == EState::SpillingData;
        if (!eligible) {
            continue;
        }

        const size_t candidateSize = candidate.GetTotalSpillerMemorySize();
        if (candidateSize > bestSize) {
            bestSize = candidateSize;
            bestBucket = idx;
        }
    }
    return bestBucket;
}

bool IsSpillingWhileStateSplitAllowed() const {
// TODO: Write better condition here. For example: InMemorybuckets > 64
return true;
Expand Down Expand Up @@ -679,6 +710,20 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
}

if (InMemoryBucketsCount && !HasMemoryForProcessing() && IsSpillingWhileStateSplitAllowed()) {
// First, try to spill spiller buffers if yellow zone is reached
ui32 bucketWithLargestSpiller = GetBucketWithLargestSpillerBuffer();
if (bucketWithLargestSpiller != (ui32)-1) {
auto& bucket = SpilledBuckets[bucketWithLargestSpiller];
size_t spillerSize = bucket.GetTotalSpillerMemorySize();
// If spiller buffer is reasonably large (> 10KB), try to spill it first
if (spillerSize > 10_KB && ForceSpillSpillerBuffers(bucket)) {
UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info,
TStringBuilder() << "Force spilling spiller buffers during state split from bucket "
<< bucketWithLargestSpiller << " size=" << (spillerSize/10_KB) << "MB");
return true;
}
}

ui32 bucketNumToSpill = GetLargestInMemoryBucketNumber();

SplitStateSpillingBucket = bucketNumToSpill;
Expand Down Expand Up @@ -785,6 +830,33 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
SpillingBucketsCount--;
}

// Force spill spiller buffers for a bucket to reduce memory usage
/// Tries to start an asynchronous write of one of the bucket's adapter buffers.
///
/// Nothing is done when the bucket already has an async write operation.
/// Otherwise the state adapter is tried first and the data adapter second; an
/// adapter is tried only if it is set and reports a non-zero buffer size. The
/// result of its FinishWriting() call is stored in bucket.AsyncWriteOperation.
///
/// @param bucket  bucket whose adapter buffers should be flushed
/// @return        true if a FinishWriting() call produced an operation
///
/// NOTE(review): presumably FinishWriting() finalizes the adapter — confirm
/// the adapter may still be written to after a forced flush.
bool ForceSpillSpillerBuffers(TSpilledBucket& bucket) {
    if (bucket.AsyncWriteOperation.has_value()) {
        return false; // a write is already in flight for this bucket
    }

    // State buffer takes priority.
    if (bucket.SpilledState && bucket.SpilledState->GetBufferSize() > 0) {
        bucket.AsyncWriteOperation = bucket.SpilledState->FinishWriting();
        if (bucket.AsyncWriteOperation) {
            return true;
        }
    }

    // Reached only when flushing the state buffer did not start a write.
    if (bucket.SpilledData && bucket.SpilledData->GetBufferSize() > 0) {
        bucket.AsyncWriteOperation = bucket.SpilledData->FinishWriting();
        if (bucket.AsyncWriteOperation) {
            return true;
        }
    }

    return false;
}

void UpdateSpillingBuckets() {
for (ui64 i = 0; i < SpilledBucketCount; ++i) {
auto& bucket = SpilledBuckets[i];
Expand All @@ -796,7 +868,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
SpillMoreStateFromBucket(bucket);

} else {
bucket.SpilledData->AsyncWriteCompleted(bucket.AsyncWriteOperation->ExtractValue());
// Complete async write for either SpilledData or forced spiller buffer spilling
// We determine which one by picking the first adapter with a non-empty buffer (SpilledData is checked first)
if (bucket.SpilledData && bucket.SpilledData->GetBufferSize() > 0) {
bucket.SpilledData->AsyncWriteCompleted(bucket.AsyncWriteOperation->ExtractValue());
} else if (bucket.SpilledState && bucket.SpilledState->GetBufferSize() > 0) {
bucket.SpilledState->AsyncWriteCompleted(bucket.AsyncWriteOperation->ExtractValue());
} else {
// Default to SpilledData for backward compatibility
bucket.SpilledData->AsyncWriteCompleted(bucket.AsyncWriteOperation->ExtractValue());
}
bucket.AsyncWriteOperation = std::nullopt;
}
}
Expand All @@ -807,6 +888,22 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
if (SpillingBucketsCount > 0) {
return true;
}

// First, try to spill spiller buffers from buckets with large buffers
ui32 bucketWithLargestSpiller = GetBucketWithLargestSpillerBuffer();
if (bucketWithLargestSpiller != (ui32)-1) {
auto& bucket = SpilledBuckets[bucketWithLargestSpiller];
size_t spillerSize = bucket.GetTotalSpillerMemorySize();
// If spiller buffer is reasonably large (> 1MB), try to spill it first
if (spillerSize > 1_MB && ForceSpillSpillerBuffers(bucket)) {
UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info,
TStringBuilder() << "Force spilling spiller buffers from bucket "
<< bucketWithLargestSpiller << " size=" << (spillerSize/1_MB) << "MB");
return true;
}
}

// If no spiller buffers to spill or they are small, proceed with regular bucket spilling
while (InMemoryBucketsCount > 0) {
ui32 maxLineBucketInd = GetLargestInMemoryBucketNumber();
MKQL_ENSURE(maxLineBucketInd != (ui32)-1, "Internal logic error");
Expand Down Expand Up @@ -887,9 +984,10 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
SpilledBuckets.resize(SpilledBucketCount);
auto spiller = Ctx.SpillerFactory->CreateSpiller();
auto memoryReporter = Ctx.SpillerFactory->GetMemoryUsageReporter();
for (auto &b: SpilledBuckets) {
b.SpilledState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, KeyAndStateType, 5_MB);
b.SpilledData = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, UsedInputItemType, 5_MB);
b.SpilledState = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, memoryReporter, KeyAndStateType, 5_MB);
b.SpilledData = std::make_unique<TWideUnboxedValuesSpillerAdapter>(spiller, memoryReporter, UsedInputItemType, 5_MB);
b.InMemoryProcessingState = std::make_unique<TState>(MemInfo, KeyWidth,
KeyAndStateType->GetElementsCount() - KeyWidth, Hasher, Equal, Logger, LogComponent, false);
}
Expand Down Expand Up @@ -2041,3 +2139,4 @@ IComputationNode* WrapWideLastCombinerWithSpilling(TCallable& callable, const TC

}
}

2 changes: 1 addition & 1 deletion yql/essentials/minikql/comp_nodes/mkql_wide_top_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ using TBase = TComputationValue<TSpillingSupportState>;
case EOperatingMode::Spilling:
{
const size_t PACK_SIZE = 5_MB;
SpilledStates.emplace_back(std::make_unique<TWideUnboxedValuesSpillerAdapter>(Spiller, TupleMultiType, PACK_SIZE));
SpilledStates.emplace_back(std::make_unique<TWideUnboxedValuesSpillerAdapter>(Spiller, nullptr, TupleMultiType, PACK_SIZE));
break;
}
case EOperatingMode::ProcessSpilled:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ struct TComputationContext : public TComputationContextLLVM {
const NUdf::ILogProvider* LogProvider;
NYql::TLangVersion LangVer = NYql::UnknownLangVersion;


TComputationContext(const THolderFactory& holderFactory,
const NUdf::IValueBuilder* builder,
const TComputationOptsFull& opts,
Expand Down
57 changes: 57 additions & 0 deletions yql/essentials/minikql/computation/mkql_memory_usage_reporter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#pragma once

#include <util/generic/yexception.h>
#include <util/system/types.h>

#include <algorithm>
#include <format>
#include <functional>
#include <memory>
#include <utility>

namespace NKikimr::NMiniKQL {

/// Forwards memory allocation/release notifications to two user-supplied
/// callbacks and keeps a running total of the bytes whose allocation was
/// accepted.
///
/// - ReportAllocate() counts `bytes` only when the allocate callback returns true.
/// - ReportFree() never releases more than the current running total.
/// - The destructor releases whatever is still counted.
///
/// Because the destructor releases the remaining total through the callback,
/// the object is non-copyable: a copy would release the same bytes twice.
class TMemoryUsageReporter {
public:
    using TAllocateReportCallback = std::function<bool(ui64)>;
    using TFreeReportCallback = std::function<void(ui64)>;
    using TPtr = std::shared_ptr<TMemoryUsageReporter>;

public:
    /// @param reportAllocateCallback  invoked with the requested byte count;
    ///                                its return value decides whether the
    ///                                bytes are added to the running total
    /// @param reportFreeCallback      invoked with the byte count being released
    TMemoryUsageReporter(TAllocateReportCallback reportAllocateCallback, TFreeReportCallback reportFreeCallback)
        : ReportAllocateCallback_(std::move(reportAllocateCallback))
        , ReportFreeCallback_(std::move(reportFreeCallback))
    {
    }

    TMemoryUsageReporter(const TMemoryUsageReporter&) = delete;
    TMemoryUsageReporter& operator=(const TMemoryUsageReporter&) = delete;

    /// Reports an allocation of `bytes`.
    /// Fails the Y_ENSURE check when no allocate callback is set.
    void ReportAllocate(ui64 bytes) {
        Y_ENSURE(ReportAllocateCallback_ != nullptr);
        if (ReportAllocateCallback_(bytes)) {
            BytesAllocated_ += bytes;
        }
    }

    /// Reports a release of up to `bytes`.
    /// Fails the Y_ENSURE check when no free callback is set.
    void ReportFree(ui64 bytes) {
        Y_ENSURE(ReportFreeCallback_ != nullptr);
        // Clamp to the running total so the callback is never asked to release
        // more than this object has counted.
        const ui64 toFree = std::min(BytesAllocated_, bytes);
        if (toFree) {
            // Forward the clamped amount: it must match what is subtracted below.
            ReportFreeCallback_(toFree);
        }
        BytesAllocated_ -= toFree;
    }

    /// Releases any bytes that are still counted.
    ~TMemoryUsageReporter() {
        // Y_ENSURE throws, which is not acceptable in a destructor, so an
        // unset callback is simply skipped here.
        if (BytesAllocated_ && ReportFreeCallback_) {
            ReportFreeCallback_(BytesAllocated_);
            BytesAllocated_ = 0;
        }
    }

private:
    ui64 BytesAllocated_{0};
    TAllocateReportCallback ReportAllocateCallback_{nullptr};
    TFreeReportCallback ReportFreeCallback_{nullptr};
};

}//namespace NKikimr::NMiniKQL



Loading
Loading