I am getting the following error when using EmrEtlRunner. Any idea how to solve it?
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-10-2-108.ec2.internal:8020/tmp/01c28d29-865c-461f-a2e1-a4375542ec4a/files
Can you tell me at which EMR step the error occurs? You can tell this from the EmrEtlRunner logs, or from the list of steps in the EMR dashboard. Any context around the error message would be helpful.
This is a printout of the EMR step log. Whenever I try to run the job I get the same error. I noticed that the enrichment step doesn't seem to be working, but Stream Enrich works perfectly.
This is the content of the controller.gz error log:
> 2020-05-19T22:52:26.646Z INFO Ensure step 3 jar file command-runner.jar
> 2020-05-19T22:52:26.647Z INFO StepRunner: Created Runner for step 3
> INFO startExec 'hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --class com.snowplowanalytics.snowplow.storage.spark.ShredJob --master yarn --deploy-mode cluster s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.15.0.jar --iglu-config ewogICJzY2hlbWEiOiAiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3MuaWdsdS9yZXNvbHZlci1jb25maWcvanNvbnNjaGVtYS8xLTAtMSIsCiAgImRhdGEiOiB7CiAgICAiY2FjaGVTaXplIjogNTAwLAogICAgInJlcG9zaXRvcmllcyI6IFsKICAgICAgewogICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCIsCiAgICAgICAgInByaW9yaXR5IjogMCwKICAgICAgICAidmVuZG9yUHJlZml4ZXMiOiBbICJjb20uc25vd3Bsb3dhbmFseXRpY3MiIF0sCiAgICAgICAgImNvbm5lY3Rpb24iOiB7CiAgICAgICAgICAiaHR0cCI6IHsKICAgICAgICAgICAgInVyaSI6ICJodHRwOi8vaWdsdWNlbnRyYWwuY29tIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfSwKICAgICAgewogICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCAtIEdDUCBNaXJyb3IiLAogICAgICAgICJwcmlvcml0eSI6IDEsCiAgICAgICAgInZlbmRvclByZWZpeGVzIjogWyAiY29tLnNub3dwbG93YW5hbHl0aWNzIiBdLAogICAgICAgICJjb25uZWN0aW9uIjogewogICAgICAgICAgImh0dHAiOiB7CiAgICAgICAgICAgICJ1cmkiOiAiaHR0cDovL21pcnJvcjAxLmlnbHVjZW50cmFsLmNvbSIKICAgICAgICAgIH0KICAgICAgICB9CiAgICAgIH0KICAgIF0KICB9Cn0K --input-folder hdfs:///local/snowplow/enriched-events/* --output-folder hdfs:///local/snowplow/shredded-events/ --bad-folder s3://sanarlake-snowplow-homolog/shredded/bad/run=2020-05-19-22-47-09/'
> INFO Environment:
> PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
> LESS_TERMCAP_md=[01;38;5;208m
> LESS_TERMCAP_me=[0m
> HISTCONTROL=ignoredups
> LESS_TERMCAP_mb=[01;31m
> AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
> UPSTART_JOB=rc
> LESS_TERMCAP_se=[0m
> HISTSIZE=1000
> HADOOP_ROOT_LOGGER=INFO,DRFA
> JAVA_HOME=/etc/alternatives/jre
> AWS_DEFAULT_REGION=us-east-1
> AWS_ELB_HOME=/opt/aws/apitools/elb
> LESS_TERMCAP_us=[04;38;5;111m
> EC2_HOME=/opt/aws/apitools/ec2
> TERM=linux
> runlevel=3
> LANG=en_US.UTF-8
> AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
> MAIL=/var/spool/mail/hadoop
> LESS_TERMCAP_ue=[0m
> LOGNAME=hadoop
> PWD=/
> LANGSH_SOURCED=1
> HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-1AOC83RFSJSWD/tmp
> _=/etc/alternatives/jre/bin/java
> CONSOLETYPE=serial
> RUNLEVEL=3
> LESSOPEN=||/usr/bin/lesspipe.sh %s
> previous=N
> UPSTART_EVENTS=runlevel
> AWS_PATH=/opt/aws
> USER=hadoop
> UPSTART_INSTANCE=
> PREVLEVEL=N
> HADOOP_LOGFILE=syslog
> HOSTNAME=ip-10-10-2-17
> HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-1AOC83RFSJSWD
> EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
> SHLVL=5
> HOME=/home/hadoop
> HADOOP_IDENT_STRING=hadoop
> INFO redirectOutput to /mnt/var/log/hadoop/steps/s-1AOC83RFSJSWD/stdout
> INFO redirectError to /mnt/var/log/hadoop/steps/s-1AOC83RFSJSWD/stderr
> INFO Working dir /mnt/var/lib/hadoop/steps/s-1AOC83RFSJSWD
> INFO ProcessRunner started child process 11328 :
> hadoop 11328 4785 0 22:52 ? 00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --class com.snowplowanalytics.snowplow.storage.spark.ShredJob --master yarn --deploy-mode cluster s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.15.0.jar --iglu-config ewogICJzY2hlbWEiOiAiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3MuaWdsdS9yZXNvbHZlci1jb25maWcvanNvbnNjaGVtYS8xLTAtMSIsCiAgImRhdGEiOiB7CiAgICAiY2FjaGVTaXplIjogNTAwLAogICAgInJlcG9zaXRvcmllcyI6IFsKICAgICAgewogICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCIsCiAgICAgICAgInByaW9yaXR5IjogMCwKICAgICAgICAidmVuZG9yUHJlZml4ZXMiOiBbICJjb20uc25vd3Bsb3dhbmFseXRpY3MiIF0sCiAgICAgICAgImNvbm5lY3Rpb24iOiB7CiAgICAgICAgICAiaHR0cCI6IHsKICAgICAgICAgICAgInVyaSI6ICJodHRwOi8vaWdsdWNlbnRyYWwuY29tIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfSwKICAgICAgewogICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCAtIEdDUCBNaXJyb3IiLAogICAgICAgICJwcmlvcml0eSI6IDEsCiAgICAgICAgInZlbmRvclByZWZpeGVzIjogWyAiY29tLnNub3dwbG93YW5hbHl0aWNzIiBdLAogICAgICAgICJjb25uZWN0aW9uIjogewogICAgICAgICAgImh0dHAiOiB7CiAgICAgICAgICAgICJ1cmkiOiAiaHR0cDovL21pcnJvcjAxLmlnbHVjZW50cmFsLmNvbSIKICAgICAgICAgIH0KICAgICAgICB9CiAgICAgIH0KICAgIF0KICB9Cn0K --input-folder hdfs:///local/snowplow/enriched-events/* --output-folder hdfs:///local/snowplow/shredded-events/ --bad-folder s3://sanarlake-snowplow-homolog/shredded/bad/run=2020-05-19-22-47-09/
> 2020-05-19T22:52:30.653Z INFO HadoopJarStepRunner.Runner: startRun() called for s-1AOC83RFSJSWD Child Pid: 11328
> INFO Synchronously wait child process to complete : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
> INFO waitProcessCompletion ended with exit code 1 : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
> INFO total process run time: 50 seconds
> 2020-05-19T22:53:18.720Z INFO Step created jobs:
> 2020-05-19T22:53:18.720Z WARN Step failed with exitCode 1 and took 50 seconds
This is the content of the stderr.gz error log:
> log4j:ERROR setFile(null,true) call failed.
> java.io.FileNotFoundException: /stderr (Permission denied)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:133)
> at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)
> at org.apache.log4j.FileAppender.activateOptions(FileAppender.java:165)
> at org.apache.log4j.DailyRollingFileAppender.activateOptions(DailyRollingFileAppender.java:223)
> at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
> at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
> at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
> at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
> at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
> at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
> at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
> at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
> at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
> at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
> at org.apache.spark.internal.Logging$class.initializeLogging(Logging.scala:120)
> at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:108)
> at org.apache.spark.deploy.SparkSubmit$.initializeLogIfNecessary(SparkSubmit.scala:71)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:128)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> log4j:ERROR Either File or DatePattern options are not set for appender [DRFA-stderr].
> log4j:ERROR setFile(null,true) call failed.
> java.io.FileNotFoundException: /stdout (Permission denied)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:133)
> at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)
> at org.apache.log4j.FileAppender.activateOptions(FileAppender.java:165)
> at org.apache.log4j.DailyRollingFileAppender.activateOptions(DailyRollingFileAppender.java:223)
> at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
> at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
> at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
> at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
> at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
> at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
> at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
> at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
> at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
> at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
> at org.apache.spark.internal.Logging$class.initializeLogging(Logging.scala:120)
> at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:108)
> at org.apache.spark.deploy.SparkSubmit$.initializeLogIfNecessary(SparkSubmit.scala:71)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:128)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> log4j:ERROR Either File or DatePattern options are not set for appender [DRFA-stdout].
> Warning: Skip remote jar s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.15.0.jar.
> 20/05/19 22:52:31 INFO RMProxy: Connecting to ResourceManager at ip-10-10-2-17.ec2.internal/10.10.2.17:8032
> 20/05/19 22:52:31 INFO Client: Requesting a new application from cluster with 2 NodeManagers
> 20/05/19 22:52:32 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (12288 MB per container)
> 20/05/19 22:52:32 INFO Client: Will allocate AM container, with 12288 MB memory including 1117 MB overhead
> 20/05/19 22:52:32 INFO Client: Setting up container launch context for our AM
> 20/05/19 22:52:32 INFO Client: Setting up the launch environment for our AM container
> 20/05/19 22:52:32 INFO Client: Preparing resources for our AM container
> 20/05/19 22:52:33 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
> 20/05/19 22:52:35 INFO Client: Uploading resource file:/mnt/tmp/spark-263a446d-3b83-4336-93c9-fe504445c686/__spark_libs__7695959324701782710.zip -> hdfs://ip-10-10-2-17.ec2.internal:8020/user/hadoop/.sparkStaging/application_1589928613021_0003/__spark_libs__7695959324701782710.zip
> 20/05/19 22:52:37 INFO Client: Uploading resource s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.15.0.jar -> hdfs://ip-10-10-2-17.ec2.internal:8020/user/hadoop/.sparkStaging/application_1589928613021_0003/snowplow-rdb-shredder-0.15.0.jar
> 20/05/19 22:52:37 INFO S3NativeFileSystem: Opening 's3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.15.0.jar' for reading
> 20/05/19 22:52:38 INFO Client: Uploading resource file:/mnt/tmp/spark-263a446d-3b83-4336-93c9-fe504445c686/__spark_conf__3844467823067685141.zip -> hdfs://ip-10-10-2-17.ec2.internal:8020/user/hadoop/.sparkStaging/application_1589928613021_0003/__spark_conf__.zip
> 20/05/19 22:52:38 INFO SecurityManager: Changing view acls to: hadoop
> 20/05/19 22:52:38 INFO SecurityManager: Changing modify acls to: hadoop
> 20/05/19 22:52:38 INFO SecurityManager: Changing view acls groups to:
> 20/05/19 22:52:38 INFO SecurityManager: Changing modify acls groups to:
> 20/05/19 22:52:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); groups with view permissions: Set(); users with modify permissions: Set(hadoop); groups with modify permissions: Set()
> 20/05/19 22:52:38 INFO Client: Submitting application application_1589928613021_0003 to ResourceManager
> 20/05/19 22:52:38 INFO YarnClientImpl: Submitted application application_1589928613021_0003
> 20/05/19 22:52:39 INFO Client: Application report for application_1589928613021_0003 (state: ACCEPTED)
> 20/05/19 22:52:39 INFO Client:
> client token: N/A
> diagnostics: AM container is launched, waiting for AM container to Register with RM
> ApplicationMaster host: N/A
> ApplicationMaster RPC port: -1
> queue: default
> start time: 1589928758710
> final status: UNDEFINED
> tracking URL: http://ip-10-10-2-17.ec2.internal:20888/proxy/application_1589928613021_0003/
> user: hadoop
> 20/05/19 22:52:40 INFO Client: Application report for application_1589928613021_0003 (state: ACCEPTED)
> 20/05/19 22:52:41 INFO Client: Application report for application_1589928613021_0003 (state: ACCEPTED)
> 20/05/19 22:52:42 INFO Client: Application report for application_1589928613021_0003 (state: ACCEPTED)
> 20/05/19 22:52:43 INFO Client: Application report for application_1589928613021_0003 (state: ACCEPTED)
> 20/05/19 22:52:44 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:44 INFO Client:
> client token: N/A
> diagnostics: N/A
> ApplicationMaster host: 10.10.2.36
> ApplicationMaster RPC port: 0
> queue: default
> start time: 1589928758710
> final status: UNDEFINED
> tracking URL: http://ip-10-10-2-17.ec2.internal:20888/proxy/application_1589928613021_0003/
> user: hadoop
> 20/05/19 22:52:45 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:46 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:47 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:48 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:49 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:50 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:51 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:52 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:53 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:54 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:55 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:56 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:57 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:58 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:59 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:00 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:01 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:02 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:03 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:04 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:05 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:06 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:07 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:08 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:09 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:10 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:11 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:12 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:13 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:14 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:15 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:16 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:17 INFO Client: Application report for application_1589928613021_0003 (state: FINISHED)
> 20/05/19 22:53:17 INFO Client:
> client token: N/A
> diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted.
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
> at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
> at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
> at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:597)
> at com.snowplowanalytics.snowplow.storage.spark.ShredJob.run(ShredJob.scala:358)
> at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply$mcV$sp(ShredJob.scala:164)
> at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:164)
> at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:164)
> at scala.util.Try$.apply(Try.scala:192)
> at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.runJob(ShredJob.scala:164)
> at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.run(ShredJob.scala:157)
> at com.snowplowanalytics.snowplow.storage.spark.SparkJob$class.main(SparkJob.scala:32)
> at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.main(ShredJob.scala:63)
> at com.snowplowanalytics.snowplow.storage.spark.ShredJob.main(ShredJob.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
> Caused by: java.io.IOException: Not a file: hdfs://ip-10-10-2-17.ec2.internal:8020/local/snowplow/enriched-events/2020/05
> at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:295)
> at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
> at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
> at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:240)
> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:238)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.dependencies(RDD.scala:238)
> at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:483)
> at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:450)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getOrCreateShuffleMapStage(DAGScheduler.scala:368)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:433)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:432)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
> at scala.collection.SetLike$class.map(SetLike.scala:92)
> at scala.collection.mutable.AbstractSet.map(Set.scala:46)
> at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:432)
> at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:419)
> at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:907)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1981)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1973)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1962)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:682)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
> ... 35 more
>
> ApplicationMaster host: 10.10.2.36
> ApplicationMaster RPC port: 0
> queue: default
> start time: 1589928758710
> final status: FAILED
> tracking URL: http://ip-10-10-2-17.ec2.internal:20888/proxy/application_1589928613021_0003/
> user: hadoop
> Exception in thread "main" org.apache.spark.SparkException: Application application_1589928613021_0003 finished with failed status
> at org.apache.spark.deploy.yarn.Client.run(Client.scala:1165)
> at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1520)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 20/05/19 22:53:17 INFO ShutdownHookManager: Shutdown hook called
> 20/05/19 22:53:17 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-054a5375-aee1-4db4-8d1e-840536ba4d89
> 20/05/19 22:53:17 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-263a446d-3b83-4336-93c9-fe504445c686
> Command exiting with ret '1'
I’ve been trying to solve this problem for many days and I’m almost giving up on using Snowplow. I appreciate all the help I can get here.
I didn’t test the second flow after the first one worked.
I would also like your help with one more problem.
When EmrEtlRunner finishes processing, it generates a .txt file with the data. I would like to generate a .csv that contains the data plus the name of each column, and store it in S3. Is the only way to get the column names into the same file to load the data into Redshift and then export everything back to S3?
In Glue, create an archive table that points to your enriched archive files in S3.
Then set up a Glue job to convert the format to CSV.
The example in the blog shows a conversion to Parquet, but the steps for CSV would be exactly the same; a rough sketch of such a job follows below.
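The blog walkthrough has the authoritative Glue setup, but as a rough illustration, a conversion job of this shape can also be expressed with plain Spark SQL in Scala (Glue jobs run Spark underneath). This is only a sketch: it assumes the Glue Data Catalog is configured as the metastore for the job, and the database/table name `snowplow.enriched_archive` and the output bucket are placeholders, not real names from your setup.

```scala
import org.apache.spark.sql.SparkSession

object EnrichedToCsv {
  def main(args: Array[String]): Unit = {
    // Assumes the Glue Data Catalog is wired up as the Hive metastore
    val spark = SparkSession.builder()
      .appName("enriched-to-csv")
      .enableHiveSupport()
      .getOrCreate()

    // Hypothetical catalog table created over the enriched archive in S3
    val enriched = spark.table("snowplow.enriched_archive")

    // Write back out as CSV with a header row; the bucket/prefix are placeholders
    enriched.write
      .option("header", "true")
      .mode("overwrite")
      .csv("s3://your-bucket/enriched-csv/")

    spark.stop()
  }
}
```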
An alternative approach is to use the Snowplow Analytics SDKs, which can read and parse the .txt files. That wiki link includes examples of using Spark to read all the events and write them out as JSON.
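If you go the Spark route, here is a minimal plain-Spark sketch (deliberately not using the SDK, so nothing about its API is assumed) that reads the enriched TSV files directly and writes them back out as CSV with a header row. The S3 paths are placeholders and the column list is truncated for brevity; you would need to fill in the full canonical enriched-event field list, or prefer the SDK examples from the wiki, which handle the parsing for you.

```scala
import org.apache.spark.sql.SparkSession

object EnrichedTsvToCsv {
  // Truncated, illustrative list: the canonical enriched event has many more fields,
  // and toDF below requires one name per column in the data.
  val columnNames: Seq[String] = Seq(
    "app_id", "platform", "etl_tstamp", "collector_tstamp", "dvce_created_tstamp"
    // ... remaining canonical enriched-event fields ...
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("enriched-tsv-to-csv").getOrCreate()

    // Enriched events are tab-separated text files; the input path is a placeholder
    val enriched = spark.read
      .option("sep", "\t")
      .csv("s3://your-bucket/enriched/archive/*/")
      .toDF(columnNames: _*) // fails at runtime if the field count does not match

    // Write out as CSV with column names in the header; output path is a placeholder
    enriched.write
      .option("header", "true")
      .mode("overwrite")
      .csv("s3://your-bucket/enriched-csv/")

    spark.stop()
  }
}
```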