Unable to receive Snowplow data into Elasticsearch

Hello Support,
I am using the following configuration to get logs into my Elasticsearch server, but I am not receiving any data there. I have pasted only the important fields of my config files below. I think I am using the wrong stream names in the wrong places.

I have three streams in Amazon Kinesis: good, bad, enriched.
Please let me know if I am doing something wrong. TIA

Collector Configuration:

kinesis {
  stream {
    region: "us-west-2"
    good: "good"
    bad: "bad"
  }
}

Enricher Configuration:

enrich {
  source = "kinesis"
  sink = "kinesis"
  streams {
    in: {
      raw: "good"
      maxRecords: 10000
    }
    out: {
      enriched: "enriched"
      bad: "bad"
    }
  }
  app-name: "snownrich"
}

Elasticsearch Configuration:

sink {
  source = "kinesis"
  sink {
    "good": "elasticsearch"
    "bad": "none"
  }
  stream-type: "good"

  kinesis {
    in {
      stream-name: "good" # Kinesis stream name
      maxRecords: 10000
    }
    out {
      stream-name: "bad"
    }
    region: "us-west-2"
    app-name: "snownrich"
  }

  elasticsearch {
    client {
      type: "http"
      endpoint: "xx.xx.xx.xx"
      port: "9200"
      max-timeout: "7200"
      # Section for configuring the HTTP client
      http {
        conn-timeout: "7200"
        read-timeout: "7200"
      }
    }
    cluster {
      name: "elasticsearch"
      index: "snowplow"
      type: "elasticsearch"
    }
  }
}

The "in" stream of your Elasticsearch Sink should be pointed at "enriched", not "good". We tend to call the initial collector stream "raw" as it is an unprocessed event stream.
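
For example, the kinesis section of your Elasticsearch Sink would look roughly like this (stream names taken from your setup):

  kinesis {
    in {
      # Read the output of Stream Enrich, not the raw collector stream
      stream-name: "enriched"
    }
    out {
      stream-name: "bad"
    }
  }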

Hope that helps!

Hi Josh, I changed it to enriched but then I am getting the following error. Not sure why.

[pool-1-thread-1] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask - Caught exception:
[pool-1-thread-3] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher - Initializing shard shardId-000000000003 with 49570661405857612471055537955668858758726304423435305010
[pool-1-thread-3] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask - Caught exception:
[cw-metrics-publisher] INFO com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Successfully published 10 datums.
[pool-1-thread-2] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher - Initializing shard shardId-000000000008 with 49570692548870582964269293893151063713634786083948462210
[pool-1-thread-2] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask - Caught exception:
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Current stream shard assignments: shardId-000000000009, shardId-000000000008, shardId-000000000003
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Sleeping …
[pool-1-thread-1] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher - Initializing shard shardId-000000000009 with 49570656065944174682897872212148655484433362121641689234
[pool-1-thread-1] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask - Caught exception:
[cw-metrics-publisher] INFO com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable - Successfully published 20 datums.
[pool-1-thread-3] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher - Initializing shard shardId-000000000003 with 49570661405857612471055537955668858758726304423435305010
[pool-1-thread-3] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitializeTask - Caught exception:

Hi @geetanshjindal I think I see the problem. The app-name in each Kinesis consumer (Stream Enrich, LZO Sink, Elasticsearch Sink) must be unique - this is because each of these applications maintains state in DynamoDB keyed on that app-name. If you change these I imagine it should start to work!
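
As a rough sketch, keeping the same placement as in your config files (the names themselves are just examples - they only need to differ):

  # enricher.conf
  streams {
    app-name: "snowplow-stream-enrich"
  }

  # s3sink.conf
  kinesis {
    app-name: "snowplow-lzo-sink"
  }

  # elasticsearch.conf
  kinesis {
    app-name: "snowplow-elasticsearch-sink"
  }

Each app-name becomes the name of that consumer's DynamoDB checkpoint table, which is why the names must not collide.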

Hi @josh
I have changed the app-name in all the places (a different name in each file)
but I am still not getting any data in Elasticsearch.

And the log files in S3 have very weird data.

Log filename:

2017-02-23-49570692548870582964269307975052674111052681301100331138-49570692548870582964269307975052674111052681301100331138.lzo

I have decompressed the file using

lzop -d 2017-02-23-49570692548870582964269307975052674111052681301100331138-49570692548870582964269307975052674111052681301100331138.lzo

File content:

)\D8\D5X\CDL)\B2\BCW\99!q\BD\FF\D2\00\00byte\C5 \00d\00\00\00 52.33.168.12
\00\C8\00\00ZjV\80\E3 \00\D2\00\00\00UTF-8 \00\DC\00\00\00ssc-0.9.0-kinesis ,\00\00\00spray-can/1.3.2 @\00\00\00/i J\00\00\99e=ue&eid=fc2cd35a-1794-4879-a702-ba59ff4f98ab&aid=collector-monitor&tna=snowplow-kinesis-s3&tv=scala-0.1.0&p=srv&dtm=1487842738390&ue_px=eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy91bnN0cnVjdF9ldmVudC9qc29uc2NoZW1hLzEtMC0wIiwiZGF0YSI6eyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5tb25pdG9yaW5nLmtpbmVzaXMvYXBwX2hlYXJ0YmVhdC9qc29uc2NoZW1hLzEtMC0wIiwiZGF0YSI6eyJpbnRlcnZhbCI6MzAwMDAwfX19^ \00\00\00\00\00\00Host: 50.112.46.42\00\00\00eUser-Agent: spray-can/1.3.2 \90\00\00\00 50.112.46.42 \9A\00\00\00$b17dd114-e4f5-417b-b71b-ad05c48eb33e zi\00\00\00Aiglu:com.snowplowanalytics.snowplow/CollectorPayload/thrift/1-0-0\00

I am not sure where I made a mistake.

Hi @geetanshjindal - if no data is coming into the good Elasticsearch sink it might be worthwhile setting up a bad Elasticsearch sink as well, to process the records from the bad stream into Elasticsearch. This will tell you whether or not the events are being dropped at the enrichment point due to validation errors.
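
A rough sketch of the settings that change for such a bad sink, reusing the structure of your elasticsearch.conf (the index name is just a placeholder):

  sink {
    sink {
      "good": "elasticsearch"
      "bad": "none"
    }
    # Tell the sink it is consuming bad rows rather than enriched events
    stream-type: "bad"

    kinesis {
      in {
        stream-name: "bad"
      }
    }

    elasticsearch {
      cluster {
        index: "snowplow-bad" # placeholder index name
      }
    }
  }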

Hi @Josh,

When I changed stream-type to bad in the Elasticsearch sink I started getting the following error:

[pool-1-thread-5] ERROR com.snowplowanalytics.snowplow.storage.kinesis.elasticsearch.generated.ElasticsearchSenderHTTP - Record failed with message:
{"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":
{"type":"not_x_content_exception","reason":"Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"}}

Did you also change the in stream?
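
i.e. when stream-type is switched to bad, the kinesis in stream has to move with it - roughly:

  stream-type: "bad"

  kinesis {
    in {
      stream-name: "bad" # must match the stream-type above
    }
  }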

Hi @josh,
I am sharing the entire configuration of my setup with you. Please let me know if I messed up somewhere.
Thanks

Tracker code

<script type="text/javascript">
;(function(p,l,o,w,i,n,g){if(!p[i]){p.GlobalSnowplowNamespace=p.GlobalSnowplowNamespace||[];
p.GlobalSnowplowNamespace.push(i);p[i]=function(){(p[i].q=p[i].q||[]).push(arguments)
};p[i].q=p[i].q||[];n=l.createElement(o);g=l.getElementsByTagName(o)[0];n.async=1;
n.src=w;g.parentNode.insertBefore(n,g)}}(window,document,"script","//d1fc8wv8zag5ca.cloudfront.net/2.6.2/sp.js","snowplow"));

window.snowplow('newTracker', 'cf', 'xx.xx.xx.xx', { // Initialise a tracker
  appId: 'pa-index',
  cookieDomain: '.proxyacid.com'
});

window.snowplow('trackPageView');
</script>

collector.conf

collector {
  interface = "0.0.0.0"
  port = "80"

  production = true

  p3p {
    policyref = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  cookie {
    enabled = true
    expiration = "365 days" # e.g. "365 days"
    name = cookie
    #domain = "{{collectorCookieDomain}}"
  }

  sink {
    enabled = "kinesis"

    kinesis {
      thread-pool-size: 10 # Thread pool size for Kinesis API requests
      aws {
        access-key: "key"
        secret-key: "key"
      }
      stream {
        region: "us-west-2"
        good: "good"
        bad: "bad"
      }
      backoffPolicy: {
        minBackoff: 3000
        maxBackoff: 600000
      }
    }
    kafka {
      brokers: "{{collectorKafkaBrokers}}"
      topic {
        good: "{{collectorKafkaTopicGoodName}}"
        bad: "{{collectorKafkaTopicBadName}}"
      }
    }
    buffer: {
      byte-limit: 4500000 # 4.5MB
      record-limit: 500 # 500 records
      time-limit: 60000 # 1 minute
    }
  }
}

akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]
}

spray.can.server {
  remote-address-header = on

  uri-parsing-mode = relaxed
  raw-request-uri-header = on

  parsing {
    max-uri-length = 32768
  }
}

enricher.conf

enrich {
  source = "kinesis"
  sink = "kinesis"
  aws {
    access-key: "key"
    secret-key: "key"
  }

  kafka {
    brokers: "{{enrichKafkaBrokers}}"
  }

  streams {
    in: {
      raw: "good"
      maxRecords: 10000
      buffer: {
        byte-limit: 4500000
        record-limit: 500 # Not supported by Kafka; will be ignored
        time-limit: 60000
      }
    }

    out: {
      enriched: "enriched"
      bad: "bad"
      backoffPolicy: {
        minBackoff: 3000
        maxBackoff: 600000
      }
    }
    app-name: "enricher-app"
    initial-position = "TRIM_HORIZON"
    region: "us-west-2"
  }

  monitoring {
    snowplow {
      collector-uri: "xx.xx.xx.xx"
      collector-port: 80
      app-id: "collector-monitor"
      method: "GET"
    }
  }
}

resolver.json

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}

enrichment/anon_ip.json

{
  "schema": "iglu:com.snowplowanalytics.snowplow/anon_ip/jsonschema/1-0-0",

  "data": {
    "name": "anon_ip",
    "vendor": "com.snowplowanalytics.snowplow",
    "enabled": true,
    "parameters": {
      "anonOctets": 2
    }
  }
}

s3sink.conf

sink {
  aws {
    access-key: "key"
    secret-key: "key"
  }

  kinesis {
    in {
      stream-name: "good"
      initial-position: "TRIM_HORIZON"
      max-records: "1000"
    }

    out {
      stream-name: "bad"
    }
    region: "us-west-2"
    app-name: "s3-sink-app"
  }

  s3 {
    region: "us-west-2"
    endpoint: "http://s3-us-west-2.s3.amazonaws.com"
    bucket: "snowplow-logs-cnw/logs"
    max-timeout: "300000"
    format: "lzo"
  }

  buffer {
    byte-limit: 4500000
    record-limit: 500 # Not supported by Kafka; will be ignored
    time-limit: 60000
  }

  logging {
    level: "error"
  }
  monitoring {
    snowplow {
      collector-uri: "xx.xx.xx.xx"
      collector-port: 80
      app-id: "collector-monitor"
      method: "GET"
    }
  }
}

elasticsearch.conf

sink {
  source = "kinesis"
  sink {
    "good": "elasticsearch"
    "bad": "kinesis"
  }

  stream-type: "good"
  aws {
    access-key: "key"
    secret-key: "key"
  }

  kinesis {
    in {
      stream-name: "enriched" # Kinesis stream name
      initial-position: "TRIM_HORIZON"
      maxRecords: 10000
    }
    out {
      stream-name: "bad"
      shards: 1
    }
    region: "us-west-2"
    app-name: "elastics-app"
  }

  elasticsearch {
    client {
      type: "http"
      endpoint: "localhost"
      port: "9200"
      max-timeout: "7200"
      http {
        conn-timeout: "7200"
        read-timeout: "7200"
      }
    }
    cluster {
      name: "elasticsearch"
      index: "filebeat"
      type: "esnow"
    }
  }

  buffer {
    byte-limit: 4500000
    record-limit: 500 # Not supported by Kafka; will be ignored
    time-limit: 60000
  }

  monitoring {
    snowplow {
      collector-uri: "xx.xx.xx.xx"
      collector-port: 80
      app-id: "collector-monitor"
      method: "GET"
    }
  }
}

Hi @geetanshjindal - nothing in particular jumps out as being wrong. What I would recommend, instead of trying to get the entire pipeline running in one go, is that you take it piece by piece. Each of the components (apart from the LZO sink) has a stdout/stderr sink that you can use to validate data after each micro-service.

  1. Set up the Collector to sink to stdout/stderr - if this works, reconfigure it to push to the Kinesis streams
  2. Set up Kinesis Enrich to read from Kinesis but write to stdout/stderr - if this works, reconfigure it to push to the Kinesis streams
  3. Set up the Elasticsearch Sink to read from Kinesis but write to stdout/stderr - if this works, reconfigure it to push to the Elasticsearch cluster

In this way you can be sure that each component is working along the way, rather than trying to find the needle in the haystack. A rough sketch of those switches is below.
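
Something along these lines - the exact accepted values ("stdout" vs "stdouterr") vary slightly between components and versions, so double-check the example config bundled with each app:

  # collector.conf - step 1: print raw events instead of writing to Kinesis
  sink {
    enabled = "stdout"
  }

  # enricher.conf - step 2: read raw events from Kinesis, print enriched events
  enrich {
    source = "kinesis"
    sink = "stdouterr"
  }

  # elasticsearch.conf - step 3: read enriched events from Kinesis, print instead of indexing
  sink {
    sink {
      "good": "stdout"
      "bad": "none"
    }
  }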

I would also recommend looking at Snowplow Mini for guidance, as there are a lot of config resources and a topology diagram that could help in understanding the real-time flow.

Hi @josh

Thanks for your help - the issue was in Elasticsearch and it is now resolved.
Thank you for all your help.

The Snowplow support team is the best team I have ever worked with - very active and very knowledgeable.
Thank you :slight_smile:

And Josh, if you ever come to Chandigarh, India,
drinks are on me!


Hi @geetanshjindal! Very happy to hear it is all resolved now - setting up a real-time pipeline is no easy task!

Sounds good!

What was the issue? Do you mind sharing?
I'm having a problem with Elasticsearch not getting data from enrichGood as well.
Thanks in advance!

Hi @farhah,

It would be best if you opened up a fresh topic with the issue you are seeing and any extra information that might help us help you get the issue resolved!

Cheers,
Josh