Hi, I’m trying to build a lambda architecture with the AWS real-time pipeline, and so far I have collection and enrichment working.
My troubles start with the ES loaders. My understanding is that you need to run two instances of the elasticsearch-loader, one consuming the good stream and one consuming the bad stream. Is it reasonable to run Scala enrich + good ES loader + bad ES loader + S3 loader (for the batch pipeline) on the same EC2 instance within an ASG?
The issue I’m having is that the loaders seem to be idle, although they are not producing any errors. Is there a debug mode?
enrich log:
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Writing 2 records to Kinesis stream enriched_good
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sinks.KinesisSink - Successfully wrote 2 out of 2 records
es loader good:
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Starting LeaseCoordinator
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization complete. Starting worker loop.
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - No activities assigned
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping …
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - No activities assigned
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping …
Here’s my good loader config:
source = kinesis
sink {
  good = elasticsearch
  bad = stderr
}
enabled = good
aws {
  accessKey = iam
  secretKey = iam
}
nsq {
  channelName = raw
  host = localhost
  port = 4150
  lookupPort = 4160
}
kinesis {
  initialPosition = TRIM_HORIZON
  maxRecords = 1000
  region = eu-central-1
  appName = "snowplow-enrich"
}
streams {
  inStreamName = enriched_good
  outStreamName = enriched_bad
  buffer {
    recordLimit = 500
  }
}
elasticsearch {
  client {
    endpoint = "127.0.0.1"
    port = "9200"
    maxTimeout = 10
    ssl = false
  }
  aws {
    signing = false
    region = eu-central-1
  }
  cluster {
    name = db
    index = snowplow
    clusterType = enriched
  }
}
EDIT: I’m using snowplow-elasticsearch-loader-http-0.10.1 and snowplow-stream-enrich-0.13.0, with a small Elasticsearch 6.2.2 cluster.