Hi,
My Snowplow pipeline is hosted on GCP. Recently, I did two main changes to the pipeline. One is changing from Beam enrich to Enrich PubSub, another is changing from BigQuery loader to BigQuery StreamLoader. And the current version of Enrich PubSub is 2.0.2, and BigQuery StreamLoader is 1.0.0. But after I updated the pipeline, these two services are down very often, nearly three times a week, which caused much inconvenience. The errors info are shown below. And it cannot be recovered even if I rolling-restart the instance group. The only solution for now is to rolling-replace the instance group.
Before, when I used the Beam Enrich and BigQuery Loader, it would create dataflow jobs and instances, and it would never fail like this. I am still confused about the root cause of the failure.
Logs of StreamLoader
Nov 04, 2021 6:12:25 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:520)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:495)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:700)
at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
at io.grpc.internal.CensusTracingModule$TracingClientInterceptor$1$1.onClose(CensusTracingModule.java:399)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:510)
at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:630)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$700(ClientCallImpl.java:518)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:692)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:681)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
at io.grpc.Status.asRuntimeException(Status.java:533)
... 24 more
com.google.cloud.bigquery.BigQueryException: Connection reset
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:106)
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:460)
at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:978)
at com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$2(Bigquery.scala:65)
at delay$extension @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$1(Bigquery.scala:65)
at delay$extension @ com.permutive.pubsub.consumer.grpc.internal.PubsubSubscriber$.$anonfun$subscribe$1(PubsubSubscriber.scala:72)
at flatMap @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.insert(Bigquery.scala:45)
at apply @ fs2.Stream$InvariantOps$.observeAsync$extension(Stream.scala:3667)
at apply @ fs2.Stream$InvariantOps$.observeAsync$extension(Stream.scala:3667)
at main$ @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Main$.main(Main.scala:17)
Caused by: javax.net.ssl.SSLException: Connection reset
at sun.security.ssl.Alert.createSSLException(Alert.java:127)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:324)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:267)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:262)
at sun.security.ssl.SSLSocketImpl.handleException(SSLSocketImpl.java:1554)
at sun.security.ssl.SSLSocketImpl.access$400(SSLSocketImpl.java:73)
at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:964)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:352)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:149)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:541)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:474)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:591)
at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:458)
at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:978)
at com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$2(Bigquery.scala:65)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:108)
at cats.effect.internals.IORunLoop$.restartCancelable(IORunLoop.scala:51)
at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:100)
at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:80)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:58)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:192)
at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:480)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:501)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:439)
at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at sun.security.ssl.SSLSocketOutputRecord.encodeAlert(SSLSocketOutputRecord.java:81)
at sun.security.ssl.TransportContext.fatal(TransportContext.java:355)
... 47 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:457)
at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:68)
at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1332)
at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:948)
... 42 more
Logs of Enrich PubSub
[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Main - Initialising resources for Enrich job
[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.io.FileSystem - Files found in /snowplow/config/enrichments: /snowplow/config/enrichments/campaign_attribution.json, /snowplow/config/enrichments/pii_enrichment_config.json, /sno
wplow/config/enrichments/ua_parser_config.json, /snowplow/config/enrichments/anon_ip.json, /snowplow/config/enrichments/ip_lookups.json, /snowplow/config/enrichments/event_fingerprint_enrichment.json, /snowplow/config/enrichments/referer_parser.j
son, /snowplow/config/enrichments/yauaa_enrichment_config.json, /snowplow/config/enrichments/cookie_extractor_config.json
[pool-1-thread-2] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Environment - Parsed Iglu Client with following registries: Iglu Central, Michaels Static Repo (HTTP)
[pool-1-thread-2] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Environment - Parsed following enrichments: campaign_attribution, ua_parser_config, anon_ip, ip_lookups, event_fingerprint_config, referer_parser, yauaa_enrichment_config, cookie
_extractor_config
[pool-1-thread-2] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Assets - Preparing enrichment assets
[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Assets - Downloading gs://snowplow-tst00/snowplow-temp/enrichments_schema/regexes-latest.yaml
[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Assets - Downloading gs://snowplow-tst00/snowplow-temp/enrichments_schema/GeoLite2-City.mmdb
[pool-1-thread-2] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Assets - Downloading gs://snowplow-tst00/snowplow-temp/enrichments_schema/referers-latest.json
[pool-1-thread-1] INFO nl.basjes.parse.useragent.AbstractUserAgentAnalyzerDirect - - Loaded 69 files in 1074 msec using expression: classpath*:UserAgents/**/*.yaml
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion -
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - /-----------------------------------------------------------\
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - | Yauaa 5.23 (v5.23 @ 2021-03-05T11:05:38Z) |
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - +-----------------------------------------------------------+
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - | For more information: https://yauaa.basjes.nl |
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - | Copyright (C) 2013-2021 Niels Basjes - License Apache 2.0 |
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion - \-----------------------------------------------------------/
[pool-1-thread-1] INFO nl.basjes.parse.useragent.utils.YauaaVersion -
[pool-1-thread-1] INFO nl.basjes.parse.useragent.AbstractUserAgentAnalyzerDirect - Building all matchers for all possible fields.
[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Environment - Enrich environment initialized
[pool-1-thread-1] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Main - Running enrichment stream
[pool-1-thread-3] INFO com.snowplowanalytics.snowplow.enrich.pubsub.Assets - Initializing assets refresh stream in /home/snowplow, ticking every 7 days
[pool-1-thread-4] INFO nl.basjes.parse.useragent.AbstractUserAgentAnalyzerDirect - Initializing Analyzer data structures
[pool-1-thread-4] INFO nl.basjes.parse.useragent.AbstractUserAgentAnalyzerDirect - Built in 1363 msec : Hashmap 154566, Ranges map:2909
Nov 04, 2021 2:05:35 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: Policy checks are unavailable.
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: Policy checks are unavailable.
at io.grpc.Status.asRuntimeException(Status.java:533)
... 15 more
Nov 04, 2021 5:05:55 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection$2 onFailure
WARNING: failed to send operations
com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1050)
at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1176)
at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:545)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:515)
at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:426)
at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.close(ClientCallImpl.java:689)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.access$900(ClientCallImpl.java:577)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:751)
at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:740)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: 502:Bad Gateway
at io.grpc.Status.asRuntimeException(Status.java:533)
... 15 more