Hello,
We are currently running the Stream Enrich flavour of Enrich (v2.0.5) and are in the process of upgrading to the new Enrich Kinesis flavour (v3.2.2). During the upgrade we noticed a number of Java heap-space errors like the following, which never occurred with v2.0.5:
software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: Java heap space
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:98)
at software.amazon.awssdk.core.exception.SdkClientException.create(SdkClientException.java:43)
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:204)
at software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper.setLastException(RetryableStageHelper.java:200)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:179)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:159)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:76)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$null$0(MakeAsyncHttpRequestStage.java:105)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$WrappedErrorForwardingResponseHandler.onError(MakeAsyncHttpRequestStage.java:159)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.lambda$notifyError$6(ResponseHandler.java:386)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:249)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.access$600(ResponseHandler.java:74)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.notifyError(ResponseHandler.java:384)
at software.amazon.awssdk.http.nio.netty.internal.utils.ExceptionHandlingUtils.tryCatch(ExceptionHandlingUtils.java:42)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onNext(ResponseHandler.java:338)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler$PublisherAdapter$1.onNext(ResponseHandler.java:289)
at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.publishMessage(HandlerPublisher.java:407)
at software.amazon.awssdk.http.nio.netty.internal.nrs.HandlerPublisher.channelRead(HandlerPublisher.java:383)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:228)
at software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:199)
at software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:173)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at software.amazon.awssdk.http.nio.netty.internal.LastHttpContentHandler.channelRead(LastHttpContentHandler.java:43)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.onDataRead(Http2ToHttpInboundAdapter.java:85)
at software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:49)
at software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter.channelRead0(Http2ToHttpInboundAdapter.java:42)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.doRead0(AbstractHttp2StreamChannel.java:901)
at io.netty.handler.codec.http2.AbstractHttp2StreamChannel.fireChildRead(AbstractHttp2StreamChannel.java:555)
at io.netty.handler.codec.http2.Http2MultiplexHandler.channelRead(Http2MultiplexHandler.java:180)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.http2.Http2FrameCodec.onHttp2Frame(Http2FrameCodec.java:707)
at io.netty.handler.codec.http2.Http2FrameCodec$FrameListener.onDataRead(Http2FrameCodec.java:646)
at io.netty.handler.codec.http2.Http2FrameListenerDecorator.onDataRead(Http2FrameListenerDecorator.java:36)
at io.netty.handler.codec.http2.Http2EmptyDataFrameListener.onDataRead(Http2EmptyDataFrameListener.java:49)
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:307)
at io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:415)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:250)
at io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:159)
at io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
at io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:173)
at io.netty.handler.codec.http2.DecoratingHttp2ConnectionDecoder.decodeFrame(DecoratingHttp2ConnectionDecoder.java:63)
at io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
at io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.OutOfMemoryError: Java heap space
Might there be an issue with our config for the new Enrich flavour?
Our config file for v2.0.5:
enrich {
streams {
in {
# Stream/topic where the raw events to be enriched are located
raw = "{{ .Values.environment }}-{{ .Values.tenant }}-collected-good"
}
out {
# Stream/topic where the events that were successfully enriched will end up
enriched = "{{ .Values.environment }}-enriched-good"
# Stream/topic where the events that failed enrichment will be stored
bad = "{{ .Values.environment }}-enriched-bad"
# Stream/topic where the pii transformation events will end up
#pii = "__TOPIC_PII__"
# How the output stream/topic will be partitioned.
# Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
# user_ipaddress, domain_sessionid, user_fingerprint.
# Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
# possible partition keys correspond to.
# Otherwise, the partition key will be a random UUID.
# Note: Nsq does not make use of partition key.
partitionKey = "event_id"
}
# Configuration shown is for Kinesis; to use another, uncomment the appropriate configuration
# and comment out the other
# To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
# "enabled" which should be set to "stdin".
sourceSink {
# Sources / sinks currently supported are:
# 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
# Kinesis stream
# 'kafka' for reading / writing to a Kafka topic
# 'nsq' for reading / writing to a Nsq topic
# 'stdin' for reading from stdin and writing to stdout and stderr
enabled = "kinesis"
# Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
region = eu-central-1
# region = ${?ENRICH_STREAMS_SOURCE_SINK_REGION}
## Optional endpoint url configuration to override aws kinesis endpoints,
## this can be used to specify local endpoints when using localstack
# customEndpoint = {kinesisEndpoint}
# customEndpoint = ${?ENRICH_STREAMS_SOURCE_SINK_CUSTOM_ENDPOINT}
## Optional endpoint url configuration to override aws dynamodb endpoints for Kinesis checkpoints lease table,
## this can be used to specify local endpoints when using Localstack
# dynamodbCustomEndpoint = "http://localhost:4569"
# dynamodbCustomEndpoint = ${?ENRICH_DYNAMODB_CUSTOM_ENDPOINT}
# Optional override to disable cloudwatch
disableCloudWatch = true
# disableCloudWatch = ${?ENRICH_DISABLE_CLOUDWATCH}
# AWS credentials
# If both are set to 'default', use the default AWS credentials provider chain.
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = default
secretKey = default
}
# Maximum number of records to get from Kinesis per call to GetRecords
maxRecords = 10000
# maxRecords = ${?ENRICH_MAX_RECORDS}
# LATEST: most recent data.
# TRIM_HORIZON: oldest available data.
# "AT_TIMESTAMP": Start from the record at or after the specified timestamp
# Note: This only effects the first run of this application on a stream.
# (pertinent to kinesis source type)
initialPosition = TRIM_HORIZON
# initialPosition = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_POSITION}
# Needs to be specified when initialPosition is "AT_TIMESTAMP".
# Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
# Ex: "2017-05-17T10:00:00Z"
# Note: Time needs to be specified in UTC.
initialTimestamp = "2017-05-17T10:00:00Z"
# Minimum and maximum backoff periods, in milliseconds
backoffPolicy {
minBackoff = 1000
maxBackoff = 10000
}
# Or Kafka (Comment out for other types)
# brokers = "__KAFKA_BROKERS__"
# Number of retries to perform before giving up on sending a record
# retries = 0
# The kafka producer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#producerconfigs
# Some values are set to other values from this config by default:
# "bootstrap.servers" -> brokers
# retries -> retries
# "buffer.memory" -> buffer.byteLimit
# "linger.ms" -> buffer.timeLimit
#producerConf {
# acks = all
# "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
# "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
#}
# The kafka consumer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#consumerconfigs
# Some values are set to other values from this config by default:
# "bootstrap.servers" -> brokers
# "group.id" -> appName
#consumerConf {
# "enable.auto.commit" = true
# "auto.commit.interval.ms" = 1000
# "auto.offset.reset" = earliest
# "session.timeout.ms" = 30000
# "key.deserializer" = "org.apache.kafka.common.serialization.StringDeserializer"
# "value.deserializer" = "org.apache.kafka.common.serialization.ByteArrayDeserializer"
#}
# Or NSQ
## Channel name for nsq source
## If more than one application is reading from the same NSQ topic at the same time,
## all of them must have the same channel name
#rawChannel = "{nsqSourceChannelName}"
## Host name for nsqd
#host = "{nsqHost}"
## TCP port for nsqd, 4150 by default
#port = {nsqdPort}
## Host name for lookupd
#lookupHost = "{lookupHost}"
## HTTP port for nsqlookupd, 4161 by default
#lookupPort = {nsqlookupdPort}
}
# After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches recordLimit or
# - the combined size of the stored records reaches byteLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit when
# a new event enters the buffer
# A Snowplow enriched event is about 2.25 KB
buffer {
byteLimit = 4500000
recordLimit = 500 # Not supported by Kafka; will be ignored
timeLimit = 250
}
# Used for a DynamoDB table to maintain stream state.
# Used as the Kafka consumer group ID.
# Used as the Google PubSub subscription name.
appName = "{{ .Values.environment }}-{{ .Values.tenant }}-stream-enrich-manifest"
}
# The setting below requires an adapter being ready, i.e.: https://github.com/snowplow-incubator/remote-adapter-example
# remoteAdapters = [
# {
# vendor: "com.globeandmail"
# version: "v1"
# url: "http://remote-adapter-example:8995/sampleRemoteAdapter"
# connectionTimeout: 1000
# readTimeout: 5000
# }
# ]
# # Optional section for tracking endpoints
# monitoring {
# snowplow {
# collectorUri = "{collectorUri}"
# collectorUri = ${?ENRICH_MONITORING_COLLECTOR_URI}
# collectorPort = 80
# collectorPort = ${?ENRICH_MONITORING_COLLECTOR_PORT}
# appId = {enrichAppName}
# appId = ${?ENRICH_MONITORING_APP_ID}
# method = GET
# method = ${?ENRICH_MONITORING_METHOD}
# }
# }
}
Our config file for v3.2.2:
{
# Where to read collector payloads from
"input": {
"type": "Kinesis"
# Optional. Name of the application which the KCL daemon should assume
"appName": "{{ .Values.environment }}-{{ .Values.tenant }}-stream-enrich-manifest"
# Name of the Kinesis stream to read from
"streamName": "{{ .Values.environment }}-{{ .Values.tenant }}-collected-good"
# Optional. Region where the Kinesis stream is located
# This field is optional if it can be resolved with AWS region provider chain.
# It checks places like env variables, system properties, AWS profile file.
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"
# Optional, set the initial position to consume the Kinesis stream
# Must be TRIM_HORIZON, LATEST or AT_TIMESTAMP
# LATEST: most recent data.
# TRIM_HORIZON: oldest available data.
# AT_TIMESTAMP: start from the record at or after the specified timestamp
"initialPosition": {
"type": "TRIM_HORIZON"
}
# "initialPosition": {
# "type": "AT_TIMESTAMP"
# "timestamp": "2020-07-17T10:00:00Z" # Required for AT_TIMESTAMP
# }
# Optional, set the mode for retrieving records.
"retrievalMode": {
"type": "Polling"
# Maximum size of a batch returned by a call to getRecords.
# Records are checkpointed after a batch has been fully processed,
# thus the smaller maxRecords, the more often records can be checkpointed
# into DynamoDb, but possibly reducing the throughput.
"maxRecords": 10000
}
# "retrievalMode": {
# "type": "FanOut"
# }
# Optional. Size of the internal buffer used when reading messages from Kinesis,
# each buffer holding up to maxRecords from above
#"bufferSize": 3
# Optional, endpoint url configuration to override aws kinesis endpoints
# Can be used to specify local endpoint when using localstack
# "customEndpoint": "http://localhost:4566"
# Optional, endpoint url configuration to override aws dynamodb endpoint for Kinesis checkpoints lease table
# Can be used to specify local endpoint when using localstack
# "dynamodbCustomEndpoint": "http://localhost:4569"
# Optional, endpoint url configuration to override aws cloudwatch endpoint for metrics
# Can be used to specify local endpoint when using localstack
# "cloudwatchCustomEndpoint": "http://localhost:4582"
}
"output": {
# Enriched events output
"good": {
"type": "Kinesis"
# Name of the Kinesis stream to write to
"streamName": "{{ .Values.environment }}-enriched-good"
# Optional. Region where the Kinesis stream is located
# This field is optional if it can be resolved with AWS region provider chain.
# It checks places like env variables, system properties, AWS profile file.
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"
# Optional. How the output stream/topic will be partitioned in Kinesis
# Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
# user_ipaddress, domain_sessionid, user_fingerprint
# Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
# possible partition keys correspond to.
# Otherwise, the partition key will be a random UUID.
"partitionKey": "event_id"
# Optional. Policy for cats-retry to retry after failures writing to kinesis
"backoffPolicy": {
"minBackoff": 1000 milliseconds
"maxBackoff": 10000 milliseconds
"maxRetries": 10
}
# Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
"maxBufferedTime": 100 millis
# Optional. The KPL will consider a record failed if it cannot be sent within this deadline.
# The KPL then yields back to the JVM, which will log the error, and might retry sending.
"recordTtl": 20 seconds
# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
"collection": {
# Maximum number of Kinesis records to pack into a PutRecords request
"maxCount": 500
# Maximum amount of data to send with a PutRecords request
"maxSize": 5242880
}
# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
# If not specified, aggregation is deactivated
#"aggregation": {
# # Maximum number of enriched events to pack into an aggregated Kinesis record
# "maxCount": 4294967295
#
# # Maximum number of bytes to pack into an aggregated Kinesis record
# "maxSize": 51200
#}
# Optional. Maximum number of connections to open in the backend.
# HTTP requests are sent in parallel over multiple connections.
# Setting this too high may impact latency and consume additional resources
# without increasing throughput
"maxConnections": 24
# Optional. Minimum level of logs for the native KPL daemon.
# Logs show up on stderr
# Possible values: trace | debug | info | warning | error
"logLevel": "warning"
# Optional. Use a custom Kinesis endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses.
# There is no way to disable TLS.
# Needs to be specified along with customPort
# "customEndpoint": "localhost"
# Optional. Server port to connect to for Kinesis.
# Needs to be specified along with customEndpoint
# "customPort": 4566
# Optional. Use a custom Cloudwatch endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses.
# There is no way to disable TLS
# Needs to be specified along with cloudwatchPort
# "cloudwatchEndpoint": "localhost"
# Optional. Server port to connect to for CloudWatch.
# Needs to be specified along with cloudwatchEndpoint
# "cloudwatchPort": 4582
}
# Bad rows output
"bad": {
"type": "Kinesis"
# Name of the Kinesis stream to write to
"streamName": "{{ .Values.environment }}-enriched-bad"
# Optional. Region where the Kinesis stream is located
# This field is optional if it can be resolved with AWS region provider chain.
# It checks places like env variables, system properties, AWS profile file.
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": "eu-central-1"
# Optional. Policy for cats-retry to retry after failures writing to kinesis
"backoffPolicy": {
"minBackoff": 1000 milliseconds
"maxBackoff": 10000 milliseconds
"maxRetries": 10
}
# Optional. Maximum amount of time an enriched event may spend being buffered before it gets sent
"maxBufferedTime": 100 millis
# Optional. The KPL will consider a record failed if it cannot be sent within this deadline.
# The KPL then yields back to the JVM, which will log the error, and might retry sending.
"recordTtl": 20 seconds
# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
"collection": {
# Maximum number of Kinesis records to pack into a PutRecords request
"maxCount": 500
# Maximum amount of data to send with a PutRecords request
"maxSize": 5242880
}
# Optional. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html
# If not specified, aggregation is deactivated
#"aggregation": {
# # Maximum number of enriched events to pack into an aggregated Kinesis record
# "maxCount": 4294967295
#
# # Maximum number of bytes to pack into an aggregated Kinesis record
# "maxSize": 51200
#}
# Optional. Maximum number of connections to open in the backend.
# HTTP requests are sent in parallel over multiple connections.
# Setting this too high may impact latency and consume additional resources
# without increasing throughput
"maxConnections": 24
# Optional. Minimum level of logs for the native KPL daemon.
# Logs show up on stderr
# Possible values: trace | debug | info | warning | error
"logLevel": "warning"
# Optional. Use a custom Kinesis endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses.
# There is no way to disable TLS.
# Needs to be specified along with customPort
# "customEndpoint": "localhost"
# Optional. Server port to connect to for Kinesis.
# Needs to be specified along with customEndpoint
# "customPort": 4566
# Optional. Use a custom Cloudwatch endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses.
# There is no way to disable TLS
# Needs to be specified along with cloudwatchPort
# "cloudwatchEndpoint": "localhost"
# Optional. Server port to connect to for CloudWatch.
# Needs to be specified along with cloudwatchEndpoint
# "cloudwatchPort": 4582
}
}
# Optional. Concurrency of the app
"concurrency" : {
# Number of events that can get enriched at the same time within a chunk
"enrich": 256
# Number of chunks that can get sunk at the same time
# WARNING: if greater than 1, records can get checkpointed before they are sunk
"sink": 1
}
}
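In case it helps our thinking: one idea we had was to reduce how much data is held in memory at any one time, since the relevant knobs all appear in the config above. A purely illustrative sketch (the values are guesses on our part, not recommendations):

"input": {
  "retrievalMode": {
    "type": "Polling"
    # fewer records pulled per GetRecords call, so less data buffered per chunk
    "maxRecords": 1000
  }
}
"output": {
  "good": {
    # smaller PutRecords batches and fewer parallel connections in the KPL
    "collection": {
      "maxCount": 100
      "maxSize": 1048576
    }
    "maxConnections": 8
  }
}

We have no idea whether these are the settings that actually drive heap usage, though, hence the question.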
Or might there be some other way to solve the Java heap-space issue we are currently facing? Maybe someone could point us in the right direction.
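For example, would simply giving the JVM a bigger heap be the recommended fix here (assuming heap really is the bottleneck)? A rough sketch of what we had in mind, using the standard JAVA_TOOL_OPTIONS variable that the JVM picks up on its own:

docker run \
  -e JAVA_TOOL_OPTIONS="-Xms2g -Xmx2g" \
  ...   # rest of our usual enrich-kinesis run command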
Thanks in advance
Dieter