Enrich PubSub and BigQuery StreamLoader go down often

Hi,

My Snowplow pipeline is hosted on GCP. Recently I made two main changes to it: I switched from Beam Enrich to Enrich PubSub, and from the BigQuery Loader to the BigQuery StreamLoader. The current versions are Enrich PubSub 2.0.2 and BigQuery StreamLoader 1.0.0. Since the update, both services go down very often, nearly three times a week, which is very inconvenient. The error logs are shown below. Rolling-restarting the instance group does not recover them; the only fix for now is to rolling-replace the instance group.

Before, when I used Beam Enrich and the BigQuery Loader, they ran as Dataflow jobs and instances, and they never failed like this. I am still unsure of the root cause of these failures.

Logs of StreamLoader

Nov 04, 2021 6:12:25 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
        at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
        at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
        at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
        at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
        at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
        at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:520)
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:495)
        at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
        at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
        at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
        at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700)
        at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
        at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
        at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
        at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:399)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:510)
        at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:66)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:630)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:518)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:692)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:681)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
        at io.grpc.Status.asRuntimeException(Status.java:533)
        ... 24 more
com.google.cloud.bigquery.BigQueryException: Connection reset
        at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:106)
        at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:460)
        at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:978)
        at com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$2(Bigquery.scala:65)
        at delay$extension @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$1(Bigquery.scala:65)
        at delay$extension @ com.permutive.pubsub.consumer.grpc.internal.PubsubSubscriber$.$anonfun$subscribe$1(PubsubSubscriber.scala:72)
        at flatMap @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.insert(Bigquery.scala:45)
        at apply @ fs2.Stream$InvariantOps$.observeAsync$extension(Stream.scala:3667)
        at apply @ fs2.Stream$InvariantOps$.observeAsync$extension(Stream.scala:3667)
        at main$ @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Main$.main(Main.scala:17)
Caused by: javax.net.ssl.SSLException: Connection reset
        at sun.security.ssl.Alert.createSSLException(Alert.java:127)
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:324)
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:267)
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:262)
        at sun.security.ssl.SSLSocketImpl.handleException(SSLSocketImpl.java:1554)
        at sun.security.ssl.SSLSocketImpl.access$400(SSLSocketImpl.java:73)
        at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:964)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:352)
        at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
        at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:149)
        at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
        at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:458)
        at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:978)
        at com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$2(Bigquery.scala:65)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:108)
        at cats.effect.internals.IORunLoop$.restartCancelable(IORunLoop.scala:51)
        at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:100)
        at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
        at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
        at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
        at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
        at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:80)
        at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:58)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:192)
        at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:480)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:501)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:439)
        at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
        at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
        Suppressed: java.net.SocketException: Broken pipe (Write failed)
                at java.net.SocketOutputStream.socketWrite0(Native Method)
                at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
                at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
                at sun.security.ssl.SSLSocketOutputRecord.encodeAlert(SSLSocketOutputRecord.java:81)
                at sun.security.ssl.TransportContext.fatal(TransportContext.java:355)
                ... 47 more
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:457)
        at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:68)
        at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1332)
        at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
        at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:948)
        ... 42 more

Logs of Enrich PubSub

[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Main - Initialising resources for Enrich job
[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.io.FileSystem - Files found in /snowplow/config/enrichments: /snowplow/config/enrichments/campaign_attribution.json, /snowplow/config/enrichments/pii_enrichment_config.json, /snowplow/config/enrichments/ua_parser_config.json, /snowplow/config/enrichments/anon_ip.json, /snowplow/config/enrichments/ip_lookups.json, /snowplow/config/enrichments/event_fingerprint_enrichment.json, /snowplow/config/enrichments/referer_parser.json, /snowplow/config/enrichments/yauaa_enrichment_config.json, /snowplow/config/enrichments/cookie_extractor_config.json
[pool-1-thread-2] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Environment - Parsed Iglu Client with following registries: Iglu Central, Michaels Static Repo (HTTP)
[pool-1-thread-2] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Environment - Parsed following enrichments: campaign_attribution, ua_parser_config, anon_ip, ip_lookups, event_fingerprint_config, referer_parser, yauaa_enrichment_config, cookie_extractor_config
[pool-1-thread-2] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Assets - Preparing enrichment assets
[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Assets - Downloading gs://snowplow-tst00/snowplow-temp/enrichments_schema/regexes-latest.yaml
[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Assets - Downloading gs://snowplow-tst00/snowplow-temp/enrichments_schema/GeoLite2-City.mmdb
[pool-1-thread-2] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Assets - Downloading gs://snowplow-tst00/snowplow-temp/enrichments_schema/referers-latest.json
[pool-1-thread-1] INFO nl.basjes.parse.useragent.AbstractUserAgentAnalyzerDirect - - Loaded 69 files in 1074 msec using expression: classpath*:UserAgents/**/*.yaml
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - 
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - /-----------------------------------------------------------\
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - | Yauaa 5.23 (v5.23 @ 2021-03-05T11:05:38Z)                 |
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - +-----------------------------------------------------------+
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - | For more information: https://yauaa.basjes.nl             |
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - | Copyright (C) 2013-2021 Niels Basjes - License Apache 2.0 |
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - \-----------------------------------------------------------/
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - 
[pool-1-thread-1] INFO nl.basjes.parse.useragent.AbstractUserAgentAnalyzerDirect - Building all matchers for all possible fields.
[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Environment - Enrich environment initialized
[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Main - Running enrichment stream
[pool-1-thread-3] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Assets - Initializing assets refresh stream in /home/snowplow, ticking every 7 days
[pool-1-thread-4] INFO nl.basjes.parse.useragent.AbstractUserAgentAnalyzerDirect - Initializing Analyzer data structures
[pool-1-thread-4] INFO nl.basjes.parse.useragent.AbstractUserAgentAnalyzerDirect - Built in 1363 msec : Hashmap 154566, Ranges map:2909
Nov 04, 2021 2:05:35 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: Policy checks are unavailable.
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
        at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
        at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
        at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
        at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
        at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
        at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
        at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: Policy checks are unavailable.
        at io.grpc.Status.asRuntimeException(Status.java:533)
        ... 15 more
Nov 04, 2021 5:05:55 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
        at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
        at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
        at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
        at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
        at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
        at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
        at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
        at io.grpc.Status.asRuntimeException(Status.java:533)
        ... 15 more

Hi @phxtorise, sorry to hear you’re having problems with the new streaming apps.

BigQuery Loader 1.0.0 had a problem: it did not exit cleanly when it got an exception like the one you shared. We fixed this in version 1.0.1 by bumping one of the underlying libraries. It won’t make your exception go away, but it should make the loader exit gracefully so that it can be restarted (depending on your orchestration).
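To illustrate what "restart depending on your orchestration" means when nothing else supervises the process, here is a minimal sketch of a wrapper loop. `restart_on_failure` is a hypothetical helper written for this example, not part of Snowplow or Docker, and it assumes a POSIX `sh`. It re-runs a command whenever it exits with a non-zero status, doubling the wait between attempts up to 60 seconds, and gives up after a fixed number of attempts.

```shell
# Hypothetical helper, not part of Snowplow or Docker: re-run a command until it
# exits 0 or the attempt budget is used up, waiting longer after each failure.
# Usage: restart_on_failure MAX_ATTEMPTS INITIAL_DELAY_SECONDS COMMAND [ARGS...]
restart_on_failure() {
  max_attempts=$1
  delay=$2
  shift 2
  attempt=1
  while :; do
    "$@"
    status=$?
    if [ "$status" -eq 0 ]; then
      return 0
    fi
    if [ "$attempt" -ge "$max_attempts" ]; then
      echo "giving up after $attempt attempts (last exit code $status)" >&2
      return "$status"
    fi
    echo "attempt $attempt exited with $status; restarting in ${delay}s" >&2
    sleep "$delay"
    attempt=$((attempt + 1))
    # Exponential back-off, capped at 60 seconds.
    delay=$((delay * 2))
    if [ "$delay" -gt 60 ]; then
      delay=60
    fi
  done
}
```

Unlike Docker's `--restart=always`, this sketch stops after `MAX_ATTEMPTS` failures so that a persistent misconfiguration surfaces instead of looping forever; the attempt limit and the 60-second cap are arbitrary choices for illustration.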

Enrich 3.0.0 (not released yet, but coming soon) will include a similar fix.

Having said all that, I cannot explain why you get this exception as often as three times a week. I knew this exception could happen, but only rarely.

Thanks for your reply, @istreeter. I followed your advice and upgraded to version 1.0.1, but the StreamLoader still goes down often, about twice a day. The new error is shown below. The StreamLoader Docker process exits and never restarts. Do you have any idea what is causing this?

com.google.cloud.bigquery.BigQueryException: A load-shedding retryable throttled error could not be retried due to Extensible Stubs retrying limits (see go/stubs-retries). (old status: RPC::STREAM_BROKEN: Connection to server broken (OnChannelError))
        at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
        at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:494)
        at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:1065)
        at com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$2(Bigquery.scala:65)
        at delay$extension @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$1(Bigquery.scala:65)
        at delay$extension @ com.permutive.pubsub.consumer.grpc.internal.PubsubSubscriber$.$anonfun$createSubscriber$4(PubsubSubscriber.scala:59)
        at flatMap @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.insert(Bigquery.scala:45)
        at apply @ fs2.Stream$InvariantOps$.observeAsync$extension(Stream.scala:3667)
        at apply @ fs2.Stream$InvariantOps$.observeAsync$extension(Stream.scala:3667)
        at main$ @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Main$.main(Main.scala:17)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 500 Internal Server Error
POST https://www.googleapis.com/bigquery/v2/projects/mkp-dev-np1/datasets/atomic/tables/events/insertAll?prettyPrint=false
{
  "code" : 500,
  "errors" : [ {
    "domain" : "global",
    "message" : "A load-shedding retryable throttled error could not be retried due to Extensible Stubs retrying limits (see go/stubs-retries). (old status: RPC::STREAM_BROKEN: Connection to server broken (OnChannelError))",
    "reason" : "backendError"
  } ],
  "message" : "A load-shedding retryable throttled error could not be retried due to Extensible Stubs retrying limits (see go/stubs-retries). (old status: RPC::STREAM_BROKEN: Connection to server broken (OnChannelError))",
  "status" : "INTERNAL"
}
        at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
        at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
        at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:428)
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
        at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
        at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:492)
        at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:1065)
        at com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$2(Bigquery.scala:65)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:108)
        at cats.effect.internals.IORunLoop$.restartCancelable(IORunLoop.scala:51)
        at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:100)
        at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
        at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
        at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
        at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
        at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:80)
        at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:58)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:192)
        at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:480)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:501)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:439)
        at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
        at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Hi @phxtorise, I suggest you use the --restart=always option when you run the docker container:

docker run \
    --restart=always \
    snowplow/snowplow-bigquery-streamloader:1.0.1 \
    --config=.... --resolver=....

You can read more about restart policies in the Docker documentation. The StreamLoader is designed to terminate when it encounters an error it cannot handle, so a restart policy should be enough to keep your loader running in production.
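If the loader container is already running without a restart policy, the policy can be changed in place and then checked with the standard Docker CLI. This sketch assumes the container was started with `--name streamloader`; that name is hypothetical, so substitute your own container name or ID. `RestartCount` reports how many times Docker has restarted the container, which helps confirm that the loader is being brought back after it exits.

```shell
# Set the restart policy on an existing container (container name is hypothetical).
docker update --restart=always streamloader

# Print the active restart policy and how many times Docker has restarted the container.
docker inspect --format '{{.HostConfig.RestartPolicy.Name}} restarts={{.RestartCount}}' streamloader
```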

I cannot explain the exception you shared; it looks like something unexpected is happening with the BigQuery insert. Maybe someone else on this board will recognise that exception.