Skip to content

filter_lookup: added filter for key value lookup #10620

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

olegmukhin
Copy link

@olegmukhin olegmukhin commented Jul 19, 2025

Added a new LookUp filter to address use case when enrichment of record is required based on simple static key value lookup.

The filter loads a CSV file into a hash table for performance. It consider first column of the CSV to be the key and the second column to be the value. All other columns are ignored.

Where a record value (identified by lookup_key input) matches the key from the CSV, the value from the CSV row is added under a new key (defined by result_key input) to the record.


Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • [N/A] Run local packaging test showing all targets (including any new ones) build.
  • [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • [N/A] Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

New filter aims to address use case of simple data enrichment using
static key value lookup.

The filter loads first two columns of CSV file into memory as a hash
table. When a specified record value matches the key in the hash table
the value will be appended to the record (based on key name defined
in the filter inputs).)

Tested with valgrind.

Signed-off-by: Oleg Mukhin <[email protected]>
@olegmukhin
Copy link
Author

Test configuration

Fluent Bit YAML Configuration

parsers:
  - name: json
    format: json

pipeline:
  inputs:
    - name: tail
      path: /src/devices.log
      read_from_head: true
      parser: json

  filters:
    - name: lookup
      match: "*"
      file: /src/device-bu.csv
      lookup_key: $hostname
      result_key: business_line
      ignore_case: true

  outputs:
    - name: stdout
      match: "*"

To test new filter we will load a range of log values including, strings (different cases), integer, boolean, embedded quotes and other value types.

devices.log

{"hostname": "server-prod-001"}
{"hostname": "Server-Prod-001"}
{"hostname": "db-test-abc"}
{"hostname": 123}
{"hostname": true}
{"hostname": " host with space "}
{"hostname": "quoted \"host\""}
{"hostname": "unknown-host"}
{}
{"hostname": [1,2,3]}
{"hostname": {"sub": "val"}}
{"hostname": " "}

CSV configuration will aim to test key overwrites, different types of strings, use and escaping of quotes.

device-bu.csv

hostname,business_line
server-prod-001,Finance
db-test-abc,Engineering
db-test-abc,Marketing
web-frontend-xyz,Marketing
app-backend-123,Operations
"legacy-system true","Legacy IT"
" host with space ","Infrastructure"
"quoted ""host""", "R&D"
123, "R&D"
true, "R&D"
no-match-host,Should Not Appear

When executed with verbose flag the following out is produced.

Test output

[2025/07/19 14:38:48] [ info] Configuration:
[2025/07/19 14:38:48] [ info]  flush time     | 1.000000 seconds
[2025/07/19 14:38:48] [ info]  grace          | 5 seconds
[2025/07/19 14:38:48] [ info]  daemon         | 0
[2025/07/19 14:38:48] [ info] ___________
[2025/07/19 14:38:48] [ info]  inputs:
[2025/07/19 14:38:48] [ info]      tail
[2025/07/19 14:38:48] [ info] ___________
[2025/07/19 14:38:48] [ info]  filters:
[2025/07/19 14:38:48] [ info]      lookup.0
[2025/07/19 14:38:48] [ info] ___________
[2025/07/19 14:38:48] [ info]  outputs:
[2025/07/19 14:38:48] [ info]      stdout.0
[2025/07/19 14:38:48] [ info] ___________
[2025/07/19 14:38:48] [ info]  collectors:
[2025/07/19 14:38:48] [ info] [fluent bit] version=4.1.0, commit=, pid=50224
[2025/07/19 14:38:48] [debug] [engine] coroutine stack size: 196608 bytes (192.0K)
[2025/07/19 14:38:48] [ info] [storage] ver=1.5.3, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2025/07/19 14:38:48] [ info] [simd    ] disabled
[2025/07/19 14:38:48] [ info] [cmetrics] version=1.0.4
[2025/07/19 14:38:48] [ info] [ctraces ] version=0.6.6
[2025/07/19 14:38:48] [ info] [input:tail:tail.0] initializing
[2025/07/19 14:38:48] [ info] [input:tail:tail.0] storage_strategy='memory' (memory only)
[2025/07/19 14:38:48] [debug] [tail:tail.0] created event channels: read=25 write=26
[2025/07/19 14:38:48] [debug] [input:tail:tail.0] flb_tail_fs_inotify_init() initializing inotify tail input
[2025/07/19 14:38:48] [debug] [input:tail:tail.0] inotify watch fd=31
[2025/07/19 14:38:48] [debug] [input:tail:tail.0] scanning path /src/*.log
[2025/07/19 14:38:48] [debug] [input:tail:tail.0] file will be read in POSIX_FADV_DONTNEED mode /src/devices.log
[2025/07/19 14:38:48] [debug] [input:tail:tail.0] inode=10 with offset=0 appended as /src/devices.log
[2025/07/19 14:38:48] [debug] [input:tail:tail.0] scan_glob add(): /src/devices.log, inode 10
[2025/07/19 14:38:48] [debug] [input:tail:tail.0] 1 new files found on path '/src/*.log'
[2025/07/19 14:38:48] [ info] [filter:lookup:lookup.0] Loaded 10 entries from CSV
[2025/07/19 14:38:48] [debug] [stdout:stdout.0] created event channels: read=33 write=34
[2025/07/19 14:38:48] [ info] [output:stdout:stdout.0] worker #0 started
[2025/07/19 14:38:48] [ info] [sp] stream processor started
[2025/07/19 14:38:48] [ info] [engine] Shutdown Grace Period=5, Shutdown Input Grace Period=2
[2025/07/19 14:38:48] [debug] [filter:lookup:lookup.0] Record 4: lookup value for key '$hostname' is non-string, converted to '123'
[2025/07/19 14:38:48] [debug] [filter:lookup:lookup.0] Record 5: lookup value for key '$hostname' is non-string, converted to 'true'
[2025/07/19 14:38:48] [debug] [filter:lookup:lookup.0] Record 10: lookup_key '$hostname' has type array/map, skipping to avoid ra error
[2025/07/19 14:38:48] [debug] [filter:lookup:lookup.0] Record 11: lookup_key '$hostname' has type array/map, skipping to avoid ra error
[2025/07/19 14:38:48] [debug] [input:tail:tail.0] [static files] processed 278b
[2025/07/19 14:38:48] [debug] [input:tail:tail.0] inode=10 file=/src/devices.log promote to TAIL_EVENT
[2025/07/19 14:38:48] [ info] [input:tail:tail.0] inotify_fs_add(): inode=10 watch_fd=1 name=/src/devices.log
[2025/07/19 14:38:48] [debug] [input:tail:tail.0] [static files] processed 0b, done
[2025/07/19 14:38:49] [debug] [task] created task=0xffff9c043a10 id=0 OK
[2025/07/19 14:38:49] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[0] tail.0: [[1752935928.516352587, {}], {"hostname"=>"server-prod-001", "business_line"=>"Finance"}]
[1] tail.0: [[1752935928.516443337, {}], {"hostname"=>"Server-Prod-001", "business_line"=>"Finance"}]
[2] tail.0: [[1752935928.516445712, {}], {"hostname"=>"db-test-abc", "business_line"=>"Marketing"}]
[3] tail.0: [[1752935928.516448504, {}], {"hostname"=>123, "business_line"=>"R&D"}]
[4] tail.0: [[1752935928.516450337, {}], {"hostname"=>true, "business_line"=>"R&D"}]
[5] tail.0: [[1752935928.516452004, {}], {"hostname"=>" host with space ", "business_line"=>"Infrastructure"}]
[6] tail.0: [[1752935928.516453670, {}], {"hostname"=>"quoted "host"", "business_line"=>"R&D"}]
[7] tail.0: [[1752935928.516455212, {}], {"hostname"=>"unknown-host"}]
[8] tail.0: [[1752935928.516456504, {}], {}]
[9] tail.0: [[1752935928.516458712, {}], {"hostname"=>[1, 2, 3]}]
[10] tail.0: [[1752935928.516460754, {}], {"hostname"=>{"sub"=>"val"}}]
[2025/07/19 14:38:49] [debug] [out flush] cb_destroy coro_id=0
[2025/07/19 14:38:49] [debug] [task] destroy task=0xffff9c043a10 (task_id=0)

Output shows correct matching and handling of different value types and correct output when no match is detected.

Valgrind summary (after run with multiple types of lookups):

==50220== HEAP SUMMARY:
==50220==     in use at exit: 0 bytes in 0 blocks
==50220==   total heap usage: 14,547 allocs, 14,550 frees, 74,987,419 bytes allocated
==50220== 
==50220== All heap blocks were freed -- no leaks are possible
==50220== 
==50220== Use --track-origins=yes to see where uninitialised values come from
==50220== For lists of detected and suppressed errors, rerun with: -s
==50220== ERROR SUMMARY: 6 errors from 4 contexts (suppressed: 0 from 0)

@olegmukhin
Copy link
Author

Documentation for this filter has been submitted as part of #fluent/fluent-bit-docs/pull/1953.

- Removed unecessary FLB_FILTER_LOOKUP build flag now LookUp
is enabled by default like other filters (without flag).
- Fixed critical use-after-free bug in numeric value lookups.
- Added processed_records_total, matched_records_total and
skipped_records_total metrics to enable operational visibility
- Added unit tests to cover handling of different data types,
CSV loading/handling and metrics tests.

Tested with valgrind - no memory leaks. All unit tests pass.

Signed-off-by: Oleg Mukhin <[email protected]>
@olegmukhin
Copy link
Author

Added unit tests for lookup filter. All tests pass:

Test basic_lookup...                            [ OK ]
Test ignore_case...                             [ OK ]
Test csv_quotes...                              [ OK ]
Test numeric_values...                          [ OK ]
Test large_numbers...                           [ OK ]
Test boolean_values...                          [ OK ]
Test no_match...                                [ OK ]
Test long_csv_lines...                          [ OK ]
Test whitespace_trim...                         [ OK ]
Test dynamic_buffer...                          [ OK ]
Test nested_keys...                             [ OK ]
Test large_csv...                               [ OK ]
Test nested_array_keys...                       [ OK ]
Test metrics_matched...                         [ OK ]
Test metrics_processed...                       [ OK ]

Valgrind results are showing appropriate memory management.

==19111== HEAP SUMMARY:
==19111==     in use at exit: 0 bytes in 0 blocks
==19111==   total heap usage: 6,964 allocs, 6,964 frees, 59,096,180 bytes allocated
==19111== 
==19111== All heap blocks were freed -- no leaks are possible
==19111== 
==19111== Use --track-origins=yes to see where uninitialised values come from

- fix variable declarations and remove C99 features
- Conditional compilation for Windows vs Unix headers/functions
- Replace bool with int, fix format specifiers, update comments

All 15 unit tests for filter passed.

Signed-off-by: Oleg Mukhin <[email protected]>
@olegmukhin
Copy link
Author

Added fix for failing checks on Cent OS 7 and Windows. Please rerun.

- fix variable declarations and remove C99 features for unit tests
- Conditional compilation for Windows for unit test features

All 15 unit tests for filter passed.

Signed-off-by: Oleg Mukhin <[email protected]>
@olegmukhin
Copy link
Author

olegmukhin commented Jul 21, 2025

Last check is failing due to Cent OS 7 incompatibility in unit test file - fix in last commit. Please rerun.

@olegmukhin
Copy link
Author

Could this please get one more run at the checks? I didn't realise we need this to compile on Cent OS 7 - should be good now with last commit.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant