[Feature][http-Sink] Implementing http batch writes #9292
Pull Request Overview
This pull request implements HTTP batch writes for the HTTP sink connector in SeaTunnel. Key changes include:
- Adding batch processing logic in HttpSinkWriter for both array and object modes
- Introducing new configuration options (array_mode, batch_size, request_interval_ms, and format) for HTTP sinks
- Updating both tests and documentation to verify and explain the new batch features
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkBatchWriterTest.java | Added tests for both object and array modes to validate individual and batch processing |
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java | Enhanced writer logic with batch buffering and configurable HTTP request intervals in array mode |
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java | Integrated new options into the sink factory for proper instantiation of the sink writer |
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSink.java | Updated to invoke the new writer with batch processing parameters |
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/config/HttpSinkOptions.java | Added new options to support batch processing configuration |
docs/zh/connector-v2/sink/Http.md, docs/en/connector-v2/sink/Http.md | Updated documentation to describe the new batch processing features and their configuration |
    byte[] serialize = serializationSchema.serialize(row);
    jsonRecords.add(new String(serialize));
}
String body = "[" + String.join(",", jsonRecords) + "]";
[nitpick] Consider using a dedicated JSON library to construct the JSON array, which can help ensure proper formatting and handle edge cases more robustly.
Manually constructing JSON arrays in the code does have some potential problems. I now construct the JSON array using Jackson.
        .defaultValue(0)
        .withDescription("The interval milliseconds between two HTTP requests");

public static final Option<String> FORMAT =
[nitpick] The description for the FORMAT option indicates that only 'json' is supported; consider clarifying the documentation or renaming the option to avoid confusion regarding supported formats.
The Http.md documentation already states that
docs/en/connector-v2/sink/Http.md
Outdated
| array_mode | Boolean | No | false | Send data as a JSON array when true, or as a single JSON object when false (default) |
| batch_size | Int | No | 1 | The batch size of records to send in one HTTP request. Only works when array_mode is true. |
| request_interval_ms | Int | No | 0 | The interval milliseconds between two HTTP requests, to avoid sending requests too frequently. |
| format | String | No | json | The format of batch data. Currently only "json" is supported, which will send data as JSON array. |
What other formats could we support in the future?
For now I can only think of JSON. If more formats are added later, the code can be changed to check the format against an enum.
Please remove the useless config.
+1, removed the useless config.
The docs should be updated too.
boolean arrayMode = pluginConfig.get(HttpSinkOptions.ARRAY_MODE);
int batchSize = pluginConfig.get(HttpSinkOptions.BATCH_SIZE);
int requestIntervalMs = pluginConfig.get(HttpSinkOptions.REQUEST_INTERVAL_MS);
Please add these config options to HttpParameter (line 29 in 3783816):
public class HttpParameter implements Serializable {
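As a rough illustration of the request above, the batch settings could travel as plain serializable fields instead of being read from the plugin config inside the writer. The sketch below is a minimal stand-in, not the real HttpParameter: the class name `BatchHttpSettings`, its accessors, and the range checks are all assumptions made for the example. Only the three option names and their defaults come from the docs table in this PR.

```java
import java.io.Serializable;

// Hypothetical stand-in for the batch-related fields that could be added to HttpParameter.
// The class name, accessors, and validation are illustrative assumptions.
class BatchHttpSettings implements Serializable {
    private static final long serialVersionUID = 1L;

    private final boolean arrayMode;
    private final int batchSize;
    private final int requestIntervalMs;

    BatchHttpSettings(boolean arrayMode, int batchSize, int requestIntervalMs) {
        // Assumed sanity checks; the PR does not state how invalid values are handled.
        if (batchSize < 1) {
            throw new IllegalArgumentException("batch_size must be >= 1");
        }
        if (requestIntervalMs < 0) {
            throw new IllegalArgumentException("request_interval_ms must be >= 0");
        }
        this.arrayMode = arrayMode;
        this.batchSize = batchSize;
        this.requestIntervalMs = requestIntervalMs;
    }

    // Defaults as listed in the docs table: array_mode=false, batch_size=1, request_interval_ms=0.
    static BatchHttpSettings defaults() {
        return new BatchHttpSettings(false, 1, 0);
    }

    boolean isArrayMode() {
        return arrayMode;
    }

    int getBatchSize() {
        return batchSize;
    }

    int getRequestIntervalMs() {
        return requestIntervalMs;
    }
}
```

Keeping the values on one serializable object means the sink and the writer read the same settings, rather than each pulling them from the config separately.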
    }

    @Override
    public void write(SeaTunnelRow element) throws IOException {
        if (!arrayMode) {
            // Object mode: send each record individually, ignore batch_size setting
Suggested change (delete this line):
// Object mode: send each record individually, ignore batch_size setting
            // Object mode: send each record individually, ignore batch_size setting
            writeSingleRecord(element);
        } else {
            // Array mode: batch processing
Suggested change (delete this line):
// Array mode: batch processing
+1
try {
    // only support post web hook
    // Send HTTP request
Suggested change (delete this line):
// Send HTTP request
+1
@@ -76,8 +166,15 @@ public void write(SeaTunnelRow element) throws IOException {

    @Override
    public void close() throws IOException {
        if (arrayMode) {
We should also invoke the flush method when prepareCommit is invoked.
+1
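The point of this request can be sketched with a small buffer that flushes in three places: when the batch fills, at prepareCommit, and on close. This is a simplified illustration, not the SeaTunnel HttpSinkWriter. The class `BatchBuffer`, its `sender` callback (which stands in for the HTTP POST), and the use of plain strings as records are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified buffer; the sender callback stands in for the HTTP request.
class BatchBuffer {
    private final int batchSize;
    private final Consumer<List<String>> sender;
    private final List<String> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<String>> sender) {
        this.batchSize = batchSize;
        this.sender = sender;
    }

    // Buffer the record and send once the batch is full.
    void write(String record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Flush at commit time so no buffered record is left behind by a checkpoint.
    void prepareCommit() {
        flush();
    }

    // Flush whatever is left when the writer shuts down.
    void close() {
        flush();
    }

    private void flush() {
        if (pending.isEmpty()) {
            return;
        }
        // Hand over a copy so clearing the buffer does not affect the sent batch.
        sender.accept(new ArrayList<>(pending));
        pending.clear();
    }
}
```

Without the flush in prepareCommit, a partially filled batch would only be sent when the next batch fills or the writer closes, so records could still sit in memory after a commit has completed.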
        if (Objects.nonNull(httpClient)) {
            httpClient.close();
        }
    }

    protected HttpClientProvider createHttpClient(HttpParameter httpParameter) {
Suggested change:
- protected HttpClientProvider createHttpClient(HttpParameter httpParameter) {
+ @VisibleForTesting
+ protected HttpClientProvider createHttpClient(HttpParameter httpParameter) {
    return;
}

// Check request interval
// Check request interval
Please do not add useless comments.
Thread.currentThread().interrupt();
log.warn("Sleep interrupted", e);
We should throw the exception directly. Otherwise the writer would never be closed.
+1
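One way to read this request is sketched below: restore the interrupt flag, then rethrow instead of logging a warning and carrying on. The helper `RequestThrottle.waitBeforeNextRequest` is hypothetical. Wrapping in IOException is also an assumption, chosen only because the writer's write and close methods already declare it; the review does not say which exception type to use.

```java
import java.io.IOException;

// Hypothetical helper for the wait between two HTTP requests.
class RequestThrottle {
    static void waitBeforeNextRequest(long intervalMs) throws IOException {
        if (intervalMs <= 0) {
            return;
        }
        try {
            Thread.sleep(intervalMs);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interrupt.
            Thread.currentThread().interrupt();
            // Propagate instead of swallowing, so the write fails and shutdown can proceed.
            throw new IOException("Interrupted while waiting between HTTP requests", e);
        }
    }
}
```

With the original log-and-continue handling, an interrupted writer keeps sending batches, which is the situation the reviewer describes as the writer never being closed.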