Hello everyone
I am doing a POC with Snowplow. On a single EC2 instance I have deployed the collector, enricher, and S3 loader.
Initially I was using the Iglu Central repo in the enricher config. Now I am trying to switch the repo to S3; for this I made the bucket publicly accessible and enabled static website hosting on it.
The collector and loader applications are working fine, but when I try to run the enricher application the following error occurs:
Error
[pool-1-thread-2] INFO com.snowplowanalytics.snowplow.enrich.common.fs2.config.ParsedConfigs - Parsed Iglu Client with following registries: AWS S3 Schema Repository
[pool-1-thread-2] ERROR com.snowplowanalytics.snowplow.enrich.common.fs2.Run - CLI arguments valid but some of the configuration is not correct. Error: Cannot decode enrichments {"error":"ResolutionError","lookupHistory":[{"repository":"AWS S3 Schema Repository","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":"2022-08-04T07:14:40.627Z"},{"repository":"Iglu Client Embedded","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":"2022-08-04T07:14:40.665Z"}]}
Collector config (config.kinesis.hocon)
collector {
  interface = "ec2-private-ip"
  port = 8080
  paths {
    "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
  }
  doNotTrackCookie {
    enabled = false
    #enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    # name = {{doNotTrackCookieName}}
    name = collector-do-not-track-cookie
    # value = {{doNotTrackCookieValue}}
    value = collector-do-not-track-cookie-value
  }
  streams {
    good = "kinesis-poc"
    bad = "kinesis-poc"
    sink {
      enabled = "kinesis"
      threadPoolSize = 10
      region = "us-east-1"
      aws {
        accessKey = "iam"
        secretKey = "iam"
      }
      backoffPolicy {
        minBackoff = 3000
        maxBackoff = 600000
      }
    }
    buffer {
      byteLimit = 3145728
      recordLimit = 500
      timeLimit = 5000
    }
  }
}
akka {
  loglevel = WARNING
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  http.server {
    remote-address-header = on
    raw-request-uri-header = on
    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
      illegal-header-warnings = off
    }
    max-connections = 2048
  }
  coordinated-shutdown {
    run-by-jvm-shutdown-hook = off
  }
}
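For reference, this is roughly how I have been sending test events while checking the collector. It is only a rough Python sketch: the payload_data schema version and the event fields are my own assumptions, and ec2-private-ip is a placeholder for the actual host.
send_test_event.py (sketch)
import json
import urllib.request

COLLECTOR = "http://ec2-private-ip:8080"  # same interface/port as the collector config above

# tp2 envelope; the payload_data schema version and event fields are my assumptions
payload = {
    "schema": "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4",
    "data": [{"e": "pv", "p": "web", "tv": "poc-0.1", "url": "http://example.com"}],
}

req = urllib.request.Request(
    COLLECTOR + "/com.snowplowanalytics.snowplow/tp2",
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    # expect 200 here and the raw event to land on the kinesis-poc stream
    print(resp.status)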
resolver.json
{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-2",
  "data": {
    "cacheSize": 500,
    "cacheTtl": 60,
    "repositories": [
      {
        "name": "AWS S3 Schema Repository",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://cs-poc.s3.amazonaws.com"
          }
        }
      }
    ]
  }
}
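My understanding (please correct me if I am wrong) is that a static registry like this has to serve schemas at <uri>/schemas/<vendor>/<name>/jsonschema/<version>, so I have been sanity-checking the bucket with a small Python sketch like the one below; the anon_ip key is only an example, substitute whatever schemas your enrichments reference.
check_registry.py (sketch)
import urllib.error
import urllib.request

REGISTRY_URI = "http://cs-poc.s3.amazonaws.com"  # same uri as in resolver.json

def lookup(schema_key: str) -> None:
    # static registry layout (my assumption): <uri>/schemas/<vendor>/<name>/jsonschema/<version>
    url = f"{REGISTRY_URI}/schemas/{schema_key}"
    try:
        with urllib.request.urlopen(url) as resp:
            print(url, "->", resp.status)
    except urllib.error.HTTPError as err:
        # a 404/403 here would line up with the "NotFound" in the enricher error
        print(url, "->", err.code)

# example key only
lookup("com.snowplowanalytics.snowplow/anon_ip/jsonschema/1-0-1")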
enricher.config.hocon
{
  "input": {
    "streamName": "kinesis-poc"
    "region": "us-east-1"
  }
  "output": {
    "good": {
      "streamName": "cs-poc-enriched-events-stream"
    }
    "bad": {
      "streamName": "cs-poc-enriched-events-stream"
    }
  }
}
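The enrichment configurations themselves sit in the directory I pass to the enricher, and as far as I understand each one's schema field has to resolve from the registry above. A quick sketch I use to list exactly which Iglu URIs the resolver will need (the directory path is a placeholder):
list_enrichment_schemas.py (sketch)
import json
from pathlib import Path

# placeholder path for the enrichments directory passed to the enricher
ENRICHMENTS_DIR = Path("/snowplow/enrichments")

for conf in sorted(ENRICHMENTS_DIR.glob("*.json")):
    doc = json.loads(conf.read_text())
    # each of these Iglu URIs must be resolvable from a registry in resolver.json
    print(conf.name, "->", doc.get("schema"))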
S3 loader config
{
  # Optional, but recommended
  "region": "us-east-1",
  # Options are: RAW, ENRICHED_EVENTS, JSON
  # RAW simply sinks data 1:1
  # ENRICHED_EVENTS work with monitoring.statsd to report metrics (identical to RAW otherwise)
  # SELF_DESCRIBING partitions self-describing data (such as JSON) by its schema
  "purpose": "ENRICHED_EVENTS",
  # Input Stream config
  "input": {
    # Kinesis Client Lib app name (corresponds to DynamoDB table name)
    "appName": "acme-s3-loader",
    # Kinesis stream name
    "streamName": "cs-poc-enriched-events-stream",
    # Options are: LATEST, TRIM_HORIZON, AT_TIMESTAMP
    "position": "LATEST",
    # Max batch size to pull from Kinesis
    "maxRecords": 10
  },
  "output": {
    "s3": {
      # Full path to output data
      "path": "s3://cs_poc_out/cs_demo_output/",
      # Partitioning format; Optional
      # Valid substitutions are {vendor}, {schema}, {format}, {model} for self-describing jsons
      # and {yy}, {mm}, {dd}, {hh} for year, month, day, hour
      #partitionFormat: "date={yy}-{mm}-{dd}"
      # Prefix for all file names; Optional
      "filenamePrefix": "raw_data",
      # Maximum Timeout that the application is allowed to fail for, e.g. in case of S3 outage
      "maxTimeout": 2000,
      # Output format; Options: GZIP, LZO
      "compression": "GZIP"
    },
    # Kinesis Stream to output failures
    "bad": {
      "streamName": "cs-poc-enriched-events-stream"
    }
  },
  # Flush control. A first limit the KCL worker hits will trigger flushing
  "buffer": {
    # Maximum bytes to read before flushing
    "byteLimit": 2048,
    # Maximum records to read before flushing
    "recordLimit": 10,
    # Maximum time between flushes
    "timeLimit": 5000
  }
}
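And a small sketch I use to confirm the loader is actually writing objects under the output prefix (bucket and prefix taken from the path above; boto3 picks up the instance's IAM credentials):
check_loader_output.py (sketch)
import boto3

# bucket and prefix taken from output.s3.path above
s3 = boto3.client("s3", region_name="us-east-1")
resp = s3.list_objects_v2(Bucket="cs_poc_out", Prefix="cs_demo_output/")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])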
Note: for the sake of the POC, I have used the same stream for both the good and bad streams.