I am using the Azure implementation of snowplow and we are missing data in our events table that is being uploaded to our blob storage successfully.
I have looked at the logs of the collector, loader and transformer. The transformer seems to have logs that correlate to the missing data.
[io-compute-4] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.generic.Partitioned - Closing window run=2023-08-30-03-35-00-0eba2f06-1eae-4f22-b4ba-1398f84f0c8e
[io-compute-2] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.generic.KeyedEnqueue - Pulling 6 elements for output=good/
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Disconnecting from node 0 due to request timeout.
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Cancelled in-flight METADATA request with correlation id 8 due to node 0 being disconnected (elapsed time since creation: 49640ms, elapsed time since send: 49640ms, request timeout: 30000ms)
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Cancelled in-flight PRODUCE request with correlation id 9 due to node 0 being disconnected (elapsed time since creation: 30008ms, elapsed time since send: 30008ms, request timeout: 30000ms)
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition snowflake-loader-topic-1 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now
[io-compute-4] ERROR com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Shutdown - Error on sinking and checkpointing events
org.apache.kafka.common.errors.NetworkException: Disconnected from node 0
[io-compute-5] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Shutdown - Source of events was cancelled
[io-compute-5] ERROR com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Run - Transformer shutting down
The issue seems to occur because of the “Disconnecting from node 0 due to request timeout.” line.
I am also running a very small instance only handling 100 or so events a minute.
Hi @Chris18 did you instrument this yourself or was this done with the quick-start examples? Can you share any networking / vm type details? As much context as possible please!
Hi @josh, it was done with the quick-start examples using terraform. So if I understand correctly the networking (azure v-net) was setup by the base terraform. As for the vm’s we are using the Standard_B2s vm’s for our scale sets.
Is there any specific additional context you need?
Hi @Chris18 thanks for sharing that! Other question that comes to mind is whether or not the missing data made it to the storage container or not? If you look in the folder is there a run= directory which relates to the missing time window?
The hypothesis here is that perhaps the application failed after the data had been pushed to the storage container but before it could send the message to load the data.
Hi @josh, this was my thought process as well. The data does make it to the storage container in the correct run= folder and has its inner output=good folder. When I look at the data in the output=good folder the missing rows that I expected to be in my events table is there.
Thats good news then! Is there also a “shredding_complete.json” file in that folder? If it is it should suffice to “push” the contents of that file as a message into the loaders queue topic to trigger it to be loaded.
There is a “shredding_complete.json” file in the folder. How would I go about pushing the contents to the loaders queue topic? Is there a way to tell snowplow to go back and refresh from a certain run?
So any tool that can send a message into Kafka will do the trick realistically - I have used our own tool Snowbridge before to do this. Once you can push a message to the topic then you just need to pipe the contents of that file in and it should just work.
Is there a way to tell snowplow to go back and refresh from a certain run?
Not currently as far as I am aware but I will ask the team.
Hi @josh Update on this, it is still happening but now when it fails it does not always get to finish uploading anything to blob storage. It looks like it is just dependent on when it the kafka node disconnects. Previously it must have just been a coincidence that the couple I checked did have their “shredding_complete.json” uploaded to blob.
Hi @Chris18 sorry to hear about the stability issues! Could you please share any logs from the instance from the moments it is failing in to help us in debugging whats going wrong?
Sure, this is what I see at the point of failure. The container then proceeds to restart after this occurs.
[io-compute-3] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.generic.Partitioned - Closing window run=2023-09-06-19-05-00-041803dc-c5f8-44d9-9a03-58119d8f21fa
[io-compute-4] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.generic.KeyedEnqueue - Pulling 47 elements for output=good/
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Disconnecting from node 0 due to request timeout.
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Cancelled in-flight METADATA request with correlation id 8 due to node 0 being disconnected (elapsed time since creation: 54025ms, elapsed time since send: 54025ms, request timeout: 30000ms)
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Cancelled in-flight PRODUCE request with correlation id 9 due to node 0 being disconnected (elapsed time since creation: 30022ms, elapsed time since send: 30022ms, request timeout: 30000ms)
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition snowflake-loader-topic-1 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now
[io-compute-3] ERROR com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Shutdown - Error on sinking and checkpointing events
org.apache.kafka.common.errors.NetworkException: Disconnected from node 0
[io-compute-5] INFO com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Shutdown - Source of events was cancelled
[io-compute-3] ERROR com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.Run - Transformer shutting down
[fs2-kafka-consumer-17] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-snowplow-snowflake-transformer-1, groupId=snowplow-snowflake-transformer] Revoke previously assigned partitions enriched-topic-0, enriched-topic-1
[fs2-kafka-consumer-17] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-snowplow-snowflake-transformer-1, groupId=snowplow-snowflake-transformer] The pause flag in partitions [enriched-topic-0] will be removed due to revocation.
[fs2-kafka-consumer-17] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-snowplow-snowflake-transformer-1, groupId=snowplow-snowflake-transformer] Member snowplow-namespace.servicebus.windows.net:c:snowplow-snowflake-transformer:I:consumer-snowplow-snowflake-transformer-1-2d9744d0a7d4417b89ddcae9044f8ea9 sending LeaveGroup request to coordinator snowplow-namespace.servicebus.windows.net:9093 (id: 2147483647 rack: null) due to the consumer is being closed
[fs2-kafka-consumer-17] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-snowplow-snowflake-transformer-1, groupId=snowplow-snowflake-transformer] Resetting generation and member id due to: consumer pro-actively leaving the group
[fs2-kafka-consumer-17] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-snowplow-snowflake-transformer-1, groupId=snowplow-snowflake-transformer] Request joining group due to: consumer pro-actively leaving the group
[fs2-kafka-consumer-17] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[fs2-kafka-consumer-17] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[fs2-kafka-consumer-17] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[fs2-kafka-consumer-17] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.consumer for consumer-snowplow-snowflake-transformer-1 unregistered
[io-compute-3] INFO org.http4s.blaze.client.PoolManager - Shutting down connection pool: curAllocated=0 idleQueues.size=0 waitQueue.size=0 maxWaitQueueLimit=256 closed=false
[io-compute-blocker-3] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 60000 ms.
[io-compute-blocker-3] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed
[io-compute-blocker-3] INFO org.apache.kafka.common.metrics.Metrics - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[io-compute-blocker-3] INFO org.apache.kafka.common.metrics.Metrics - Metrics reporters closed
[io-compute-blocker-3] INFO org.apache.kafka.common.utils.AppInfoParser - App info kafka.producer for producer-1 unregistered
org.apache.kafka.common.errors.NetworkException: Disconnected from node 0```
Hi @Chris18 I have run through these scenarios as closely as I can myself by manually killing each application in the pipeline and restarting them and they always pick up where they last managed to successfully checkpoint (which adheres to our at-least-once pipeline semantics).
Further to this the transformer which produces the shredding_complete.json files will only checkpoint the events it has consumed if it manages to write that file successfully → which I have also now validated by killing the process mid processing window and then restarting it some minutes later (without any noticeable gaps in the events which land in Snowflake).
Just to confirm are you seeing gaps in your data in Snowflake which can’t be accounted for by valid shredding_complete.json containing folders?
Hi @josh. Yes there have been cases where there is no shredding_complete.json file but more often than not there is a shredding_complete.json file in the blob but the data is not loaded into the events table.
I did recently change how often the transformer published a message for the loader from 5 minutes to 30 minutes and the frequency of the node disconnected error has decreased but we still get it every couple of days and then have missing data as a result of it. The run and its meta data isn’t added to the manifests table either. How often do you have the loader window period set to run?
Yes there have been cases where there is no shredding_complete.json file but more often than not there is a shredding_complete.json file in the blob but the data is not loaded into the events table.
And when you re-send that message into the loaders queue does that flesh out the missing data in the destination warehouse?
When I resend the shredding_complete.json as a payload to the loader queue it seems to break the loader vmss. The loader docker then just keeps restarting with the following logs:
INFO Successfully logged in.
INFO Kafka version: 3.4.0
INFO Kafka commitId: 2e1947d240607d53
INFO Kafka startTimeMs: 1695284268017
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Subscribed to topic(s): snowflake-loader-topic
INFO com.snowplowanalytics.snowplow.rdbloader: RDB Loader 5.7.1 has started.
INFO HikariPool-1 - Starting...
INFO HikariPool-1 - Start completed.
INFO Loader: Target check is completed
INFO Loader: No operation prepare step is completed
INFO Loader: Database schema initialization is completed
INFO Loader: Events table initialization is completed
INFO Manifest: No changes needed on the manifest table
INFO Loader: Manifest initialization is completed
INFO Loader: load_tstamp column already exists
INFO Loader: Adding load_tstamp column is completed
INFO FolderMonitoring: Configuration for monitoring.folders hasn't been provided - monitoring is disabled
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Cluster ID: snowplow-namespace.servicebus.windows.net
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Discovered group coordinator snowplow-namespace.servicebus.windows.net:9093 (id: 2147483647 rack: null)
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] (Re-)joining group
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Successfully joined group with generation Generation{generationId=12, memberId='snowplow-namespace.servicebus.windows.net:c:snowplow-snowflake-loader:I:consumer-snowplow-snowflake-loader-1-49aef0c6db9349ed9d6c5e8ef9ac9bf5', protocol='range'}
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Finished assignment for group at generation 12: {snowplow-namespace.servicebus.windows.net:c:snowplow-snowflake-loader:I:consumer-snowplow-snowflake-loader-1-49aef0c6db9349ed9d6c5e8ef9ac9bf5=Assignment(partitions=[snowflake-loader-topic-0, snowflake-loader-topic-1])}
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Successfully synced group in generation Generation{generationId=12, memberId='snowplow-namespace.servicebus.windows.net:c:snowplow-snowflake-loader:I:consumer-snowplow-snowflake-loader-1-49aef0c6db9349ed9d6c5e8ef9ac9bf5', protocol='range'}
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Notifying assignor about the new Assignment(partitions=[snowflake-loader-topic-0, snowflake-loader-topic-1])
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Adding newly assigned partitions: snowflake-loader-topic-0, snowflake-loader-topic-1
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Setting offset for partition snowflake-loader-topic-0 to the committed offset FetchPosition{offset=1089, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[snowplow-namespace.servicebus.windows.net:9093 (id: 0 rack: null)], epoch=absent}}
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Setting offset for partition snowflake-loader-topic-1 to the committed offset FetchPosition{offset=1164, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[snowplow-namespace.servicebus.windows.net:9093 (id: 0 rack: null)], epoch=absent}}
ERROR Loader: Loader shutting down
INFO HikariPool-1 - Shutdown initiated...
INFO HikariPool-1 - Shutdown completed.
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Revoke previously assigned partitions snowflake-loader-topic-0, snowflake-loader-topic-1
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Member snowplow-namespace.servicebus.windows.net:c:snowplow-snowflake-loader:I:consumer-snowplow-snowflake-loader-1-49aef0c6db9349ed9d6c5e8ef9ac9bf5 sending LeaveGroup request to coordinator snowplow-namespace.servicebus.windows.net:9093 (id: 2147483647 rack: null) due to the consumer is being closed
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Resetting generation and member id due to: consumer pro-actively leaving the group
INFO [Consumer clientId=consumer-snowplow-snowflake-loader-1, groupId=snowplow-snowflake-loader] Request joining group due to: consumer pro-actively leaving the group
INFO Metrics scheduler closed
INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter
INFO Metrics reporters closed
INFO App info kafka.consumer for consumer-snowplow-snowflake-loader-1 unregistered
INFO Shutting down connection pool: curAllocated=0 idleQueues.size=0 waitQueue.size=0 maxWaitQueueLimit=256 closed=false
java.lang.NullPointerException
at java.base/java.lang.String.<init>(Unknown Source)
at fs2.kafka.GenericDeserializer$.$anonfun$string$2(Deserializer.scala:213)
at cats.ApplicativeError.catchNonFatal(ApplicativeError.scala:269)
at cats.ApplicativeError.catchNonFatal$(ApplicativeError.scala:268)
at product @ fs2.concurrent.SignallingRef$.of(Signal.scala:241)
at map @ fs2.kafka.ConsumerRecord$.fromJava(ConsumerRecord.scala:184)
at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$2(KafkaConsumerActor.scala:265)
at map @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$records$1(KafkaConsumerActor.scala:267)
at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$28(KafkaConsumer.scala:267)
at traverse @ fs2.kafka.KafkaConsumer$$anon$1.$anonfun$partitionsMapStream$35(KafkaConsumer.scala:302)
at map @ fs2.kafka.internal.KafkaConsumerActor.records(KafkaConsumerActor.scala:269)
at delay @ fs2.kafka.internal.Blocking$$anon$2.apply(Blocking.scala:26)
at flatMap @ fs2.kafka.internal.KafkaConsumerActor.pollConsumer$1(KafkaConsumerActor.scala:293)
at flatMap @ fs2.kafka.internal.KafkaConsumerActor.$anonfun$poll$12(KafkaConsumerActor.scala:410)
at flatMap @ fs2.kafka.internal.KafkaConsumerActor.<init>(KafkaConsumerActor.scala:407)
NOTE: Picked up JDK_JAVA_OPTIONS: -XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75
INFO Azure Identity => EnvironmentCredential invoking ClientSecretCredential
INFO Azure Identity => EnvironmentCredential invoking ClientSecretCredential
So it doesn’t look like uploading the missing json is the solution, all I did was publish a message to the queue with the shredding_complete.json contents as the payload, did I potentially miss a step when publishing?
Hi @josh,
I did it via the azure portal on the specific snowflake-loader-topic, the picture below has the payload I sent through as well. Immediately after doing this my loader docker broke and keeps restarting.