Snowflake Transformer Step HDFS Problems

Here is my setup:

tracker -> collector-kinesis -> enricher-kinesis -> s3-loader-kinesis -> s3 -> s3-dist-cp.jar -> snowplow-snowflake-transformer-0.8.0.jar -> snowplow-snowflake-loader-0.8.0.jar

Everything seems to work well up until s3-dist-cp.jar.

All the JARs are run through dataflow-runner on an AWS EMR cluster.
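For context, each playbook is submitted with dataflow-runner roughly like this (the file names are just placeholders for my cluster and playbook configs):

dataflow-runner run-transient --emr-config cluster.json --emr-playbook playbook.json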

Here is my playbook:

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data": {
    "region": "<my_aws_region>",
    "credentials": {
      "accessKeyId": "default",
      "secretAccessKey": "default"
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "Staging enriched data",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src", "s3://<my_bucket_name>/enriched-sink/",
          "--dest", "s3://<my_bucket_name>/enriched/archive/run={{nowWithFormat "2006-01-02-15-04-05"}}/",
          "--s3Endpoint", "s3.amazonaws.com",
          "--srcPattern", ".*\\.gz",
          "--deleteOnSuccess",
          "--s3ServerSideEncryption"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "Snowflake Transformer",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "command-runner.jar",
        "arguments": [
          "spark-submit",
          "--conf",
          "spark.hadoop.mapreduce.job.outputformat.class=com.snowplowanalytics.snowflake.transformer.S3OutputFormat",
          "--deploy-mode",
          "cluster",
          "--class",
          "com.snowplowanalytics.snowflake.transformer.Main",
          "s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-transformer-0.8.0.jar",
          "--config",
          "{{base64File "/snowplow/config/snowflake/snowflake.json"}}",
          "--resolver",
          "{{base64File "/snowplow/config/resolver/resolver.json"}}"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "Snowflake Loader",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-loader-0.8.0.jar",
        "arguments": [
          "load",
          "--base64",
          "--config",
          "{{base64File "/snowplow/config/snowflake/snowflake.json"}}",
          "--resolver",
          "{{base64File "/snowplow/config/resolver/resolver.json"}}"
        ]
      }
    ],
    "tags": [ ]
  }
}

Notably, in the stack trace I see two HDFS issues:

java.io.IOException: Failed to rename HdfsNamedFileStatus{

and

java.io.FileNotFoundException: File hdfs://ip-

Any ideas on what I am doing wrong?

I have provided a trimmed stack trace below:

21/01/03 20:25:53 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-172-31-85-111.ec2.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to rename HdfsNamedFileStatus{path=hdfs://ip-172-31-86-211.ec2.internal:8020/tmp/application_1609705306109_0002/pending-uploads/_temporary/0/_temporary/attempt_20210103202544_0000_m_000000_0; isDirectory=false; length=960; replication=1; blocksize=134217728; modification_time=1609705553258; access_time=1609705552227; owner=hadoop; group=hadoop; permission=rw-r--r--; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} to hdfs://ip-172-31-86-211.ec2.internal:8020/tmp/application_1609705306109_0002/pending-uploads
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:478)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:608)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:571)
at com.snowplowanalytics.snowflake.transformer.s3.S3MultipartOutputCommitter.commitTaskInternal(S3MultipartOutputCommitter.java:472)
at com.snowplowanalytics.snowflake.transformer.s3.S3MultipartOutputCommitter.commitTask(S3MultipartOutputCommitter.java:399)
at com.snowplowanalytics.snowflake.transformer.s3.S3DirectoryOutputCommitter.commitTask(S3DirectoryOutputCommitter.java:28)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:197)
at org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.commitTask(SQLEmrOptimizedCommitProtocol.scala:112)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:79)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:275)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
… 9 more
Caused by: java.io.IOException: Failed to rename HdfsNamedFileStatus{path=hdfs://ip-172-31-86-211.ec2.internal:8020/tmp/application_1609705306109_0002/pending-uploads/_temporary/0/_temporary/attempt_20210103202544_0000_m_000000_3; isDirectory=false; length=960; replication=1; blocksize=134217728; modification_time=1609705554438; access_time=1609705554258; owner=hadoop; group=hadoop; permission=rw-r--r--; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} to hdfs://ip-172-31-86-211.ec2.internal:8020/tmp/application_1609705306109_0002/pending-uploads
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:478)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:608)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:571)
at com.snowplowanalytics.snowflake.transformer.s3.S3MultipartOutputCommitter.commitTaskInternal(S3MultipartOutputCommitter.java:472)
at com.snowplowanalytics.snowflake.transformer.s3.S3MultipartOutputCommitter.commitTask(S3MultipartOutputCommitter.java:399)
at com.snowplowanalytics.snowflake.transformer.s3.S3DirectoryOutputCommitter.commitTask(S3DirectoryOutputCommitter.java:28)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:197)
at org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.commitTask(SQLEmrOptimizedCommitProtocol.scala:112)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:79)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:275)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
… 9 more

Driver stacktrace:
21/01/03 20:25:54 INFO DAGScheduler: Job 0 failed: text at TransformerJob.scala:137, took 10.386409 s
21/01/03 20:25:54 ERROR FileFormatWriter: Aborting job 7b3020dd-efe9-4729-bf54-8334bd900cd3.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, ip-172-31-85-111.ec2.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to rename HdfsNamedFileStatus{path=hdfs://ip-172-31-86-211.ec2.internal:8020/tmp/application_1609705306109_0002/pending-uploads/_temporary/0/_temporary/attempt_20210103202544_0000_m_000000_3; isDirectory=false; length=960; replication=1; blocksize=134217728; modification_time=1609705554438; access_time=1609705554258; owner=hadoop; group=hadoop; permission=rw-r--r--; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false} to hdfs://ip-172-31-86-211.ec2.internal:8020/tmp/application_1609705306109_0002/pending-uploads
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:478)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:608)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:571)
at com.snowplowanalytics.snowflake.transformer.s3.S3MultipartOutputCommitter.commitTaskInternal(S3MultipartOutputCommitter.java:472)
at com.snowplowanalytics.snowflake.transformer.s3.S3MultipartOutputCommitter.commitTask(S3MultipartOutputCommitter.java:399)
at com.snowplowanalytics.snowflake.transformer.s3.S3DirectoryOutputCommitter.commitTask(S3DirectoryOutputCommitter.java:28)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:197)
at org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.commitTask(SQLEmrOptimizedCommitProtocol.scala:112)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:79)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:275)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
… 9 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2215)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2164)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2163)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2163)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1013)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1013)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1013)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2395)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2344)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2333)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:815)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:195)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:181)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:124)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:123)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:132)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:131)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:897)
at com.snowplowanalytics.snowflake.transformer.TransformerJob$.process(TransformerJob.scala:137)
at com.snowplowanalytics.snowflake.transformer.TransformerJob$.$anonfun$run$5(TransformerJob.scala:66)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:366)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:387)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:330)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:141)
at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:90)
at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:70)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:50)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:141)
at cats.effect.internals.IORunLoop$.start(IORunLoop.scala:34)
at cats.effect.internals.IOBracket$.$anonfun$apply$1(IOBracket.scala:43)
at cats.effect.internals.IOBracket$.$anonfun$apply$1$adapted(IOBracket.scala:33)
at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:352)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:124)
at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:366)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:387)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:330)
at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
… 3 more

21/01/03 20:25:54 WARN CommitProtocolUtils: Exception while aborting null
java.io.FileNotFoundException: File hdfs://ip-172-31-86-211.ec2.internal:8020/tmp/application_1609705306109_0002/pending-uploads/_temporary/0 does not exist.
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1052)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:131)
at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1112)
at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1109)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1119)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1884)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1912)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1962)
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1956)
at com.snowplowanalytics.snowflake.transformer.s3.S3MultipartOutputCommitter.getPendingUploads(S3MultipartOutputCommitter.java:251)
at com.snowplowanalytics.snowflake.transformer.s3.S3MultipartOutputCommitter.getPendingUploadsIgnoreErrors(S3MultipartOutputCommitter.java:243)
at com.snowplowanalytics.snowflake.transformer.s3.S3MultipartOutputCommitter.abortJob(S3MultipartOutputCommitter.java:326)
at com.snowplowanalytics.snowflake.transformer.s3.S3DirectoryOutputCommitter.abortJob(S3DirectoryOutputCommitter.java:28)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.$anonfun$abortJob$1(HadoopMapReduceCommitProtocol.scala:175)
at org.apache.spark.internal.io.CommitProtocolUtils$.catchAndLogIOE(CommitProtocolUtils.scala:105)
at org.apache.spark.internal.io.CommitProtocolUtils$.catchIOEWhenAbortJob(CommitProtocolUtils.scala:99)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:175)
at org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.abortJob(SQLEmrOptimizedCommitProtocol.scala:128)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:225)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:181)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:124)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:123)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:132)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:131)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:897)
at com.snowplowanalytics.snowflake.transformer.TransformerJob$.process(TransformerJob.scala:137)
at com.snowplowanalytics.snowflake.transformer.TransformerJob$.$anonfun$run$5(TransformerJob.scala:66)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:366)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:387)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:330)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:141)
at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:90)
at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:70)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:50)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:141)
at cats.effect.internals.IORunLoop$.start(IORunLoop.scala:34)
at cats.effect.internals.IOBracket$.$anonfun$apply$1(IOBracket.scala:43)
at cats.effect.internals.IOBracket$.$anonfun$apply$1$adapted(IOBracket.scala:33)
at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:352)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:124)
at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:366)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:387)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:330)
at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:181)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:124)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:123)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:132)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:104)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:227)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:132)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:248)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:131)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
at org.apache.spark.sql.DataFrameWriter.text(DataFrameWriter.scala:897)
at com.snowplowanalytics.snowflake.transformer.TransformerJob$.process(TransformerJob.scala:137)
at com.snowplowanalytics.snowflake.transformer.TransformerJob$.$anonfun$run$5(TransformerJob.scala:66)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:366)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:387)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:330)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:141)
at cats.effect.internals.IORunLoop$.startCancelable(IORunLoop.scala:41)
at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:90)
at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:70)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:50)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:141)
at cats.effect.internals.IORunLoop$.start(IORunLoop.scala:34)
at cats.effect.internals.IOBracket$.$anonfun$apply$1(IOBracket.scala:43)
at cats.effect.internals.IOBracket$.$anonfun$apply$1$adapted(IOBracket.scala:33)
at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:352)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:124)
at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:366)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:387)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:330)
at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Hi @tjtaill,

We can see java.io.FileNotFoundException: File hdfs://ip- in the error message, which means that the Snowflake transformer is reading data from hdfs://. In your playbook, however, the first S3DistCp step does not copy data to HDFS but to another path on S3: "--dest", "s3://<my_bucket_name>/enriched/archive/run={{nowWithFormat "2006-01-02-15-04-05"}}/".

You need to copy the enriched events from S3 to the HDFS path defined in your Snowflake transformer config (hdfs://ip-). HDFS is the local storage used by the Snowflake transformer, which is why the data needs to be put there before processing starts.
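For example, a staging step along the lines below would put the enriched files onto HDFS before the transformer runs. The hdfs:///local/snowflake/stage/ destination is only illustrative; it has to match the input path that your Snowflake transformer config points at, and the --src should be wherever your staged enriched data lives on S3:

{
  "type": "CUSTOM_JAR",
  "name": "Staging enriched data to HDFS",
  "actionOnFailure": "CANCEL_AND_WAIT",
  "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
  "arguments": [
    "--src", "s3://<my_bucket_name>/enriched/archive/run={{nowWithFormat "2006-01-02-15-04-05"}}/",
    "--dest", "hdfs:///local/snowflake/stage/",
    "--s3Endpoint", "s3.amazonaws.com",
    "--srcPattern", ".*\\.gz"
  ]
}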

@BenB thanks for replying. It turns out there was a problem in my playbook: for the transformer step I needed to remove the following lines:

"--conf",
"spark.hadoop.mapreduce.job.outputformat.class=com.snowplowanalytics.snowflake.transformer.S3OutputFormat",
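For anyone else who runs into this, the transformer step in my playbook now looks like this:

{
  "type": "CUSTOM_JAR",
  "name": "Snowflake Transformer",
  "actionOnFailure": "CANCEL_AND_WAIT",
  "jar": "command-runner.jar",
  "arguments": [
    "spark-submit",
    "--deploy-mode",
    "cluster",
    "--class",
    "com.snowplowanalytics.snowflake.transformer.Main",
    "s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-transformer-0.8.0.jar",
    "--config",
    "{{base64File "/snowplow/config/snowflake/snowflake.json"}}",
    "--resolver",
    "{{base64File "/snowplow/config/resolver/resolver.json"}}"
  ]
}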

Now everything is working fine.