Hi all,
I have completed the basic Snowplow setup.
When I try to run the Scala Stream Collector with the following command:
java -jar snowplow-stream-collector-0.11.0.jar --config my.conf
I get the error below:
[scala-stream-collector-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[main] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Creating thread pool of size 10
Exception in thread "main" com.amazonaws.SdkClientException: Unable to marshall request to JSON:
Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:
1) Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
2) Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)
at com.amazonaws.services.kinesis.model.transform.DescribeStreamRequestProtocolMarshaller.marshall(DescribeStreamRequestProtocolMarshaller.java:58)
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:711)
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:696)
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:732)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.streamExists(KinesisSink.scala:123)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.<init>(KinesisSink.scala:114)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$.createAndInitialize(KinesisSink.scala:48)
at com.snowplowanalytics.snowplow.collectors.scalastream.Collector$.run(Collector.scala:80)
at com.snowplowanalytics.snowplow.collectors.scalastream.Collector$.main(Collector.scala:63)
at com.snowplowanalytics.snowplow.collectors.scalastream.Collector.main(Collector.scala)
Caused by: java.lang.RuntimeException: Jackson jackson-core/jackson-dataformat-cbor incompatible library version detected.
You have two possible resolutions:
1) Ensure the com.fasterxml.jackson.core:jackson-core & com.fasterxml.jackson.dataformat:jackson-dataformat-cbor libraries on your classpath have the same version number
2) Disable CBOR wire-protocol by passing the -Dcom.amazonaws.sdk.disableCbor property or setting the AWS_CBOR_DISABLE environment variable (warning this may affect performance)
at com.amazonaws.protocol.json.SdkCborGenerator.getBytes(SdkCborGenerator.java:68)
at com.amazonaws.protocol.json.internal.JsonProtocolMarshaller.finishMarshalling(JsonProtocolMarshaller.java:185)
at com.amazonaws.protocol.json.internal.NullAsEmptyBodyProtocolRequestMarshaller.finishMarshalling(NullAsEmptyBodyProtocolRequestMarshaller.java:53)
at com.amazonaws.services.kinesis.model.transform.DescribeStreamRequestProtocolMarshaller.marshall(DescribeStreamRequestProtocolMarshaller.java:56)
... 9 more
Caused by: java.lang.NoSuchMethodError: com.fasterxml.jackson.dataformat.cbor.CBORGenerator.getOutputContext()Lcom/fasterxml/jackson/core/json/JsonWriteContext;
at com.fasterxml.jackson.dataformat.cbor.CBORGenerator.close(CBORGenerator.java:903)
at com.amazonaws.protocol.json.SdkJsonGenerator.close(SdkJsonGenerator.java:253)
at com.amazonaws.protocol.json.SdkJsonGenerator.getBytes(SdkJsonGenerator.java:268)
at com.amazonaws.protocol.json.SdkCborGenerator.getBytes(SdkCborGenerator.java:66)
... 12 more
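For reference, the second resolution named in the error message (disabling the CBOR wire protocol) could be tried roughly as follows. This is only a sketch: it assumes the jar and my.conf sit in the current directory, the JAR and CONF variable names are just for illustration, and the value "true" is an assumption, since the message only gives the property and variable names. It works around the Jackson version mismatch rather than fixing it, and the message warns it may affect performance.

```shell
#!/bin/sh
# Sketch of resolution 2 from the error message: disable the CBOR wire protocol.
# JAR and CONF are illustrative variable names; adjust the paths as needed.
JAR="snowplow-stream-collector-0.11.0.jar"
CONF="my.conf"

# Option A: the environment variable named in the error message.
export AWS_CBOR_DISABLE=true

# Option B: the JVM system property named in the error message,
# passed before -jar so the JVM (not the collector) receives it.
if [ -f "$JAR" ]; then
  java -Dcom.amazonaws.sdk.disableCbor=true -jar "$JAR" --config "$CONF"
else
  echo "Collector jar not found: $JAR"
fi
```

Either option on its own should be enough; both are shown here only to illustrate the two forms the message mentions.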
My configuration file is below:
# Copyright (c) 2013-2017 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.
# This file (application.conf.example) contains a template with
# configuration options for the Scala Stream Collector.
#
# To use, copy this to 'application.conf' and modify the configuration options.
# 'collector' contains configuration options for the main Scala collector.
collector {
# The collector runs as a web service specified on the following
# interface and port.
interface = "0.0.0.0"
port = 8080
# Configure the P3P policy header.
p3p {
policyRef = "/w3c/p3p.xml"
CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}
# The collector returns a cookie to clients for user identification
# with the following domain and expiration.
cookie {
enabled = true
expiration = "365 days" # e.g. "365 days"
# Network cookie name
name = UnilogAnalytics
# The domain is optional and will make the cookie accessible to other
# applications on the domain. Comment out this line to tie cookies to
# the collector's full domain
domain = "collector.cookie.domain"
}
# When enabled and the cookie specified above is missing, performs a redirect to itself to check
# if third-party cookies are blocked using the specified name. If they are indeed blocked,
# fallbackNetworkUserId is used instead of generating a new random one.
cookieBounce {
enabled = false
# The name of the request parameter which will be used on redirects checking that third-party
# cookies work.
name = "n3pc"
# Network user id to fallback to when third-party cookies are blocked.
fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
}
streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = GoodStream
# Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
bad = BadStream
# Whether to use the incoming event's ip as the partition key for the good stream/topic
# Note: Nsq does not make use of partition key.
useIpAddressAsPartitionKey = false
# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose between kinesis, kafka, nsq, or stdout
# To use stdout comment everything
enabled = kinesis
# Region where the streams are located
region = us-east-1
# Thread pool size for Kinesis API requests
threadPoolSize = 10
# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = xxx
secretKey = xxxxxxx
}
# Minimum and maximum backoff periods
backoffPolicy {
minBackoff = 3000
maxBackoff = 600000
}
# Or Kafka
#brokers = "{{kafkaBrokers}}"
## Number of retries to perform before giving up on sending a record
#retries = 0
# Or NSQ
## Host name for nsqd
#host = "{{nsqHost}}"
## TCP port for nsqd, 4150 by default
#port = {{nsqdPort}}
}
# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 4500000
recordLimit = 500 # Not supported by Kafka; will be ignored
timeLimit = 60000
}
}
}
# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
loggers = ["akka.event.slf4j.Slf4jLogger"]
# akka-http is the server the Stream collector uses and has configurable options defined at
# http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
http.server {
# To obtain the hostname in the collector, the 'remote-address' header
# should be set. By default, this is disabled, and enabling it
# adds the 'Remote-Address' header to every request automatically.
remote-address-header = on
raw-request-uri-header = on
# Define the maximum request length (the default is 2048)
parsing {
max-uri-length = 32768
uri-parsing-mode = relaxed
}
}
}
Please help me resolve this error.