We are loading data from the good Kinesis stream into an Elasticsearch cluster using the Elasticsearch Loader. However, we are experiencing frequent crashes, apparently due to out-of-memory issues. Can anyone help explain the possible reasons?
Below is our configuration:
# Copyright (c) 2014-2020 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.
# This file (config.hocon.sample) contains a template with
# configuration options for the Elasticsearch Loader.
# Sources currently supported are:
# "kinesis" for reading records from a Kinesis stream
# "stdin" for reading unencoded tab-separated events from stdin
# If set to "stdin", JSON documents will not be sent to Elasticsearch
# but will be written to stdout.
# "nsq" for reading unencoded tab-separated events from NSQ
source = kinesis
# Where to write good and bad records
sink {
  # Sinks currently supported are:
  # "elasticsearch" for writing good records to Elasticsearch
  # "stdout" for writing good records to stdout
  good = "elasticsearch"
  # Sinks currently supported are:
  # "kinesis" for writing bad records to Kinesis
  # "stderr" for writing bad records to stderr
  # "nsq" for writing bad records to NSQ
  # "none" for ignoring bad records
  bad = "stderr"
}
# "good" for a stream of successfully enriched events
# "bad" for a stream of bad events
# "plain-json" for writing plain json
enabled = "good"
# The following are used to authenticate for the Amazon Kinesis sink.
#
# If both are set to "default", the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
#
# If both are set to "iam", use AWS IAM Roles to provision credentials.
#
# If both are set to "env", use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = iam
  secretKey = iam
}
queue {
  # What queue to use, can be "kinesis" or "nsq"
  enabled = "kinesis"
  # Config for Kinesis
  # "LATEST": most recent data.
  # "TRIM_HORIZON": oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = "TRIM_HORIZON"
  # Maximum number of records to get from Kinesis per call to GetRecords
  maxRecords = 10000
  # Region where the Kinesis stream is located
  region = "redacted"
  # "appName" is used for a DynamoDB table to maintain stream state.
  # You can set it automatically using: "SnowplowElasticsearchSink-${sink.kinesis.in.stream-name}"
  appName = "snowplow_elasticsearch_good"
  # Config for NSQ
  # Channel name for NSQ source
  # If more than one application is reading from the same NSQ topic at the same time,
  # each of them must have a unique channel name to get all the data from the topic
  #channelName = "{{nsqSourceChannelName}}"
  # Host name for nsqd
  #nsqdHost = "{{nsqdHost}}"
  # HTTP port for nsqd
  #nsqdPort = {{nsqdPort}}
  # Host name for nsqlookupd
  #nsqlookupdHost = "{{nsqlookupdHost}}"
  # HTTP port for nsqlookupd
  #nsqlookupdPort = {{nsqlookupdPort}}
}
# Common configuration section for all stream sources
streams {
  inStreamName = "snowplow-good"
  # Stream for enriched events which are rejected by Elasticsearch
  outStreamName = "snowplow-bad"
  # Events are accumulated in a buffer before being sent to Elasticsearch.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = 5000000 # Not supported by NSQ, will be ignored
    recordLimit = 1000
    timeLimit = 5000 # Not supported by NSQ, will be ignored
  }
}
elasticsearch {
  # Events are indexed using an Elasticsearch Client
  # - endpoint: the cluster endpoint
  # - port: the port the cluster can be accessed on
  #   - for http this is usually 9200
  #   - for transport this is usually 9300
  # - username (optional, remove if not active): http basic auth username
  # - password (optional, remove if not active): http basic auth password
  # - shardDateFormat (optional, remove if not needed): formatting used for sharding good stream, i.e. _yyyy-MM-dd
  # - shardDateField (optional, if not specified derived_tstamp is used): timestamp field for sharding good stream
  # - maxTimeout: the maximum attempt time before a client restart
  # - ssl: if using the http client, whether to use ssl or not
  client {
    endpoint = "redacted"
    port = "443"
    # username = "{{elasticsearchUsername}}"
    # password = "{{elasticsearchPassword}}"
    shardDateFormat = "-yyyy-MM-dd"
    shardDateField = "collector_tstamp"
    maxTimeout = "60000"
    maxRetries = 3
    ssl = true
  }
  # When using the AWS ES service
  # - signing: if using the http client and the AWS ES service you can sign your requests
  #   http://docs.aws.amazon.com/general/latest/gr/signing_aws_api_requests.html
  # - region where the AWS ES service is located
  aws {
    signing = true
    region = "redacted"
  }
  # index: the Elasticsearch index name
  # documentType: the Elasticsearch index type
  cluster {
    name = "redacted"
    index = "good-stream"
    documentType = "_doc"
  }
}
# Optional section for tracking endpoints
monitoring {
  snowplow {
    collectorUri = "redacted"
    collectorPort = 443
    appId = ${SNOWPLOW_APP_ID}
    method = POST
    ssl = "true"
  }
}
We originally started with a t3.small instance on AWS, which has 2 vCPUs and 2 GB of memory, but it would often crash about once a day. Today I finally decided to fix this and changed the instance type to r5.large, giving it 2 vCPUs and 16 GB of memory. Despite this, memory usage keeps climbing and the process eventually crashes again.
I have also tried changing the recordLimit, byteLimit, maxRetries, and maxTimeout values, but nothing helps.
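For reference, the more conservative tuning I experimented with looked roughly like the sketch below (the exact numbers are illustrative, not the precise values from our runs); the idea was to fetch and buffer less data at a time:

queue {
  # fetch fewer records per GetRecords call so less data is held in memory at once
  maxRecords = 1000
}
streams {
  buffer {
    byteLimit = 1000000  # flush at ~1 MB instead of 5 MB
    recordLimit = 250    # flush after 250 records instead of 1000
    timeLimit = 2000     # flush at least every 2 seconds
  }
}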
We also see a lot of errors where Elasticsearch rejects some of the documents. Would this cause a problem? We have defined a field that can be either a float or a string, but in Elasticsearch it is mapped as a float, so every document where the field arrives as a string fails to index. Is this an issue with the automatic mapping generated by the ES loader?
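To illustrate the mapping conflict (the field name below is made up, but the behaviour matches what we see): Elasticsearch dynamic mapping fixes the field type from the first document it indexes, and later documents carrying the other type are rejected.

# first event indexed: dynamic mapping types the (hypothetical) field "duration" as float
{ "duration": 12.5 }
# later event: the same field arrives as a non-numeric string, so Elasticsearch rejects
# the document with a mapper_parsing_exception and it never reaches the index
{ "duration": "12.5 seconds" }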