Kinesis stream doesn't exist or isn't available

Hi, this is the first time I’m trying to set up Snowplow open source. I’m getting the following error when I try to run snowplow-stream-collector-kinesis-2.3.1.jar on an Amazon Linux 2 AMI:

[main] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$ - Kinesis stream snowplow-collected-good-events-stream doesn’t exist or isn’t available.
[main] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$ - SQS buffer is not configured.
[main] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Creating thread pool of size 10
[main] WARN com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - No SQS buffer for surge protection set up (consider setting a SQS Buffer in config.hocon).
[main] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$ - Kinesis stream snowplow-collected-bad-events-stream doesn’t exist or isn’t available.
[main] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink$ - SQS buffer is not configured.
[main] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Creating thread pool of size 10
[main] WARN com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - No SQS buffer for surge protection set up (consider setting a SQS Buffer in config.hocon).
[scala-stream-collector-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started

The collector does not seem to find the good and bad Kinesis streams. I double-checked the stream names in the config.hocon file and they are the same. I also gave the EC2 instance an IAM role with access to Kinesis. What am I missing here?

Hey @Ewoud, this is almost always a configuration issue.

Can you share:

  1. Your config file
  2. Your IAM Policy
  3. How your Collector is authenticating against AWS (Access + Secret Key / IAM Instance Role etc)

It might also be worth looking at our Terraform example so you can compare it with the setup you have done and see if anything is missing from your IAM policy: GitHub - snowplow-devops/terraform-aws-collector-kinesis-ec2

  1. My config file:
collector {
  interface = "0.0.0.0"
  port = 8080

  ssl {
    enable = false
    # whether to redirect HTTP to HTTPS
    redirect = false
    port = 9543
  }

  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/p2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }


  crossDomain {
    enabled = false
    domains = [ "*" ]
    secure = true
  }

  cookie {
    enabled = true
    expiration = "365 days" # e.g. "365 days"
    name = collector_cookie
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out these lines to tie cookies to
    # the collector's full domain.
    # The domain is determined by matching the domains from the Origin header of the request
    # to the list below. The first match is used. If no matches are found, the fallback domain will be used,
    # if configured.
    # If you specify a main domain, all subdomains on it will be matched.
    # If you specify a subdomain, only that subdomain will be matched.
    # Examples:
    # domain.com will match domain.com, www.domain.com and secure.client.domain.com
    # client.domain.com will match secure.client.domain.com but not domain.com or www.domain.com
    #domains = [
    #    "{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
    #    "{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
        # ... more domains
    #]
    #domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
    #domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
    # ... more domains
    # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of
    # cookie domains configured above. (For example, if there is no Origin header.)
    #fallbackDomain = "{{fallbackDomain}}"
    #fallbackDomain = ${?FALLBACK_DOMAIN}
    secure = false
    #secure = ${?COLLECTOR_COOKIE_SECURE}
    httpOnly = false
    #httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
    # The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`,
    # `Lax` or `None` to limit the cookie sent context.
    #   Strict: the cookie will only be sent along with "same-site" requests.
    #   Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation.
    #   None: the cookie will be sent with same-site and cross-site requests.
    #sameSite = "{{cookieSameSite}}"
    #sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
  }

  doNotTrackCookie {
    enabled = false
    #enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    # name = {{doNotTrackCookieName}}
    name = collector-do-not-track-cookie
    # value = {{doNotTrackCookieValue}}
    value = collector-do-not-track-cookie-value
  }

  cookieBounce {
    enabled = false
    #enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    #name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    #fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
    #forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
  }

  enableDefaultRedirect = true
  #enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}


  redirectMacro {
    enabled = false
    #enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
    #placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
  }

 
  rootResponse {
    enabled = false
    #enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
    statusCode = 302
    #statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
    # Optional, defaults to empty map
    headers = {
      Location = "https://127.0.0.1/",
      # Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
    #body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
  }

  cors {
    # The Access-Control-Max-Age response header indicates how long the results of a preflight
    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
    accessControlMaxAge = 5 seconds
    # accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
  }

  prometheusMetrics {
    # If metrics are enabled then all requests will be logged as prometheus metrics
    # and '/metrics' endpoint will return the report about the requests
    enabled = false
    # Custom buckets for http_request_duration_seconds_bucket duration metric
    #durationBucketsInSeconds = [0.1, 3, 10]
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = snowplow-collected-good-events-stream

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = snowplow-collected-bad-events-stream

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false
    #useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, google-pub-sub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = "kinesis"

      # Region where the streams are located
      region = "eu-west-2"

      ## Optional endpoint url configuration to override aws kinesis endpoints,
      ## this can be used to specify local endpoints when using localstack
      # customEndpoint = {{kinesisEndpoint}}
      # customEndpoint = ${?COLLECTOR_STREAMS_SINK_CUSTOM_ENDPOINT}

      # Thread pool size for Kinesis API requests
      threadPoolSize = 10
      #threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = "iam"
        secretKey = "iam"
      }

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        # minBackoff = {{minBackoffMillis}}
        minBackoff = 10
        # maxBackoff = {{maxBackoffMillis}}
        maxBackoff = 10
      }

      # Or Google Pubsub
      #googleProjectId = ID
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoff
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  maxBackoff = {{maxBackoffMillis}}
      #  totalBackoff = {{totalBackoffMillis}} # must be >= 10000
      #  multiplier = {{backoffMultiplier}}
      #}

      # Or Kafka
      #brokers = "{{kafkaBrokers}}"
      ## Number of retries to perform before giving up on sending a record
      #retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # https://kafka.apache.org/documentation/#producerconfigs
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" -> brokers
      # retries             -> retries
      # "buffer.memory"     -> buffer.byteLimit
      # "linger.ms"         -> buffer.timeLimit
      #producerConf {
      #  acks = all
      #  "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
      #  "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      #}

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      # byteLimit = {{bufferByteThreshold}}
      byteLimit = 4500000
      # recordLimit = {{bufferRecordThreshold}} # Not supported by Kafka; will be ignored
      recordLimit = 500
      # timeLimit = {{bufferTimeThreshold}}
      timeLimit = 5000
    }
  }

}

akka {
  loglevel = DEBUG
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on
    #remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    #raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      # max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      # uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }
}
  2. My IAM policy is the following:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:List*",
                "kinesis:Put*"
            ],
            "Resource": [
                "arn:aws:kinesis:eu-west-2:XXXXXXXX:stream/snowplow-collected-bad-events-stream",
                "arn:aws:kinesis:eu-west-2:XXXXXXXX:stream/snowplow-collected-good-events-stream"
            ]
        }
    ]
}
  3. The collector authenticates via an IAM instance role: I attached the role with the policy described above to the EC2 instance.

Thanks in advance!

IAM Policy and config seem fine to me @Ewoud

Do you have an egress Security Rule for TCP 443 to allow the server to connect to the Kinesis API?
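If that rule is missing, something along these lines would open it up (the security group ID is a placeholder; scope the CIDR down if you can):

aws ec2 authorize-security-group-egress \
  --group-id sg-0123456789abcdef0 \
  --protocol tcp \
  --port 443 \
  --cidr 0.0.0.0/0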

The other thing to check is that your IAM instance role is correctly configured and set up. If you can SSH to that server, I would install the AWS CLI and check whether you can list the Kinesis streams from the same server, as a way of debugging the IAM setup.
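For example, something like this run from the instance (stream name and region taken from your config) should succeed if the role is being picked up:

aws kinesis list-streams --region eu-west-2
aws kinesis describe-stream-summary \
  --stream-name snowplow-collected-good-events-stream \
  --region eu-west-2

If those fail with a credentials or AccessDenied error, the IAM setup is the problem; if they succeed, the issue is more likely networking.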

Last thing: the server and the streams are in the same sub-account, right?
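You can confirm which account the instance role resolves to with:

aws sts get-caller-identity

The Account field in the output should match the account ID in your stream ARNs.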

Hi @josh, thanks for all the advice! The issue turned out to be a security rule on my EC2 instance, but it's solved now. Thanks!


Glad to hear it @Ewoud !