Transformer batch doesn't actually do anything. Empty shredded bucket

Hello, after managing to overcome the issues reported here: dataflow-runner-error-on-batch-transform
the Transformer stopped throwing errors, but now it doesn't do anything at all. No transformed data is written to the output bucket. I do have enriched data in the archive bucket, in folders following the run=2006-01-02-15-04-05 naming format. I'm pretty sure I'm missing something…
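
For reference, this is roughly what the archive bucket listing looks like (bucket name is a placeholder and the run timestamps below are just examples of the folder naming):

aws s3 ls s3://{ARCHIVE_BUCKET}/
                           PRE run=2023-05-24-18-40-00/
                           PRE run=2023-05-24-19-40-00/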

This is the transform log:

23/05/24 18:44:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/24 18:44:43 WARN DependencyUtils: Skip remote jar s3://snowplow-hosted-assets/4-storage/transformer-batch/snowplow-transformer-batch-5.4.1.jar.
23/05/24 18:44:43 INFO DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ip-X-X-X-X.eu-central-1.compute.internal/X-X-X-X:8032
23/05/24 18:44:44 INFO Configuration: resource-types.xml not found
23/05/24 18:44:44 INFO ResourceUtils: Unable to find 'resource-types.xml'.
23/05/24 18:44:44 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (57344 MB per container)
23/05/24 18:44:44 INFO Client: Will allocate AM container, with 8192 MB memory including 1024 MB overhead
23/05/24 18:44:44 INFO Client: Setting up container launch context for our AM
23/05/24 18:44:44 INFO Client: Setting up the launch environment for our AM container
23/05/24 18:44:44 INFO Client: Preparing resources for our AM container
23/05/24 18:44:45 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/05/24 18:44:49 INFO Client: Uploading resource file:/mnt/tmp/spark-a6524803-b4dd-42e8-8b1b-ee1a5a64727f/__spark_libs__5190507179232416003.zip -> hdfs://ip-X-X-X-X.eu-central-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1684953255996_0002/__spark_libs__5190507179232416003.zip
23/05/24 18:44:54 INFO ClientConfigurationFactory: Set initial getObject socket timeout to 2000 ms.
23/05/24 18:44:54 INFO Client: Uploading resource s3://snowplow-hosted-assets/4-storage/transformer-batch/snowplow-transformer-batch-5.4.1.jar -> hdfs://ip-X-X-X-X.eu-central-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1684953255996_0002/snowplow-transformer-batch-5.4.1.jar
23/05/24 18:44:56 INFO S3NativeFileSystem: Opening 's3://snowplow-hosted-assets/4-storage/transformer-batch/snowplow-transformer-batch-5.4.1.jar' for reading
23/05/24 18:45:00 WARN Client: Same name resource file:///home/hadoop/snowplow-transformer-batch-5.4.1.jar added multiple times to distributed cache
23/05/24 18:45:00 INFO Client: Uploading resource file:/etc/hudi/conf.dist/hudi-defaults.conf -> hdfs://ip-X-X-X-X.eu-central-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1684953255996_0002/hudi-defaults.conf
23/05/24 18:45:00 INFO Client: Uploading resource file:/mnt/tmp/spark-a6524803-b4dd-42e8-8b1b-ee1a5a64727f/__spark_conf__7220764117677799891.zip -> hdfs://ip-X-X-X-X.eu-central-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1684953255996_0002/__spark_conf__.zip
23/05/24 18:45:00 INFO SecurityManager: Changing view acls to: hadoop
23/05/24 18:45:00 INFO SecurityManager: Changing modify acls to: hadoop
23/05/24 18:45:00 INFO SecurityManager: Changing view acls groups to: 
23/05/24 18:45:00 INFO SecurityManager: Changing modify acls groups to: 
23/05/24 18:45:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
23/05/24 18:45:01 INFO Client: Submitting application application_1684953255996_0002 to ResourceManager
23/05/24 18:45:01 INFO YarnClientImpl: Submitted application application_1684953255996_0002
23/05/24 18:45:02 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:02 INFO Client: 
	 client token: N/A
	 diagnostics: AM container is launched, waiting for AM container to Register with RM
	 ApplicationMaster host: N/A
	 ApplicationMaster RPC port: -1
	 queue: default
	 start time: 1684953901156
	 final status: UNDEFINED
	 tracking URL: http://ip-X-X-X-X.eu-central-1.compute.internal:20888/proxy/application_1684953255996_0002/
	 user: hadoop
23/05/24 18:45:03 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:04 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:05 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:06 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:07 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:08 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:09 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:10 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:11 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:12 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:13 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:14 INFO Client: Application report for application_1684953255996_0002 (state: ACCEPTED)
23/05/24 18:45:15 INFO Client: Application report for application_1684953255996_0002 (state: RUNNING)
23/05/24 18:45:15 INFO Client: 
	 client token: N/A
	 diagnostics: N/A
	 ApplicationMaster host: ip-X-X-X-X.eu-central-1.compute.internal
	 ApplicationMaster RPC port: 41047
	 queue: default
	 start time: 1684953901156
	 final status: UNDEFINED
	 tracking URL: http://ip-X-X-X-X.eu-central-1.compute.internal:20888/proxy/application_1684953255996_0002/
	 user: hadoop
23/05/24 18:45:16 INFO Client: Application report for application_1684953255996_0002 (state: RUNNING)
23/05/24 18:45:17 INFO Client: Application report for application_1684953255996_0002 (state: RUNNING)
23/05/24 18:45:18 INFO Client: Application report for application_1684953255996_0002 (state: RUNNING)
23/05/24 18:45:19 INFO Client: Application report for application_1684953255996_0002 (state: RUNNING)
23/05/24 18:45:20 INFO Client: Application report for application_1684953255996_0002 (state: RUNNING)
23/05/24 18:45:21 INFO Client: Application report for application_1684953255996_0002 (state: RUNNING)
23/05/24 18:45:22 INFO Client: Application report for application_1684953255996_0002 (state: FINISHED)
23/05/24 18:45:22 INFO Client: 
	 client token: N/A
	 diagnostics: N/A
	 ApplicationMaster host: ip-X-X-X-X.eu-central-1.compute.internal
	 ApplicationMaster RPC port: 41047
	 queue: default
	 start time: 1684953901156
	 final status: SUCCEEDED
	 tracking URL: http://ip-X-X-X-X.eu-central-1.compute.internal:20888/proxy/application_1684953255996_0002/
	 user: hadoop
23/05/24 18:45:22 INFO ShutdownHookManager: Shutdown hook called
23/05/24 18:45:22 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-a6524803-b4dd-42e8-8b1b-ee1a5a64727f
23/05/24 18:45:22 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-b1ebc2e6-9f05-4b55-ac91-64abb760d6b9
Command exiting with ret '0'

This is my transformer config:

{
  "input": "s3://{ARCHIVE_BUCKET}/",
  "output": {
    "path": "s3://{TRANSFORMER_BUCKET}/",
    "compression": "GZIP",
    "region": "{REGION}"
  },
  "queue": {
    "type": "sqs",
    "queueName": "{QUEUE}",
    "region": "{REGION}"
  },
  "formats": {
    "transformationType": "shred"
  }
}
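
If it helps, I can also pull the full YARN container logs for the run from the EMR master node (application id taken from the log above):

yarn logs -applicationId application_1684953255996_0002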

Thank you

I don't quite get why, but it only started to work after I added /input to the input S3 bucket path and /output to the output bucket path…

Also, the SQS queue needs to be FIFO, otherwise it won't work (see the CLI sketch after the config below). I don't remember seeing that in the documentation…

{
  "input": "s3://{ARCHIVE_BUCKET}/input/",
  "output": {
    "path": "s3://{TRANSFORMER_BUCKET}/output/",
    "compression": "GZIP",
    "region": "{REGION}"
  },
  "queue": {
    "type": "sqs",
    "queueName": "{QUEUE}",
    "region": "{REGION}"
  },
  "formats": {
    "transformationType": "shred"
  }
}
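
For reference, this is roughly how such a FIFO queue can be created with the AWS CLI (queue name is a placeholder; note that FIFO queue names must end in .fifo):

aws sqs create-queue \
  --queue-name {QUEUE}.fifo \
  --attributes FifoQueue=true,ContentBasedDeduplication=true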