From d5e21e647a64bb43e3979182f641bc39ec50ca55 Mon Sep 17 00:00:00 2001
From: Darren Maki
Date: Tue, 17 Dec 2019 16:11:49 -0500
Subject: [PATCH 1/3] Added `filename` as a configuration option.

This new option can be used to specify how files are named when uploaded to
S3. It supports logstash string interpolation (sprintf), so it can be used to
generate unique filenames based on the data from an event.

Addresses #134
---
 CHANGELOG.md                               |  5 +-
 lib/logstash/outputs/s3.rb                 | 14 ++-
 lib/logstash/outputs/s3/file_repository.rb | 21 +++--
 .../outputs/s3/temporary_file_factory.rb   |  7 +-
 spec/outputs/s3/file_repository_spec.rb    | 87 ++++++++++++++++---
 spec/outputs/s3_spec.rb                    | 12 ++-
 6 files changed, 120 insertions(+), 26 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2571381c..8ddf0374 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+## Upcoming
+  - Added `filename` as an optional configuration setting to specify a custom format for file names. [#134](https://github.com/logstash-plugins/logstash-output-s3/issues/134)
+
 ## 4.2.0
   - Added ability to specify [ONEZONE_IA](https://aws.amazon.com/s3/storage-classes/#__) as storage_class
 
@@ -21,7 +24,7 @@
   - Fixed bucket validation failures when bucket policy requires encryption [#191](https://github.com/logstash-plugins/logstash-output-s3/pull/191)
 
 ## 4.1.4
-  - [#185](https://github.com/logstash-plugins/logstash-output-s3/pull/184) Internal: Revert rake pinning to fix upstream builds
+  - [#185](https://github.com/logstash-plugins/logstash-output-s3/pull/185) Internal: Revert rake pinning to fix upstream builds
 
 ## 4.1.3
   - [#181](https://github.com/logstash-plugins/logstash-output-s3/pull/181) Docs: Fix incorrect characterization of parameters as `required` in example configuration.
diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb
index ed32cea2..26b4ea41 100644
--- a/lib/logstash/outputs/s3.rb
+++ b/lib/logstash/outputs/s3.rb
@@ -151,9 +151,13 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
   config :temporary_directory, :validate => :string, :default => File.join(Dir.tmpdir, "logstash")
 
   # Specify a prefix to the uploaded filename, this can simulate directories on S3. Prefix does not require leading slash.
-  # This option support string interpolation, be warned this can created a lot of temporary local files.
+  # This option supports logstash string interpolation with sprintf. Be warned this can create a lot of temporary local files.
   config :prefix, :validate => :string, :default => ''
 
+  # Specify a filename format to use for uploaded files. If not defined, a unique filename is generated. Invalid characters are replaced with underscores (_).
+  # This option supports logstash string interpolation with sprintf. If you do not configure a unique filename using interpolation, the plugin may overwrite the same file each time an S3 upload takes place.
+  config :filename, :validate => :string, :default => ''
+
   # Specify how many workers to use to upload the files to S3
   config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).ceil
 
@@ -197,6 +201,11 @@ def register
         raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}"
       end
     end
+    unless filename.empty?
+      if !PathValidator.valid?(filename)
+        raise LogStash::ConfigurationError, "Filename must not contains: #{PathValidator::INVALID_CHARACTERS}"
+      end
+    end
 
     if !WritableDirectoryValidator.valid?(@temporary_directory)
       raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}"
@@ -236,9 +245,10 @@ def multi_receive_encoded(events_and_encoded)
     events_and_encoded.each do |event, encoded|
       prefix_key = normalize_key(event.sprintf(@prefix))
       prefix_written_to << prefix_key
+      filename_key = normalize_key(event.sprintf(@filename))
 
       begin
-        @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
+        @file_repository.get_file(prefix_key, filename_key: filename_key) { |file| file.write(encoded) }
       # The output should stop accepting new events coming in, since it cannot do anything with them anymore.
       # Log the error and rethrow it.
       rescue Errno::ENOSPC => e
diff --git a/lib/logstash/outputs/s3/file_repository.rb b/lib/logstash/outputs/s3/file_repository.rb
index f6603c0f..9fb9beeb 100644
--- a/lib/logstash/outputs/s3/file_repository.rb
+++ b/lib/logstash/outputs/s3/file_repository.rb
@@ -31,7 +31,7 @@ def stale?
             with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) }
           end
 
-          def apply(prefix)
+          def apply(prefix, filename)
             return self
           end
 
@@ -49,8 +49,8 @@ def initialize(tags, encoding, temporary_directory, stale_time)
             @stale_time = stale_time
           end
 
-          def apply(prefix_key)
-            PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)
+          def apply(prefix_key, filename_key: "")
+            PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory, filename: filename_key), @stale_time)
           end
         end
 
@@ -79,12 +79,19 @@ def each_files
         end
 
         # Return the file factory
-        def get_factory(prefix_key)
-          @prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory }
+        def get_factory(prefix_key, filename_key: "")
+          key = prefix_key + (filename_key == "" ? "" : ("/" + filename_key))
+          if @prefixed_factories.key?(key)
+            factory = @prefixed_factories[key]
+          else
+            factory = @factory_initializer.apply(prefix_key, filename_key: filename_key)
+            @prefixed_factories[key] = factory
+          end
+          factory.with_lock { |factory| yield factory }
         end
 
-        def get_file(prefix_key)
-          get_factory(prefix_key) { |factory| yield factory.current }
+        def get_file(prefix_key, filename_key: "")
+          get_factory(prefix_key, filename_key: filename_key) { |factory| yield factory.current }
         end
 
         def shutdown
diff --git a/lib/logstash/outputs/s3/temporary_file_factory.rb b/lib/logstash/outputs/s3/temporary_file_factory.rb
index 21a98abe..afb9062b 100644
--- a/lib/logstash/outputs/s3/temporary_file_factory.rb
+++ b/lib/logstash/outputs/s3/temporary_file_factory.rb
@@ -24,11 +24,12 @@ class TemporaryFileFactory
         TXT_EXTENSION = "txt"
         STRFTIME = "%Y-%m-%dT%H.%M"
 
-        attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current
+        attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current, :filename
 
-        def initialize(prefix, tags, encoding, temporary_directory)
+        def initialize(prefix, tags, encoding, temporary_directory, filename: "")
           @counter = 0
           @prefix = prefix
+          @filename = filename
           @tags = tags
           @encoding = encoding
@@ -75,7 +76,7 @@ def generate_name
 
         def new_file
           uuid = SecureRandom.uuid
-          name = generate_name
+          name = filename == "" ? generate_name : filename
           path = ::File.join(temporary_directory, uuid)
           key = ::File.join(prefix, name)
diff --git a/spec/outputs/s3/file_repository_spec.rb b/spec/outputs/s3/file_repository_spec.rb
index 631a7286..e57d8ad0 100644
--- a/spec/outputs/s3/file_repository_spec.rb
+++ b/spec/outputs/s3/file_repository_spec.rb
@@ -9,6 +9,7 @@
   let(:encoding) { "none" }
   let(:temporary_directory) { Stud::Temporary.pathname }
   let(:prefix_key) { "a-key" }
+  let(:filename_key) { "a-filename-key" }
 
   before do
     FileUtils.mkdir_p(temporary_directory)
@@ -20,64 +21,97 @@
     subject.get_file(prefix_key) do |file|
       expect(file).to be_kind_of(LogStash::Outputs::S3::TemporaryFile)
     end
+
+    subject.get_file(prefix_key, filename_key: filename_key) do |file|
+      expect(file).to be_kind_of(LogStash::Outputs::S3::TemporaryFile)
+    end
   end
 
-  it "returns the same file for the same prefix key" do
+  it "returns the same file for the same prefix key and filename key" do
     file_path = nil
 
     subject.get_file(prefix_key) do |file|
       file_path = file.path
     end
-
     subject.get_file(prefix_key) do |file|
       expect(file.path).to eq(file_path)
     end
+
+    subject.get_file(prefix_key, filename_key: filename_key) do |file|
+      file_path = file.path
+    end
+    subject.get_file(prefix_key, filename_key: filename_key) do |file|
+      expect(file.path).to eq(file_path)
+    end
   end
 
-  it "returns the same file for the same dynamic prefix key" do
+  it "returns the same file for the same dynamic prefix key and filename key" do
     prefix = "%{type}/%{+YYYY}/%{+MM}/%{+dd}/"
+    name = "%{type}.txt"
     event = LogStash::Event.new({ "type" => "syslog"})
     key = event.sprintf(prefix)
+    name = event.sprintf(name)
     file_path = nil
-
     subject.get_file(key) do |file|
       file_path = file.path
     end
-
     subject.get_file(key) do |file|
       expect(file.path).to eq(file_path)
     end
+
+    subject.get_file(key, filename_key: name) do |file|
+      file_path = file.path
+    end
+    subject.get_file(key, filename_key: name) do |file|
+      expect(file.path).to eq(file_path)
+    end
   end
 
-  it "returns different file for different prefix keys" do
+  it "returns different file for different prefix keys and filename keys" do
     file_path = nil
 
     subject.get_file(prefix_key) do |file|
       file_path = file.path
     end
-
     subject.get_file("another_prefix_key") do |file|
       expect(file.path).not_to eq(file_path)
     end
+
+    subject.get_file(prefix_key, filename_key: filename_key) do |file|
+      file_path = file.path
+    end
+    subject.get_file(prefix_key, filename_key: "another_filename_key") do |file|
+      expect(file.path).not_to eq(file_path)
+    end
   end
 
-  it "allows to get the file factory for a specific prefix" do
+  it "allows to get the file factory for a specific prefix or filename" do
     subject.get_factory(prefix_key) do |factory|
       expect(factory).to be_kind_of(LogStash::Outputs::S3::TemporaryFileFactory)
     end
+
+    subject.get_factory(prefix_key, filename_key: filename_key) do |factory|
+      expect(factory).to be_kind_of(LogStash::Outputs::S3::TemporaryFileFactory)
+    end
   end
 
-  it "returns a different file factory for a different prefix keys" do
+  it "returns a different file factory for different prefix keys and filename keys" do
     factory = nil
 
     subject.get_factory(prefix_key) do |f|
       factory = f
     end
-
     subject.get_factory("another_prefix_key") do |f|
       expect(factory).not_to eq(f)
    end
+
+    subject.get_factory(prefix_key, filename_key: filename_key) do |f|
+      factory = f
+    end
+    subject.get_factory(prefix_key, filename_key: "another_filename_key") do |f|
+      expect(factory).not_to eq(f)
+    end
   end
 
   it "returns the number of prefix keys" do
     expect(subject.size).to eq(0)
     subject.get_file(prefix_key) { |file| file.write("something") }
     expect(subject.size).to eq(1)
   end
 
-  it "returns all available keys" do
+  it "returns the number of prefix with filename keys" do
+    expect(subject.size).to eq(0)
+    subject.get_file(prefix_key, filename_key: filename_key) { |file| file.write("something else") }
+    expect(subject.size).to eq(1)
+  end
+
+  it "returns all available prefix keys" do
     subject.get_file(prefix_key) { |file| file.write("something") }
     expect(subject.keys.toArray).to include(prefix_key)
     expect(subject.keys.toArray.size).to eq(1)
   end
 
-  it "clean stale factories" do
+  it "returns all available prefix with filename keys" do
+    subject.get_file(prefix_key, filename_key: filename_key) { |file| file.write("something else") }
+    expect(subject.keys.toArray).to include(prefix_key + "/" + filename_key)
+    expect(subject.keys.toArray.size).to eq(1)
+  end
+
+  it "cleans stale prefix factories" do
     @file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1)
     expect(@file_repository.size).to eq(0)
     path = ""
     @file_repository.get_factory(prefix_key) do |factory|
       factory.current.write("hello")
       # force a rotation so we get an empty file that will get stale.
       factory.rotate!
       path = factory.current.temp_path
     end
 
     @file_repository.get_file("another-prefix") { |file| file.write("hello") }
     expect(@file_repository.size).to eq(2)
     try(10) { expect(@file_repository.size).to eq(1) }
     expect(File.directory?(path)).to be_falsey
   end
+
+  it "cleans stale prefix with filename factories" do
+    @file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1)
+    expect(@file_repository.size).to eq(0)
+    path = ""
+    @file_repository.get_factory(prefix_key, filename_key: filename_key) do |factory|
+      factory.current.write("hello again")
+      # force a rotation so we get an empty file that will get stale.
+      factory.rotate!
+      path = factory.current.temp_path
+    end
+
+    @file_repository.get_file(prefix_key, filename_key: "another-filename") { |file| file.write("hello again") }
+    expect(@file_repository.size).to eq(2)
+    try(10) { expect(@file_repository.size).to eq(1) }
+    expect(File.directory?(path)).to be_falsey
+  end
 end
diff --git a/spec/outputs/s3_spec.rb b/spec/outputs/s3_spec.rb
index 8bfc4744..d09e6fee 100644
--- a/spec/outputs/s3_spec.rb
+++ b/spec/outputs/s3_spec.rb
@@ -6,11 +6,13 @@ describe LogStash::Outputs::S3 do
   let(:prefix) { "super/%{server}" }
+  let(:filename) { "%{server}.txt" }
   let(:region) { "us-east-1" }
   let(:bucket_name) { "mybucket" }
   let(:options) { { "region" => region,
                     "bucket" => bucket_name,
                     "prefix" => prefix,
+                    "filename" => "",
                     "restore" => false,
                     "access_key_id" => "access_key_id",
                     "secret_access_key" => "secret_access_key"
@@ -158,6 +160,11 @@
     s3 = described_class.new(options.merge({ "prefix" => "`no\><^" }))
     expect { s3.register }.to raise_error(LogStash::ConfigurationError)
   end
+
+  it "validates the filename" do
+    s3 = described_class.new(options.merge({ "filename" => "`no\><^" }))
+    expect { s3.register }.to raise_error(LogStash::ConfigurationError)
+  end
 
   describe "additional_settings" do
     context "when enabling force_path_style" do
@@ -199,8 +206,11 @@
       subject.close
     end
 
-    it "uses `Event#sprintf` for the prefix" do
+    let(:options) { super().merge({ "filename" => filename }) }
+
+    it "uses `Event#sprintf` for the prefix and filename" do
       expect(event).to receive(:sprintf).with(prefix).and_return("super/overwatch")
+      expect(event).to receive(:sprintf).with(filename).and_return("overwatch.txt")
       subject.multi_receive_encoded(events_and_encoded)
     end
   end

From 0d4e3215fcf41e650b445fe1dcaf1d79faba5ed9 Mon Sep 17 00:00:00 2001
From: Darren Maki
Date: Wed, 18 Dec 2019 15:38:50 -0500
Subject: [PATCH 2/3] Added missing doc section for `filename`.
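
The new section documents that `filename` accepts the same sprintf-style
interpolation as `prefix`. As a purely illustrative sketch (the bucket name
and event fields below are made up, not taken from the plugin docs), a
pipeline using the option could look like:

    output {
      s3 {
        region   => "us-east-1"
        bucket   => "my-app-logs"
        prefix   => "%{type}/%{+YYYY}/%{+MM}/%{+dd}"
        filename => "%{host}-%{+HH.mm}.log"
        encoding => "gzip"
      }
    }

Interpolating an event field and a timestamp into `filename` keeps each
upload distinct; a static value may be overwritten on every upload, as the
doc section warns.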
---
 docs/index.asciidoc | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 8dd98c0e..6258dfc7 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -87,6 +87,7 @@ This plugin supports the following configuration options plus the <<plugins-{type}s-{plugin}-common-options>>
 | <<plugins-{type}s-{plugin}-encoding>> |<<string,string>>, one of `["none", "gzip"]`|No
 | <> |<>|No
 | <> |<>|No
+| <<plugins-{type}s-{plugin}-filename>> |<<string,string>>|No
 | <> |<>|No
 | <> |<>|No
 | <> |<>|No
@@ -216,6 +217,16 @@ This option supports logstash interpolation: https://www.elastic.co/guide/en/log
 for example, files can be prefixed with the event date using `prefix = "%{+YYYY}/%{+MM}/%{+dd}"`.
 Be warned this can create a lot of temporary local files.
 
+[id="plugins-{type}s-{plugin}-filename"]
+===== `filename`
+
+  * Value type is <<string,string>>
+  * Default value is `""`
+
+Specify a filename format to use for uploaded files. If not defined, a unique S3 output file will be generated as described above.
+This option supports logstash interpolation: https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#sprintf.
+If you do not configure a unique filename using interpolation, the plugin may overwrite the same file each time an S3 upload takes place.
+
 [id="plugins-{type}s-{plugin}-proxy_uri"]
 ===== `proxy_uri`

From 9cf39f1515b5fec083626a88af467174c6e91fda Mon Sep 17 00:00:00 2001
From: Darren Maki
Date: Wed, 18 Dec 2019 16:16:46 -0500
Subject: [PATCH 3/3] Fixed incorrect variable reference.

---
 lib/logstash/outputs/s3.rb | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb
index 26b4ea41..489c14eb 100644
--- a/lib/logstash/outputs/s3.rb
+++ b/lib/logstash/outputs/s3.rb
@@ -198,12 +198,12 @@ def register
     # be moved easily.
     unless @prefix.empty?
       if !PathValidator.valid?(prefix)
-        raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}"
+        raise LogStash::ConfigurationError, "Prefix must not contain: #{PathValidator::INVALID_CHARACTERS}"
       end
     end
-    unless filename.empty?
+    unless @filename.empty?
       if !PathValidator.valid?(filename)
-        raise LogStash::ConfigurationError, "Filename must not contains: #{PathValidator::INVALID_CHARACTERS}"
+        raise LogStash::ConfigurationError, "Filename must not contain: #{PathValidator::INVALID_CHARACTERS}"
       end
     end