
Commit 4662bf7

refactor(ingester-example): reorganize project structure and improve database connection configuration. (#86)
* refactor(ingester-example): reorganize project structure and improve database connection
  - Rename packages to better reflect their purpose (e.g., quickstart, benchmark)
  - Move database connection logic to a central DBConnector class
  - Use properties file for database connection details
  - Update logging and error handling
  - Improve code readability and maintainability
* docs(ingester-example): update README and reorganize imports
  - Update README with correct file paths for examples
  - Reorganize import statements in Java files for better readability
* docs(bench): remove redundant environment variables from benchmark classes
  - Remove db_endpoint and db_name from BulkWriteBenchmark and StreamingWriteBenchmark
  - Update class comments to reflect the removal of these environment variables
* docs(bench): remove unnecessary db endpoint and name comments
  - Remove comments about db_endpoint and db_name as they are not relevant to the batching write benchmark
  - Keep existing information about batch_size_per_request, zstd_compression, and max_points_per_second
1 parent bf135df commit 4662bf7

18 files changed (+102 -55 lines)

README.md
Lines changed: 1 addition & 1 deletion

@@ -225,7 +225,7 @@ Context ctx = Context.newDefault();
 // Add a hint to make the database create a table with the specified TTL (time-to-live)
 ctx = ctx.withHint("ttl", "3d");
 // Set the compression algorithm to Zstd.
-ctx = ctx.withCompression(Compression.Zstd)
+ctx = ctx.withCompression(Compression.Zstd);
 // Use the ctx when writing data to GreptimeDB
 CompletableFuture<Result<WriteOk, Err>> future = client.write(Arrays.asList(table1, table2), WriteOp.Insert, ctx);
 ```

ingester-example/README.md
Lines changed: 5 additions & 5 deletions

@@ -34,15 +34,15 @@ These batching approaches can dramatically improve performance compared to makin
 
 #### Examples
 
-- [LowLevelApiWriteQuickStart.java](src/main/java/io/greptime/LowLevelApiWriteQuickStart.java)
+- [LowLevelApiWriteQuickStart.java](src/main/java/io/greptime/quickstart/write/LowLevelApiWriteQuickStart.java)
 
 This example demonstrates how to use the low-level API to write data to GreptimeDB. It covers:
 * Defining table schemas with tags, timestamps, and fields
 * Writing multiple rows of data to different tables
 * Processing write results using the Result pattern
 * Deleting data using the `WriteOp.Delete` operation
 
-- [HighLevelApiWriteQuickStart.java](src/main/java/io/greptime/HighLevelApiWriteQuickStart.java)
+- [HighLevelApiWriteQuickStart.java](src/main/java/io/greptime/quickstart/write/HighLevelApiWriteQuickStart.java)
 
 This example demonstrates how to use the high-level API to write data to GreptimeDB. It covers:
 * Writing data using POJO objects with annotations
@@ -61,15 +61,15 @@ This API is particularly well-suited for:
 
 #### Examples
 
-- [LowLevelApiStreamWriteQuickStart.java](src/main/java/io/greptime/LowLevelApiStreamWriteQuickStart.java)
+- [LowLevelApiStreamWriteQuickStart.java](src/main/java/io/greptime/quickstart/write/LowLevelApiStreamWriteQuickStart.java)
 
 This example demonstrates how to use the low-level API to write data to GreptimeDB using stream. It covers:
 * Defining table schemas with tags, timestamps, and fields
 * Writing multiple rows of data to different tables via streaming
 * Finalizing the stream and retrieving write results
 * Deleting data using the `WriteOp.Delete` operation
 
-- [HighLevelApiStreamWriteQuickStart.java](src/main/java/io/greptime/HighLevelApiStreamWriteQuickStart.java)
+- [HighLevelApiStreamWriteQuickStart.java](src/main/java/io/greptime/quickstart/write/HighLevelApiStreamWriteQuickStart.java)
 
 This example demonstrates how to use the high-level API to write data to GreptimeDB using stream. It covers:
 * Writing POJO objects directly to the stream
@@ -99,7 +99,7 @@ This API is ideal for scenarios such as:
 
 ### Examples
 
-- [BulkWriteApiQuickStart.java](src/main/java/io/greptime/BulkWriteApiQuickStart.java)
+- [BulkWriteApiQuickStart.java](src/main/java/io/greptime/quickstart/write/BulkWriteApiQuickStart.java)
 
 This example demonstrates how to use the bulk write API to write large volumes of data to a single table with maximum efficiency. It covers:
 * Configuring the bulk writer for optimal performance

ingester-example/src/main/java/io/greptime/bench/DBConnector.java
Lines changed: 20 additions & 2 deletions

@@ -18,18 +18,36 @@
 
 import io.greptime.GreptimeDB;
 import io.greptime.options.GreptimeOptions;
+import io.greptime.quickstart.query.QueryJDBCQuickStart;
+import java.io.IOException;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * DBConnector is a helper class to connect to a GreptimeDB instance.
  */
 public class DBConnector {
 
-    public static GreptimeDB connectTo(String[] endpoints, String dbname) {
-        GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, dbname)
+    private static final Logger LOG = LoggerFactory.getLogger(DBConnector.class);
+
+    public static GreptimeDB connect() {
+        Properties prop = new Properties();
+        try {
+            prop.load(QueryJDBCQuickStart.class.getResourceAsStream("/db-connection.properties"));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        String database = (String) prop.get("db.database");
+        String endpointsStr = prop.getProperty("db.endpoints");
+        String[] endpoints = endpointsStr.split(",");
+        GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database)
                 .writeMaxRetries(0)
                 .defaultStreamMaxWritePointsPerSecond(Integer.MAX_VALUE)
                 .useZeroCopyWriteInBulkWrite(true)
                 .build();
+        LOG.info("Connect to db: {}, endpoint: {}", database, endpointsStr);
+
         return GreptimeDB.create(opts);
     }
 }
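
For context, DBConnector.connect() (and the updated TestConnector below) now resolve the target database from a db-connection.properties file on the classpath instead of hard-coded values. Only the db.endpoints and db.database keys are taken from the code above; the file location (the standard Maven src/main/resources directory) and the concrete values in this sketch are assumptions for illustration, matching the defaults the benchmarks previously used:

```properties
# Hypothetical ingester-example/src/main/resources/db-connection.properties (values are illustrative)
# Comma-separated gRPC endpoints of the GreptimeDB cluster, read by DBConnector.connect()
db.endpoints=127.0.0.1:4001
# Database name passed to GreptimeOptions.newBuilder(endpoints, database)
db.database=public
```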

ingester-example/src/main/java/io/greptime/bench/BatchingWriteBenchmark.java renamed to ingester-example/src/main/java/io/greptime/bench/benchmark/BatchingWriteBenchmark.java
Lines changed: 5 additions & 8 deletions

@@ -14,10 +14,12 @@
  * limitations under the License.
  */
 
-package io.greptime.bench;
+package io.greptime.bench.benchmark;
 
 import io.greptime.GreptimeDB;
 import io.greptime.WriteOp;
+import io.greptime.bench.DBConnector;
+import io.greptime.bench.TableDataProvider;
 import io.greptime.common.util.MetricsUtil;
 import io.greptime.common.util.ServiceLoader;
 import io.greptime.common.util.SystemPropertyUtil;
@@ -40,8 +42,6 @@
  * BatchingWriteBenchmark is a benchmark for the batching write API of GreptimeDB.
  *
  * Env:
- * - db_endpoint: the endpoint of the GreptimeDB server
- * - db_name: the name of the database
  * - batch_size_per_request: the batch size per request
  * - zstd_compression: whether to use zstd compression
  * - max_points_per_second: the max number of points that can be written per second, exceeding which may cause blockage
@@ -51,21 +51,18 @@ public class BatchingWriteBenchmark {
     private static final Logger LOG = LoggerFactory.getLogger(BatchingWriteBenchmark.class);
 
     public static void main(String[] args) throws Exception {
-        String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001");
-        String dbName = SystemPropertyUtil.get("db_name", "public");
         boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true);
         int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024);
         int maxPointsPerSecond = SystemPropertyUtil.getInt("max_points_per_second", Integer.MAX_VALUE);
-        LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint);
+
         LOG.info("Using zstd compression: {}", zstdCompression);
         LOG.info("Batch size: {}", batchSize);
         LOG.info("Max points per second: {}", maxPointsPerSecond);
 
         // Start a metrics exporter
         MetricsExporter metricsExporter = new MetricsExporter(MetricsUtil.metricRegistry());
         metricsExporter.init(ExporterOptions.newDefault());
-
-        GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName);
+        GreptimeDB greptimeDB = DBConnector.connect();
 
         Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
         Context ctx = Context.newDefault().withCompression(compression);
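
The remaining settings are read through SystemPropertyUtil with a name/default pair, which suggests ordinary JVM system properties (an assumption; the property names below come from the code above, the values are illustrative only). A minimal sketch of driving the benchmark with explicit settings:

```java
// Hypothetical launcher class; equivalent to passing
// -Dzstd_compression=true -Dbatch_size_per_request=65536 -Dmax_points_per_second=1000000
// on the java command line.
public class BatchingWriteBenchmarkLauncher {

    public static void main(String[] args) throws Exception {
        System.setProperty("zstd_compression", "true");
        System.setProperty("batch_size_per_request", String.valueOf(64 * 1024));
        System.setProperty("max_points_per_second", "1000000");
        // Delegate to the benchmark's own entry point (new package after this commit)
        io.greptime.bench.benchmark.BatchingWriteBenchmark.main(args);
    }
}
```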

ingester-example/src/main/java/io/greptime/bench/BulkWriteBenchmark.java renamed to ingester-example/src/main/java/io/greptime/bench/benchmark/BulkWriteBenchmark.java
Lines changed: 4 additions & 7 deletions

@@ -14,11 +14,13 @@
  * limitations under the License.
  */
 
-package io.greptime.bench;
+package io.greptime.bench.benchmark;
 
 import io.greptime.BulkStreamWriter;
 import io.greptime.BulkWrite;
 import io.greptime.GreptimeDB;
+import io.greptime.bench.DBConnector;
+import io.greptime.bench.TableDataProvider;
 import io.greptime.common.util.MetricsUtil;
 import io.greptime.common.util.ServiceLoader;
 import io.greptime.common.util.SystemPropertyUtil;
@@ -37,8 +39,6 @@
  * BulkWriteBenchmark is a benchmark for the bulk write API of GreptimeDB.
  *
  * Env:
- * - db_endpoint: the endpoint of the GreptimeDB server
- * - db_name: the name of the database
  * - batch_size_per_request: the batch size per request
  * - zstd_compression: whether to use zstd compression
  */
@@ -47,19 +47,16 @@ public class BulkWriteBenchmark {
     private static final Logger LOG = LoggerFactory.getLogger(BulkWriteBenchmark.class);
 
     public static void main(String[] args) throws Exception {
-        String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001");
-        String dbName = SystemPropertyUtil.get("db_name", "public");
         boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true);
         int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024);
-        LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint);
         LOG.info("Using zstd compression: {}", zstdCompression);
         LOG.info("Batch size: {}", batchSize);
 
         // Start a metrics exporter
         MetricsExporter metricsExporter = new MetricsExporter(MetricsUtil.metricRegistry());
         metricsExporter.init(ExporterOptions.newDefault());
 
-        GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName);
+        GreptimeDB greptimeDB = DBConnector.connect();
 
         BulkWrite.Config cfg = BulkWrite.Config.newBuilder()
                 .allocatorInitReservation(0)

ingester-example/src/main/java/io/greptime/bench/StreamingWriteBenchmark.java renamed to ingester-example/src/main/java/io/greptime/bench/benchmark/StreamingWriteBenchmark.java
Lines changed: 4 additions & 7 deletions

@@ -14,10 +14,12 @@
  * limitations under the License.
  */
 
-package io.greptime.bench;
+package io.greptime.bench.benchmark;
 
 import io.greptime.GreptimeDB;
 import io.greptime.StreamWriter;
+import io.greptime.bench.DBConnector;
+import io.greptime.bench.TableDataProvider;
 import io.greptime.common.util.MetricsUtil;
 import io.greptime.common.util.ServiceLoader;
 import io.greptime.common.util.SystemPropertyUtil;
@@ -37,8 +39,6 @@
  * StreamingWriteBenchmark is a benchmark for the streaming write API of GreptimeDB.
  *
  * Env:
- * - db_endpoint: the endpoint of the GreptimeDB server
- * - db_name: the name of the database
  * - batch_size_per_request: the batch size per request
  * - zstd_compression: whether to use zstd compression
  * - max_points_per_second: the max number of points that can be written per second, exceeding which may cause blockage
@@ -48,12 +48,9 @@ public class StreamingWriteBenchmark {
     private static final Logger LOG = LoggerFactory.getLogger(StreamingWriteBenchmark.class);
 
     public static void main(String[] args) throws Exception {
-        String endpoint = SystemPropertyUtil.get("db_endpoint", "127.0.0.1:4001");
-        String dbName = SystemPropertyUtil.get("db_name", "public");
         boolean zstdCompression = SystemPropertyUtil.getBool("zstd_compression", true);
         int batchSize = SystemPropertyUtil.getInt("batch_size_per_request", 64 * 1024);
         int maxPointsPerSecond = SystemPropertyUtil.getInt("max_points_per_second", Integer.MAX_VALUE);
-        LOG.info("Connect to db: {}, endpoint: {}", dbName, endpoint);
         LOG.info("Using zstd compression: {}", zstdCompression);
         LOG.info("Batch size: {}", batchSize);
         LOG.info("Max points per second: {}", maxPointsPerSecond);
@@ -62,7 +59,7 @@ public static void main(String[] args) throws Exception {
         MetricsExporter metricsExporter = new MetricsExporter(MetricsUtil.metricRegistry());
         metricsExporter.init(ExporterOptions.newDefault());
 
-        GreptimeDB greptimeDB = DBConnector.connectTo(new String[] {endpoint}, dbName);
+        GreptimeDB greptimeDB = DBConnector.connect();
 
         Compression compression = zstdCompression ? Compression.Zstd : Compression.None;
         Context ctx = Context.newDefault().withCompression(compression);

ingester-example/src/main/java/io/greptime/Cpu.java renamed to ingester-example/src/main/java/io/greptime/metric/Cpu.java
Lines changed: 2 additions & 2 deletions

@@ -14,14 +14,14 @@
  * limitations under the License.
  */
 
-package io.greptime;
+package io.greptime.metric;
 
 import io.greptime.models.Column;
 import io.greptime.models.DataType;
 import io.greptime.models.Metric;
 
 /**
- *
+ * Memory metric class that represents CPU usage statistics.
  */
 @Metric(name = "cpu_metric")
 public class Cpu {

ingester-example/src/main/java/io/greptime/Memory.java renamed to ingester-example/src/main/java/io/greptime/metric/Memory.java
Lines changed: 2 additions & 2 deletions

@@ -14,14 +14,14 @@
  * limitations under the License.
  */
 
-package io.greptime;
+package io.greptime.metric;
 
 import io.greptime.models.Column;
 import io.greptime.models.DataType;
 import io.greptime.models.Metric;
 
 /**
- *
+ * Memory metric class that represents memory usage statistics.
 */
 @Metric(name = "mem_metric")
 public class Memory {
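
The Cpu and Memory diffs above only touch the package declaration and the class Javadoc; the annotated POJO bodies are unchanged and not shown. For orientation, a metric POJO for the high-level write API typically looks roughly like the sketch below — the field names, data types, and @Column attributes here are illustrative assumptions, not the actual contents of Cpu.java:

```java
package io.greptime.metric;

import io.greptime.models.Column;
import io.greptime.models.DataType;
import io.greptime.models.Metric;

// Illustrative sketch of an annotated metric POJO (not the real Cpu/Memory fields).
@Metric(name = "cpu_metric")
public class CpuSketch {

    // Tag column identifying the host the sample came from
    @Column(name = "host", tag = true, dataType = DataType.String)
    private String host;

    // Timestamp column, in milliseconds
    @Column(name = "ts", timestamp = true, dataType = DataType.TimestampMillisecond)
    private long ts;

    // Field column: user-space CPU usage
    @Column(name = "cpu_user", dataType = DataType.Float64)
    private double cpuUser;
}
```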

ingester-example/src/main/java/io/greptime/TestConnector.java renamed to ingester-example/src/main/java/io/greptime/quickstart/TestConnector.java
Lines changed: 15 additions & 3 deletions

@@ -14,13 +14,17 @@
  * limitations under the License.
  */
 
-package io.greptime;
+package io.greptime.quickstart;
 
+import io.greptime.GreptimeDB;
 import io.greptime.common.util.SerializingExecutor;
 import io.greptime.limit.LimitedPolicy;
 import io.greptime.models.AuthInfo;
 import io.greptime.options.GreptimeOptions;
+import io.greptime.quickstart.query.QueryJDBCQuickStart;
 import io.greptime.rpc.RpcOptions;
+import java.io.IOException;
+import java.util.Properties;
 
 /**
  *
@@ -30,11 +34,19 @@ public class TestConnector {
     public static GreptimeDB connectToDefaultDB() {
         // GreptimeDB has a default database named "public" in the default catalog "greptime",
         // we can use it as the test database
-        String database = "public";
+        Properties prop = new Properties();
+
+        try {
+            prop.load(QueryJDBCQuickStart.class.getResourceAsStream("/db-connection.properties"));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        String database = (String) prop.get("db.database");
         // By default, GreptimeDB listens on port 4001 using the gRPC protocol.
         // We can provide multiple endpoints that point to the same GreptimeDB cluster.
         // The client will make calls to these endpoints based on a load balancing strategy.
-        String[] endpoints = {"127.0.0.1:4001"};
+        String endpointsStr = prop.getProperty("db.endpoints");
+        String[] endpoints = endpointsStr.split(",");
         GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints, database) // Optional, the default value is fine.
                 // Asynchronous thread pool, which is used to handle various asynchronous
                 // tasks in the SDK (You are using a purely asynchronous SDK). If you do not

ingester-example/src/main/java/io/greptime/QueryJDBC.java renamed to ingester-example/src/main/java/io/greptime/quickstart/query/QueryJDBCQuickStart.java
Lines changed: 7 additions & 4 deletions

@@ -14,8 +14,11 @@
  * limitations under the License.
  */
 
-package io.greptime;
+package io.greptime.quickstart.query;
 
+import io.greptime.GreptimeDB;
+import io.greptime.metric.Cpu;
+import io.greptime.quickstart.TestConnector;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -33,9 +36,9 @@
 /**
  *
  */
-public class QueryJDBC {
+public class QueryJDBCQuickStart {
 
-    private static final Logger LOG = LoggerFactory.getLogger(QueryJDBC.class);
+    private static final Logger LOG = LoggerFactory.getLogger(QueryJDBCQuickStart.class);
 
     public static void main(String[] args) throws Exception {
         GreptimeDB greptimeDB = TestConnector.connectToDefaultDB();
@@ -82,7 +85,7 @@ public static void main(String[] args) throws Exception {
 
     public static Connection getConnection() throws IOException, ClassNotFoundException, SQLException {
         Properties prop = new Properties();
-        prop.load(QueryJDBC.class.getResourceAsStream("/db-connection.properties"));
+        prop.load(QueryJDBCQuickStart.class.getResourceAsStream("/db-connection.properties"));
 
         String dbName = (String) prop.get("db.database-driver");
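
The getConnection() hunk ends after loading the driver class name; given the DriverManager import and the declared ClassNotFoundException/SQLException, the rest of the method presumably registers the driver and opens a JDBC connection from further properties. A rough sketch under that assumption — db.url, db.username, and db.password are hypothetical key names, not taken from the diff:

```java
// Hypothetical continuation of getConnection(); key names and the example URL are assumptions.
String driver = (String) prop.get("db.database-driver");
Class.forName(driver);                                   // register the JDBC driver
String url = prop.getProperty("db.url");                 // e.g. jdbc:mysql://127.0.0.1:4002/public
String user = prop.getProperty("db.username", "");
String password = prop.getProperty("db.password", "");
return DriverManager.getConnection(url, user, password);
```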
