We have been using Snowplow for a while now and have just started adding some more events, which has increased our volume significantly.
One thing we have noticed is that the shred step is taking longer and longer, when we expected the enrich step to be the time-consuming one.
Our cluster config is:
Master: 1 x m1.medium
Core: 3 x i2.2xlarge
Task: 40 x m3.2xlarge
The enrich step took 1h59m and the shred step took 6h48m. Is there something about our config that is causing this, or is it normal?
The files being processed are from the stream collector, approx 40MB compressed per batch. This was around 24 hours' worth of data, as we had an issue on the previous run.
Not sure if I need to change the cluster config to optimize speed or something.
Hi @lookaflyingdonkey - my first thought is that Shred is struggling with the sheer number of files it needs to generate:
You recently added new event types, so there are more files to process
You have a very large cluster - 43 boxes
To validate this hypothesis, can you check your shredded:good folder in S3 for a run and share with us the total number of files and bytes for that run (aws s3 ls s3://foo/bar --recursive --summarize)?
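Something like this, with your own bucket and a real run folder substituted for the placeholders, will print the totals at the end of the listing:

aws s3 ls s3://your-snowplow-bucket/shredded/good/run=2017-03-28-00-00-00/ --recursive --summarize --human-readable

The last two lines of the output ("Total Objects" and "Total Size") are the numbers we are after.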
Great, once this storage load is complete I will try the last 24 hours with fewer task nodes. I usually try to run every 12 hours, but we got backlogged due to a failed EMR job and now it is snowballing!
What would the optimal number of nodes/instance types be? Are the EMR jobs CPU, memory or disk bound? The c4 and i2 have similar CPU capabilities, but the i2 has double the RAM and an NVMe drive, at a much higher cost. If the jobs are disk bound then these should certainly speed things up and allow us to run fewer task nodes.
Thanks for the heads-up @bernardosrulzon, that does look like a strange bug you found there! I will try with just core nodes and beef those up to see if having fewer files helps.
Will also subscribe to that bug, as with the number of events we are tracking we will save a lot of $$ with spot instances.
Another piece of advice for you: the i2 instances are quite slow. Try replacing 3 x i2.2xlarge with 3 x c3.8xlarge, unless you require lots of storage, in which case you could use c4 instances with the amount of (EBS) storage you need, provided you run pipeline version r87.
The cost of EC2/EMR should be comparable, but the performance will be greatly increased.
Thanks @ihor, that was the information I needed. I wasn’t sure if the shred was bound by CPU or disk, but by the sounds of it, the shred is CPU bound when dealing with fewer, larger files.
Still waiting for the archive step of StorageLoader to finish (it has been over 5 hours now!). I was reading about skipping that and doing the archive in parallel with the AWS CLI directly; I may have to give that a go.
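Something like this is roughly what I have in mind; just a sketch with placeholder bucket names, assuming the archive step is essentially moving each run folder out of enriched/good (and shredded/good) into the archive bucket:

# move each enriched/good run folder across to the archive bucket, 8 moves in parallel
aws s3 ls s3://my-snowplow-bucket/enriched/good/ | awk '$1 == "PRE" {print $2}' | \
  xargs -P 8 -I {} aws s3 mv \
    "s3://my-snowplow-bucket/enriched/good/{}" \
    "s3://my-archive-bucket/enriched/good/{}" --recursive

The same pattern would cover shredded/good.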
I will let you all know the results once I get to run the next batch.
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://ip-10-10-17-13.eu-west-1.compute.internal:8020/tmp/89e087b6-0fb9-4236-b40b-284729b1bef1/files
So after 6h25m the job failed; 4h20m of that was in shredding, and it was the shredding step that failed.
2017-03-28 06:15:20,504 INFO [communication thread] org.apache.hadoop.mapred.Task: Communication exception: java.io.EOFException: End of File Exception between local host is: "ip-10-98-33-101.eu-west-1.compute.internal/10.98.33.101"; destination host is: "ip-10-98-33-101.eu-west-1.compute.internal":49432; : java.io.EOFException; For more details see: http://wiki.apache.org/hadoop/EOFException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:792)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:765)
at org.apache.hadoop.ipc.Client.call(Client.java:1479)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:250)
at com.sun.proxy.$Proxy8.statusUpdate(Unknown Source)
at org.apache.hadoop.mapred.Task$TaskReporter.run(Task.java:768)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1084)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)
2017-03-28 06:15:22,224 INFO [EventFetcher for fetching Map Completion Events] org.apache.hadoop.ipc.Client: Retrying connect to server: ip-10-98-33-101.eu-west-1.compute.internal/10.98.33.101:49432. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
This was with 3 x c3.8xlarge, so the enrich step was quite fast, but shredding is still taking a long, long time.
Not at the same time these jobs started having issues; we added those a week or so ago. I think it is just the volume it is struggling with.
I just ran 12 hours' worth (27k shredded files totaling 14GB) and it completed successfully in 2h11m with 6 x c3.8xlarge; the enrich step took 29m and the shred took 1h19m.
I always thought the shred would be faster than enrich, but it seems to be the other way around now.
I will try to run another batch in 12 hours or so and see if it continues to be stable. If we ever have a failed job where data starts to back up, I may set up a second EMR environment so I can run two batches side by side.