diff --git a/.ci/docker-compose.override.yml b/.ci/docker-compose.override.yml index 260b0e503..7608b7865 100644 --- a/.ci/docker-compose.override.yml +++ b/.ci/docker-compose.override.yml @@ -1,5 +1,3 @@ -version: '3' - services: logstash: diff --git a/.ci/logstash-run.sh b/.ci/logstash-run.sh index 0801282b0..4c231345c 100755 --- a/.ci/logstash-run.sh +++ b/.ci/logstash-run.sh @@ -13,7 +13,7 @@ else fi # CentOS 7 using curl defaults does not enable TLSv1.3 -CURL_OPTS="-k --tlsv1.2 --tls-max 1.3" +CURL_OPTS="-k -u admin:elastic --tlsv1.2 --tls-max 1.3" wait_for_es() { count=120 @@ -22,7 +22,7 @@ wait_for_es() { [[ $count -eq 0 ]] && exit 1 sleep 1 done - echo $(curl $CURL_OPTS -vi $ES_URL | jq -r .version.number) + echo $(curl $CURL_OPTS $ES_URL | jq -r .version.number) } if [[ "$INTEGRATION" != "true" ]]; then diff --git a/.travis.yml b/.travis.yml index 112673acb..5e29addb1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,13 +4,23 @@ import: jobs: include: - stage: "Integration Tests" - env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current + env: INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current + - env: INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.previous + - env: INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current + - env: INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.current + - env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current - env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.previous - env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current - env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.next - - env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.future + - env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.next + - env: INTEGRATION=true SNAPSHOT=true LOG_LEVEL=info ELASTIC_STACK_VERSION=main - stage: "Secure Integration Tests" - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current SNAPSHOT=true - - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current - - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current ES_SSL_KEY_INVALID=true - - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current ES_SSL_SUPPORTED_PROTOCOLS=TLSv1.3 + env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=7.current + - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current + - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current ES_SSL_KEY_INVALID=true + - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=8.current ES_SSL_SUPPORTED_PROTOCOLS=TLSv1.3 + - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.current + - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.current ES_SSL_KEY_INVALID=true + - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info ELASTIC_STACK_VERSION=9.current ES_SSL_SUPPORTED_PROTOCOLS=TLSv1.3 + - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info SNAPSHOT=true ELASTIC_STACK_VERSION=8.next + - env: SECURE_INTEGRATION=true INTEGRATION=true LOG_LEVEL=info SNAPSHOT=true ELASTIC_STACK_VERSION=9.next diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a46b00a8..2d0591b3e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ +## 11.22.13 + - Add headers reporting uncompressed size and doc count for bulk requests [#1217](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1217) + ## 11.22.12 - Properly handle http code 413 (Payload Too Large) [#1199](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1199) + ## 11.22.11 - Remove irrelevant log warning about elastic stack version [#1202](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1202) diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 120d3e673..8d6e02cbf 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -21,7 +21,8 @@ module LogStash; module Outputs; class ElasticSearch; # We wound up agreeing that a number greater than 10 MiB and less than 100MiB # made sense. We picked one on the lowish side to not use too much heap. TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB - + EVENT_COUNT_HEADER = "X-Elastic-Event-Count".freeze + UNCOMPRESSED_LENGTH_HEADER = "X-Elastic-Uncompressed-Request-Length".freeze class HttpClient attr_reader :client, :options, :logger, :pool, :action_count, :recv_count @@ -143,7 +144,11 @@ def bulk(actions) :payload_size => stream_writer.pos, :content_length => body_stream.size, :batch_offset => (index + 1 - batch_actions.size)) - bulk_responses << bulk_send(body_stream, batch_actions) + headers = { + EVENT_COUNT_HEADER => batch_actions.size.to_s, + UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos.to_s + } + bulk_responses << bulk_send(body_stream, batch_actions, headers) body_stream.truncate(0) && body_stream.seek(0) stream_writer = gzip_writer(body_stream) if compression_level? batch_actions.clear @@ -159,7 +164,14 @@ def bulk(actions) :payload_size => stream_writer.pos, :content_length => body_stream.size, :batch_offset => (actions.size - batch_actions.size)) - bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0 + + if body_stream.size > 0 + headers = { + EVENT_COUNT_HEADER => batch_actions.size.to_s, + UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos.to_s + } + bulk_responses << bulk_send(body_stream, batch_actions, headers) + end body_stream.close unless compression_level? join_bulk_responses(bulk_responses) @@ -179,8 +191,8 @@ def join_bulk_responses(bulk_responses) } end - def bulk_send(body_stream, batch_actions) - params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {} + def bulk_send(body_stream, batch_actions, headers = {}) + params = compression_level? ? {:headers => headers.merge("Content-Encoding" => "gzip") } : { :headers => headers } begin response = @pool.post(@bulk_path, params, body_stream.string) diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 1909085ca..10f737a4d 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '11.22.12' + s.version = '11.22.13' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/es_spec_helper.rb b/spec/es_spec_helper.rb index 6981a9c12..0f86f5498 100644 --- a/spec/es_spec_helper.rb +++ b/spec/es_spec_helper.rb @@ -1,15 +1,18 @@ require_relative './spec_helper' require 'elasticsearch' -require_relative "support/elasticsearch/api/actions/delete_ilm_policy" -require_relative "support/elasticsearch/api/actions/get_alias" -require_relative "support/elasticsearch/api/actions/put_alias" -require_relative "support/elasticsearch/api/actions/get_ilm_policy" -require_relative "support/elasticsearch/api/actions/put_ilm_policy" require 'json' require 'cabin' +# remove this condition and support package once plugin starts consuming elasticsearch-ruby v8 client +# in elasticsearch-ruby v7, ILM APIs were in a separate xpack gem, now directly available +unless elastic_ruby_v8_client_available? + require_relative "support/elasticsearch/api/actions/delete_ilm_policy" + require_relative "support/elasticsearch/api/actions/get_ilm_policy" + require_relative "support/elasticsearch/api/actions/put_ilm_policy" +end + module ESHelper def get_host_port if ENV["INTEGRATION"] == "true" @@ -20,8 +23,12 @@ def get_host_port end def get_client - Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client| - allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking + if elastic_ruby_v8_client_available? + Elasticsearch::Client.new(:hosts => [get_host_port]) + else + Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client| + allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking + end end end @@ -128,31 +135,36 @@ def get_cluster_settings(client) end def get_policy(client, policy_name) - client.get_ilm_policy(name: policy_name) + if elastic_ruby_v8_client_available? + client.index_lifecycle_management.get_lifecycle(policy: policy_name) + else + client.get_ilm_policy(name: policy_name) + end end def put_policy(client, policy_name, policy) - client.put_ilm_policy({:name => policy_name, :body=> policy}) - end - - def put_alias(client, the_alias, index) - body = { - "aliases" => { - index => { - "is_write_index"=> true - } - } - } - client.put_alias({name: the_alias, body: body}) + if elastic_ruby_v8_client_available? + client.index_lifecycle_management.put_lifecycle({:policy => policy_name, :body=> policy}) + else + client.put_ilm_policy({:name => policy_name, :body=> policy}) + end end def clean_ilm(client) - client.get_ilm_policy.each_key { |key| client.delete_ilm_policy(name: key) if key =~ /logstash-policy/ } + if elastic_ruby_v8_client_available? + client.index_lifecycle_management.get_lifecycle.each_key { |key| client.index_lifecycle_management.delete_lifecycle(policy: key) if key =~ /logstash-policy/ } + else + client.get_ilm_policy.each_key { |key| client.delete_ilm_policy(name: key) if key =~ /logstash-policy/ } + end end def supports_ilm?(client) begin - client.get_ilm_policy + if elastic_ruby_v8_client_available? + client.index_lifecycle_management.get_lifecycle + else + client.get_ilm_policy + end true rescue false diff --git a/spec/fixtures/test_certs/GENERATED_AT b/spec/fixtures/test_certs/GENERATED_AT index 79bce0d85..83f8b6c2d 100644 --- a/spec/fixtures/test_certs/GENERATED_AT +++ b/spec/fixtures/test_certs/GENERATED_AT @@ -1 +1 @@ -2024-06-25T21:50:58+01:00 +2025-07-22T11:15:03+01:00 diff --git a/spec/fixtures/test_certs/ca.crt b/spec/fixtures/test_certs/ca.crt index a732ad145..686c13d8b 100644 --- a/spec/fixtures/test_certs/ca.crt +++ b/spec/fixtures/test_certs/ca.crt @@ -1,29 +1,32 @@ -----BEGIN CERTIFICATE----- -MIIFDDCCAvQCAQEwDQYJKoZIhvcNAQELBQAwTDELMAkGA1UEBhMCUFQxCzAJBgNV -BAgMAk5BMQ8wDQYDVQQHDAZMaXNib24xDjAMBgNVBAoMBU15TGFiMQ8wDQYDVQQD -DAZSb290Q0EwHhcNMjQwNjI1MjA1MDU4WhcNMjUwNjI1MjA1MDU4WjBMMQswCQYD -VQQGEwJQVDELMAkGA1UECAwCTkExDzANBgNVBAcMBkxpc2JvbjEOMAwGA1UECgwF -TXlMYWIxDzANBgNVBAMMBlJvb3RDQTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCC -AgoCggIBAMtTMqAWuH17b9XqPa5L3HNqgnZ958+gvcOt7Q/sOEvcDQJgkzZ+Gywh -5er5JF2iomYOHiD5JncYr4YmRQKuYfD6B1WI5FuQthD/OlA1/RHqtbY27J33SaO6 -6ro8gal7vjHrXKQkefVYRwdfO6DqqbhV6L4sMiy8FzQ55TMpoM35cWuvoAMxvSQq -GZ4pYYKnfNSGhzHvssfNS1xu/Lwb7Vju4jPhp+43BkGwEimI5km7jNC1nwjiHtxD -sY/s93AKa/vLktXKUK5nA3jjJOhAbRTVnbOAgxFt0YbX98xW/aUqscgBUVs9J/My -TRMwVKJ7Vsmth1PdJQksUASuzESlSPl09dMjTQ+MXzJDt0JvX8SIJPmbBng78MSa -CUhpOZiii1l2mBfPWejx20I/SMCUNmzbwm2w9JD50Jv2iX4l4ge4H1CIK1/orW1p -dY9xPL0uKYm6ADsDC0B8sGgNMBXeB6aLojY1/ITwmmfpfk9c/yWPfC7stHgCYRAv -5MfGAsmv0/ya5VrWQGBJkFiYy1pon6nxUjCbgn0RABojRoGdhhY3QDipgwmSgFZx -r064RFr1bt/Ml3MJmPf535mSwPdk/j/zw4IZTvlmwKW3FyMDhwYL/zX7J0c6MzMP -LEdi73Qjzmr3ENIrir4O86wNz81YRfYkg9ZX8yKJK9LBAUrYCjJ3AgMBAAEwDQYJ -KoZIhvcNAQELBQADggIBABym9LMyS9W9lvpcH4OK1YLfBPJwrhZ+4keiriY4zWOo -pB+v2Q35neMMXSlTDpeIwPdMkqsh8VZprOWURF80JGvpJ6fBfi05rCDWp/ol1ZKi -snCA+dE2zDK7Z3+F0MbakT5oBi5WgkXSvRvlJEJ/gBD7WC1wq0kxCMK+M5w2RPAT -nnV/iozNBkwExxyJA7BpS6F/v0XjwK7fm5Kpql7zKlh5piZ2IVU0B60Sqskcb2mU -90+1r9T06ekIW/Iz1jd5RWYziu0nbmDeKeKvGAICNU+evYXW+/5kKecMLuEvDCgS -ssbt/Hb510uLHhxfhN4SbvBl2zADsLC+2arf2ATIwD8ZXDDs04ayBsejV0ZwVrTZ -ExKqAys+B3tuIHGRqL5VukdmH6g6oJziYueohPBCOuSOzDd0FhppF4uXZS8DReSg -KieO2ZYfiA1gVRiY6jPx+r7J9I5kSS1gwr/e3zHJHa79ijMB1SSIswQUmgSMkwGh -sNyDNI9ZxgJan3v7kVargMt2LiNcXvVyTzPSYSXcY7SoebfpMprVIG7vZ9TZf+Uu -FQeOfxdLFuGTnpFrYmvOD3OIKfODlY5t+TNICg7A3eTUXeJPcdBBnuVCiQU6TCB5 -H+69K5w54Q6a70sHZU1IWsGT8XtbUizPNQky+LAFsE/5oUnCqtypeEu4srcZK53x +MIIFdTCCA12gAwIBAgIUDITbsLT9hKser0ZzBZsxqgaZdWswDQYJKoZIhvcNAQEL +BQAwSjELMAkGA1UEBhMCUFQxCzAJBgNVBAgMAk5BMQ8wDQYDVQQHDAZMaXNib24x +DjAMBgNVBAoMBU15TGFiMQ0wCwYDVQQDDARyb290MB4XDTI1MDcyMjEwMTUwM1oX +DTM1MDcyMDEwMTUwM1owSjELMAkGA1UEBhMCUFQxCzAJBgNVBAgMAk5BMQ8wDQYD +VQQHDAZMaXNib24xDjAMBgNVBAoMBU15TGFiMQ0wCwYDVQQDDARyb290MIICIjAN +BgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAy1MyoBa4fXtv1eo9rkvcc2qCdn3n +z6C9w63tD+w4S9wNAmCTNn4bLCHl6vkkXaKiZg4eIPkmdxivhiZFAq5h8PoHVYjk +W5C2EP86UDX9Eeq1tjbsnfdJo7rqujyBqXu+MetcpCR59VhHB187oOqpuFXoviwy +LLwXNDnlMymgzflxa6+gAzG9JCoZnilhgqd81IaHMe+yx81LXG78vBvtWO7iM+Gn +7jcGQbASKYjmSbuM0LWfCOIe3EOxj+z3cApr+8uS1cpQrmcDeOMk6EBtFNWds4CD +EW3Rhtf3zFb9pSqxyAFRWz0n8zJNEzBUontWya2HU90lCSxQBK7MRKVI+XT10yNN +D4xfMkO3Qm9fxIgk+ZsGeDvwxJoJSGk5mKKLWXaYF89Z6PHbQj9IwJQ2bNvCbbD0 +kPnQm/aJfiXiB7gfUIgrX+itbWl1j3E8vS4piboAOwMLQHywaA0wFd4HpouiNjX8 +hPCaZ+l+T1z/JY98Luy0eAJhEC/kx8YCya/T/JrlWtZAYEmQWJjLWmifqfFSMJuC +fREAGiNGgZ2GFjdAOKmDCZKAVnGvTrhEWvVu38yXcwmY9/nfmZLA92T+P/PDghlO ++WbApbcXIwOHBgv/NfsnRzozMw8sR2LvdCPOavcQ0iuKvg7zrA3PzVhF9iSD1lfz +Iokr0sEBStgKMncCAwEAAaNTMFEwHQYDVR0OBBYEFKFadJx46upif1BrhYZ0iu8o +2z8rMB8GA1UdIwQYMBaAFKFadJx46upif1BrhYZ0iu8o2z8rMA8GA1UdEwEB/wQF +MAMBAf8wDQYJKoZIhvcNAQELBQADggIBAJi4FwYJz/RotoUpfrLZFf69RoI01Fje +8ITt8SR1Dx/1GTPEuqVVfx0EYtOoH6Gg3FwgSQ9GHRDIa1vkHY5S+FUSOW3pCoZE +/kaLu9bmFxn+GntghvQEor+LzODuZKLXupaGcu1tA4fzyuI4jglVD2sGZtLk//CT +Hd4tOWXo5k1Fj0jMnJq+2Htr8yBeSAO5ZNsvtAjOUU6pfDEwL9bgRzlKKFQQMUYo +6x1FvRDRXWjpzB/H+OSqOaoNLEB9FfEl8I7nn6uTenr5WxjPAOpwjZl9ObB/95xM +p91abKbLQLev5I8npM9G3C/n01l3IzRs7DNHqGJTZO7frGhicD7/jNa+tkSioeJ2 +fIMqgDOvQE+gMxs19zw1tsI3+kqX7+ptTkU4Lan5V5ZKGfU8xtcVIlyRk5/yDUI5 +1dfQVubs6z07s6De2qa92LFz9l8sT6QuVer+c/wPPhBdMwbzcHyUJIBjFaBpxH86 +F7Mr5Zr/+qcbHglAHow1lBqdZzimqGd1koqFRat/pFUFh0iqktMmpl+ZUCjyoQEX +93j8aMU2UQjYM8NJDE2aRculo9OEoqERYFM2m3nHvrtE7iZgddryLNH7ZmC1EquX +MhZJ26GuZ2U4b9dAX858WTv0q1EF5S8KObMlxMU7IDk+cWlSD+puWliwfUKoTR/4 +JErSfjCSaRqh -----END CERTIFICATE----- diff --git a/spec/fixtures/test_certs/ca.der.sha256 b/spec/fixtures/test_certs/ca.der.sha256 index 4ce28e8a9..7b37c8468 100644 --- a/spec/fixtures/test_certs/ca.der.sha256 +++ b/spec/fixtures/test_certs/ca.der.sha256 @@ -1 +1 @@ -8b23238088af65cbae6ee9c23821068d896ec1dad081e2a1035ff70866943247 +d403930d5296f1515aadd3f730757e7719188b63a276687a3475128b746e4340 diff --git a/spec/fixtures/test_certs/renew.sh b/spec/fixtures/test_certs/renew.sh index 8fa81421a..81de8a5d5 100755 --- a/spec/fixtures/test_certs/renew.sh +++ b/spec/fixtures/test_certs/renew.sh @@ -3,8 +3,7 @@ set -e cd "$(dirname "$0")" -openssl x509 -x509toreq -in ca.crt -signkey ca.key -out ca.csr -openssl x509 -req -days 365 -in ca.csr -set_serial 0x01 -signkey ca.key -out ca.crt && rm ca.csr +openssl req -x509 -new -nodes -key ca.key -subj "/C=PT/ST=NA/L=Lisbon/O=MyLab/CN=root" -sha256 -days 3650 -out ca.crt openssl x509 -in ca.crt -outform der | sha256sum | awk '{print $1}' > ca.der.sha256 openssl x509 -x509toreq -in test.crt -signkey test.key -out test.csr @@ -13,4 +12,4 @@ openssl x509 -in test.crt -outform der | sha256sum | awk '{print $1}' > test.der openssl pkcs12 -export -inkey test.key -in test.crt -passout "pass:1234567890" -out test.p12 # output ISO8601 timestamp to file -date -Iseconds > GENERATED_AT \ No newline at end of file +date -Iseconds > GENERATED_AT diff --git a/spec/fixtures/test_certs/test.crt b/spec/fixtures/test_certs/test.crt index 6f8abc444..57ffefca0 100644 --- a/spec/fixtures/test_certs/test.crt +++ b/spec/fixtures/test_certs/test.crt @@ -1,30 +1,31 @@ -----BEGIN CERTIFICATE----- -MIIFEzCCAvsCAQEwDQYJKoZIhvcNAQELBQAwTDELMAkGA1UEBhMCUFQxCzAJBgNV -BAgMAk5BMQ8wDQYDVQQHDAZMaXNib24xDjAMBgNVBAoMBU15TGFiMQ8wDQYDVQQD -DAZSb290Q0EwHhcNMjQwNjI1MjA1MDU4WhcNMjUwNjI1MjA1MDU4WjBTMQswCQYD -VQQGEwJQVDELMAkGA1UECAwCTkExDzANBgNVBAcMBkxpc2JvbjEOMAwGA1UECgwF -TXlMYWIxFjAUBgNVBAMMDWVsYXN0aWNzZWFyY2gwggIiMA0GCSqGSIb3DQEBAQUA -A4ICDwAwggIKAoICAQDGIT9szzhN5HvZ2nivnCDzVfdYbbqBhgEbPppWPyFcV0r2 -rtmWfeK5EEdsTS/Ey4owTceOplPpAp4svF+a/i1/bHhqnQYYU4f7Qic4fDAszLdi -SIo0o1csNvIogm/P+uvSzE6eZRZUSmo49dY5SKSJt6Pjh6lM2MHEjsPKIKdAN57w -EN90q4IZv6AHE9rphqxcmF1k+j5xmhCUS1EJ+y7hyZ0S7Hghdgp/0cxSu/7YlVYy -JpkIlQd3RPXzEf6VSYjtr9Ajp1rhvv2611q0CB5NALg/KR3OiMPYmTg5HAKOdweN -am76nG3VxTeV3y+LW/pZAbi4qAl+4/c0eOGsL7o/YSn7qhThU1AWS9kY1WxTCrKR -h58rUGRfmvpnOR99xvR4jz942RNiY61pTmsvo+iJspTII3GZhwIGlHtxE9Rn50lW -QcDuDDHfObWhzb4rS55BERIwDUqD1LgCRd0ikRxPSvI1AM4cl35b4DTaDLcnM6EO -fy+QTYsgNoftU1PI1onDQ7ZdfgrTrIBFQQRwOqfyB4bB2zWVj62LSDvZoYYicNUe -cqyE1542WNKzmyE8Mrf3uknN2J6EH7EhmiyRBtGg3NEQCwIYM4/kWPNPOtkSjsn3 -cNbMNUZiSnQn/nTs4T8g6b2rrwsay/FGUE83AbPqqcTlp2RUVnjbC8KA5+iV1wID -AQABMA0GCSqGSIb3DQEBCwUAA4ICAQAlB7YFw7e1pzYz55Kqa91juTWP5XiCn59n -J0YKM++vdHqy224HZb9jGtJjZ+0Wod4cwiOVWm+5hLs4IrzfGuXFZEFx/VWP3SDq -4F3IJJXQkc7jSNrL6IR92xRDSB+yFZZI6FFsnaKMT2fZELndPVFnH+oclc8ZZoyz -2H/r1CT4yYx7YclAWUqq8Ci3J82qUeeM8Xj9fzGFKy6oCoRsApQb4qb4DoQ1TbZC -b8gWxHj8l4izul1MtTzSkoMb0Ot50vMoT69m1hDz5H4wF6KuAZUAgM9LQWNHJCkt -hlOXvqFTHF+y+bvK+hGs976xViq3HA45M3+5Psv0+fdoHgYQJvd23yt8CM0rGfv3 -P+34HlLCW+FdWiazmo+tl5YmtGs6pYuAEp2z5pmUO2l2CutFmv4xBOvXF+rZOzxY -Q0ackJtflnDC/Tlq2qAldY3Oa8nyI3UIaMUcqHemwm5KpDjc0XF2J1qCoSrMxD8+ -L8HdvUYlh3DIFgJIG1DlTtfQO+RwrVi9+NBBGAsforla9HJDO/POiv7O9hED71u+ -pev8flmULeisMeYqeiL55jyS/+45VaF7t36FMyiP3zXANwbHZMvzVobEsXAuzPOt -pVNo/EpszrdBe9JWt1GrFLY9c14FmWG8cAWpcwRH0ofhJPPvEB7usFVWCSduOAbA -Zytzb+8iSw== +MIIFWjCCA0KgAwIBAgIBATANBgkqhkiG9w0BAQsFADBKMQswCQYDVQQGEwJQVDEL +MAkGA1UECAwCTkExDzANBgNVBAcMBkxpc2JvbjEOMAwGA1UECgwFTXlMYWIxDTAL +BgNVBAMMBHJvb3QwHhcNMjUwNzIyMTAxNTAzWhcNMjYwNzIyMTAxNTAzWjBTMQsw +CQYDVQQGEwJQVDELMAkGA1UECAwCTkExDzANBgNVBAcMBkxpc2JvbjEOMAwGA1UE +CgwFTXlMYWIxFjAUBgNVBAMMDWVsYXN0aWNzZWFyY2gwggIiMA0GCSqGSIb3DQEB +AQUAA4ICDwAwggIKAoICAQDGIT9szzhN5HvZ2nivnCDzVfdYbbqBhgEbPppWPyFc +V0r2rtmWfeK5EEdsTS/Ey4owTceOplPpAp4svF+a/i1/bHhqnQYYU4f7Qic4fDAs +zLdiSIo0o1csNvIogm/P+uvSzE6eZRZUSmo49dY5SKSJt6Pjh6lM2MHEjsPKIKdA +N57wEN90q4IZv6AHE9rphqxcmF1k+j5xmhCUS1EJ+y7hyZ0S7Hghdgp/0cxSu/7Y +lVYyJpkIlQd3RPXzEf6VSYjtr9Ajp1rhvv2611q0CB5NALg/KR3OiMPYmTg5HAKO +dweNam76nG3VxTeV3y+LW/pZAbi4qAl+4/c0eOGsL7o/YSn7qhThU1AWS9kY1WxT +CrKRh58rUGRfmvpnOR99xvR4jz942RNiY61pTmsvo+iJspTII3GZhwIGlHtxE9Rn +50lWQcDuDDHfObWhzb4rS55BERIwDUqD1LgCRd0ikRxPSvI1AM4cl35b4DTaDLcn +M6EOfy+QTYsgNoftU1PI1onDQ7ZdfgrTrIBFQQRwOqfyB4bB2zWVj62LSDvZoYYi +cNUecqyE1542WNKzmyE8Mrf3uknN2J6EH7EhmiyRBtGg3NEQCwIYM4/kWPNPOtkS +jsn3cNbMNUZiSnQn/nTs4T8g6b2rrwsay/FGUE83AbPqqcTlp2RUVnjbC8KA5+iV +1wIDAQABo0IwQDAdBgNVHQ4EFgQUb789MhsOk89lMWwSwBss1TLXDFAwHwYDVR0j +BBgwFoAUoVp0nHjq6mJ/UGuFhnSK7yjbPyswDQYJKoZIhvcNAQELBQADggIBAI+G +NKZ3s3m+/R4mH3M84gGWPE1joC2bLavYYLZjKnZv18o6fHX0IW/8v5hd5Df3SP5u +vhjC88bewiKVHldqkC6ju9rbZxQynhFZGXbN9zLvFMZGkfRH5vB2Y13ZWBdWhq5L +cRxpRk6WlwaSy0Ed4F12u9ERmhMOtSZhqAnNJBeVraOHeGlcMZXJdZkeyxkdcZ4y +YJcrI8Da0dMxILgIuc9ZCynAItRAjMw1/3wjlx0Cyxif10ct+EFiP6Zv/gzoo05v +tNeqOCrxAqAcwrS1u4q/KAKySiEIyxyU1nEI/g53nALwoQhFsRVqVXNAoy7xu37y +o+lvs98rkq/NkkbBvRBPdcF/BYNtesRxKja/QAEvslyZfyICL9oqsuPPEB2nHtXa +mWntT2NaXyr1FWCxHaXfZQOxSwco3vTk7HLuNug2wxIc/hewkLlk5NCRkAYfTlan +gLhZ3vBej4oA8cdpODMb8SrYhqKTeX8E+ulHVS0paY0kszAGK2x2kHqRGNXUlfoB +Ax0etGudHhgtTCAmUgJDyQNLkvBKHYQJ2V/Wv/xej7wXKkACNKlRORl8zcnbVErd +GM/ibfqNIPIo8dP2EDycSV6vIICqkxpCZZObNjfgKa0UN03qYi7xREhhEehXgU8H +IO9w2pG7ReiO2E+bLIs0Zh1+2IwlM1EM/eqbq+Gi -----END CERTIFICATE----- diff --git a/spec/fixtures/test_certs/test.der.sha256 b/spec/fixtures/test_certs/test.der.sha256 index 8e18e9573..fb0137e98 100644 --- a/spec/fixtures/test_certs/test.der.sha256 +++ b/spec/fixtures/test_certs/test.der.sha256 @@ -1 +1 @@ -80329a197063dea8cf7905d10d221648bbdbc05b8fb1d4c2e384b831bc6590df +386ae6ef809d20ddfcc7ca68f480e82007c031b365c86cc58922cf1bd7238f89 diff --git a/spec/fixtures/test_certs/test.p12 b/spec/fixtures/test_certs/test.p12 index 0c06248e1..579c68ec2 100644 Binary files a/spec/fixtures/test_certs/test.p12 and b/spec/fixtures/test_certs/test.p12 differ diff --git a/spec/integration/outputs/compressed_indexing_spec.rb b/spec/integration/outputs/compressed_indexing_spec.rb index 3662f5434..d93663dfe 100644 --- a/spec/integration/outputs/compressed_indexing_spec.rb +++ b/spec/integration/outputs/compressed_indexing_spec.rb @@ -36,7 +36,9 @@ { "Content-Encoding" => "gzip", "Content-Type" => "application/json", - 'x-elastic-product-origin' => 'logstash-output-elasticsearch' + 'x-elastic-product-origin' => 'logstash-output-elasticsearch', + 'X-Elastic-Event-Count' => anything, + 'X-Elastic-Uncompressed-Request-Length' => anything, } } diff --git a/spec/integration/outputs/delete_spec.rb b/spec/integration/outputs/delete_spec.rb index 9fa8afd18..5d4104279 100644 --- a/spec/integration/outputs/delete_spec.rb +++ b/spec/integration/outputs/delete_spec.rb @@ -39,12 +39,12 @@ it "should ignore non-monotonic external version updates" do id = "ev2" subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)]) - r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true) + r = es.get(:index => 'logstash-delete', :id => id, :refresh => true) expect(r['_version']).to eq(99) expect(r['_source']['message']).to eq('foo') subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 98)]) - r2 = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true) + r2 = es.get(:index => 'logstash-delete', :id => id, :refresh => true) expect(r2['_version']).to eq(99) expect(r2['_source']['message']).to eq('foo') end @@ -52,12 +52,12 @@ it "should commit monotonic external version updates" do id = "ev3" subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)]) - r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true) + r = es.get(:index => 'logstash-delete', :id => id, :refresh => true) expect(r['_version']).to eq(99) expect(r['_source']['message']).to eq('foo') subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 100)]) - expect { es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + expect { es.get(:index => 'logstash-delete', :id => id, :refresh => true) }.to raise_error(get_expected_error_class) end end end diff --git a/spec/integration/outputs/ilm_spec.rb b/spec/integration/outputs/ilm_spec.rb index e6c2dce10..0262ab426 100644 --- a/spec/integration/outputs/ilm_spec.rb +++ b/spec/integration/outputs/ilm_spec.rb @@ -102,7 +102,7 @@ it 'should not install the default policy' do subject.register sleep(1) - expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class) end it 'should not write the ILM settings into the template' do @@ -282,12 +282,12 @@ subject.register sleep(1) expect(@es.indices.exists_alias(name: "logstash")).to be_truthy - expect(@es.get_alias(name: "logstash")).to include("logstash-000001") + expect(@es.indices.get_alias(name: "logstash")).to include("logstash-000001") end end it 'should install it if it is not present' do - expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class) subject.register sleep(1) expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.not_to raise_error @@ -298,7 +298,7 @@ subject.register sleep(1) expect(@es.indices.exists_alias(name: "logstash")).to be_truthy - expect(@es.get_alias(name: "logstash")).to include("logstash-#{todays_date}-000001") + expect(@es.indices.get_alias(name: "logstash")).to include("logstash-#{todays_date}-000001") end it 'should ingest into a single index' do @@ -340,14 +340,14 @@ let (:policy) { small_max_doc_policy } before do - expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class) put_policy(@es,ilm_policy_name, policy) end it 'should not install the default policy if it is not used' do subject.register sleep(1) - expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class) end end @@ -357,14 +357,14 @@ let (:policy) { max_age_policy("1d") } before do - expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class) put_policy(@es,ilm_policy_name, policy) end it 'should not install the default policy if it is not used' do subject.register sleep(1) - expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class) end end @@ -374,7 +374,7 @@ subject.register sleep(1) expect(@es.indices.exists_alias(name: expected_index)).to be_truthy - expect(@es.get_alias(name: expected_index)).to include("#{expected_index}-#{todays_date}-000001") + expect(@es.indices.get_alias(name: expected_index)).to include("#{expected_index}-#{todays_date}-000001") end it 'should write the ILM settings into the template' do @@ -443,17 +443,18 @@ subject.register sleep(1) expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_truthy - expect(@es.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001") + expect(@es.indices.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001") end context 'when the custom rollover alias already exists' do it 'should ignore the already exists error' do expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_falsey - put_alias(@es, "#{ilm_rollover_alias}-#{todays_date}-000001", ilm_rollover_alias) + @es.indices.create(index: "#{ilm_rollover_alias}-#{todays_date}-000001") + @es.indices.put_alias(name: ilm_rollover_alias, index: "#{ilm_rollover_alias}-#{todays_date}-000001") expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_truthy subject.register sleep(1) - expect(@es.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001") + expect(@es.indices.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001") end end @@ -532,3 +533,8 @@ end end + +def get_expected_error_class + return Elastic::Transport::Transport::Errors::NotFound if elastic_ruby_v8_client_available? + Elasticsearch::Transport::Transport::Errors::NotFound +end diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb index 02f3fb766..191973297 100644 --- a/spec/integration/outputs/index_spec.rb +++ b/spec/integration/outputs/index_spec.rb @@ -215,12 +215,22 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false); it "sets the correct content-type header" do expected_manticore_opts = { - :headers => {"Content-Type" => "application/json", 'x-elastic-product-origin' => 'logstash-output-elasticsearch'}, + :headers => { + "Content-Type" => "application/json", + 'x-elastic-product-origin' => 'logstash-output-elasticsearch', + 'X-Elastic-Event-Count' => anything, + 'X-Elastic-Uncompressed-Request-Length' => anything + }, :body => anything } if secure expected_manticore_opts = { - :headers => {"Content-Type" => "application/json", 'x-elastic-product-origin' => 'logstash-output-elasticsearch'}, + :headers => { + "Content-Type" => "application/json", + 'x-elastic-product-origin' => 'logstash-output-elasticsearch', + 'X-Elastic-Event-Count' => anything, + 'X-Elastic-Uncompressed-Request-Length' => anything + }, :body => anything, :auth => { :user => user, diff --git a/spec/integration/outputs/index_version_spec.rb b/spec/integration/outputs/index_version_spec.rb index 0b1ecda94..d73872f49 100644 --- a/spec/integration/outputs/index_version_spec.rb +++ b/spec/integration/outputs/index_version_spec.rb @@ -36,11 +36,11 @@ it "should default to ES version" do subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foo")]) - r = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true) + r = es.get(:index => 'logstash-index', :id => '123', :refresh => true) expect(r["_version"]).to eq(1) expect(r["_source"]["message"]).to eq('foo') subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foobar")]) - r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true) + r2 = es.get(:index => 'logstash-index', :id => '123', :refresh => true) expect(r2["_version"]).to eq(2) expect(r2["_source"]["message"]).to eq('foobar') end @@ -63,7 +63,7 @@ it "should respect the external version" do id = "ev1" subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")]) - r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true) + r = es.get(:index => 'logstash-index', :id => id, :refresh => true) expect(r["_version"]).to eq(99) expect(r["_source"]["message"]).to eq('foo') end @@ -71,12 +71,12 @@ it "should ignore non-monotonic external version updates" do id = "ev2" subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")]) - r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true) + r = es.get(:index => 'logstash-index', :id => id, :refresh => true) expect(r["_version"]).to eq(99) expect(r["_source"]["message"]).to eq('foo') subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "98", "message" => "foo")]) - r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true) + r2 = es.get(:index => 'logstash-index', :id => id, :refresh => true) expect(r2["_version"]).to eq(99) expect(r2["_source"]["message"]).to eq('foo') end @@ -84,12 +84,12 @@ it "should commit monotonic external version updates" do id = "ev3" subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")]) - r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true) + r = es.get(:index => 'logstash-index', :id => id, :refresh => true) expect(r["_version"]).to eq(99) expect(r["_source"]["message"]).to eq('foo') subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "100", "message" => "foo")]) - r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true) + r2 = es.get(:index => 'logstash-index', :id => id, :refresh => true) expect(r2["_version"]).to eq(100) expect(r2["_source"]["message"]).to eq('foo') end diff --git a/spec/integration/outputs/painless_update_spec.rb b/spec/integration/outputs/painless_update_spec.rb index 02116fa51..0f4eabe06 100644 --- a/spec/integration/outputs/painless_update_spec.rb +++ b/spec/integration/outputs/painless_update_spec.rb @@ -22,11 +22,12 @@ def get_es_output( options={} ) # This can fail if there are no indexes, ignore failure. @es.indices.delete(:index => "*") rescue nil @es.index( - :index => 'logstash-update', - :type => doc_type, - :id => "123", - :body => { :message => 'Test', :counter => 1 } - ) + { + :index => 'logstash-update', + :id => '123', + :body => { :message => 'Test', :counter => 1 }, + :refresh => true + }) @es.indices.refresh end @@ -46,7 +47,7 @@ def get_es_output( options={} ) subject = get_es_output(plugin_parameters) subject.register subject.multi_receive([LogStash::Event.new("count" => 4 )]) - r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true) + r = @es.get(:index => 'logstash-update', :id => "123", :refresh => true) expect(r["_source"]["counter"]).to eq(5) end end @@ -57,7 +58,7 @@ def get_es_output( options={} ) subject = get_es_output({ 'document_id' => "456", 'upsert' => '{"message": "upsert message"}' }) subject.register subject.multi_receive([LogStash::Event.new("message" => "sample message here")]) - r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true) + r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true) expect(r["_source"]["message"]).to eq('upsert message') end @@ -65,7 +66,7 @@ def get_es_output( options={} ) subject = get_es_output({ 'document_id' => "456", 'doc_as_upsert' => true }) subject.register subject.multi_receive([LogStash::Event.new("message" => "sample message here")]) - r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true) + r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true) expect(r["_source"]["message"]).to eq('sample message here') end @@ -82,7 +83,7 @@ def get_es_output( options={} ) subject.register subject.multi_receive([LogStash::Event.new("message" => "sample message here")]) - r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true) + r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true) expect(r["_source"]["message"]).to eq('upsert message') end @@ -91,7 +92,7 @@ def get_es_output( options={} ) subject.register subject.multi_receive([LogStash::Event.new("counter" => 1)]) @es.indices.refresh - r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true) + r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true) expect(r["_source"]["counter"]).to eq(1) end end diff --git a/spec/integration/outputs/unsupported_actions_spec.rb b/spec/integration/outputs/unsupported_actions_spec.rb index 1ac4f9e3e..c3b12f713 100644 --- a/spec/integration/outputs/unsupported_actions_spec.rb +++ b/spec/integration/outputs/unsupported_actions_spec.rb @@ -27,16 +27,21 @@ def get_es_output( options={} ) @es.indices.delete(:index => "*") rescue nil # index single doc for update purpose @es.index( - :index => INDEX, - :type => doc_type, - :id => "2", - :body => { :message => 'Test to doc indexing', :counter => 1 } + { + :index => INDEX, + :id => '2', + :body => { :message => 'Test to doc indexing', :counter => 1 }, + :refresh => true + } ) + @es.index( - :index => INDEX, - :type => doc_type, - :id => "3", - :body => { :message => 'Test to doc deletion', :counter => 2 } + { + :index => INDEX, + :id => '3', + :body => { :message => 'Test to doc deletion', :counter => 2 }, + :refresh => true + } ) @es.indices.refresh end @@ -63,12 +68,12 @@ def get_es_output( options={} ) rejected_events = events.select { |event| !index_or_update.call(event) } indexed_events.each do |event| - response = @es.get(:index => INDEX, :type => doc_type, :id => event.get("doc_id"), :refresh => true) + response = @es.get(:index => INDEX, :id => event.get("doc_id"), :refresh => true) expect(response['_source']['message']).to eq(event.get("message")) end rejected_events.each do |event| - expect {@es.get(:index => INDEX, :type => doc_type, :id => event.get("doc_id"), :refresh => true)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + expect {@es.get(:index => INDEX, :id => event.get("doc_id"), :refresh => true)}.to raise_error(get_expected_error_class) end end end diff --git a/spec/integration/outputs/update_spec.rb b/spec/integration/outputs/update_spec.rb index 4a86dc77e..1c28f6666 100644 --- a/spec/integration/outputs/update_spec.rb +++ b/spec/integration/outputs/update_spec.rb @@ -22,10 +22,12 @@ def get_es_output( options={} ) # This can fail if there are no indexes, ignore failure. @es.indices.delete(:index => "*") rescue nil @es.index( - :index => 'logstash-update', - :type => doc_type, - :id => "123", - :body => { :message => 'Test', :counter => 1 } + { + :index => 'logstash-update', + :id => '123', + :body => { :message => 'Test', :counter => 1 }, + :refresh => true + } ) @es.indices.refresh end @@ -40,14 +42,14 @@ def get_es_output( options={} ) subject = get_es_output({ 'document_id' => "456" } ) subject.register subject.multi_receive([LogStash::Event.new("message" => "sample message here")]) - expect {@es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + expect {@es.get(:index => 'logstash-update', :id => '456', :refresh => true)}.to raise_error(get_expected_error_class) end it "should update existing document" do subject = get_es_output({ 'document_id' => "123" }) subject.register subject.multi_receive([LogStash::Event.new("message" => "updated message here")]) - r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true) + r = @es.get(:index => 'logstash-update', :id => '123', :refresh => true) expect(r["_source"]["message"]).to eq('updated message here') end @@ -57,7 +59,7 @@ def get_es_output( options={} ) subject = get_es_output({ 'document_id' => "123" }) subject.register subject.multi_receive([LogStash::Event.new("data" => "updated message here", "message" => "foo")]) - r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true) + r = @es.get(:index => 'logstash-update', :id => '123', :refresh => true) expect(r["_source"]["data"]).to eq('updated message here') expect(r["_source"]["message"]).to eq('foo') end @@ -94,7 +96,7 @@ def get_es_output( options={} ) subject = get_es_output({ 'document_id' => "456", 'upsert' => '{"message": "upsert message"}' }) subject.register subject.multi_receive([LogStash::Event.new("message" => "sample message here")]) - r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true) + r = @es.get(:index => 'logstash-update', :id => '456', :refresh => true) expect(r["_source"]["message"]).to eq('upsert message') end @@ -102,7 +104,7 @@ def get_es_output( options={} ) subject = get_es_output({ 'document_id' => "456", 'doc_as_upsert' => true }) subject.register subject.multi_receive([LogStash::Event.new("message" => "sample message here")]) - r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true) + r = @es.get(:index => 'logstash-update', :id => '456', :refresh => true) expect(r["_source"]["message"]).to eq('sample message here') end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 207d83d5b..e84fe7617 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -8,3 +8,11 @@ module LogStash::Outputs::ElasticSearch::SpecHelper RSpec.configure do |config| config.include LogStash::Outputs::ElasticSearch::SpecHelper end + +# remove once plugin starts consuming elasticsearch-ruby v8 client +def elastic_ruby_v8_client_available? + Elasticsearch::Transport + false +rescue NameError # NameError: uninitialized constant Elasticsearch::Transport if Elastic Ruby client is not available + true +end \ No newline at end of file diff --git a/spec/support/elasticsearch/api/actions/get_alias.rb b/spec/support/elasticsearch/api/actions/get_alias.rb deleted file mode 100644 index ef4ebbd4f..000000000 --- a/spec/support/elasticsearch/api/actions/get_alias.rb +++ /dev/null @@ -1,18 +0,0 @@ -# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -# or more contributor license agreements. Licensed under the Elastic License; -# you may not use this file except in compliance with the Elastic License. - -module Elasticsearch - module API - module Actions - - # Retrieve the list of index lifecycle management policies - def get_alias(arguments={}) - method = HTTP_GET - path = Utils.__pathify '_alias', Utils.__escape(arguments[:name]) - params = {} - perform_request(method, path, params, nil).body - end - end - end -end \ No newline at end of file diff --git a/spec/support/elasticsearch/api/actions/put_alias.rb b/spec/support/elasticsearch/api/actions/put_alias.rb deleted file mode 100644 index d0585934f..000000000 --- a/spec/support/elasticsearch/api/actions/put_alias.rb +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -# or more contributor license agreements. Licensed under the Elastic License; -# you may not use this file except in compliance with the Elastic License. - -module Elasticsearch - module API - module Actions - - # @option arguments [String] :name The name of the alias (*Required*) - # @option arguments [Hash] :The alias definition(*Required*) - - def put_alias(arguments={}) - raise ArgumentError, "Required argument 'name' missing" unless arguments[:name] - raise ArgumentError, "Required argument 'body' missing" unless arguments[:body] - method = HTTP_PUT - path = Utils.__pathify Utils.__escape(arguments[:name]) - - params = Utils.__validate_and_extract_params arguments - body = arguments[:body] - perform_request(method, path, params, body.to_json).body - end - end - end -end diff --git a/spec/unit/outputs/elasticsearch/http_client_spec.rb b/spec/unit/outputs/elasticsearch/http_client_spec.rb index d4cee8f35..c2a93ac14 100644 --- a/spec/unit/outputs/elasticsearch/http_client_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client_spec.rb @@ -270,6 +270,83 @@ end end + context "the 'user-agent' header" do + let(:pool) { double("pool") } + let(:compression_level) { 6 } + let(:base_options) { super().merge( :client_settings => {:compression_level => compression_level}) } + let(:actions) { [ + ["index", {:_id=>nil, :_index=>"logstash"}, {"message_1"=> message_1}], + ["index", {:_id=>nil, :_index=>"logstash"}, {"message_2"=> message_2}], + ["index", {:_id=>nil, :_index=>"logstash"}, {"message_3"=> message_3}], + ]} + let(:message_1) { "hello" } + let(:message_2_size) { 1_000 } + let(:message_2) { SecureRandom.alphanumeric(message_2_size / 2 ) * 2 } + let(:message_3_size) { 1_000 } + let(:message_3) { "m" * message_3_size } + let(:messages_size) { message_1.size + message_2.size + message_3.size } + let(:action_overhead) { 42 + 16 + 2 } # header plus doc key size plus new line overhead per action + + let(:response) do + response = double("response") + allow(response).to receive(:code).and_return(response) + allow(response).to receive(:body).and_return({"errors" => false}.to_json) + response + end + + before(:each) do + subject.instance_variable_set("@pool", pool) + end + + it "carries bulk request's uncompressed size" do + expect(pool).to receive(:post) do |path, params, body| + headers = params.fetch(:headers, {}) + expect(headers["X-Elastic-Event-Count"]).to eq("3") + expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (messages_size + (action_overhead * 3)).to_s + end.and_return(response) + + subject.send(:bulk, actions) + end + context "without compression" do + let(:compression_level) { 0 } + it "carries bulk request's uncompressed size" do + expect(pool).to receive(:post) do |path, params, body| + headers = params.fetch(:headers, {}) + expect(headers["X-Elastic-Event-Count"]).to eq("3") + expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (messages_size + (action_overhead * 3)).to_s + end.and_return(response) + subject.send(:bulk, actions) + end + end + + context "with compressed messages over 20MB" do + let(:message_2_size) { 21_000_000 } + it "carries bulk request's uncompressed size" do + # only the first, tiny, message is sent first + expect(pool).to receive(:post) do |path, params, body| + headers = params.fetch(:headers, {}) + expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_1.size + action_overhead).to_s + expect(headers["X-Elastic-Event-Count"]).to eq("1") + end.and_return(response) + + # huge message_2 is sent afterwards alone + expect(pool).to receive(:post) do |path, params, body| + headers = params.fetch(:headers, {}) + expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_2.size + action_overhead).to_s + expect(headers["X-Elastic-Event-Count"]).to eq("1") + end.and_return(response) + + # finally medium message_3 is sent alone as well + expect(pool).to receive(:post) do |path, params, body| + headers = params.fetch(:headers, {}) + expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_3.size + action_overhead).to_s + expect(headers["X-Elastic-Event-Count"]).to eq("1") + end.and_return(response) + + subject.send(:bulk, actions) + end + end + end end describe "sniffing" do diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 35c908045..1e886d30c 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -777,7 +777,7 @@ end before(:each) do - allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array)) do |stream, actions| + allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array), instance_of(Hash)) do |stream, actions, headers| expect( stream.string ).to include '"foo":"bar1"' expect( stream.string ).to include '"foo":"bar2"' end.and_return(bulk_response, {"errors"=>false}) # let's make it go away (second call) to not retry indefinitely diff --git a/spec/unit/outputs/error_whitelist_spec.rb b/spec/unit/outputs/error_whitelist_spec.rb index 9780376fd..e6cbbcfff 100644 --- a/spec/unit/outputs/error_whitelist_spec.rb +++ b/spec/unit/outputs/error_whitelist_spec.rb @@ -4,7 +4,6 @@ describe "whitelisting error types in expected behavior" do let(:template) { '{"template" : "not important, will be updated by :index"}' } let(:event1) { LogStash::Event.new("somevalue" => 100, "@timestamp" => "2014-11-17T20:37:17.223Z") } - let(:action1) { ["index", {:_id=>1, :routing=>nil, :_index=>"logstash-2014.11.17", :_type=> doc_type }, event1] } let(:settings) { {"manage_template" => true, "index" => "logstash-2014.11.17", "template_overwrite" => true, "hosts" => get_host_port() } } subject { LogStash::Outputs::ElasticSearch.new(settings) }