Hello,
I am using “snowplow-stream-collector-kafka-1.0.0” and I am trying to publish incoming messages from the trackers to Kafka topics.
This is my collector’s configuration file:
collector {
  interface = "0.0.0.0"
  interface = ${?COLLECTOR_INTERFACE}
  port = 5359
  port = ${?COLLECTOR_PORT}

  ssl {
    enable = false
    enable = ${?COLLECTOR_SSL}
    # whether to redirect HTTP to HTTPS
    redirect = false
    redirect = ${?COLLECTOR_SSL_REDIRECT}
    port = 9543
    port = ${?COLLECTOR_SSL_PORT}
  }

  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  crossDomain {
    enabled = false
    enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    domains = [ "*" ]
    domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
    secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
  }

  cookie {
    enabled = true
    #enabled = ${?COLLECTOR_COOKIE_ENABLED}
    expiration = "365 days" # e.g. "365 days"
    #expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
    # Network cookie name
    name = charg
    secure = false
    secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
  }

  doNotTrackCookie {
    enabled = false
    enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    name = ""
    name = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_NAME}
    value = ""
    value = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_VALUE}
  }

  cookieBounce {
    enabled = false
    enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
    forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }

  enableDefaultRedirect = true
  enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}

  redirectMacro {
    enabled = false
    enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }

  rootResponse {
    enabled = false
    enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
    body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }

  # Configuration related to CORS preflight requests
  cors {
    # The Access-Control-Max-Age response header indicates how long the results of a preflight
    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
    accessControlMaxAge = 5 seconds
    accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    # If metrics are enabled then all requests will be logged as prometheus metrics
    # and '/metrics' endpoint will return the report about the requests
    enabled = false
    # Custom buckets for http_request_duration_seconds_bucket duration metric
    #durationBucketsInSeconds = [0.1, 3, 10]
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = good_sink
    good = ${?COLLECTOR_STREAMS_GOOD}
    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = bad_sink
    bad = ${?COLLECTOR_STREAMS_BAD}
    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false
    useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, google-pub-sub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = kafka
      brokers = "localhost:9092"
      ## Number of retries to perform before giving up on sending a record
      retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # retries             -> retries
      # "buffer.memory"     -> buffer.byteLimit
      # "linger.ms"         -> buffer.timeLimit
      producerConf {
        acks = all
        # "key.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
        # "value.serializer" = "org.apache.kafka.common.serialization.ByteArraySerializer"
      }
    }

    buffer {
      byteLimit = 4500000
      byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 500 # Not supported by Kafka; will be ignored
      recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 60000
      timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
    }
  }
}

akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }
}
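For what it’s worth, my reading of the streams.buffer block above, based on the comments in the sink section (byteLimit maps to the Kafka producer’s "buffer.memory" and timeLimit to "linger.ms"), is roughly this:

  buffer {
    byteLimit = 4500000   # mapped to the Kafka producer's "buffer.memory"
    timeLimit = 60000     # mapped to the Kafka producer's "linger.ms", i.e. 60 seconds
  }

(This is just my interpretation of the comments in the sample config; I may be misreading how the Kafka sink actually uses these values.)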
The collector’s response to the JS tracker is 200, and then in the collector’s console I can see the message:
[scala-stream-collector-akka.actor.default-dispatcher-4] [akka://scala-stream-collector/system/IO-T/$a/0] New connection accepted
But nothing seems to reach the Kafka topic. Then, after 1 minute, the message from the tracker reaches Kafka, and in the collector’s console there is the message:
[scala-stream-collector-akka.actor.default-dispatcher-7] [akka://scala-stream-collector/system/StreamSupervisor-0/flow-2-0-detacher] Aborting tcp connection to /0:0:0:0:0:0:0:1:59177 because of upstream failure: akka.http.impl.engine.HttpIdleTimeoutException: HTTP idle-timeout encountered, no bytes passed in the last 1 minute. This is configurable by akka.http.[server|client].idle-timeout.
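If I read that message correctly, the idle-timeout it mentions would be set under akka.http.server in the same configuration file. I have not set it explicitly, so it should be at its default, which I believe is 60 seconds, e.g.:

  akka {
    http.server {
      # not in my config above; shown only to illustrate where the timeout from the log message lives
      idle-timeout = 60 seconds
    }
  }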
Can anyone explain why this is happening? Why is the collector waiting for this timeout before pushing the data to Kafka?
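For reference, the topic can be watched with the standard Kafka console consumer (topic name and broker taken from the config above; the path to the Kafka scripts depends on the installation):

  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic good_sink --from-beginning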
Thank you!