Dataflow runner error on batch transform

Hello,

I’m trying to set up an AWS EMR cluster with Step jobs to transform enriched data (to be loaded into Redshift afterwards), but when my steps run I’m getting the following error:

INFO[0451] 23/05/22 16:21:30 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
23/05/22 16:21:30 WARN SparkConf: The configuration key 'spark.yarn.driver.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.driver.memoryOverhead' instead.
23/05/22 16:21:30 WARN SparkConf: The configuration key 'spark.yarn.executor.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.executor.memoryOverhead' instead.
23/05/22 16:21:30 WARN SparkConf: The configuration key 'spark.yarn.driver.memoryOverhead' has been deprecated as of Spark 2.3 and may be removed in the future. Please use the new key 'spark.driver.memoryOverhead' instead.
23/05/22 16:21:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/22 16:21:31 WARN DependencyUtils: Skip remote jar s3://snowplow-hosted-assets-eu-central-1/4-storage/transformer-batch/snowplow-transformer-batch-5.4.1.jar.
23/05/22 16:21:32 INFO RMProxy: Connecting to ResourceManager at ip-X-X-X-X.eu-central-1.compute.internal/10.0.3.61:8032
23/05/22 16:21:32 INFO Client: Requesting a new application from cluster with 1 NodeManagers
23/05/22 16:21:33 INFO Configuration: resource-types.xml not found
23/05/22 16:21:33 INFO ResourceUtils: Unable to find 'resource-types.xml'.
23/05/22 16:21:33 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (57344 MB per container)
23/05/22 16:21:33 INFO Client: Will allocate AM container, with 8192 MB memory including 1024 MB overhead
23/05/22 16:21:33 INFO Client: Setting up container launch context for our AM
23/05/22 16:21:33 INFO Client: Setting up the launch environment for our AM container
23/05/22 16:21:33 INFO Client: Preparing resources for our AM container
23/05/22 16:21:33 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
23/05/22 16:21:36 INFO Client: Uploading resource file:/mnt/tmp/spark-0bd94759-f8a1-48df-a7b8-53ffad782746/__spark_libs__6228731312099573208.zip -> hdfs://ip-X-X-X-X.eu-central-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1684769856689_0004/__spark_libs__6228731312099573208.zip
23/05/22 16:21:40 INFO ClientConfigurationFactory: Set initial getObject socket timeout to 2000 ms.
23/05/22 16:21:40 INFO Client: Uploading resource s3://snowplow-hosted-assets-eu-central-1/4-storage/transformer-batch/snowplow-transformer-batch-5.4.1.jar -> hdfs://ip-X-X-X-X.eu-central-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1684769856689_0004/snowplow-transformer-batch-5.4.1.jar
23/05/22 16:21:41 INFO S3NativeFileSystem: Opening 's3://snowplow-hosted-assets-eu-central-1/4-storage/transformer-batch/snowplow-transformer-batch-5.4.1.jar' for reading
23/05/22 16:21:44 INFO Client: Uploading resource file:/mnt/tmp/spark-0bd94759-f8a1-48df-a7b8-53ffad782746/__spark_conf__9057109694492654217.zip -> hdfs://ip-X-X-X-X.eu-central-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1684769856689_0004/__spark_conf__.zip
23/05/22 16:21:44 INFO SecurityManager: Changing view acls to: hadoop
23/05/22 16:21:44 INFO SecurityManager: Changing modify acls to: hadoop
23/05/22 16:21:44 INFO SecurityManager: Changing view acls groups to: 
23/05/22 16:21:44 INFO SecurityManager: Changing modify acls groups to: 
23/05/22 16:21:44 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
23/05/22 16:21:44 INFO Client: Submitting application application_1684769856689_0004 to ResourceManager
23/05/22 16:21:44 INFO YarnClientImpl: Submitted application application_1684769856689_0004
23/05/22 16:21:45 INFO Client: Application report for application_1684769856689_0004 (state: ACCEPTED)
23/05/22 16:21:45 INFO Client: 
	 client token: N/A
	 diagnostics: AM container is launched, waiting for AM container to Register with RM
	 ApplicationMaster host: N/A
	 ApplicationMaster RPC port: -1
	 queue: default
	 start time: 1684772504807
	 final status: UNDEFINED
	 tracking URL: http://ip-X-X-X-X.eu-central-1.compute.internal:20888/proxy/application_1684769856689_0004/
	 user: hadoop
23/05/22 16:21:46 INFO Client: Application report for application_1684769856689_0004 (state: ACCEPTED)
23/05/22 16:21:47 INFO Client: Application report for application_1684769856689_0004 (state: ACCEPTED)
23/05/22 16:21:48 INFO Client: Application report for application_1684769856689_0004 (state: ACCEPTED)
23/05/22 16:21:49 INFO Client: Application report for application_1684769856689_0004 (state: ACCEPTED)
23/05/22 16:21:50 INFO Client: Application report for application_1684769856689_0004 (state: ACCEPTED)
23/05/22 16:21:51 INFO Client: Application report for application_1684769856689_0004 (state: ACCEPTED)
23/05/22 16:21:52 INFO Client: Application report for application_1684769856689_0004 (state: ACCEPTED)
23/05/22 16:21:53 INFO Client: Application report for application_1684769856689_0004 (state: ACCEPTED)
23/05/22 16:21:54 INFO Client: Application report for application_1684769856689_0004 (state: FAILED)
23/05/22 16:21:54 INFO Client: 
	 client token: N/A
	 diagnostics: Application application_1684769856689_0004 failed 2 times due to AM Container for appattempt_1684769856689_0004_000002 exited with  exitCode: 13
Failing this attempt.Diagnostics: [2023-05-22 16:21:53.930]Exception from container-launch.
Container id: container_1684769856689_0004_02_000001
Exit code: 13

And then, this one:

[2023-05-22 16:21:53.933]Container exited with a non-zero exit code 13. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
o: yarn,hadoop
23/05/22 16:21:52 INFO SecurityManager: Changing view acls groups to: 
23/05/22 16:21:52 INFO SecurityManager: Changing modify acls groups to: 
23/05/22 16:21:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users  with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
23/05/22 16:21:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
23/05/22 16:21:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
23/05/22 16:21:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
23/05/22 16:21:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
23/05/22 16:21:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
23/05/22 16:21:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
23/05/22 16:21:52 INFO ApplicationMaster: ApplicationAttemptId: appattempt_1684769856689_0004_000002
23/05/22 16:21:52 INFO ApplicationMaster: Starting the user application in a separate Thread
23/05/22 16:21:52 ERROR ApplicationMaster: Uncaught exception: 
java.lang.ClassNotFoundException: com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.deploy.yarn.ApplicationMaster.startUserApplication(ApplicationMaster.scala:718)
	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:492)
	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:264)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:890)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:889)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:889)
	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
23/05/22 16:21:52 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Uncaught exception: java.lang.ClassNotFoundException: com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main

These are the configs I’m using:

cluster.config

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
  "data": {
    "name": "RDB Transformer",
    "region": "{REGION}",
    "logUri": "s3://{LOG_BUCKET}/",
    "credentials": {
      "accessKeyId": "{KEY}",
      "secretAccessKey": "{SECRET}"
    },
    "roles": {
      "jobflow": "{EMR_EC2_ROLE}",
      "service": "{EMR_ROLE}"
    },
    "ec2": {
      "amiVersion": "6.2.0",
      "keyName": "{EMR_EC2_KEYNAME}",
      "location": {
        "vpc": {
          "subnetId": "{EMR_SUBNET_ID}"
        }
      },
      "instances": {
        "master": {
          "type": "m4.large",
          "ebsConfiguration": {
            "ebsOptimized": true,
            "ebsBlockDeviceConfigs": []
          }
        },
        "core": {
          "type": "r4.xlarge",
          "count": 1
        },
        "task": {
          "type": "m4.large",
          "count": 0,
          "bid": "0.015"
        }
      }
    },
    "tags": [],
    "bootstrapActionConfigs": [],
    "configurations": [
      {
        "classification": "core-site",
        "properties": {
          "Io.file.buffer.size": "65536"
        },
        "configurations": []
      },
      {
        "classification": "yarn-site",
        "properties": {
          "yarn.nodemanager.resource.memory-mb": "57344",
          "yarn.scheduler.maximum-allocation-mb": "57344",
          "yarn.nodemanager.vmem-check-enabled": "false"
        },
        "configurations": []
      },
      {
        "classification": "spark",
        "properties": {
          "maximizeResourceAllocation": "false"
        },
        "configurations": []
      },
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.executor.memory": "7G",
          "spark.driver.memory": "7G",
          "spark.driver.cores": "3",
          "spark.yarn.driver.memoryOverhead": "1024",
          "spark.default.parallelism": "24",
          "spark.executor.cores": "1",
          "spark.executor.instances": "6",
          "spark.yarn.executor.memoryOverhead": "1024",
          "spark.dynamicAllocation.enabled": "false"
        },
        "configurations": []
      }
    ],
    "applications": [
      "Hadoop",
      "Spark"
    ]
  }
}

playbook.config


{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data": {
    "region": "{REGION}",
    "credentials": {
      "accessKeyId": "{KEY}",
      "secretAccessKey": "{SECRET}"
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "S3DistCp enriched data archiving",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src", "s3://{ENRICH_BUCKET}/",
          "--dest", "s3://{ARCHIVE_BUCKET}/run={{nowWithFormat "2006-01-02-15-04-05"}}/",
          "--s3Endpoint", "s3-{REGION}.amazonaws.com",
          "--srcPattern", ".*",
          "--outputCodec", "gz",
          "--deleteOnSuccess"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "RDB Transformer",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "command-runner.jar",
        "arguments": [
          "spark-submit",
          "--class", "com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main",
          "--master", "yarn",
          "--deploy-mode", "cluster",
          "s3://snowplow-hosted-assets-{REGION}/4-storage/transformer-batch/snowplow-transformer-batch-5.4.1.jar",
          "--iglu-config", "{{base64File "/home/ec2-user/iglu_resolver.json"}}",
          "--config", "{{base64File "/home/ec2-user/configs/transformer.config"}}"
        ]
      }
    ],
    "tags": []
  }
}

transformer.config

{
  "input": "s3://{ARCHIVE_BUCKET}/",
  "output": {
    "path": "s3://{TRANSFORMER_BUCKET}/",
    "compression": "GZIP",
    "maxRecordsPerFile": 10000,
    "region": "{REGION}",
    "bad": {
      "type": "kinesis",
      "streamName": "{BAD_STREAM}"
      "region": "{REGION}"
      "recordLimit": 500
      "byteLimit": 5242880
      "backoffPolicy": {
        "minBackoff": 100 milliseconds
        "maxBackoff": 10 seconds
        "maxRetries": 10
      }
      "throttledBackoffPolicy": {
        "minBackoff": 100 milliseconds
        "maxBackoff": 1 second
      }
    }
  }
  "queue": {
    "type": "sqs",
    "queueName": "{QUEUE}",
    "region": "{REGION}"
  }
  "deduplication": {
    "synthetic": {
      "type": "BROADCAST"
      "cardinality": 1
    }
    "natural": true
  }
  "formats": {
    "transformationType": "shred",
    "default": "TSV",
    "json": [ ],
    "tsv": [ ],
    "skip": [ ]
  }
  "runInterval": {
    "sinceTimestamp": "2021-10-12-14-55-22",
  }
  "validations": {
    "minimumTimestamp": "2021-11-18T11:00:00.00Z"
  }
}

Thank you!

The main issue here turned out to be a misconfiguration in the config files. Unfortunately, the open source documentation for the basic setup is quite outdated, too generic and error-prone.

Example: Spark transformer | Snowplow Documentation

The playbook example targets snowplow-transformer-batch-5.4.1.jar … but the class name it gives is wrong: “com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main”, when it should be “com.snowplowanalytics.snowplow.rdbloader.transformer.batch.Main”.
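
For anyone else hitting the same exit code 13 / ClassNotFoundException: the only change needed in my playbook above is the --class argument of the “RDB Transformer” step; every other line of the step stays exactly as posted:

{
  "type": "CUSTOM_JAR",
  "name": "RDB Transformer",
  "actionOnFailure": "CANCEL_AND_WAIT",
  "jar": "command-runner.jar",
  "arguments": [
    "spark-submit",
    "--class", "com.snowplowanalytics.snowplow.rdbloader.transformer.batch.Main",
    "--master", "yarn",
    "--deploy-mode", "cluster",
    "s3://snowplow-hosted-assets-{REGION}/4-storage/transformer-batch/snowplow-transformer-batch-5.4.1.jar",
    "--iglu-config", "{{base64File "/home/ec2-user/iglu_resolver.json"}}",
    "--config", "{{base64File "/home/ec2-user/configs/transformer.config"}}"
  ]
}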

I hope Snowplow addresses this in the future.

Hi @MP1,

Many thanks for pointing this out. I just opened a PR for the docs to fix this: Fix Spark Transformer class name by stanch · Pull Request #461 · snowplow/documentation · GitHub.

Feel free to comment on the PR with any other issues or inconsistencies you found on this page. (You can also make the changes yourself! Just click “edit this page” at the bottom of any page in the docs.)