Missing shredding_complete.json files during high volume

We observed a sudden spike in event volume, with approximately 108,000 events pushed to our Snowplow pipeline within a 2-minute window. Our current setup includes the Databricks Loader, which consumes messages from an SQS queue and issues COPY INTO SQL statements to load the data into our Databricks cluster.
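For context, that works out to 108,000 / 120 = 900 events per second over the window.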

During this high-volume period, the pipeline appeared to function correctly up to a certain point. However, we then found that shredding_complete.json files were no longer being generated for some partitions. This suggests that the corresponding messages may also not have been published to the SQS queue.
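
One quick way to check whether those messages are still reaching the loader queue is to inspect the queue itself. Below is a minimal boto3 sketch, assuming the queue name that appears in the transformer config later in this thread (sp-db-loader.fifo) and default AWS credentials; adjust it for your deployment:

import boto3

# Assumed queue name, taken from the transformer config shared later in this thread.
QUEUE_NAME = "sp-db-loader.fifo"

sqs = boto3.client("sqs")
queue_url = sqs.get_queue_url(QueueName=QUEUE_NAME)["QueueUrl"]

attrs = sqs.get_queue_attributes(
    QueueUrl=queue_url,
    AttributeNames=[
        "ApproximateNumberOfMessages",
        "ApproximateNumberOfMessagesNotVisible",
    ],
)["Attributes"]

# These are point-in-time approximations; the queue's NumberOfMessagesSent
# CloudWatch metric is a better signal of whether shredding_complete messages
# were still being published during the incident window.
print("visible:  ", attrs["ApproximateNumberOfMessages"])
print("in flight:", attrs["ApproximateNumberOfMessagesNotVisible"])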

For reference, the shredding complete file was generated and visible initially:

Subsequently, no shredding_complete.json files were generated for certain partitions:

Notably, while some partitions contain the shredding_complete.json files, others do not. We are concerned that this may result in data being missed or not fully processed.
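
To see exactly which windows are affected, one option is to list the run= prefixes under the transformer output path and flag those without a shredding_complete.json. A minimal boto3 sketch, assuming the widerow parquet layout implied by the config later in this thread and that the marker file sits at the root of each run= folder (bucket and prefix are placeholders):

import boto3

# Placeholders matching the output path in the config
# ("s3a://<bucket_name>/transformed/good/widerow/parquet").
BUCKET = "<bucket_name>"
PREFIX = "transformed/good/widerow/parquet/"

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

# Each transformer window is written under a run=<timestamp>-<uuid>/ prefix.
runs = []
for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX, Delimiter="/"):
    runs.extend(p["Prefix"] for p in page.get("CommonPrefixes", []))

missing = []
for run in runs:
    resp = s3.list_objects_v2(
        Bucket=BUCKET, Prefix=run + "shredding_complete.json", MaxKeys=1
    )
    if resp.get("KeyCount", 0) == 0:
        missing.append(run)

print(f"{len(missing)} of {len(runs)} run folders lack shredding_complete.json")
for run in missing:
    print("  " + run)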

Could you please confirm if this behavior is indicative of a bug or if there are additional steps we should take to ensure all data is being correctly processed during high-volume periods?

Thank you for your assistance in resolving this matter.

From the log:
[WARNING] Your app’s responsiveness to a new asynchronous event (such as a new connection, an upstream response, or a timer) was in excess of 100 milliseconds.
Your CPU is probably starving. Consider increasing the granularity of your delays or adding more cedes. This may also be a sign that you are unintentionally running blocking I/O operations (such as File or InetAddress) without the blocking combinator.

Sometimes it also shows this:
INFO software.amazon.kinesis.coordinator.DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=1, coreThreads=0, leasesOwned=1, largestPoolSize=2, maximumPoolSize=2147483647)
WARN org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport - Bulk delete operation failed to delete all objects; failure count = 3
WARN org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport - AccessDenied: transformed/good/: Access Denied
WARN org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport - AccessDenied: transformed/: Access Denied
WARN org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport - AccessDenied: transformed/good/widerow/: Access Denied
INFO com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.generic.Partitioned - Closing window run=2024-08-12-07-44-00-094f5feb-a3b4-4a5e-b22e-b4abe47c21f2
INFO com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.sinks.generic.Partitioned - Opening window run=2024-08-12-07-46-00-094f5feb-a3b4-4a5e-b22e-b4abe47c21f2

Hi @Harshit, do you mind sharing the details of:

  1. What versions of the apps are you using?
  2. How are they running currently? (ECS, EKS, EC2?)
  3. What is the current CPU / Memory allocation to the apps?
  4. The config settings of the applications
  5. Any command line / environment settings like JAVA_OPTS?

As much detail as possible please!

  1. What versions of the apps are you using? - transformer-kinesis:5.6.0
  2. How are they running currently? (ECS, EKS, EC2?) - EC2
  3. What is the current CPU / Memory allocation to the apps? - A t3a.small instance type, which has 2 vCPUs and 2 GiB of memory.
  4. The config settings of the applications -
{
  "input": {
    "type": "kinesis",
    "appName": "sp-transformer-server-wrp",
    "streamName": "sp-enriched-stream",
    "region": "<region_name>",
    "position": "TRIM_HORIZON",
    "retrievalMode": {
      "type": "Polling",
      "maxRecords": 10000
    }
  },

  "output": {
    "path": "s3a://<bucket_name>/transformed/good/widerow/parquet",
    "compression": "GZIP",
    "region": "<region_name>"
  },

  "windowing": "2 minutes",

  "queue": {
    "type": "sqs",
    "queueName": "sp-db-loader.fifo",
    "region": "<region_name>"
  },

  "formats": {
    "transformationType": "widerow",
    "fileFormat": "parquet"
  },

  "monitoring": {
    "metrics": {
      "cloudWatch": false
    }
  },

  "telemetry": {
    "disable": true,
    "interval": "15 minutes",
    "method": "POST",
    "collectorUri": "",
    "collectorPort": 443,
    "secure": true,
    "userProvidedId": "",
    "autoGeneratedId": "",
    "moduleName": "transformer-kinesis-ec2",
    "moduleVersion": "0.3.4"
  }
}
  5. Any command line / environment settings like JAVA_OPTS? -
    'JAVA_OPTS=-Dconfig.override_with_env_vars=true -Dorg.slf4j.simpleLogger.defaultLogLevel=info -XX:MinRAMPercentage=50 -XX:MaxRAMPercentage=75'

Last one: do you have graphs of the CPU usage over the peak periods? You should be able to find those in CloudWatch.
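
If the console graphs are awkward to export, the same numbers can be pulled with the CloudWatch API. A minimal boto3 sketch, assuming the standard AWS/EC2 CPUUtilization metric and a placeholder instance ID (on a t3a instance the CPUCreditBalance metric is also worth a look, since exhausting burst credits caps the CPU):

from datetime import datetime, timedelta, timezone

import boto3

INSTANCE_ID = "i-0123456789abcdef0"  # placeholder: the transformer EC2 instance

cloudwatch = boto3.client("cloudwatch")
end = datetime.now(timezone.utc)
start = end - timedelta(hours=6)  # widen this to cover the peak period

resp = cloudwatch.get_metric_statistics(
    Namespace="AWS/EC2",
    MetricName="CPUUtilization",
    Dimensions=[{"Name": "InstanceId", "Value": INSTANCE_ID}],
    StartTime=start,
    EndTime=end,
    Period=300,  # 5-minute datapoints (basic monitoring granularity)
    Statistics=["Average", "Maximum"],
)

for dp in sorted(resp["Datapoints"], key=lambda d: d["Timestamp"]):
    print(dp["Timestamp"], f"avg={dp['Average']:.1f}%", f"max={dp['Maximum']:.1f}%")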

I couldn't find the logs from that day, so I tried to recreate the issue by pushing events, and I was able to reproduce it by sending around 46k events over a 10-minute interval.

Enriched Stream (Kinesis):

Transformer Server (EC2):

Hey @Harshit, so it looks like your Streaming Transformer is getting overwhelmed by the traffic volume and eventually starts to crash.

Generally, the folders where you do not have a shredding_complete.json are windows that had started to be populated but where the app crashed before it could complete them. The good news is that in that case the app will not checkpoint, so on restart it should resume processing from the last successful checkpoint.

For your volumes, and to add some safety margin, I would recommend either a t3a.medium or, better still, an m5a.large, which should allow you to increase throughput quite dramatically.