Similar to this post: Snowplow docker s3-load work incorrect
I’m getting the same behaviour as described in that post. When I run the loader, this is the output:
$ sudo docker run -v $PWD/snowplow/config:/snowplow/config snowplow/snowplow-s3-loader:0.7.0 --config /snowplow/config/config.hocon
log4j:WARN No appenders could be found for logger (com.amazonaws.AmazonWebServiceClient).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream temp-corvidae-s3-loader-bad exists and is active
[main] INFO com.snowplowanalytics.s3.loader.S3Loader$ - Initializing sink with KinesisConnectorConfiguration: {regionName=eu-west-1, s3Endpoint=https://s3-eu-west-1.amazonaws.com, kinesisInputStream=temp-corvidae-snowplow-good, maxRecords=100, connectorDestination=s3, bufferMillisecondsLimit=10000, bufferRecordCountLimit=50, s3Bucket=qc-playground, kinesisEndpoint=https://kinesis.eu-west-1.amazonaws.com, appName=s3-loader, bufferByteSizeLimit=1000000, retryLimit=1, initialPositionInStream=TRIM_HORIZON}
[main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
The mentioned post says that giving the instance full resource permissions (with no resource specified) fixed the issue. However, I have given my instance full permissions for S3, Kinesis and DynamoDB, yet the loader still fails silently. Nothing is pulled out of the Kinesis data streams and there is no activity in S3, but there is activity on DynamoDB, as can be seen below.
Have I configured the loader correctly or am I missing something?
The config.hocon file is detailed below:
# Default configuration for s3-loader
# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"
# Sink is used for sending events which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"
# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = "iam"
  secretKey = "iam"
}
# Config for NSQ
nsq {
  # Channel name for NSQ source
  # If more than one application reading from the same NSQ topic at the same time,
  # all of them must have unique channel name for getting all the data from the same topic
  channelName = "{{nsqSourceChannelName}}"
  # Host name for NSQ tools
  host = "{{nsqHost}}"
  # HTTP port for nsqd
  port = 0
  # HTTP port for nsqlookupd
  lookupPort = 0
}
kinesis {
  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = "TRIM_HORIZON"
  # Need to be specified when initialPosition is "AT_TIMESTAMP".
  # Timestamp format need to be in "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: Time need to specified in UTC.
  # initialTimestamp = "{{timestamp}}"
  # Maximum number of records to read per GetRecords call
  maxRecords = 100
  region = "eu-west-1"
  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = "s3-loader"
  ## Optional endpoint url configuration to override aws kinesis endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}
}
streams {
  # Input stream name
  inStreamName = "temp-snowplow-good"
  # Stream for events for which the storage process fails
  outStreamName = "temp-s3-loader-bad"
  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = 1000000 # Not supported by NSQ; will be ignored
    recordLimit = 50
    timeLimit = 10000 # Not supported by NSQ; will be ignored
  }
}
s3 {
  region = "eu-west-1"
  bucket = "bucket"
  # optional bucket where to store partitioned data
  # partitionedBucket = "bucket/partitioned"
  # optional date format prefix for directory pattern
  # eg: {YYYY}/{MM}/{dd}/{HH}
  dateFormat = "{YYYY}/{MM}/{dd}/{HH}"
  # optional directory structure to use while storing data on s3 (followed by dateFormat config)
  # eg: outputDirectory = "enriched/good/"
  outputDirectory = "snowplow-good"
  # optional filename prefix
  # eg: output
  filenamePrefix = "s3-loader"
  # Format is one of lzo or gzip
  # Note, that you can use gzip only for enriched data stream.
  format = "gzip"
  # Maximum Timeout that the application is allowed to fail for (in milliseconds)
  maxTimeout = 10000
  ## Optional endpoint url configuration to override aws s3 endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}
}
# Optional section for tracking endpoints
# monitoring {
#   snowplow{
#     collectorUri = "{{collectorUri}}"
#     collectorPort = 80
#     appId = "{{appName}}"
#     method = "{{method}}"
#   }
# }
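
For reference, this is how I read the buffer comments in the streams section of the config above. It is only a sketch of the rule as the comments word it, not the loader's actual code: the names BufferLimits and should_flush are made up for illustration, and I am assuming "exceeds" means strictly greater than.

```python
from dataclasses import dataclass


@dataclass
class BufferLimits:
    """Hypothetical container mirroring streams.buffer in config.hocon."""
    byte_limit: int = 1_000_000   # byteLimit
    record_limit: int = 50        # recordLimit
    time_limit_ms: int = 10_000   # timeLimit, in milliseconds


def should_flush(limits: BufferLimits,
                 buffered_bytes: int,
                 buffered_records: int,
                 ms_since_last_flush: int) -> bool:
    """Return True if any one of the three documented conditions holds.

    Assumption: "exceeds" is read as strictly greater than; the real
    loader may use a different comparison.
    """
    return (
        buffered_bytes > limits.byte_limit
        or buffered_records > limits.record_limit
        or ms_since_last_flush > limits.time_limit_ms
    )


if __name__ == "__main__":
    limits = BufferLimits()
    # A single small record, 500 ms after the last flush: no flush yet.
    print(should_flush(limits, 10, 1, 500))
    # The same single record once more than 10 s have passed.
    print(should_flush(limits, 10, 1, 10_001))
```

If that reading is right, I would expect even a nearly empty buffer to be written to S3 once timeLimit has passed, which is why the total lack of S3 activity confuses me.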