Skip to content

Commit f6fa3c1

Browse files
Merge branch 'main' of github.com:ClickHouse/clickhouse-go into reduce-memory-usage-compressor
2 parents 7783fa4 + 88b9368 commit f6fa3c1

File tree

4 files changed

+31
-13
lines changed

4 files changed

+31
-13
lines changed

clickhouse_options.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ func (c CompressionMethod) String() string {
4242
return "zstd"
4343
case CompressionLZ4:
4444
return "lz4"
45+
case CompressionLZ4HC:
46+
return "lz4hc"
4547
case CompressionGZIP:
4648
return "gzip"
4749
case CompressionDeflate:
@@ -56,6 +58,7 @@ func (c CompressionMethod) String() string {
5658
const (
5759
CompressionNone = CompressionMethod(compress.None)
5860
CompressionLZ4 = CompressionMethod(compress.LZ4)
61+
CompressionLZ4HC = CompressionMethod(compress.LZ4HC)
5962
CompressionZSTD = CompressionMethod(compress.ZSTD)
6063
CompressionGZIP = CompressionMethod(0x95)
6164
CompressionDeflate = CompressionMethod(0x96)
@@ -66,6 +69,7 @@ var compressionMap = map[string]CompressionMethod{
6669
"none": CompressionNone,
6770
"zstd": CompressionZSTD,
6871
"lz4": CompressionLZ4,
72+
"lz4hc": CompressionLZ4HC,
6973
"gzip": CompressionGZIP,
7074
"deflate": CompressionDeflate,
7175
"br": CompressionBrotli,
@@ -79,7 +83,7 @@ type Auth struct { // has_control_character
7983

8084
type Compression struct {
8185
Method CompressionMethod
82-
// this only applies to zlib and brotli compression algorithms
86+
// this only applies to lz4, lz4hc, zlib, and brotli compression algorithms
8387
Level int
8488
}
8589

conn.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,22 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er
7373
}
7474
}
7575

76-
compression := CompressionNone
76+
var (
77+
compression CompressionMethod
78+
compressor *compress.Writer
79+
)
7780
if opt.Compression != nil {
7881
switch opt.Compression.Method {
79-
case CompressionLZ4, CompressionZSTD, CompressionNone:
82+
case CompressionLZ4, CompressionLZ4HC, CompressionZSTD, CompressionNone:
8083
compression = opt.Compression.Method
8184
default:
8285
return nil, fmt.Errorf("unsupported compression method for native protocol")
8386
}
87+
88+
compressor = compress.NewWriter(compress.Level(opt.Compression.Level), compress.Method(opt.Compression.Method))
89+
} else {
90+
compression = CompressionNone
91+
compressor = compress.NewWriter(compress.LevelZero, compress.None)
8492
}
8593

8694
var (
@@ -95,7 +103,7 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er
95103
structMap: &structMap{},
96104
compression: compression,
97105
connectedAt: time.Now(),
98-
compressor: compress.NewWriterWithMethods(0, compress.Method(compression)),
106+
compressor: compressor,
99107
readTimeout: opt.ReadTimeout,
100108
blockBufferSize: opt.BlockBufferSize,
101109
maxCompressionBuffer: opt.MaxCompressionBuffer,
@@ -247,7 +255,7 @@ func (c *connect) exception() error {
247255
func (c *connect) compressBuffer(start int) error {
248256
if c.compression != CompressionNone && len(c.buffer.Buf) > 0 {
249257
data := c.buffer.Buf[start:]
250-
if err := c.compressor.Compress(compress.Method(c.compression), data); err != nil {
258+
if err := c.compressor.Compress(data); err != nil {
251259
return errors.Wrap(err, "compress")
252260
}
253261
c.buffer.Buf = append(c.buffer.Buf[:start], c.compressor.Data...)

conn_http.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
230230
url: u,
231231
buffer: new(chproto.Buffer),
232232
compression: opt.Compression.Method,
233-
blockCompressor: compress.NewWriterWithMethods(0, compress.Method(opt.Compression.Method)),
233+
blockCompressor: compress.NewWriter(compress.Level(opt.Compression.Level), compress.Method(opt.Compression.Method)),
234234
compressionPool: compressionPool,
235235
blockBufferSize: opt.BlockBufferSize,
236236
headers: headers,
@@ -256,7 +256,7 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
256256
url: u,
257257
buffer: new(chproto.Buffer),
258258
compression: opt.Compression.Method,
259-
blockCompressor: compress.NewWriterWithMethods(0, compress.Method(opt.Compression.Method)),
259+
blockCompressor: compress.NewWriter(compress.Level(opt.Compression.Level), compress.Method(opt.Compression.Method)),
260260
compressionPool: compressionPool,
261261
location: location,
262262
blockBufferSize: opt.BlockBufferSize,
@@ -377,7 +377,7 @@ func (h *httpConnect) writeData(block *proto.Block) error {
377377
if h.compression == CompressionLZ4 || h.compression == CompressionZSTD {
378378
// Performing compression. Supported and requires
379379
data := h.buffer.Buf[start:]
380-
if err := h.blockCompressor.Compress(compress.Method(h.compression), data); err != nil {
380+
if err := h.blockCompressor.Compress(data); err != nil {
381381
return errors.Wrap(err, "compress")
382382
}
383383
h.buffer.Buf = append(h.buffer.Buf[:start], h.blockCompressor.Data...)

tests/compression_test.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,27 +19,33 @@ package tests
1919

2020
import (
2121
"context"
22+
"github.com/ClickHouse/ch-go/compress"
2223
"github.com/ClickHouse/clickhouse-go/v2"
2324
"github.com/stretchr/testify/assert"
2425
"github.com/stretchr/testify/require"
2526
"testing"
2627
)
2728

2829
func TestZSTDCompression(t *testing.T) {
29-
CompressionTest(t, clickhouse.CompressionZSTD)
30+
CompressionTest(t, compress.LevelZero, clickhouse.CompressionZSTD)
3031
}
3132

3233
func TestLZ4Compression(t *testing.T) {
33-
CompressionTest(t, clickhouse.CompressionLZ4)
34+
CompressionTest(t, compress.Level(3), clickhouse.CompressionLZ4)
35+
}
36+
37+
func TestLZ4HCCompression(t *testing.T) {
38+
CompressionTest(t, compress.LevelLZ4HCDefault, clickhouse.CompressionLZ4HC)
3439
}
3540

3641
func TestNoCompression(t *testing.T) {
37-
CompressionTest(t, clickhouse.CompressionNone)
42+
CompressionTest(t, compress.LevelZero, clickhouse.CompressionNone)
3843
}
3944

40-
func CompressionTest(t *testing.T, method clickhouse.CompressionMethod) {
45+
func CompressionTest(t *testing.T, level compress.Level, method clickhouse.CompressionMethod) {
4146
conn, err := GetNativeConnection(nil, nil, &clickhouse.Compression{
4247
Method: method,
48+
Level: int(level),
4349
})
4450
ctx := context.Background()
4551
require.NoError(t, err)
@@ -57,7 +63,7 @@ func CompressionTest(t *testing.T, method clickhouse.CompressionMethod) {
5763
var (
5864
col1Data = []string{"A", "b", "c"}
5965
)
60-
for i := 0; i < 10; i++ {
66+
for i := 0; i < 100; i++ {
6167
require.NoError(t, batch.Append(col1Data))
6268
}
6369
require.NoError(t, batch.Send())

0 commit comments

Comments
 (0)