Commit 9581241

Author: Jan Geertsma
Commit message: skip_learning
1 parent: 64b1fd1

File tree: 2 files changed (+79 -41 lines)
- lib/logstash/inputs/azure_blob_storage.rb
- logstash-input-azure_blob_storage.gemspec


lib/logstash/inputs/azure_blob_storage.rb

Lines changed: 78 additions & 40 deletions
@@ -61,7 +61,9 @@ class LogStash::Inputs::AzureBlobStorage < LogStash::Inputs::Base
 # Z00000000000000000000000000000000 2 ]}
 config :interval, :validate => :number, :default => 60
 
+# add the filename into the events
 config :addfilename, :validate => :boolean, :default => false, :required => false
+
 # debug_until will for a maximum amount of processed messages shows 3 types of log printouts including processed filenames. This is a lightweight alternative to switching the loglevel from info to debug or even trace
 config :debug_until, :validate => :number, :default => 0, :required => false
 
@@ -71,6 +73,9 @@ class LogStash::Inputs::AzureBlobStorage < LogStash::Inputs::Base
 # WAD IIS Grok Pattern
 #config :grokpattern, :validate => :string, :required => false, :default => '%{TIMESTAMP_ISO8601:log_timestamp} %{NOTSPACE:instanceId} %{NOTSPACE:instanceId2} %{IPORHOST:ServerIP} %{WORD:httpMethod} %{URIPATH:requestUri} %{NOTSPACE:requestQuery} %{NUMBER:port} %{NOTSPACE:username} %{IPORHOST:clientIP} %{NOTSPACE:httpVersion} %{NOTSPACE:userAgent} %{NOTSPACE:cookie} %{NOTSPACE:referer} %{NOTSPACE:host} %{NUMBER:httpStatus} %{NUMBER:subresponse} %{NUMBER:win32response} %{NUMBER:sentBytes:int} %{NUMBER:receivedBytes:int} %{NUMBER:timeTaken:int}'
 
+# skip learning if you use json and don't want to learn the head and tail, but use either the defaults or configure them.
+config :skip_learning, :validate => :boolean, :default => false, :required => false
+
 # The string that starts the JSON. Only needed when the codec is JSON. When partial file are read, the result will not be valid JSON unless the start and end are put back. the file_head and file_tail are learned at startup, by reading the first file in the blob_list and taking the first and last block, this would work for blobs that are appended like nsgflowlogs. The configuration can be set to override the learning. In case learning fails and the option is not set, the default is to use the 'records' as set by nsgflowlogs.
 config :file_head, :validate => :string, :required => false, :default => '{"records":['
 # The string that ends the JSON
@@ -113,34 +118,7 @@ def run(queue)
     @processed = 0
     @regsaved = @processed
 
-    # Try in this order to access the storageaccount
-    # 1. storageaccount / sas_token
-    # 2. connection_string
-    # 3. storageaccount / access_key
-
-    unless connection_string.nil?
-        conn = connection_string.value
-    end
-    unless sas_token.nil?
-        unless sas_token.value.start_with?('?')
-            conn = "BlobEndpoint=https://#{storageaccount}.#{dns_suffix};SharedAccessSignature=#{sas_token.value}"
-        else
-            conn = sas_token.value
-        end
-    end
-    unless conn.nil?
-        @blob_client = Azure::Storage::Blob::BlobService.create_from_connection_string(conn)
-    else
-        # unless use_development_storage?
-        @blob_client = Azure::Storage::Blob::BlobService.create(
-            storage_account_name: storageaccount,
-            storage_dns_suffix: dns_suffix,
-            storage_access_key: access_key.value,
-        )
-        # else
-        #     @logger.info("not yet implemented")
-        # end
-    end
+    connect
 
     @registry = Hash.new
     if registry_create_policy == "resume"
@@ -175,7 +153,7 @@ def run(queue)
     if registry_create_policy == "start_fresh"
         @registry = list_blobs(true)
         save_registry(@registry)
-        @logger.info("starting fresh, writing a clean the registry to contain #{@registry.size} blobs/files")
+        @logger.info("starting fresh, writing a clean registry to contain #{@registry.size} blobs/files")
     end
 
     @is_json = false
@@ -188,12 +166,14 @@ def run(queue)
     @tail = ''
     # if codec=json sniff one files blocks A and Z to learn file_head and file_tail
     if @is_json
-        learn_encapsulation
         if file_head
-           @head = file_head
+            @head = file_head
         end
         if file_tail
-           @tail = file_tail
+            @tail = file_tail
+        end
+        if file_head and file_tail and !skip_learning
+            learn_encapsulation
         end
         @logger.info("head will be: #{@head} and tail is set to #{@tail}")
     end
@@ -234,6 +214,8 @@ def run(queue)
     # size nilClass when the list doesn't grow?!
     # Worklist is the subset of files where the already read offset is smaller than the file size
     worklist.clear
+    chunk = nil
+
     worklist = newreg.select {|name,file| file[:offset] < file[:length]}
     if (worklist.size > 4) then @logger.info("worklist contains #{worklist.size} blobs") end
 
@@ -246,19 +228,26 @@ def run(queue)
 size = 0
 if file[:offset] == 0
     # This is where Sera4000 issue starts
-    # For an append blob, reading full and crashing, retry, last_modified? ... lenght? ... committed? ...
-    # length and skip reg value
-    begin
-        chunk = full_read(name)
-        size=chunk.size
-    rescue Exception => e
-        @logger.error("Failed to read #{name} because of: #{e.message} .. will continue and pretend this never happened")
+    # For an append blob, reading full and crashing, retry, last_modified? ... lenght? ... committed? ...
+    # length and skip reg value
+    if (file[:length] > 0)
+        begin
+            chunk = full_read(name)
+            size=chunk.size
+        rescue Exception => e
+            @logger.error("Failed to read #{name} because of: #{e.message} .. will continue and pretend this never happened")
+        end
+    else
+        @logger.info("found a zero size file #{name}")
+        chunk = nil
     end
 else
     chunk = partial_read_json(name, file[:offset], file[:length])
     @logger.debug("partial file #{name} from #{file[:offset]} to #{file[:length]}")
 end
 if logtype == "nsgflowlog" && @is_json
+    # skip empty chunks
+    unless chunk.nil?
     res = resource(name)
     begin
         fingjson = JSON.parse(chunk)
@@ -267,6 +256,7 @@ def run(queue)
     rescue JSON::ParserError
         @logger.error("parse error on #{res[:nsg]} [#{res[:date]}] offset: #{file[:offset]} length: #{file[:length]}")
     end
+    end
     # TODO: Convert this to line based grokking.
     # TODO: ECS Compliance?
 elsif logtype == "wadiis" && !@is_json
@@ -284,6 +274,7 @@ def run(queue)
         end
     rescue Exception => e
         @logger.error("codec exception: #{e.message} .. will continue and pretend this never happened")
+        @registry.store(name, { :offset => file[:length], :length => file[:length] })
         @logger.debug("#{chunk}")
     end
     @processed += counter
@@ -323,8 +314,54 @@ def close
 
 
 private
+def connect
+    # Try in this order to access the storageaccount
+    # 1. storageaccount / sas_token
+    # 2. connection_string
+    # 3. storageaccount / access_key
+
+    unless connection_string.nil?
+        conn = connection_string.value
+    end
+    unless sas_token.nil?
+        unless sas_token.value.start_with?('?')
+            conn = "BlobEndpoint=https://#{storageaccount}.#{dns_suffix};SharedAccessSignature=#{sas_token.value}"
+        else
+            conn = sas_token.value
+        end
+    end
+    unless conn.nil?
+        @blob_client = Azure::Storage::Blob::BlobService.create_from_connection_string(conn)
+    else
+        # unless use_development_storage?
+        @blob_client = Azure::Storage::Blob::BlobService.create(
+            storage_account_name: storageaccount,
+            storage_dns_suffix: dns_suffix,
+            storage_access_key: access_key.value,
+        )
+        # else
+        #     @logger.info("not yet implemented")
+        # end
+    end
+end
+
 def full_read(filename)
-    return @blob_client.get_blob(container, filename)[1]
+    tries ||= 2
+    begin
+        return @blob_client.get_blob(container, filename)[1]
+    rescue Exception => e
+        @logger.error("caught: #{e.message} for full_read")
+        if (tries -= 1) > 0
+            if e.message = "Connection reset by peer"
+                connect
+            end
+            retry
+        end
+    end
+    begin
+        chuck = @blob_client.get_blob(container, filename)[1]
+    end
+    return chuck
 end
 
 def partial_read_json(filename, offset, length)
@@ -475,6 +512,7 @@ def save_registry(filelist)
 
 
 def learn_encapsulation
+    @logger.info("learn_encapsulation, this can be skipped by setting skip_learning => true. Or set both head_file and tail_file")
     # From one file, read first block and last block to learn head and tail
     begin
         blobs = @blob_client.list_blobs(container, { max_results: 3, prefix: @prefix})
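
The new skip_learning option works together with the existing file_head and file_tail settings: when learning is skipped, the plugin uses the configured (or default) head and tail instead of sniffing a sample blob at startup. As a rough illustration, a pipeline configuration using the option might look like the sketch below; the storage account, SAS token and container values are placeholders, and the file_tail value is an assumed example (only the file_head default '{"records":[' is visible in this diff).

input {
  azure_blob_storage {
    storageaccount => "examplesa"          # placeholder account name
    sas_token      => "?sv=...&sig=..."    # placeholder SAS token (starts with '?')
    container      => "insights-logs-networksecuritygroupflowevent"   # placeholder container
    codec          => "json"
    logtype        => "nsgflowlog"
    # skip the startup learning of head/tail and use the values below instead
    skip_learning  => true
    file_head      => '{"records":['
    file_tail      => ']}'                 # assumed closing string for nsgflowlog JSON
  }
}

Only skip_learning is new in this commit; the remaining options already exist in the plugin.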

logstash-input-azure_blob_storage.gemspec

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-input-azure_blob_storage'
-  s.version = '0.11.6'
+  s.version = '0.11.7'
   s.licenses = ['Apache-2.0']
   s.summary = 'This logstash plugin reads and parses data from Azure Storage Blobs.'
   s.description = <<-EOF
