Hi all!
I’m having trouble sending bad rows to Elasticsearch from EmrEtlRunner.
I’m using r83, and the closest I can get to the root cause is this:
cascading.tuple.TupleException: unable to sink into output identifier: snowplow/bad_rows
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:160)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:95)
at cascading.tuple.TupleEntrySchemeCollector.add(TupleEntrySchemeCollector.java:134)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:90)
at cascading.flow.stream.SinkStage.receive(SinkStage.java:37)
at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
at com.twitter.scalding.MapFunction.operate(Operations.scala:59)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:130)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:455)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:172)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:166)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: null
null
at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:427)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:385)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:363)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:367)
at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:121)
at org.elasticsearch.hadoop.rest.RestClient.esVersion(RestClient.java:513)
at org.elasticsearch.hadoop.rest.InitializationUtils.discoverEsVersion(InitializationUtils.java:177)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:378)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.init(EsOutputFormat.java:173)
at org.elasticsearch.hadoop.mr.EsOutputFormat$EsRecordWriter.write(EsOutputFormat.java:149)
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:867)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:619)
at cascading.tap.hadoop.util.MeasuredOutputCollector.collect(MeasuredOutputCollector.java:69)
at org.elasticsearch.hadoop.cascading.EsHadoopScheme.sink(EsHadoopScheme.java:205)
at cascading.tuple.TupleEntrySchemeCollector.collect(TupleEntrySchemeCollector.java:153)
... 21 more
I found this in the EMR logs for the job, here:
s3://…/snowplow/logs/j-25SOPSJS5RE05/containers/application_1480715933846_0001/container_1480715933846_0001_01_000001/syslog.gz
Thinking it might be a permissions problem, I’ve added the IAM roles for both the EMR role and the EC2 instance profile to the access policy for the Elasticsearch service. It’s a rather cryptic error, though, so hopefully others have seen it before.
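For what it's worth, the "Caused by" part of the trace fails inside RestClient.esVersion, which is reached through RestClient.get, so the connector seems to be rejected on its very first request (the version lookup) rather than on the index itself. A rough way to reproduce that outside of EMR is to send the same kind of plain GET to the cluster root and look at the status code. This is only a sketch: the endpoint in the comment is a made-up placeholder, the function names are mine, and I'm assuming the connector sends ordinary unsigned HTTP requests, which I haven't confirmed.

```python
import urllib.error
import urllib.request


def interpret_root_status(status):
    """Turn the HTTP status of GET / into a rough diagnosis string."""
    if status == 200:
        return "reachable: the cluster answered the version request"
    if status in (401, 403):
        return "rejected: the access policy refused this unsigned request"
    return "unexpected: status %d, check the endpoint and port" % status


def probe_root(endpoint, timeout=10):
    """Send an unsigned GET to the cluster root and return the HTTP status.

    Connection-level failures (DNS, timeout) are not caught here and
    will surface as urllib.error.URLError.
    """
    try:
        with urllib.request.urlopen(endpoint, timeout=timeout) as response:
            return response.status
    except urllib.error.HTTPError as err:
        # 4xx/5xx responses are raised as HTTPError; the code is what we want.
        return err.code


# Example call (hypothetical endpoint, substitute the real one):
# print(interpret_root_status(probe_root("https://search-example.es.amazonaws.com/")))
```

Running this from a machine that is not covered by the access policy and then from the EMR master node should show whether the policy lets unsigned requests from the cluster through at all.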
I’ve manually created the index (snowplow), but I haven’t found any information about the type mapping, so I’m assuming it’s created automatically?
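To check whether the type exists once rows start arriving, the index mapping can be fetched from the _mapping endpoint and inspected. A minimal sketch, assuming an Elasticsearch version that still has mapping types and returns the mapping keyed by index name and then "mappings"; the helper names are mine, and the type name bad_rows is taken from the output identifier snowplow/bad_rows in the trace:

```python
import json


def mapping_url(endpoint, index):
    """Build the URL of the mapping API for one index."""
    return endpoint.rstrip("/") + "/" + index + "/_mapping"


def mapped_types(mapping_body, index):
    """Return the sorted type names found in a _mapping response body.

    Assumes the response shape {index: {"mappings": {type: {...}}}}.
    """
    doc = json.loads(mapping_body)
    return sorted(doc.get(index, {}).get("mappings", {}).keys())
```

An empty list for the snowplow index would mean no type has been created yet, which is what I'd expect before the first document is written.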
Thanks
rick