pool-5-thread-1] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Error while checking if topic good exists: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
[pool-9-thread-1] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Error while checking if topic bad exists: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
[pool-5-thread-1] ERROR com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Error while checking if topic good exists: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
Hi @srnagpure the error message seems to be pretty clear in that your application has not got sufficient access to access Google PubSub. You need to have some authentication available to the service to be able to access PubSub.
java.lang.IllegalArgumentException: Topic format enriched invalid
at com.snowplowanalytics.snowplow.enrich.common.fs2.config.io$Output$PubSub.<init>(io.scala:297)
at com.snowplowanalytics.snowplow.enrich.common.fs2.config.io$Output$PubSub$.apply(io.scala:283)
at com.snowplowanalytics.snowplow.enrich.common.fs2.config.io$Output$anon$lazy$macro$127$1.$anonfun$inst$macro$21$6(io.scala:319)
at shapeless.Generic$$anon$1.from(generic.scala:164)
at shapeless.LabelledGeneric$$anon$2.from(generic.scala:239)
at shapeless.LabelledGeneric$$anon$2.from(generic.scala:236)
at io.circe.generic.extras.decoding.ConfiguredDecoder$NonStrictCaseClassConfiguredDecoder.apply(ConfiguredDecoder.scala:83)
at io.circe.generic.extras.decoding.ReprDecoder.withDiscriminator(ReprDecoder.scala:85)
at com.snowplowanalytics.snowplow.enrich.common.fs2.config.io$Output$anon$lazy$macro$127$1$$anon$37.configuredDecode(io.scala:319)
at io.circe.generic.extras.decoding.ConfiguredDecoder$AdtConfiguredDecoder.apply(ConfiguredDecoder.scala:142)
at io.circe.Decoder.tryDecode(Decoder.scala:70)
at io.circe.Decoder.tryDecode$(Decoder.scala:69)
at io.circe.generic.decoding.DerivedDecoder.tryDecode(DerivedDecoder.scala:6)
at io.circe.Decoder$$anon$13.tryDecode(Decoder.scala:330)
at io.circe.Decoder$$anon$13.tryDecode(Decoder.scala:330)
at io.circe.generic.extras.decoding.ReprDecoder.orDefault(ReprDecoder.scala:47)
at com.snowplowanalytics.snowplow.enrich.common.fs2.config.io$Outputs$anon$lazy$macro$15$3$$anon$29.configuredDecode(io.scala:260)
at io.circe.generic.extras.decoding.ConfiguredDecoder$NonStrictCaseClassConfiguredDecoder.apply(ConfiguredDecoder.scala:81)
at io.circe.Decoder.tryDecode(Decoder.scala:70)
at io.circe.Decoder.tryDecode$(Decoder.scala:69)
at io.circe.generic.decoding.DerivedDecoder.tryDecode(DerivedDecoder.scala:6)
at io.circe.generic.extras.decoding.ReprDecoder.orDefault(ReprDecoder.scala:47)
at com.snowplowanalytics.snowplow.enrich.common.fs2.config.ConfigFile$anon$lazy$macro$39$1$$anon$1.configuredDecode(ConfigFile.scala:69)
at io.circe.generic.extras.decoding.ConfiguredDecoder$NonStrictCaseClassConfiguredDecoder.apply(ConfiguredDecoder.scala:81)
at io.circe.Decoder.tryDecode(Decoder.scala:70)
at io.circe.Decoder.tryDecode$(Decoder.scala:69)
at io.circe.generic.decoding.DerivedDecoder.tryDecode(DerivedDecoder.scala:6)
at io.circe.Decoder$$anon$13.tryDecode(Decoder.scala:330)
at io.circe.Decoder$$anon$13.apply(Decoder.scala:327)
at io.circe.Decoder.decodeJson(Decoder.scala:88)
at io.circe.Decoder.decodeJson$(Decoder.scala:88)
at io.circe.Decoder$$anon$13.decodeJson(Decoder.scala:326)
at io.circe.Parser.finishDecode(Parser.scala:12)
at io.circe.Parser.finishDecode$(Parser.scala:9)
at io.circe.config.parser$.finishDecode(parser.scala:64)
at io.circe.config.parser$.decode(parser.scala:164)
at io.circe.config.syntax$CirceConfigOps$.as$extension0(syntax.scala:202)
at com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.$anonfun$readJson$12(FileSystem.scala:56)
at scala.util.Either.flatMap(Either.scala:341)
at com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.$anonfun$readJson$9(FileSystem.scala:55)
at scala.util.Either.flatMap(Either.scala:341)
at com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.$anonfun$readJson$6(FileSystem.scala:54)
at scala.util.Either.flatMap(Either.scala:341)
at com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.$anonfun$readJson$3(FileSystem.scala:53)
at scala.util.Either.flatMap(Either.scala:341)
at cats.data.EitherT.$anonfun$subflatMap$1(EitherT.scala:415)
at delay @ com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.readJson(FileSystem.scala:48)
at leftMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.readJson(FileSystem.scala:50)
at subflatMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.readJson(FileSystem.scala:51)
at flatMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.config.ParsedConfigs$.parse(ParsedConfigs.scala:70)
at delay @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$Slf4jLogger.$anonfun$info$4(Slf4jLoggerInternal.scala:90)
at delay @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$Slf4jLogger.isInfoEnabled(Slf4jLoggerInternal.scala:65)
at ifM$extension @ org.typelevel.log4cats.slf4j.internal.Slf4jLoggerInternal$Slf4jLogger.info(Slf4jLoggerInternal.scala:90)
at liftF @ com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.$anonfun$readJsonDir$4(FileSystem.scala:69)
at flatMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.config.ParsedConfigs$.parse(ParsedConfigs.scala:70)
at map @ fs2.internal.CompileScope.$anonfun$close$9(CompileScope.scala:246)
at flatMap @ fs2.internal.CompileScope.$anonfun$close$6(CompileScope.scala:245)
at map @ fs2.internal.CompileScope.fs2$internal$CompileScope$$traverseError(CompileScope.scala:222)
at flatMap @ fs2.internal.CompileScope.$anonfun$close$4(CompileScope.scala:244)
at map @ fs2.internal.CompileScope.fs2$internal$CompileScope$$traverseError(CompileScope.scala:222)
at flatMap @ fs2.internal.CompileScope.$anonfun$close$2(CompileScope.scala:242)
Hey @srnagpure is your config correct? The error indicates that the topic name for enriched is invalid. Another thing I noted is that you have the nsq name in your config file?
And I can see, following, which indicates collector is up and healthy and it can find out the topics
[pool-9-thread-1] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Topic bad exists
[pool-5-thread-1] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.GooglePubSubSink - Topic good exists
[scala-stream-collector-akka.actor.default-dispatcher-6] INFO com.snowplowanalytics.snowplow.collectors.scalastream.GooglePubSubCollector$ - REST interface bound to /0:0:0:0:0:0:0:0:8080
[scala-stream-collector-akka.actor.default-dispatcher-6] INFO com.snowplowanalytics.snowplow.collectors.scalastream.GooglePubSubCollector$ - Setting health endpoint to healthy
How can I use this collector in Javascript, so that I can see the events flowing till pub/sub?
Should I use
https://<ip_addr_shown_on_vm_inst>:80808/com.snowplowanalytics/tp2
Essentially yes - your Collector endpoint is http://<ip_addr_shown_on_vm_inst>:8080 - this is assuming that:
The VM is exposed to the internet and;
You have allowed access to port 8080 on that VM from the internet
The easiest way to validate is to try and execute the following curl http://<ip_addr_shown_on_vm_inst>:8080/health - if this returns OK your Collector should be good to go.
Its important to note that https will not work here as for that to function you would need to:
Setup a valid DNS A record pointing to your VM instance which matches the certificate you have purchased and configured
The above complexity is why generally if you want to use HTTPS connections we recommend setting up behind a Load Balancer to manage this (as we have done in the quick-start).