Frequently failing in the 4th step of the storage process

Hi All,

A couple of weeks back everything was working fine, but now the pipeline is frequently failing in the 4th step of the storage process.
Below is the architecture I am following:

JavaScript Tracker -> Scala Stream Collector -> Kinesis S3 -> S3 -> EmrEtlRunner (enrich + shredding) -> Redshift

I am getting the error below:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:705)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-172-31-12-31.ec2.internal:8020/tmp/0f4fda2c-b209-452d-9aa6-261ca887c175/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
	... 10 more

Every time, the events end up in the bad bucket during the enrichment process.
Please help me resolve this error. Let me know if there are any changes I need to make.

Below is my config.yml file

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: xxxxxx
  secret_access_key: xxxxxxxx
  #keypair: Snowplowkeypair
  #key-pair-file: /home/ubuntu/snowplow/4-storage/config/Snowplowkeypair.pem
  region: us-east-1
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://dataobjecteventsstorage/logs
      raw:
        in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3://dataobjecteventsstorage/      # e.g. s3://my-old-collector-bucket
        processing: s3://dataobjecteventsstorage/raw/processing
        archive: s3://dataobjecteventsstorage/raw/archive   # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://dataobjecteventsstorage/enriched/good        # e.g. s3://my-out-bucket/enriched/good
        bad: s3://dataobjecteventsstorage/enriched/bad       # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://dataobjecteventsstorage/enriched/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://dataobjecteventsstorage/enriched/archive    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://dataobjecteventsstorage/shredded/good        # e.g. s3://my-out-bucket/shredded/good
        bad: s3://dataobjecteventsstorage/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://dataobjecteventsstorage/shredded/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://dataobjecteventsstorage/shredded/archive     # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 5.5.0
    region: us-east-1       # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement: us-east-1a      # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id:  # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: Snowplowkeypair
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:              # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.9.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.12.0
    rdb_shredder: 0.12.0        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  #snowplow:
    #method: get
    #app_id: unilog # e.g. snowplow
    #collector: 172.31.38.39:8082 # e.g. d3rkrsqld9gmqf.cloudfront.net

@sandesh, to figure out why events end up in the bad bucket, you need to look at those bad-bucket events. They contain a detailed error message.
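
Something like the sketch below can help with that (just a sketch using boto3, not an official tool; the bucket and prefix are taken from the config you posted, and I'm assuming the classic bad-row JSON format with a top-level "errors" array):

# Sketch: print the error messages from Snowplow bad rows stored on S3.
# Assumptions: boto3 credentials are configured, the bucket/prefix match the
# config in this thread, and bad rows are JSON lines with an "errors" array.
import json
import boto3

s3 = boto3.client("s3")
bucket = "dataobjecteventsstorage"   # bucket name from the config in this thread
prefix = "enriched/bad/"             # append the run=... folder you want to inspect

paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get("Contents", []):
        if obj["Size"] == 0:
            continue  # skip empty marker objects such as _SUCCESS
        body = s3.get_object(Bucket=bucket, Key=obj["Key"])["Body"].read()
        for line in body.decode("utf-8").splitlines():
            if not line.strip():
                continue
            bad_row = json.loads(line)
            for error in bad_row.get("errors", []):
                print(obj["Key"], "->", error.get("message"))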

Hey @anton,
I checked the enriched bad bucket: it has 1 file and 1 folder (inside the folder there was another file), as shown in the screenshot.
I checked both files, but they were empty.

Please point me in the right direction.

My AWS account is being charged on a daily basis, and I cannot afford to keep this running much longer.

I’m not sure I’m following, @sandesh. You said that events are ending up in the bad bucket, but at the same time it’s empty, which means there are no events there at all.

Hey @anton, sorry for the confusion.
The buckets are being created, but when I check them, both the bad bucket and the good bucket are empty.
In the enriched bucket, under archive, a .csv file is being generated; below is the data of the event present in that csv file:

"13	web	2017-11-22 08:01:15.420	2017-11-20 06:34:45.614	2017-11-20 06:35:59.340	page_view	a540f3be-12e5-4997-b2de-4ed3068e28d3		cf	js-2.8.0	ssc-0.9.0-kinesis	spark-1.9.0-common-0.25.0		172.31.38.x	4265106636	2aa179fb-7a7e-4537-a716-b953bc7e0575	1	fe344181-115b-4285-aad7-d17b8a1e463f												http://localhost:8085/ChangesHTML/SampleExampleTracker.html	Fixed Width 2 Blue		http	localhost	8085	/ChangesHTML/SampleExampleTracker.html																																										Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML"

Please help me resolve this error:

Exception in thread "main" java.lang.RuntimeException: Error running job
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:927)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:705)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
	at com.amazon.elasticmapreduce.s3distcp.Main.main(Main.java:22)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
	at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-172-31-2-208.ec2.internal:8020/tmp/029fefc3-5b34-4f1d-87ec-84d2fd7798d9/files
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:317)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265)
	at org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:59)
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:352)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:301)
	at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:318)
	at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
	at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
	at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
	at com.amazon.elasticmapreduce.s3distcp.S3DistCp.run(S3DistCp.java:901)
	... 10 more

Why are the buckets ending up empty?
Do I need to make any changes to the configuration?
Do I need to rebuild the server? Please let me know.
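
To see which stages are actually writing output to S3, I can run a quick listing sketch like the one below (using boto3; the bucket and prefixes are taken from my config below, and the run=... folder may need to be appended):

# Sketch: count non-empty objects under each pipeline prefix on S3.
# Assumptions: boto3 credentials are configured and the prefixes match the
# config in this thread. list_objects_v2 returns at most 1,000 keys per call,
# which is enough for a quick check.
import boto3

s3 = boto3.client("s3")
bucket = "dataobjecteventsstorage"
prefixes = ["raw/processing/", "enriched/good/", "enriched/bad/",
            "shredded/good/", "shredded/bad/"]

for prefix in prefixes:
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    objects = response.get("Contents", [])
    non_empty = [o for o in objects
                 if o["Size"] > 0 and not o["Key"].endswith("_SUCCESS")]
    print(f"{prefix}: {len(non_empty)} non-empty object(s)")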

Below is my config.yml file

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: xxxxxxxxx
  secret_access_key: xxxxxxxxx
  #keypair: Snowplowkeypair
  #key-pair-file: /home/ubuntu/snowplow/4-storage/config/Snowplowkeypair.pem
  region: us-east-1
  s3:
    region: us-east-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3://dataobjecteventsstorage/logs
      raw:
        in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3://dataobjecteventsstorage/      # e.g. s3://my-old-collector-bucket
        processing: s3://dataobjecteventsstorage/raw/processing
        archive: s3://dataobjecteventsstorage/raw/archive   # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://dataobjecteventsstorage/enriched/good        # e.g. s3://my-out-bucket/enriched/good
        bad: s3://dataobjecteventsstorage/enriched/bad       # e.g. s3://my-out-bucket/enriched/bad
        errors: s3://dataobjecteventsstorage/enriched/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://dataobjecteventsstorage/enriched/archive    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://dataobjecteventsstorage/shredded/good        # e.g. s3://my-out-bucket/shredded/good
        bad: s3://dataobjecteventsstorage/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://dataobjecteventsstorage/shredded/errors     # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://dataobjecteventsstorage/shredded/archive     # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 5.5.0
    region: us-east-1       # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement: us-east-1a      # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id:  # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: Snowplowkeypair
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:              # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow ETL # Give your job a name
      master_instance_type: m1.medium
      core_instance_count: 2
      core_instance_type: m1.medium
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 100    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: m1.medium
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: thrift # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.9.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.12.0
    rdb_shredder: 0.12.0        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  #snowplow:
    #method: get
    #app_id: unilog # e.g. snowplow
    #collector: 172.31.38.39:8082 # e.g. d3rkrsqld9gmqf.cloudfront.net

Thanks,
Sandesh P