I am completely new to Snowplow and am setting up the open-source Scala Stream Collector for Kinesis. I keep running into an error that I could not find in any of the previous topics.
When I attempt to run this:
java -jar -Dcom.amazonaws.sdk.disableCbor snowplow-stream-collector-kinesis-1.0.0.jar --config poc.config
The collector fails immediately with this exception:
Exception in thread "main" com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), com.amazonaws.auth.profile.ProfileCredentialsProvider@73386d72: profile file cannot be null, com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@2516fc68: Unable to load credentials from service endpoint]
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:136)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1225)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:801)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:751)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
at com.amazonaws.services.kinesis.AmazonKinesisClient.executeDescribeStream(AmazonKinesisClient.java:875)
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:846)
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:887)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$.streamExists(KinesisSink.scala:125)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$.$anonfun$createAndInitialize$2(KinesisSink.scala:52)
at scala.util.Either.flatMap(Either.scala:341)
at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$.createAndInitialize(KinesisSink.scala:50)
at com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$.$anonfun$main$2(KinesisCollector.scala:38)
at scala.util.Either.flatMap(Either.scala:341)
at com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$.main(KinesisCollector.scala:30)
at com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector.main(KinesisCollector.scala)
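For reference, two of the sources named in that message (the environment variables and the profile file) can be checked from the same shell that launches the collector. This is only a minimal sketch: the function name check_aws_credential_sources is made up for illustration, and it assumes a POSIX shell and the default profile file location of ~/.aws/credentials. It does not check Java system properties or the container/instance endpoint.

```shell
#!/bin/sh
# Hypothetical helper: report which credential sources from the error message
# are visible to this shell. Prints one line per source checked.
check_aws_credential_sources() {
  # Environment variables: the message lists two accepted spellings per value.
  if [ -n "${AWS_ACCESS_KEY_ID:-${AWS_ACCESS_KEY:-}}" ] &&
     [ -n "${AWS_SECRET_ACCESS_KEY:-${AWS_SECRET_KEY:-}}" ]; then
    echo "env: set"
  else
    echo "env: missing"
  fi

  # Profile file: assumed to be at the default location under $HOME.
  if [ -f "${HOME}/.aws/credentials" ]; then
    echo "profile file: found"
  else
    echo "profile file: missing"
  fi
}

check_aws_credential_sources
```

If both lines report "missing", that would be consistent with the first and third entries in the exception above; it says nothing about whether any credentials that are found are valid for the Kinesis streams.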
Here is how my config is currently set up:
# Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.
# This file (config.hocon.sample) contains a template with
# configuration options for the Scala Stream Collector.
#
# To use, copy this to 'application.conf' and modify the configuration options.
# 'collector' contains configuration options for the main Scala collector.
collector {
# The collector runs as a web service specified on the following interface and port.
interface = "0.0.0.0"
interface = ${?COLLECTOR_INTERFACE}
port = 9092
port = ${?COLLECTOR_PORT}
# optional SSL/TLS configuration
ssl {
enable = false
enable = ${?COLLECTOR_SSL}
# whether to redirect HTTP to HTTPS
redirect = false
redirect = ${?COLLECTOR_SSL_REDIRECT}
port = 9543
port = ${?COLLECTOR_SSL_PORT}
}
# The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol.
# The expected values are:
# - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
# - r/tp2 for redirects
# - com.snowplowanalytics.iglu/v1 for the Iglu Webhook
# Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks
# downstream of the collector.
# But you can also map any valid (i.e. two-segment) path to one of the three defaults.
# Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full
# valid paths starting with a leading slash.
# Pass in an empty map to avoid mapping.
paths {
# "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
# "/com.acme/redirect" = "/r/tp2"
# "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
}
# Configure the P3P policy header.
p3p {
policyRef = "/w3c/p3p.xml"
CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}
# Cross domain policy configuration.
# If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
# route.
crossDomain {
enabled = false
# Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
domains = [ "*" ]
domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
# Whether to only grant access to HTTPS or both HTTPS and HTTP sources
secure = true
secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
}
# The collector returns a cookie to clients for user identification
# with the following domain and expiration.
cookie {
enabled = true
enabled = ${?COLLECTOR_COOKIE_ENABLED}
expiration = "365 days" # e.g. "365 days"
expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
# Network cookie name
name = "userEvents"
name = ${?COLLECTOR_COOKIE_NAME}
# The domain is optional and will make the cookie accessible to other
# applications on the domain. Comment out these lines to tie cookies to
# the collector's full domain.
# The domain is determined by matching the domains from the Origin header of the request
# to the list below. The first match is used. If no matches are found, the fallback domain will be used,
# if configured.
# If you specify a main domain, all subdomains on it will be matched.
# If you specify a subdomain, only that subdomain will be matched.
# Examples:
# domain.com will match domain.com, www.domain.com and secure.client.domain.com
# client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com
domains = [
"liquidweb.com" # e.g. "domain.com" -any origin domain ending with this will be matched and domain.com will be returned
"nexcess.net" # e.g. "secure.anotherdomain.com" -any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
# ... more domains
]
domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
# ... more domains
# If specified, the fallback domain will be used if none of the Origin header hosts matches the list of
# cookie domains configured above. (For example, if there is no Origin header.)
fallbackDomain = "liquidweb.com"
fallbackDomain = ${?FALLBACK_DOMAIN}
secure = false
secure = ${?COLLECTOR_COOKIE_SECURE}
httpOnly = false
httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
# The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`,
# `Lax` or `None` to limit the cookie sent context.
# Strict: the cookie will only be sent along with "same-site" requests.
# Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation.
# None: the cookie will be sent with same-site and cross-site requests.
#sameSite = "{{cookieSameSite}}"
#sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
}
# If you have a do not track cookie in place, the Scala Stream Collector can respect it by
# completely bypassing the processing of an incoming request carrying this cookie, the collector
# will simply reply by a 200 saying "do not track".
# The cookie name and value must match the configuration below, where the names of the cookies must
# match entirely and the value could be a regular expression.
doNotTrackCookie {
enabled = false
enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
name = "doNotTrackCookieName"
name = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_NAME}
value = "doNotTrackCookieName"
value = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_VALUE}
}
# When enabled and the cookie specified above is missing, performs a redirect to itself to check
# if third-party cookies are blocked using the specified name. If they are indeed blocked,
# fallbackNetworkId is used instead of generating a new random one.
cookieBounce {
enabled = false
enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
# The name of the request parameter which will be used on redirects checking that third-party
# cookies work.
name = "n3pc"
name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
# Network user id to fallback to when third-party cookies are blocked.
fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
# Optionally, specify the name of the header containing the originating protocol for use in the
# bounce redirect location. Use this if behind a load balancer that performs SSL termination.
# The value of this header must be http or https. Example, if behind an AWS Classic ELB.
forwardedProtocolHeader = "X-Forwarded-Proto"
forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
}
# When enabled, redirect prefix `r/` will be enabled and its query parameters resolved.
# Otherwise the request prefixed with `r/` will be dropped with `404 Not Found`
# Custom redirects configured in `paths` can still be used.
enableDefaultRedirect = true
enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}
# When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
# token. All instances of that token are replaced with the network ID. If the placeholder isn't
# specified, the default value is `${SP_NUID}`.
redirectMacro {
enabled = false
enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
# Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
placeholder = "[TOKEN]"
placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
}
# Customize response handling for requests for the root path ("/").
# Useful if you need to redirect to web content or privacy policies regarding the use of this collector.
rootResponse {
enabled = false
enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
statusCode = 302
statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
# Optional, defaults to empty map
headers = {
Location = "https://127.0.0.1/",
Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
X-Custom = "something"
}
# Optional, defaults to empty string
body = "302, redirecting"
body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
}
# Configuration related to CORS preflight requests
cors {
# The Access-Control-Max-Age response header indicates how long the results of a preflight
# request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
accessControlMaxAge = 5 seconds
accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
}
# Configuration of prometheus http metrics
prometheusMetrics {
# If metrics are enabled then all requests will be logged as prometheus metrics
# and '/metrics' endpoint will return the report about the requests
enabled = false
# Custom buckets for http_request_duration_seconds_bucket duration metric
#durationBucketsInSeconds = [0.1, 3, 10]
}
streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = "good_stream"
good = ${?COLLECTOR_STREAMS_GOOD}
# Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
bad = "bad_stream"
bad = ${?COLLECTOR_STREAMS_BAD}
# Whether to use the incoming event's ip as the partition key for the good stream/topic
# Note: Nsq does not make use of partition key.
useIpAddressAsPartitionKey = false
useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}
# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose between kinesis, google-pub-sub, kafka, nsq, or stdout.
# To use stdout, comment or remove everything in the "collector.streams.sink" section except
# "enabled" which should be set to "stdout".
enabled = kinesis
enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}
# Region where the streams are located
region = "us-east-2"
region = ${?COLLECTOR_STREAMS_SINK_REGION}
## Optional endpoint url configuration to override aws kinesis endpoints,
## this can be used to specify local endpoints when using localstack
# customEndpoint = {{kinesisEndpoint}}
# customEndpoint = ${?COLLECTOR_STREAMS_SINK_CUSTOM_ENDPOINT}
# Thread pool size for Kinesis API requests
threadPoolSize = 10
threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}
# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = default
accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
secretKey = default
secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
}
# Minimum and maximum backoff periods, in milliseconds
backoffPolicy {
minBackoff = 10
minBackoff = ${?COLLECTOR_STREAMS_SINK_MIN_BACKOFF}
maxBackoff = 1000
maxBackoff = ${?COLLECTOR_STREAMS_SINK_MAX_BACKOFF}
}
# Or Google Pubsub
#googleProjectId = ID
## Minimum, maximum and total backoff periods, in milliseconds
## and multiplier between two backoff
#backoffPolicy {
# minBackoff = {{minBackoffMillis}}
# maxBackoff = {{maxBackoffMillis}}
# totalBackoff = {{totalBackoffMillis}} # must be >= 10000
# multiplier = {{backoffMultiplier}}
#}
# Or Kafka
#brokers = "{{kafkaBrokers}}"
## Number of retries to perform before giving up on sending a record
#retries = 0
# The kafka producer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#producerconfigs
# Some values are set to other values from this config by default:
# "bootstrap.servers" = brokers
# "buffer.memory" = buffer.byteLimit
# "linger.ms" = buffer.timeLimit
#producerConf {
# acks = all
# "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
# "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
#}
# Or NSQ
## Host name for nsqd
#host = "{{nsqHost}}"
## TCP port for nsqd, 4150 by default
#port = {{nsqdPort}}
}
# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 10485760 # 10MB
byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
recordLimit = 1024 # Not supported by Kafka; will be ignored
recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
timeLimit = 5000 # 5 seconds
timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
}
}
}
# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
loglevel = ${?AKKA_LOGLEVEL}
loggers = ["akka.event.slf4j.Slf4jLogger"]
loggers = [${?AKKA_LOGGERS}]
# akka-http is the server the Stream collector uses and has configurable options defined at
# http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
http.server {
# To obtain the hostname in the collector, the 'remote-address' header
# should be set. By default, this is disabled, and enabling it
# adds the 'Remote-Address' header to every request automatically.
remote-address-header = on
remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}
raw-request-uri-header = on
raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}
# Define the maximum request length (the default is 2048)
parsing {
max-uri-length = 32768
max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
uri-parsing-mode = relaxed
uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
}
}
# By default setting `collector.ssl` relies on JSSE (Java Secure Socket
# Extension) to enable secure communication.
# To override the default settings set the following section as per
# https://lightbend.github.io/ssl-config/ExampleSSLConfig.html
# ssl-config {
# debug = {
# ssl = true
# }
# keyManager = {
# stores = [
# {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" }
# ]
# }
# loose {
# disableHostnameVerification = false
# }
# }
}