1
+
1
2
#include " mkql_counters.h"
2
3
#include " mkql_rh_hash.h"
3
4
#include " mkql_wide_combine.h"
@@ -407,6 +408,18 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
407
408
408
409
EBucketState BucketState = EBucketState::InMemory;
409
410
ui64 LineCount = 0 ;
411
+
412
+ // / Get total memory size used by spilling adapters in this bucket
413
+ size_t GetTotalSpillerMemorySize () const {
414
+ size_t total = 0 ;
415
+ if (SpilledState) {
416
+ total += SpilledState->GetBufferSize ();
417
+ }
418
+ if (SpilledData) {
419
+ total += SpilledData->GetBufferSize ();
420
+ }
421
+ return total;
422
+ }
410
423
};
411
424
412
425
enum class EOperatingMode {
@@ -609,15 +622,33 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
609
622
ui32 largestInMemoryBucketNum = (ui32)-1 ;
610
623
for (ui64 i = 0 ; i < SpilledBucketCount; ++i) {
611
624
if (SpilledBuckets[i].BucketState == TSpilledBucket::EBucketState::InMemory) {
612
- if (SpilledBuckets[i].LineCount >= maxSize) {
625
+ // Consider both line count and spiller buffer memory usage
626
+ ui64 bucketSize = SpilledBuckets[i].LineCount + (SpilledBuckets[i].GetTotalSpillerMemorySize () / 1024 ); // Convert bytes to approx. line equivalents
627
+ if (bucketSize >= maxSize) {
613
628
largestInMemoryBucketNum = i;
614
- maxSize = SpilledBuckets[i]. LineCount ;
629
+ maxSize = bucketSize ;
615
630
}
616
631
}
617
632
}
618
633
return largestInMemoryBucketNum;
619
634
}
620
635
636
+ ui32 GetBucketWithLargestSpillerBuffer () const {
637
+ size_t maxSpillerSize = 0 ;
638
+ ui32 bucketWithLargestSpiller = (ui32)-1 ;
639
+ for (ui64 i = 0 ; i < SpilledBucketCount; ++i) {
640
+ if (SpilledBuckets[i].BucketState == TSpilledBucket::EBucketState::InMemory ||
641
+ SpilledBuckets[i].BucketState == TSpilledBucket::EBucketState::SpillingData) {
642
+ size_t spillerSize = SpilledBuckets[i].GetTotalSpillerMemorySize ();
643
+ if (spillerSize > maxSpillerSize) {
644
+ bucketWithLargestSpiller = i;
645
+ maxSpillerSize = spillerSize;
646
+ }
647
+ }
648
+ }
649
+ return bucketWithLargestSpiller;
650
+ }
651
+
621
652
bool IsSpillingWhileStateSplitAllowed () const {
622
653
// TODO: Write better condition here. For example: InMemorybuckets > 64
623
654
return true ;
@@ -678,48 +709,41 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
678
709
static_cast <NUdf::TUnboxedValue&>(processingState.Throat [i - KeyWidth]) = std::move (keyAndState[i]);
679
710
}
680
711
681
- if (!HasMemoryForProcessing () && IsSpillingWhileStateSplitAllowed ()) {
682
- if (InMemoryBucketsCount) {
683
- ui32 bucketNumToSpill = GetLargestInMemoryBucketNumber ();
684
-
685
- SplitStateSpillingBucket = bucketNumToSpill;
686
-
687
- auto & bucket = SpilledBuckets[bucketNumToSpill];
688
- bucket.BucketState = TSpilledBucket::EBucketState::SpillingState;
689
- SpillingBucketsCount++;
690
- InMemoryBucketsCount--;
691
-
692
- while (const auto keyAndState = static_cast <NUdf::TUnboxedValue*>(bucket.InMemoryProcessingState ->Extract ())) {
693
- bucket.AsyncWriteOperation = bucket.SpilledState ->WriteWideItem ({keyAndState, KeyAndStateType->GetElementsCount ()});
694
- for (size_t i = 0 ; i < KeyAndStateType->GetElementsCount (); ++i) {
695
- // releasing values stored in unsafe TUnboxedValue buffer
696
- keyAndState[i].UnRef ();
697
- }
698
- if (bucket.AsyncWriteOperation ) return true ;
712
+ if (InMemoryBucketsCount && !HasMemoryForProcessing () && IsSpillingWhileStateSplitAllowed ()) {
713
+ // First, try to spill spiller buffers if yellow zone is reached
714
+ ui32 bucketWithLargestSpiller = GetBucketWithLargestSpillerBuffer ();
715
+ if (bucketWithLargestSpiller != (ui32)-1 ) {
716
+ auto & bucket = SpilledBuckets[bucketWithLargestSpiller];
717
+ size_t spillerSize = bucket.GetTotalSpillerMemorySize ();
718
+ // If spiller buffer is reasonably large (> 1MB), try to spill it first
719
+ if (spillerSize > 1_MB && ForceSpillSpillerBuffers (bucket)) {
720
+ UDF_LOG (Logger, LogComponent, NUdf::ELogLevel::Info,
721
+ TStringBuilder () << " Force spilling spiller buffers during state split from bucket "
722
+ << bucketWithLargestSpiller << " size=" << (spillerSize/1_MB) << " MB" );
723
+ return true ;
699
724
}
725
+ }
700
726
701
- bucket.AsyncWriteOperation = bucket.SpilledState ->FinishWriting ();
702
- if (bucket.AsyncWriteOperation ) return true ;
703
- } else {
704
- ui64 bucketToSpill = -1 ;
705
- ui64 maxSize = 0 ;
706
- for (int i = 0 ; i < 128 ; ++i) {
707
- ui64 estimatedSize = SpilledBuckets[i].SpilledState ->GetEstimatedSize ();
708
- if (estimatedSize > maxSize) {
709
- maxSize = estimatedSize;
710
- bucketToSpill = i;
711
- }
712
- }
727
+ ui32 bucketNumToSpill = GetLargestInMemoryBucketNumber ();
713
728
714
- if (bucketToSpill != -1ULL ) {
715
- SpilledBuckets[bucketToSpill].AsyncWriteOperation = SpilledBuckets[bucketToSpill].SpilledState ->FinishWriting ();
716
- MKQL_ENSURE (SpilledBuckets[bucketToSpill].AsyncWriteOperation , " MISHA ERROR" );
717
- SplitStateSpillingBucket = bucketToSpill;
718
- return true ;
729
+ SplitStateSpillingBucket = bucketNumToSpill;
719
730
720
- }
731
+ auto & bucket = SpilledBuckets[bucketNumToSpill];
732
+ bucket.BucketState = TSpilledBucket::EBucketState::SpillingState;
733
+ SpillingBucketsCount++;
734
+ InMemoryBucketsCount--;
721
735
736
+ while (const auto keyAndState = static_cast <NUdf::TUnboxedValue*>(bucket.InMemoryProcessingState ->Extract ())) {
737
+ bucket.AsyncWriteOperation = bucket.SpilledState ->WriteWideItem ({keyAndState, KeyAndStateType->GetElementsCount ()});
738
+ for (size_t i = 0 ; i < KeyAndStateType->GetElementsCount (); ++i) {
739
+ // releasing values stored in unsafe TUnboxedValue buffer
740
+ keyAndState[i].UnRef ();
741
+ }
742
+ if (bucket.AsyncWriteOperation ) return true ;
722
743
}
744
+
745
+ bucket.AsyncWriteOperation = bucket.SpilledState ->FinishWriting ();
746
+ if (bucket.AsyncWriteOperation ) return true ;
723
747
}
724
748
}
725
749
@@ -806,6 +830,33 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
806
830
SpillingBucketsCount--;
807
831
}
808
832
833
+ // Force spill spiller buffers for a bucket to reduce memory usage
834
+ bool ForceSpillSpillerBuffers (TSpilledBucket& bucket) {
835
+ if (bucket.AsyncWriteOperation .has_value ()) {
836
+ return false ; // Already spilling
837
+ }
838
+
839
+ bool spillingStarted = false ;
840
+
841
+ // Force spill state buffer if it has data
842
+ if (bucket.SpilledState && bucket.SpilledState ->GetBufferSize () > 0 ) {
843
+ bucket.AsyncWriteOperation = bucket.SpilledState ->FinishWriting ();
844
+ if (bucket.AsyncWriteOperation ) {
845
+ spillingStarted = true ;
846
+ }
847
+ }
848
+
849
+ // Force spill data buffer if it has data and no state spilling is active
850
+ if (!spillingStarted && bucket.SpilledData && bucket.SpilledData ->GetBufferSize () > 0 ) {
851
+ bucket.AsyncWriteOperation = bucket.SpilledData ->FinishWriting ();
852
+ if (bucket.AsyncWriteOperation ) {
853
+ spillingStarted = true ;
854
+ }
855
+ }
856
+
857
+ return spillingStarted;
858
+ }
859
+
809
860
void UpdateSpillingBuckets () {
810
861
for (ui64 i = 0 ; i < SpilledBucketCount; ++i) {
811
862
auto & bucket = SpilledBuckets[i];
@@ -817,7 +868,16 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
817
868
SpillMoreStateFromBucket (bucket);
818
869
819
870
} else {
820
- bucket.SpilledData ->AsyncWriteCompleted (bucket.AsyncWriteOperation ->ExtractValue ());
871
+ // Complete async write for either SpilledData or forced spiller buffer spilling
872
+ // We determine which one by checking which has more data
873
+ if (bucket.SpilledData && bucket.SpilledData ->GetBufferSize () > 0 ) {
874
+ bucket.SpilledData ->AsyncWriteCompleted (bucket.AsyncWriteOperation ->ExtractValue ());
875
+ } else if (bucket.SpilledState && bucket.SpilledState ->GetBufferSize () > 0 ) {
876
+ bucket.SpilledState ->AsyncWriteCompleted (bucket.AsyncWriteOperation ->ExtractValue ());
877
+ } else {
878
+ // Default to SpilledData for backward compatibility
879
+ bucket.SpilledData ->AsyncWriteCompleted (bucket.AsyncWriteOperation ->ExtractValue ());
880
+ }
821
881
bucket.AsyncWriteOperation = std::nullopt;
822
882
}
823
883
}
@@ -828,6 +888,22 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
828
888
if (SpillingBucketsCount > 0 ) {
829
889
return true ;
830
890
}
891
+
892
+ // First, try to spill spiller buffers from buckets with large buffers
893
+ ui32 bucketWithLargestSpiller = GetBucketWithLargestSpillerBuffer ();
894
+ if (bucketWithLargestSpiller != (ui32)-1 ) {
895
+ auto & bucket = SpilledBuckets[bucketWithLargestSpiller];
896
+ size_t spillerSize = bucket.GetTotalSpillerMemorySize ();
897
+ // If spiller buffer is reasonably large (> 1MB), try to spill it first
898
+ if (spillerSize > 1_MB && ForceSpillSpillerBuffers (bucket)) {
899
+ UDF_LOG (Logger, LogComponent, NUdf::ELogLevel::Info,
900
+ TStringBuilder () << " Force spilling spiller buffers from bucket "
901
+ << bucketWithLargestSpiller << " size=" << (spillerSize/1_MB) << " MB" );
902
+ return true ;
903
+ }
904
+ }
905
+
906
+ // If no spiller buffers to spill or they are small, proceed with regular bucket spilling
831
907
while (InMemoryBucketsCount > 0 ) {
832
908
ui32 maxLineBucketInd = GetLargestInMemoryBucketNumber ();
833
909
MKQL_ENSURE (maxLineBucketInd != (ui32)-1 , " Internal logic error" );
@@ -838,36 +914,6 @@ class TSpillingSupportState : public TComputationValue<TSpillingSupportState> {
838
914
return true ;
839
915
}
840
916
}
841
-
842
- ui64 bucketToSpill = -1 ;
843
- ui64 maxSize = 0 ;
844
- for (int i = 0 ; i < 128 ; ++i) {
845
- auto & bucket = SpilledBuckets[i];
846
- if (bucket.BucketState == TSpilledBucket::EBucketState::SpillingData) {
847
- ui64 estimatedSize = SpilledBuckets[i].SpilledData ->GetEstimatedSize ();
848
- if (estimatedSize > maxSize) {
849
- maxSize = estimatedSize;
850
- bucketToSpill = i;
851
- }
852
- } else if (bucket.BucketState == TSpilledBucket::EBucketState::SpillingState) {
853
- ui64 estimatedSize = SpilledBuckets[i].SpilledState ->GetEstimatedSize ();
854
- if (estimatedSize > maxSize) {
855
- maxSize = estimatedSize;
856
- bucketToSpill = i;
857
- }
858
- }
859
- }
860
-
861
- if (bucketToSpill != -1ULL ) {
862
- if (SpilledBuckets[bucketToSpill].BucketState == TSpilledBucket::EBucketState::SpillingData) {
863
- SpilledBuckets[bucketToSpill].AsyncWriteOperation = SpilledBuckets[bucketToSpill].SpilledData ->FinishWriting ();
864
- } else if (SpilledBuckets[bucketToSpill].BucketState == TSpilledBucket::EBucketState::SpillingState) {
865
- SpilledBuckets[bucketToSpill].AsyncWriteOperation = SpilledBuckets[bucketToSpill].SpilledState ->FinishWriting ();
866
- }
867
- MKQL_ENSURE (SpilledBuckets[bucketToSpill].AsyncWriteOperation , " MISHA ERROR" );
868
- return true ;
869
-
870
- }
871
917
return false ;
872
918
}
873
919
@@ -2092,3 +2138,4 @@ IComputationNode* WrapWideLastCombinerWithSpilling(TCallable& callable, const TC
2092
2138
2093
2139
}
2094
2140
}
2141
+
0 commit comments