Skip to content

Commit 08f247d

Browse files
authored
Merge pull request #28 from s0und0fs1lence/main
Add streaming query
2 parents c08ce79 + 9931e96 commit 08f247d

File tree

12 files changed

+738
-77
lines changed

12 files changed

+738
-77
lines changed

chdb-purego/binding.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,18 @@ func findLibrary() string {
3535
}
3636

3737
var (
38-
queryStable func(argc int, argv []string) *local_result
39-
freeResult func(result *local_result)
40-
queryStableV2 func(argc int, argv []string) *local_result_v2
41-
freeResultV2 func(result *local_result_v2)
42-
connectChdb func(argc int, argv []*byte) **chdb_conn
43-
closeConn func(conn **chdb_conn)
44-
queryConn func(conn *chdb_conn, query string, format string) *local_result_v2
38+
queryStable func(argc int, argv []string) *local_result
39+
freeResult func(result *local_result)
40+
queryStableV2 func(argc int, argv []string) *local_result_v2
41+
freeResultV2 func(result *local_result_v2)
42+
connectChdb func(argc int, argv []*byte) **chdb_conn
43+
closeConn func(conn **chdb_conn)
44+
queryConn func(conn *chdb_conn, query string, format string) *local_result_v2
45+
queryConnStreaming func(conn *chdb_conn, query string, format string) *chdb_streaming_result
46+
streamingResultError func(result *chdb_streaming_result) *string
47+
streamingResultNext func(conn *chdb_conn, result *chdb_streaming_result) *local_result_v2
48+
streamingResultDestroy func(result *chdb_streaming_result)
49+
streamingResultCancel func(conn *chdb_conn, result *chdb_streaming_result)
4550
)
4651

4752
func init() {
@@ -58,5 +63,10 @@ func init() {
5863
purego.RegisterLibFunc(&connectChdb, libchdb, "connect_chdb")
5964
purego.RegisterLibFunc(&closeConn, libchdb, "close_conn")
6065
purego.RegisterLibFunc(&queryConn, libchdb, "query_conn")
66+
purego.RegisterLibFunc(&queryConnStreaming, libchdb, "query_conn_streaming")
67+
purego.RegisterLibFunc(&streamingResultError, libchdb, "chdb_streaming_result_error")
68+
purego.RegisterLibFunc(&streamingResultNext, libchdb, "chdb_streaming_fetch_result")
69+
purego.RegisterLibFunc(&streamingResultCancel, libchdb, "chdb_streaming_cancel_query")
70+
purego.RegisterLibFunc(&streamingResultDestroy, libchdb, "chdb_destroy_result")
6171

6272
}

chdb-purego/chdb.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ type connection struct {
9898
conn **chdb_conn
9999
}
100100

101+
// CancelQuery implements ChdbConn.
102+
func (c *connection) CancelQuery(query ChdbResult) (err error) {
103+
panic("unimplemented")
104+
}
105+
101106
func newChdbConn(conn **chdb_conn) ChdbConn {
102107
c := &connection{
103108
conn: conn,
@@ -136,6 +141,29 @@ func (c *connection) Query(queryStr string, formatStr string) (result ChdbResult
136141
return newChdbResult(res), nil
137142
}
138143

144+
// QueryStreaming implements ChdbConn.
145+
func (c *connection) QueryStreaming(queryStr string, formatStr string) (result ChdbStreamResult, err error) {
146+
147+
if c.conn == nil {
148+
return nil, fmt.Errorf("invalid connection")
149+
}
150+
151+
rawConn := *c.conn
152+
153+
res := queryConnStreaming(rawConn, queryStr, formatStr)
154+
if res == nil {
155+
// According to the C ABI of chDB v1.2.0, the C function query_stable_v2
156+
// returns nil if the query returns no data. This is not an error. We
157+
// will change this behavior in the future.
158+
return newStreamingResult(rawConn, res), nil
159+
}
160+
if s := streamingResultError(res); s != nil {
161+
return nil, errors.New(*s)
162+
}
163+
164+
return newStreamingResult(rawConn, res), nil
165+
}
166+
139167
func (c *connection) Ready() bool {
140168
if c.conn != nil {
141169
deref := *c.conn

chdb-purego/streaming.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package chdbpurego
2+
3+
import "errors"
4+
5+
type streamingResult struct {
6+
curConn *chdb_conn
7+
stream *chdb_streaming_result
8+
curChunk ChdbResult
9+
}
10+
11+
func newStreamingResult(conn *chdb_conn, cRes *chdb_streaming_result) ChdbStreamResult {
12+
13+
// nextChunk := streamingResultNext(conn, cRes)
14+
// if nextChunk == nil {
15+
// return nil
16+
// }
17+
18+
res := &streamingResult{
19+
curConn: conn,
20+
stream: cRes,
21+
// curChunk: newChdbResult(nextChunk),
22+
}
23+
24+
// runtime.SetFinalizer(res, res.Free)
25+
return res
26+
27+
}
28+
29+
// Error implements ChdbStreamResult.
30+
func (c *streamingResult) Error() error {
31+
if s := streamingResultError(c.stream); s != nil {
32+
return errors.New(*s)
33+
}
34+
return nil
35+
}
36+
37+
// Free implements ChdbStreamResult.
38+
func (c *streamingResult) Free() {
39+
streamingResultCancel(c.curConn, c.stream)
40+
streamingResultDestroy(c.stream)
41+
c.stream = nil
42+
if c.curChunk != nil {
43+
c.curChunk.Free()
44+
c.curChunk = nil
45+
}
46+
}
47+
48+
// Cancel implements ChdbStreamResult.
49+
func (c *streamingResult) Cancel() {
50+
c.Free()
51+
}
52+
53+
// GetNext implements ChdbStreamResult.
54+
func (c *streamingResult) GetNext() ChdbResult {
55+
if c.curChunk == nil {
56+
nextChunk := streamingResultNext(c.curConn, c.stream)
57+
if nextChunk == nil {
58+
return nil
59+
}
60+
c.curChunk = newChdbResult(nextChunk)
61+
return c.curChunk
62+
}
63+
// free the current chunk before getting the next one
64+
c.curChunk.Free()
65+
c.curChunk = nil
66+
nextChunk := streamingResultNext(c.curConn, c.stream)
67+
if nextChunk == nil {
68+
return nil
69+
}
70+
c.curChunk = newChdbResult(nextChunk)
71+
return c.curChunk
72+
}

chdb-purego/types.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ type local_result_v2 struct {
2424
error_message *byte
2525
}
2626

27+
// clickhouse streaming result struct. for reference: https://github.com/chdb-io/chdb/blob/main/programs/local/chdb.h#L65
28+
type chdb_streaming_result struct {
29+
internal_data unsafe.Pointer
30+
}
31+
2732
// clickhouse background server connection.for reference: https://github.com/chdb-io/chdb/blob/main/programs/local/chdb.h#L82
2833
type chdb_conn struct {
2934
server unsafe.Pointer
@@ -32,7 +37,6 @@ type chdb_conn struct {
3237
}
3338

3439
type ChdbResult interface {
35-
// Raw bytes result buffer, used for reading the result of clickhouse query
3640
Buf() []byte
3741
// String rapresentation of the the buffer
3842
String() string
@@ -50,9 +54,26 @@ type ChdbResult interface {
5054
Free()
5155
}
5256

57+
type ChdbStreamResult interface {
58+
// GetNext returns the next chunk of data from the stream.
59+
// The chunk is a ChdbResult object that can be used to read the data.
60+
// If there are no more chunks, it returns nil.
61+
GetNext() ChdbResult
62+
// Error returns the error message if there was an error during the streaming process.
63+
Error() error
64+
// Cancel cancels the streaming process and frees the underlying memory.
65+
Cancel()
66+
// Free frees the underlying memory and closes the stream.
67+
Free()
68+
}
69+
5370
type ChdbConn interface {
5471
//Query executes the given queryStr in the underlying clickhouse connection, and output the result in the given formatStr
5572
Query(queryStr string, formatStr string) (result ChdbResult, err error)
73+
// QueryStreaming executes the given queryStr in the underlying clickhouse connection, and output the result in the given formatStr
74+
// The result is a stream of data that can be read in chunks.
75+
// This is useful for large datasets that cannot be loaded into memory all at once.
76+
QueryStreaming(queryStr string, formatStr string) (result ChdbStreamResult, err error)
5677
//Ready returns a boolean indicating if the connections is successfully established.
5778
Ready() bool
5879
//Close the connection and free the underlying allocated memory

chdb.h

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
#pragma once
22

33
#ifdef __cplusplus
4-
# include <condition_variable>
54
# include <cstddef>
65
# include <cstdint>
7-
# include <mutex>
8-
# include <queue>
9-
# include <string>
106
extern "C" {
117
#else
128
# include <stdbool.h>
@@ -55,26 +51,6 @@ CHDB_EXPORT void free_result(struct local_result * result);
5551
CHDB_EXPORT struct local_result_v2 * query_stable_v2(int argc, char ** argv);
5652
CHDB_EXPORT void free_result_v2(struct local_result_v2 * result);
5753

58-
#ifdef __cplusplus
59-
struct query_request
60-
{
61-
std::string query;
62-
std::string format;
63-
};
64-
65-
struct query_queue
66-
{
67-
std::mutex mutex;
68-
std::condition_variable query_cv; // For query submission
69-
std::condition_variable result_cv; // For query result retrieval
70-
query_request current_query;
71-
local_result_v2 * current_result = nullptr;
72-
bool has_query = false;
73-
bool shutdown = false;
74-
bool cleanup_done = false;
75-
};
76-
#endif
77-
7854
/**
7955
* Connection structure for chDB
8056
* Contains server instance, connection state, and query processing queue
@@ -86,11 +62,15 @@ struct chdb_conn
8662
void * queue; /* Query processing queue */
8763
};
8864

65+
typedef struct {
66+
void * internal_data;
67+
} chdb_streaming_result;
68+
8969
/**
9070
* Creates a new chDB connection.
9171
* Only one active connection is allowed per process.
9272
* Creating a new connection with different path requires closing existing connection.
93-
*
73+
*
9474
* @param argc Number of command-line arguments
9575
* @param argv Command-line arguments array (--path=<db_path> to specify database location)
9676
* @return Pointer to connection pointer, or NULL on failure
@@ -101,15 +81,15 @@ CHDB_EXPORT struct chdb_conn ** connect_chdb(int argc, char ** argv);
10181
/**
10282
* Closes an existing chDB connection and cleans up resources.
10383
* Thread-safe function that handles connection shutdown and cleanup.
104-
*
84+
*
10585
* @param conn Pointer to connection pointer to close
10686
*/
10787
CHDB_EXPORT void close_conn(struct chdb_conn ** conn);
10888

10989
/**
11090
* Executes a query on the given connection.
11191
* Thread-safe function that handles query execution in a separate thread.
112-
*
92+
*
11393
* @param conn Connection to execute query on
11494
* @param query SQL query string to execute
11595
* @param format Output format string (e.g., "CSV", default format)
@@ -118,6 +98,51 @@ CHDB_EXPORT void close_conn(struct chdb_conn ** conn);
11898
*/
11999
CHDB_EXPORT struct local_result_v2 * query_conn(struct chdb_conn * conn, const char * query, const char * format);
120100

101+
/**
102+
* Executes a streaming query on the given connection.
103+
* @brief Initializes streaming query execution and returns result handle
104+
* @param conn Connection to execute query on
105+
* @param query SQL query string to execute
106+
* @param format Output format string (e.g. "CSV", default format)
107+
* @return Streaming result handle containing query state or error message
108+
* @note Returns error result if connection is invalid or closed
109+
*/
110+
CHDB_EXPORT chdb_streaming_result * query_conn_streaming(struct chdb_conn * conn, const char * query, const char * format);
111+
112+
/**
113+
* Retrieves error message from streaming result.
114+
* @brief Gets error message associated with streaming query execution
115+
* @param result Streaming result handle from query_conn_streaming()
116+
* @return Null-terminated error message string, or NULL if no error occurred
117+
*/
118+
CHDB_EXPORT const char * chdb_streaming_result_error(chdb_streaming_result * result);
119+
120+
/**
121+
* Fetches next chunk of streaming results.
122+
* @brief Iterates through streaming query results
123+
* @param conn Active connection handle
124+
* @param result Streaming result handle from query_conn_streaming()
125+
* @return Materialized result chunk with data
126+
* @note Returns empty result when stream ends
127+
*/
128+
CHDB_EXPORT struct local_result_v2 * chdb_streaming_fetch_result(struct chdb_conn * conn, chdb_streaming_result * result);
129+
130+
/**
131+
* Cancels ongoing streaming query.
132+
* @brief Aborts streaming query execution and cleans up resources
133+
* @param conn Active connection handle
134+
* @param result Streaming result handle to cancel
135+
*/
136+
CHDB_EXPORT void chdb_streaming_cancel_query(struct chdb_conn * conn, chdb_streaming_result * result);
137+
138+
/**
139+
* Releases resources associated with streaming result.
140+
* @brief Destroys streaming result handle and frees allocated memory
141+
* @param result Streaming result handle to destroy
142+
* @warning Must be called even if query was finished or canceled
143+
*/
144+
CHDB_EXPORT void chdb_destroy_result(chdb_streaming_result * result);
145+
121146
#ifdef __cplusplus
122147
}
123-
#endif
148+
#endif

0 commit comments

Comments
 (0)