Hi there,
I am trying to set up this pipeline as a demo: tracker -> collector -> enrich -> load to S3, using Kinesis streams.
I can see that both the collector stream (stream-good) and the enrich stream (stream-enrich-2) have data, but nothing gets logged to the terminal and no data lands in the S3 bucket when I run the S3 loader. However, using the same loader config file and just changing the input stream to the collector stream (stream-good), the process works. Any help would be appreciated.
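For reference, I'm verifying that the streams have data with the AWS CLI, roughly like this (these are single-shard streams, so the shard ID below is the one describe-stream reports for me):

# Get an iterator at the start of the enriched stream
aws kinesis get-shard-iterator \
  --stream-name stream-enrich-2 \
  --shard-id shardId-000000000000 \
  --shard-iterator-type TRIM_HORIZON \
  --region us-west-2

# Read records with the returned iterator (same check works for stream-good)
aws kinesis get-records \
  --shard-iterator <iterator-from-previous-call> \
  --region us-west-2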
To me, it seems like something is wrong in my enrich configuration, since the flow does work whenever I point the loader's source stream at my collector stream instead.
Thanks, Tim
–
Here are the config files for the collector, enrich, and S3 loader:
Collector
collector {
  interface = "0.0.0.0"
  port = 8000

  ssl {
    enable = false
    redirect = false
    port = 9543
  }

  paths {}

  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  crossDomain {
    enabled = false
    domains = [ "*" ]
    secure = true
  }

  cookie {
    enabled = true
    expiration = "365 days"
    name = "sp"
    domains = [
      "website.org"
    ]
    fallbackDomain = "website.org"
    secure = false
    httpOnly = false
  }

  doNotTrackCookie {
    enabled = false
    name = ""
    value = ""
  }

  cookieBounce {
    enabled = false
    name = "n3pc"
    forwardedProtocolHeader = "X-Forwarded-Proto"
  }

  enableDefaultRedirect = true

  redirectMacro {
    enabled = false
  }

  rootResponse {
    enabled = false
    statusCode = 302
    headers = {
      Location = "https://127.0.0.1/",
      X-Custom = "something"
    }
    body = "302, redirecting"
  }

  cors {
    accessControlMaxAge = 5 seconds
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = "stream-good"
    # Events that are too big (w.r.t. the Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = "stream-bad"
    useIpAddressAsPartitionKey = false

    sink {
      enabled = kinesis
      region = "us-west-2"
      threadPoolSize = 10
      aws {
        accessKey = env
        secretKey = env
      }
      backoffPolicy {
        minBackoff = 3000
        maxBackoff = 600000
      }
    }

    buffer {
      byteLimit = 500000
      recordLimit = 1000
      timeLimit = 10000
    }
  }
}
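For completeness, this is how I start the collector (the jar name/path are just from my local setup and may differ):

java -jar snowplow-stream-collector-kinesis-<version>.jar --config collector.conf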
Enrich
enrich {
  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "stream-good"
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = "stream-enrich-2"
      # Stream/topic where events that failed enrichment will be stored
      bad = "stream-enrich-fail"
      # Stream/topic where the PII transformation events will end up
      pii = "stream-enrich-pii"
      partitionKey = "event_id"
    }

    sourceSink {
      enabled = "kinesis"
      region = "us-west-2"
      aws {
        accessKey = "env"
        secretKey = "env"
      }
      maxRecords = 10000
      initialPosition = "TRIM_HORIZON"
      backoffPolicy {
        minBackoff = 3000
        maxBackoff = 600000
      }
      buffer {
        byteLimit = 500000
        recordLimit = 1000 # Not supported by Kafka; will be ignored
        timeLimit = 10000
      }
      appName = "myApp"
    }
  }
}
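And Stream Enrich, with the usual resolver argument (jar name and file paths are again from my setup):

java -jar snowplow-stream-enrich-kinesis-<version>.jar \
  --config enrich.conf \
  --resolver file:resolver.json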
S3-loader
source = "kinesis"
sink = "kinesis"

aws {
  accessKey = "env"
  secretKey = "env"
}

nsq {
  channelName = ""
  host = "0.0.0.0"
  port = 0
  lookupPort = 0
}

kinesis {
  initialPosition = "TRIM_HORIZON"
  initialTimestamp = "{{timestamp}}"
  maxRecords = 10000
  region = "us-west-2"
  appName = "myApp-s3"
}

streams {
  inStreamName = "stream-enrich-2"
  outStreamName = "stream-storage-fail"
  buffer {
    byteLimit = 500000 # Not supported by NSQ; will be ignored
    recordLimit = 1000
    timeLimit = 10000 # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = "us-west-2"
  bucket = "buckket-2"
  dateFormat = "{YYYY}-{MM}-{dd}-{HH}"
  format = "gzip"
  maxTimeout = 30000
}
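And finally the loader itself (same pattern; jar name from my setup):

java -jar snowplow-s3-loader-<version>.jar --config s3-loader.conf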