I've managed to build Snowplow using Docker and connect it to a Kafka instance running in the same Docker network as snowplow-enrich and snowplow-collector. However, when I connect snowplow-enrich and snowplow-collector to an existing Kafka that sits outside of that network, I get these errors:
[main] WARN org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=enrich] Connection to node 1 could not be established. Broker may not be available.
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 could not be established. Broker may not be available.
[kafka-producer-network-thread | producer-2] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-2] Connection to node 1 could not be established. Broker may not be available.
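As far as I understand, node 1 here is a real broker id (the bootstrap connection would show up as node -1), so the clients do reach the bootstrap address and fetch metadata, but then cannot connect to the address the broker advertises for broker 1. (The groupId=enrich comes from the appName setting in stream-enrich.hocon below, so both the consumer and the producers are affected.) To check what the broker actually advertises to clients on the containers' network, I believe a kcat metadata listing from a throwaway container on the same Docker network would show it (the network name snowplow_default is an assumption based on my compose project directory):

docker run --rm --network snowplow_default edenhill/kcat:1.7.1 \
  -b 192.168.1.23:9093 -L

If the broker line printed there shows a host or port the containers cannot reach, that would explain the warnings.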
I've changed the brokers setting in stream-enrich.hocon and stream-collector.hocon to the advertised listener address of my Kafka, which is "192.168.1.23:9093".
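From what I've read, the client-side brokers value is only used for the initial bootstrap connection; after that, clients connect to whatever host and port the broker itself advertises. So the broker-side server.properties may matter more than these HOCON files. This is a sketch of what I understand a working setup to look like (the INTERNAL/EXTERNAL listener names and the kafka hostname are my own placeholders, not copied from my actual broker config):

# server.properties (sketch, assumes a broker reachable on the host at 192.168.1.23)
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://kafka:9092,EXTERNAL://192.168.1.23:9093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL

With something like this, 192.168.1.23:9093 is both the bootstrap address and the advertised address, so the containers should never be redirected to a host they cannot resolve.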
This is my stream-collector.hocon file:
collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  port = 8080

  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  crossDomain {
    enabled = false
    domains = [ "*" ]
    secure = true
  }

  cookie {
    enabled = false
    expiration = "365 days"
    name = snowplow
    domain = "{{cookieDomain}}"
  }

  doNotTrackCookie {
    enabled = false
    name = snowplow_do_not_track
    value = snowplow_do_not_track_value
  }

  cookieBounce {
    enabled = false
    name = "n3pc"
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    forwardedProtocolHeader = "X-Forwarded-Proto"
  }

  redirectMacro {
    enabled = false
    placeholder = "[TOKEN]"
  }

  # Customize response handling for requests for the root path ("/").
  # Useful if you need to redirect to web content or privacy policies regarding the use of this collector.
  rootResponse {
    enabled = false
    statusCode = 302
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = snowplow_raw_good
    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = snowplow_raw_bad
    useIpAddressAsPartitionKey = false

    sink {
      enabled = kafka
      brokers = "192.168.1.23:9093"
      # Number of retries to perform before giving up on sending a record
      retries = 1
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 1000000
      recordLimit = 0 # Not supported by Kafka; will be ignored
      timeLimit = 100000
    }
  }
}

akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  http.server {
    remote-address-header = on
    raw-request-uri-header = on
    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
    }
  }
}
And this is my stream-enrich.hocon:
enrich {
  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = snowplow_raw_good
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = snowplow_enriched_good
      # Stream/topic where the events that failed enrichment will be stored
      bad = snowplow_enriched_bad
      partitionKey = event_id
    }

    sourceSink {
      enabled = kafka
      brokers = "192.168.1.23:9093"
      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 200
        maxBackoff = 1000
      }
      # Number of retries to perform before giving up on sending a record
      retries = 1
    }

    buffer {
      byteLimit = 1000000
      recordLimit = 0 # Not supported by Kafka; will be ignored
      timeLimit = 100000
    }

    appName = "enrich"
  }
}
And my docker-compose.yml:
version: '3'
services:
  iglu:
    container_name: iglu
    image: nginx
    ports:
      - "81:80"
    volumes:
      - ./iglu:/usr/share/nginx/html:ro
    command: /bin/bash -c "echo 'autoindex on;' > /etc/nginx/conf.d/autoindex.conf && nginx -g 'daemon off;'"
    restart: unless-stopped

  stream-collector:
    container_name: stream-collector
    image: snowplow-docker-registry.bintray.io/snowplow/scala-stream-collector-kafka:0.14.0
    command: [ "--config", "/snowplow/config/stream-collector.hocon" ]
    ports:
      - "8080:8080"
    volumes:
      - ./config:/snowplow/config
    environment:
      - "SP_JAVA_OPTS=-Xms512m -Xmx512m"
    # extra_hosts:
    #   - "host:192.168.1.23"
    restart: unless-stopped

  stream-enrich:
    container_name: stream-enrich
    image: snowplow-docker-registry.bintray.io/snowplow/stream-enrich-kafka:0.19.0
    command: [
      "--config", "/snowplow/config/stream-enrich.hocon",
      "--resolver", "file:/snowplow/config/resolver.json",
      "--enrichments", "file:/snowplow/config/enrichments",
      "--force-cached-files-download"
    ]
    depends_on:
      - stream-collector
    links:
      - iglu
    volumes:
      - ./config:/snowplow/config
    environment:
      - "SP_JAVA_OPTS=-Xms512m -Xmx512m"
    extra_hosts:
      - "host:192.168.1.23"
    restart: unless-stopped
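The extra_hosts entries were an attempt in the same direction: if the broker advertises a hostname rather than an IP, a mapping along these lines would at least make that hostname resolvable from inside the containers (the kafka hostname is my placeholder for whatever the broker actually advertises):

    extra_hosts:
      - "kafka:192.168.1.23"

Is there anything in the configs above that could cause these warnings, or does this have to be fixed in the broker's advertised listeners?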