SOLVED: Pubsub collector 1.0.1 - buffer timeLimit problems

I have tried to build and run the Pub/Sub collector, but I keep getting the same error.
It seems like I'm missing something. The build is version 1.0.1.

Build steps:
After git clone, I ran:
sbt "project pubsub" assembly

This created a .jar in pubsub/target/..../

To run the jar file:
java -jar snowplow-stream-collector-pubsub-1.0.1.jar --config config.conf


Exception in thread "main" java.lang.IllegalArgumentException: elementCountThreshold must be either unset or positive
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink$.batchingSettings(GooglePubSubSink.scala:78)
	at com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink$.createAndInitialize(GooglePubSubSink.scala:41)
	at com.snowplowanalytics.snowplow.collectors.scalastream.GooglePubSubCollector$.$anonfun$main$2(GooglePubSubCollector.scala:35)
	at scala.util.Either.flatMap(Either.scala:341)
	at com.snowplowanalytics.snowplow.collectors.scalastream.GooglePubSubCollector$.main(GooglePubSubCollector.scala:28)
	at com.snowplowanalytics.snowplow.collectors.scalastream.GooglePubSubCollector.main(GooglePubSubCollector.scala)

Here is the config used:

# Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (config.hocon.sample) contains a template with
# configuration options for the Scala Stream Collector.
# To use, copy this to 'application.conf' and modify the configuration options.

# 'collector' contains configuration options for the main Scala collector.
collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = ""
  interface = ${?COLLECTOR_INTERFACE}
  port = 8080
  port = ${?COLLECTOR_PORT}

  # optional SSL/TLS configuration
  ssl {
    enable = false
    enable = ${?COLLECTOR_SSL}
    # whether to redirect HTTP to HTTPS
    redirect = false
    redirect = ${?COLLECTOR_SSL_REDIRECT}
    port = 9543
    port = ${?COLLECTOR_SSL_PORT}
  }

  # The collector responds with a cookie to requests with a path that matches the 'vendor/version' protocol.
  # The expected values are:
  # - com.snowplowanalytics.snowplow/tp2 for Tracker Protocol 2
  # - r/tp2 for redirects
  # - com.snowplowanalytics.iglu/v1 for the Iglu Webhook
  # Any path that matches the 'vendor/version' protocol will result in a cookie response, for use by custom webhooks
  # downstream of the collector.
  # But you can also map any valid (i.e. two-segment) path to one of the three defaults.
  # Your custom path must be the key and the value must be one of the corresponding default paths. Both must be full
  # valid paths starting with a leading slash.
  # Pass in an empty map to avoid mapping.
  paths {
    # "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
    # "/com.acme/redirect" = "/r/tp2"
    # "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
  }

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
  }

  # Cross domain policy configuration.
  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
  # route.
  crossDomain {
    enabled = false
    # Domains that are granted access, * will match and
    domains = [ "*" ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    expiration = "365 days"
    # Network cookie name
    name = snowplow
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out these lines to tie cookies to
    # the collector's full domain.
    # The domain is determined by matching the domains from the Origin header of the request
    # to the list below. The first match is used. If no matches are found, the fallback domain will be used,
    # if configured.
    # If you specify a main domain, all subdomains on it will be matched.
    # If you specify a subdomain, only that subdomain will be matched.
    # Examples:
    # will match, and
    # will match but not or
    domains = [
        "{{cookieDomain1}}" # e.g. "" -> any origin domain ending with this will be matched and will be returned
        "{{cookieDomain2}}" # e.g. "" -> any origin domain ending with this will be matched and will be returned
        # ... more domains
    ]
    domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
    domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
    # ... more domains
    # If specified, the fallback domain will be used if none of the Origin header hosts matches the list of
    # cookie domains configured above. (For example, if there is no Origin header.)
    #fallbackDomain = "{{fallbackDomain}}"
    #fallbackDomain = ${?FALLBACK_DOMAIN}
    secure = false
    httpOnly = false
    # The sameSite is optional. You can choose to not specify the attribute, or you can use `Strict`,
    # `Lax` or `None` to limit the cookie sent context.
    #   Strict: the cookie will only be sent along with "same-site" requests.
    #   Lax: the cookie will be sent with same-site requests, and with cross-site top-level navigation.
    #   None: the cookie will be sent with same-site and cross-site requests.
    #sameSite = "{{cookieSameSite}}"
  }

  # If you have a do not track cookie in place, the Scala Stream Collector can respect it by
  # completely bypassing the processing of an incoming request carrying this cookie, the collector
  # will simply reply by a 200 saying "do not track".
  # The cookie name and value must match the configuration below, where the names of the cookies must
  # match entirely and the value could be a regular expression.
  doNotTrackCookie {
    enabled = false
    name = snowplow_do_not_track
    value = snowplow_do_not_track_value
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
  }

  # When enabled, redirect prefix `r/` will be enabled and its query parameters resolved.
  # Otherwise the request prefixed with `r/` will be dropped with `404 Not Found`
  # Custom redirects configured in `paths` can still be used.
  enableDefaultRedirect = true
  enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}

  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
  # token. All instances of that token are replaced with the network ID. If the placeholder isn't
  # specified, the default value is `${SP_NUID}`.
  redirectMacro {
    enabled = false
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
  }

  # Customize response handling for requests for the root path ("/").
  # Useful if you need to redirect to web content or privacy policies regarding the use of this collector.
  rootResponse {
    enabled = false
    statusCode = 302
    # Optional, defaults to empty map
    headers = {
      Location = "",
      X-Custom = "something"
    }
    # Optional, defaults to empty string
    body = "302, redirecting"
  }

  # Configuration related to CORS preflight requests
  cors {
    # The Access-Control-Max-Age response header indicates how long the results of a preflight
    # request can be cached. -1 seconds disables the cache. Chromium max is 10m, Firefox is 24h.
    accessControlMaxAge = 5 seconds
  }

  # Configuration of prometheus http metrics
  prometheusMetrics {
    # If metrics are enabled then all requests will be logged as prometheus metrics
    # and '/metrics' endpoint will return the report about the requests
    enabled = false
    # Custom buckets for http_request_duration_seconds_bucket duration metric
    #durationBucketsInSeconds = [0.1, 3, 10]
  }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = snowplow_raw_good

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = snowplow_raw_bad

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, google-pub-sub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      enabled = google-pub-sub

      # Region where the streams are located
      #region = {{kinesisRegion}}

      ## Optional endpoint url configuration to override aws kinesis endpoints,
      ## this can be used to specify local endpoints when using localstack
      # customEndpoint = {{kinesisEndpoint}}

      # Thread pool size for Kinesis API requests
      #threadPoolSize = 10

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      #aws {
      #  accessKey = iam
      #  secretKey = iam
      #}

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 10000 # {{minBackoffMillis}}
        maxBackoff = 30000 # {{maxBackoffMillis}}
      }

      # Or Google Pubsub
      googleProjectId = ***removed because of public eyes***
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoff
      backoffPolicy {
        minBackoff = 100
        maxBackoff = 300
        totalBackoff = 20000 # must be >= 10000
        multiplier = 2
      }

      # Or Kafka
      #brokers = "{{kafkaBrokers}}"
      ## Number of retries to perform before giving up on sending a record
      #retries = 0
      # The kafka producer has a variety of possible configuration options defined at
      # Some values are set to other values from this config by default:
      # "bootstrap.servers" = brokers
      # "buffer.memory"     = buffer.byteLimit
      # ""         = buffer.timeLimit
      #producerConf {
      #  acks = all
      #  "key.serializer"     = "org.apache.kafka.common.serialization.StringSerializer"
      #  "value.serializer"   = "org.apache.kafka.common.serialization.StringSerializer"
      #}

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit = 1000000 #{{bufferByteThreshold}}
      recordLimit = 0 #{{bufferRecordThreshold}} # Not supported by Kafka; will be ignored
      timeLimit = 1000 #{{bufferTimeThreshold}}
    }
  }
}


# Akka has a variety of possible configuration options defined at
akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loglevel = ${?AKKA_LOGLEVEL}
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loggers = [${?AKKA_LOGGERS}]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on
    remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}

    raw-request-uri-header = on
    raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
      uri-parsing-mode = relaxed
      uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
    }
  }

  # By default setting `collector.ssl` relies on JSSE (Java Secure Socket
  # Extension) to enable secure communication.
  # To override the default settings set the following section as per
  # ssl-config {
  #   debug = {
  #     ssl = true
  #   }
  #   keyManager = {
  #     stores = [
  #       {type = "PKCS12", classpath = false, path = "/etc/ssl/mycert.p12", password = "mypassword" }
  #     ]
  #   }
  #   loose {
  #     disableHostnameVerification = false
  #   }
  # }
}

I have created an issue in the GitHub repo for this here.
I also opened a PR to fix the problem.

OK, I have found my error and have closed both the PR and the issue. Sorry for the inconvenience.
The error was that recordLimit was set to 0 (zero), which is invalid: as the exception message says, the value must be either unset or positive. Question solved.
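
For anyone hitting the same exception, here is a minimal sketch of the corrected buffer section. It assumes the rest of the config stays as posted above. The value 500 is only an illustrative placeholder, not a recommended or default setting. Judging by the stack trace, the Pub/Sub sink seems to pass recordLimit on as the publisher's elementCountThreshold, which would explain why 0 is rejected.

```
collector {
  streams {
    buffer {
      byteLimit = 1000000
      # Must be positive for the Pub/Sub sink; 0 triggers
      # "elementCountThreshold must be either unset or positive".
      recordLimit = 500 # hypothetical value, tune for your own traffic
      timeLimit = 1000
    }
  }
}
```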
