S3 Sink and S3 Sink Bad not serializing objects

Hi Team,

The S3 Sink and S3 Sink Bad are not serializing objects. The logs keep showing the message
‘Flushing buffer with XXXX records’, but no data is being pushed to the S3 bucket.

Configuration file:

# Default configuration for s3-loader

# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"

# Sink is used for sending events for which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"

# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = "iam"
  secretKey = "iam"
}
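# For example (an illustrative alternative, not what this deployment uses), credentials
# could instead come from environment variables, with AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY exported in the loader's environment:
#   aws {
#     accessKey = "env"
#     secretKey = "env"
#   }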

# Config for NSQ
nsq {
  # Channel name for NSQ source
  # If more than one application is reading from the same NSQ topic at the same time,
  # each of them must have a unique channel name to receive all the data from the topic
  channelName = "{{nsqSourceChannelName}}"

  # Host name for NSQ tools
  host = "{{nsqHost}}"

  # HTTP port for nsqd
  port = 80

  # HTTP port for nsqlookupd
  lookupPort = 82
}

kinesis {
  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # AT_TIMESTAMP: start from the record at or after the specified timestamp.
  # Note: This only affects the first run of this application on a stream.
  initialPosition = "TRIM_HORIZON"

  # Needs to be specified when initialPosition is "AT_TIMESTAMP".
  # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: Time needs to be specified in UTC.
  #initialTimestamp = "2017-05-17T10:00:00Z"
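  # For example, to replay from a fixed point in time, both settings would be set
  # together: initialPosition = "AT_TIMESTAMP" and an initialTimestamp value in the
  # format shown above.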

  # Maximum number of records to read per GetRecords call
  maxRecords = 10000

  region = "us-east-1"

  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = "SnowplowLzoS3Sink-AAAAAAA"

  ## Optional endpoint url configuration to override aws kinesis endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}
}

streams {
  # Input stream name
  inStreamName = "InputStream_Name_raw"

  # Stream for events for which the storage process fails
  outStreamName = "OutputStream_Name_rejected"

  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
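  # For example, with the values below a flush is attempted as soon as roughly 50 kB of
  # records have accumulated, or 100 records are buffered, or 60 seconds have passed
  # since the last flush, whichever comes first.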
  buffer {
    byteLimit = 50000
    recordLimit = 100
    timeLimit = 60000
  }
}

s3 {
  region = "us-east-1"
  bucket = "S3_Bucket_Name/raw"
  # optional bucket where to store partitioned data
  #partitionedBucket = "{{s3bucket}}/partitioned"

  # optional date format prefix for directory pattern
  # eg: {YYYY}/{MM}/{dd}/{HH}
  #dateFormat = "{{s3DateFormat}}"

  # optional directory structure to use while storing data on s3 (followed by dateFormat config)
  # eg: outputDirectory = "enriched/good/"
  #outputDirectory = "{{s3OutputDirectory}}"

  # optional filename prefix
  # eg: output
  #filenamePrefix = "{{s3DFilenamePrefix}}"
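  # For example (illustrative values), combining the three optional settings above:
  #   outputDirectory = "enriched/good/"
  #   dateFormat = "{YYYY}/{MM}/{dd}/{HH}"
  #   filenamePrefix = "output"
  # would store objects under a prefix like enriched/good/2017/05/17/10/ with file
  # names starting with "output".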

  # Format is one of lzo or gzip.
  # Note that gzip can only be used for the enriched data stream.
  format = "lzo"

  # Maximum Timeout that the application is allowed to fail for (in milliseconds)
  maxTimeout = 10000

  ## Optional endpoint url configuration to override aws s3 endpoints,
  ## this can be used to specify local endpoints when using localstack
  # customEndpoint = {{kinesisEndpoint}}
}


# Optional section for tracking endpoints
#monitoring {
#  snowplow{
#    collectorUri = "{{collectorUri}}"
#    collectorPort = 80
#    appId = "{{appName}}"
#    method = "{{method}}"
#  }
#}

Note: the app names for all the applications are unique and different. Also, the stream names above are dummies; the actual streams are valid and verified.

snowplow-s3-loader-0.6.0.jar is in use.

S3 Sink logs:

[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 27695 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 27695 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 27695 records.
[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 27695 records.
(… the same log line repeats many more times)

Hey @sp_user ,

Your config seems fine (not sure why nsq is configured).

I’d recommend using the latest version, 2.0.0.

If we upgrade the S3 Sink application to 2.0, do we need to upgrade the other applications (Stream_enrich, Elasticsearch_sink) as well?

I’d recommend upgrading them as well, since we have introduced a new bad rows format since then.

Take a look at our recommended component versions for a modern Snowplow experience.

Regards.

Thanks. I have another question related to the initial one. As you mentioned, the configuration is fine, but I’m not sure why the buffer is not getting flushed after the configured duration. The log keeps filling up with ‘[RecordProcessor-0000] INFO com.snowplowanalytics.s3.loader.S3Emitter - Flushing buffer with 1012xx records’, which is far more than what is configured. It is neither syncing the data to S3 nor clearing the buffer.

Note:
All the required permissions are granted to the EC2 instance.

buffer {
  byteLimit = 50000
  recordLimit = 100
  timeLimit = 60000
}

Hi oguzhanunlu, please share your suggestions.