Dataflow Runner problem

Hi All,

I’m new to Dataflow Runner. I ran into an error while running a step: I based it on the step that EmrEtlRunner creates, which works fine, but it fails when run through Dataflow Runner.

This is what is shown in stderr:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at com.twitter.scalding.Job$.apply(Job.scala:47)
at com.twitter.scalding.Tool.getJob(Tool.scala:48)
at com.twitter.scalding.Tool.run(Tool.scala:68)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.snowplowanalytics.snowplow.enrich.hadoop.JobRunner$.main(JobRunner.scala:33)
at com.snowplowanalytics.snowplow.enrich.hadoop.JobRunner.main(JobRunner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: com.snowplowanalytics.snowplow.enrich.common.FatalEtlError: NonEmptyList(error: Required argument [input_format] not found
level: "error"
)
at com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob$$anonfun$2.apply(EtlJob.scala:140)
at com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob$$anonfun$2.apply(EtlJob.scala:140)
at scalaz.Validation$class.fold(Validation.scala:64)
at scalaz.Failure.fold(Validation.scala:330)
at com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob.<init>(EtlJob.scala:139)
… 16 more

The step I’m trying to recreate is the Elasticity Scalding Step: Enrich Raw Events.

Thank you for your help

Hey - as per the error message above, you are missing the input_format argument.
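For illustration, here is a rough sketch of the enrich step in a Dataflow Runner playbook with --input_format passed as its own argument. Treat everything below as a placeholder sketch: the jar path, bucket names, timestamp and the cloudfront value are made up, the playbook wrapper (schema, region, credentials) is omitted, and the full flag list is best copied from a previous successful EmrEtlRunner run rather than from here:

{
  "type": "CUSTOM_JAR",
  "name": "Elasticity Scalding Step: Enrich Raw Events",
  "actionOnFailure": "CANCEL_AND_WAIT",
  "jar": "s3://snowplow-hosted-assets/3-enrich/scala-hadoop-enrich/snowplow-hadoop-enrich-1.8.0.jar",
  "arguments": [
    "com.snowplowanalytics.snowplow.enrich.hadoop.EtlJob",
    "--hdfs",
    "--input_format",
    "cloudfront",
    "--etl_tstamp",
    "1500000000000",
    "--iglu_config",
    "<base64-encoded resolver JSON>",
    "--enrichments",
    "<base64-encoded enrichments JSON>",
    "--input_folder",
    "hdfs:///local/snowplow/raw-events/*",
    "--output_folder",
    "hdfs:///local/snowplow/enriched-events/",
    "--bad_rows_folder",
    "s3://my-bucket/enriched/bad/"
  ]
}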

In an upcoming release we will do this migration for you - generating Dataflow Runner playbooks that correspond to the EmrEtlRunner jobflow - so it might be worth waiting for that.

Hi Alex,

Thank you for the reply. But neither the sample nor the Avro schema requires input_format. The only difference from the sample is in the input: the sample uses base64, while I paste the already-encoded string in directly. Does the error refer to that?

Yes, I’m also waiting for snowplowctl.

The Spark Enrich job takes a set of command-line arguments - you will see these if you look in your EMR console for a previous successful job run. It sounds like you aren’t providing these arguments to the Spark Enrich job in your Dataflow Runner playbook.
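If the console is fiddly to copy from, the same argument list can be pulled with the AWS CLI (the cluster and step IDs below are placeholders for a previous successful run):

aws emr list-steps --cluster-id j-XXXXXXXXXXXXX
aws emr describe-step --cluster-id j-XXXXXXXXXXXXX --step-id s-XXXXXXXXXXXXX

The describe-step output includes the step's Args array, which you can paste into the arguments field of your Dataflow Runner playbook.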

Hi Alex,

Thank you for the reply. After cross-checking, I found I had misconfigured the input format: I put the flag and its value on one line instead of passing them as two separate arguments.
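For anyone else hitting this, the difference in the playbook's arguments array looks like this (cloudfront is just an example value):

Wrong - flag and value in one entry:

  "--input_format cloudfront"

Right - flag and value as two separate entries:

  "--input_format",
  "cloudfront"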

Thank you for your help