Hey,
Firstly, this is my first time attempting to set up Snowplow.
I am running the Scala collector, which sends custom structured events to Kinesis; the stream enricher then needs to validate them and pass them on to the enriched Kinesis stream. I am not doing any additional enriching; I just want to get the events through the stream enricher so I can load them into Redshift.
I am expecting a batch of about 10 events every 10 seconds.
I have attempted to follow the setup guide a few times, but I am not entirely sure about the config file and the resolver file.
I am not getting any errors; the events are just being written to the bad Kinesis stream.
config file:
enrich {
  # Sources currently supported are:
  # 'kinesis' for reading Thrift-serialized records from a Kinesis stream
  # 'kafka' for reading Thrift-serialized records from a Kafka topic
  # 'stdin' for reading Base64-encoded Thrift-serialized records from stdin
  source = kinesis

  # Sinks currently supported are:
  # 'kinesis' for writing enriched events to one Kinesis stream and invalid events to another.
  # 'kafka' for writing enriched events to one Kafka topic and invalid events to another.
  # 'stdouterr' for writing enriched events to stdout and invalid events to stderr.
  # Using "sbt assembly" and "java -jar" is recommended to disable sbt logging.
  sink = kinesis

  # AWS credentials
  # If both are set to 'default', use the default AWS credentials provider chain.
  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
  aws {
    accessKey = default
    secretKey = default
  }

  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "raw-stream"
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = "enriched-stream"
      # Stream/topic where the events that failed enrichment will be stored
      bad = "bad-stream"
      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      partitionKey = event_id
    }

    kinesis {
      # Region where the streams are located
      region = "us-east-1"
      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 1000
      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
      # Note: This only affects the first run of this application on a stream.
      initialPosition = TRIM_HORIZON
      # Needs to be specified when initialPosition is "AT_TIMESTAMP".
      # Timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
      # Ex: "2017-05-17T10:00:00Z"
      # Note: Time needs to be specified in UTC.
      initialTimestamp = "2017-05-17T10:00:00Z"
      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 300
        maxBackoff = 60000
      }
    }

    # Kafka configuration
    kafka {
      brokers = "{{kafkaBrokers}}"
      # Number of retries to perform before giving up on sending a record
      retries = 0
    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit = 1000000
      recordLimit = 100 # Not supported by Kafka; will be ignored
      timeLimit = 60000
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # You can set it automatically using: "SnowplowEnrich-${enrich.streams.in.raw}"
    appName = "app"
  }

  # Optional section for tracking endpoints
  #monitoring {
  #  snowplow {
  #    collectorUri = "{{collectorUri}}"
  #    collectorPort = 80
  #    appId = {{enrichAppName}}
  #    method = GET
  #  }
  #}
}
And the resolver is just the default one I found:
{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}
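As a quick sanity check that the resolver can actually reach Iglu Central from the box running the enricher, I fetched a known schema by hand with a throwaway Scala snippet (static Iglu repositories serve schemas under /schemas/<vendor>/<name>/<format>/<version>; payload_data 1-0-0 is just a schema I know is hosted on Iglu Central):

import scala.io.Source

object CheckIgluCentral {
  def main(args: Array[String]): Unit = {
    // Fetch one well-known schema to confirm the resolver URI is reachable
    val uri = "http://iglucentral.com/schemas/com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-0"
    println(Source.fromURL(uri).mkString.take(300))
  }
}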
The output in the terminal running the enricher:
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Initializing record processor for shard: shardId-000000000000
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Processing 10 records from shardId-000000000000
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing 10 records to Kinesis stream bad-stream
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Successfully wrote 10 out of 10 records
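Since the logs show every record going to bad-stream, I pulled a few records off that stream directly to see what they contain. This is roughly the throwaway Scala I used (it assumes the AWS Java SDK v1 Kinesis client is on the classpath and that the stream has a single shard; the stream name and region come from the config above):

import java.nio.charset.StandardCharsets
import scala.collection.JavaConverters._
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.GetRecordsRequest

object ReadBadStream {
  def main(args: Array[String]): Unit = {
    val streamName = "bad-stream"   // streams.out.bad in my config
    val client = AmazonKinesisClientBuilder.standard().withRegion("us-east-1").build()

    // Single-shard stream, so just take the first shard
    val shardId = client.describeStream(streamName)
      .getStreamDescription.getShards.asScala.head.getShardId

    val iterator = client
      .getShardIterator(streamName, shardId, "TRIM_HORIZON")
      .getShardIterator

    // Bad rows are JSON documents holding the original payload plus an errors array
    val records = client
      .getRecords(new GetRecordsRequest().withShardIterator(iterator).withLimit(10))
      .getRecords.asScala

    records.foreach(r => println(StandardCharsets.UTF_8.decode(r.getData).toString))
  }
}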
Any help would be greatly appreciated.
Edit:
After playing around with my tracker, I am now getting an error message through the bad stream:
"errors":[{"level":"error","message":"Querystring is empty: no raw event to process"}],"failure_tstamp":"2017-11-22T20:33:48.277Z"}
I am not sure what this actually means.
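From what I can tell from the tracker protocol docs, a structured event is supposed to reach the collector as a GET to /i with the event fields in the querystring, so I read this bad row as saying my tracker is hitting the collector with an empty querystring. For reference, this is roughly the kind of request I think should be arriving (the collector host is a placeholder, and e=se / se_ca / se_ac / p / tv are tracker-protocol fields):

import java.net.{HttpURLConnection, URL, URLEncoder}

object SendTestStructEvent {
  def main(args: Array[String]): Unit = {
    // Placeholder -- swap in your own collector endpoint
    val collector = "http://my-collector.example.com"

    // Minimal structured event: e=se marks the event type,
    // se_ca / se_ac carry the category and action
    val params = Map(
      "e"     -> "se",
      "se_ca" -> "test-category",
      "se_ac" -> "test-action",
      "p"     -> "srv",        // platform
      "tv"    -> "manual-0.1"  // tracker version string
    )
    val qs = params
      .map { case (k, v) => s"$k=${URLEncoder.encode(v, "UTF-8")}" }
      .mkString("&")

    // The event data travels in the querystring of the GET --
    // an empty querystring is what produces the bad row above
    val conn = new URL(s"$collector/i?$qs")
      .openConnection()
      .asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    println(s"Collector responded with ${conn.getResponseCode}")
    conn.disconnect()
  }
}

If that reading is right, the problem is on my tracker side rather than in the enricher config, but I'd appreciate confirmation.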