Bigquery mutator and repeater works abnormally

BigQuery is what I use to store the data. I launched a VM instance on GCP compute engine to run the ETL pipeline. I run BigQuery mutator and loader to load the data from enrichment and run BigQuery repeater to reinsert failed-insert data into BigQuery.

There are two weird points.

  1. the repeater is always automatically down, not sure the reason. And the loader and mutator’s process looks still working according to docker ps
  2. the mutator actually doesn’t work as well since it cannot automatically add the custom schemas which doesn’t exist in BigQuery. But the weird point is that it shows the process of the mutator is still working.

I am wondering whether anyone may know the real reason for the two situations above, and how I can debug it.

Hi @phxtorise,

I think we’d need to see your logs, especially Repeater’s. If it automatically goes down there must be an error. Also Mutator either should be printing error messages or there’s a wrong config and it doesn’t get anything from types subscription.

1 Like

Thanks for reply. Following is the repeater’s log.

[scala-execution-context-global-9] WARN com.networknt.schema.JsonMetaSchema - Unknown keyword exclusiveMinimum - you should define your own Meta Schema. If the keyword is irrelevant for validation
, just use a NonValidationKeyword
[scala-execution-context-global-9] INFO com.snowplowanalytics.snowplow.storage.bigquery.repeater.Repeater - Initializing Repeater from bq-failed-inserts to events with 64 streams
[scala-execution-context-global-9] INFO com.snowplowanalytics.snowplow.storage.bigquery.repeater.Repeater - Statistics: 0 rows inserted, 0 rows rejected in 0 minutes.
[scala-execution-context-global-37] INFO com.snowplowanalytics.snowplow.storage.bigquery.repeater.Repeater - Preparing write to a snowplow-temp/dead-end/2021-09-09-0501541771 with 0 items
[scala-execution-context-global-37] INFO com.snowplowanalytics.snowplow.storage.bigquery.repeater.Repeater - Written snowplow-temp/dead-end/2021-09-09-0501541771 of 0 bytes
[scala-execution-context-global-37] WARN com.snowplowanalytics.snowplow.storage.bigquery.repeater.Repeater - Terminating. Flushed 0 desperates

This is what I use to launch the repeater in gcp vm instance

bq_version=0.6.1
temp_bucket=xxxxx
project_id=xxxxx
sudo docker run \
  -v $PWD/snowplow/config:/snowplow/config \
  snowplow/snowplow-bigquery-repeater:$bq_version \
  --config=$(cat $PWD/snowplow/config/bigquery_config.json | base64 -w 0) \
  --resolver=$(cat $PWD/snowplow/config/iglu_resolver.json | base64 -w 0) \
  --failedInsertsSub=bq-failed-inserts-sub \
  --deadEndBucket=gs://$temp_bucket/dead-end \
  --desperatesBufferSize=20 \
  --desperatesWindow=20 \
  --backoffPeriod=900
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Mutator is listening bq-types-sub PubSub subscription
Sep 13, 2021 8:33:17 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$1 onFailure
SEVERE: terminated streaming with exception
com.google.api.gax.rpc.PermissionDeniedException: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:55)
        at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$1.onFailure(StreamingSubscriberConnection.java:238)
        at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
        at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
        at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
        at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
        at com.google.api.core.AbstractApiFuture$InternalSettableFuture.setException(AbstractApiFuture.java:95)
        at com.google.api.core.AbstractApiFuture.setException(AbstractApiFuture.java:77)
        at com.google.api.core.SettableApiFuture.setException(SettableApiFuture.java:52)
        at com.google.cloud.pubsub.v1.StreamingSubscriberConnection$StreamingPullResponseObserver.onError(StreamingSubscriberConnection.java:174)
        at com.google.api.gax.tracing.TracedResponseObserver.onError(TracedResponseObserver.java:103)
        at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:84)
        at com.google.api.gax.rpc.StateCheckingResponseObserver.onError(StateCheckingResponseObserver.java:86)
        at com.google.api.gax.grpc.GrpcDirectStreamController$ResponseObserverAdapter.onClose(GrpcDirectStreamController.java:149)
        at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
        at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
        at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
        at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700)
        at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
        at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
        at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
        at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:399)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:510)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:66)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:630)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:518)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:692)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:681)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.api.gax.rpc.PermissionDeniedException: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:55)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
        at com.google.api.gax.grpc.ExceptionResponseObserver.onErrorImpl(ExceptionResponseObserver.java:82)
        ... 25 more
Caused by: io.grpc.StatusRuntimeException: PERMISSION_DENIED: User not authorized to perform this action.
        at io.grpc.Status.asRuntimeException(Status.java:533)
        ... 24 more

Above is the error log of the bigquery mutator. Did anyone know how to fix this error?

It straight up says that the user / service account you are using to run this as does not have permissions to perform the action. Permissions needed are for PubSub and BigQuery Data Editing.

1 Like

Thanks @blackknight467 ! I will try to get the permission and try it again.