Overview
We’ve had some hiccups and now some success in getting the new Spark-based EMR job running. In the end, we were able to process 160GB of collector logs in 10 hours with a 10-node r4 cluster (though r3 may be better due to having instance stores).
Snowplow thought that our experience could be useful to others on this forum, so here’s what we’ve learned.
With no custom configuration, Spark doesn’t run well on EMR.
Without any configuration, YARN does not allocate the entire cluster’s resources to your Spark job. Even if you spin up 20 nodes, you’ll find only 1 or 2 being utilized. Snowplow provided us with a custom binary, which they’ll likely be releasing soon, that allowed us to pass Spark configuration through to the EMR job.
In doing so, I found this spreadsheet invaluable while tuning the Spark configuration. As an example, for a 6-node r3.4xlarge cluster (5 executors per node):
spark.executor.instances: "30"
spark.yarn.executor.memoryOverhead: "3072"
spark.executor.memory: "21G"
spark.yarn.driver.memoryOverhead: "1034"
spark.driver.memory: "6G"
spark.executor.cores: "3"
spark.driver.cores: "1"
spark.default.parallelism: "180"
spark.dynamicAllocation.enabled: "false"
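For context, here is roughly how those numbers map onto r3.4xlarge hardware (16 vCPUs and 122 GiB of memory per node); the exact headroom you leave for YARN and the OS may vary:
executors: 6 nodes x 5 executors per node = 30 (spark.executor.instances)
cores per node: 5 executors x 3 cores = 15 of the 16 vCPUs, leaving 1 spare
memory per node: 5 executors x (21G heap + ~3G overhead) = roughly 120G of the 122 GiB available
parallelism: 30 executors x 3 cores x 2 tasks per core = 180 (spark.default.parallelism)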
Consider disabling dynamic allocation or enabling maximizeResourceAllocation.
By default, EMR sets spark.dynamicAllocation.enabled to true. The impact of this is that you cannot explicitly configure the number of executors for your job, which can leave many nodes unused.
As a workaround (which I have not tried), you could enable maximizeResourceAllocation, in which case each node is fully utilized by a single executor. The drawback is that an entire node will also be allocated to the driver, so this approach probably only makes sense for clusters of smaller machines (no larger than a large or xlarge instance).
We instead went down the path of explicitly setting the number of executors, and thus we needed to set spark.dynamicAllocation.enabled to false.
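For reference, these Spark settings map onto EMR’s standard configuration classifications. With an EmrEtlRunner release that supports passing them through, the emr section of config.yml should end up looking roughly like this (the exact nesting may differ in the binary you’re given; the classification names themselves are EMR’s):
emr:
  ...
  configuration:
    spark:
      maximizeResourceAllocation: "false"
    spark-defaults:
      spark.dynamicAllocation.enabled: "false"
      spark.executor.instances: "30"
      spark.executor.cores: "3"
      spark.executor.memory: "21G"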
Don’t use task nodes.
Because task nodes do not have local HDFS storage, they are effectively useless. Any gains from leveraging spot pricing are likely wasted by the time lost to poor data locality. Only use core nodes (either instance-store-backed or EBS storage works, but the former may give better performance).
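In EmrEtlRunner terms, that means putting all of your capacity into the core group and zeroing out the task group in config.yml; something like the following (the instance types and counts are just the example cluster from above, with an assumed master instance type):
emr:
  jobflow:
    master_instance_type: m4.large
    core_instance_count: 6
    core_instance_type: r3.4xlarge
    task_instance_count: 0    # no task nodes: no local HDFS, so poor data locality
    task_instance_type: m1.medium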
Parallelism is limited by the number of log files your collector produces.
If the amount of parallelism that your cluster can support (executors x cores per executor x parallelism per core) is greater than the number of collected log files, you will be underutilizing the cluster. We were generating 1GB log files from our collector. We resized the collector so that it instead generates four 250MB files in place of each of the former files, which let us scale up the cluster and bring down the running time of the job. So, favor many small files instead of a few large ones.
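To make that concrete with the 6-node example from above (the file counts assume the 160GB run mentioned at the top, purely for illustration, and that each log file ends up as a single Spark partition because the files are not splittable):
concurrent task slots: 30 executors x 3 cores = 90
target partitions: spark.default.parallelism = 180
~160 files of 1GB each: below the 180 target, so part of the cluster sits idle
~640 files of 250MB each: well above 180, so every task slot stays busy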
Hopefully this was helpful!
Rick
OneSpot