Snowplow Micro Kafka push is not working

Hi team,
I am trying to push my real-time events to a Kafka topic named “test,” but nothing is received on the consumer side. It may be that events are not producing to Kafka by snowplow.
Individually, I tried to push the messages to Kafka-topic and check them into the consumer console; messages are receiving.
Kafka is running through the Docker version.

I am also running Matomo on-premise, and its events arrive on the same topic without issues. I shut Matomo down and tried Snowplow alone, but its events never reach the test topic.

My Kafka docker-compose file is shown below.

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines: 
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 29092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup. 
    # If the latter is true, you will need to change the value 'localhost' in 
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those 
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:9092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  
  init-kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - kafka
    entrypoint: [ '/bin/sh', '-c' ]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:9092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic test --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:9092 --list
      "

Below is how I am running Snowplow Micro on port 9090:

puneetmakhija@puneetmakhija:/mnt/c/Users/puneetmakhija$ docker run -p 9090:9090 --mount type=bind,source=$(pwd)/config,destination=/config --mount type=bind,source=$(pwd)/schemas,destination=/config/iglu-client-embedded/schemas snowplow/snowplow-micro:1.6.0 --collector-config /config/config.hocon
[INFO] akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[INFO] com.snowplowanalytics.snowplow.micro.Main$ - No enrichments enabled.
[INFO] com.snowplowanalytics.snowplow.micro.Main$ - REST interface bound to /0.0.0.0:9090
[INFO] EventLog - GOOD id:77237ca7-391a-4d4a-944a-33b3d204d0a5 app_id:try-snowplow type:page_ping (iglu:com.snowplowanalytics.snowplow/page_ping/jsonschema/1-0-0)
[INFO] EventLog - GOOD id:8279adfc-2d60-40a1-ab10-6ce98056bb4b app_id:try-snowplow type:unstruct (iglu:com.example/my-schema/jsonschema/1-0-0)
[INFO] EventLog - GOOD id:38e506a9-c66b-4e97-9970-0a5e628371ba app_id:try-snowplow type:unstruct (iglu:com.example/my-schema/jsonschema/1-0-0)
[INFO] EventLog - GOOD id:688b568f-a7e9-4309-a435-e1c4f4c49ffd app_id:try-snowplow type:page_ping (iglu:com.snowplowanalytics.snowplow/page_ping/jsonschema/1-0-0)
[INFO] EventLog - GOOD id:5d24dc78-85dd-4e14-aa9a-e344af84f997 app_id:try-snowplow type:unstruct (iglu:com.example/my-schema/jsonschema/1-0-0)
[INFO] EventLog - GOOD id:0824a1b5-15cc-4645-a220-8e055e3abe07 app_id:try-snowplow type:unstruct (iglu:com.example/my-schema/jsonschema/1-0-0)
[INFO] EventLog - GOOD id:5afa18c3-d951-4f22-bed3-745a959c7f25 app_id:try-snowplow type:unstruct (iglu:com.example/my-schema/jsonschema/1-0-0)
[INFO] EventLog - GOOD id:26e5ed25-51a9-49dc-b7dd-447f38b7fac3 app_id:try-snowplow type:page_ping (iglu:com.snowplowanalytics.snowplow/page_ping/jsonschema/1-0-0)

Below is my config directory

puneetmakhija@puneetmakhija:/mnt/c/Users/puneetmakhija/config$ ls
config.hocon  iglu-client-embedded

Contents of config.hocon:

collector {
  interface = "0.0.0.0"
  port = 9090
}

sink {
  enabled = "kafka"

  streams {
    kafka {
      brokers = "localhost:29092"
      topic = "test"
      producerConf = {
        bootstrap.servers = "localhost:29092"
        acks = "all"
        retries = 0
        batch.size = 16384
        linger.ms = 1
        buffer.memory = 33554432
        key.serializer = "org.apache.kafka.common.serialization.StringSerializer"
        value.serializer = "org.apache.kafka.common.serialization.StringSerializer"
      }
    }
  }
}

stream {
  enabled = false
}

My Kafka consumer in Python:

from confluent_kafka import Consumer, KafkaError
import json

conf = {
    'bootstrap.servers': 'localhost:29092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)

consumer.subscribe(['test'])

try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue

        message = json.loads(msg.value().decode('utf-8'))
        print('Received message: {}'.format(message))
finally:
    # ensure the consumer is closed even though the loop runs forever
    consumer.close()

Can anyone explain why events are not being pushed to Kafka?

Snowplow micro doesn’t have the ability to push to Kafka. You can read our documentation to find out what it does do.
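If you still need the events in Kafka for local testing, one workaround (not an official Micro feature) is to poll Micro's REST API yourself and produce the events to Kafka. A minimal sketch, assuming Micro is on localhost:9090 and Kafka on localhost:29092 as in your setup, and using the same confluent_kafka client as your consumer; Micro serves its validated events as JSON from the /micro/good endpoint:

```python
import json
from urllib.request import urlopen

def event_payloads(events):
    """Serialize Micro's good events to UTF-8 JSON bytes for Kafka."""
    return [json.dumps(e).encode("utf-8") for e in events]

def forward_good_events(micro_url, brokers, topic):
    """Fetch validated events from Micro's /micro/good endpoint and
    produce each one to the given Kafka topic. Returns the event count."""
    from confluent_kafka import Producer  # same client library as the consumer above

    with urlopen(f"{micro_url}/micro/good") as resp:
        events = json.load(resp)

    producer = Producer({"bootstrap.servers": brokers})
    for payload in event_payloads(events):
        producer.produce(topic, value=payload)
    producer.flush()
    return len(events)

# e.g. forward_good_events("http://localhost:9090", "localhost:29092", "test")
```

Note that /micro/good returns everything Micro has accumulated since startup (or since the last /micro/reset), so repeated polls will re-send earlier events unless you reset or de-duplicate on event_id.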

@Colm thanks for your response.
Does Snowplow Micro have the capability to change the default session duration from 30 minutes to something else? If yes, how can I achieve this in the above context?

That's nothing to do with Micro; session duration is a tracker setting (e.g. `sessionCookieTimeout` in the JavaScript tracker, in seconds).
