Skip to content

Commit 69e2816

Browse files
authored
feat: add possibility to ingest LineProtocol from Fluetd's event (#31)
1 parent b957bbd commit 69e2816

File tree

4 files changed

+63
-13
lines changed

4 files changed

+63
-13
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
### Features
44
1. [#30](https://github.com/influxdata/influxdb-plugin-fluent/pull/30): Field value for `time_key` can be formatted date (`2021-11-05 09:15:49.487727165 +0000`, `2021-11-05T10:04:43.617216Z`)
5+
1. [#31](https://github.com/influxdata/influxdb-plugin-fluent/pull/31): Add possibility to use LineProtocol from Fluetd's record.
56

67
### Dependencies
78
1. [#30](https://github.com/influxdata/influxdb-plugin-fluent/pull/30): Update dependencies:

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ Store Fluentd event to InfluxDB 2 database.
4545
| field_cast_to_float | Turn on/off auto casting Integer value to Float. Helper to avoid mismatch error: 'series type mismatch: already Integer but got Float'. | bool | false |
4646
| time_precision | The time precision of timestamp. You should specify either second (s), millisecond (ms), microsecond (us), or nanosecond (ns). | String | ns |
4747
| time_key | A name of the record key that used as a 'timestamp' instead of event timestamp. If a record key doesn't exists or hasn't value then is used event timestamp. | String | nil |
48+
| line_protocol_key | A name of the record key that contains [LineProtocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/). The value of this key is used for ingesting data into InfluxDB. If a record key doesn't exists or hasn't value then is used event timestamp. | String | nil |
4849

4950
##### Minimal configuration
5051

lib/fluent/plugin/out_influxdb2.rb

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ class InfluxDBOutput < Fluent::Plugin::Output
7373
desc 'A name of the record key that used as a \'timestamp\' instead of event timestamp.' \
7474
'If a record key doesn\'t exists or hasn\'t value then is used event timestamp.'
7575

76+
config_param :line_protocol_key, :string, default: nil
77+
desc 'A name of the record key that contains \'LineProtocol\'.' \
78+
'The value of this key is used for ingesting data into InfluxDB.' \
79+
'For more info see - https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/.'
80+
7681
config_section :buffer do
7782
config_set_default :@type, DEFAULT_BUFFER_TYPE
7883
config_set_default :chunk_keys, ['tag']
@@ -121,22 +126,30 @@ def write(chunk)
121126
tag = chunk.metadata.tag
122127
bucket, measurement = expand_placeholders(chunk)
123128
chunk.msgpack_each do |time, record|
124-
time_formatted = _format_time(time)
125-
point = InfluxDB2::Point
126-
.new(name: measurement)
127-
record.each_pair do |k, v|
128-
if k.eql?(@time_key)
129-
time_formatted = _format_time(v)
130-
else
131-
_parse_field(k, v, point)
129+
if @line_protocol_key
130+
points << record[@line_protocol_key] if record.include?(@line_protocol_key)
131+
else
132+
time_formatted = _format_time(time)
133+
point = InfluxDB2::Point
134+
.new(name: measurement)
135+
record.each_pair do |k, v|
136+
if k.eql?(@time_key)
137+
time_formatted = _format_time(v)
138+
else
139+
_parse_field(k, v, point)
140+
end
141+
point.add_tag('fluentd', tag) if @tag_fluentd
132142
end
133-
point.add_tag('fluentd', tag) if @tag_fluentd
143+
point.time(time_formatted, @precision)
144+
points << point
134145
end
135-
point.time(time_formatted, @precision)
136-
points << point
137146
end
138-
@write_api.write(data: points, bucket: bucket)
139-
log.debug "Written points: #{points}"
147+
if points.empty?
148+
log.debug "Nothing to write for chunk: #{chunk.metadata}"
149+
else
150+
@write_api.write(data: points, bucket: bucket)
151+
log.debug "Written points: #{points}"
152+
end
140153
end
141154

142155
private

test/influxdb/plugin/output_test.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -586,4 +586,39 @@ def test_measurement_as_placeholder
586586
assert_requested(:post, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns',
587587
times: 1, body: 'placeholder_h2o_tag level=2i,location="europe" 1444897215000000000')
588588
end
589+
590+
def test_line_protocol
591+
stub_request(:any, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns')
592+
.to_return(status: 204)
593+
driver = create_driver(%(
594+
@type influxdb2
595+
token my-token
596+
bucket my-bucket
597+
org my-org
598+
time_precision ns
599+
line_protocol_key lp_key
600+
))
601+
driver.run(default_tag: 'h2o_tag') do
602+
emit_documents(driver, 'location' => 'europe', 'level' => 2, 'lp_key' => 'mem,tag=1 field=10 102030')
603+
end
604+
assert_requested(:post, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns',
605+
times: 1, body: 'mem,tag=1 field=10 102030')
606+
end
607+
608+
def test_line_protocol_not_defined
609+
stub_request(:any, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns')
610+
.to_return(status: 204)
611+
driver = create_driver(%(
612+
@type influxdb2
613+
token my-token
614+
bucket my-bucket
615+
org my-org
616+
time_precision ns
617+
line_protocol_key lp_key
618+
))
619+
driver.run(default_tag: 'h2o_tag') do
620+
emit_documents(driver)
621+
end
622+
assert_requested(:post, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns', times: 0)
623+
end
589624
end

0 commit comments

Comments
 (0)