How does the Snowplow batch pipeline scale?

We’ve just been asked an excellent question by one of our users; we’re sharing it here so everyone has access to the answer:

We know that the Snowplow pipeline has some sophisticated scaling-up routines in place (that also scale down, as we discussed before :slight_smile: ), but could you help us out by describing them in a bit more detail? I.e. what are the scaling thresholds, what is the reaction time, what happens automatically and what needs manual input, etc.

  1. for the collector systems
  2. for the EMR jobs, and how they are linked to sudden spikes in the collector system.

On the batch pipeline, the only component that autoscales is the collector.

Scaling the collector

Originally we set the collector to scale based on CPU utilization, e.g. add an instance when CPU utilization hits 60%. However, experience with a viral video publisher, and load testing with our Avalanche framework, suggest that this does not scale the collector cluster fast enough in all cases. We need to:

Scale on load balancer response latency (in seconds), e.g.:

  • Add 1 instance when 0.1 < load balancer latency < 0.15
  • Add 2 instances when 0.15 < load balancer latency < 0.6
  • Add 3 instances when 0.6 < load balancer latency

We measure average load balancer latency over a 5-minute period.
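For illustration, here is a minimal sketch of how latency-based step scaling could be wired up with boto3. The autoscaling group, policy and load balancer names are placeholders, the metric assumes a Classic Load Balancer (AWS/ELB Latency, reported in seconds), and the step bounds are offsets from the 0.1 s alarm threshold, so they reproduce the bands above:

import boto3

autoscaling = boto3.client("autoscaling")
cloudwatch = boto3.client("cloudwatch")

# Step-scaling policy: the step bounds are offsets from the alarm threshold (0.1 s),
# so they map onto the latency bands described above.
policy = autoscaling.put_scaling_policy(
    AutoScalingGroupName="snowplow-collector-asg",   # placeholder name
    PolicyName="scale-on-elb-latency",
    PolicyType="StepScaling",
    AdjustmentType="ChangeInCapacity",
    MetricAggregationType="Average",
    StepAdjustments=[
        # 0.1 s <= latency < 0.15 s  -> add 1 instance
        {"MetricIntervalLowerBound": 0.0,  "MetricIntervalUpperBound": 0.05, "ScalingAdjustment": 1},
        # 0.15 s <= latency < 0.6 s  -> add 2 instances
        {"MetricIntervalLowerBound": 0.05, "MetricIntervalUpperBound": 0.5,  "ScalingAdjustment": 2},
        # latency >= 0.6 s           -> add 3 instances
        {"MetricIntervalLowerBound": 0.5,  "ScalingAdjustment": 3},
    ],
)

# Alarm on average ELB latency over a 5-minute period; it triggers the policy above.
cloudwatch.put_metric_alarm(
    AlarmName="collector-elb-latency-high",
    Namespace="AWS/ELB",
    MetricName="Latency",
    Dimensions=[{"Name": "LoadBalancerName", "Value": "snowplow-collector-elb"}],  # placeholder
    Statistic="Average",
    Period=300,
    EvaluationPeriods=1,
    Threshold=0.1,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    AlarmActions=[policy["PolicyARN"]],
)

The CPU utilization thresholds below can be expressed with the same alarm-plus-step-policy pattern.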

We still scale based on CPU utilization as follows:

  • Add 1 instance when 40% < CPU utilization < 65%
  • Add 2 instances when 65% < CPU utilization < 85%
  • Add 3 instances when 85% < CPU utilization

We scale the collector cluster down if CPU utilization drops below 20%. We use lifecycle hooks to ensure that when an instance is removed from the autoscaling group during a scale-down, it stays alive for another 2 hours, during which it can flush any remaining logs before it is terminated. This prevents data loss.
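The scale-down side and the log-flushing window can be sketched the same way. Again the names are placeholders, and the 2-hour drain is expressed as the lifecycle hook's 7200-second heartbeat timeout:

import boto3

autoscaling = boto3.client("autoscaling")
cloudwatch = boto3.client("cloudwatch")

# Simple scale-down policy: remove one instance at a time.
scale_down = autoscaling.put_scaling_policy(
    AutoScalingGroupName="snowplow-collector-asg",   # placeholder name
    PolicyName="scale-down-on-low-cpu",
    PolicyType="SimpleScaling",
    AdjustmentType="ChangeInCapacity",
    ScalingAdjustment=-1,
    Cooldown=300,
)

# Trigger the scale-down when average CPU drops below 20%.
cloudwatch.put_metric_alarm(
    AlarmName="collector-cpu-low",
    Namespace="AWS/EC2",
    MetricName="CPUUtilization",
    Dimensions=[{"Name": "AutoScalingGroupName", "Value": "snowplow-collector-asg"}],
    Statistic="Average",
    Period=300,
    EvaluationPeriods=1,
    Threshold=20.0,
    ComparisonOperator="LessThanThreshold",
    AlarmActions=[scale_down["PolicyARN"]],
)

# Lifecycle hook: keep a terminating instance alive for up to 2 hours (7200 s)
# so the collector can flush its remaining logs before termination.
autoscaling.put_lifecycle_hook(
    AutoScalingGroupName="snowplow-collector-asg",
    LifecycleHookName="drain-collector-logs",
    LifecycleTransition="autoscaling:EC2_INSTANCE_TERMINATING",
    HeartbeatTimeout=7200,
    DefaultResult="CONTINUE",
)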

Scaling EMR

EMR does not scale automatically. Currently we’ll get an alarm if an EMR job takes longer than usual because there’s been a traffic spike. At that stage we can manually bump up the cluster size.
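For a long-running cluster, manually bumping the size amounts to resizing an instance group; here is a rough boto3 sketch (the cluster ID and target count are placeholders):

import boto3

emr = boto3.client("emr")

# Look up the core instance group of the running cluster (IDs are placeholders).
cluster_id = "j-XXXXXXXXXXXXX"
groups = emr.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"]
core_group = next(g for g in groups if g["InstanceGroupType"] == "CORE")

# Manually bump the core instance count, e.g. to 4, to absorb a traffic spike.
emr.modify_instance_groups(
    ClusterId=cluster_id,
    InstanceGroups=[{"InstanceGroupId": core_group["Id"], "InstanceCount": 4}],
)

For transient clusters spun up per run (e.g. via Dataflow Runner, as in the config further down this thread), the equivalent is raising the core/task instance counts in the cluster config before the next run.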

A significant advantage of the Real Time pipeline over the Batch pipeline is that the full pipeline (including enrichment) autoscales.

Scaling Redshift

Again - this is not automatic. We recommend adding additional Redshift nodes when your disk utilization hits 75%.
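A minimal sketch of checking disk utilization and adding nodes with boto3 (the cluster identifier, target node count and the choice of an elastic resize are all placeholders/assumptions):

import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client("cloudwatch")
redshift = boto3.client("redshift")

cluster_id = "snowplow-redshift"   # placeholder identifier

# Average disk utilization across the cluster over the last hour.
stats = cloudwatch.get_metric_statistics(
    Namespace="AWS/Redshift",
    MetricName="PercentageDiskSpaceUsed",
    Dimensions=[{"Name": "ClusterIdentifier", "Value": cluster_id}],
    StartTime=datetime.utcnow() - timedelta(hours=1),
    EndTime=datetime.utcnow(),
    Period=3600,
    Statistics=["Average"],
)
disk_used = stats["Datapoints"][0]["Average"] if stats["Datapoints"] else 0.0

# Past the 75% threshold: add nodes via an elastic resize.
if disk_used > 75.0:
    redshift.resize_cluster(
        ClusterIdentifier=cluster_id,
        NumberOfNodes=4,      # placeholder target size
        Classic=False,        # elastic rather than classic resize
    )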


Is it still the case that EMR autoscaling does not work?

We have been experiencing very slow shredding processing times since last week with the following configuration:


{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
  "data": {
    "name": "com.oneapp",
    "logUri": "LOGURI",
    "region": "eu-west-1",
    "credentials": {
      "accessKeyId": "AWS_ACCESS_KEY_ID",
      "secretAccessKey": "AWS_SECRET_ACCESS_KEY"
    },
    "roles": {
      "jobflow": "EMR_EC2_DefaultRole",
      "service": "EMR_DefaultRole"
    },
    "ec2": {
      "amiVersion": "6.5.0",
      "instances": {
          "core": {
              "count": 1,
              "type": "r5.xlarge"
          },
          "master": {
              "ebsConfiguration": {
                  "ebsBlockDeviceConfigs": [],
                  "ebsOptimized": true
              },
              "type": "m5.xlarge"
          },
          "task": {
              "bid": "0.015",
              "count": 0,
              "type": "m5.xlarge"
          }
      },
      "keyName": "EMR_ECS_KEY_PAIR",
      "location": {
          "vpc": {
              "subnetId": "AWS_SUBNET_PUBLIC_ID"
          }
      }
    },
    "tags": [
      {
        "key": "client",
        "value": "com.oneapp"
      },
      {
        "key": "job",
        "value": "main"
      },
      {
        "key": "GITC-VulnScanTool",
        "value": "tenable_io"
      }
    ],
    "bootstrapActionConfigs": [],
    "configurations": [
      {
        "classification": "spark",
        "configurations": [],
        "properties": {
            "maximizeResourceAllocation": "false"
        }
      },
      {
        "classification": "spark-defaults",
        "configurations": [],
        "properties": {
            "spark.default.parallelism": "8",
            "spark.driver.maxResultSize": "0",
            "spark.driver.cores": "1",
            "spark.driver.memory": "9G",
            "spark.dynamicAllocation.enabled": "false",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.executor.memory": "9G",
            "spark.yarn.driver.memoryOverhead": "1024",
            "spark.yarn.executor.memoryOverhead": "1024"
        }
      },
      {
        "classification": "yarn-site",
        "configurations": [],
        "properties": {
            "yarn.nodemanager.resource.memory-mb": "24576",
            "yarn.nodemanager.vmem-check-enabled": "false",
            "yarn.scheduler.maximum-allocation-mb": "24576"
        }
      }
    ],
    "applications": [ "Hadoop", "Spark" ]
  }
}

We are running shredder and rdbloader 2.1.0 and dealing with less than 1 GB per day.
We updated from AMI 6.4.0 to 6.5.0 last week for security reasons.