Hi,
I was trying to add the monitoring.metrics.statsd configuration to s3loader 2.2.5, deployed in a Kubernetes container.
config.hocon:
{
# Optional, but recommended
"region": ${envregion}
# Options are: RAW, ENRICHED_EVENTS, SELF_DESCRIBING
# RAW simply sinks data 1:1
# ENRICHED_EVENTS works with monitoring.statsd to report metrics (identical to RAW otherwise)
# SELF_DESCRIBING partitions self-describing data (such as JSON) by its schema
"purpose": "ENRICHED_EVENTS"
# Input Stream config
"input": {
# Kinesis Client Lib app name (corresponds to DynamoDB table name)
"appName": "snowplow-s3-loader-kinesis"
# Kinesis stream name
"streamName": ${enrich_good}
# Options are: LATEST, TRIM_HORIZON, AT_TIMESTAMP
"position": "LATEST"
# Max batch size to pull from Kinesis
"maxRecords": 10000
}
"output": {
"s3": {
# Full path to output data
"path": ${s3path}
# Partitioning format; Optional
# Valid substitutions are {vendor}, {schema}, {format}, {model} for self-describing jsons
# and {yy}, {mm}, {dd}, {hh} for year, month, day, hour
"partitionFormat": "year={yy}/month={mm}"
# Prefix for all file names; Optional
"filenamePrefix": ${prefix}
# Maximum timeout that the application is allowed to fail for, e.g. in case of an S3 outage
"maxTimeout": 2000
# Output format; Options: GZIP, LZO
"compression": "GZIP"
}
# Kinesis Stream to output failures
"bad": {
"streamName": ${enrich_bad}
}
}
# Flush control. The first limit the KCL worker hits will trigger flushing
"buffer": {
# Maximum bytes to read before flushing
"byteLimit": 67108864
# Maximum records to read before flushing
"recordLimit": 20000
# Maximum time between flushes
"timeLimit": 600000
}
# Optional
"monitoring": {
"metrics": {
# StatsD-powered metrics; Optional
"statsd": {
"hostname": "localhost",
"port": 8125,
"tags": {
# It can resolve environment variables
# "worker": ${HOST}
}
# Optional
"prefix": ${log_prefix}
}
}
}
}
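For reference, the statsd block can be sanity-checked for parse or substitution problems (a minimal sketch, assuming the third-party pyhocon package; note that parsing fails unless the ${...} environment variables such as envregion are actually set):

# Sketch: confirm config.hocon parses and the statsd block sits where
# the loader expects it. Assumes the third-party pyhocon package.
from pyhocon import ConfigFactory

conf = ConfigFactory.parse_file("config.hocon")
statsd = conf.get("monitoring.metrics.statsd")
print(statsd.get("hostname"), statsd.get("port"))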
However, no UDP port is present in the container. Output of netstat -a in the loader container:
/snowplow-loader # netstat -a
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State
tcp 0 0 clickstream-etl-s3loader-5f758b5588-bcgbm:57178 ec2-3-227-250-206.compute-1.amazonaws.com:https ESTABLISHED
tcp 0 0 clickstream-etl-s3loader-5f758b5588-bcgbm:39132 52.94.0.104:https ESTABLISHED
Active UNIX domain sockets (servers and established)
Proto RefCnt Flags Type State I-Node Path
unix 2 [ ] STREAM CONNECTED 24179739
unix 2 [ ] STREAM CONNECTED 24179748
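Since StatsD is fire-and-forget UDP, I understand the client side would not show a LISTEN socket either way; to check whether the loader emits any datagrams at all, a minimal listener can be run inside the pod (a sketch, assuming Python is available in the container and nothing else is bound to 127.0.0.1:8125):

# Sketch: bind the statsd address from config.hocon and print whatever
# the loader sends. Receiving nothing means no metrics are being emitted.
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("127.0.0.1", 8125))
while True:
    data, addr = sock.recvfrom(4096)
    print(addr, data.decode("utf-8", errors="replace"))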
I would also like to know if there is any way to write the s3loader metrics directly to stdout, the way the enricher does.
enricher_config.hocon:
{
"input": {
"type": "Kinesis"
# Optional. Name of the application which the KCL daemon should assume
"appName": "snowplow-enrich-kinesis"
# Name of the Kinesis stream to read from
"streamName": ${collector_good}
# Optional. Region where the Kinesis stream is located
# This field is optional if it can be resolved with the AWS region provider chain.
# It checks places like env variables, system properties, AWS profile file.
# https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
"region": ${envregion}
# Optional, set the initial position to consume the Kinesis stream
# Must be TRIM_HORIZON, LATEST or AT_TIMESTAMP
# LATEST: most recent data.
# TRIM_HORIZON: oldest available data.
# AT_TIMESTAMP: start from the record at or after the specified timestamp
"initialPosition": {
"type": "TRIM_HORIZON"
}
# "initialPosition": {
# "type": "AT_TIMESTAMP"
# "timestamp": "2020-07-17T10:00:00Z" # Required for AT_TIMESTAMP
# }
# Optional, set the mode for retrieving records.
"retrievalMode": {
"type": "Polling"
# Maximum size of a batch returned by a call to getRecords.
# Records are checkpointed after a batch has been fully processed,
# thus the smaller maxRecords, the more often records can be checkpointed
# into DynamoDB, but possibly reducing the throughput.
"maxRecords": 10000
}
# "retrievalMode": {
# "type": "FanOut"
# }
# Optional. Size of the internal buffer used when reading messages from Kinesis,
# each buffer holding up to maxRecords from above
"bufferSize": 3
# Optional. Settings for backoff policy for checkpointing.
# Records are checkpointed after all the records of the same chunk have been enriched
"checkpointBackoff": {
"minBackoff": 100 milliseconds
"maxBackoff": 10 seconds
"maxRetries": 10
}
# Optional, endpoint url configuration to override aws kinesis endpoints
# Can be used to specify local endpoint when using localstack
# "customEndpoint": "http://localhost:4566"
# Optional, endpoint url configuration to override the aws dynamodb endpoint for the Kinesis checkpoints lease table
# Can be used to specify local endpoint when using localstack
#"dynamodbCustomEndpoint": "http://dynamodb:us-east-1:180648733583:table/snowplow-enrich-kinesis:8000"
# Optional, endpoint url configuration to override aws cloudwatch endpoint for metrics
# Can be used to specify local endpoint when using localstack
# "cloudwatchCustomEndpoint": "http://localhost:4582"
}
"output": {
"good": {
"streamName": ${enrich_good}
# Optional. Limits the number of events in a single PutRecords request.
# Several requests are made in parallel
# Maximum allowed: 500
"recordLimit": 500
# Optional. Limits the number of bytes in a single PutRecords request,
# including records and partition keys.
# Several requests are made in parallel
# Maximum allowed: 5 MB
"byteLimit": 5242880
}
"bad": {
"streamName": ${enrich_bad}
# Optional. Limits the number of events in a single PutRecords request.
# Several requests are made in parallel
# Maximum allowed: 500
"recordLimit": 500
# Optional. Limits the number of bytes in a single PutRecords request,
# including records and partition keys.
# Several requests are made in parallel
# Maximum allowed: 5 MB
"byteLimit": 5242880
}
}
# Optional. Concurrency of the app
"concurrency" : {
# Number of events that can get enriched at the same time within a chunk
"enrich": 256
# Number of chunks that can get sunk at the same time
# WARNING: if greater than 1, records can get checkpointed before they are sunk
"sink": 1
}
# Monitoring: metrics to stdout
"monitoring": {
"metrics": {
"stdout": {
"period": "1 minute"
"prefix": ${log_prefix}
}
}
}
}