Skip to content

Commit b957bbd

Browse files
authored
feat: field value for time_key can be formatted date (#30)
1 parent e7f3c94 commit b957bbd

File tree

6 files changed

+140
-12
lines changed

6 files changed

+140
-12
lines changed

.circleci/config.yml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ commands:
4141
- restore_cache:
4242
name: Restoring Gem Cache
4343
keys:
44-
- &cache-key gem-cache-v2-{{ checksum "fluent-plugin-influxdb-v2.gemspec" }}-<< parameters.ruby-image >>
45-
- gem-cache-v2-{{ checksum "fluent-plugin-influxdb-v2.gemspec" }}
46-
- gem-cache-v2-
44+
- &cache-key gem-cache-v3-{{ checksum "fluent-plugin-influxdb-v2.gemspec" }}-<< parameters.ruby-image >>
45+
- gem-cache-v3-{{ checksum "fluent-plugin-influxdb-v2.gemspec" }}
46+
- gem-cache-v3-
4747
- run:
4848
name: Install dependencies
4949
command: |
@@ -120,6 +120,9 @@ workflows:
120120
- tests-ruby:
121121
name: ruby-2.7
122122
ruby-image: "cimg/ruby:2.7"
123+
- tests-ruby:
124+
name: ruby-3.0
125+
ruby-image: "cimg/ruby:3.0"
123126
- tests-ruby:
124127
name: ruby-2.6
125128
- tests-ruby:

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
## 1.9.0 [unreleased]
22

3+
### Features
4+
1. [#30](https://github.com/influxdata/influxdb-plugin-fluent/pull/30): Field value for `time_key` can be formatted date (`2021-11-05 09:15:49.487727165 +0000`, `2021-11-05T10:04:43.617216Z`)
5+
6+
### Dependencies
7+
1. [#30](https://github.com/influxdata/influxdb-plugin-fluent/pull/30): Update dependencies:
8+
- influxdb-client to 2.1.0
9+
310
### CI
411
1. [#27](https://github.com/influxdata/influxdb-plugin-fluent/pull/27): Switch to next-gen CircleCI's convenience images
12+
1. [#30](https://github.com/influxdata/influxdb-plugin-fluent/pull/30): Add Ruby 3.0 into CI
513

614
## 1.8.0 [2021-08-20]
715

fluent-plugin-influxdb-v2.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ Gem::Specification.new do |spec|
4747
spec.required_ruby_version = '>= 2.2.0'
4848

4949
spec.add_runtime_dependency 'fluentd', '~> 1.8'
50-
spec.add_runtime_dependency 'influxdb-client', '1.12.0'
50+
spec.add_runtime_dependency 'influxdb-client', '2.1.0'
5151

5252
spec.add_development_dependency 'bundler', '~> 2.0'
5353
spec.add_development_dependency 'codecov', '~> 0.1.16'

lib/fluent/plugin/out_influxdb2.rb

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -121,18 +121,12 @@ def write(chunk)
121121
tag = chunk.metadata.tag
122122
bucket, measurement = expand_placeholders(chunk)
123123
chunk.msgpack_each do |time, record|
124-
if time.is_a?(Integer)
125-
time_formatted = time
126-
else
127-
nano_seconds = time.sec * 1e9
128-
nano_seconds += time.nsec
129-
time_formatted = @precision_formatter.call(nano_seconds)
130-
end
124+
time_formatted = _format_time(time)
131125
point = InfluxDB2::Point
132126
.new(name: measurement)
133127
record.each_pair do |k, v|
134128
if k.eql?(@time_key)
135-
time_formatted = v
129+
time_formatted = _format_time(v)
136130
else
137131
_parse_field(k, v, point)
138132
end
@@ -157,6 +151,27 @@ def expand_placeholders(chunk)
157151
[bucket, measurement]
158152
end
159153

154+
def _format_time(time)
155+
if time.is_a?(Integer)
156+
time
157+
elsif time.is_a?(Float)
158+
time
159+
elsif time.is_a?(Fluent::EventTime)
160+
nano_seconds = time.sec * 1e9
161+
nano_seconds += time.nsec
162+
@precision_formatter.call(nano_seconds)
163+
elsif time.is_a?(String)
164+
begin
165+
_format_time(Fluent::EventTime.parse(time))
166+
rescue StandardError => e
167+
log.debug "Cannot parse timestamp: #{time} due: #{e}"
168+
time
169+
end
170+
else
171+
time
172+
end
173+
end
174+
160175
def _parse_field(key, value, point)
161176
if @tag_keys.include?(key)
162177
point.add_tag(key, value)

test/.rubocop.yml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#
2+
# The MIT License
3+
#
4+
# Permission is hereby granted, free of charge, to any person obtaining a copy
5+
# of this software and associated documentation files (the "Software"), to deal
6+
# in the Software without restriction, including without limitation the rights
7+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8+
# copies of the Software, and to permit persons to whom the Software is
9+
# furnished to do so, subject to the following conditions:
10+
#
11+
# The above copyright notice and this permission notice shall be included in
12+
# all copies or substantial portions of the Software.
13+
#
14+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
20+
# THE SOFTWARE.
21+
#
22+
23+
inherit_from: ../.rubocop.yml
24+
Metrics/ClassLength:
25+
Enabled: false
26+

test/influxdb/plugin/output_test.rb

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,82 @@ def test_time_key
381381
times: 1, body: 'h2o_tag,version=v.10 level=2i,location="europe" 1544897215000000000')
382382
end
383383

384+
def test_time_float
385+
stub_request(:any, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns')
386+
.to_return(status: 204)
387+
driver = create_driver(%(
388+
@type influxdb2
389+
token my-token
390+
bucket my-bucket
391+
org my-org
392+
tag_keys ["version"]
393+
time_key time
394+
))
395+
driver.run(default_tag: 'h2o_tag') do
396+
emit_documents(driver, 'location' => 'europe', 'level' => 2, 'version' => 'v.10',
397+
'time' => 1_544_897_215_000_000_000.123)
398+
end
399+
assert_requested(:post, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns',
400+
times: 1, body: 'h2o_tag,version=v.10 level=2i,location="europe" 1544897215000000000')
401+
end
402+
403+
def test_time_key_rfc_3339_format
404+
stub_request(:any, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns')
405+
.to_return(status: 204)
406+
driver = create_driver(%(
407+
@type influxdb2
408+
token my-token
409+
bucket my-bucket
410+
org my-org
411+
tag_keys ["version"]
412+
time_key time
413+
))
414+
driver.run(default_tag: 'h2o_tag') do
415+
emit_documents(driver, 'location' => 'europe', 'level' => 2, 'version' => 'v.10',
416+
'time' => '2021-11-05T10:04:43.617216Z')
417+
end
418+
assert_requested(:post, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns',
419+
times: 1, body: 'h2o_tag,version=v.10 level=2i,location="europe" 1636106683617216000')
420+
end
421+
422+
def test_time_key_log_format
423+
stub_request(:any, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns')
424+
.to_return(status: 204)
425+
driver = create_driver(%(
426+
@type influxdb2
427+
token my-token
428+
bucket my-bucket
429+
org my-org
430+
tag_keys ["version"]
431+
time_key time
432+
))
433+
driver.run(default_tag: 'h2o_tag') do
434+
emit_documents(driver, 'location' => 'europe', 'level' => 2, 'version' => 'v.10',
435+
'time' => '2021-11-05 09:15:49.487727165 +0000')
436+
end
437+
assert_requested(:post, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns',
438+
times: 1, body: 'h2o_tag,version=v.10 level=2i,location="europe" 1636103749487727104')
439+
end
440+
441+
def test_time_key_not_parseable
442+
stub_request(:any, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns')
443+
.to_return(status: 204)
444+
driver = create_driver(%(
445+
@type influxdb2
446+
token my-token
447+
bucket my-bucket
448+
org my-org
449+
tag_keys ["version"]
450+
time_key time
451+
))
452+
driver.run(default_tag: 'h2o_tag') do
453+
emit_documents(driver, 'location' => 'europe', 'level' => 2, 'version' => 'v.10',
454+
'time' => '1544897215000000000')
455+
end
456+
assert_requested(:post, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns',
457+
times: 1, body: 'h2o_tag,version=v.10 level=2i,location="europe" 1544897215000000000')
458+
end
459+
384460
def test_field_cast_to_float
385461
stub_request(:any, 'https://localhost:8086/api/v2/write?bucket=my-bucket&org=my-org&precision=ns')
386462
.to_return(status: 204)

0 commit comments

Comments
 (0)