I have to replay some events that were already in our archive/raw. I used grep to filter them out, added the CloudFront header, and gzipped them. Now, when I import them and run the pipeline, I get a weird error that occurs every time on the worker nodes and brings down the cluster. Any idea?
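For context, this is roughly what the replay import looks like on my side; the paths, the tab separator, and the job structure below are assumptions for illustration, not the actual pipeline code:

import org.apache.spark.sql.SparkSession

object ReplayArchivedEvents {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("replay-archived-events")
      .getOrCreate()

    // Spark reads the gzipped files transparently; each .gz file becomes a
    // single, non-splittable partition.
    val events = spark.read
      .option("sep", "\t")                      // assumed tab-separated rows
      .csv("hdfs:///archive/raw/replay/*.gz")   // hypothetical input path

    // Re-emit as CSV for the downstream steps; the CSV write stage is where
    // the UnivocityGenerator trace below blows up.
    events.write
      .option("sep", "\t")
      .csv("hdfs:///archive/raw/replayed/")     // hypothetical output path

    spark.stop()
  }
}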
Internal state when error was thrown: recordCount=683702, recordData=[0.19.11, web, 2018-04-18 14:57:32.303, 2018-01-17 16:08:52.000, 2018-01-17 16:08:52.436, struct, 37e43d18-e270-1bd2-a4ac-4a35d84ad9fb, , cf, js-2.6.2, cloudfront, spark-1.11.0-common-0.28.0, g5gyR_VWEeevVQpYCjwFEQ, 11.11.111.x, 111032734, 00096d5d-3ef4-4e1f-baa9-4af78720cbbb
.....(basically the whole line is printed out here)
at com.univocity.parsers.common.AbstractWriter.throwExceptionAndClose(AbstractWriter.java:916)
at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:706)
at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.write(UnivocityGenerator.scala:82)
at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.write(CSVFileFormat.scala:139)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:327)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:258)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:261)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:191)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:190)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
At the end it says:
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1546)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:104)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:60)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
at com.univocity.parsers.common.input.WriterCharAppender.writeCharsAndReset(WriterCharAppender.java:152)
at com.univocity.parsers.common.AbstractWriter.writeRow(AbstractWriter.java:808)
... 16 more