Added filename as a configuration option. #215

Open
wants to merge 3 commits into
base: main
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,3 +1,6 @@
## Upcoming
- Added `filename` as an optional configuration setting to specify a custom format for file names. [#134](https://github.com/logstash-plugins/logstash-output-s3/issues/134)

## 4.2.0
- Added ability to specify [ONEZONE_IA](https://aws.amazon.com/s3/storage-classes/#__) as storage_class

@@ -21,7 +24,7 @@
- Fixed bucket validation failures when bucket policy requires encryption [#191](https://github.com/logstash-plugins/logstash-output-s3/pull/191)

## 4.1.4
- [#185](https://github.com/logstash-plugins/logstash-output-s3/pull/184) Internal: Revert rake pinning to fix upstream builds
- [#185](https://github.com/logstash-plugins/logstash-output-s3/pull/185) Internal: Revert rake pinning to fix upstream builds

## 4.1.3
- [#181](https://github.com/logstash-plugins/logstash-output-s3/pull/181) Docs: Fix incorrect characterization of parameters as `required` in example configuration.
11 changes: 11 additions & 0 deletions docs/index.asciidoc
@@ -87,6 +87,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-encoding>> |<<string,string>>, one of `["none", "gzip"]`|No
| <<plugins-{type}s-{plugin}-endpoint>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-prefix>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-filename>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-proxy_uri>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-region>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-restore>> |<<boolean,boolean>>|No
@@ -216,6 +217,16 @@ This option supports logstash interpolation: https://www.elastic.co/guide/en/log
for example, files can be prefixed with the event date using `prefix = "%{+YYYY}/%{+MM}/%{+dd}"`.
Be warned this can create a lot of temporary local files.

[id="plugins-{type}s-{plugin}-filename"]
===== `filename`

* Value type is <<string,string>>
* Default value is `""`

Specify a filename format to use for uploaded files. If not defined, a unique S3 output file will be generated as described above.
This option supports logstash interpolation: https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#sprintf
If you do not configure a unique filename using interpolation, the plugin may overwrite the same file each time an S3 upload takes place.
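
Review note: the overwrite warning above can be illustrated with a small sketch. The `interpolate` helper below is a hypothetical stand-in for `Event#sprintf` that only handles plain `%{field}` references; it is not the Logstash implementation, and the field names are made up for the example.

```ruby
# Hypothetical stand-in for Logstash's Event#sprintf: replaces %{field}
# references with values from a plain Hash. Not the real implementation.
def interpolate(template, event)
  template.gsub(/%\{([^}]+)\}/) { event.fetch(Regexp.last_match(1), "") }
end

events = [
  { "server" => "web-1" },
  { "server" => "web-2" },
]

# A static filename resolves to the same name for every event.
static_names = events.map { |e| interpolate("output.txt", e) }
# An interpolated filename resolves to one name per distinct field value.
dynamic_names = events.map { |e| interpolate("%{server}.txt", e) }

puts static_names.uniq.inspect   # ["output.txt"]
puts dynamic_names.uniq.inspect  # ["web-1.txt", "web-2.txt"]
```

With a static value, every upload under the same prefix would target one object key, which is the overwrite case the documentation warns about.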

[id="plugins-{type}s-{plugin}-proxy_uri"]
===== `proxy_uri`

14 changes: 12 additions & 2 deletions lib/logstash/outputs/s3.rb
@@ -151,9 +151,13 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
config :temporary_directory, :validate => :string, :default => File.join(Dir.tmpdir, "logstash")

# Specify a prefix to the uploaded filename, this can simulate directories on S3. Prefix does not require leading slash.
# This option support string interpolation, be warned this can created a lot of temporary local files.
# This option supports logstash string interpolation with sprintf. Be warned this can create a lot of temporary local files.
config :prefix, :validate => :string, :default => ''

# Specify a filename format to use for uploaded files. If not defined, a unique filename is generated. Invalid characters are replaced with underscores (_).
# This option supports logstash string interpolation with sprintf. If you do not configure a unique filename using interpolation, the plugin may overwrite the same file each time an S3 upload takes place.
config :filename, :validate => :string, :default => ''

# Specify how many workers to use to upload the files to S3
config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).ceil

@@ -197,6 +201,11 @@ def register
raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}"
end
end
unless filename.empty?
if !PathValidator.valid?(filename)
raise LogStash::ConfigurationError, "Filename must not contain: #{PathValidator::INVALID_CHARACTERS}"
end
end

if !WritableDirectoryValidator.valid?(@temporary_directory)
raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}"
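
Review note: a minimal sketch of the two checks the `filename` value goes through, rejection at `register` time and underscore replacement after interpolation. The character set and both helper names are assumptions for illustration; the plugin's `PathValidator::INVALID_CHARACTERS` and `normalize_key` may differ.

```ruby
# Assumed character set, for illustration only. The plugin's
# PathValidator::INVALID_CHARACTERS may contain different characters.
INVALID_CHARACTERS = "\\^`><".freeze
INVALID_PATTERN = /[#{Regexp.escape(INVALID_CHARACTERS)}]/

# Hypothetical equivalent of the register-time check: the configured
# template itself must not contain any invalid character.
def valid_name?(name)
  !name.match?(INVALID_PATTERN)
end

# Hypothetical equivalent of the per-event normalization: invalid characters
# that appear after interpolation are replaced with underscores.
def normalize_name(name)
  name.gsub(INVALID_PATTERN, "_")
end

puts valid_name?("%{server}.txt")
puts valid_name?("bad<name>.txt")
puts normalize_name("bad<name>.txt")
```

The template is validated once, while the interpolated result is normalized per event, because field values are not known at configuration time.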
@@ -236,9 +245,10 @@ def multi_receive_encoded(events_and_encoded)
events_and_encoded.each do |event, encoded|
prefix_key = normalize_key(event.sprintf(@prefix))
prefix_written_to << prefix_key
filename_key = normalize_key(event.sprintf(@filename))

begin
@file_repository.get_file(prefix_key) { |file| file.write(encoded) }
@file_repository.get_file(prefix_key, filename_key: filename_key) { |file| file.write(encoded) }
# The output should stop accepting new events coming in, since it cannot do anything with them anymore.
# Log the error and rethrow it.
rescue Errno::ENOSPC => e
21 changes: 14 additions & 7 deletions lib/logstash/outputs/s3/file_repository.rb
@@ -31,7 +31,7 @@ def stale?
with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) }
end

def apply(prefix)
def apply(prefix, filename)
return self
end

@@ -49,8 +49,8 @@ def initialize(tags, encoding, temporary_directory, stale_time)
@stale_time = stale_time
end

def apply(prefix_key)
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)
def apply(prefix_key, filename_key: "")
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory, filename: filename_key), @stale_time)
end
end

@@ -79,12 +79,19 @@ def each_files
end

# Return the file factory
def get_factory(prefix_key)
@prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory }
def get_factory(prefix_key, filename_key: "")
key = prefix_key + (filename_key == "" ? "" : ("/" + filename_key))
if @prefixed_factories.key?(key)
factory = @prefixed_factories[key]
else
factory = @factory_initializer.apply(prefix_key, filename_key: filename_key)
@prefixed_factories[key] = factory
end
factory.with_lock { |locked_factory| yield locked_factory }
end

def get_file(prefix_key)
get_factory(prefix_key) { |factory| yield factory.current }
def get_file(prefix_key, filename_key: "")
get_factory(prefix_key, filename_key: filename_key) { |factory| yield factory.current }
end
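
Review note: the new `get_factory` looks the key up and then inserts it in two separate steps, whereas the replaced line used a single `computeIfAbsent` call. A minimal sketch of a get-or-create that keeps the lookup and the insert in one critical section is shown below. `FactoryRegistry` is a hypothetical class backed by a `Mutex`-guarded `Hash`, not the plugin's `FileRepository`, and the builder block stands in for the factory initializer.

```ruby
# Hypothetical stand-in for the repository's factory map. A Mutex-guarded
# Hash is used here instead of the Java concurrent map the plugin relies on.
class FactoryRegistry
  def initialize(&builder)
    @builder = builder
    @factories = {}
    @lock = Mutex.new
  end

  # Same composition as in the diff: the prefix alone, or "prefix/filename".
  def self.compose_key(prefix_key, filename_key = "")
    filename_key.empty? ? prefix_key : "#{prefix_key}/#{filename_key}"
  end

  # Lookup and insert happen under one lock, so two threads asking for the
  # same key cannot each build their own factory.
  def fetch(prefix_key, filename_key: "")
    key = self.class.compose_key(prefix_key, filename_key)
    @lock.synchronize do
      @factories[key] ||= @builder.call(prefix_key, filename_key)
    end
  end

  def size
    @lock.synchronize { @factories.size }
  end
end

registry = FactoryRegistry.new { |_prefix, _filename| Object.new }
first = registry.fetch("a-key", filename_key: "a-filename-key")
again = registry.fetch("a-key", filename_key: "a-filename-key")
puts first.equal?(again)
puts registry.size
```

Whether the two-step version in the diff can actually race depends on how callers are synchronized, which this excerpt does not show.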

def shutdown
7 changes: 4 additions & 3 deletions lib/logstash/outputs/s3/temporary_file_factory.rb
@@ -24,11 +24,12 @@ class TemporaryFileFactory
TXT_EXTENSION = "txt"
STRFTIME = "%Y-%m-%dT%H.%M"

attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current
attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current, :filename

def initialize(prefix, tags, encoding, temporary_directory)
def initialize(prefix, tags, encoding, temporary_directory, filename: "")
@counter = 0
@prefix = prefix
@filename = filename

@tags = tags
@encoding = encoding
@@ -75,7 +76,7 @@ def generate_name

def new_file
uuid = SecureRandom.uuid
name = generate_name
name = filename == "" ? generate_name : filename
path = ::File.join(temporary_directory, uuid)
key = ::File.join(prefix, name)
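
Review note: because `new_file` now returns the configured `filename` unchanged, every rotation of the same factory would produce the same key. The sketch below shows that effect with a hypothetical `NameSketch` class. The generated name format is illustrative only and is not the plugin's exact `generate_name` pattern.

```ruby
require "securerandom"

# Hypothetical, simplified stand-in for TemporaryFileFactory naming.
class NameSketch
  def initialize(prefix, filename: "")
    @prefix = prefix
    @filename = filename
    @counter = 0
  end

  # Mirrors the ternary in the diff: use the custom filename when one is
  # set, otherwise fall back to a generated unique name.
  def next_key
    @counter += 1
    name = @filename.empty? ? generated_name : @filename
    File.join(@prefix, name)
  end

  private

  # Illustrative format only: a UUID plus a part counter keeps names unique.
  def generated_name
    "ls.s3.#{SecureRandom.uuid}.part#{@counter}.txt"
  end
end

fixed = NameSketch.new("logs", filename: "app.txt")
puts fixed.next_key
puts fixed.next_key

generated = NameSketch.new("logs")
puts generated.next_key == generated.next_key
```

If unique objects per rotation are wanted with a custom filename, the template would need to include something that changes between rotations, such as a timestamp from interpolation.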

87 changes: 75 additions & 12 deletions spec/outputs/s3/file_repository_spec.rb
@@ -9,6 +9,7 @@
let(:encoding) { "none" }
let(:temporary_directory) { Stud::Temporary.pathname }
let(:prefix_key) { "a-key" }
let(:filename_key) { "a-filename-key" }

before do
FileUtils.mkdir_p(temporary_directory)
@@ -20,64 +21,97 @@
subject.get_file(prefix_key) do |file|
expect(file).to be_kind_of(LogStash::Outputs::S3::TemporaryFile)
end

subject.get_file(prefix_key, filename_key: filename_key) do |file|
expect(file).to be_kind_of(LogStash::Outputs::S3::TemporaryFile)
end
end

it "returns the same file for the same prefix key" do
it "returns the same file for the same prefix key and filename key" do
file_path = nil

subject.get_file(prefix_key) do |file|
file_path = file.path
end

subject.get_file(prefix_key) do |file|
expect(file.path).to eq(file_path)
end

subject.get_file(prefix_key, filename_key: filename_key) do |file|
file_path = file.path
end
subject.get_file(prefix_key, filename_key: filename_key) do |file|
expect(file.path).to eq(file_path)
end
end

it "returns the same file for the same dynamic prefix key" do
it "returns the same file for the same dynamic prefix key and filename key" do
prefix = "%{type}/%{+YYYY}/%{+MM}/%{+dd}/"
name = "%{type}.txt"
event = LogStash::Event.new({ "type" => "syslog"})
key = event.sprintf(prefix)
name = event.sprintf(name)
file_path = nil


subject.get_file(key) do |file|
file_path = file.path
end

subject.get_file(key) do |file|
expect(file.path).to eq(file_path)
end

subject.get_file(key, filename_key: name) do |file|
file_path = file.path
end
subject.get_file(key, filename_key: name) do |file|
expect(file.path).to eq(file_path)
end
end

it "returns different file for different prefix keys" do
it "returns different file for different prefix keys and filename keys" do
file_path = nil

subject.get_file(prefix_key) do |file|
file_path = file.path
end

subject.get_file("another_prefix_key") do |file|
expect(file.path).not_to eq(file_path)
end

subject.get_file(prefix_key, filename_key: filename_key) do |file|
file_path = file.path
end
subject.get_file(prefix_key, filename_key: "another_filename_key") do |file|
expect(file.path).not_to eq(file_path)
end
end

it "allows to get the file factory for a specific prefix" do
it "allows to get the file factory for a specific prefix or filename" do
subject.get_factory(prefix_key) do |factory|
expect(factory).to be_kind_of(LogStash::Outputs::S3::TemporaryFileFactory)
end

subject.get_factory(prefix_key, filename_key: filename_key) do |factory|
expect(factory).to be_kind_of(LogStash::Outputs::S3::TemporaryFileFactory)
end
end

it "returns a different file factory for a different prefix keys" do
it "returns a different file factory for different prefix keys and filename keys" do
factory = nil

subject.get_factory(prefix_key) do |f|
factory = f
end

subject.get_factory("another_prefix_key") do |f|
expect(factory).not_to eq(f)
end

subject.get_factory(prefix_key, filename_key: filename_key) do |f|
factory = f
end
subject.get_factory(prefix_key, filename_key: "another_filename_key") do |f|
expect(factory).not_to eq(f)
end
end

it "returns the number of prefix keys" do
@@ -86,13 +120,25 @@
expect(subject.size).to eq(1)
end

it "returns all available keys" do
it "returns the number of prefix with filename keys" do
expect(subject.size).to eq(0)
subject.get_file(prefix_key, filename_key: filename_key) { |file| file.write("something else") }
expect(subject.size).to eq(1)
end

it "returns all available prefix keys" do
subject.get_file(prefix_key) { |file| file.write("something") }
expect(subject.keys.toArray).to include(prefix_key)
expect(subject.keys.toArray.size).to eq(1)
end

it "clean stale factories" do
it "returns all available prefix with filename keys" do
subject.get_file(prefix_key, filename_key: filename_key) { |file| file.write("something else") }
expect(subject.keys.toArray).to include(prefix_key + "/" + filename_key)
expect(subject.keys.toArray.size).to eq(1)
end

it "cleans stale prefix factories" do
@file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1)
expect(@file_repository.size).to eq(0)
path = ""
@@ -108,6 +154,23 @@
try(10) { expect(@file_repository.size).to eq(1) }
expect(File.directory?(path)).to be_falsey
end

it "cleans stale prefix with filename factories" do
@file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1)
expect(@file_repository.size).to eq(0)
path = ""
@file_repository.get_factory(prefix_key, filename_key: filename_key) do |factory|
factory.current.write("hello again")
# force a rotation so we get an empty file that will get stale.
factory.rotate!
path = factory.current.temp_path
end

@file_repository.get_file(prefix_key, filename_key: "another-filename") { |file| file.write("hello again") }
expect(@file_repository.size).to eq(2)
try(10) { expect(@file_repository.size).to eq(1) }
expect(File.directory?(path)).to be_falsey
end
end


12 changes: 11 additions & 1 deletion spec/outputs/s3_spec.rb
@@ -6,11 +6,13 @@

describe LogStash::Outputs::S3 do
let(:prefix) { "super/%{server}" }
let(:filename) { "%{server}.txt" }
let(:region) { "us-east-1" }
let(:bucket_name) { "mybucket" }
let(:options) { { "region" => region,
"bucket" => bucket_name,
"prefix" => prefix,
"filename" => "",
"restore" => false,
"access_key_id" => "access_key_id",
"secret_access_key" => "secret_access_key"
@@ -158,6 +160,11 @@
s3 = described_class.new(options.merge({ "prefix" => "`no\><^" }))
expect { s3.register }.to raise_error(LogStash::ConfigurationError)
end

it "validates the filename" do
s3 = described_class.new(options.merge({ "filename" => "`no\><^" }))
expect { s3.register }.to raise_error(LogStash::ConfigurationError)
end

describe "additional_settings" do
context "when enabling force_path_style" do
@@ -199,8 +206,11 @@
subject.close
end

it "uses `Event#sprintf` for the prefix" do
let(:options) { super().merge({ "filename" => filename }) }

it "uses `Event#sprintf` for the prefix and filename" do
expect(event).to receive(:sprintf).with(prefix).and_return("super/overwatch")
expect(event).to receive(:sprintf).with(filename).and_return("overwatch.txt")
subject.multi_receive_encoded(events_and_encoded)
end
end