Our setup is - StreamCollector > Raw Stream > Kinesis LZO S3 Sink > EmrEtlRunner > StorageLoader > Redshift
The collector format is “thrift”.
We’re getting a large volume of data into our collector, which then gets pushed into the “raw/in” S3 folder. In this scenario, when we run EmrEtlRunner, it is not able to move data from “raw/in” into “raw/processing” faster than new data arrives in “raw/in” from the collector. Hence, it gets stuck in the staging step and never progresses to the EMR stage of the batch pipeline.
Questions -
Is this setup correct?
The documentation on “2-Using-EmrEtlRunner” states that it can run in timespan mode only for the “cloudfront” collector format, which doesn’t work for us (since we’re using “thrift”).
Can timespan mode be made to support thrift format?
Can the timespan be made granular to include hours and minutes?
The EmrEtlRunner process runs but cannot proceed beyond step 1 of the batch pipeline because it is not able to catch up with the new records that keep landing in “raw/in” from the collectors. I had to manually stop the EmrEtlRunner process and rerun it with the --skip staging option to start the EMR jobflow.
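For reference, the rerun looked roughly like this (the config and resolver file names are placeholders for our actual ones):
./snowplow-emr-etl-runner \
  --config config.yml \
  --resolver iglu_resolver.json \
  --skip staging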
Also, could you please let me know your thoughts on question #2.
We have 2 collectors that are writing 500 records every ~0.5 seconds. Here’s a snippet of the collector log -
18:08:05.518 [scala-stream-collector-akka.actor.default-dispatcher-14] INFO c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:05.993 [pool-1-thread-7] INFO c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records
18:08:06.228 [scala-stream-collector-akka.actor.default-dispatcher-13] INFO c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:06.663 [pool-1-thread-2] INFO c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records
18:08:06.950 [scala-stream-collector-akka.actor.default-dispatcher-10] INFO c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:07.410 [pool-1-thread-9] INFO c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records
18:08:07.645 [scala-stream-collector-akka.actor.default-dispatcher-9] INFO c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:07.960 [pool-1-thread-1] INFO c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records
18:08:08.354 [scala-stream-collector-akka.actor.default-dispatcher-10] INFO c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:08.674 [pool-1-thread-8] INFO c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records
18:08:09.063 [scala-stream-collector-akka.actor.default-dispatcher-15] INFO c.s.s.c.s.sinks.KinesisSink - Writing 500 Thrift records to Kinesis stream collector-stream-good
18:08:09.483 [pool-1-thread-1] INFO c.s.s.c.s.sinks.KinesisSink - Successfully wrote 500 out of 500 records
How many EC2 instances is the Kinesis S3 sink running on?
One (Type - m4.large)
What are the buffer settings (from the config.hocon) for the Kinesis S3 sink?
I initially had set it to -
buffer {
  byte-limit: 1000000 # 1MB
  record-limit: 100
  time-limit: 120000
}
What is the specification of the box running EmrEtlRunner?
c3.xlarge
EMR config -
emr:
  ami_version: 4.5.0
  region: us-west-2                  # Always set this
  jobflow_role: EMR_EC2_DefaultRole  # Created using $ aws emr create-default-roles
  service_role: EMR_DefaultRole      # Created using $ aws emr create-default-roles
  placement:                         # Set this if not running in VPC. Leave blank otherwise
  ec2_subnet_id: xxxx                # Set this if running in VPC. Leave blank otherwise
  ec2_key_name: xxxx
  bootstrap:                         # Set this to specify custom bootstrap actions. Leave empty otherwise
  software:
    hbase:                           # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
    lingual:                         # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
  # Adjust your Hadoop cluster below
  jobflow:
    master_instance_type: c3.xlarge
    core_instance_count: 2
    core_instance_type: c3.xlarge
    core_instance_ebs:               # Optional. Attach an EBS volume to each core instance.
      volume_size: 100               # Gigabytes
      volume_type: "gp2"
      volume_iops: 400               # Optional. Will only be used if volume_type is "io1"
      ebs_optimized: true            # Optional. Will default to true
    task_instance_count: 3           # Increase to use spot instances
    task_instance_type: c3.xlarge
    task_instance_bid: xxxx          # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
  bootstrap_failure_tries: 3         # Number of times to attempt the job in the event of bootstrap failures
  additional_info:                   # Optional JSON string for selecting additional features
EMR version - emr_r87_chichen_itza
Couple of points -
I started this setup only a couple of days ago with the latest version of EMR, so I have not yet completed a single successful enrich and load into Redshift.
The EMR jobflow I started with --skip staging has been running for over 11 hours now, with almost all of the time spent in “Elasticity Scalding Step: Enrich Raw Events”.
Total Objects in S3 raw/processing - 72603
Thanks for the additional detail @amitkhanal. I read that as you generating either 1,000 or 2,000 events per second, and with the record limit in Kinesis S3 set to 500 records, that means 2 or 4 files per second - so you are going to be generating either 120 or 240 files a minute, or 7,200 or 14,400 files per hour.
This is going to massively overload not just EmrEtlRunner but also Hadoop.
My recommendation is to:
Change the buffer settings so you are generating only double-digit files per hour, the lower the better (see the example buffer block below)
Delete the files you have so far
Delete the Kinesis S3 checkpoint from DynamoDB so that you can generate much smaller files from the start of your stream
Let us know what buffer settings end up working for you!
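To illustrate the first point, a buffer block along these lines would flush far less often than your current one - the exact values below are placeholders to tune against your own traffic (and against the memory available on the sink box, since the buffer is held in memory until it is flushed to S3), not tested recommendations:
buffer {
  byte-limit: 134217728 # ~128 MB per flush - placeholder, bounded by the sink instance's heap
  record-limit: 200000  # placeholder
  time-limit: 600000    # 10 minutes - placeholder
}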
I’ve also set the time limit to 4 hours, but that sometimes leads to out-of-memory errors. The raw file size in S3 ranges from 250 MB to 750 MB depending on traffic and time of day.
EMR config.yml -
jobflow:
  master_instance_type: m1.medium
  core_instance_count: 3
  core_instance_type: c4.2xlarge
  core_instance_ebs:        # Optional. Attach an EBS volume to each core instance.
    volume_size: 500        # Gigabytes
    volume_type: "gp2"
    volume_iops: 400        # Optional. Will only be used if volume_type is "io1"
    ebs_optimized: true     # Optional. Will default to true
  task_instance_count: 2    # Increase to use spot instances
  task_instance_type: m4.xlarge
  task_instance_bid: x.xx   # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
Here are some entries from the manifest with the configs above -
Hey @amitkhanal - thanks for sharing that. It sounds like a good first step - for a second phase you could consider adding a file compaction step at the front of the EMR job. This will reduce your reliance on the Kinesis S3 buffer settings to do all the heavy lifting of minimizing the file count.
I’m afraid there’s no formal documentation on the compaction approach, so let’s write some here:
You will need to delete the .lzo.index files before you do the compaction
You can perform the compaction using S3DistCp, the jar is /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar
After the compaction if you want you can re-index the files using s3://snowplow-hosted-assets/third-party/twitter/hadoop-lzo-0.4.20.jar
Then you can run the Hadoop Enrich job as normal (a rough sketch of these steps follows below)
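Run from the EMR master node, the sequence might look roughly like the following - the bucket names, the --groupBy pattern and the target size are placeholders you would need to adapt to your own setup, so treat this as a sketch rather than a tested recipe:
# 1. Remove the .lzo.index files so they don't get swept into the compacted output
aws s3 rm s3://YOUR-BUCKET/raw/processing/ --recursive --exclude "*" --include "*.lzo.index"

# 2. Compact the many small .lzo files into ~128 MB files with S3DistCp
hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar \
  --src s3://YOUR-BUCKET/raw/processing/ \
  --dest s3://YOUR-BUCKET/raw/compacted/ \
  --groupBy '.*(\.lzo)' \
  --targetSize 128 \
  --outputCodec lzo

# 3. Optionally re-index the compacted files for splittable LZO
#    (first fetch the hadoop-lzo jar from the snowplow-hosted-assets path above)
hadoop jar hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer \
  s3://YOUR-BUCKET/raw/compacted/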
Hope this helps! These kinds of techniques are very valuable for running Snowplow at “mega-scale” - this is an ongoing program of R&D at Snowplow, which is why none of this has been formally documented / integrated into the codebase yet.
If you are responsible for operating a Snowplow at these kinds of volumes and the Snowplow Managed Service is not an option, then I would recommend checking out the Big Data on AWS three day training course.
If you are also running Stream Enrich, then another strategy you can consider is to add a second Kinesis S3 sink onto the Kinesis enriched event stream, and then run EMR with --skip enrich, so it only runs the Hadoop Shred step.
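In that setup the EmrEtlRunner invocation would look roughly like this - the config and resolver file names are placeholders, and your enriched buckets in config.yml would need to point at wherever the second sink writes:
./snowplow-emr-etl-runner \
  --config config.yml \
  --resolver iglu_resolver.json \
  --skip enrich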