feat: enable block stream write #18285


Merged
merged 13 commits into from
Aug 4, 2025

Conversation

@zhyass zhyass commented Jul 1, 2025

I hereby agree to the terms of the CLA available at: https://docs.databend.com/dev/policies/cla/

Summary

This PR introduces three main features:

  1. Stream Write Functionality: Enables stream writes and makes them the default write behavior. Stream writes allow more efficient and flexible handling of data insertion, which is especially useful for high-throughput scenarios.

  2. Block-Level Statistics: Adds support for block-level statistics by integrating HyperLogLog (HLL) for distinct count estimation. These block-level statistics are consolidated at the segment level, enabling more efficient storage and management, and can be aggregated into higher-level statistics such as segment- or table-level analysis.

  3. New Table Option - approx_distinct_columns: A new table option, approx_distinct_columns, lets users specify which columns should maintain HyperLogLog (HLL) statistics for approximate distinct count calculation. By default, all eligible columns are considered for HLL statistics; this option gives users control over which columns to track.
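For background, the HyperLogLog estimator behind these statistics can be sketched in a few lines of Python. This is an illustrative toy, not Databend's MetaHLL implementation; the register count, hash function, and lack of small-range correction are arbitrary simplifications:

```python
import hashlib

M = 1 << 12                        # 4096 registers (illustrative choice)
ALPHA = 0.7213 / (1 + 1.079 / M)   # bias-correction constant for large M

def _hash(value) -> int:
    # 64-bit hash derived from SHA-1 (stand-in for a real fast hash)
    return int.from_bytes(hashlib.sha1(str(value).encode()).digest()[:8], "big")

class ToyHLL:
    def __init__(self):
        self.registers = [0] * M

    def add(self, value):
        h = _hash(value)
        idx = h & (M - 1)                      # low 12 bits pick a register
        rest = h >> 12                         # remaining 52 bits
        rank = 52 - rest.bit_length() + 1      # position of first 1-bit (1-based)
        self.registers[idx] = max(self.registers[idx], rank)

    def merge(self, other):
        # Register-wise max: this is what makes per-block HLLs
        # combinable into segment- or table-level statistics.
        for i, r in enumerate(other.registers):
            self.registers[i] = max(self.registers[i], r)

    def estimate(self) -> float:
        inv_sum = sum(2.0 ** -r for r in self.registers)
        return ALPHA * M * M / inv_sum
```

The merge step is the key property exploited by this PR: distinct counts themselves cannot be added across blocks, but HLL registers can be combined losslessly by taking the element-wise maximum.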

root@localhost:8000/default/default> create table t1(a int, b string);

root@localhost:8000/default/default> insert into t1 values(1,'a'),(2,'b');
╭─────────────────────────╮
│ number of rows inserted │
│          UInt64         │
├─────────────────────────┤
│                       2 │
╰─────────────────────────╯
2 rows written in 0.071 sec. Processed 2 rows, 36 B (28.17 rows/s, 507 B/s)

root@localhost:8000/default/default> select file_location, block_count, row_count, segment_stats_size from fuse_segment('default', 't1');
╭──────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                     file_location                     │ block_count │ row_count │ segment_stats_size │
│                         String                        │    UInt64   │   UInt64  │  Nullable(UInt64)  │
├───────────────────────────────────────────────────────┼─────────────┼───────────┼────────────────────┤
│ '1/5518/_sg/h0198618bea9b7d01bce2f2e56d4addbc_v4.mpk' │           1 │         2 │                128 │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────╯
1 row read in 0.036 sec. Processed 1 row, 174 B (27.78 rows/s, 4.72 KiB/s)

root@localhost:8000/default/default> create table t2(a int, b string) approx_distinct_columns = '';

root@localhost:8000/default/default> insert into t2 values(1,'a'),(2,'b');
╭─────────────────────────╮
│ number of rows inserted │
│          UInt64         │
├─────────────────────────┤
│                       2 │
╰─────────────────────────╯
2 rows written in 0.077 sec. Processed 2 rows, 36 B (25.97 rows/s, 467 B/s)

root@localhost:8000/default/default> select file_location, block_count, row_count, segment_stats_size from fuse_segment('default', 't2');
╭──────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                     file_location                     │ block_count │ row_count │ segment_stats_size │
│                         String                        │    UInt64   │   UInt64  │  Nullable(UInt64)  │
├───────────────────────────────────────────────────────┼─────────────┼───────────┼────────────────────┤
│ '1/5532/_sg/h0198618ce40171c9a0bd98368798f8fc_v4.mpk' │           1 │         2 │               NULL │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────╯
1 row read in 0.046 sec. Processed 1 row, 174 B (21.74 rows/s, 3.69 KiB/s)

HLL File Write & Read & Cleanup Flow

                            HLL LIFECYCLE
                    ═══════════════════════════════

                        WRITE PATH
                        ──────────
                            │
                            ▼
            ┌─────────────────────────────────────┐
            │         Block Processing            │
            │                                     │
            │  Block A → RawBlockHLL (Vec<u8>)    │
            │  Block B → RawBlockHLL (Vec<u8>)    │
            │  Block C → RawBlockHLL (Vec<u8>)    │
            └─────────────┬───────────────────────┘
                          │ collect
                          ▼
            ┌─────────────────────────────────────┐
            │      SegmentStatistics::new()       │
            │                                     │
            │  block_hlls: Vec<RawBlockHLL>       │
            │  format_version: 0                  │
            └─────────────┬───────────────────────┘
                          │ to_bytes()
                          │ (MessagePack + Zstd)
                          ▼
            ┌─────────────────────────────────────┐
            │        Write HLL File               │
            │                                     │
            │   /_hs/xxxxx_v0.mpk                │
            │                                     │
            │ [Version][Encoding][Compression]    │
            │ [DataSize][CompressedData]          │
            └─────────────┬───────────────────────┘
                          │ reference
                          ▼
            ┌─────────────────────────────────────┐
            │      Segment Metadata               │
            │                                     │
            │   /_sg/xxxxx_v4.mpk                │
            │   additional_stats_meta: {          │
            │     location: "/_hs/xxxxx_v0.mpk"  │
            │   }                                 │
            └─────────────────────────────────────┘

                        READ PATH
                        ─────────
                            │
                            ▼
            ┌─────────────────────────────────────┐
            │     1. Read Segment Metadata        │
            │        /_sg/xxxxx_v4.mpk          │
            └─────────────┬───────────────────────┘
                          │ extract location
                          ▼
            ┌─────────────────────────────────────┐
            │     2. Read HLL Statistics File     │
            │        /_hs/xxxxx_v0.mpk           │
            │                                     │
            │   SegmentStatistics::from_read()    │
            └─────────────┬───────────────────────┘
                          │ access block_hlls[idx]
                          ▼
            ┌─────────────────────────────────────┐
            │     3. Decode Block HLL             │
            │                                     │
            │   decode_column_hll()               │
            │   → HashMap<ColumnId, MetaHLL>      │
            └─────────────────────────────────────┘
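The write and read paths above amount to a framed, compressed serialization of the per-block HLL blobs. A minimal round-trip sketch of the idea, assuming a simplified header layout and substituting zlib for Zstd and a naive length-prefixed encoding for MessagePack (neither matches Databend's actual on-disk format):

```python
import struct
import zlib

FORMAT_VERSION = 0  # matches the _v0 suffix in the file name

def write_segment_stats(block_hlls: list[bytes]) -> bytes:
    # Naive encoding: blob count, then length-prefixed blobs
    # (stand-in for MessagePack).
    payload = struct.pack("<I", len(block_hlls))
    for blob in block_hlls:
        payload += struct.pack("<I", len(blob)) + blob
    compressed = zlib.compress(payload)  # stand-in for Zstd
    # Header: [Version][DataSize], followed by the compressed payload.
    return struct.pack("<II", FORMAT_VERSION, len(compressed)) + compressed

def read_segment_stats(data: bytes) -> list[bytes]:
    version, size = struct.unpack_from("<II", data, 0)
    assert version == FORMAT_VERSION
    payload = zlib.decompress(data[8:8 + size])
    count = struct.unpack_from("<I", payload, 0)[0]
    off, blobs = 4, []
    for _ in range(count):
        n = struct.unpack_from("<I", payload, off)[0]
        blobs.append(payload[off + 4:off + 4 + n])
        off += 4 + n
    return blobs
```

A reader only needs the segment metadata's `additional_stats_meta.location` to find the file, then indexes `block_hlls[idx]` to reach an individual block's statistics.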

                       CLEANUP PATH (GC)
                       ─────────────────
                            │
                            ▼
            ┌─────────────────────────────────────┐
            │    1. Collect Referenced Files      │
            │                                     │
            │  Scan active snapshots:             │
            │  ├─ segments: /_sg/*.mpk          │
            │  ├─ blocks: /_b/*.parquet          │
            │  ├─ bloom_indexes: /_i_b_v2/*      │
            │  └─ hll_stats: /_hs/*.mpk          │
            └─────────────┬───────────────────────┘
                          │
                          ▼
            ┌─────────────────────────────────────┐
            │    2. Identify Orphan HLL Files     │
            │                                     │
            │  All /_hs/*.mpk files               │
            │       MINUS                         │
            │  Referenced /_hs/*.mpk files        │
            │       EQUALS                        │
            │  Orphan HLL files to purge          │
            └─────────────┬───────────────────────┘
                          │
                          ▼
            ┌─────────────────────────────────────┐
            │    3. Apply Retention Policy        │
            │                                     │
            │  Check file modification time:      │
            │  if (now - file_mtime) > retention  │
            │  then mark for deletion             │
            └─────────────┬───────────────────────┘
                          │
                          ▼
            ┌─────────────────────────────────────┐
            │    4. Purge Orphan HLL Files        │
            │                                     │
            │  Delete orphan /_hs/*.mpk files     │
            │  Update purge counter:              │
            │  counter.hlls += deleted_count      │
            └─────────────────────────────────────┘

                        FILE STRUCTURE
                        ──────────────
            
            Table Root/
            ├── _sg/xxxxx_v4.mpk    (segment metadata)
            ├── _hs/xxxxx_v0.mpk     (HLL statistics) ← GC target
            └── _b/xxxxx_v4.parquet  (block data)

                        GC TRIGGERS
                        ───────────
            
            • VACUUM TABLE ... (manual)
            • OPTIMIZE TABLE PURGE (manual)  
            • Background auto-vacuum (automatic)
            • Retention policy enforcement

Tests

  • Unit Test
  • Logic Test
  • Benchmark Test
  • No Test - Explain why

Type of change

  • Bug Fix (non-breaking change which fixes an issue)
  • New Feature (non-breaking change which adds functionality)
  • Breaking Change (fix or feature that could cause existing functionality not to work as expected)
  • Documentation Update
  • Refactoring
  • Performance Improvement
  • Other (please describe):


@zhyass zhyass marked this pull request as draft July 1, 2025 10:50
@github-actions github-actions bot added the pr-feature this PR introduces a new feature to the codebase label Jul 1, 2025
@zhyass zhyass force-pushed the feat_stream branch 2 times, most recently from f79fe6f to 6fc4e0c on July 4, 2025 19:45
@zhyass zhyass added the ci-benchmark Benchmark: run all test label Jul 6, 2025
@zhyass zhyass force-pushed the feat_stream branch 2 times, most recently from 553a5ce to 26e8a74 on July 9, 2025 05:53
@zhyass zhyass added ci-benchmark Benchmark: run all test and removed ci-benchmark Benchmark: run all test labels Jul 9, 2025
@zhyass zhyass force-pushed the feat_stream branch 2 times, most recently from a9dfc76 to db64b98 on July 13, 2025 17:27
@zhyass zhyass added ci-benchmark Benchmark: run all test and removed ci-benchmark Benchmark: run all test labels Jul 13, 2025
@zhyass zhyass added ci-cloud Build docker image for cloud test and removed ci-benchmark Benchmark: run all test labels Jul 14, 2025
@databendlabs databendlabs deleted a comment from github-actions bot Jul 14, 2025
@zhyass zhyass added ci-cloud Build docker image for cloud test and removed ci-cloud Build docker image for cloud test labels Jul 15, 2025

Docker Image for PR

  • tag: pr-18285-52e5e77-1752696188

note: this image tag is only available for internal use.

@databendlabs databendlabs deleted a comment from github-actions bot Jul 30, 2025
@databendlabs databendlabs deleted a comment from github-actions bot Jul 30, 2025
@databendlabs databendlabs deleted a comment from github-actions bot Jul 30, 2025

Docker Image for PR

  • tag: pr-18285-439539a-1753911154

note: this image tag is only available for internal use.

@zhyass zhyass added ci-cloud Build docker image for cloud test and removed ci-cloud Build docker image for cloud test labels Jul 31, 2025
@databendlabs databendlabs deleted a comment from github-actions bot Jul 31, 2025
@databendlabs databendlabs deleted a comment from github-actions bot Jul 31, 2025
@zhyass zhyass marked this pull request as ready for review July 31, 2025 18:08

Docker Image for PR

  • tag: pr-18285-6cfd498-1753985918

note: this image tag is only available for internal use.


BohuTANG commented Aug 1, 2025

HLL Write Performance Test - Complete Report

Test Methodology

This test compares write performance between HLL-enabled and HLL-disabled tables using a 5 billion row dataset.

Test Steps

  1. Generate base data (5B rows)
  2. Create two identical tables (HLL enabled vs disabled)
  3. Insert data from base table to both test tables
  4. Compare execution times and storage metrics

Complete Test SQL

-- ================================================================
-- HLL Write Performance Test - 5 Billion Rows
-- ================================================================

-- Step 1: Create base data table
CREATE OR REPLACE TABLE test_base_data (
    user_id UInt64,
    product_id UInt32,
    category_id UInt16,
    amount Decimal(10,2),
    event_date Date,
    description String
);

-- Step 2: Generate test data (5 billion rows)
INSERT INTO test_base_data 
SELECT 
    number as user_id,                                    -- 5B distinct
    (number % 100000) as product_id,                      -- 100K distinct  
    (number % 1000) as category_id,                       -- 1K distinct
    ((number % 10000) / 100.0) as amount,                -- Variable amounts
    ('2024-01-01'::Date + (number % 365)) as event_date, -- 1 year range
    CASE 
        WHEN number % 100 = 0 THEN repeat('long_desc_', 50) || (number)::VARCHAR
        ELSE 'desc_' || (number % 10000)::VARCHAR
    END as description
FROM numbers(5000000000);

-- Step 3: Create test tables
CREATE OR REPLACE TABLE test_no_hll (
    user_id UInt64,
    product_id UInt32,
    category_id UInt16,
    amount Decimal(10,2),
    event_date Date,
    description String
) ENGINE = Fuse approx_distinct_columns = '';

CREATE OR REPLACE TABLE test_with_hll (
    user_id UInt64,
    product_id UInt32,
    category_id UInt16,
    amount Decimal(10,2),
    event_date Date,
    description String
) ENGINE = Fuse;

-- Step 4: Performance test - INSERT operations
INSERT INTO test_no_hll SELECT * FROM test_base_data;
INSERT INTO test_with_hll SELECT * FROM test_base_data;


-- Step 6: Analyze results
SELECT 
    'NO HLL' as table_type,
    count(*) as segment_count,
    sum(row_count) as total_rows,
    sum(block_count) as total_blocks,
    0 as segments_with_hll,
    humanize_size(0) as total_hll_size
FROM fuse_segment('default', 'test_no_hll')
UNION ALL
SELECT 
    'WITH HLL' as table_type,
    count(*) as segment_count,
    sum(row_count) as total_rows,
    sum(block_count) as total_blocks,
    sum(CASE WHEN segment_stats_size IS NOT NULL THEN 1 ELSE 0 END) as segments_with_hll,
    humanize_size(sum(coalesce(segment_stats_size, 0))) as total_hll_size
FROM fuse_segment('default', 'test_with_hll');

Performance Results

INSERT Performance Comparison

| Operation | Execution Time | Performance Impact |
|-----------|----------------|--------------------|
| INSERT INTO test_no_hll | 3m 41.1s | Baseline |
| INSERT INTO test_with_hll | 3m 32.9s | 2.3% faster |

Storage Analysis Results

╭─────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ table_type │ segment_count │    total_rows    │   total_blocks   │ segments_with_hll │  total_hll_size  │
│   String   │     UInt64    │ Nullable(UInt64) │ Nullable(UInt64) │  Nullable(UInt64) │ Nullable(String) │
├────────────┼───────────────┼──────────────────┼──────────────────┼───────────────────┼──────────────────┤
│ 'NO HLL'   │             6 │       5000000000 │             5870 │                 0 │ '0.00 B'         │
│ 'WITH HLL' │             6 │       5000000000 │             5870 │                 6 │ '1.55 MiB'       │
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────╯

Key Metrics Summary

| Metric | NO HLL | WITH HLL | Difference |
|--------|--------|----------|------------|
| Execution Time | 3m 41.1s | 3m 32.9s | -8.2s (2.3% faster) |
| Segments | 6 | 6 | 0 |
| Total Rows | 5,000,000,000 | 5,000,000,000 | 0 |
| Total Blocks | 5,870 | 5,870 | 0 |
| HLL Coverage | 0/6 segments | 6/6 segments | 100% coverage |
| HLL Storage | 0.00 B | 1.55 MiB | +1.55 MiB |
| Storage Overhead | - | - | ~0.000003% |

@zhyass zhyass merged commit 958b862 into databendlabs:main Aug 4, 2025
87 checks passed
Labels
ci-cloud Build docker image for cloud test pr-feature this PR introduces a new feature to the codebase
4 participants