diff --git a/CHANGELOG.md b/CHANGELOG.md index 2571381c..8ddf0374 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## Upcoming + - Added `filename` as an optional configuration setting to specify a custom format for file names. [#134](https://github.com/logstash-plugins/logstash-output-s3/issues/134) + ## 4.2.0 - Added ability to specify [ONEZONE_IA](https://aws.amazon.com/s3/storage-classes/#__) as storage_class @@ -21,7 +24,7 @@ - Fixed bucket validation failures when bucket policy requires encryption [#191](https://github.com/logstash-plugins/logstash-output-s3/pull/191) ## 4.1.4 - - [#185](https://github.com/logstash-plugins/logstash-output-s3/pull/184) Internal: Revert rake pinning to fix upstream builds + - [#185](https://github.com/logstash-plugins/logstash-output-s3/pull/185) Internal: Revert rake pinning to fix upstream builds ## 4.1.3 - [#181](https://github.com/logstash-plugins/logstash-output-s3/pull/181) Docs: Fix incorrect characterization of parameters as `required` in example configuration. diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 8dd98c0e..6258dfc7 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -87,6 +87,7 @@ This plugin supports the following configuration options plus the <> |<>, one of `["none", "gzip"]`|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -216,6 +217,16 @@ This option supports logstash interpolation: https://www.elastic.co/guide/en/log for example, files can be prefixed with the event date using `prefix = "%{+YYYY}/%{+MM}/%{+dd}"`. Be warned this can create a lot of temporary local files. +[id="plugins-{type}s-{plugin}-filename"] +===== `filename` + + * Value type is <> + * Default value is `""` + +Specify a filename format to use for uploaded files. If not defined, a unique S3 output file will be generated as described above. 
+This option supports logstash interpolation: https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#sprintf. +If you do not configure a unique filename using interpolation, the plugin may overwrite the same file each time an S3 upload takes place. + [id="plugins-{type}s-{plugin}-proxy_uri"] ===== `proxy_uri` diff --git a/lib/logstash/outputs/s3.rb b/lib/logstash/outputs/s3.rb index ed32cea2..489c14eb 100644 --- a/lib/logstash/outputs/s3.rb +++ b/lib/logstash/outputs/s3.rb @@ -151,9 +151,13 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base config :temporary_directory, :validate => :string, :default => File.join(Dir.tmpdir, "logstash") # Specify a prefix to the uploaded filename, this can simulate directories on S3. Prefix does not require leading slash. - # This option support string interpolation, be warned this can created a lot of temporary local files. + # This option supports logstash string interpolation with sprintf. Be warned this can create a lot of temporary local files. config :prefix, :validate => :string, :default => '' + # Specify a filename format to use for uploaded files. If not defined, a unique filename is generated. Invalid characters are replaced with underscores (_). + # This option supports logstash string interpolation with sprintf. If you do not configure a unique filename using interpolation, the plugin may overwrite the same file each time an S3 upload takes place. + config :filename, :validate => :string, :default => '' + # Specify how many workers to use to upload the files to S3 config :upload_workers_count, :validate => :number, :default => (Concurrent.processor_count * 0.5).ceil @@ -194,7 +198,12 @@ def register # be moved easily. unless @prefix.empty?
if !PathValidator.valid?(prefix) - raise LogStash::ConfigurationError, "Prefix must not contains: #{PathValidator::INVALID_CHARACTERS}" + raise LogStash::ConfigurationError, "Prefix must not contain: #{PathValidator::INVALID_CHARACTERS}" + end + end + unless @filename.empty? + if !PathValidator.valid?(filename) + raise LogStash::ConfigurationError, "Filename must not contain: #{PathValidator::INVALID_CHARACTERS}" end end @@ -236,9 +245,10 @@ def multi_receive_encoded(events_and_encoded) events_and_encoded.each do |event, encoded| prefix_key = normalize_key(event.sprintf(@prefix)) prefix_written_to << prefix_key + filename_key = normalize_key(event.sprintf(@filename)) begin - @file_repository.get_file(prefix_key) { |file| file.write(encoded) } + @file_repository.get_file(prefix_key, filename_key: filename_key) { |file| file.write(encoded) } # The output should stop accepting new events coming in, since it cannot do anything with them anymore. # Log the error and rethrow it. rescue Errno::ENOSPC => e diff --git a/lib/logstash/outputs/s3/file_repository.rb b/lib/logstash/outputs/s3/file_repository.rb index f6603c0f..9fb9beeb 100644 --- a/lib/logstash/outputs/s3/file_repository.rb +++ b/lib/logstash/outputs/s3/file_repository.rb @@ -31,7 +31,7 @@ def stale? 
with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) } end - def apply(prefix) + def apply(prefix, filename_key: "") return self end @@ -49,8 +49,8 @@ def initialize(tags, encoding, temporary_directory, stale_time) @stale_time = stale_time end - def apply(prefix_key) - PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time) + def apply(prefix_key, filename_key: "") + PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory, filename: filename_key), @stale_time) end end @@ -79,12 +79,15 @@ def each_files end # Return the file factory - def get_factory(prefix_key) - @prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory } + def get_factory(prefix_key, filename_key: "") + key = prefix_key + (filename_key == "" ? "" : ("/" + filename_key)) + # computeIfAbsent keeps lookup-or-create atomic; a separate key?/[]= pair lets concurrent callers build duplicate factories for one key. + prefixed_value = @prefixed_factories.computeIfAbsent(key) { |_| @factory_initializer.apply(prefix_key, filename_key: filename_key) } + prefixed_value.with_lock { |factory| yield factory } end - def get_file(prefix_key) - get_factory(prefix_key) { |factory| yield factory.current } + def get_file(prefix_key, filename_key: "") + get_factory(prefix_key, filename_key: filename_key) { |factory| yield factory.current } end def shutdown diff --git a/lib/logstash/outputs/s3/temporary_file_factory.rb b/lib/logstash/outputs/s3/temporary_file_factory.rb index 21a98abe..afb9062b 100644 --- a/lib/logstash/outputs/s3/temporary_file_factory.rb +++ b/lib/logstash/outputs/s3/temporary_file_factory.rb @@ -24,11 +24,12 @@ class TemporaryFileFactory TXT_EXTENSION = "txt" STRFTIME = "%Y-%m-%dT%H.%M" - attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current + attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current, :filename - def initialize(prefix, tags, encoding,
temporary_directory) + def initialize(prefix, tags, encoding, temporary_directory, filename: "") @counter = 0 @prefix = prefix + @filename = filename @tags = tags @encoding = encoding @@ -75,7 +76,7 @@ def generate_name def new_file uuid = SecureRandom.uuid - name = generate_name + name = filename == "" ? generate_name : filename path = ::File.join(temporary_directory, uuid) key = ::File.join(prefix, name) diff --git a/spec/outputs/s3/file_repository_spec.rb b/spec/outputs/s3/file_repository_spec.rb index 631a7286..e57d8ad0 100644 --- a/spec/outputs/s3/file_repository_spec.rb +++ b/spec/outputs/s3/file_repository_spec.rb @@ -9,6 +9,7 @@ let(:encoding) { "none" } let(:temporary_directory) { Stud::Temporary.pathname } let(:prefix_key) { "a-key" } + let(:filename_key) { "a-filename-key" } before do FileUtils.mkdir_p(temporary_directory) @@ -20,64 +21,97 @@ subject.get_file(prefix_key) do |file| expect(file).to be_kind_of(LogStash::Outputs::S3::TemporaryFile) end + + subject.get_file(prefix_key, filename_key: filename_key) do |file| + expect(file).to be_kind_of(LogStash::Outputs::S3::TemporaryFile) + end end - it "returns the same file for the same prefix key" do + it "returns the same file for the same prefix key and filename key" do file_path = nil subject.get_file(prefix_key) do |file| file_path = file.path end - subject.get_file(prefix_key) do |file| expect(file.path).to eq(file_path) end + + subject.get_file(prefix_key, filename_key: filename_key) do |file| + file_path = file.path + end + subject.get_file(prefix_key, filename_key: filename_key) do |file| + expect(file.path).to eq(file_path) + end end - it "returns the same file for the same dynamic prefix key" do + it "returns the same file for the same dynamic prefix key and filename key" do prefix = "%{type}/%{+YYYY}/%{+MM}/%{+dd}/" + name = "%{type}.txt" event = LogStash::Event.new({ "type" => "syslog"}) key = event.sprintf(prefix) + name = event.sprintf(name) file_path = nil - subject.get_file(key) do |file|
file_path = file.path end - subject.get_file(key) do |file| expect(file.path).to eq(file_path) end + + subject.get_file(key, filename_key: name) do |file| + file_path = file.path + end + subject.get_file(key, filename_key: name) do |file| + expect(file.path).to eq(file_path) + end end - it "returns different file for different prefix keys" do + it "returns different file for different prefix keys and filename keys" do file_path = nil subject.get_file(prefix_key) do |file| file_path = file.path end - subject.get_file("another_prefix_key") do |file| expect(file.path).not_to eq(file_path) end + + subject.get_file(prefix_key, filename_key: filename_key) do |file| + file_path = file.path + end + subject.get_file(prefix_key, filename_key: "another_filename_key") do |file| + expect(file.path).not_to eq(file_path) + end end - it "allows to get the file factory for a specific prefix" do + it "allows to get the file factory for a specific prefix or filename" do subject.get_factory(prefix_key) do |factory| expect(factory).to be_kind_of(LogStash::Outputs::S3::TemporaryFileFactory) end + + subject.get_factory(prefix_key, filename_key: filename_key) do |factory| + expect(factory).to be_kind_of(LogStash::Outputs::S3::TemporaryFileFactory) + end end - it "returns a different file factory for a different prefix keys" do + it "returns a different file factory for different prefix keys and filename keys" do factory = nil subject.get_factory(prefix_key) do |f| factory = f end - subject.get_factory("another_prefix_key") do |f| expect(factory).not_to eq(f) end + + subject.get_factory(prefix_key, filename_key: filename_key) do |f| + factory = f + end + subject.get_factory(prefix_key, filename_key: "another_filename_key") do |f| + expect(factory).not_to eq(f) + end end it "returns the number of prefix keys" do @@ -86,13 +120,25 @@ expect(subject.size).to eq(1) end - it "returns all available keys" do + it "returns the number of prefix with filename keys" do + expect(subject.size).to eq(0)
+ subject.get_file(prefix_key, filename_key: filename_key) { |file| file.write("something else") } + expect(subject.size).to eq(1) + end + + it "returns all available prefix keys" do subject.get_file(prefix_key) { |file| file.write("something") } expect(subject.keys.toArray).to include(prefix_key) expect(subject.keys.toArray.size).to eq(1) end - it "clean stale factories" do + it "returns all available prefix with filename keys" do + subject.get_file(prefix_key, filename_key: filename_key) { |file| file.write("something else") } + expect(subject.keys.toArray).to include(prefix_key + "/" + filename_key) + expect(subject.keys.toArray.size).to eq(1) + end + + it "cleans stale prefix factories" do @file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1) expect(@file_repository.size).to eq(0) path = "" @@ -108,6 +154,23 @@ try(10) { expect(@file_repository.size).to eq(1) } expect(File.directory?(path)).to be_falsey end + + it "cleans stale prefix with filename factories" do + @file_repository = described_class.new(tags, encoding, temporary_directory, 1, 1) + expect(@file_repository.size).to eq(0) + path = "" + @file_repository.get_factory(prefix_key, filename_key: filename_key) do |factory| + factory.current.write("hello again") + # force a rotation so we get an empty file that will get stale. + factory.rotate! 
+ path = factory.current.temp_path + end + + @file_repository.get_file(prefix_key, filename_key: "another-filename") { |file| file.write("hello again") } + expect(@file_repository.size).to eq(2) + try(10) { expect(@file_repository.size).to eq(1) } + expect(File.directory?(path)).to be_falsey + end end diff --git a/spec/outputs/s3_spec.rb b/spec/outputs/s3_spec.rb index 8bfc4744..d09e6fee 100644 --- a/spec/outputs/s3_spec.rb +++ b/spec/outputs/s3_spec.rb @@ -6,11 +6,13 @@ describe LogStash::Outputs::S3 do let(:prefix) { "super/%{server}" } + let(:filename) { "%{server}.txt" } let(:region) { "us-east-1" } let(:bucket_name) { "mybucket" } let(:options) { { "region" => region, "bucket" => bucket_name, "prefix" => prefix, + "filename" => "", "restore" => false, "access_key_id" => "access_key_id", "secret_access_key" => "secret_access_key" @@ -158,6 +160,11 @@ s3 = described_class.new(options.merge({ "prefix" => "`no\><^" })) expect { s3.register }.to raise_error(LogStash::ConfigurationError) end + + it "validates the filename" do + s3 = described_class.new(options.merge({ "filename" => "`no\><^" })) + expect { s3.register }.to raise_error(LogStash::ConfigurationError) + end describe "additional_settings" do context "when enabling force_path_style" do @@ -199,8 +206,11 @@ subject.close end - it "uses `Event#sprintf` for the prefix" do + let(:options) { super().merge({ "filename" => filename }) } + + it "uses `Event#sprintf` for the prefix and filename" do expect(event).to receive(:sprintf).with(prefix).and_return("super/overwatch") + expect(event).to receive(:sprintf).with(filename).and_return("overwatch.txt") subject.multi_receive_encoded(events_and_encoded) end end