Hi, right now I’m trying to automate our Snowplow pipeline. Currently the stack is:
ELB > Collector Group > Kinesis Stream 1 > Enricher Group > Kinesis Stream 2 > S3 Loader > S3 > datarunner (same process as the EmrEtlRunner) > Redshift
The collectors and enrichers run in AWS Auto Scaling groups, and the S3 Loader runs on an EC2 instance.
The problem is that our S3 Loader never reads data from Kinesis Stream 2. When I send 1,000,000 structured events from a Python test tracker, I can see the data pass through the rest of the pipeline, from the collectors to the enrichers to the streams. However, no GetRecords calls are made against Kinesis Stream 2, and all I get from the S3 Loader are some very generic logs:
[ec2-user@ip-172-31-34-147 ~]$ java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar snowplow-s3-loader-0.6.0.jar --config s3loader.conf
log4j:WARN No appenders could be found for logger (com.amazonaws.AmazonWebServiceClient).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream stream-bad-data-to-s3 exists and is active
[main] INFO com.snowplowanalytics.s3.loader.SinkApp$ - Initializing sink with KinesisConnectorConfiguration: {regionName=us-west-2, s3Endpoint=https://s3-us-west-2.amazonaws.com, kinesisInputStream=stream-enricher-to-s3, maxRecords=500, connectorDestination=s3, bufferMillisecondsLimit=9000, bufferRecordCountLimit=200, s3Bucket=piv-stream-data-prod-bucket, kinesisEndpoint=https://kinesis.us-west-2.amazonaws.com, appName=piv-data, bufferByteSizeLimit=12000, retryLimit=1, initialPositionInStream=TRIM_HORIZON}
[main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 0 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Successfully serialized 0 records out of 0
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 0 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Successfully serialized 0 records out of 0
...
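One way to check the problem from the other side is to read Stream 2 directly, outside the loader, and see whether the enriched events are there at all. The sketch below assumes a boto3 Kinesis client and uses the `list_shards`, `get_shard_iterator`, and `get_records` calls; the helper name `peek_stream` and its parameters are hypothetical. It ignores `list_shards` pagination, which should be fine for a stream with only a few shards.

```python
# Minimal sketch: peek at a Kinesis stream without going through the S3 Loader.
# Assumes `client` is a boto3 Kinesis client, e.g.
# boto3.client("kinesis", region_name="us-west-2"). Helper name is hypothetical.


def peek_stream(client, stream_name, max_calls_per_shard=5, limit=100):
    """Return {shard_id: records_seen}, reading each shard from TRIM_HORIZON."""
    counts = {}
    # NOTE: pagination (NextToken) is ignored for brevity.
    shards = client.list_shards(StreamName=stream_name)["Shards"]
    for shard in shards:
        shard_id = shard["ShardId"]
        iterator = client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",
        )["ShardIterator"]
        seen = 0
        # An empty GetRecords response does not prove the shard is empty,
        # so poll a few times before giving up.
        for _ in range(max_calls_per_shard):
            if iterator is None:
                break  # shard is closed and has been fully read
            resp = client.get_records(ShardIterator=iterator, Limit=limit)
            seen += len(resp["Records"])
            iterator = resp.get("NextShardIterator")
            if seen:
                break  # found data on this shard; no need to keep polling
        counts[shard_id] = seen
    return counts
```

Something like `peek_stream(boto3.client("kinesis", region_name="us-west-2"), "stream-enricher-to-s3")` would then show, per shard, whether records are present; non-zero counts would suggest the data reached Stream 2 and the issue sits on the loader's side.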
I used the config here as a template, since I’m planning to run the loader in a Docker container for automation (right now it isn’t containerized). Here’s my config:
# Prod configuration for PIV S3 Loader
# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"
# Sink is used for sending events whose processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"
# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = "iam"
secretKey = "iam"
}
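The credential comments above boil down to a small selection rule. The sketch below is a simplified model of those comments only, not the loader's actual code; the function name `credentials_mode` is hypothetical, and rejecting a mix of special values (for example `iam` with `default`) is an assumption.

```python
# Simplified model of the aws.accessKey / aws.secretKey options described in the
# config comments. Hypothetical helper; not the S3 Loader's implementation.

SPECIAL = {
    "default": "DefaultAWSCredentialsProviderChain",
    "iam": "instance profile (IAM role) credentials",
    "env": "AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables",
}


def credentials_mode(access_key, secret_key):
    """Describe which credential source a given key pair selects."""
    if access_key in SPECIAL or secret_key in SPECIAL:
        if access_key != secret_key:
            # ASSUMPTION: both keys must carry the same special value.
            raise ValueError("accessKey and secretKey must both use the same special value")
        return SPECIAL[access_key]
    # Anything else is treated as a literal access key / secret key pair.
    return "static access key / secret key pair"
```

With the posted values, `credentials_mode("iam", "iam")` selects the instance role. The `default` chain linked above also falls back to instance-profile credentials, so it should pick up the same role once the loader moves into Docker on that instance.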
# Config for NSQ
nsq {
channelName = "{{dummy}}"
host = "{{nsqHost}}"
# TCP port for nsqd, 4150 by default
port = 4150
# Host name for lookupd
lookupHost = "{{lookupHost}}"
# HTTP port for nsqlookupd, 4161 by default
lookupPort = 4161
}
kinesis {
initialPosition = "TRIM_HORIZON"
# initialTimestamp = AT_TIMESTAMP
# Maximum number of records to read per GetRecords call
# Chosen per Josh's recommendation at:
# https://discourse.snowplow.io/t/350k-rpm-of-throughput-with-stream-collector-kinesis/103
maxRecords = 100
region = "us-west-2"
# "appName" is used for a DynamoDB table to maintain stream state.
appName = "piv-data"
}
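The `appName` comment above refers to the Kinesis Client Library's lease table. The toy model below illustrates the idea under simplifying assumptions: one table per `appName`, one row per shard ID, and `initialPosition` consulted only when a shard has no checkpoint yet. The names `lease_tables`, `starting_point`, and `checkpoint` are hypothetical, and real KCL lease rows carry more fields than this.

```python
# Toy model of a KCL-style lease table: one DynamoDB table per appName,
# one row per shard ID holding the last checkpoint. Illustration only.

lease_tables = {}  # appName -> {shard_id: checkpoint}


def starting_point(app_name, shard_id, initial_position="TRIM_HORIZON"):
    """Where a worker begins reading a shard under this model."""
    table = lease_tables.setdefault(app_name, {})
    # initialPosition only applies when the shard has no checkpoint yet.
    return table.get(shard_id, initial_position)


def checkpoint(app_name, shard_id, sequence_number):
    """Record progress for a shard under the given appName."""
    lease_tables.setdefault(app_name, {})[shard_id] = sequence_number
```

In this model the row key is the shard ID, not the stream name, so a second KCL application that happened to share the name `piv-data` would resume from the first application's checkpoints instead of starting at `TRIM_HORIZON`. Whether that applies here depends on the appName used by the other components, which the post doesn't show.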
streams {
# Input stream name
inStreamName = "stream-enricher-to-s3"
# Stream for events for which the storage process fails
outStreamName = "stream-bad-data-to-s3"
# Events are accumulated in a buffer before being sent to S3.
# The buffer is emptied whenever:
# - the combined size of the stored records exceeds byteLimit or
# - the number of stored records exceeds recordLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit
buffer {
byteLimit = 12000 # Not supported by NSQ; will be ignored
recordLimit = 200
timeLimit = 60 # Not supported by NSQ; will be ignored
}
}
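The three buffer conditions in the comment above can be written as a single predicate. This sketch mirrors the comment literally ("exceeds", so strict comparisons) and uses the posted limits as defaults; it is not taken from the loader's source, and `should_flush` is a hypothetical name.

```python
# Flush rule as described in the config comment: flush when ANY limit is exceeded.
# Defaults are the posted buffer values; timeLimit is in milliseconds per the comment.


def should_flush(buffered_bytes, buffered_records, ms_since_flush,
                 byte_limit=12000, record_limit=200, time_limit_ms=60):
    """True if the byte, record, or time limit has been exceeded."""
    return (buffered_bytes > byte_limit
            or buffered_records > record_limit
            or ms_since_flush > time_limit_ms)
```

Read this way, `timeLimit = 60` means the time condition alone is met 60 ms after every flush, so the buffer would be flushed almost continuously even with very little data.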
s3 {
region = "us-west-2"
bucket = "piv-stream-data-prod-bucket"
# Format is one of lzo or gzip
# Note that gzip can only be used for the enriched data stream.
format = "gzip"
# Maximum Timeout that the application is allowed to fail for
maxTimeout = 5000
}
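The "Initializing sink with KinesisConnectorConfiguration" log line makes it possible to compare what the loader actually ran with against this file. The sketch below copies values verbatim from both; the mapping from config keys to logged keys is an assumption inferred from the names only.

```python
# Values copied from the posted config file.
posted = {
    "kinesis.maxRecords": 100,
    "kinesis.initialPosition": "TRIM_HORIZON",
    "kinesis.appName": "piv-data",
    "streams.inStreamName": "stream-enricher-to-s3",
    "streams.buffer.byteLimit": 12000,
    "streams.buffer.recordLimit": 200,
    "streams.buffer.timeLimit": 60,
    "s3.bucket": "piv-stream-data-prod-bucket",
}

# Values copied from the KinesisConnectorConfiguration log line.
logged = {
    "maxRecords": 500,
    "initialPositionInStream": "TRIM_HORIZON",
    "appName": "piv-data",
    "kinesisInputStream": "stream-enricher-to-s3",
    "bufferByteSizeLimit": 12000,
    "bufferRecordCountLimit": 200,
    "bufferMillisecondsLimit": 9000,
    "s3Bucket": "piv-stream-data-prod-bucket",
}

# ASSUMPTION: config key -> logged key, inferred from the names only.
KEY_MAP = {
    "kinesis.maxRecords": "maxRecords",
    "kinesis.initialPosition": "initialPositionInStream",
    "kinesis.appName": "appName",
    "streams.inStreamName": "kinesisInputStream",
    "streams.buffer.byteLimit": "bufferByteSizeLimit",
    "streams.buffer.recordLimit": "bufferRecordCountLimit",
    "streams.buffer.timeLimit": "bufferMillisecondsLimit",
    "s3.bucket": "s3Bucket",
}


def mismatches(posted, logged, key_map):
    """Return {config_key: (posted_value, logged_value)} for every disagreement."""
    return {
        cfg_key: (posted[cfg_key], logged[log_key])
        for cfg_key, log_key in key_map.items()
        if posted[cfg_key] != logged[log_key]
    }
```

If the mapping is right, `maxRecords` (100 vs 500) and `timeLimit` (60 vs 9000) differ, which suggests the log came from a run with a slightly different config file than the one posted.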
My guess is that it’s a credentials problem, but the IAM role the instance runs with has full access to Kinesis. Also, even if the loader couldn’t connect to the stream, I’d expect some sort of error to show up, and the log above even shows it confirming that stream-bad-data-to-s3 exists and is active. Any ideas would be a great help.
Thanks!