Hello Support,
I am facing an error while running the Snowplow Elasticsearch Sink.
I am running the following command:
snowplow-elasticsearch-sink-0.8.0-2x --config snowplow.conf
Error:
./snowplow-elasticsearch-sink-0.8.0-2x --config snowplow.conf
Exception in thread "main" scala.MatchError: 'elasticsearch' (of class java.lang.String)
at com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch.ElasticsearchSinkApp$delayedInit$body.apply(ElasticsearchSinkApp.scala:123)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch.ElasticsearchSinkApp$.main(ElasticsearchSinkApp.scala:69)
at com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch.ElasticsearchSinkApp.main(ElasticsearchSinkApp.scala)
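From what I understand, a scala.MatchError means a match expression received a value that none of its cases cover, so the sink seems to be reading a string for the "good" sink that it does not expect. Below is a minimal sketch of how I understand that kind of failure arises; it is not the actual Snowplow source, just an illustration, and the object and variable names are made up:

// Not the actual Snowplow code -- just illustrating how a scala.MatchError
// is thrown when a config string matches none of the expected cases.
object SinkMatchSketch extends App {
  // Hypothetical value read from the config, with quote characters included
  val goodSink: String = "'elasticsearch'"

  goodSink match {
    case "elasticsearch" => println("write good events to Elasticsearch")
    // ... other supported sink names would be further cases here.
    // There is no default case, so any unexpected string throws
    // scala.MatchError carrying that string, as in the trace above.
  }
}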
snowplow.conf:
sink {
  source = 'kinesis'
  sink {
    "good": elasticsearch
    "bad": 'none'
  }
  stream-type: "good"

  aws {
    access-key: "key"
    secret-key: "key"
  }

  kinesis {
    in {
      stream-name: "enriched" # Kinesis stream name
      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # Note: This only affects the first run of this application on a stream.
      initial-position: "TRIM_HORIZON"
      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords: 10000
    }
    out {
      # Stream for enriched events which are rejected by Elasticsearch
      stream-name: "bad"
      shards: 1
    }
    region: "us-west-2"
    app-name: "snownrich"
  }

  elasticsearch {
    client {
      type: "http"
      endpoint: "xx.xx.xx.xx" # IP address of server
      port: "9200"
      max-timeout: "7200"
      # Section for configuring the HTTP client
      http {
        conn-timeout: "7200"
        read-timeout: "7200"
      }
    }
    cluster {
      name: "elasticsearch"
      index: "snowplow"
      type: "elasticsearch"
    }
  }

  # Events are accumulated in a buffer before being sent to Elasticsearch.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byte-limit or
  # - the number of stored records exceeds record-limit or
  # - the time in milliseconds since it was last emptied exceeds time-limit
  buffer {
    byte-limit: 4500000
    record-limit: 500 # Not supported by Kafka; will be ignored
    time-limit: 60000
  }

  # Optional section for tracking endpoints
  monitoring {
    snowplow {
      collector-uri: "xx.xx.xx.xx"
      collector-port: 80
      app-id: "collector-monitor"
      method: "GET"
    }
  }
}
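To see what string the sink actually gets from this file, I can dump the parsed values with a small Scala sketch. This assumes the HOCON file is parsed with the Typesafe Config library and that snowplow.conf is in the working directory; the object name and the exact paths are just my guesses based on the layout above:

import java.io.File
import com.typesafe.config.ConfigFactory

// Quick check of what the HOCON parser returns for the sink settings.
object DumpSinkConfig extends App {
  val conf = ConfigFactory.parseFile(new File("snowplow.conf"))

  // Top-level block is "sink", with a nested "sink { good, bad }" section
  println("source      = " + conf.getString("sink.source"))
  println("sink.good   = " + conf.getString("sink.sink.good"))
  println("sink.bad    = " + conf.getString("sink.sink.bad"))
  println("stream-type = " + conf.getString("sink.stream-type"))
}

If the printed values come back with the quote characters included (for example 'elasticsearch' rather than elasticsearch), that would match the string shown in the MatchError above.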
Please help, TIA