S3 Loader Not Loading Data from Stream

Hi, right now I’m trying to automate our Snowplow pipeline. The current stack is:

ELB > Collector Group > Kinesis Stream 1 > Enricher Group > Kinesis Stream 2 > S3 Loader > S3 > datarunner (same process as the EmrEtlRunner) > Redshift

The collectors and enrichers are in AWS Auto Scaling groups, and the S3 Loader runs on a single EC2 instance.

The problem is that our S3 Loader never gets any data from Kinesis Stream 2. I send 1,000,000 structured events from a Python test tracker (a rough sketch of the test script is below the log output), and I can see the data pass through the rest of the pipeline, from collectors to enrichers to the streams. But no GetRecords calls are ever made against Kinesis Stream 2, and all I get from the S3 Loader are some very generic logs:

[ec2-user@ip-172-31-34-147 ~]$ java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar snowplow-s3-loader-0.6.0.jar --config s3loader.conf
log4j:WARN No appenders could be found for logger (com.amazonaws.AmazonWebServiceClient).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream stream-bad-data-to-s3 exists and is active
[main] INFO com.snowplowanalytics.s3.loader.SinkApp$ - Initializing sink with KinesisConnectorConfiguration: {regionName=us-west-2, s3Endpoint=https://s3-us-west-2.amazonaws.com, kinesisInputStream=stream-enricher-to-s3, maxRecords=500, connectorDestination=s3, bufferMillisecondsLimit=9000, bufferRecordCountLimit=200, s3Bucket=piv-stream-data-prod-bucket, kinesisEndpoint=https://kinesis.us-west-2.amazonaws.com, appName=piv-data, bufferByteSizeLimit=12000, retryLimit=1, initialPositionInStream=TRIM_HORIZON}
[main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 0 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Successfully serialized 0 records out of 0
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 0 records.
[RecordProcessor-0001] INFO com.snowplowanalytics.s3.loader.S3Emitter - Successfully serialized 0 records out of 0
...
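
For reference, the test traffic comes from the Snowplow Python tracker; the script is roughly the sketch below (the collector endpoint and event names are placeholders, not our real values):

# Rough sketch of the load-test script; endpoint and event names are placeholders.
from snowplow_tracker import Emitter, Tracker

# Point the emitter at the collector ELB and batch requests to keep it fast.
emitter = Emitter("collector.example.com", protocol="http", buffer_size=100)
tracker = Tracker(emitter, app_id="piv-load-test")

# Fire 1,000,000 structured events at the collector.
for i in range(1000000):
    tracker.track_struct_event("pipeline-test", "send-event", label=str(i))

# Make sure any buffered events are sent before the script exits.
emitter.flush()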

I used the config here as a template, since I’m planning to run the loader in a Docker container for automation (right now it isn’t). Here’s my config:

# Prod configuration for PIV s3 Loader

# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"

# Sink is used for sending events for which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"

# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = "iam"
  secretKey = "iam"
}

# Config for NSQ
nsq {
  channelName = "{{dummy}}"

  host = "{{nsqHost}}"

  # TCP port for nsqd, 4150 by default
  port = 4150

  # Host name for lookupd
  lookupHost = "{{lookupHost}}"

  # HTTP port for nsqlookupd, 4161 by default
  lookupPort = 4161
}

kinesis {

  initialPosition = "TRIM_HORIZON"


  # initialTimestamp = AT_TIMESTAMP

  # Maximum number of records to read per GetRecords call
  # Chosen per Josh's recommendation at:
  # https://discourse.snowplow.io/t/350k-rpm-of-throughput-with-stream-collector-kinesis/103     
  maxRecords = 100

  region = "us-west-2"

  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = "piv-data"
}

streams {
  # Input stream name
  inStreamName = "stream-enricher-to-s3"

  # Stream for events for which the storage process fails
  outStreamName = "stream-bad-data-to-s3"

  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = 12000 # Not supported by NSQ; will be ignored
    recordLimit = 200
    timeLimit = 60 # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = "us-west-2"
  bucket = "piv-stream-data-prod-bucket"

  # Format is one of lzo or gzip
  # Note that gzip can only be used for the enriched data stream.
  format = "gzip"

  # Maximum Timeout that the application is allowed to fail for
  maxTimeout = 5000
}

My guess is that it’s a credentials problem, but the IAM role the instance runs under has full access to Kinesis, and even if the loader couldn’t connect to the stream I’d expect some kind of error to show up. Any ideas would be a great help.
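
For what it’s worth, a quick boto3 sanity check run on the loader instance (same instance role, same stream and region as in the config above) would look something like this, and would show whether the role can read the input stream directly:

# Sanity check from the loader instance: can the instance role describe the
# input stream and read records from it? Stream/region come from the config above.
import boto3

kinesis = boto3.client("kinesis", region_name="us-west-2")

desc = kinesis.describe_stream(StreamName="stream-enricher-to-s3")["StreamDescription"]
print("status:", desc["StreamStatus"], "shards:", len(desc["Shards"]))

# Try pulling a few records from the first shard, starting at TRIM_HORIZON.
shard_id = desc["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName="stream-enricher-to-s3",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]
records = kinesis.get_records(ShardIterator=iterator, Limit=10)["Records"]
print("got", len(records), "records from", shard_id)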

Thanks!

PS: This stack used to work. It stopped working after I set up the Kinesis streams with CloudFormation templates managed by Ansible. However, I tried deleting and recreating the Kinesis streams, and that didn’t seem to fix anything.

I realize my folly now: the appName needs to be different for the enricher and the loader, so the two were clashing over the same DynamoDB state table. Everything makes so much sense now…
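
For anyone who runs into the same thing: the KCL creates one DynamoDB lease table per appName, so if the enricher and the loader share an appName they end up sharing (and clashing over) the same table. A quick way to see it with boto3 (attribute names are the usual KCL ones and may vary slightly by version) is something like:

# Peek at the KCL lease table(s) to spot the shared appName.
# "piv-data" is the loader's appName from the config above; attribute names
# like "leaseKey"/"leaseOwner" are the usual KCL ones and may vary by version.
import boto3

dynamodb = boto3.client("dynamodb", region_name="us-west-2")

# Each KCL appName should show up as its own table here.
print(dynamodb.list_tables()["TableNames"])

# If leases in one table are owned by workers from two different apps,
# the appNames are clashing.
for item in dynamodb.scan(TableName="piv-data")["Items"]:
    print(item.get("leaseKey"), item.get("leaseOwner"), item.get("checkpoint"))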
