Hi. I’m trying to run the stream enricher on Kinesis. Here’s the command line I’m using:
$ java -Dorg.slf4j.simpleLogger.defaultLogLevel=debug -jar kinesis/target/scala-2.11/snowplow-stream-enrich-kinesis-0.17.0.jar --config etc/config.hocon --resolver file:etc/iglu_resolver.json --enrichments file:etc/enrichments/
And here’s its output:
Cannot convert configuration to a com.snowplowanalytics.snowplow.enrich.stream.model$EnrichConfig. Failures are:
at 'streams.sourceSink.enabled':
- (file:/home/aldrin/work/snowplow-r106-acropolis/3-enrich/stream-enrich/etc/config.hocon:48) Key not found.
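If I read that right, this build of the jar is expecting an "enabled" key under streams.sourceSink, i.e. roughly something like this (just my guess from the error message, not from any docs):

sourceSink {
  enabled = kinesis
  # ... rest of the kinesis settings ...
}

whereas my config below (copied from the bundled example) sets type = kinesis instead.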
Here’s my config:
# Copyright (c) 2013-2018 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.
# This file (application.conf.example) contains a template with
# configuration options for Stream Enrich.
enrich {
  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "sp-prod-c-good"
    }
    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = "sp-prod-e-enriched"
      # Stream/topic where the events that failed enrichment will be stored
      bad = "sp-prod-e-bad"
      # Stream/topic where the pii transformation events will end up
      pii = "sp-prod-e-pii"
      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      # Note: NSQ does not make use of the partition key.
      partitionKey = "event_id"
    }
    # Configuration shown is for Kinesis, to use another uncomment the appropriate configuration
    # and comment out the other
    # To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
    # "enabled" which should be set to "stdin".
    sourceSink {
      # Sources / sinks currently supported are:
      # 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
      # Kinesis stream
      # 'googlepubsub' for reading / writing to a Google PubSub topic
      # 'kafka' for reading / writing to a Kafka topic
      # 'nsq' for reading / writing to an NSQ topic
      # 'stdin' for reading from stdin and writing to stdout and stderr
      type = kinesis
      # Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
      region = "us-east-1"
      # AWS credentials (pertinent to kinesis sink/source type)
      # If both are set to 'default', use the default AWS credentials provider chain.
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = default
        secretKey = default
      }
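      # For example, a sketch of the 'env' alternative described above, which would read
      # AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment instead:
      # aws {
      #   accessKey = env
      #   secretKey = env
      # }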
      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 10000
      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # AT_TIMESTAMP: start from the record at or after the specified timestamp
      # Note: This only affects the first run of this application on a stream.
      # (pertinent to kinesis source type)
      initialPosition = TRIM_HORIZON
      # Needs to be specified when initialPosition is "AT_TIMESTAMP".
      # The timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
      # Ex: "2017-05-17T10:00:00Z"
      # Note: The time needs to be specified in UTC.
      #initialTimestamp = "{{initialTimestamp}}"
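      # For example, a sketch of replaying from a fixed point in time, using the
      # timestamp format shown above:
      # initialPosition = AT_TIMESTAMP
      # initialTimestamp = "2017-05-17T10:00:00Z"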
      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 256
        maxBackoff = 65535
      }
      # Or Google PubSub
      #googleProjectId = my-project-id
      ## Size of the subscriber thread pool
      #threadPoolSize = 4
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoffs
      #backoffPolicy {
      #  minBackoff = {{enrichStreamsOutMinBackoff}}
      #  maxBackoff = {{enrichStreamsOutMaxBackoff}}
      #  totalBackoff = {{enrichStreamsOutTotalBackoff}} # must be >= 10000
      #  multiplier = {{enrichStreamsOutTotalBackoff}}
      #}
      # Or Kafka (Comment out for other types)
      #brokers = "{{kafkaBrokers}}"
      # Number of retries to perform before giving up on sending a record
      retries = 128
      # Or NSQ
      ## Channel name for nsq source
      ## If more than one application is reading from the same NSQ topic at the same time,
      ## all of them must have the same channel name
      #rawChannel = "{{nsqSourceChannelName}}"
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
      ## Host name for lookupd
      #lookupHost = "{{lookupHost}}"
      ## HTTP port for nsqlookupd, 4161 by default
      #lookupPort = {{nsqlookupdPort}}
    }
    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit = 33554432
      recordLimit = 2048 # Not supported by Kafka; will be ignored
      timeLimit = 60000
    }
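    # With the values above, a flush should happen once the buffer holds 2048 records,
    # once it holds 32 MB (33554432 bytes), or when an event arrives more than
    # 60 seconds (60000 ms) after the last flush, whichever comes first.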
    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # Used as the Google PubSub subscription name.
    appName = "sp-prod-e"
  }
  # Optional section for tracking endpoints
  #monitoring {
  #  snowplow {
  #    collectorUri = "{{collectorUri}}"
  #    collectorPort = 80
  #    appId = {{enrichAppName}}
  #    method = GET
  #  }
  #}
}