Error in setting up Collector Part

Hello,
The following is my config file:

    # Copyright (c) 2013-2017 Snowplow Analytics Ltd. All rights reserved.
    #
    # This program is licensed to you under the Apache License Version 2.0, and
    # you may not use this file except in compliance with the Apache License
    # Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
    # http://www.apache.org/licenses/LICENSE-2.0.
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the Apache License Version 2.0 is distributed on an "AS
    # IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    # implied.  See the Apache License Version 2.0 for the specific language
    # governing permissions and limitations there under.

    # This file (application.conf.example) contains a template with
    # configuration options for the Scala Stream Collector.
    #
    # To use, copy this to 'application.conf' and modify the configuration options.

    # 'collector' contains configuration options for the main Scala collector.
    collector {
      # The collector runs as a web service specified on the following
      # interface and port.
      interface = "123.123.123"
      port = 8080

      # Configure the P3P policy header.
      p3p {
            policyRef = "/w3c/p3p.xml"
            CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
      }

      # The collector returns a cookie to clients for user identification
      # with the following domain and expiration.
      cookie {
            enabled = true
            expiration = "365 days" # e.g. "365 days"
            # Network cookie name
            name = UnilogAnalytics
            # The domain is optional and will make the cookie accessible to other
            # applications on the domain. Comment out this line to tie cookies to
            # the collector's full domain
            domain = "collector.cookie.domain"
      }

      # When enabled and the cookie specified above is missing, performs a redirect to itself to check
      # if third-party cookies are blocked using the specified name. If they are indeed blocked,
      # fallbackNetworkId is used instead of generating a new random one.
      cookieBounce {
            enabled = false
            # The name of the request parameter which will be used on redirects checking that third-party
            # cookies work.
            name = "n3pc"
            # Network user id to fallback to when third-party cookies are blocked.
            fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
      }
       # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
       # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
       # token. All instances of that token are replaced withe the network ID. If the placeholder isn't
            bad = collector.streams.good
       # specified, the default value is `${SP_NUID}`.
            redirectMacro {
                enabled = false
       # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
                placeholder = "${SP_NUID}"
               }

       streams {
            # Events which have successfully been collected will be stored in the good stream/topic
            good = GoodStream

            # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
            bad = BadStream

            # Whether to use the incoming event's ip as the partition key for the good stream/topic
            # Note: Nsq does not make use of partition key.
            useIpAddressAsPartitionKey = false

            # Enable the chosen sink by uncommenting the appropriate configuration
            sink {
              # Choose between kinesis, kafka, nsq, or stdout
              # To use stdout comment everything
              enabled = kinesis

              # Region where the streams are located
              region = ca-central-1

              # Thread pool size for Kinesis API requests
              threadPoolSize = 10

              # The following are used to authenticate for the Amazon Kinesis sink.
              # If both are set to 'default', the default provider chain is used
              # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
              # If both are set to 'iam', use AWS IAM Roles to provision credentials.
              # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
              aws {
                    accessKey = ABC-XYZ
                    secretKey = ABC-XYZ
              }

              # Minimum and maximum backoff periods
              backoffPolicy {
                    minBackoff = 3000
                    maxBackoff = 600000
              }

              # Or Kafka
              #brokers = "{{kafkaBrokers}}"
              ## Number of retries to perform before giving up on sending a record
              #retries = 0

              # Or NSQ
              ## Host name for nsqd
              #host = "{{nsqd}}"
              ## TCP port for nsqd, 4150 by default
              #port = 4150
            }

            # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
            # Note: Buffering is not supported by NSQ.
            # The buffer is emptied whenever:
            # - the number of stored records reaches record-limit or
            # - the combined size of the stored records reaches byte-limit or
            # - the time in milliseconds since the buffer was last emptied reaches time-limit
            buffer {
              byteLimit = 4500000
              recordLimit = 500 # Not supported by Kafka; will be ignored
              timeLimit = 60000
            }
      }
    }

    # Akka has a variety of possible configuration options defined at
    # http://doc.akka.io/docs/akka/current/scala/general/configuration.html
    akka {
      loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
      loggers = ["akka.event.slf4j.Slf4jLogger"]

      # akka-http is the server the Stream collector uses and has configurable options defined at
      # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
      http.server {
            # To obtain the hostname in the collector, the 'remote-address' header
            # should be set. By default, this is disabled, and enabling it
            # adds the 'Remote-Address' header to every request automatically.
            remote-address-header = on

            raw-request-uri-header = on

            # Define the maximum request length (the default is 2048)
            parsing {
              max-uri-length = 32768
              uri-parsing-mode = relaxed
            }
      }
    }

And this is the error I am facing:

    java -Dcom.amazonaws.sdk.disableCbor -jar snowplow-stream-collector-0.12.0.jar --config collector.conf
    [scala-stream-collector-akka.actor.default-dispatcher-6] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
    [main] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Creating thread pool of size 10
    Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Kinesis stream GoodStream doesn't exist
        at scala.Predef$.require(Predef.scala:224)
        at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink.<init>(KinesisSink.scala:114)
        at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$.createAndInitialize(KinesisSink.scala:48)
        at com.snowplowanalytics.snowplow.collectors.scalastream.Collector$.run(Collector.scala:80)
        at com.snowplowanalytics.snowplow.collectors.scalastream.Collector$.main(Collector.scala:63)
        at com.snowplowanalytics.snowplow.collectors.scalastream.Collector.main(Collector.scala)

I need assistance regarding this issue.

Do the GoodStream and BadStream Kinesis streams already exist in your account? Are they accessible using the access key and secret key you’ve specified in the configuration?
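
As a rough sketch, these are the settings from the collector config posted above that decide which streams the collector looks for. It is a trimmed fragment of that config, not a complete file, and the comments are a reading of the error message rather than something confirmed elsewhere in this thread.

```
collector {
  streams {
    # Each name must match a stream that already exists in Kinesis
    good = GoodStream
    bad = BadStream

    sink {
      enabled = kinesis

      # Kinesis streams are regional, so the streams must exist in this region
      region = ca-central-1

      aws {
        # Placeholder values as posted above; the real credentials
        # need access to both streams
        accessKey = ABC-XYZ
        secretKey = ABC-XYZ
      }
    }
  }
}
```

If any of these values points at a different name, region, or account than the one where the streams were created, a "doesn't exist" error like the one above would be the expected result.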

Yes, I created the GoodStream and BadStream afterwards. Thank you so much for helping me with that.
But now I have run into a new situation, and the following is its screenshot.

Am I also supposed to set up Kinesis Firehose and Data Analytics?
Can you please help me figure that out?

The screenshot doesn’t show any errors - if the collector is binding to an interface that means you should now be able to send events to the collector that will sink into Kinesis.

Snowplow doesn’t use Kinesis Firehose or Kinesis Analytics as part of the AWS pipeline so the next step would be to set up the stream enrichment process.

Hello, the last reply helped me a lot, thank you. As I moved on to stream enrichment I ran into an exception. The screenshot of it is as follows.

The config file is as follows:

    # Copyright (c) 2013-2017 Snowplow Analytics Ltd. All rights reserved.
    #
    # This program is licensed to you under the Apache License Version 2.0, and
    # you may not use this file except in compliance with the Apache License
    # Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
    # http://www.apache.org/licenses/LICENSE-2.0.
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the Apache License Version 2.0 is distributed on an "AS
    # IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
    # implied.  See the Apache License Version 2.0 for the specific language
    # governing permissions and limitations there under.

    # This file (application.conf.example) contains a template with
    # configuration options for Stream Enrich.

    enrich {
      # Sources currently supported are:
      # 'kinesis' for reading Thrift-serialized records from a Kinesis stream
      # 'kafka' for reading Thrift-serialized records from a Kafka topic
      # 'nsq' for reading Thrift-serialized records from a Nsq topic
      # 'stdin' for reading Base64-encoded Thrift-serialized records from stdin
      source = kinesis

      # Sinks currently supported are:
      # 'kinesis' for writing enriched events to one Kinesis stream and invalid events to another.
      # 'kafka' for writing enriched events to one Kafka topic and invalid events to another.
      # 'nsq' for writing enriched events to one Nsq topic and invalid events to another.
      # 'stdouterr' for writing enriched events to stdout and invalid events to stderr.
      #    Using "sbt assembly" and "java -jar" is recommended to disable sbt logging.
      sink = kinesis

      # AWS credentials
      # If both are set to 'default', use the default AWS credentials provider chain.
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = xyzxyz
        secretKey = xyzxyz
      }

      streams {
        in {
          # Stream/topic where the raw events to be enriched are located
          raw = "{{enrichStreamsInRaw}}"
        }

        out {
          # Stream/topic where the events that were successfully enriched will end up
          enriched = GoodStream
          # Stream/topic where the event that failed enrichment will be stored
          bad = BadStream

          # How the output stream/topic will be partitioned.
          # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
          # user_ipaddress, domain_sessionid, user_fingerprint.
          # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
          # possible parittion keys correspond to.
          # Otherwise, the partition key will be a random UUID.
          # Note: Nsq does not make use of partition key.
          partitionKey = "{{partitionKeyName}}"
        }

        kinesis {
          # Region where the streams are located
          region = ca-central-1

          # Maximum number of records to get from Kinesis per call to GetRecords
          maxRecords = 10000

          # LATEST: most recent data.
          # TRIM_HORIZON: oldest available data.
          # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
          # Note: This only effects the first run of this application on a stream.
          initialPosition = TRIM_HORIZON

          # Need to be specified when initial-position is "AT_TIMESTAMP".
          # Timestamp format need to be in "yyyy-MM-ddTHH:mm:ssZ".
          # Ex: "2017-05-17T10:00:00Z"
          # Note: Time need to specified in UTC.
          initialTimestamp = "{{initialTimestamp}}"

          # Minimum and maximum backoff periods, in milliseconds
          backoffPolicy {
            minBackoff = 3000
            maxBackoff = 600000
          }
        }

        # Kafka configuration
        kafka {
          brokers = "{{kafkaBrokers}}"

          # Number of retries to perform before giving up on sending a record
          retries = 0
        }

        # config for nsq
        nsq {
          # Channel name for nsq source
          # If more than one application is reading from the same NSQ topic at the same time,
          # all of them must have the same channel name
          rawChannel = "{{nsqSourceChannelName}}"

          # Host name for nsqd
          host = "{{nsqHost}}"

          # TCP port for nsqd, 4150 by default
          port = 4150

          # Host name for lookupd
          lookupHost = "{{lookupHost}}"

          # HTTP port for nsqlookupd, 4161 by default
          lookupPort = 4161
        }

        # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
        # Note: Buffering is not supported by NSQ.
        # The buffer is emptied whenever:
        # - the number of stored records reaches recordLimit or
        # - the combined size of the stored records reaches byteLimit or
        # - the time in milliseconds since it was last emptied exceeds timeLimit when
        #   a new event enters the buffer
        buffer {
          byteLimit = 4500000
          recordLimit = 500 # Not supported by Kafka; will be ignored
          timeLimit = 60000
        }

        # Used for a DynamoDB table to maintain stream state.
        # Used as the Kafka consumer group ID.
        # You can set it automatically using: "SnowplowEnrich-$\\{enrich.streams.in.raw\\}"
        appName = outStream
      }

      # Optional section for tracking endpoints
      #  monitoring {
      #    snowplow {
      #      collectorUri = "123.123.123.123"
      #      collectorPort = 80
      #      appId = "1"
      #      method = GET
      #    }
      #  }
    }

And the JSON file is as follows:

    {
      "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
      "data": {
        "cacheSize": 500,
        "repositories": [
          {
            "name": "Iglu Central",
            "priority": 0,
            "vendorPrefixes": [ "com.snowplowanalytics" ],
            "connection": {
              "http": {
                "uri": "http://iglucentral.com"
              }
            }
          },
          {
            "name": "Iglu Central - GCP Mirror",
            "priority": 1,
            "vendorPrefixes": [ "com.snowplowanalytics" ],
            "connection": {
              "http": {
                "uri": "http://mirror01.iglucentral.com"
              }
            }
          }
        ]
      }
    }

The project is stalled at this point and I need some help to get past this error.

The answers you seek are in the error output.

Kinesis stream Goodstream doesn’t exist or is neither active nor updating

parasp, have you found the answer to your question? I’m having a similar problem and am interested. Mike, if I post mine, can you help me too? Thanks!

Hi Mike, the Kinesis data stream is Active. I did check the goodStream on the Kinesis dashboard.
The following is a screenshot of it.

Also, in the last reply about the collector part you said that if the collector is binding to an interface, I should now be able to send events to it.

I didn’t really understand that part. Can you please explain how to send events to the collector once it is bound to an interface, and how I should check it? Is it just by IP (123.123.123.123:80)?

Hello @Anwat19, sure, I can look into your issue and help based on my own experience with this problem.

It looks like you may have doubled up on stream names in your collector and enricher on “GoodStream”.

At the moment your enrich configuration doesn’t have an input (raw) stream defined, as it’s set to raw = "{{enrichStreamsInRaw}}". You should replace {{enrichStreamsInRaw}} here with the name of the Kinesis stream that the collector is pushing events into.

https://sea2.discourse-cdn.com/flex016/uploads/snowplowanalytics/optimized/1X/0f303cb694e20d1f2f62f6485a62c8d49741caae_1_690x448.png

This will mean you have 4 unique Kinesis streams (although the diagram above only shows 3):

  1. Kinesis raw events good - for all the good events from the Collector
  2. Kinesis raw events bad - for bad events (e.g., events > 1 MB if using Kinesis)
  3. Kinesis enriched good events - for all enriched good events
  4. Kinesis enriched bad events - for events that fail enrichment/validation
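
As a sketch of that layout, the stream settings in the two config files could look something like the fragment below. `GoodStream` and `BadStream` are the collector streams from the config posted earlier; `EnrichedGoodStream` and `EnrichedBadStream` are hypothetical names for two additional streams that would have to be created in Kinesis first. All other settings are omitted.

```
# Collector config (Scala Stream Collector)
collector {
  streams {
    good = GoodStream   # 1. raw events good
    bad = BadStream     # 2. raw events bad
  }
}

# Stream Enrich config (a separate file)
enrich {
  streams {
    in {
      # Read from the stream the collector writes its good events to
      raw = GoodStream
    }

    out {
      enriched = EnrichedGoodStream   # 3. enriched good events (hypothetical name)
      bad = EnrichedBadStream         # 4. enriched bad events (hypothetical name)
    }
  }
}
```

The point of the sketch is only that each of the four names is distinct and that the enrich input matches the collector's good stream; the actual names are up to you.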

OK. That means I have to create 2 more data streams in Kinesis, which brings the total to 4: 2 for the collector and 2 for enrich. The following is a screenshot of my current Kinesis dashboard.

And what should be in the “Stream-OUT” section? Where should I send the out events?
That will solve all my issues with the enrich out stream.