Hi,
I am using Minikube and LocalStack to set up a Snowplow test pipeline.
I have managed to set up the tracker (the Python tracker), the collector and the enrichment module, including the streams. Records are written to the streams successfully.
Now I am stuck on the S3 loader. The loader's console log shows only generic WARN and INFO messages. There are no errors, but no records are written to the S3 bucket on LocalStack:
```
loader → 2020-05-16T11:54:12.993Z → [main] INFO com.snowplowanalytics.s3.loader.sinks.KinesisSink - Stream bad-stream-loader exists and is active
loader → 2020-05-16T11:54:13.010Z → [main] INFO com.snowplowanalytics.s3.loader.S3Loader$ - Initializing sink with KinesisConnectorConfiguration: {regionName=eu-west-1, s3Endpoint=localstack:4566, kinesisInputStream=good-stream-enriched, maxRecords=50, connectorDestination=s3, bufferMillisecondsLimit=2000, bufferRecordCountLimit=2, s3Bucket=test-bucket-loader, kinesisEndpoint=localstack:4566, appName=oneapp-loader, bufferByteSizeLimit=10000, retryLimit=1, initialPositionInStream=TRIM_HORIZON}
loader → 2020-05-16T11:54:13.277Z → [main] INFO com.snowplowanalytics.s3.loader.KinesisSourceExecutor - KinesisSourceExecutor worker created
```
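One way to check which endpoints the loader actually received is to pull the key/value pairs out of the `KinesisConnectorConfiguration` line above. The sketch below is a small, hypothetical helper (the function names are illustrative and not part of the S3 loader); it assumes the log format shown here, with entries separated by `, ` inside the braces.

```python
import re

# Copied verbatim from the loader log above.
LOG_LINE = (
    "[main] INFO com.snowplowanalytics.s3.loader.S3Loader$ - Initializing sink with "
    "KinesisConnectorConfiguration: {regionName=eu-west-1, s3Endpoint=localstack:4566, "
    "kinesisInputStream=good-stream-enriched, maxRecords=50, connectorDestination=s3, "
    "bufferMillisecondsLimit=2000, bufferRecordCountLimit=2, s3Bucket=test-bucket-loader, "
    "kinesisEndpoint=localstack:4566, appName=oneapp-loader, bufferByteSizeLimit=10000, "
    "retryLimit=1, initialPositionInStream=TRIM_HORIZON}"
)


def parse_connector_config(line: str) -> dict:
    """Extract the key=value pairs between the braces of the log line."""
    match = re.search(r"\{(.*)\}", line)
    if match is None:
        return {}
    # Assumption: values never contain ", ", which holds for this log line.
    pairs = (item.split("=", 1) for item in match.group(1).split(", "))
    return {key: value for key, value in pairs}


def endpoint_settings(config: dict) -> dict:
    """Keep only the settings whose name mentions an endpoint."""
    return {k: v for k, v in config.items() if "endpoint" in k.lower()}


if __name__ == "__main__":
    config = parse_connector_config(LOG_LINE)
    print(endpoint_settings(config))
    print("DynamoDB endpoint configured:", any("dynamo" in k.lower() for k in config))
```

Run against this log line, only `s3Endpoint` and `kinesisEndpoint` show up; nothing in the printed configuration mentions DynamoDB.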
I have already found that the corresponding DynamoDB table is never created in LocalStack. DynamoDB table creation works fine for the enrichment module (whose table has a different name, “oa-enrich”).
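To confirm which tables exist in LocalStack, a quick check with boto3 can help. This is a minimal sketch, assuming boto3 is installed, LocalStack is reachable at `http://localhost:4566` (for example through a `kubectl port-forward`), and the table names match the `appName` values `oa-enrich` and `oa-loader`. The helper names are hypothetical; `list_tables` and its pagination fields are the real boto3 DynamoDB client API.

```python
from typing import Iterable, List


def list_all_tables(client) -> List[str]:
    """Return every DynamoDB table name, following ListTables pagination."""
    names: List[str] = []
    kwargs = {}
    while True:
        response = client.list_tables(**kwargs)
        names.extend(response.get("TableNames", []))
        last = response.get("LastEvaluatedTableName")
        if not last:
            return names
        kwargs["ExclusiveStartTableName"] = last


def missing_tables(client, expected: Iterable[str]) -> List[str]:
    """Return the expected table names that do not exist yet."""
    existing = set(list_all_tables(client))
    return [name for name in expected if name not in existing]


def check_localstack_tables(endpoint_url: str = "http://localhost:4566",
                            region: str = "eu-west-1") -> List[str]:
    """Hypothetical helper: which of this setup's tables are absent in LocalStack."""
    import boto3  # assumption: boto3 is installed locally

    client = boto3.client(
        "dynamodb",
        endpoint_url=endpoint_url,  # assumption: LocalStack edge port forwarded here
        region_name=region,
        aws_access_key_id="test",  # dummy credentials; LocalStack does not check them by default
        aws_secret_access_key="test",
    )
    return missing_tables(client, ["oa-enrich", "oa-loader"])
```

For example, `print(check_localstack_tables())` lists which of the two tables are missing from LocalStack.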
I am using the following config:
```
# Default configuration for s3-loader
# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = kinesis
# Sink is used for sending events which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = kinesis
# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = env
  secretKey = env
}
# Config for NSQ
nsq {
  # Channel name for NSQ source
  # If more than one application reading from the same NSQ topic at the same time,
  # all of them must have unique channel name for getting all the data from the same topic
  channelName = "not_used"
  # Host name for NSQ tools
  host = "not_used"
  # HTTP port for nsqd
  port = 0000
  # HTTP port for nsqlookupd
  lookupPort = 0000
}
kinesis {
  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = TRIM_HORIZON
  # Need to be specified when initialPosition is "AT_TIMESTAMP".
  # Timestamp format need to be in "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: Time need to specified in UTC.
  # initialTimestamp = "{{timestamp}}"
  # Maximum number of records to read per GetRecords call
  maxRecords = 50
  region = ${LOADER_AWS_REGION}
  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = oa-loader
  ## Optional endpoint url configuration to override aws kinesis endpoints,
  ## this can be used to specify local endpoints when using localstack
  customEndpoint = "localstack:4566"
}
streams {
  # Input stream name
  inStreamName = ${LOADER_STREAMS_GOOD}
  # Stream for events for which the storage process fails
  outStreamName = ${LOADER_STREAMS_BAD}
  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = ${LOADER_STREAMS_BUFFER_BYTE_LIMIT} # Not supported by NSQ; will be ignored
    recordLimit = ${LOADER_STREAMS_BUFFER_RECORD_LIMIT}
    timeLimit = ${LOADER_STREAMS_BUFFER_TIME_LIMIT} # Not supported by NSQ; will be ignored
  }
}
s3 {
  region = ${LOADER_AWS_REGION}
  bucket = test-bucket-loader
  # optional bucket where to store partitioned data
  #partitionedBucket = "{{s3bucket}}/partitioned"
  # optional date format prefix for directory pattern
  # eg: {YYYY}/{MM}/{dd}/{HH}
  #dateFormat = "{{s3DateFormat}}"
  # optional directory structure to use while storing data on s3 (followed by dateFormat config)
  # eg: outputDirectory = "enriched/good/"
  #outputDirectory = "{{s3OutputDirectory}}"
  # optional filename prefix
  # eg: output
  #filenamePrefix = "oa"
  # Format is one of lzo or gzip
  # Note, that you can use gzip only for enriched data stream.
  format = gzip
  # Maximum Timeout that the application is allowed to fail for (in milliseconds)
  maxTimeout = 60000
  ## Optional endpoint url configuration to override aws s3 endpoints,
  ## this can be used to specify local endpoints when using localstack
  customEndpoint = "localstack:4566"
}
# Optional section for tracking endpoints
# monitoring {
#   snowplow{
#     collectorUri = "{{collectorUri}}"
#     collectorPort = 80
#     appId = "{{appName}}"
#     method = "{{method}}"
#   }
# }
```
After enabling log4j debug logging, I realised that the loader tries to connect to the wrong DynamoDB endpoint, https://dynamodb.eu-west-1.amazonaws.com:

```
loader → 2020-05-16T16:43:11.100Z → 2020-05-16 16:43:11,100 [main] DEBUG com.amazonaws.request - Sending Request: POST https://dynamodb.eu-west-1.amazonaws.com /
```
The endpoint should be `https://localstack:4566`, as it is for the S3 and Kinesis resources. This works fine for the enrichment module, so why not for the S3 loader? How can I set the DynamoDB endpoint correctly?
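The URL in the debug line is what you would expect if no DynamoDB override reaches the client and it falls back to the regional AWS default for `eu-west-1`. The sketch below is a purely hypothetical illustration of that fallback; `resolve_dynamodb_endpoint` is not a function of the S3 loader or the AWS SDK, and it only assumes the standard public endpoint pattern `dynamodb.<region>.amazonaws.com`.

```python
from typing import Optional


def resolve_dynamodb_endpoint(region: str, custom_endpoint: Optional[str] = None) -> str:
    """Hypothetical illustration: use the override if given, else the regional AWS default."""
    if custom_endpoint:
        return custom_endpoint
    # Standard public AWS endpoint pattern for DynamoDB in a commercial region.
    return f"https://dynamodb.{region}.amazonaws.com"


if __name__ == "__main__":
    # No override passed: matches the URL seen in the loader's debug log.
    print(resolve_dynamodb_endpoint("eu-west-1"))
    # With an override, the request would go to LocalStack instead.
    print(resolve_dynamodb_endpoint("eu-west-1", "https://localstack:4566"))
```

Under this reading, the Kinesis and S3 `customEndpoint` settings are being applied, but nothing equivalent is being passed for DynamoDB.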
I would really appreciate your help.