Commit 0003e8a

Author: Jan Geertsma (committed)
updates for 0.11.4
1 parent 059ab5f commit 0003e8a

File tree: 3 files changed, +109 −50 lines


CHANGELOG.md

Lines changed: 6 additions & 1 deletion
@@ -1,6 +1,11 @@
 ## 0.11.4
 - fixed listing 3 times, rather than retrying to list max 3 times
-- added log entries for better tracing in which phase the application is now
+- added option to migrate/save to a local registry
+- rewrote interval timing
+- reduced saving of the registry to at most once per interval, protecting against duplicate simultaneous writes
+- added debug_timer for better tracing of how long operations take
+- removed the pipeline name from logfiles, logstash 7.6 and up have this in log4j2 by default now
+- moved initialization from register to run, should make logs more readable
 
 ## 0.11.3
 - don't crash on failed codec, e.g. gzip_lines could sometimes have a corrupted file?

README.md

Lines changed: 7 additions & 2 deletions
@@ -40,7 +40,11 @@ The registry_create_policy is used when the pipeline is started to either resume
 
 interval defines the minimum time the registry should be saved to the registry file (by default 'data/registry.dat'). This is only needed in case the pipeline dies unexpectedly; during a normal shutdown the registry is also saved.
 
-During the pipeline start the plugin uses one file to learn what the JSON header and tail look like, they can also be configured manually.
+When registry_local_path is set to a directory, the registry is saved on the logstash server in that directory. The filename is the pipe.id.
+
+With registry_create_policy set to resume and registry_local_path set to a directory where the registry isn't yet created, the registry is loaded from the storage account and then saved locally.
+
+During the pipeline start for the JSON codec, the plugin uses one file to learn what the JSON header and tail look like; they can also be configured manually.
 
 ## Running the pipeline
 The pipeline can be started in several ways.
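
The hunk above documents the new registry_local_path option. A minimal, hedged sketch of how it combines with registry_create_policy (the account, key, container and path values are placeholders):

```
input {
    azure_blob_storage {
        storageaccount => "examplestorageaccount"
        access_key => "base64accesskey=="
        container => "insights-logs-networksecuritygroupflowevent"
        registry_create_policy => "resume"
        # first start: no local registry file exists yet, so it is migrated
        # once from the storage account; later starts resume from local disk
        registry_local_path => "/usr/share/logstash/plugin"
    }
}
```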
@@ -91,6 +95,7 @@ The log level of the plugin can be put into DEBUG through
 curl -XPUT 'localhost:9600/_node/logging?pretty' -H 'Content-Type: application/json' -d'{"logger.logstash.inputs.azureblobstorage" : "DEBUG"}'
 ```
 
+Because debug also makes logstash chatty, there are also debug_timer and debug_until, which print additional information on what the pipeline is doing and how long it takes. debug_until sets the number of events until debug output is disabled.
 
 ## Other Configuration Examples
 For nsgflowlogs, a simple configuration looks like this
@@ -176,7 +181,7 @@ filter {
 remove_field => ["subresponse"]
 remove_field => ["username"]
 remove_field => ["clientPort"]
-remove_field => ["port"]
+remove_field => ["port"]
 remove_field => ["timestamp"]
 }
 }
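
A similar hedged sketch for the debug options this commit documents (debug_until, debug_timer); the values are illustrative and the other options are omitted:

```
input {
    azure_blob_storage {
        storageaccount => "examplestorageaccount"
        access_key => "base64accesskey=="
        container => "insights-logs-networksecuritygroupflowevent"
        # verbose progress logging until 5000 events have been processed
        debug_until => 5000
        # log how long listing and registry saves take
        debug_timer => true
    }
}
```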

lib/logstash/inputs/azure_blob_storage.rb

Lines changed: 96 additions & 47 deletions
@@ -39,6 +39,9 @@ class LogStash::Inputs::AzureBlobStorage < LogStash::Inputs::Base
   # The default, `data/registry`, contains a Ruby Marshal serialized Hash of the filename, the offset read so far and the filelength the last time a filelisting was done.
   config :registry_path, :validate => :string, :required => false, :default => 'data/registry.dat'
 
+  # If registry_local_path is set to a directory on the local server, the registry is saved there instead of the remote blob_storage
+  config :registry_local_path, :validate => :string, :required => false
+
   # The default, `resume`, will load the registry offsets and will start processing files from the offsets.
   # When set to `start_over`, all log files are processed from the beginning.
   # When set to `start_fresh`, it will read log files that are created or appended since the start of the pipeline.
@@ -58,6 +61,9 @@ class LogStash::Inputs::AzureBlobStorage < LogStash::Inputs::Base
   # debug_until will, for a maximum amount of processed messages, show 3 types of log printouts including processed filenames. This is a lightweight alternative to switching the loglevel from info to debug or even trace
   config :debug_until, :validate => :number, :default => 0, :required => false
 
+  # debug_timer shows the time spent on activities
+  config :debug_timer, :validate => :boolean, :default => false, :required => false
+
   # WAD IIS Grok Pattern
   #config :grokpattern, :validate => :string, :required => false, :default => '%{TIMESTAMP_ISO8601:log_timestamp} %{NOTSPACE:instanceId} %{NOTSPACE:instanceId2} %{IPORHOST:ServerIP} %{WORD:httpMethod} %{URIPATH:requestUri} %{NOTSPACE:requestQuery} %{NUMBER:port} %{NOTSPACE:username} %{IPORHOST:clientIP} %{NOTSPACE:httpVersion} %{NOTSPACE:userAgent} %{NOTSPACE:cookie} %{NOTSPACE:referer} %{NOTSPACE:host} %{NUMBER:httpStatus} %{NUMBER:subresponse} %{NUMBER:win32response} %{NUMBER:sentBytes:int} %{NUMBER:receivedBytes:int} %{NUMBER:timeTaken:int}'
@@ -90,12 +96,15 @@ class LogStash::Inputs::AzureBlobStorage < LogStash::Inputs::Base
   public
   def register
     @pipe_id = Thread.current[:name].split("[").last.split("]").first
-    @logger.info("=== "+config_name+" / "+@pipe_id+" / "+@id[0,6]+" ===")
-    #@logger.info("ruby #{ RUBY_VERSION }p#{ RUBY_PATCHLEVEL } / #{Gem.loaded_specs[config_name].version.to_s}")
+    @logger.info("=== #{config_name} #{Gem.loaded_specs["logstash-input-"+config_name].version.to_s} / #{@pipe_id} / #{@id[0,6]} / ruby #{ RUBY_VERSION }p#{ RUBY_PATCHLEVEL } ===")
     @logger.info("If this plugin doesn't work, please raise an issue in https://github.com/janmg/logstash-input-azure_blob_storage")
     # TODO: consider multiple readers, so add pipeline @id or use logstash-to-logstash communication?
     # TODO: Implement retry ... Error: Connection refused - Failed to open TCP connection to
+  end
 
+
+
+  def run(queue)
     # counter for all processed events since the start of this pipeline
     @processed = 0
     @regsaved = @processed
@@ -127,24 +136,38 @@ def register
 
     @registry = Hash.new
     if registry_create_policy == "resume"
-      @logger.info(@pipe_id+" resuming from registry")
       for counter in 1..3
         begin
-          @registry = Marshal.load(@blob_client.get_blob(container, registry_path)[1])
-          #[0] headers [1] responsebody
+          if (!@registry_local_path.nil?)
+            unless File.file?(@registry_local_path+"/"+@pipe_id)
+              @registry = Marshal.load(@blob_client.get_blob(container, registry_path)[1])
+              #[0] headers [1] responsebody
+              @logger.info("migrating from remote registry #{registry_path}")
+            else
+              if !Dir.exist?(@registry_local_path)
+                FileUtils.mkdir_p(@registry_local_path)
+              end
+              @registry = Marshal.load(File.read(@registry_local_path+"/"+@pipe_id))
+              @logger.info("resuming from local registry #{registry_local_path+"/"+@pipe_id}")
+            end
+          else
+            @registry = Marshal.load(@blob_client.get_blob(container, registry_path)[1])
+            #[0] headers [1] responsebody
+            @logger.info("resuming from remote registry #{registry_path}")
+          end
+          break
         rescue Exception => e
-          @logger.error(@pipe_id+" caught: #{e.message}")
+          @logger.error("caught: #{e.message}")
           @registry.clear
-          @logger.error(@pipe_id+" loading registry failed for attempt #{counter} of 3")
+          @logger.error("loading registry failed for attempt #{counter} of 3")
         end
       end
     end
     # read filelist and set offsets to file length to mark all the old files as done
     if registry_create_policy == "start_fresh"
-      @logger.info(@pipe_id+" starting fresh")
       @registry = list_blobs(true)
       save_registry(@registry)
-      @logger.info("writing the registry, it contains #{@registry.size} blobs/files")
+      @logger.info("starting fresh, writing a clean registry containing #{@registry.size} blobs/files")
     end
 
     @is_json = false
@@ -164,27 +187,32 @@ def register
164187
if file_tail
165188
@tail = file_tail
166189
end
167-
@logger.info(@pipe_id+" head will be: #{@head} and tail is set to #{@tail}")
190+
@logger.info("head will be: #{@head} and tail is set to #{@tail}")
168191
end
169-
end # def register
170-
171-
172192

173-
def run(queue)
174193
newreg = Hash.new
175194
filelist = Hash.new
176195
worklist = Hash.new
177-
# we can abort the loop if stop? becomes true
196+
@last = start = Time.now.to_i
197+
198+
# This is the main loop, it
199+
# 1. Lists all the files in the remote storage account that match the path prefix
200+
# 2. Filters on path_filters to only include files that match the directory and file glob (**/*.json)
201+
# 3. Save the listed files in a registry of known files and filesizes.
202+
# 4. List all the files again and compare the registry with the new filelist and put the delta in a worklist
203+
# 5. Process the worklist and put all events in the logstash queue.
204+
# 6. if there is time left, sleep to complete the interval. If processing takes more than an inteval, save the registry and continue.
205+
# 7. If stop signal comes, finish the current file, save the registry and quit
178206
while !stop?
179-
chrono = Time.now.to_i
180207
# load the registry, compare it's offsets to file list, set offset to 0 for new files, process the whole list and if finished within the interval wait for next loop,
181208
# TODO: sort by timestamp ?
182209
#filelist.sort_by(|k,v|resource(k)[:date])
183210
worklist.clear
184211
filelist.clear
185212
newreg.clear
213+
214+
# Listing all the files
186215
filelist = list_blobs(false)
187-
# registry.merge(filelist) {|key, :offset, :length| :offset.merge :length }
188216
filelist.each do |name, file|
189217
off = 0
190218
begin
@@ -199,26 +227,28 @@ def run(queue)
       worklist.clear
       worklist = newreg.select {|name,file| file[:offset] < file[:length]}
       if (worklist.size > 4) then @logger.info("worklist contains #{worklist.size} blobs") end
-      # This would be ideal for threading since it's IO intensive, would be nice with a ruby native ThreadPool
+
+      # Start of processing
+      # This would be ideal for threading since it's IO intensive, would be nice with a ruby native ThreadPool
       worklist.each do |name, file|
-        #res = resource(name)
+        start = Time.now.to_i
         if (@debug_until > @processed) then @logger.info("3: processing #{name} from #{file[:offset]} to #{file[:length]}") end
         size = 0
         if file[:offset] == 0
           chunk = full_read(name)
           size = chunk.size
         else
           chunk = partial_read_json(name, file[:offset], file[:length])
-          @logger.info(@pipe_id+" partial file #{name} from #{file[:offset]} to #{file[:length]}")
+          @logger.debug("partial file #{name} from #{file[:offset]} to #{file[:length]}")
         end
         if logtype == "nsgflowlog" && @is_json
           res = resource(name)
           begin
             fingjson = JSON.parse(chunk)
             @processed += nsgflowlog(queue, fingjson)
-            @logger.debug(@pipe_id+" Processed #{res[:nsg]} [#{res[:date]}] #{@processed} events")
+            @logger.debug("Processed #{res[:nsg]} [#{res[:date]}] #{@processed} events")
           rescue JSON::ParserError
-            @logger.error(@pipe_id+" parse error on #{res[:nsg]} [#{res[:date]}] offset: #{file[:offset]} length: #{file[:length]}")
+            @logger.error("parse error on #{res[:nsg]} [#{res[:date]}] offset: #{file[:offset]} length: #{file[:length]}")
           end
         # TODO: Convert this to line based grokking.
         # TODO: ECS Compliance?
@@ -233,30 +263,32 @@
               queue << event
             end
           rescue Exception => e
-            @logger.error(@pipe_id+" codec exception: #{e.message} .. will continue and pretend this never happened")
-            @logger.debug(@pipe_id+" #{chunk}")
+            @logger.error("codec exception: #{e.message} .. will continue and pretend this never happened")
+            @logger.debug("#{chunk}")
           end
           @processed += counter
         end
         @registry.store(name, { :offset => size, :length => file[:length] })
         # TODO add input plugin option to prevent connection cache
         @blob_client.client.reset_agents!
-        #@logger.info(@pipe_id+" name #{name} size #{size} len #{file[:length]}")
+        #@logger.info("name #{name} size #{size} len #{file[:length]}")
         # if stop? good moment to stop what we're doing
         if stop?
           return
         end
-        # save the registry past the regular intervals
-        now = Time.now.to_i
-        if ((now - chrono) > interval)
+        if ((Time.now.to_i - @last) > @interval)
          save_registry(@registry)
-          chrono += interval
        end
      end
-      # Save the registry and sleep until the remaining polling interval is over
-      if (@debug_until > @processed) then @logger.info("going to sleep for #{interval - (Time.now.to_i - chrono)} seconds") end
-      save_registry(@registry)
-      sleeptime = interval - (Time.now.to_i - chrono)
+      # The files that got processed after the last registry save need to be saved too, in case the worklist is empty for some intervals.
+      now = Time.now.to_i
+      if ((now - @last) > @interval)
+        save_registry(@registry)
+      end
+      sleeptime = interval - ((now - start) % interval)
+      if @debug_timer
+        @logger.info("going to sleep for #{sleeptime} seconds")
+      end
      Stud.stoppable_sleep(sleeptime) { stop? }
    end
  end
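
The rewritten interval timing above replaces the old chrono bookkeeping: the registry is saved at most once per interval during processing, and the loop then sleeps only for the remainder of the current interval. A standalone Ruby sketch of the arithmetic (illustrative values, not plugin code):

```ruby
# Sketch of the sleeptime arithmetic used above (illustrative values).
interval = 60                      # polling interval in seconds
start = Time.now.to_i
now = start + 130                  # pretend listing + processing took 130s

# 130 % 60 leaves 10s of the current interval used up, so the loop sleeps
# the remaining 50s; without the modulo, sleeptime would be -70 here.
sleeptime = interval - ((now - start) % interval)
puts "sleeping #{sleeptime} seconds"   # => sleeping 50 seconds
```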
@@ -345,7 +377,7 @@ def list_blobs(fill)
   begin
     return try_list_blobs(fill)
   rescue Exception => e
-    @logger.error(@pipe_id+" caught: #{e.message} for list_blobs retries left #{tries}")
+    @logger.error("caught: #{e.message} for list_blobs retries left #{tries}")
     if (tries -= 1) > 0
       retry
     end
@@ -354,10 +386,11 @@
 
 def try_list_blobs(fill)
   # inspired by: http://blog.mirthlab.com/2012/05/25/cleanly-retrying-blocks-of-code-after-an-exception-in-ruby/
-  chrono = Time.now.to_i
-  files = Hash.new
-  nextMarker = nil
-  loop do
+  chrono = Time.now.to_i
+  files = Hash.new
+  nextMarker = nil
+  counter = 1
+  loop do
     blobs = @blob_client.list_blobs(container, { marker: nextMarker, prefix: @prefix})
     blobs.each do |blob|
       # FNM_PATHNAME is required so that "**/test" can match "test" at the root folder
@@ -376,24 +409,40 @@
     end
     nextMarker = blobs.continuation_token
     break unless nextMarker && !nextMarker.empty?
+    if (counter % 10 == 0) then @logger.info(" listing #{counter * 50000} files") end
+    counter += 1
+  end
+  if @debug_timer
+    @logger.info("list_blobs took #{Time.now.to_i - chrono} sec")
   end
-  if (@debug_until > @processed) then @logger.info(@pipe_id+" list_blobs took #{Time.now.to_i - chrono} sec") end
   return files
 end
 
 # When events were processed after the last registry save, start a thread to update the registry file.
 def save_registry(filelist)
-  # TODO because of threading, processed values and regsaved are not thread safe, they can change as instance variable @!
+  # Because of threading, processed values and regsaved are not thread safe, they can change as instance variables! Most of the time this is fine because the registry is the last resort, but be careful about corner cases!
   unless @processed == @regsaved
     @regsaved = @processed
-    @logger.info(@pipe_id+" processed #{@processed} events, saving #{filelist.size} blobs and offsets to registry #{registry_path}")
-    Thread.new {
+    unless (@busy_writing_registry)
+      Thread.new {
         begin
-          @blob_client.create_block_blob(container, registry_path, Marshal.dump(filelist))
+          @busy_writing_registry = true
+          unless (@registry_local_path)
+            @blob_client.create_block_blob(container, registry_path, Marshal.dump(filelist))
+            @logger.info("processed #{@processed} events, saving #{filelist.size} blobs and offsets to remote registry #{registry_path}")
+          else
+            File.open(@registry_local_path+"/"+@pipe_id, 'w') { |file| file.write(Marshal.dump(filelist)) }
+            @logger.info("processed #{@processed} events, saving #{filelist.size} blobs and offsets to local registry #{registry_local_path+"/"+@pipe_id}")
+          end
+          @busy_writing_registry = false
+          @last = Time.now.to_i
        rescue
-          @logger.error(@pipe_id+" Oh my, registry write failed, do you have write access?")
+          @logger.error("Oh my, registry write failed, do you have write access?")
        end
      }
+    else
+      @logger.info("Skipped writing the registry because the previous write is still in progress, it just takes long or may be hanging!")
+    end
   end
 end
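
The unless/else around Thread.new above is a simple busy flag rather than a lock: a save is skipped when the previous write is still running. A condensed, standalone sketch of that pattern (simplified names, not the plugin's actual API):

```ruby
# Skip-if-busy background writer, mirroring the save_registry guard above.
class RegistryWriter
  def initialize(path)
    @path = path
    @busy = false
  end

  # Returns true when a write was started, false when one was skipped.
  def save_async(data)
    return false if @busy          # previous write still running, skip
    Thread.new do
      begin
        @busy = true
        File.open(@path, 'w') { |file| file.write(Marshal.dump(data)) }
      ensure
        @busy = false              # clear the flag even if the write fails
      end
    end
    true
  end
end
```

A plain boolean is not atomic, as the comment in the hunk warns; a Mutex#try_lock would close that small race window at the cost of a bit more code.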

@@ -405,13 +454,13 @@ def learn_encapsulation
   return if blob.nil?
   blocks = @blob_client.list_blob_blocks(container, blob.name)[:committed]
   # TODO add check for empty blocks and log error that the header and footer can't be learned and must be set in the config
-  @logger.debug(@pipe_id+" using #{blob.name} to learn the json header and tail")
+  @logger.debug("using #{blob.name} to learn the json header and tail")
   @head = @blob_client.get_blob(container, blob.name, start_range: 0, end_range: blocks.first.size-1)[1]
-  @logger.debug(@pipe_id+" learned header: #{@head}")
+  @logger.debug("learned header: #{@head}")
   length = blob.properties[:content_length].to_i
   offset = length - blocks.last.size
   @tail = @blob_client.get_blob(container, blob.name, start_range: offset, end_range: length-1)[1]
-  @logger.debug(@pipe_id+" learned tail: #{@tail}")
+  @logger.debug("learned tail: #{@tail}")
 end
 
 def resource(str)
