Collector: Clojure-Connector
EMR ETL Runner is failing at the “Elasticity Spark Step: Enrich Raw Events” step. I have reviewed the container logs for this job and it appears to be failing with the error below. Any help will be greatly appreciated.
Error
ERROR ApplicationMaster: User class threw exception: java.io.IOException: Not a file: hdfs://ip-10-97-58-5.ec2.internal:8020/local/snowplow/raw-events/resources/environments
java.io.IOException: Not a file: hdfs://ip-10-97-58-5.ec2.internal:8020/local/snowplow/raw-events/resources/environments
Command line
./snowplow-emr-etl-runner run -c ./snowplow/3-enrich/emr-etl-runner/config/config.yml -r ./resolver.json --enrichments ./snowplow/3-enrich/config/enrichments/ --targets snowplow/4-storage/ --debug
Conf file
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: ***********
  secret_access_key: **********
  s3:
    region: us-east-1
    buckets:
      assets: s3n://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3n://******-snowplow-etl/logs
      raw:
        in: # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3n://elasticbeanstalk-us-east-1-********** # e.g. s3://my-old-collector-bucket
        processing: s3://*******-snowplow-etl-archive/processing
        archive: s3://******-snowplow-etl-archive/raw # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://********-snowplow-etl-data/enriched/good # e.g. s3://my-out-bucket/enriched/good
        bad: s3://********-snowplow-etl-data/enriched/bad # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://********-snowplow-etl-data/enriched/errors
        archive: s3://********-snowplow-etl-data/enriched/archive # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://********-snowplow-etl-data/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3://********-snowplow-etl-data/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://********-snowplow-etl-data/shredded/errors # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://********-snowplow-etl-data/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 5.8.0
    region: us-east-1 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-********* # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: ************
    bootstrap: [] # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow # Give your job a name
      master_instance_type: c4.large
      core_instance_count: 2
      core_instance_type: c4.large
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 200 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: c4.large
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info: # Optional JSON string for selecting additional features
collectors:
  format: clj-tomcat # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.9.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: true # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.12.0
    rdb_shredder: 0.12.0 # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: snowplow # e.g. snowplow
    collector: ********.us-east-1.elasticbeanstalk.com # e.g. d3rkrsqld9gmqf.cloudfront.net
Could you give us the details of the step prior to the enrich one? That should be a step named “Elasticity S3DistCp Step: Raw S3 -> Raw HDFS”. I’m particularly interested in the arguments.
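If it helps, those step details can usually be pulled straight from the terminated cluster with the AWS CLI; the cluster and step IDs below are placeholders you would need to fill in from your own account:

# Find the terminated cluster's ID, then list its steps
aws emr list-steps --cluster-id j-XXXXXXXXXXXXX

# Show the JAR, main class and arguments of the S3DistCp step
aws emr describe-step --cluster-id j-XXXXXXXXXXXXX --step-id s-XXXXXXXXXXXXX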
Thank you. By cloning the terminated cluster, I was able to confirm that the files are indeed being copied over to the cluster.
Elasticity S3DistCp Step: Raw S3 -> Raw HDFS
JAR location : /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar
Main class : None
Arguments : --src s3://********-snowplow-etl-archive/processing/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com
Action on failure: Terminate cluster
Could you check the contents of the s3://********-snowplow-etl-archive/processing/ bucket or, alternatively, the hdfs:///local/snowplow/raw-events/ HDFS folder? I find it weird that there would be a /local/snowplow/raw-events/resources/environments folder in there.
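Something along these lines should show what is actually sitting in both locations (paths taken from your config; the hdfs command would need to be run on the master node of a cloned cluster):

# List what is in the S3 processing location
aws s3 ls --recursive s3://********-snowplow-etl-archive/processing/ | head -n 20

# List what S3DistCp copied onto the cluster
hdfs dfs -ls -R /local/snowplow/raw-events/ | head -n 20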
I confirmed the logs are in both the S3 archive/processing bucket and HDFS. I also checked the format in the S3 bucket, and it matches the example in the documentation for the Tomcat Clojure Collector.
HDFS
hdfs dfs -ls /local/snowplow/raw-events/resources/environments/logs/publish/e-******/i-******
A single log file, just as an example: var_log_tomcat8_rotated_localhost_access_log.txt1506348061.gz
Hi Ben, here is the stack trace from the error as well. Oh, and Nick Chammas says hi!
17/09/26 14:31:08 INFO FileInputFormat: Total input paths to process : 26
17/09/26 14:31:08 ERROR ApplicationMaster: User class threw exception: java.io.IOException: Not a file: hdfs://ip-10-97-58-243.ec2.internal:8020/local/snowplow/raw-events/resources/environments
java.io.IOException: Not a file: hdfs://ip-10-97-58-243.ec2.internal:8020/local/snowplow/raw-events/resources/environments
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2075)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1151)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1070)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:960)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1489)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1468)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob.run(EnrichJob.scala:198)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$.run(EnrichJob.scala:84)
at com.snowplowanalytics.snowplow.enrich.spark.SparkJob$class.main(SparkJob.scala:32)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$.main(EnrichJob.scala:51)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob.main(EnrichJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Ah, the problem comes from the fact that you should specify your in buckets as:

raw:
  in:
    - s3n://elasticbeanstalk-us-east-1-**********/resources/environments/logs/publish/e-******

for each of your collectors, which can be identified by their e-***** prefix. Indeed, the folder hierarchy needs to be mostly flat to pick up every log file.
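If you want to enumerate the collector environments, listing the publish prefix of the Elastic Beanstalk bucket should show one e-* folder per environment, something like:

aws s3 ls s3://elasticbeanstalk-us-east-1-**********/resources/environments/logs/publish/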
Say hi to Nick from me as well! Small world!
Will do. I should have updated the post: I had actually made that change to the config after I submitted my original post for review. I wonder if this could be tied to the S3 bucket layout, which might not be correct. The processing bucket has two directories in it:

******-snowplow-etl-archive/processing/resources/environments/logs/publish
  e-*******q
  e-*******a

and the in bucket is currently set to:

  - s3n://elasticbeanstalk-us-east-1-*******/resources/environments/logs/publish/e-*****
Then you’ll need something like:
raw:
  in:
    - "s3n://elasticbeanstalk-us-east-1-**********/resources/environments/logs/publish/e-******q"
    - "s3n://elasticbeanstalk-us-east-1-**********/resources/environments/logs/publish/e-******a"
I ran the job again but unfortunately it produced the same stack trace.
17/09/26 14:06:54 ERROR ApplicationMaster: User class threw exception: java.io.IOException: Not a file: hdfs://ip-10-97-58-60.ec2.internal:8020/local/snowplow/raw-events/resources/environments
java.io.IOException: Not a file: hdfs://ip-10-97-58-60.ec2.internal:8020/local/snowplow/raw-events/resources/environments
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2075)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1151)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1070)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:960)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1489)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1468)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob.run(EnrichJob.scala:198)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$.run(EnrichJob.scala:84)
at com.snowplowanalytics.snowplow.enrich.spark.SparkJob$class.main(SparkJob.scala:32)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$.main(EnrichJob.scala:51)
at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob.main(EnrichJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
17/09/26 14:06:54 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.io.IOException: Not a file: hdfs://ip-10-97-58-60.ec2.internal:8020/local/snowplow/raw-events/resources/environments)
It made it past the enrichment step. I added the extra in bucket to the config, cleaned up my buckets, reran, and it worked. Thanks for all the help! Much appreciated.
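For anyone who hits the same thing: the cleanup in my case was essentially removing the partial output that the failed runs had left behind before re-running, something along these lines (paths are from the config above; double-check them against your own setup before deleting anything):

# Remove partial enriched/shredded output left over from the failed runs
aws s3 rm s3://********-snowplow-etl-data/enriched/good/ --recursive
aws s3 rm s3://********-snowplow-etl-data/shredded/good/ --recursive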