Skip to content

Commit 9a73100

Browse files
authored
feat: add placeholder support for bucket & measurement fields (#26)
1 parent c5811e0 commit 9a73100

File tree

3 files changed

+54
-6
lines changed

3 files changed

+54
-6
lines changed

CHANGELOG.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
## 1.8.0 [unreleased]
22

3+
### Features
4+
1. [#26](https://github.com/influxdata/influxdb-plugin-fluent/pull/26): Add placeholder support for bucket & measurement fields
5+
36
## 1.7.0 [2021-03-05]
47

58
### Features
6-
1. [#23](https://github.com/influxdata/influxdb-plugin-fluent/pull/23): Added possibility to specify the certification verification behaviour
9+
1. [#23](https://github.com/influxdata/influxdb-plugin-fluent/pull/23): Add possibility to specify the certification verification behaviour
710

811
### CI
9-
1. [#24](https://github.com/influxdata/influxdb-plugin-fluent/pull/24): Updated stable image to `influxdb:latest` and nightly to `quay.io/influxdb/influxdb:nightly`
12+
1. [#24](https://github.com/influxdata/influxdb-plugin-fluent/pull/24): Updat stable image to `influxdb:latest` and nightly to `quay.io/influxdb/influxdb:nightly`
1013

1114
## 1.6.0 [2020-10-02]
1215

@@ -18,15 +21,15 @@
1821

1922
## 1.5.0 [2020-07-17]
2023

21-
1. [#12](https://github.com/influxdata/influxdb-plugin-fluent/pull/12): Renamed gem to `fluent-plugin-influxdb-v2`
24+
1. [#12](https://github.com/influxdata/influxdb-plugin-fluent/pull/12): Rename gem to `fluent-plugin-influxdb-v2`
2225

2326
### Dependencies
2427
1. [#11](https://github.com/influxdata/influxdb-plugin-fluent/pull/11): Upgrade InfluxDB client to 1.6.0
2528

2629
## 1.4.0 [2020-06-19]
2730

2831
### Features
29-
1. [#8](https://github.com/influxdata/influxdb-plugin-fluent/pull/8): Added support for nested fields
32+
1. [#8](https://github.com/influxdata/influxdb-plugin-fluent/pull/8): Add support for nested fields
3033

3134
### Dependencies
3235
1. [#9](https://github.com/influxdata/influxdb-plugin-fluent/pull/9): Upgrade InfluxDB client to 1.5.0

lib/fluent/plugin/out_influxdb2.rb

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ def multi_workers_ready?
119119
def write(chunk)
120120
points = []
121121
tag = chunk.metadata.tag
122-
measurement = @measurement || tag
122+
bucket, measurement = expand_placeholders(chunk)
123123
chunk.msgpack_each do |time, record|
124124
if time.is_a?(Integer)
125125
time_formatted = time
@@ -141,12 +141,22 @@ def write(chunk)
141141
point.time(time_formatted, @precision)
142142
points << point
143143
end
144-
@write_api.write(data: points)
144+
@write_api.write(data: points, bucket: bucket)
145145
log.debug "Written points: #{points}"
146146
end
147147

148148
private
149149

150+
def expand_placeholders(chunk)
151+
bucket = extract_placeholders(@bucket, chunk)
152+
measurement = if @measurement
153+
extract_placeholders(@measurement, chunk)
154+
else
155+
chunk.metadata.tag
156+
end
157+
[bucket, measurement]
158+
end
159+
150160
def _parse_field(key, value, point)
151161
if @tag_keys.include?(key)
152162
point.add_tag(key, value)

test/influxdb/plugin/output_test.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,4 +475,39 @@ def emit_documents(driver, data = { 'location' => 'europe', 'level' => 2 })
475475
driver.feed(time, data)
476476
time
477477
end
478+
479+
def test_bucket_as_placeholder
480+
stub_request(:any, 'https://localhost:8086/api/v2/write?bucket=placeholder_h2o_tag&org=my-org&precision=ns')
481+
.to_return(status: 204)
482+
driver = create_driver(%(
483+
@type influxdb2
484+
token my-token
485+
bucket placeholder_${tag}
486+
org my-org
487+
time_precision ns
488+
))
489+
driver.run(default_tag: 'h2o_tag') do
490+
emit_documents(driver)
491+
end
492+
assert_requested(:post, 'https://localhost:8086/api/v2/write?bucket=placeholder_h2o_tag&org=my-org&precision=ns',
493+
times: 1, body: 'h2o_tag level=2i,location="europe" 1444897215000000000')
494+
end
495+
496+
def test_measurement_as_placeholder
497+
stub_request(:any, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns')
498+
.to_return(status: 204)
499+
driver = create_driver(%(
500+
@type influxdb2
501+
token my-token
502+
bucket my-bucket
503+
org my-org
504+
time_precision ns
505+
measurement placeholder_${tag}
506+
))
507+
driver.run(default_tag: 'h2o_tag') do
508+
emit_documents(driver)
509+
end
510+
assert_requested(:post, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns',
511+
times: 1, body: 'placeholder_h2o_tag level=2i,location="europe" 1444897215000000000')
512+
end
478513
end

0 commit comments

Comments
 (0)