Error On EmrEtlRunner

Hello,

I am getting the following error when running EmrEtlRunner. Any idea how to solve it?

Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-10-2-108.ec2.internal:8020/tmp/01c28d29-865c-461f-a2e1-a4375542ec4a/files

This is my config.yml file…

aws:
  access_key_id: xxxxxxxxxxxxxxxxxxxxxxxxxxx
  secret_access_key: xxxxxxxxxxxxxxxxxxxxxxxxxxx
  s3:
    region: us-east-1
    buckets:
      assets: xxxxxxxxxxxxxxxxxxxxxxxxxxx
      jsonpath_assets:
      log: xxxxxxxxxxxxxxxxxxxxxxxxxxx
      encrypted: false
      raw:
        in:
          - xxxxxxxxxxxxxxxxxxxxxxxxxxx
        processing: xxxxxxxxxxxxxxxxxxxxxxxxxxx
        archive: xxxxxxxxxxxxxxxxxxxxxxxxxxx
      enriched:
        good: xxxxxxxxxxxxxxxxxxxxxxxxxxx
        bad: xxxxxxxxxxxxxxxxxxxxxxxxxxx
        errors: xxxxxxxxxxxxxxxxxxxxxxxxxxx
        archive: xxxxxxxxxxxxxxxxxxxxxxxxxxx
      shredded:
        good: xxxxxxxxxxxxxxxxxxxxxxxxxxx
        bad: xxxxxxxxxxxxxxxxxxxxxxxxxxx
        errors: xxxxxxxxxxxxxxxxxxxxxxxxxxx
        archive: xxxxxxxxxxxxxxxxxxxxxxxxxxx
    consolidate_shredded_output: false
  emr:
    ami_version: 5.19.0
    region: us-east-1
    jobflow_role: EMR_EC2_DefaultRole
    service_role: EMR_DefaultRole
    placement:
    ec2_subnet_id: xxxxxxxxxxxxxxxxxxxxxxxxxxx
    ec2_key_name: xxxxxxxxxxxxxxxxxxxxxxxxxxx
    security_configuration:
    bootstrap: []
    software:
      hbase:
      lingual:
    jobflow:
      job_name: Snowplow ETL
      master_instance_type: m5.xlarge
      core_instance_count: 2
      core_instance_type: m5.xlarge
      core_instance_bid:
      core_instance_ebs:
        volume_size: 100
        volume_type: "gp2"
        volume_iops: 400
        ebs_optimized: false
      task_instance_count: 0
      task_instance_type: m5.xlarge
      task_instance_bid:
    bootstrap_failure_tries: 3
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:
collectors:
  format: thrift
enrich:
  versions:
    spark_enrich: 1.18.0
  continue_on_unexpected_error: false
  output_compression: NONE
storage:
  versions:
    rdb_loader: 0.16.0
    rdb_shredder: 0.15.0
    hadoop_elasticsearch: 0.1.0
monitoring:
  tags: {}
  logging:
    level: DEBUG

Hi @Mauricio_Cesar,

Your config file looks OK as far as I can tell.

Can you tell me at which EMR step the error occurs? You can tell this by looking at the EmrEtlRunner logs, or at the list of steps in the EMR dashboard. Any context around the error message would be helpful.

I noticed that a similar error message was reported on this forum a couple of years ago. The advice there was to check for either of the following:

  • The EMR cluster crashing prematurely
  • No “enriched” data being produced (perhaps because all events were rejected and ended up as “bad” data)

Is it possible either of those could explain your error?

Hi @istreeter

Thanks for trying to help me.

This is a printout of the EMR step log. I get the same error every time I try to run the job. I noticed that the enrichment step doesn’t seem to be working, even though Stream Enrich works perfectly.

I tried switching the process to Stream Enrich mode, but I get the error below.

This is the content of the controller.gz error log:

> 2020-05-19T22:52:26.646Z INFO Ensure step 3 jar file command-runner.jar
> 2020-05-19T22:52:26.647Z INFO StepRunner: Created Runner for step 3
> INFO startExec 'hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --class com.snowplowanalytics.snowplow.storage.spark.ShredJob --master yarn --deploy-mode cluster s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.15.0.jar --iglu-config ewogICJzY2hlbWEiOiAiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3MuaWdsdS9yZXNvbHZlci1jb25maWcvanNvbnNjaGVtYS8xLTAtMSIsCiAgImRhdGEiOiB7CiAgICAiY2FjaGVTaXplIjogNTAwLAogICAgInJlcG9zaXRvcmllcyI6IFsKICAgICAgewogICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCIsCiAgICAgICAgInByaW9yaXR5IjogMCwKICAgICAgICAidmVuZG9yUHJlZml4ZXMiOiBbICJjb20uc25vd3Bsb3dhbmFseXRpY3MiIF0sCiAgICAgICAgImNvbm5lY3Rpb24iOiB7CiAgICAgICAgICAiaHR0cCI6IHsKICAgICAgICAgICAgInVyaSI6ICJodHRwOi8vaWdsdWNlbnRyYWwuY29tIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfSwKICAgICAgewogICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCAtIEdDUCBNaXJyb3IiLAogICAgICAgICJwcmlvcml0eSI6IDEsCiAgICAgICAgInZlbmRvclByZWZpeGVzIjogWyAiY29tLnNub3dwbG93YW5hbHl0aWNzIiBdLAogICAgICAgICJjb25uZWN0aW9uIjogewogICAgICAgICAgImh0dHAiOiB7CiAgICAgICAgICAgICJ1cmkiOiAiaHR0cDovL21pcnJvcjAxLmlnbHVjZW50cmFsLmNvbSIKICAgICAgICAgIH0KICAgICAgICB9CiAgICAgIH0KICAgIF0KICB9Cn0K --input-folder hdfs:///local/snowplow/enriched-events/* --output-folder hdfs:///local/snowplow/shredded-events/ --bad-folder s3://sanarlake-snowplow-homolog/shredded/bad/run=2020-05-19-22-47-09/'
> INFO Environment:
>   PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
>   LESS_TERMCAP_md=[01;38;5;208m
>   LESS_TERMCAP_me=[0m
>   HISTCONTROL=ignoredups
>   LESS_TERMCAP_mb=[01;31m
>   AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
>   UPSTART_JOB=rc
>   LESS_TERMCAP_se=[0m
>   HISTSIZE=1000
>   HADOOP_ROOT_LOGGER=INFO,DRFA
>   JAVA_HOME=/etc/alternatives/jre
>   AWS_DEFAULT_REGION=us-east-1
>   AWS_ELB_HOME=/opt/aws/apitools/elb
>   LESS_TERMCAP_us=[04;38;5;111m
>   EC2_HOME=/opt/aws/apitools/ec2
>   TERM=linux
>   runlevel=3
>   LANG=en_US.UTF-8
>   AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
>   MAIL=/var/spool/mail/hadoop
>   LESS_TERMCAP_ue=[0m
>   LOGNAME=hadoop
>   PWD=/
>   LANGSH_SOURCED=1
>   HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-1AOC83RFSJSWD/tmp
>   _=/etc/alternatives/jre/bin/java
>   CONSOLETYPE=serial
>   RUNLEVEL=3
>   LESSOPEN=||/usr/bin/lesspipe.sh %s
>   previous=N
>   UPSTART_EVENTS=runlevel
>   AWS_PATH=/opt/aws
>   USER=hadoop
>   UPSTART_INSTANCE=
>   PREVLEVEL=N
>   HADOOP_LOGFILE=syslog
>   HOSTNAME=ip-10-10-2-17
>   HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-1AOC83RFSJSWD
>   EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
>   SHLVL=5
>   HOME=/home/hadoop
>   HADOOP_IDENT_STRING=hadoop
> INFO redirectOutput to /mnt/var/log/hadoop/steps/s-1AOC83RFSJSWD/stdout
> INFO redirectError to /mnt/var/log/hadoop/steps/s-1AOC83RFSJSWD/stderr
> INFO Working dir /mnt/var/lib/hadoop/steps/s-1AOC83RFSJSWD
> INFO ProcessRunner started child process 11328 :
> hadoop   11328  4785  0 22:52 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --class com.snowplowanalytics.snowplow.storage.spark.ShredJob --master yarn --deploy-mode cluster s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.15.0.jar --iglu-config ewogICJzY2hlbWEiOiAiaWdsdTpjb20uc25vd3Bsb3dhbmFseXRpY3MuaWdsdS9yZXNvbHZlci1jb25maWcvanNvbnNjaGVtYS8xLTAtMSIsCiAgImRhdGEiOiB7CiAgICAiY2FjaGVTaXplIjogNTAwLAogICAgInJlcG9zaXRvcmllcyI6IFsKICAgICAgewogICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCIsCiAgICAgICAgInByaW9yaXR5IjogMCwKICAgICAgICAidmVuZG9yUHJlZml4ZXMiOiBbICJjb20uc25vd3Bsb3dhbmFseXRpY3MiIF0sCiAgICAgICAgImNvbm5lY3Rpb24iOiB7CiAgICAgICAgICAiaHR0cCI6IHsKICAgICAgICAgICAgInVyaSI6ICJodHRwOi8vaWdsdWNlbnRyYWwuY29tIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfSwKICAgICAgewogICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCAtIEdDUCBNaXJyb3IiLAogICAgICAgICJwcmlvcml0eSI6IDEsCiAgICAgICAgInZlbmRvclByZWZpeGVzIjogWyAiY29tLnNub3dwbG93YW5hbHl0aWNzIiBdLAogICAgICAgICJjb25uZWN0aW9uIjogewogICAgICAgICAgImh0dHAiOiB7CiAgICAgICAgICAgICJ1cmkiOiAiaHR0cDovL21pcnJvcjAxLmlnbHVjZW50cmFsLmNvbSIKICAgICAgICAgIH0KICAgICAgICB9CiAgICAgIH0KICAgIF0KICB9Cn0K --input-folder hdfs:///local/snowplow/enriched-events/* --output-folder hdfs:///local/snowplow/shredded-events/ --bad-folder s3://sanarlake-snowplow-homolog/shredded/bad/run=2020-05-19-22-47-09/
> 2020-05-19T22:52:30.653Z INFO HadoopJarStepRunner.Runner: startRun() called for s-1AOC83RFSJSWD Child Pid: 11328
> INFO Synchronously wait child process to complete : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
> INFO waitProcessCompletion ended with exit code 1 : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
> INFO total process run time: 50 seconds
> 2020-05-19T22:53:18.720Z INFO Step created jobs: 
> 2020-05-19T22:53:18.720Z WARN Step failed with exitCode 1 and took 50 seconds

This is the content of the stderr.gz error log:

> log4j:ERROR setFile(null,true) call failed.
> java.io.FileNotFoundException: /stderr (Permission denied)
> 	at java.io.FileOutputStream.open0(Native Method)
> 	at java.io.FileOutputStream.open(FileOutputStream.java:270)
> 	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> 	at java.io.FileOutputStream.<init>(FileOutputStream.java:133)
> 	at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)
> 	at org.apache.log4j.FileAppender.activateOptions(FileAppender.java:165)
> 	at org.apache.log4j.DailyRollingFileAppender.activateOptions(DailyRollingFileAppender.java:223)
> 	at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
> 	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
> 	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
> 	at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
> 	at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
> 	at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
> 	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
> 	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
> 	at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
> 	at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
> 	at org.apache.spark.internal.Logging$class.initializeLogging(Logging.scala:120)
> 	at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:108)
> 	at org.apache.spark.deploy.SparkSubmit$.initializeLogIfNecessary(SparkSubmit.scala:71)
> 	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:128)
> 	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> log4j:ERROR Either File or DatePattern options are not set for appender [DRFA-stderr].
> log4j:ERROR setFile(null,true) call failed.
> java.io.FileNotFoundException: /stdout (Permission denied)
> 	at java.io.FileOutputStream.open0(Native Method)
> 	at java.io.FileOutputStream.open(FileOutputStream.java:270)
> 	at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> 	at java.io.FileOutputStream.<init>(FileOutputStream.java:133)
> 	at org.apache.log4j.FileAppender.setFile(FileAppender.java:294)
> 	at org.apache.log4j.FileAppender.activateOptions(FileAppender.java:165)
> 	at org.apache.log4j.DailyRollingFileAppender.activateOptions(DailyRollingFileAppender.java:223)
> 	at org.apache.log4j.config.PropertySetter.activate(PropertySetter.java:307)
> 	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:172)
> 	at org.apache.log4j.config.PropertySetter.setProperties(PropertySetter.java:104)
> 	at org.apache.log4j.PropertyConfigurator.parseAppender(PropertyConfigurator.java:842)
> 	at org.apache.log4j.PropertyConfigurator.parseCategory(PropertyConfigurator.java:768)
> 	at org.apache.log4j.PropertyConfigurator.parseCatsAndRenderers(PropertyConfigurator.java:672)
> 	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:516)
> 	at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:580)
> 	at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
> 	at org.apache.log4j.LogManager.<clinit>(LogManager.java:127)
> 	at org.apache.spark.internal.Logging$class.initializeLogging(Logging.scala:120)
> 	at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:108)
> 	at org.apache.spark.deploy.SparkSubmit$.initializeLogIfNecessary(SparkSubmit.scala:71)
> 	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:128)
> 	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> log4j:ERROR Either File or DatePattern options are not set for appender [DRFA-stdout].
> Warning: Skip remote jar s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.15.0.jar.
> 20/05/19 22:52:31 INFO RMProxy: Connecting to ResourceManager at ip-10-10-2-17.ec2.internal/10.10.2.17:8032
> 20/05/19 22:52:31 INFO Client: Requesting a new application from cluster with 2 NodeManagers
> 20/05/19 22:52:32 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (12288 MB per container)
> 20/05/19 22:52:32 INFO Client: Will allocate AM container, with 12288 MB memory including 1117 MB overhead
> 20/05/19 22:52:32 INFO Client: Setting up container launch context for our AM
> 20/05/19 22:52:32 INFO Client: Setting up the launch environment for our AM container
> 20/05/19 22:52:32 INFO Client: Preparing resources for our AM container
> 20/05/19 22:52:33 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
> 20/05/19 22:52:35 INFO Client: Uploading resource file:/mnt/tmp/spark-263a446d-3b83-4336-93c9-fe504445c686/__spark_libs__7695959324701782710.zip -> hdfs://ip-10-10-2-17.ec2.internal:8020/user/hadoop/.sparkStaging/application_1589928613021_0003/__spark_libs__7695959324701782710.zip
> 20/05/19 22:52:37 INFO Client: Uploading resource s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.15.0.jar -> hdfs://ip-10-10-2-17.ec2.internal:8020/user/hadoop/.sparkStaging/application_1589928613021_0003/snowplow-rdb-shredder-0.15.0.jar
> 20/05/19 22:52:37 INFO S3NativeFileSystem: Opening 's3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-0.15.0.jar' for reading
> 20/05/19 22:52:38 INFO Client: Uploading resource file:/mnt/tmp/spark-263a446d-3b83-4336-93c9-fe504445c686/__spark_conf__3844467823067685141.zip -> hdfs://ip-10-10-2-17.ec2.internal:8020/user/hadoop/.sparkStaging/application_1589928613021_0003/__spark_conf__.zip
> 20/05/19 22:52:38 INFO SecurityManager: Changing view acls to: hadoop
> 20/05/19 22:52:38 INFO SecurityManager: Changing modify acls to: hadoop
> 20/05/19 22:52:38 INFO SecurityManager: Changing view acls groups to: 
> 20/05/19 22:52:38 INFO SecurityManager: Changing modify acls groups to: 
> 20/05/19 22:52:38 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
> 20/05/19 22:52:38 INFO Client: Submitting application application_1589928613021_0003 to ResourceManager
> 20/05/19 22:52:38 INFO YarnClientImpl: Submitted application application_1589928613021_0003
> 20/05/19 22:52:39 INFO Client: Application report for application_1589928613021_0003 (state: ACCEPTED)
> 20/05/19 22:52:39 INFO Client: 
> 	 client token: N/A
> 	 diagnostics: AM container is launched, waiting for AM container to Register with RM
> 	 ApplicationMaster host: N/A
> 	 ApplicationMaster RPC port: -1
> 	 queue: default
> 	 start time: 1589928758710
> 	 final status: UNDEFINED
> 	 tracking URL: http://ip-10-10-2-17.ec2.internal:20888/proxy/application_1589928613021_0003/
> 	 user: hadoop
> 20/05/19 22:52:40 INFO Client: Application report for application_1589928613021_0003 (state: ACCEPTED)
> 20/05/19 22:52:41 INFO Client: Application report for application_1589928613021_0003 (state: ACCEPTED)
> 20/05/19 22:52:42 INFO Client: Application report for application_1589928613021_0003 (state: ACCEPTED)
> 20/05/19 22:52:43 INFO Client: Application report for application_1589928613021_0003 (state: ACCEPTED)
> 20/05/19 22:52:44 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:44 INFO Client: 
> 	 client token: N/A
> 	 diagnostics: N/A
> 	 ApplicationMaster host: 10.10.2.36
> 	 ApplicationMaster RPC port: 0
> 	 queue: default
> 	 start time: 1589928758710
> 	 final status: UNDEFINED
> 	 tracking URL: http://ip-10-10-2-17.ec2.internal:20888/proxy/application_1589928613021_0003/
> 	 user: hadoop
> 20/05/19 22:52:45 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:46 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:47 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:48 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:49 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:50 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:51 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:52 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:53 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:54 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:55 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:56 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:57 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:58 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:52:59 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:00 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:01 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:02 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:03 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:04 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:05 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:06 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:07 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:08 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:09 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:10 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:11 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:12 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:13 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:14 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:15 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:16 INFO Client: Application report for application_1589928613021_0003 (state: RUNNING)
> 20/05/19 22:53:17 INFO Client: Application report for application_1589928613021_0003 (state: FINISHED)
> 20/05/19 22:53:17 INFO Client: 
> 	 client token: N/A
> 	 diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted.
> 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
> 	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> 	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
> 	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
> 	at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:597)
> 	at com.snowplowanalytics.snowplow.storage.spark.ShredJob.run(ShredJob.scala:358)
> 	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply$mcV$sp(ShredJob.scala:164)
> 	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:164)
> 	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:164)
> 	at scala.util.Try$.apply(Try.scala:192)
> 	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.runJob(ShredJob.scala:164)
> 	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.run(ShredJob.scala:157)
> 	at com.snowplowanalytics.snowplow.storage.spark.SparkJob$class.main(SparkJob.scala:32)
> 	at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.main(ShredJob.scala:63)
> 	at com.snowplowanalytics.snowplow.storage.spark.ShredJob.main(ShredJob.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:721)
> Caused by: java.io.IOException: Not a file: hdfs://ip-10-10-2-17.ec2.internal:8020/local/snowplow/enriched-events/2020/05
> 	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:295)
> 	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
> 	at scala.Option.getOrElse(Option.scala:121)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
> 	at scala.Option.getOrElse(Option.scala:121)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
> 	at scala.Option.getOrElse(Option.scala:121)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
> 	at scala.Option.getOrElse(Option.scala:121)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
> 	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:46)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
> 	at scala.Option.getOrElse(Option.scala:121)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
> 	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
> 	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
> 	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:240)
> 	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:238)
> 	at scala.Option.getOrElse(Option.scala:121)
> 	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:238)
> 	at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:483)
> 	at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:450)
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getOrCreateShuffleMapStage(DAGScheduler.scala:368)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:433)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:432)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
> 	at scala.collection.SetLike$class.map(SetLike.scala:92)
> 	at scala.collection.mutable.AbstractSet.map(Set.scala:46)
> 	at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:432)
> 	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:419)
> 	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:907)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1981)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1973)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1962)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:682)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
> 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
> 	... 35 more
> 
> 	 ApplicationMaster host: 10.10.2.36
> 	 ApplicationMaster RPC port: 0
> 	 queue: default
> 	 start time: 1589928758710
> 	 final status: FAILED
> 	 tracking URL: http://ip-10-10-2-17.ec2.internal:20888/proxy/application_1589928613021_0003/
> 	 user: hadoop
> Exception in thread "main" org.apache.spark.SparkException: Application application_1589928613021_0003 finished with failed status
> 	at org.apache.spark.deploy.yarn.Client.run(Client.scala:1165)
> 	at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1520)
> 	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
> 	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
> 	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
> 	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
> 	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 20/05/19 22:53:17 INFO ShutdownHookManager: Shutdown hook called
> 20/05/19 22:53:17 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-054a5375-aee1-4db4-8d1e-840536ba4d89
> 20/05/19 22:53:17 INFO ShutdownHookManager: Deleting directory /mnt/tmp/spark-263a446d-3b83-4336-93c9-fe504445c686
> Command exiting with ret '1'

I’ve been trying to solve this problem for many days and I’m close to giving up on Snowplow altogether. I’d appreciate any help I can get here.

@Mauricio_Cesar, I can see this error:

Caused by: java.io.IOException: Not a file: hdfs://ip-10-10-2-17.ec2.internal:8020/local/snowplow/enriched-events/2020/05

Note the date-partitioned path 2020/05: the job found a nested date directory where it expected flat event files.

Also, I can see this in your EmrEtlRunner config:

collectors:
  format: thrift

Questions:

  1. Are you using the Scala Stream Collector and loading the raw data to S3?
  2. How did you configure the S3 Loader, or are you using something else instead?

Note that the data is expected to be in folders following this format: run=YYYY-MM-DD-hh-mm-ss (for example, run=2020-05-19-22-47-09).

@ihor @istreeter

Thank you so much for helping me. I had been trying to adjust the configuration for seven days without success, and I’m thrilled that it finally works.

I have two streams working. My first flow is as follows:

Collector -> Kinesis Raw Events Stream -> S3 Loader (Raw Events) -> S3 Raw Events Bucket -> EMR ETL Runner -> S3 Shredded Events Bucket

I realized that I was storing the raw events in GZIP instead of LZO, and on top of that the directory date format was wrong.

After adjusting the S3 Loader settings, EMR ETL Runner stopped giving an error.

My second flow is as follows:

Collector -> Kinesis Raw Events Stream -> Stream Enrich -> Kinesis Enriched Events Stream -> S3 Loader (Enriched Events) -> S3 Enriched Events Bucket -> EMR ETL Runner -> S3 Shredded Events Bucket

I haven’t tested the second flow since getting the first one working.

I would like your help with one more problem.

When EMR ETL Runner finishes processing, it generates .txt files with the data. I would like to produce a .csv in S3 that contains both the data and the name of each column. Is my only option for getting the column names into the same file to load the data into Redshift and then unload everything back to S3?

Hi @Mauricio_Cesar

You can take a look at AWS Glue to help you convert the .txt files into .csv files with column names. We have a blog post which might help you with this: Using AWS Glue and AWS Athena with Snowplow data

The basic approach is

  • In Glue, create an archive table that points at your enriched archive files in S3
  • Set up a Glue job to format-shift to CSV

The example in the blog shows a format shift to Parquet, but the steps for CSV are exactly the same.
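For illustration, here is a minimal plain-Spark sketch of that kind of format shift (not the Glue job from the blog post, just the same idea). The bucket paths and the column subset are hypothetical, and the column positions assume the canonical enriched-event TSV layout, so adjust both to your setup.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object EnrichedToCsv {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("enriched-to-csv").getOrCreate()

    // Enriched events are tab-separated text files with no header row,
    // so Spark names the columns _c0, _c1, _c2, ...
    val raw = spark.read
      .option("sep", "\t")
      .csv("s3://my-enriched-archive/enriched/archive/") // hypothetical path

    // Name a handful of columns; positions assume the canonical enriched
    // event layout (app_id, platform, etl_tstamp, collector_tstamp, ...).
    val named = raw.select(
      col("_c0").as("app_id"),
      col("_c1").as("platform"),
      col("_c3").as("collector_tstamp"),
      col("_c5").as("event"),
      col("_c6").as("event_id")
    )

    // Write out as CSV with a header row containing the column names.
    named.write
      .option("header", "true")
      .csv("s3://my-output-bucket/enriched-csv/") // hypothetical path

    spark.stop()
  }
}

In Glue you would typically read from the catalog table created in the first step rather than hard-coding the S3 path.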

An alternative approach to your problem is to use the Snowplow Analytics SDKs, which are able to read and parse the .txt files. That wiki link includes examples of using Spark to read all the events and write them out as JSON.
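For example, here is a minimal sketch of that approach, assuming the Scala Analytics SDK 1.x API (Event.parse and toJson); older SDK releases expose a different API, so check the docs for your version, and the S3 paths below are hypothetical.

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import org.apache.spark.sql.SparkSession

object EnrichedToJson {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("enriched-to-json").getOrCreate()
    import spark.implicits._

    // Read the enriched archive and parse each TSV line into a typed Event,
    // dropping any lines that fail to parse.
    val events = spark.sparkContext
      .textFile("s3://my-enriched-archive/enriched/archive/") // hypothetical path
      .map(line => Event.parse(line).toOption)
      .collect { case Some(event) => event }

    // Serialise each event as a JSON string and write the result out as text.
    events
      .map(_.toJson(lossy = true).noSpaces)
      .toDS()
      .write
      .text("s3://my-output-bucket/enriched-json/") // hypothetical path

    spark.stop()
  }
}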