BigQuery Streamloader throws errors and then stops working

Hi there,
we have a problem with our BigQuery Streamloader deployment on Kubernetes. This is our initial deployment, and we use v1.4.1.
The loader works as long as no events fail during loading. When events fail to load into BigQuery (e.g. because they contain custom contexts that are not yet available in the output BigQuery table), the load fails, which is expected at this point. However, the loader also throws a couple of identical errors (see screenshot below), then stops working and does not process any further events. We traced the error to Shutdown.scala (snowplow-bigquery-loader/Shutdown.scala at ac7baca0353e59db65a3f0e06ed3e98c3e44eef8 · snowplow-incubator/snowplow-bigquery-loader · GitHub), where it looks like the loader receives a SIGINT signal.
Any idea how to fix this issue?

Thanks in advance!

Hi @mike , Hi @dilyan , do you have any ideas on this? Thanks a lot!

Are you able to share your BQ stream loader configuration? Failed inserts should go into PubSub (if they can’t be inserted) rather than terminating the pipeline.
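
For illustration, the expected behaviour (a failed insert is diverted to a separate sink and the loop carries on) can be sketched as below. This is a minimal Python sketch under stated assumptions, not the loader's actual Scala implementation: `InsertError`, `Sinks`, `insert_row` and `load` are hypothetical names, the column names are made up, and in-memory lists stand in for the BigQuery table and the failed-inserts PubSub topic.

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Set


class InsertError(Exception):
    """Hypothetical error for a row that does not fit the table schema."""


@dataclass
class Sinks:
    # Stand-in for the BigQuery events table (assumption: a plain list).
    good: List[dict] = field(default_factory=list)
    # Stand-in for the failed-inserts PubSub topic (assumption: a plain list).
    failed_inserts: List[dict] = field(default_factory=list)


def insert_row(row: dict, known_columns: Set[str]) -> None:
    """Hypothetical insert: reject rows with columns the table lacks."""
    unknown = set(row) - known_columns
    if unknown:
        raise InsertError(f"unknown columns: {sorted(unknown)}")


def load(rows: Iterable[dict], known_columns: Set[str], sinks: Sinks) -> Sinks:
    """Route each row to the good sink or the failed-inserts sink.

    A failed insert is recorded and processing continues with the next
    row; the loop never terminates because of a single bad row.
    """
    for row in rows:
        try:
            insert_row(row, known_columns)
        except InsertError:
            sinks.failed_inserts.append(row)
        else:
            sinks.good.append(row)
    return sinks


# Usage: the second row carries a column the table does not have yet.
rows = [
    {"event_id": "1"},
    {"event_id": "2", "contexts_com_acme_example_1": []},
    {"event_id": "3"},
]
result = load(rows, known_columns={"event_id"}, sinks=Sinks())
print(len(result.good), len(result.failed_inserts))
```

In this sketch the third row is still processed after the second one fails, which is the behaviour you should see from the loader once the failed-inserts output is working.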

Hi @mike , thanks for your reply. This is our stream loader configuration:

{
    "projectId": "XXX"
    "loader": {
        "input": {
            "type": "PubSub"
            "subscription": "sp-enriched-good-sub"
        }
        "output": {
            "good": {
                "type": "BigQuery"
                "datasetId": "snowplow"
                "tableId": "events"
            }
            "bad": {
                "type": "PubSub"
                "topic": "bq-bad-rows"
            }
            "types": {
                "type": "PubSub"
                "topic": "bq-types"
            }
            "failedInserts": {
                "type": "PubSub"
                "topic": "bq-failed-inserts"
            }
        }
    }
    "mutator": {
        "input": {
            "type": "PubSub"
            "subscription": "bq-types-sub"
        }
        "output": {
            "good": ${loader.output.good} # will be automatically inferred
        }
    }
    "repeater": {
        "input": {
            "type": "PubSub"
            "subscription": "bq-failed-inserts-sub"
        }
        "output": {
            "good": ${loader.output.good} # will be automatically inferred
            "deadLetters": {
                "type": "Gcs"
                "bucket": "gs://XXXX"
            }
        }
    }
    "monitoring": {
        "stdout": {
            "period": "10 min"
            "prefix": "snowplow.monitoring"
        }
    }
}

We run the image snowplow/snowplow-bigquery-streamloader:1.4.1 with the config and resolver provided as files:

args:
[
    "--resolver",
    "/config/resolver.json",
    "--config",
    "/config/loader.config",
]

In case it is relevant: our pipeline currently processes very few events, since we are still implementing the tracking.

Thanks!

Hi @mike , we tried to run an older version of the BigQuery stream loader (v1.1.0), but there is still a problem. Judging by the error message, it seems to be an SSL issue. The stream loader processes a couple of events and then fails, so the configuration and deployment are not completely wrong. Any idea what might cause this?

Thanks!

fs2.CompositeFailure: Multiple exceptions were thrown (3), first com.google.cloud.bigquery.BigQueryException: Remote host terminated the handshake
	at fs2.CompositeFailure$.apply(CompositeFailure.scala:56)
	at fs2.CompositeFailure$.apply(CompositeFailure.scala:45)
	at fs2.Stream$.$anonfun$parJoin$7(Stream.scala:2086)
	at scala.Option.fold(Option.scala:263)
	at fs2.Stream$.$anonfun$parJoin$5(Stream.scala:2085)
	at fs2.concurrent.SignallingRef$SignallingRefImpl.$anonfun$update$1(Signal.scala:238)
	at fs2.concurrent.SignallingRef$SignallingRefImpl.$anonfun$modify$1(Signal.scala:242)
	at get @ fs2.internal.CompileScope.isInterrupted(CompileScope.scala:397)
	at map @ fs2.concurrent.SignallingRef$SignallingRefImpl.get(Signal.scala:170)
	at flatMap @ fs2.Stream$.signalResult$1(Stream.scala:2155)
	at map @ fs2.internal.CompileScope.$anonfun$close$9(CompileScope.scala:246)
	at flatMap @ fs2.internal.CompileScope.$anonfun$close$6(CompileScope.scala:245)
	at map @ fs2.internal.CompileScope.fs2$internal$CompileScope$$traverseError(CompileScope.scala:222)
	at flatMap @ fs2.internal.CompileScope.$anonfun$close$4(CompileScope.scala:244)
	at map @ fs2.internal.CompileScope.fs2$internal$CompileScope$$traverseError(CompileScope.scala:222)
	at flatMap @ fs2.internal.CompileScope.$anonfun$close$2(CompileScope.scala:242)
	at modify @ fs2.internal.CompileScope.$anonfun$open$2(CompileScope.scala:142)
	at flatMap @ fs2.internal.CompileScope.close(CompileScope.scala:241)
	at main$ @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Main$.main(Main.scala:20)
	at get @ fs2.internal.CompileScope.isInterrupted(CompileScope.scala:397)
Caused by: com.google.cloud.bigquery.BigQueryException: Remote host terminated the handshake
	at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
	at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:494)
	at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:1065)
	at com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$2(Bigquery.scala:65)
	at delay$extension @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$1(Bigquery.scala:65)
	at delay$extension @ com.permutive.pubsub.consumer.grpc.internal.PubsubSubscriber$.$anonfun$createSubscriber$4(PubsubSubscriber.scala:59)
	at flatMap @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.insert(Bigquery.scala:45)
	at apply @ fs2.Stream$InvariantOps$.observeAsync$extension(Stream.scala:3667)
	at apply @ fs2.Stream$InvariantOps$.observeAsync$extension(Stream.scala:3667)
	at main$ @ com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Main$.main(Main.scala:20)
Caused by: javax.net.ssl.SSLHandshakeException: Remote host terminated the handshake
	at sun.security.ssl.SSLSocketImpl.handleEOF(SSLSocketImpl.java:1561)
	at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1391)
	at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1291)
	at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:435)
	at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
	at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:197)
	at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1340)
	at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1315)
	at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:264)
	at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:113)
	at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
	at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012)
	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
	at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:492)
	at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:1065)
	at com.snowplowanalytics.snowplow.storage.bigquery.streamloader.Bigquery$.$anonfun$mkInsert$2(Bigquery.scala:65)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:108)
	at cats.effect.internals.IORunLoop$.restartCancelable(IORunLoop.scala:51)
	at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:100)
	at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
	at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
	at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
	at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
	at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
	at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:80)
	at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:58)
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:192)
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:480)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:501)
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:439)
	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
	at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	Suppressed: java.net.SocketException: Broken pipe (Write failed)
		at java.net.SocketOutputStream.socketWrite0(Native Method)
		at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
		at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
		at sun.security.ssl.SSLSocketOutputRecord.encodeAlert(SSLSocketOutputRecord.java:81)
		at sun.security.ssl.TransportContext.fatal(TransportContext.java:355)
		at sun.security.ssl.TransportContext.fatal(TransportContext.java:267)
		at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:438)
		... 37 more
Caused by: java.io.EOFException: SSL peer shut down incorrectly
	at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:167)
	at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109)
	at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1383)

Hi @tziegler , I haven’t been able to reproduce the same error you’ve posted; however, we know that there is a bug in 1.4.1 when some events fail to be inserted. You can follow this issue for the fix.

In the meantime, can you please use 1.4.0, where this bug is not present?

Hi @dilyan , thanks for pointing me to the issue!
I tried 1.4.0, but the error was present there as well. However, v1.1.0 works. We will use this for now and upgrade later. Thanks!

Hi @tziegler , just FYI, we’ve fixed that bug and 1.4.2 is now available if you’d like to give it a try.

1.4.2 seems to work; we haven’t seen the problem since.
Awesome @dilyan !