Manual open-source GCP setup

I am doing a manual open-source setup on GCP as per the documentation. I am running the following command:

java -jar snowplow-stream-collector-google-pubsub-2.9.0.jar --config collector.hocon

but I am getting the following errors:

[pool-5-thread-1] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Error while checking if topic good exists: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
[pool-9-thread-1] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Error while checking if topic bad exists: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
[pool-5-thread-1] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Error while checking if topic good exists: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.

Any help in this regard would be appreciated.

@josh
Hello Josh,
Can you help me with the above issue?

Hi @srnagpure the error message is pretty clear in that your application does not have sufficient permissions to access Google Pub/Sub. You need to make credentials available to the service so that it can authenticate against Pub/Sub.

You can see a full working example of this in our OSS Terraform Module: GitHub - snowplow-devops/terraform-google-collector-pubsub-ce

You should also read up on the credentials chain logic for Google here: How Application Default Credentials works  |  Authentication  |  Google Cloud
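
If it helps, a rough sketch of the service-account route (the project ID, service account email and key path below are placeholders for your own values):

# Give the service account rights on Pub/Sub
# (roles/pubsub.editor covers both the topic-existence check and publishing).
gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:collector@my-project.iam.gserviceaccount.com" \
  --role="roles/pubsub.editor"

# Expose the key via Application Default Credentials and start the Collector.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json
java -jar snowplow-stream-collector-google-pubsub-2.9.0.jar --config collector.hocon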

Hope this helps!

Thanks @josh - this issue is resolved now.

I am now getting the following error while configuring Enrich Pub/Sub:

sudo docker run \
  -it --rm \
  -v $PWD:/snowplow \
  -e GOOGLE_APPLICATION_CREDENTIALS=/snowplow/praxis-tractor-390009-40c7b1a4b691.json \
  snowplow/snowplow-enrich-pubsub:3.8.0 \
  --enrichments /snowplow/enrichments \
  --iglu-config /snowplow/iglu_resolver.json \
  --config /snowplow/config.nsq.minimal.hocon

Below is the stacktrace:

java.lang.IllegalArgumentException: Topic format enriched invalid
        at com.snowplowanalytics.snowplow.enrich.common.fs2.config.io$Output$PubSub.<init>(io.scala:297)
        at com.snowplowanalytics.snowplow.enrich.common.fs2.config.io$Output$PubSub$.apply(io.scala:283)
        at com.snowplowanalytics.snowplow.enrich.common.fs2.config.io$Output$anon$lazy$macro$127$1.$anonfun$inst$macro$21$6(io.scala:319)
        at shapeless.Generic$$anon$1.from(generic.scala:164)
        at shapeless.LabelledGeneric$$anon$2.from(generic.scala:239)
        at shapeless.LabelledGeneric$$anon$2.from(generic.scala:236)
        at io.circe.generic.extras.decoding.ConfiguredDecoder$NonStrictCaseClassConfiguredDecoder.apply(ConfiguredDecoder.scala:83)
        at io.circe.generic.extras.decoding.ReprDecoder.withDiscriminator(ReprDecoder.scala:85)
        at com.snowplowanalytics.snowplow.enrich.common.fs2.config.io$Output$anon$lazy$macro$127$1$$anon$37.configuredDecode(io.scala:319)
        at io.circe.generic.extras.decoding.ConfiguredDecoder$AdtConfiguredDecoder.apply(ConfiguredDecoder.scala:142)
        at io.circe.Decoder.tryDecode(Decoder.scala:70)
        at io.circe.Decoder.tryDecode$(Decoder.scala:69)
        at io.circe.generic.decoding.DerivedDecoder.tryDecode(DerivedDecoder.scala:6)
        at io.circe.Decoder$$anon$13.tryDecode(Decoder.scala:330)
        at io.circe.Decoder$$anon$13.tryDecode(Decoder.scala:330)
        at io.circe.generic.extras.decoding.ReprDecoder.orDefault(ReprDecoder.scala:47)
        at com.snowplowanalytics.snowplow.enrich.common.fs2.config.io$Outputs$anon$lazy$macro$15$3$$anon$29.configuredDecode(io.scala:260)
        at io.circe.generic.extras.decoding.ConfiguredDecoder$NonStrictCaseClassConfiguredDecoder.apply(ConfiguredDecoder.scala:81)
        at io.circe.Decoder.tryDecode(Decoder.scala:70)
        at io.circe.Decoder.tryDecode$(Decoder.scala:69)
        at io.circe.generic.decoding.DerivedDecoder.tryDecode(DerivedDecoder.scala:6)
        at io.circe.generic.extras.decoding.ReprDecoder.orDefault(ReprDecoder.scala:47)
        at com.snowplowanalytics.snowplow.enrich.common.fs2.config.ConfigFile$anon$lazy$macro$39$1$$anon$1.configuredDecode(ConfigFile.scala:69)
        at io.circe.generic.extras.decoding.ConfiguredDecoder$NonStrictCaseClassConfiguredDecoder.apply(ConfiguredDecoder.scala:81)
        at io.circe.Decoder.tryDecode(Decoder.scala:70)
        at io.circe.Decoder.tryDecode$(Decoder.scala:69)
        at io.circe.generic.decoding.DerivedDecoder.tryDecode(DerivedDecoder.scala:6)
        at io.circe.Decoder$$anon$13.tryDecode(Decoder.scala:330)
        at io.circe.Decoder$$anon$13.apply(Decoder.scala:327)
        at io.circe.Decoder.decodeJson(Decoder.scala:88)
        at io.circe.Decoder.decodeJson$(Decoder.scala:88)
        at io.circe.Decoder$$anon$13.decodeJson(Decoder.scala:326)
        at io.circe.Parser.finishDecode(Parser.scala:12)
        at io.circe.Parser.finishDecode$(Parser.scala:9)
        at io.circe.config.parser$.finishDecode(parser.scala:64)
        at io.circe.config.parser$.decode(parser.scala:164)
        at io.circe.config.syntax$CirceConfigOps$.as$extension0(syntax.scala:202)
        at com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.$anonfun$readJson$12(FileSystem.scala:56)
        at scala.util.Either.flatMap(Either.scala:341)
        at com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.$anonfun$readJson$9(FileSystem.scala:55)
        at scala.util.Either.flatMap(Either.scala:341)
        at com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.$anonfun$readJson$6(FileSystem.scala:54)
        at scala.util.Either.flatMap(Either.scala:341)
        at com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.$anonfun$readJson$3(FileSystem.scala:53)
        at scala.util.Either.flatMap(Either.scala:341)
        at cats.data.EitherT.$anonfun$subflatMap$1(EitherT.scala:415)
        at delay @ com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.readJson(FileSystem.scala:48)
        at leftMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.readJson(FileSystem.scala:50)
        at subflatMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.readJson(FileSystem.scala:51)
        at flatMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.config.ParsedConfigs$.parse(ParsedConfigs.scala:70)
        at delay @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$Slf4jLogger.$anonfun$info$4(Slf4jLoggerInternal.scala:90)
        at delay @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$Slf4jLogger.isInfoEnabled(Slf4jLoggerInternal.scala:65)
        at ifM$extension @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$Slf4jLogger.info(Slf4jLoggerInternal.scala:90)
        at liftF @ com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.$anonfun$readJsonDir$4(FileSystem.scala:69)
        at flatMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.config.ParsedConfigs$.parse(ParsedConfigs.scala:70)
        at map @ fs2.internal.CompileScope.$anonfun$close$9(CompileScope.scala:246)
        at flatMap @ fs2.internal.CompileScope.$anonfun$close$6(CompileScope.scala:245)
        at map @ fs2.internal.CompileScope.fs2$internal$CompileScope$$traverseError(CompileScope.scala:222)
        at flatMap @ fs2.internal.CompileScope.$anonfun$close$4(CompileScope.scala:244)
        at map @ fs2.internal.CompileScope.fs2$internal$CompileScope$$traverseError(CompileScope.scala:222)
        at flatMap @ fs2.internal.CompileScope.$anonfun$close$2(CompileScope.scala:242)

Hey @srnagpure, is your config correct? The error indicates that the topic name for the enriched output is invalid. Another thing I noted is that you have nsq in your config file name - is that the right config file for Pub/Sub?

Would you be able to share your config file here?
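
For reference, Enrich expects fully-qualified topic names, not bare IDs like enriched. A minimal sketch of the output block, assuming a hypothetical project my-project (check the sample config shipped with the release for the full set of options):

# Topics must be fully qualified: projects/<project-id>/topics/<topic-name>
"output": {
  "good": {
    "type": "PubSub"
    "topic": "projects/my-project/topics/enriched"
  }
  "bad": {
    "type": "PubSub"
    "topic": "projects/my-project/topics/bad"
  }
}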

Hello @josh

Yes, I have now corrected the config and the issue is resolved.

Another thing I wanted to check: I have set up the collector using the following command

java -jar snowplow-stream-collector-google-pubsub-2.9.0.jar --config collector.hocon

And I can see the following, which indicates the collector is up and healthy and can find the topics:

[pool-9-thread-1] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Topic bad exists
[pool-5-thread-1] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Topic good exists
[scala-stream-collector-akka.actor.default-dispatcher-6] INFO com.snowplowanalytics.snowplow.collectors.scalastream.GooglePubSubCollector$ - REST interface bound to /0:0:0:0:0:0:0:0:8080
[scala-stream-collector-akka.actor.default-dispatcher-6] INFO com.snowplowanalytics.snowplow.collectors.scalastream.GooglePubSubCollector$ - Setting health endpoint to healthy

How can I use this collector from JavaScript, so that I can see events flowing through to Pub/Sub?
Should I use
https://<ip_addr_shown_on_vm_inst>:8080/com.snowplowanalytics.snowplow/tp2

In general, how can I test this?

Essentially yes - your Collector endpoint is http://<ip_addr_shown_on_vm_inst>:8080 - this is assuming that:

  1. The VM is exposed to the internet and;
  2. You have allowed access to port 8080 on that VM from the internet (see the firewall sketch below)
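
For point 2, if the port is not yet open, a firewall rule along these lines would do it (the rule name and network are placeholders; tighten the source range for anything beyond testing):

# Allow inbound TCP 8080 from anywhere on the default network.
gcloud compute firewall-rules create allow-collector-8080 \
  --network=default \
  --direction=INGRESS \
  --action=ALLOW \
  --rules=tcp:8080 \
  --source-ranges=0.0.0.0/0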

The easiest way to validate is to execute the following: curl http://<ip_addr_shown_on_vm_inst>:8080/health - if this returns OK, your Collector should be good to go.
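
Once the health check passes, you can also push a test event through the Collector's GET pixel endpoint - the query string follows the Snowplow tracker protocol (e=pv marks a page view) - and it should then land on your raw good topic:

curl "http://<ip_addr_shown_on_vm_inst>:8080/i?e=pv&p=web&tv=curl-test"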


It's important to note that https will not work here, as for that to function you would need to:

  1. Attach an SSL certificate to your Collector deployment and;
  2. Set up a valid DNS A record pointing to your VM instance which matches the certificate you have purchased and configured

This complexity is why, if you want to use HTTPS connections, we generally recommend deploying the Collector behind a Load Balancer that handles this (as we have done in the quick-start).
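
In the meantime, to see the events themselves flowing through to Pub/Sub, one option is a temporary subscription on the raw good topic that you pull from the command line (the subscription name here is a placeholder):

# Create a throwaway subscription on the Collector's good topic and pull from it.
gcloud pubsub subscriptions create debug-sub --topic=good
gcloud pubsub subscriptions pull debug-sub --limit=5 --auto-ack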