Elasticsearch Loader 0.12.1 Crashes and Schema Errors

We are loading data from the good Kinesis stream into an Elasticsearch cluster using the Elasticsearch Loader. However, we are experiencing frequent crashes, seemingly due to out-of-memory issues. Can anyone help explain the possible causes?

Below is our configuration:

    # Copyright (c) 2014-2020 Snowplow Analytics Ltd. All rights reserved.
    #
    # This program is licensed to you under the Apache License Version 2.0, and
    # you may not use this file except in compliance with the Apache License
    # Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
    # http://www.apache.org/licenses/LICENSE-2.0.
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the Apache License Version 2.0 is distributed on an "AS
    # IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    # implied.  See the Apache License Version 2.0 for the specific language
    # governing permissions and limitations there under.

    # This file (config.hocon.sample) contains a template with
    # configuration options for the Elasticsearch Loader.

    # Sources currently supported are:
    # "kinesis" for reading records from a Kinesis stream
    # "stdin" for reading unencoded tab-separated events from stdin
    # "nsq" for reading unencoded tab-separated events from NSQ
    source = kinesis

    # Where to write good and bad records
    sink {
      # Sinks currently supported are:
      # "elasticsearch" for writing good records to Elasticsearch
      # "stdout" for writing good records to stdout
      # If set to "stdout", JSON documents will not be sent to Elasticsearch
      # but will be written to stdout.
      good = "elasticsearch"

      # Sinks currently supported are:
      # "kinesis" for writing bad records to Kinesis
      # "stderr" for writing bad records to stderr
      # "nsq" for writing bad records to NSQ
      # "none" for ignoring bad records
      bad = "stderr"
    }

    # "good" for a stream of successfully enriched events
    # "bad" for a stream of bad events
    # "plain-json" for writing plain json
    enabled = "good"

    # The following are used to authenticate for the Amazon Kinesis sink.
    #
    # If both are set to "default", the default provider chain is used
    # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
    #
    # If both are set to "iam", use AWS IAM Roles to provision credentials.
    #
    # If both are set to "env", use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
    aws {
      accessKey = iam
      secretKey = iam
    }

    queue {
      # What queue to use, can be "kinesis" or "nsq"
      enabled="kinesis"

      # Config for Kinesis
      # "LATEST": most recent data.
      # "TRIM_HORIZON": oldest available data.
      # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
      # Note: This only affects the first run of this application on a stream.
      initialPosition = "TRIM_HORIZON"

      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 10000

      # Region where the Kinesis stream is located
      region = "redacted"

      # "appName" is used for a DynamoDB table to maintain stream state.
      # You can set it automatically using: "SnowplowElasticsearchSink-${sink.kinesis.in.stream-name}"
      appName = "snowplow_elasticsearch_good"

      # Config for NSQ
      # Channel name for NSQ source
      # If more than one application reading from the same NSQ topic at the same time,
      # all of them must have unique channel name for getting all the data from the same topic
      #channelName = "{{nsqSourceChannelName}}"

      # Host name for nsqd
      #nsqdHost = "{{nsqdHost}}"
      # HTTP port for nsqd
      #nsqdPort = {{nsqdPort}}

      # Host name for nsqlookupd
      #nsqlookupdHost = "{{nsqlookupdHost}}"
      # HTTP port for nsqlookupd
      #nsqlookupdPort = {{nsqlookupdPort}}
    }

    # Common configuration section for all stream sources
    streams {
      inStreamName = "snowplow-good"

      # Stream for enriched events which are rejected by Elasticsearch
      outStreamName = "snowplow-bad"

      # Events are accumulated in a buffer before being sent to Elasticsearch.
      # The buffer is emptied whenever:
      # - the combined size of the stored records exceeds byteLimit or
      # - the number of stored records exceeds recordLimit or
      # - the time in milliseconds since it was last emptied exceeds timeLimit
      buffer {
        byteLimit = 5000000 # Not supported by NSQ, will be ignored
        recordLimit = 1000
        timeLimit = 5000 # Not supported by NSQ, will be ignored
      }
    }

    elasticsearch {

      # Events are indexed using an Elasticsearch Client
      # - endpoint: the cluster endpoint
      # - port: the port the cluster can be accessed on
      #   - for http this is usually 9200
      #   - for transport this is usually 9300
      # - username (optional, remove if not active): http basic auth username
      # - password (optional, remove if not active): http basic auth password
      # - shardDateFormat (optional, remove if not needed): formatting used for sharding good stream, e.g. -yyyy-MM-dd
      # - shardDateField (optional, if not specified derived_tstamp is used): timestamp field for sharding good stream
      # - max-timeout: the maximum attempt time before a client restart
      # - ssl: if using the http client, whether to use ssl or not
      client {
        endpoint = "redacted"
        port = "443"
        # username = "{{elasticsearchUsername}}"
        # password = "{{elasticsearchPassword}}"
        shardDateFormat = "-yyyy-MM-dd"
        shardDateField = "collector_tstamp"
        maxTimeout = "60000"
        maxRetries = 3
        ssl = true
      }

      # When using the AWS ES service
      # - signing: if using the http client and the AWS ES service you can sign your requests
      #    http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html
      # - region where the AWS ES service is located
      aws {
        signing = true
        region = "redacted"
      }

      # index: the Elasticsearch index name
      # type: the Elasticsearch index type
      cluster {
        name = "redacted"
        index = "good-stream"
        documentType = "_doc"
      }
    }


    # Optional section for tracking endpoints
    monitoring {
      snowplow {
        collectorUri = "redacted"
        collectorPort = 443
        appId = ${SNOWPLOW_APP_ID}
        method = POST
        ssl = "true"
      }
    }

We originally started off with a t3.small instance on AWS, which has 2 vCPUs and 2 GB of memory, but it would often crash about once a day. Today I finally decided to fix this and changed the instance type to r5.large, giving it 2 vCPUs and 16 GB of memory. Despite this, memory usage keeps increasing and the process eventually crashes again.

I have also tried changing the recordLimit, byteLimit, maxRetries, and maxTimeout values, but nothing helps.

We also see a lot of errors where ES rejects some of the documents. Could this be causing the problem? We have defined a field that can be either a float or a string, but in ES it is mapped as float, causing all documents where this field is a string to fail. Is this an issue with the automatic schema generation by the ES loader?


Hi @pat,

Indeed, Elasticsearch has no union types: a field can have only one type, and that type will be the first one ES sees for the field. All events that then carry a different type for this field will fail to be inserted. You should be able to solve this by setting the field to string in the ES mapping and using a dynamic template to map float values to string for the faulty field, as sketched below.
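
For example, something like the following should work. This is a minimal sketch that assumes Elasticsearch 7.x (on 6.x the mappings need to be nested under the document type), and "my_ambiguous_field" is a placeholder name for your faulty field. Since your indexes are date-sharded (good-stream-yyyy-MM-dd), an index template applies the mapping to each newly created index:

    # Sketch only: replace <cluster-endpoint> and my_ambiguous_field with your values.
    curl -XPUT "https://<cluster-endpoint>/_template/good-stream-field-types" \
      -H 'Content-Type: application/json' -d'
    {
      "index_patterns": ["good-stream*"],
      "mappings": {
        "dynamic_templates": [
          {
            "ambiguous_field_as_string": {
              "match": "my_ambiguous_field",
              "mapping": { "type": "keyword" }
            }
          }
        ]
      }
    }'

Note that Elasticsearch cannot change the type of a field on an existing index, so this will only take effect from the next date-sharded index onwards (or after a reindex of the current one).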

Still, this doesn’t explain the OutOfMemory issues. In your configuration you have maxRetries = 3, so after 3 failed attempts to write to ES the events should be sent to the bad stream and should no longer occupy memory. But there seems to be a memory leak.

Would you be able to profile your ES loader to find out what is using the memory?

You can do that by adding -Dcom.sun.management.jmxremote.port=5555 -Dcom.sun.management.jmxremote.rmi.port=5555 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=127.0.0.1 to the Java options when running the ES loader (5555 being the port you want to use), and then inspecting the JVM with a tool such as VisualVM.
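
For reference, the launch command might look like this. It is a sketch only: the jar file name and the --config flag are assumptions here and may differ in your deployment:

    # Sketch: adjust the jar name/path and config file to match your setup.
    java \
      -Dcom.sun.management.jmxremote.port=5555 \
      -Dcom.sun.management.jmxremote.rmi.port=5555 \
      -Dcom.sun.management.jmxremote.ssl=false \
      -Dcom.sun.management.jmxremote.authenticate=false \
      -Djava.rmi.server.hostname=127.0.0.1 \
      -jar snowplow-elasticsearch-loader-0.12.1.jar --config config.hocon

Since java.rmi.server.hostname is set to 127.0.0.1, you would connect VisualVM to localhost:5555 on the instance itself, or tunnel the port over SSH if you are profiling from your own machine.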