Snowflake loader EMR throttling

Our snowflake loader runs on AWS EMR every hour and as we started to receive large amounts of data to be loaded from s3 to snowflake, calls to the EMR service of AWS were being throttled due to exceeding rate limits to the EMR API when status checking the snowflake loader operations and steps.

We got into this situation because of multiple EMRs running simultaneously caused by prolonged loading processes that run for more than an hour, which resulted in hitting the limit of EMR API calls (we got other limitation hits like dynamoDB which we were able to solve and configure correctly from forum questions here).

The exception we got which causes the EMR to abort all the of the loading steps is as follows:
Couldn’t retrieve step state: ThrottlingException: Rate exceeded
status code: 400, request id:

We appreciate all help even in taking a different approach to making the EMR run for a shorter period.

1 Like

Hi @Louai_Ghalia! So you likely want to change a few things and check a couple of things:

  1. You should only have 1 EMR cluster running at a time; if you are kicking off the EMR process via Cron you should wrap that process in a check that asserts that there is no currently running job in action which solves excessive clusters being spawned.

  2. Long load times are normally indicative of your SnowflakeDB warehouse being under-provisioned - its worth checking the usage charts for the warehouse and if it is capped at its maximum for the duration of the load look at increasing it until the load times are within an acceptable time-frame. Generally speaking you want the load to complete as quickly as possible to not waste EMR cycles.

Hope this helps!

Thank you Josh!

Relating to the second note, the step that takes long is the Snowflake transformer, so in our case it’s not related to snowflake loading (the operation aborts before getting to the Snowflake load). Any other ideas on how to make EMR processes load faster?

Hey @Louai_Ghalia - so if the Transformer is slow then the two things to check would be:

  1. EMR cluster spec - is it big enough? Are your spark settings configured appropriately? If you can share some metrics about how much data you are trying to process (number of objects and average object size) we could maybe provide some guidance here.

  2. DynamoDB Read/Write units - if the Manifest table is under-provisioned this can slow down the processing as well as it cannot read the values out of the table fast enough. This should be obvious in the metrics of the table as you will see Read or Write capacity warnings.

Regarding dynamo we did manage to monitor and provide the correct provision. As for the EMR cluster sepec, our dataflowrunner cluser json is as follows:

{
   "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
   "data":{
      "name": "Snowflake Pipeline",
      "logUri": "s3://.../dataflowrunnerlogs",
      "region": "us-east-1",
      "credentials":{
         "accessKeyId": "...",
         "secretAccessKey": "..."
      },
      "roles":{
         "jobflow":"...",
         "service":"..."
      },
      "ec2":{
         "amiVersion": "5.9.0",
         "keyName": "...",
         "location":{
            "vpc":{
               "subnetId": "..."
            }
         },
         "instances":{
            "master":{
               "type":"m4.xlarge"
            },
            "core":{
               "type":"m4.2xlarge",
               "count": 1
            },
            "task":{
               "type":"m1.medium",
               "count": 0,
               "bid":"0.015"
            }
         }
      },
      "tags":[ ],
      "bootstrapActionConfigs":[ ],
      "configurations":[
         {
            "classification":"core-site",
            "properties":{
               "Io.file.buffer.size":"65536"
            }
         },
         {
            "classification":"mapred-site",
            "properties":{
               "Mapreduce.user.classpath.first":"true"
            }
         },
         {
            "classification":"yarn-site",
            "properties":{
               "yarn.resourcemanager.am.max-attempts":"1"
            }
         },
         {
            "classification":"spark",
            "properties":{
               "maximizeResourceAllocation":"true"
            }
         }
      ],
      "applications":[ "Hadoop", "Spark" ]
   }
}

Could you also provide any statistics around the amount of data you are pushing through?

It could get around 10GB, ~75,000 files.

Hi Louai,

for batches of ~10GB we usually run the EMR cluster with the following config (to be specific, we’re using config for batches of ~7–15GB

 jobflow:
      job_name: foo
      master_instance_type: m4.large
      core_instance_count: 1
      core_instance_type: r5.8xlarge
      core_instance_ebs:
        volume_size: 60
        volume_type: gp2
        ebs_optimized: true
      task_instance_count: 0
      task_instance_type: m4.large
      task_instance_bid: 0.25
    configuration:
      yarn-site:
        yarn.nodemanager.vmem-check-enabled: "false"
        yarn.nodemanager.resource.memory-mb: "256000"
        yarn.scheduler.maximum-allocation-mb: "256000"
      spark:
        maximizeResourceAllocation: "false"
      spark-defaults:
        spark.dynamicAllocation.enabled: "false"
        spark.executor.instances: "9"
        spark.yarn.executor.memoryOverhead: "3072"
        spark.executor.memory: 22G
        spark.executor.cores: "3"
        spark.yarn.driver.memoryOverhead: "3072"
        spark.driver.memory: 22G
        spark.driver.cores: "3"
        spark.default.parallelism: "108"

Hope this helps

@Louai_Ghalia, looks like @pkutaj provided you with an example for EmrEtlRunner, not the Snowflake Transformer/Loader. If you are transforming ~10GB of compressed enriched data you need more powerful EMR cluster than what you use. However, taken that with the more powerful cluster you will not accumulate as much data any more I think you should do with the configuration as bellow to start with and then dropping it further down. Note that your Spark is not configured properly either which would underutilize the resources of your EMR cluster.

Here the example utilizing generation 4 EC2 types (that’s what you use) but you could do better with generation 5 and bumped AMI version (depending on the version of your Snowflake Transformer).

{
  "instances": {
    "master": {
      "type": "m4.xlarge"
    },
    "core": {
      "type": "r4.8xlarge",
      "count": 3,
      "ebsConfiguration": {
        "ebsOptimized": true,
        "ebsBlockDeviceConfigs": [
          {
            "volumesPerInstance": 1,
            "volumeSpecification": {
              "iops": 1500,
              "sizeInGB": 320,
              "volumeType": "io1"
            }
          }
        ]
      }
    },
    "task": {
      "type": "m4.large",
      "count": 0,
      "bid": "0.015"
    }
  },
  "configurations": [
    {
      "classification": "yarn-site",
      "properties": {
        "yarn.nodemanager.vmem-check-enabled": "false",
        "yarn.nodemanager.resource.memory-mb": "245760",
        "yarn.scheduler.maximum-allocation-mb": "245760"
      }
    },
    {
      "classification": "spark",
      "properties": {
        "maximizeResourceAllocation": "false"
      }
    },
    {
      "classification": "spark-defaults",
      "properties": {
        "spark.dynamicAllocationEnabled": "false",
        "spark.executor.instances": "44",
        "spark.yarn.executor.memoryOverhead": "3072",
        "spark.executor.memory": "13G",
        "spark.executor.cores": "2",
        "spark.yarn.driver.memoryOverhead": "3072",
        "spark.driver.memory": "13G",
        "spark.driver.cores": "2",
        "spark.default.parallelism": "352"
      }
    }
  ]
}

Notice the Spark configuration that comes along. When spinning EMR cluster you might want to follow this guide to ensure your cluster’s resources are used to its fullest.

As a further example for your current 1x r4.2xlarge cluster the Spark configuration would be

{
  "configurations": [
    {
      "classification": "yarn-site",
      "properties": {
        "yarn.nodemanager.vmem-check-enabled": "false",
        "yarn.nodemanager.resource.memory-mb": "57344",
        "yarn.scheduler.maximum-allocation-mb": "57344"
      }
    },
    {
      "classification": "spark",
      "properties": {
        "maximizeResourceAllocation": "false"
      }
    },
    {
      "classification": "spark-defaults",
      "properties": {
        "spark.dynamicAllocationEnabled": "false",
        "spark.executor.instances": "6",
        "spark.yarn.executor.memoryOverhead": "1024",
        "spark.executor.memory": "7G",
        "spark.executor.cores": "1",
        "spark.yarn.driver.memoryOverhead": "1024",
        "spark.driver.memory": "7G",
        "spark.driver.cores": "1",
        "spark.default.parallelism": "24"
      }
    }
  ]
}
3 Likes