diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h index 0c4e83531c73..e2c3423c1ec1 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_memory_quota.h @@ -73,6 +73,35 @@ namespace NYql::NDq { }); } + bool AllocateQuota(ui64 bytes, NKikimr::NMiniKQL::TScopedAlloc* alloc) { + bool allocated = MemoryLimits.MemoryQuotaManager->AllocateQuota(bytes); + if (MemoryLimits.MemoryQuotaManager->IsReasonableToUseSpilling()) { + alloc->SetMaximumLimitValueReached(true); + } else { + alloc->SetMaximumLimitValueReached(false); + } + + if (MkqlMemoryQuota) { + MkqlMemoryQuota->Add(bytes); + } + return allocated; + } + + void FreeQuota(ui64 bytes, NKikimr::NMiniKQL::TScopedAlloc* alloc) { + if (!MkqlMemoryLimit) { + return; + } + MemoryLimits.MemoryQuotaManager->FreeQuota(bytes); + if (MemoryLimits.MemoryQuotaManager->IsReasonableToUseSpilling()) { + alloc->SetMaximumLimitValueReached(true); + } else { + alloc->SetMaximumLimitValueReached(false); + } + if (MkqlMemoryQuota) { + MkqlMemoryQuota->Sub(bytes); + } + } + void TryShrinkMemory(NKikimr::NMiniKQL::TScopedAlloc* alloc) { if (alloc->GetAllocated() - alloc->GetUsed() > MemoryLimits.MinMemFreeSize) { alloc->ReleaseFreePages(); diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index 90bd0a1a8eaa..f96c7b471ef0 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -234,7 +234,12 @@ class TDqSyncComputeActorBase: public TDqComputeActorBaseTask.GetEnableSpilling()) { - TaskRunner->SetSpillerFactory(std::make_shared(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback())); + auto spillerFactory = std::make_shared(execCtx.GetTxId(), NActors::TActivationContext::ActorSystem(), execCtx.GetWakeupCallback(), execCtx.GetErrorCallback()); + spillerFactory->SetMemoryReportingCallbacks( + [this, alloc](ui64 bytes){ return this->MemoryQuota->AllocateQuota(bytes, alloc); }, + [this, alloc](ui64 bytes){ this->MemoryQuota->FreeQuota(bytes, alloc); } + ); + TaskRunner->SetSpillerFactory(spillerFactory); } TaskRunner->Prepare(this->Task, limits, execCtx); diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp index 04dab7ff070f..770a545a9b59 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage.cpp +++ b/ydb/library/yql/dq/actors/spilling/compute_storage.cpp @@ -8,7 +8,10 @@ namespace NYql::NDq { using namespace NActors; TDqComputeStorage::TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, - TIntrusivePtr spillingTaskCounters, TActorSystem* actorSystem) : ActorSystem_(actorSystem) { + TIntrusivePtr spillingTaskCounters, TSpillerMemoryUsageReporter::TPtr memoryUsageReporter, TActorSystem* actorSystem) + : ActorSystem_(actorSystem) + , MemoryUsageReporter_(memoryUsageReporter) +{ TStringStream spillerName; spillerName << "Spiller" << "_" << CreateGuidAsString(); ComputeStorageActor_ = CreateDqComputeStorageActor(txId, spillerName.Str(), wakeUpCallback, errorCallback, spillingTaskCounters); @@ -44,6 +47,15 @@ NThreading::TFuture> TDqComputeStorage::Extract(TK return GetInternal(key, true); } +void TDqComputeStorage::ReportAlloc(ui64 bytes) { + // TODO: collect 10KB and only then report + MemoryUsageReporter_->ReportAlloc(bytes); +} + +void TDqComputeStorage::ReportFree(ui64 bytes) { + MemoryUsageReporter_->ReportFree(bytes); +} + NThreading::TFuture> TDqComputeStorage::GetInternal(TKey key, bool removeBlobAfterRead) { auto promise = NThreading::NewPromise>(); diff --git a/ydb/library/yql/dq/actors/spilling/compute_storage.h b/ydb/library/yql/dq/actors/spilling/compute_storage.h index 6b7314525c9c..9f9f7f26993a 100644 --- a/ydb/library/yql/dq/actors/spilling/compute_storage.h +++ b/ydb/library/yql/dq/actors/spilling/compute_storage.h @@ -1,6 +1,7 @@ #pragma once #include "compute_storage_actor.h" +#include "spiller_memory_reporter.h" #include #include @@ -17,7 +18,7 @@ class TDqComputeStorage : public NKikimr::NMiniKQL::ISpiller { public: TDqComputeStorage(TTxId txId, TWakeUpCallback wakeUpCallback, TErrorCallback errorCallback, - TIntrusivePtr spillingTaskCounters, NActors::TActorSystem* actorSystem); + TIntrusivePtr spillingTaskCounters, TSpillerMemoryUsageReporter::TPtr memoryUsageReporter, NActors::TActorSystem* actorSystem); ~TDqComputeStorage(); @@ -29,12 +30,16 @@ class TDqComputeStorage : public NKikimr::NMiniKQL::ISpiller NThreading::TFuture Delete(TKey key) override; + void ReportAlloc(ui64 bytes) override; + void ReportFree(ui64 bytes) override; + private: NThreading::TFuture> GetInternal(TKey key, bool removeBlobAfterRead); NActors::TActorSystem* ActorSystem_; IDqComputeStorageActor* ComputeStorageActor_; NActors::TActorId ComputeStorageActorId_; + TSpillerMemoryUsageReporter::TPtr MemoryUsageReporter_; }; } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/spiller_factory.h b/ydb/library/yql/dq/actors/spilling/spiller_factory.h index b458f17f49b1..af6d695fe0d2 100644 --- a/ydb/library/yql/dq/actors/spilling/spiller_factory.h +++ b/ydb/library/yql/dq/actors/spilling/spiller_factory.h @@ -1,6 +1,7 @@ #pragma once #include "compute_storage.h" +#include "spiller_memory_reporter.h" #include #include @@ -24,8 +25,13 @@ class TDqSpillerFactory : public NKikimr::NMiniKQL::ISpillerFactory SpillingTaskCounters_ = spillingTaskCounters; } + void SetMemoryReportingCallbacks(std::function reportAlloc, std::function reportFree) override { + ReportFreeCallback_ = reportFree; + ReportAllocCallback_ = reportAlloc; + } + NKikimr::NMiniKQL::ISpiller::TPtr CreateSpiller() override { - return std::make_shared(TxId_, WakeUpCallback_, ErrorCallback_, SpillingTaskCounters_, ActorSystem_); + return std::make_shared(TxId_, WakeUpCallback_, ErrorCallback_, SpillingTaskCounters_, MakeSpillerMemoryUsageReporter(ReportAllocCallback_, ReportFreeCallback_), ActorSystem_); } private: @@ -34,6 +40,8 @@ class TDqSpillerFactory : public NKikimr::NMiniKQL::ISpillerFactory TWakeUpCallback WakeUpCallback_; TErrorCallback ErrorCallback_; TIntrusivePtr SpillingTaskCounters_; + TSpillerMemoryUsageReporter::TReportFreeCallback ReportFreeCallback_ = nullptr; + TSpillerMemoryUsageReporter::TReportAllocCallback ReportAllocCallback_ = nullptr; }; } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/spiller_memory_reporter.cpp b/ydb/library/yql/dq/actors/spilling/spiller_memory_reporter.cpp new file mode 100644 index 000000000000..c151c68d537d --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/spiller_memory_reporter.cpp @@ -0,0 +1,46 @@ +#include "spiller_memory_reporter.h" + +#include + +namespace NYql::NDq { + +TSpillerMemoryUsageReporter::TSpillerMemoryUsageReporter( + TReportAllocCallback reportAllocCallback, + TReportFreeCallback reportFreeCallback) + : ReportAllocCallback_(reportAllocCallback) + , ReportFreeCallback_(reportFreeCallback) +{} + +void TSpillerMemoryUsageReporter::ReportAlloc(ui64 bytes) { + Y_ENSURE(ReportAllocCallback_ != nullptr); + if (ReportAllocCallback_(bytes)) { + BytesAllocated_ += bytes; + } +} + +void TSpillerMemoryUsageReporter::ReportFree(ui64 bytes) { + Y_ENSURE(ReportFreeCallback_ != nullptr); + ui64 toFree = std::min(BytesAllocated_, bytes); + if (toFree) { + ReportFreeCallback_(bytes); + } + BytesAllocated_ -= toFree; +} + +TSpillerMemoryUsageReporter::~TSpillerMemoryUsageReporter() { + if (BytesAllocated_) { + ReportFreeCallback_(BytesAllocated_); + } + Cerr << std::format("[MISHA] Bytes not freed: {}\n", BytesAllocated_); + // Y_ENSURE(BytesAllocated_ == 0, "Memory leak"); +} + +TSpillerMemoryUsageReporter::TPtr MakeSpillerMemoryUsageReporter( + TSpillerMemoryUsageReporter::TReportAllocCallback reportAllocCallback, + TSpillerMemoryUsageReporter::TReportFreeCallback reportFreeCallback) +{ + return std::make_shared(reportAllocCallback, reportFreeCallback); +} + +} // namespace NYql::NDq + diff --git a/ydb/library/yql/dq/actors/spilling/spiller_memory_reporter.h b/ydb/library/yql/dq/actors/spilling/spiller_memory_reporter.h new file mode 100644 index 000000000000..8b31754d95b6 --- /dev/null +++ b/ydb/library/yql/dq/actors/spilling/spiller_memory_reporter.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include + +#include +#include + +namespace NYql::NDq { + +class TSpillerMemoryUsageReporter { +public: + using TReportAllocCallback = std::function; + using TReportFreeCallback = std::function; + using TPtr = std::shared_ptr; + +public: + TSpillerMemoryUsageReporter(TReportAllocCallback reportAllocCallback, TReportFreeCallback reportFreeCallback); + ~TSpillerMemoryUsageReporter(); + + void ReportAlloc(ui64 bytes); + void ReportFree(ui64 bytes); + +private: + ui64 BytesAllocated_{0}; + TReportAllocCallback ReportAllocCallback_{nullptr}; + TReportFreeCallback ReportFreeCallback_{nullptr}; +}; + +TSpillerMemoryUsageReporter::TPtr MakeSpillerMemoryUsageReporter( + TSpillerMemoryUsageReporter::TReportAllocCallback reportAllocCallback, + TSpillerMemoryUsageReporter::TReportFreeCallback reportFreeCallback); + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/spilling/ya.make b/ydb/library/yql/dq/actors/spilling/ya.make index ac9daf1fa0e6..f0de59b1cc81 100644 --- a/ydb/library/yql/dq/actors/spilling/ya.make +++ b/ydb/library/yql/dq/actors/spilling/ya.make @@ -8,6 +8,7 @@ SRCS( spilling_counters.cpp spilling_file.cpp spilling.cpp + spiller_memory_reporter.cpp ) PEERDIR( diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h index e3563fa2d0ec..9a49306c1664 100644 --- a/ydb/library/yql/dq/common/dq_common.h +++ b/ydb/library/yql/dq/common/dq_common.h @@ -124,8 +124,8 @@ class TSpillingSettings { private: const ui64 Mask = 0; -}; +}; } // namespace NYql::NDq IOutputStream& operator<<(IOutputStream& stream, const NYql::NDq::TTxId& txId); diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp index 8821ad7bfbec..a605da4cadb2 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp @@ -34,7 +34,7 @@ class TDqOutputChannel : public IDqOutputChannel { const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc, NDqProto::EDataTransportVersion transportVersion) : OutputType(outputType) - , Packer(OutputType, settings.BufferPageAllocSize) + , Packer(OutputType, 4_KB) , Width(OutputType->IsMulti() ? static_cast(OutputType)->GetElementsCount() : 1u) , Storage(settings.ChannelStorage) , HolderFactory(holderFactory) @@ -391,7 +391,7 @@ class TDqOutputChannel : public IDqOutputChannel { } void Finish() override { - LOG("Finish request"); + // LOG("Finish request"); Finished = true; } diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 80c1c86a571b..4fd5bdb62d76 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -299,6 +299,8 @@ class TDqTaskRunner : public IDqTaskRunner { SpillerFactory = spillerFactory; } + + TString GetOutputDebugString() override { if (AllocatedHolder->Output) { switch (AllocatedHolder->Output->GetFillLevel()) { @@ -760,7 +762,7 @@ class TDqTaskRunner : public IDqTaskRunner { } ERunStatus Run() final { - LOG(TStringBuilder() << "Run task: " << TaskId); + // LOG(TStringBuilder() << "Run task: " << TaskId); if (!AllocatedHolder->ResultStream) { auto guard = BindAllocator(); TBindTerminator term(AllocatedHolder->ProgramParsed.CompGraph->GetTerminator()); diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp index 68b22cd7364e..47f85fc8a2ba 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp @@ -1,3 +1,4 @@ + #include "mkql_counters.h" #include "mkql_rh_hash.h" #include "mkql_wide_combine.h" @@ -407,6 +408,18 @@ class TSpillingSupportState : public TComputationValue { EBucketState BucketState = EBucketState::InMemory; ui64 LineCount = 0; + + /// Get total memory size used by spilling adapters in this bucket + size_t GetTotalSpillerMemorySize() const { + size_t total = 0; + if (SpilledState) { + total += SpilledState->GetBufferSize(); + } + if (SpilledData) { + total += SpilledData->GetBufferSize(); + } + return total; + } }; enum class EOperatingMode { @@ -609,15 +622,33 @@ class TSpillingSupportState : public TComputationValue { ui32 largestInMemoryBucketNum = (ui32)-1; for (ui64 i = 0; i < SpilledBucketCount; ++i) { if (SpilledBuckets[i].BucketState == TSpilledBucket::EBucketState::InMemory) { - if (SpilledBuckets[i].LineCount >= maxSize) { + // Consider both line count and spiller buffer memory usage + ui64 bucketSize = SpilledBuckets[i].LineCount + (SpilledBuckets[i].GetTotalSpillerMemorySize() / 1024); // Convert bytes to approx. line equivalents + if (bucketSize >= maxSize) { largestInMemoryBucketNum = i; - maxSize = SpilledBuckets[i].LineCount; + maxSize = bucketSize; } } } return largestInMemoryBucketNum; } + ui32 GetBucketWithLargestSpillerBuffer() const { + size_t maxSpillerSize = 0; + ui32 bucketWithLargestSpiller = (ui32)-1; + for (ui64 i = 0; i < SpilledBucketCount; ++i) { + if (SpilledBuckets[i].BucketState == TSpilledBucket::EBucketState::InMemory || + SpilledBuckets[i].BucketState == TSpilledBucket::EBucketState::SpillingData) { + size_t spillerSize = SpilledBuckets[i].GetTotalSpillerMemorySize(); + if (spillerSize > maxSpillerSize) { + bucketWithLargestSpiller = i; + maxSpillerSize = spillerSize; + } + } + } + return bucketWithLargestSpiller; + } + bool IsSpillingWhileStateSplitAllowed() const { // TODO: Write better condition here. For example: InMemorybuckets > 64 return true; @@ -679,6 +710,20 @@ class TSpillingSupportState : public TComputationValue { } if (InMemoryBucketsCount && !HasMemoryForProcessing() && IsSpillingWhileStateSplitAllowed()) { + // First, try to spill spiller buffers if yellow zone is reached + ui32 bucketWithLargestSpiller = GetBucketWithLargestSpillerBuffer(); + if (bucketWithLargestSpiller != (ui32)-1) { + auto& bucket = SpilledBuckets[bucketWithLargestSpiller]; + size_t spillerSize = bucket.GetTotalSpillerMemorySize(); + // If spiller buffer is reasonably large (> 1MB), try to spill it first + if (spillerSize > 10_KB && ForceSpillSpillerBuffers(bucket)) { + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, + TStringBuilder() << "Force spilling spiller buffers during state split from bucket " + << bucketWithLargestSpiller << " size=" << (spillerSize/10_KB) << "MB"); + return true; + } + } + ui32 bucketNumToSpill = GetLargestInMemoryBucketNumber(); SplitStateSpillingBucket = bucketNumToSpill; @@ -785,6 +830,33 @@ class TSpillingSupportState : public TComputationValue { SpillingBucketsCount--; } + // Force spill spiller buffers for a bucket to reduce memory usage + bool ForceSpillSpillerBuffers(TSpilledBucket& bucket) { + if (bucket.AsyncWriteOperation.has_value()) { + return false; // Already spilling + } + + bool spillingStarted = false; + + // Force spill state buffer if it has data + if (bucket.SpilledState && bucket.SpilledState->GetBufferSize() > 0) { + bucket.AsyncWriteOperation = bucket.SpilledState->FinishWriting(); + if (bucket.AsyncWriteOperation) { + spillingStarted = true; + } + } + + // Force spill data buffer if it has data and no state spilling is active + if (!spillingStarted && bucket.SpilledData && bucket.SpilledData->GetBufferSize() > 0) { + bucket.AsyncWriteOperation = bucket.SpilledData->FinishWriting(); + if (bucket.AsyncWriteOperation) { + spillingStarted = true; + } + } + + return spillingStarted; + } + void UpdateSpillingBuckets() { for (ui64 i = 0; i < SpilledBucketCount; ++i) { auto& bucket = SpilledBuckets[i]; @@ -796,7 +868,16 @@ class TSpillingSupportState : public TComputationValue { SpillMoreStateFromBucket(bucket); } else { - bucket.SpilledData->AsyncWriteCompleted(bucket.AsyncWriteOperation->ExtractValue()); + // Complete async write for either SpilledData or forced spiller buffer spilling + // We determine which one by checking which has more data + if (bucket.SpilledData && bucket.SpilledData->GetBufferSize() > 0) { + bucket.SpilledData->AsyncWriteCompleted(bucket.AsyncWriteOperation->ExtractValue()); + } else if (bucket.SpilledState && bucket.SpilledState->GetBufferSize() > 0) { + bucket.SpilledState->AsyncWriteCompleted(bucket.AsyncWriteOperation->ExtractValue()); + } else { + // Default to SpilledData for backward compatibility + bucket.SpilledData->AsyncWriteCompleted(bucket.AsyncWriteOperation->ExtractValue()); + } bucket.AsyncWriteOperation = std::nullopt; } } @@ -807,6 +888,22 @@ class TSpillingSupportState : public TComputationValue { if (SpillingBucketsCount > 0) { return true; } + + // First, try to spill spiller buffers from buckets with large buffers + ui32 bucketWithLargestSpiller = GetBucketWithLargestSpillerBuffer(); + if (bucketWithLargestSpiller != (ui32)-1) { + auto& bucket = SpilledBuckets[bucketWithLargestSpiller]; + size_t spillerSize = bucket.GetTotalSpillerMemorySize(); + // If spiller buffer is reasonably large (> 1MB), try to spill it first + if (spillerSize > 1_MB && ForceSpillSpillerBuffers(bucket)) { + UDF_LOG(Logger, LogComponent, NUdf::ELogLevel::Info, + TStringBuilder() << "Force spilling spiller buffers from bucket " + << bucketWithLargestSpiller << " size=" << (spillerSize/1_MB) << "MB"); + return true; + } + } + + // If no spiller buffers to spill or they are small, proceed with regular bucket spilling while (InMemoryBucketsCount > 0) { ui32 maxLineBucketInd = GetLargestInMemoryBucketNumber(); MKQL_ENSURE(maxLineBucketInd != (ui32)-1, "Internal logic error"); @@ -2041,3 +2138,4 @@ IComputationNode* WrapWideLastCombinerWithSpilling(TCallable& callable, const TC } } + diff --git a/yql/essentials/minikql/computation/mkql_computation_node.h b/yql/essentials/minikql/computation/mkql_computation_node.h index 741d77371ce6..73cd53f0a263 100644 --- a/yql/essentials/minikql/computation/mkql_computation_node.h +++ b/yql/essentials/minikql/computation/mkql_computation_node.h @@ -126,6 +126,7 @@ struct TComputationContext : public TComputationContextLLVM { const NUdf::ILogProvider* LogProvider; NYql::TLangVersion LangVer = NYql::UnknownLangVersion; + TComputationContext(const THolderFactory& holderFactory, const NUdf::IValueBuilder* builder, const TComputationOptsFull& opts, diff --git a/yql/essentials/minikql/computation/mkql_spiller.h b/yql/essentials/minikql/computation/mkql_spiller.h index e0ce2706d8e6..c7639bc0d188 100644 --- a/yql/essentials/minikql/computation/mkql_spiller.h +++ b/yql/essentials/minikql/computation/mkql_spiller.h @@ -5,8 +5,7 @@ namespace NKikimr::NMiniKQL { -struct ISpiller -{ +struct ISpiller { using TPtr = std::shared_ptr; virtual ~ISpiller(){} using TKey = ui64; @@ -20,6 +19,16 @@ struct ISpiller ///Get + Delete ///Stored value may be moved to future virtual NThreading::TFuture> Extract(TKey key) = 0; + + /// This set of functions is used to report memory allocated for spilling. + /// Typically data is packed into some buffer before spilling. This buffer may not be + /// allocated with MKQL_alloc -> this memory won't be tracked. That's why the spiller provides an + /// interface to report allocated memory to the Resource Manager. + /// Expected usage is: + /// ReportAlloc(buffer_size) when the buffer is created + /// ReportFree(buffer_size) when the buffer is written to disk and freed + virtual void ReportAlloc(ui64 bytes) = 0; + virtual void ReportFree(ui64 bytes) = 0; }; }//namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/computation/mkql_spiller_adapter.h b/yql/essentials/minikql/computation/mkql_spiller_adapter.h index a5c3c56c1225..90dc86c42cd2 100644 --- a/yql/essentials/minikql/computation/mkql_spiller_adapter.h +++ b/yql/essentials/minikql/computation/mkql_spiller_adapter.h @@ -21,6 +21,10 @@ class TWideUnboxedValuesSpillerAdapter { { } + ui64 GetBufferSize() const { + return PackerEstimatedSize; + } + /// Write wide UV item /// \returns /// - nullopt, if thee values are accumulated @@ -29,7 +33,13 @@ class TWideUnboxedValuesSpillerAdapter { /// Design note: not using Subscribe on a Future here to avoid possible race condition std::optional> WriteWideItem(const TArrayRef& wideItem) { Packer.AddWideItem(wideItem.data(), wideItem.size()); - if (Packer.PackedSizeEstimate() > SizeLimit) { + + auto newPackerEstimatedSize = Packer.PackedSizeEstimate(); + Y_ENSURE(newPackerEstimatedSize >= PackerEstimatedSize, "Internal logic error"); + Spiller->ReportAlloc(newPackerEstimatedSize - PackerEstimatedSize); + PackerEstimatedSize = newPackerEstimatedSize; + + if (newPackerEstimatedSize > SizeLimit) { return Spiller->Put(std::move(Packer.Finish())); } else { return std::nullopt; @@ -39,11 +49,14 @@ class TWideUnboxedValuesSpillerAdapter { std::optional> FinishWriting() { if (Packer.IsEmpty()) return std::nullopt; + return Spiller->Put(std::move(Packer.Finish())); } void AsyncWriteCompleted(ISpiller::TKey key) { StoredChunks.push_back(key); + Spiller->ReportFree(PackerEstimatedSize); + PackerEstimatedSize = 0; } //Extracting interface @@ -81,6 +94,7 @@ class TWideUnboxedValuesSpillerAdapter { const TMultiType* const ItemType; const size_t SizeLimit; TValuePackerTransport Packer; + ui64 PackerEstimatedSize = 0; std::deque StoredChunks; std::optional CurrentBatch; }; diff --git a/yql/essentials/minikql/computation/mkql_spiller_adapter_ut.cpp b/yql/essentials/minikql/computation/mkql_spiller_adapter_ut.cpp new file mode 100644 index 000000000000..bfabc34e1987 --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_spiller_adapter_ut.cpp @@ -0,0 +1,329 @@ + +#include "mkql_spiller_adapter.h" +#include "mock_spiller_ut.h" +#include "mock_spiller_factory_ut.h" + +#include +#include +#include +#include +#include + +namespace NKikimr::NMiniKQL { + +namespace { + + THolderFactory CreateTestHolderFactory() { + TScopedAlloc alloc(__LOCATION__); + TMemoryUsageInfo memInfo("test"); + return THolderFactory(alloc.Ref(), memInfo); + } + + TMultiType* CreateTestMultiType(TTypeEnvironment& typeEnv) { + TTypeBuilder builder(typeEnv); + std::vector types = { + builder.NewDataType(NUdf::TDataType::Id), + builder.NewDataType(NUdf::TDataType::Id), + builder.NewDataType(NUdf::TDataType::Id) + }; + return TMultiType::Create(types.size(), types.data(), typeEnv); + } + + std::vector CreateTestWideItem(ui32 val1, ui64 val2, const TString& str) { + return { + NUdf::TUnboxedValuePod(val1), + NUdf::TUnboxedValuePod(val2), + MakeString(NUdf::TStringRef(str)) + }; + } + + void VerifyWideItem(const TArrayRef& wideItem, ui32 expectedVal1, ui64 expectedVal2, const TString& expectedStr) { + UNIT_ASSERT_VALUES_EQUAL(wideItem.size(), 3); + UNIT_ASSERT_VALUES_EQUAL(wideItem[0].Get(), expectedVal1); + UNIT_ASSERT_VALUES_EQUAL(wideItem[1].Get(), expectedVal2); + UNIT_ASSERT_VALUES_EQUAL(TString(wideItem[2].AsStringRef()), expectedStr); + } + +} // namespace + +Y_UNIT_TEST_SUITE(TWideUnboxedValuesSpillerAdapterTest) { + + Y_UNIT_TEST(TestBasicWriteAndRead) { + auto holderFactory = CreateTestHolderFactory(); + + TScopedAlloc Alloc(__LOCATION__); + TTypeEnvironment TypeEnv(Alloc); + + auto multiType = CreateTestMultiType(TypeEnv); + auto spiller = CreateMockSpiller(); + + TWideUnboxedValuesSpillerAdapter adapter(spiller, multiType, 1000); + + // Write some items + auto item1 = CreateTestWideItem(1, 100, "test1"); + auto item2 = CreateTestWideItem(2, 200, "test2"); + + auto future1 = adapter.WriteWideItem(item1); + UNIT_ASSERT(!future1.has_value()); // Should not spill yet + + auto future2 = adapter.WriteWideItem(item2); + UNIT_ASSERT(!future2.has_value()); // Should not spill yet + + // Finish writing + auto finishFuture = adapter.FinishWriting(); + UNIT_ASSERT(finishFuture.has_value()); + + // Wait for async operation + auto key = finishFuture->GetValueSync(); + adapter.AsyncWriteCompleted(key); + + // Read items back + std::vector readItem1(3); + auto readFuture1 = adapter.ExtractWideItem(readItem1); + UNIT_ASSERT(readFuture1.has_value()); + + // Wait for async read + auto rope = readFuture1->GetValueSync(); + UNIT_ASSERT(rope.has_value()); + adapter.AsyncReadCompleted(std::move(*rope), holderFactory); + + // Extract the first item + std::vector extractedItem1(3); + auto extractFuture1 = adapter.ExtractWideItem(extractedItem1); + UNIT_ASSERT(!extractFuture1.has_value()); + VerifyWideItem(extractedItem1, 1, 100, "test1"); + + // Extract the second item (should be in the same batch) + std::vector extractedItem2(3); + auto extractFuture2 = adapter.ExtractWideItem(extractedItem2); + UNIT_ASSERT(!extractFuture2.has_value()); + VerifyWideItem(extractedItem2, 2, 200, "test2"); + + // Should be empty now + UNIT_ASSERT(adapter.Empty()); + } + + Y_UNIT_TEST(TestReportAllocAndReportFree) { + auto holderFactory = CreateTestHolderFactory(); + + TScopedAlloc Alloc(__LOCATION__); + TTypeEnvironment TypeEnv(Alloc); + + auto multiType = CreateTestMultiType(TypeEnv); + auto spiller = CreateMockSpiller(); + auto mockSpiller = static_cast(spiller.get()); + + TWideUnboxedValuesSpillerAdapter adapter(spiller, multiType, 100); // Small size limit to force spilling + + // Write items that will exceed the size limit + auto item1 = CreateTestWideItem(1, 100, "test1"); + auto item2 = CreateTestWideItem(2, 200, "test2"); + auto item3 = CreateTestWideItem(3, 300, "test3"); + + // First item should not spill + auto future1 = adapter.WriteWideItem(item1); + UNIT_ASSERT(!future1.has_value()); + + // Check that ReportAlloc was called + UNIT_ASSERT_VALUES_EQUAL(mockSpiller->GetAllocCalls().size(), 1); + UNIT_ASSERT(mockSpiller->GetAllocatedMemory() > 0); + + // Second item might cause spilling depending on size + auto future2 = adapter.WriteWideItem(item2); + // Note: spilling might not happen if items are small enough + + // Wait for async operation if spilling happened + if (future2.has_value()) { + auto key = future2->GetValueSync(); + adapter.AsyncWriteCompleted(key); + + // Check that ReportFree was called after AsyncWriteCompleted + UNIT_ASSERT_VALUES_EQUAL(mockSpiller->GetFreeCalls().size(), 1); + UNIT_ASSERT(mockSpiller->GetFreedMemory() > 0); + } + + // Write third item + auto future3 = adapter.WriteWideItem(item3); + UNIT_ASSERT(!future3.has_value()); + + // Finish writing + auto finishFuture = adapter.FinishWriting(); + UNIT_ASSERT(finishFuture.has_value()); + + auto finishKey = finishFuture->GetValueSync(); + adapter.AsyncWriteCompleted(finishKey); + + // Verify memory tracking + UNIT_ASSERT(mockSpiller->GetAllocCalls().size() >= 1); + // Free calls depend on whether spilling happened + if (mockSpiller->GetFreeCalls().size() > 0) { + UNIT_ASSERT(mockSpiller->GetFreedMemory() > 0); + } + + // Read all items back + for (int i = 0; i < 3; ++i) { + std::vector readItem(3); + auto readFuture = adapter.ExtractWideItem(readItem); + + if (readFuture.has_value()) { + // Data is in spiller, need to read asynchronously + auto rope = readFuture->GetValueSync(); + UNIT_ASSERT(rope.has_value()); + adapter.AsyncReadCompleted(std::move(*rope), holderFactory); + + std::vector extractedItem(3); + auto extractFuture = adapter.ExtractWideItem(extractedItem); + UNIT_ASSERT(!extractFuture.has_value()); + } else { + // Data is already in CurrentBatch, no need for async read + // The data is already in readItem + } + } + + UNIT_ASSERT(adapter.Empty()); + } + + Y_UNIT_TEST(TestEmptyAdapter) { + TScopedAlloc Alloc(__LOCATION__); + TTypeEnvironment TypeEnv(Alloc); + + auto multiType = CreateTestMultiType(TypeEnv); + auto spiller = CreateMockSpiller(); + + TWideUnboxedValuesSpillerAdapter adapter(spiller, multiType, 1000); + + // Should be empty initially + UNIT_ASSERT(adapter.Empty()); + + // Finish writing without any data + auto finishFuture = adapter.FinishWriting(); + UNIT_ASSERT(!finishFuture.has_value()); + + // Should still be empty + UNIT_ASSERT(adapter.Empty()); + } + + Y_UNIT_TEST(TestLargeItems) { + auto holderFactory = CreateTestHolderFactory(); + + TScopedAlloc Alloc(__LOCATION__); + TTypeEnvironment TypeEnv(Alloc); + + auto multiType = CreateTestMultiType(TypeEnv); + auto spiller = CreateMockSpiller(); + auto mockSpiller = static_cast(spiller.get()); + + TWideUnboxedValuesSpillerAdapter adapter(spiller, multiType, 10); // Very small size limit + + // Create a string + TString testString("test_string"); + auto item = CreateTestWideItem(1, 100, testString); + + // This should immediately spill due to size + auto future = adapter.WriteWideItem(item); + UNIT_ASSERT(future.has_value()); + + // Wait for async operation + auto key = future->GetValueSync(); + adapter.AsyncWriteCompleted(key); + + // Verify memory tracking + UNIT_ASSERT(mockSpiller->GetAllocCalls().size() >= 1); + UNIT_ASSERT(mockSpiller->GetFreeCalls().size() >= 1); + + // Read back + std::vector readItem(3); + auto readFuture = adapter.ExtractWideItem(readItem); + UNIT_ASSERT(readFuture.has_value()); + + auto rope = readFuture->GetValueSync(); + UNIT_ASSERT(rope.has_value()); + adapter.AsyncReadCompleted(std::move(*rope), holderFactory); + + std::vector extractedItem(3); + auto extractFuture = adapter.ExtractWideItem(extractedItem); + UNIT_ASSERT(!extractFuture.has_value()); + VerifyWideItem(extractedItem, 1, 100, testString); + } + + Y_UNIT_TEST(TestMockSpillerFactory) { + TScopedAlloc Alloc(__LOCATION__); + TTypeEnvironment TypeEnv(Alloc); + + auto factory = std::make_shared(); + + // Create spiller through factory + auto spiller = factory->CreateSpiller(); + UNIT_ASSERT(spiller != nullptr); + + // Verify spiller was tracked + UNIT_ASSERT_VALUES_EQUAL(factory->GetCreatedSpillers().size(), 1); + UNIT_ASSERT_EQUAL(factory->GetCreatedSpillers()[0], spiller); + + // Create another spiller + auto spiller2 = factory->CreateSpiller(); + UNIT_ASSERT(spiller2 != nullptr); + UNIT_ASSERT_VALUES_EQUAL(factory->GetCreatedSpillers().size(), 2); + + // Test memory reporting callbacks + bool allocCalled = false; + bool freeCalled = false; + + factory->SetMemoryReportingCallbacks( + [&allocCalled](ui64 /*size*/) { allocCalled = true; return true; }, + [&freeCalled](ui64 /*size*/) { freeCalled = true; } + ); + + UNIT_ASSERT(factory->GetReportAllocCallback()); + UNIT_ASSERT(factory->GetReportFreeCallback()); + } + + Y_UNIT_TEST(TestNoMemoryReportingCallbacks) { + TScopedAlloc Alloc(__LOCATION__); + TTypeEnvironment TypeEnv(Alloc); + + auto multiType = CreateTestMultiType(TypeEnv); + auto spiller = CreateMockSpiller(); + auto mockSpiller = static_cast(spiller.get()); + + // Create adapter with very small size limit to force spilling + TWideUnboxedValuesSpillerAdapter adapter(spiller, multiType, 10); + + // Create test data + TString testString("test_string"); + auto item = CreateTestWideItem(1, 100, testString); + + // Write item - should spill due to small size limit + auto future = adapter.WriteWideItem(item); + UNIT_ASSERT(future.has_value()); + + // Wait for async operation + auto key = future->GetValueSync(); + adapter.AsyncWriteCompleted(key); + + // Verify that ReportAlloc and ReportFree were called even without callbacks + UNIT_ASSERT(mockSpiller->GetAllocCalls().size() >= 1); + UNIT_ASSERT(mockSpiller->GetFreeCalls().size() >= 1); + + // Read back the data + auto holderFactory = CreateTestHolderFactory(); + std::vector readItem(3); + auto readFuture = adapter.ExtractWideItem(readItem); + UNIT_ASSERT(readFuture.has_value()); + + auto rope = readFuture->GetValueSync(); + UNIT_ASSERT(rope.has_value()); + adapter.AsyncReadCompleted(std::move(*rope), holderFactory); + + std::vector extractedItem(3); + auto extractFuture = adapter.ExtractWideItem(extractedItem); + UNIT_ASSERT(!extractFuture.has_value()); + VerifyWideItem(extractedItem, 1, 100, testString); + + // Verify adapter is empty after reading + UNIT_ASSERT(adapter.Empty()); + } + +} // Y_UNIT_TEST_SUITE + +} // namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/computation/mkql_spiller_factory.h b/yql/essentials/minikql/computation/mkql_spiller_factory.h index 0d6dffe67341..fc075395aa61 100644 --- a/yql/essentials/minikql/computation/mkql_spiller_factory.h +++ b/yql/essentials/minikql/computation/mkql_spiller_factory.h @@ -15,6 +15,8 @@ class ISpillerFactory : private TNonCopyable virtual void SetTaskCounters(const TIntrusivePtr& spillingTaskCounters) = 0; + virtual void SetMemoryReportingCallbacks(std::function reportAlloc, std::function reportFree) = 0; + virtual ~ISpillerFactory(){} }; diff --git a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h index 4b0b2ed24a3b..710903010ff7 100644 --- a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h +++ b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h @@ -11,6 +11,11 @@ class TMockSpillerFactory : public ISpillerFactory void SetTaskCounters(const TIntrusivePtr& /*spillingTaskCounters*/) override { } + void SetMemoryReportingCallbacks(std::function reportAlloc, std::function reportFree) override { + ReportAllocCallback_ = std::move(reportAlloc); + ReportFreeCallback_ = std::move(reportFree); + } + ISpiller::TPtr CreateSpiller() override { auto new_spiller = CreateMockSpiller(); Spillers_.push_back(new_spiller); @@ -21,8 +26,18 @@ class TMockSpillerFactory : public ISpillerFactory return Spillers_; } + const std::function& GetReportAllocCallback() const { + return ReportAllocCallback_; + } + + const std::function& GetReportFreeCallback() const { + return ReportFreeCallback_; + } + private: std::vector Spillers_; + std::function ReportAllocCallback_; + std::function ReportFreeCallback_; }; } // namespace NKikimr::NMiniKQL diff --git a/yql/essentials/minikql/computation/mock_spiller_ut.h b/yql/essentials/minikql/computation/mock_spiller_ut.h index 715018f3e0ef..aa229cc43c4e 100644 --- a/yql/essentials/minikql/computation/mock_spiller_ut.h +++ b/yql/essentials/minikql/computation/mock_spiller_ut.h @@ -54,13 +54,43 @@ class TMockSpiller: public ISpiller{ return promise.GetFuture(); } + void ReportAlloc(size_t size) override { + AllocatedMemory_ += size; + AllocCalls_.push_back(size); + } + + void ReportFree(size_t size) override { + FreedMemory_ += size; + FreeCalls_.push_back(size); + } + const std::vector& GetPutSizes() const { return PutSizes_; } + + const std::vector& GetAllocCalls() const { + return AllocCalls_; + } + + const std::vector& GetFreeCalls() const { + return FreeCalls_; + } + + size_t GetAllocatedMemory() const { + return AllocatedMemory_; + } + + size_t GetFreedMemory() const { + return FreedMemory_; + } private: ISpiller::TKey NextKey_; std::unordered_map Storage_; std::vector PutSizes_; + std::vector AllocCalls_; + std::vector FreeCalls_; + size_t AllocatedMemory_ = 0; + size_t FreedMemory_ = 0; }; inline ISpiller::TPtr CreateMockSpiller() { return std::make_shared(); diff --git a/yql/essentials/minikql/computation/ut/ya.make.inc b/yql/essentials/minikql/computation/ut/ya.make.inc index 969083f969e8..cccf90fec21f 100644 --- a/yql/essentials/minikql/computation/ut/ya.make.inc +++ b/yql/essentials/minikql/computation/ut/ya.make.inc @@ -25,6 +25,7 @@ SRCS( presort_ut.cpp mkql_vector_spiller_adapter_ut.cpp mkql_method_address_helper_ut.cpp + mkql_spiller_adapter_ut.cpp ) PEERDIR(