Hello all!
We’re quite new to Snowplow and are trying to set up a pipeline in AWS, but the third step of our EMR job fails with a "Not a file" exception. We’re using Stream Enrich on the raw stream and Kinesis Firehose to deliver the enriched stream into an S3 bucket, compressed with GZIP.
When we run EmrEtlRunner, the EMR cluster spins up and the first two steps, [staging_stream_enrich] s3-dist-cp and [shred] s3-dist-cp, complete successfully. However, the third step, [shred] spark, fails with the exception pasted at the bottom (taken from the Hadoop stderr logs of one of the EC2 instances).
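For context, the exception below seems to come from Hadoop's FileInputFormat refusing to read a path that contains sub-directories. The same "Not a file" message can be reproduced with a tiny Spark test like the following (just a sketch to illustrate, not Snowplow code; the path is made up):

import org.apache.spark.sql.SparkSession

object NotAFileRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("not-a-file-repro").getOrCreate()
    val sc = spark.sparkContext

    // If hdfs:///tmp/enriched contains sub-directories (e.g. 2019/07/...),
    // the old mapred FileInputFormat throws "java.io.IOException: Not a file: ..."
    // because it does not descend into directories by default.
    sc.textFile("hdfs:///tmp/enriched").count()

    // A glob that points at the files themselves reads fine:
    // sc.textFile("hdfs:///tmp/enriched/*/*").count()

    spark.stop()
  }
}

So it looks like the shred job is being pointed at a directory tree rather than flat files, but we don't understand why.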
This is our current configuration:
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_SNOWPLOW_ACCESS_KEY'] %>
  secret_access_key: <%= ENV['AWS_SNOWPLOW_SECRET_KEY'] %>
  s3:
    region: eu-west-2
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://REDACTED-snowplow-log-bucket
      encrypted: false
      enriched:
        good: s3://REDACTED-snowplow-out-bucket/enriched/good # e.g. s3://my-out-bucket/enriched/good
        archive: s3://REDACTED-snowplow-archive-bucket/enriched # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
        stream: s3://REDACTED-snowplow-bucket/enriched/good # S3 Loader's or Firehose's Kinesis output folder with enriched data. If present, raw buckets will be discarded
      shredded:
        good: s3://REDACTED-snowplow-out-bucket/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3://REDACTED-snowplow-out-bucket/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://REDACTED-snowplow-archive-bucket/shredded # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
    consolidate_shredded_output: false # Whether to combine files when copying from hdfs to s3
  emr:
    ami_version: 5.9.0
    region: eu-west-2 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-REDACTED # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: snowplow-runner-key
    security_configuration: # Specify your EMR security configuration if needed. Leave blank otherwise
    bootstrap: [] # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m4.large
      core_instance_count: 2
      core_instance_type: m4.large
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 100 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m4.large
      task_instance_bid: # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info: # Optional JSON string for selecting additional features
enrich:
  versions:
    spark_enrich: 1.18.0 # Version of the Spark Enrichment process
  output_compression: GZIP # Stream mode supports only GZIP
storage:
  versions:
    rdb_loader: 0.16.0-rc2
    rdb_shredder: 0.15.0-rc3 # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: snowplow-runner-app # e.g. snowplow
    collector: SnowplowCollector-env.REDACTED.eu-west-2.elasticbeanstalk.com # e.g. d3rkrsqld9gmqf.cloudfront.net
    protocol: http
    port: 80
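One thing we noticed: Firehose writes its objects under date prefixes (YYYY/MM/DD/HH/ by default), and the failing HDFS path in the logs below ends in .../enriched-events/2019/07, so we suspect those date folders are being staged as-is. As a sketch, one way to inspect the enriched.stream location with the Hadoop FileSystem API (the bucket name is the redacted placeholder from the config; s3a credentials are assumed to come from the environment or instance profile):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ListEnrichedStream {
  def main(args: Array[String]): Unit = {
    // Placeholder bucket from the config above.
    val uri = new URI("s3a://REDACTED-snowplow-bucket/enriched/good/")
    val fs  = FileSystem.get(uri, new Configuration())

    // Recursive listing: with default Firehose settings the keys look like
    // enriched/good/2019/07/31/21/<delivery-stream>-1-2019-07-31-...
    val it = fs.listFiles(new Path(uri), true)
    while (it.hasNext) {
      println(it.next().getPath)
    }
  }
}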
Logs:
19/07/31 21:31:42 INFO RMProxy: Connecting to ResourceManager at ip-172-31-25-175.eu-west-2.compute.internal/172.31.25.175:8030
19/07/31 21:31:42 INFO YarnRMClient: Registering the ApplicationMaster
19/07/31 21:31:42 INFO Utils: Using initial executors = 2, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
19/07/31 21:31:42 INFO YarnAllocator: Will request 2 executor container(s), each with 4 core(s) and 5248 MB memory (including 477 MB of overhead)
19/07/31 21:31:42 INFO YarnAllocator: Submitted 2 unlocalized container requests.
19/07/31 21:31:42 INFO ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
19/07/31 21:31:43 INFO AMRMClientImpl: Received new token for : ip-172-31-24-222.eu-west-2.compute.internal:8041
19/07/31 21:31:43 INFO YarnAllocator: Launching container container_1564608487795_0003_01_000002 on host ip-172-31-24-222.eu-west-2.compute.internal for executor with ID 1
19/07/31 21:31:43 INFO YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
19/07/31 21:31:43 INFO ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
19/07/31 21:31:43 INFO ContainerManagementProtocolProxy: Opening proxy : ip-172-31-24-222.eu-west-2.compute.internal:8041
19/07/31 21:31:48 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (172.31.24.222:48674) with ID 1
19/07/31 21:31:48 INFO ExecutorAllocationManager: New executor 1 has registered (new total is 1)
19/07/31 21:31:48 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-31-24-222.eu-west-2.compute.internal:40069 with 2.6 GB RAM, BlockManagerId(1, ip-172-31-24-222.eu-west-2.compute.internal, 40069, None)
19/07/31 21:32:11 INFO YarnClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
19/07/31 21:32:11 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done
19/07/31 21:32:11 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('hdfs:///user/spark/warehouse').
19/07/31 21:32:11 INFO SharedState: Warehouse path is 'hdfs:///user/spark/warehouse'.
19/07/31 21:32:11 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
19/07/31 21:32:13 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 302.6 KB, free 3.1 GB)
19/07/31 21:32:13 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 26.4 KB, free 3.1 GB)
19/07/31 21:32:13 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.31.19.254:46581 (size: 26.4 KB, free: 3.1 GB)
19/07/31 21:32:14 INFO SparkContext: Created broadcast 0 from textFile at ShredJob.scala:278
19/07/31 21:32:16 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
19/07/31 21:32:16 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter
19/07/31 21:32:16 INFO SparkContext: Starting job: text at ShredJob.scala:356
19/07/31 21:32:16 INFO GPLNativeCodeLoader: Loaded native gpl library
19/07/31 21:32:16 INFO LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 5f779390180acb8f1d86999c0a0294917976289f]
19/07/31 21:32:16 INFO FileInputFormat: Total input paths to process : 1
19/07/31 21:32:16 WARN DAGScheduler: Creating new stage failed due to exception - job: 0
java.io.IOException: Not a file: hdfs://ip-172-31-25-175.eu-west-2.compute.internal:8020/local/snowplow/enriched-events/2019/07
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:237)
at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:472)
at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:439)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getOrCreateShuffleMapStage(DAGScheduler.scala:346)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:421)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.mutable.AbstractSet.map(Set.scala:46)
at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:421)
at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:408)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:891)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1868)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
19/07/31 21:32:16 INFO DAGScheduler: Job 0 failed: text at ShredJob.scala:356, took 0.085470 s
19/07/31 21:32:16 ERROR FileFormatWriter: Aborting job null.
java.io.IOException: Not a file: hdfs://ip-172-31-25-175.eu-west-2.compute.internal:8020/local/snowplow/enriched-events/2019/07
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:237)
at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:472)
at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:439)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getOrCreateShuffleMapStage(DAGScheduler.scala:346)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:421)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.mutable.AbstractSet.map(Set.scala:46)
at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:421)
at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:408)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:891)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1868)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:188)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:555)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob.run(ShredJob.scala:356)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply$mcV$sp(ShredJob.scala:162)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:162)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:162)
at scala.util.Try$.apply(Try.scala:192)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.runJob(ShredJob.scala:162)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.run(ShredJob.scala:155)
at com.snowplowanalytics.snowplow.storage.spark.SparkJob$class.main(SparkJob.scala:32)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.main(ShredJob.scala:63)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob.main(ShredJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
19/07/31 21:32:16 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Job aborted.
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:474)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:217)
at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:555)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob.run(ShredJob.scala:356)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply$mcV$sp(ShredJob.scala:162)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:162)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$$anonfun$runJob$1.apply(ShredJob.scala:162)
at scala.util.Try$.apply(Try.scala:192)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.runJob(ShredJob.scala:162)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.run(ShredJob.scala:155)
at com.snowplowanalytics.snowplow.storage.spark.SparkJob$class.main(SparkJob.scala:32)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob$.main(ShredJob.scala:63)
at com.snowplowanalytics.snowplow.storage.spark.ShredJob.main(ShredJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)
Caused by: java.io.IOException: Not a file: hdfs://ip-172-31-25-175.eu-west-2.compute.internal:8020/local/snowplow/enriched-events/2019/07
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:87)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:237)
at org.apache.spark.scheduler.DAGScheduler.getShuffleDependencies(DAGScheduler.scala:472)
at org.apache.spark.scheduler.DAGScheduler.getMissingAncestorShuffleDependencies(DAGScheduler.scala:439)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getOrCreateShuffleMapStage(DAGScheduler.scala:346)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$getOrCreateParentStages$1.apply(DAGScheduler.scala:421)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:46)
at scala.collection.SetLike$class.map(SetLike.scala:92)
at scala.collection.mutable.AbstractSet.map(Set.scala:46)
at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:421)
at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:408)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:891)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1868)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1849)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:188)
... 49 more
19/07/31 21:32:16 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.spark.SparkException: Job aborted.)
19/07/31 21:32:16 INFO SparkContext: Invoking stop() from shutdown hook
19/07/31 21:32:16 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-19-254.eu-west-2.compute.internal:42899
19/07/31 21:32:16 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
19/07/31 21:32:16 INFO YarnClusterSchedulerBackend: Shutting down all executors
19/07/31 21:32:16 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
19/07/31 21:32:16 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
services=List(),
started=false)
19/07/31 21:32:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/07/31 21:32:16 INFO MemoryStore: MemoryStore cleared
19/07/31 21:32:16 INFO BlockManager: BlockManager stopped
19/07/31 21:32:16 INFO BlockManagerMaster: BlockManagerMaster stopped
19/07/31 21:32:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/07/31 21:32:16 INFO SparkContext: Successfully stopped SparkContext
19/07/31 21:32:16 INFO ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.spark.SparkException: Job aborted.)
19/07/31 21:32:16 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
19/07/31 21:32:16 INFO ApplicationMaster: Deleting staging directory hdfs://ip-172-31-25-175.eu-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1564608487795_0003
19/07/31 21:32:16 INFO ShutdownHookManager: Shutdown hook called
19/07/31 21:32:16 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1564608487795_0003/spark-3b7990ca-028a-492b-a4be-217a2898bf62
Could you please help us figure out what we’re missing?