GC overhead limit exceeded due to large amount of data. How do we process data in small parts?


Our data enrichment process has not been running for 4 months using snowplow.
And at there moment there are a lot of it for that period.

We are using AWS Kinesis as data source.
First staging step has been successfull.(Fetched all the data from kinesis and stored in our S3 bucket.)

However enrichment process keeps failing with error:

java.lang.OutOfMemoryError: GC overhead limit exceeded

There are plenty of disk space and memory.
Logs from YARN:

2019-04-07 10:26:34,881 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 7512 for container-id container_1554632190123_0001_01_000086: 624.1 MB of 13.3 GB physical memory used; 12.9 GB of 66.3 GB virtual memory used

2019-04-07 10:26:36,061 INFO SecurityLogger.org.apache.hadoop.ipc.Server (Socket Reader #1 for port 8041): Auth successful for appattempt_1554632190123_0001_000001 (auth:SIMPLE)
2019-04-07 10:26:36,063 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl (IPC Server handler 5 on 8041): Stopping container with container Id: container_1554632190123_0001_01_000086
2019-04-07 10:26:36,063 INFO org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger (IPC Server handler 5 on 8041): USER=hadoop IP= OPERATION=Stop Container Request TARGET=ContainerManageImpl RESULT=SUCCESS APPID=application_1554632190123_0001 CONTAINERID=container_1554632190123_0001_01_000086
2019-04-07 10:26:36,063 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl (AsyncDispatcher event handler): Container container_1554632190123_0001_01_000086 transitioned from RUNNING to KILLING

Config file which we are using:

access_key_id: “string”
secret_access_key: “string”
region: region
assets: s3://bucket/snowplow-hosted-assets
jsonpath_assets: s3://bucket/jsonpaths
log: s3://bucket/log
- s3://bucket
processing: s3://bucket/raw/processing
archive: s3://bucket/archive/raw
good: s3://bucket/enrich/good
bad: s3://bucket/enrich/bad
errors: s3://bucket/enrich/errors
archive: s3://bucket/archive/enriched
good: s3://bucket/shredded/good
bad: s3://bucket/shredded/bad
errors: s3://bucket/shredded/errors
archive: s3://bucket/archive/shredded
ami_version: 4.5.0
region: region
jobflow_role: EMR_EC2_DefaultRole
service_role: EMR_DefaultRole
ec2_subnet_id: subnet
ec2_key_name: key
master_instance_type: c4.4xlarge
core_instance_count: 30
volume_size: 300
volume_type: “gp2”
core_instance_type: c4.4xlarge
task_instance_count: 0
task_instance_type: c4.4xlarge
task_instance_bid: 0.015
bootstrap_failure_tries: 3
format: thrift
job_name: snowplow-enrich-emretl
hadoop_enrich: 1.8.0
hadoop_shred: 0.10.0
hadoop_elasticsearch: 0.1.0
continue_on_unexpected_error: true
output_compression: GZIP
- name: name
type: redshift
host: redshift_host
database: database
port: 5432
ssl_mode: require
table: atomic.events
username: username
password: password
maxerror: 5
comprows: 200000
tags: {}
level: DEBUG
snowplow: ~

Command which i use to re-start EMR process:

/var/snowplow/snowplow-emr-etl-runner --config /var/snowplow/conf.yaml --resolver /var/snowplow/resolver.json --enrichments /var/snowplow/enrichments --skip staging

Taking all this into account i dont see a way to enrich, shred and load the data into database without splitting the process into several time periods.
I just don’t know how to do it properly.

Lets say we have data for 4 months and it is being fetched and stored already in S3 bucket:

December, January, February, March

The question is how properly to run enrich and shred 4 times without breaking/losing the data?
Assuming this is stack: [December|January|February|March] we pop data for the last month from this stack.
Initially we need to process data from March till the end
Then for whole February and same for January and December.
If i recall correctly this requires manipulation with:

processing: s3://bucket/raw/processing

Should i put for the first run:

processing: s3://bucket/raw/processing/run=2019.03.01-time
and then for the second run:
processing: s3://bucket/raw/processing/run=2019.02.01-time
until all the data will be in shredded bucket and then just run storage loader?

I hope i make my issue clear in the post.
If you need any data please let me know.


@xinitrc, you seem to be using a very old version of the pipeline (still on Hadoop framework), presumably R87. Your cluster is quite big - 30x c4.4xlarge. However, the cluster size depends on the volume of events to be processed. Rough estimation is 1x c4.4xlarge is fine to process ~4,000,000 events.

In any case, breaking down the payload/volume is a good idea. If you do have the logs/events present in dedicated folders (broken down per month) then indeed, you could amend the path to the processing bucket they way you described it yourself. Probably, the EMR cluster could be downscaled as well.

I would, however, load the data into Redshift too as part of a single (month-worth) run rather than accumulating shredded types.

Depending on the volume of self-describing events and contexts captured, 1K of log size roughly corresponds to 1 event. Though, 1 event could be considerably higher in the log size of raw events, thus treat this formula with caution.

@ihor, yeah the process is quite old(legacy project) and we are planning to upgrade.
Regarding your response, is there a way to tell enrichment stage to process files in processing folders which match the regexp like this for example: 2019-03-*?
Or at least force it to process files from some specific date towards the end without touching others?

I want to use -s flag like this:

/var/snowplow/snowplow-emr-etl-runner --config /var/snowplow/conf.yaml --resolver /var/snowplow/resolver.json --enrichments /var/snowplow/enrichments --skip staging -s 2019-03-01

But i’m not sure if it will not delete older events and processing folder after it is done.
Can you give some details how it is going to treat processing folder if i will use -s flag?

Hi @xinitrc,

Regarding your response, is there a way to tell enrichment stage to process files in processing folders which match the regexp like this for example: 2019-03-*?
Or at least force it to process files from some specific date towards the end without touching others?

No, EER can’t do it. It picks up all data which is available in the processing bucket. You need to prepare the files in advance: e.g. leave only one month and move others in another tmp location for future runs.