@josh @ihor, thanks for your help! I'm still having issues with the S3 Loader at the moment. Here is my config:
# Default configuration for s3-loader
# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"
# Sink is used for sending events which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"
# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = "hardcoded_key"
secretKey = "hardcoded_secret_key"
}
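# (Just a sketch of the alternative described in the comment above, not what I'm
#  running: setting both values to "default" should make the loader fall back to
#  the default AWS credentials provider chain instead of hardcoded keys.)
# aws {
#   accessKey = "default"
#   secretKey = "default"
# }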
# Config for NSQ
nsq {
# Channel name for NSQ source
# If more than one application is reading from the same NSQ topic at the same time,
# each of them must have a unique channel name in order to get all the data from that topic
channelName = "{{nsqSourceChannelName}}"
# Host name for NSQ tools
host = "{{nsqHost}}"
# HTTP port for nsqd
port = 80
# HTTP port for nsqlookupd
lookupPort = 80
}
kinesis {
# LATEST: most recent data.
# TRIM_HORIZON: oldest available data.
# "AT_TIMESTAMP": Start from the record at or after the specified timestamp
# Note: This only affects the first run of this application on a stream.
initialPosition = "TRIM_HORIZON"
# Needs to be specified when initialPosition is "AT_TIMESTAMP".
# Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
# Ex: "2017-05-17T10:00:00Z"
# Note: Time needs to be specified in UTC.
initialTimestamp = "2018-08-01T10:00:00Z"
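# (My understanding of the comment above: since initialPosition is "TRIM_HORIZON"
#  here, this initialTimestamp value should simply be ignored.)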
# Maximum number of records to read per GetRecords call
maxRecords = 200
region = "us-east-1"
# "appName" is used for a DynamoDB table to maintain stream state.
appName = "s3-stage"
## Optional endpoint url configuration to override aws kinesis endpoints,
## this can be used to specify local endpoints when using localstack
# customEndpoint = {{kinesisEndpoint}}
}
streams {
# Input stream name
inStreamName = "snowplow-events-stage-enriched-good"
# Stream for events for which the storage process fails
outStreamName = "snowplow-events-stage-enriched-bad-s3"
# Events are accumulated in a buffer before being sent to S3.
# The buffer is emptied whenever:
# - the combined size of the stored records exceeds byteLimit or
# - the number of stored records exceeds recordLimit or
# - the time in milliseconds since it was last emptied exceeds timeLimit
buffer {
byteLimit = 45000 # Not supported by NSQ; will be ignored
recordLimit = 200
timeLimit = 10000 # Not supported by NSQ; will be ignored
}
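# (So with these values the buffer should be flushed to S3 whenever ~45 KB or
#  200 records have accumulated, or every 10 seconds, whichever comes first -
#  my reading of the buffer comment above.)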
}
s3 {
region = "us-east-1"
bucket = "bucket"
# Optional directory structure to use when storing data on S3; you can include date format strings which will be
# replaced with date/time values
# e.g.: directoryPattern = "enriched/good/run_yr={YYYY}/run_mo={MM}/run_dy={DD}"
directoryPattern = "enriched/stream/run_yr={YYYY}/run_mo={MM}/run_dy={DD}"
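# (For example, with the pattern above I would expect objects written on 2018-09-11
#  to land under "enriched/stream/run_yr=2018/run_mo=09/run_dy=11/" - my reading of
#  the date placeholders.)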
# Format is one of lzo or gzip
# Note that gzip can only be used for the enriched data stream.
format = "gzip"
# Maximum Timeout that the application is allowed to fail for (in milliseconds)
maxTimeout = 120000
## Optional endpoint url configuration to override aws s3 endpoints,
## this can be used to specify local endpoints when using localstack
# customEndpoint = {{kinesisEndpoint}}
}
# Optional section for tracking endpoints
monitoring {
snowplow {
collectorUri = "host_of_collector.com"
collectorPort = 443
appId = "snowplow"
method = "get"
}
}
But I get nothing in my S3 bucket. I'm running the s3-loader locally and get the following output:
java -jar snowplow-s3-loader-0.6.0.jar --config ./examples/config.hocon.sample
log4j:WARN No appenders could be found for logger (com.amazonaws.AmazonWebServiceClient).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream snowplow-events-stage-enriched-bad-s3 exists and is active
[main] INFO com.snowplowanalytics.s3.loader.SinkApp$ - Initializing sink with KinesisConnectorConfiguration: {regionName=us-east-1, s3Endpoint=https://s3.amazonaws.com, kinesisInputStream=snowplow-events-stage-enriched-good, maxRecords=200, connectorDestination=s3, bufferMillisecondsLimit=10000, bufferRecordCountLimit=200, s3Bucket=plowbird-test, kinesisEndpoint=https://kinesis.us-east-1.amazonaws.com, appName=s3-stage, bufferByteSizeLimit=45000, retryLimit=1, initialPositionInStream=TRIM_HORIZON}
[main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
[INFO] [09/11/2018 19:34:22.670] [snowplow-scala-tracker-akka.actor.default-dispatcher-2] [akka://snowplow-scala-tracker/system/IO-TCP/selectors/$a/1] Message [akka.io.SelectionHandler$ChannelReadable$] from Actor[akka://snowplow-scala-tracker/deadLetters] to Actor[akka://snowplow-scala-tracker/system/IO-TCP/selectors/$a/1#2074028712] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
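(Side note: because of the log4j warnings above, most of the loader's own logging is probably being swallowed. A minimal log4j.properties like the one below gives more output; this is just a sketch, and the file name and pattern are my own choice.)
cat > log4j.properties <<'EOF'
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c - %m%n
EOF
java -Dlog4j.configuration=file:log4j.properties -jar snowplow-s3-loader-0.6.0.jar --config ./examples/config.hocon.sample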
As @ihor mentioned, I need to add:
collectors:
  format: "thrift"
But I could not find where this setting should be placed; however, as far as I understand, when using Kinesis the messages are transferred in Thrift format. Once again, my layout in detail:
JS Tracker -> Scala Stream Collector -> Kinesis ([snowplow-events-stage]) -> Stream Enrich -> Kinesis ([snowplow-events-stage-enriched-good]) -> S3 Loader
I also attached a Kinesis Firehose to the stream snowplow-events-stage-enriched-good, which is saving the data as TSV to S3, and I can clearly see incoming events there. So events are coming through, but for some reason they are only saved to S3 by Kinesis Firehose, not by the S3 Loader. I also connected Kinesis Analytics to both streams, snowplow-events-stage and snowplow-events-stage-enriched-good, and could see events passing through. In the stream snowplow-events-stage-enriched-bad-s3 I can't find anything either.
The only issue that I can see: the Kinesis Analytics tool shows that events in snowplow-events-stage-enriched-good are going through as plain text (TSV), so it seems like it is not Thrift format. Am I correct? Where should I change the setting to enable Thrift format? Are there any other issues in my setup?
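(For reference, this is roughly how one can pull a raw record from the enriched stream and look at the bytes directly; the shard id below is just the first shard, and aws kinesis list-shards shows the real ones. If I understand correctly, enriched events should decode to readable tab-separated text.)
aws kinesis get-shard-iterator --stream-name snowplow-events-stage-enriched-good --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --region us-east-1
# then, with the returned iterator:
aws kinesis get-records --shard-iterator <iterator> --limit 1 --region us-east-1
# the Data field is base64-encoded; decoding it shows whether the payload is TSV or binary Thrift:
echo '<Data value>' | base64 --decode | head -c 200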