S3 loader writes the same number of records every hour

We have varying levels of collector traffic throughout the day, and this is confirmed both at the Kinesis data stream and at the HTTPS level. However, we are seeing that the S3 loader writes the same number of records every hour.

This makes the manifest after ETL extremely uniform, down to the thousand, which is very suspect.

We run the S3 loader as an ECS scheduled job, which starts and stops our S3 loader Docker container on each run.

Is our configuration correct?

kinesis {
  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = "TRIM_HORIZON"

  # Needs to be specified when initialPosition is "AT_TIMESTAMP".
  # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: Time needs to be specified in UTC.
  initialTimestamp = "{{timestamp}}"

  # Maximum number of records to read per GetRecords call
  maxRecords = 10

  region = "us-east-1"

  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = "s3-loader"

  ## Optional endpoint url configuration to override aws kinesis endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}
}

streams {
  # Input stream name
  inStreamName = "snowplow-raw-good"

  # Stream for events for which the storage process fails
  outStreamName = "snowplow-s3-bad"

  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = 104857600 # 100 MB
    recordLimit = 1000000 # 1M records
    timeLimit = 3600000 # 1 hour
  }
}

s3 {
  region = "us-east-1"
  bucket = "test-snowplow"
  # optional directory structure to use while storing data on s3, you can place date format strings which will be
  # replaced with date time values
  # eg: directoryPattern = "enriched/good/run_yr={YYYY}/run_mo={MM}/run_dy={DD}"
  # directoryPattern = "enriched/good/run_yr={YYYY}/run_mo={MM}/run_dy={DD}"
  # directoryPattern = ""

  # Format is one of lzo or gzip
  # Note that you can use gzip only for the enriched data stream.
  format = "lzo"

  # Maximum Timeout that the application is allowed to fail for (in milliseconds)
  maxTimeout = 10000

  ## Optional endpoint url configuration to override aws s3 endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}
}

Correction to my previous post: I do run it as an ECS task, but it's not scheduled.

Follow-up questions:

  buffer {
    byteLimit = 104857600 # 100Mb
    recordLimit = 1000000 # 1M records
    timeLimit = 3600000 # 1 hour
  }

Q1. Does anyone know if the byte limit applies to the uncompressed size?
Q2. Is there a way to enable more logging that shows which buffer rule was hit to trigger a write?
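
For context on Q2: the three flush triggers described in the buffer comment boil down to simple threshold checks. Below is a minimal Scala sketch of that logic, including the kind of per-rule logging Q2 asks about. All names here (BufferFlushSketch, flushReason, etc.) are hypothetical illustrations, not the loader's actual classes, and whether the stock loader reports the specific trigger at any log level depends on the version you run.

// Illustrative sketch only: hypothetical names, not the S3 loader's actual classes.
object BufferFlushSketch {

  // Mirrors the three thresholds from the `buffer` config block.
  final case class BufferConfig(byteLimit: Long, recordLimit: Long, timeLimit: Long)

  // Running totals since the last flush.
  final case class BufferState(records: Long, bytes: Long, lastFlushMs: Long)

  // Returns the first rule that triggers a flush, if any, so it can be logged.
  def flushReason(cfg: BufferConfig, st: BufferState, nowMs: Long): Option[String] =
    if (st.bytes >= cfg.byteLimit)
      Some(s"byteLimit (${st.bytes} >= ${cfg.byteLimit})")
    else if (st.records >= cfg.recordLimit)
      Some(s"recordLimit (${st.records} >= ${cfg.recordLimit})")
    else if (nowMs - st.lastFlushMs >= cfg.timeLimit)
      Some(s"timeLimit (${nowMs - st.lastFlushMs} ms since last flush)")
    else
      None

  def main(args: Array[String]): Unit = {
    // Same values as the buffer block above: 100 MB / 1M records / 1 hour.
    val cfg = BufferConfig(byteLimit = 104857600L, recordLimit = 1000000L, timeLimit = 3600000L)
    val st  = BufferState(records = 250000L, bytes = 41000000L, lastFlushMs = 0L)
    flushReason(cfg, st, nowMs = 3600000L)
      .foreach(reason => println(s"Flushing buffer: triggered by $reason"))
  }
}

With the example values above (250k records, ~41 MB buffered, one hour since the last flush), the timeLimit rule is the one that fires.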

@ihor
I am running the Kinesis S3 loader with the configuration below.

# Default configuration for s3-loader

# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"

# Sink is used for sending events for which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"

# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = "iam"
  secretKey = "iam"
}

# Config for NSQ
nsq {
  # Channel name for NSQ source
  # If more than one application is reading from the same NSQ topic at the same time,
  # all of them must have unique channel names to get all the data from the same topic
  channelName = "{{nsqSourceChannelName}}"
    
  # Host name for NSQ tools
  host = "{{nsqHost}}"

  # HTTP port for nsqd
  port = 4343

  # HTTP port for nsqlookupd
  lookupPort = 34354
}

kinesis {
  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = "TRIM_HORIZON"

  # Needs to be specified when initialPosition is "AT_TIMESTAMP".
  # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: Time needs to be specified in UTC.
  # initialTimestamp = "{{timestamp}}"

  # Maximum number of records to read per GetRecords call     
  maxRecords = 10000

  region = "us-west-2"

  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = "kinesis-s3-dynamodb"

  ## Optional endpoint url configuration to override aws kinesis endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}
}

streams {
  # Input stream name
  inStreamName = "Enriched"

  # Stream for events for which the storage process fails
  outStreamName = "Bad"

  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = 50000 # Not supported by NSQ; will be ignored
    recordLimit = 10
    timeLimit = 6000 # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = "us-west-2"
  bucket = "snowplow-kinesis-s3"
  # optional directory structure to use while storing data on s3, you can place date format strings which will be
  # replaced with date time values
  # eg: directoryPattern = "enriched/good/run_yr={YYYY}/run_mo={MM}/run_dy={DD}"
  directoryPattern = "enriched/good"

  # Format is one of lzo or gzip
  # Note that you can use gzip only for the enriched data stream.
  format = "gzip"

  # Maximum Timeout that the application is allowed to fail for (in milliseconds)
  maxTimeout = 50000

  ## Optional endpoint url configuration to override aws s3 endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}
}
logging {
  level = "DEBUG"
}
# Optional section for tracking endpoints
# monitoring {
#   snowplow{
#     collectorUri = "{{collectorUri}}"
#     collectorPort = 80 
#     appId = "{{appName}}"
#     method = "{{method}}"
#   }
# }

I have a question.
This is a batch processing job as per the Snowplow lambda architecture.
While running this job, it does not kill itself when it times out.
Can you please explain the reason?

Hi @anshratn1997 - the Kinesis S3 Loader is a realtime application that perpetually loads data from Kinesis down to S3. It batches up chunks of data, but it is an always-on system, as it is consuming from a stream. There is no end as such!

You run batch processes on the data that this application pushes to S3 (those do end after execution), but the loader itself is very much a streaming application.

Hope that helps clarify!
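
To make the "always on" point concrete, here is a minimal Scala sketch of the consume/flush loop shape; every function name in it is a hypothetical stand-in, not the loader's real code:

// Illustrative only: a perpetual consume/flush loop, not the loader's real implementation.
object StreamingLoopSketch {
  def main(args: Array[String]): Unit = {
    var buffered = 0L
    while (true) {                          // no natural end: the consumer runs until stopped
      buffered += fetchBatchFromStream()    // stands in for a GetRecords call (up to maxRecords)
      if (shouldFlush(buffered)) {          // byteLimit / recordLimit / timeLimit checks
        writeToS3(buffered)                 // one S3 object per flushed buffer
        buffered = 0L
      }
      Thread.sleep(1000L)                   // simulate polling the stream
    }
  }

  // Stubs standing in for the real Kinesis and S3 interactions.
  private def fetchBatchFromStream(): Long = 10L
  private def shouldFlush(records: Long): Boolean = records >= 1000L
  private def writeToS3(records: Long): Unit = println(s"wrote $records records to S3")
}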