Dataflow Runner RDBLoader step taking long

Referencing this thread: Converting from emrEtlRunner to DataflowRunner example?

We currently have the pipeline set up with an EMR Cluster running a large master node and two xlarge runners. The shredder for avalanche load testing of 200k users runs after a 5 minute blast in 3 minutes, but the loader seems to take quite some time. What are the current best practices for configuring both the spark shredder job and the rdbloader? What are the points of tuning to focus on? Is getting data into RS under this type of load in <5 min latency achievable?

Just for some context here are our configs for the pipeline:

collector.conf

# Copyright (c) 2013-2018 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (application.conf.example) contains a template with
# configuration options for the Scala Stream Collector.
#
# To use, copy this to 'application.conf' and modify the configuration options.

# 'collector' contains configuration options for the main Scala collector.
collector {
  # The collector runs as a web service specified on the following interface and port.
  interface = "0.0.0.0"
  port = "8080"

  # Configure the P3P policy header.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # Cross domain policy configuration.
  # If "enabled" is set to "false", the collector will respond with a 404 to the /crossdomain.xml
  # route.
  crossDomain {
    enabled = false
    # Domain that is granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    domains = [ "*" ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
  }

  # The collector returns a cookie to clients for user identification
  # with the following domain and expiration.
  cookie {
    enabled = true
    expiration = "365 days"
    # Network cookie name
    name = "piv-kenesis-collector"
    # The domain is optional and will make the cookie accessible to other
    # applications on the domain. Comment out this line to tie cookies to
    # the collector's full domain
    # domain = "{{cookieDomain}}"
  }

  doNotTrackCookie {
    enabled = false
    name = "" #{{doNotTrackCookieName}}
    value = "" #{{doNotTrackCookieValue}}
  }

  # When enabled and the cookie specified above is missing, performs a redirect to itself to check
  # if third-party cookies are blocked using the specified name. If they are indeed blocked,
  # fallbackNetworkId is used instead of generating a new random one.
  cookieBounce {
    enabled = false
    # The name of the request parameter which will be used on redirects checking that third-party
    # cookies work.
    name = "n3pc"
    # Network user id to fallback to when third-party cookies are blocked.
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    # Optionally, specify the name of the header containing the originating protocol for use in the
    # bounce redirect location. Use this if behind a load balancer that performs SSL termination.
    # The value of this header must be http or https. Example, if behind an AWS Classic ELB.
    forwardedProtocolHeader = "X-Forwarded-Proto"
  }

  # When enabled, the redirect url passed via the `u` query parameter is scanned for a placeholder
  # token. All instances of that token are replaced withe the network ID. If the placeholder isn't
  # specified, the default value is `${SP_NUID}`.
  redirectMacro {
    enabled = false
    # Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
    placeholder = "[TOKEN]"
  }

  rootResponse {
      enabled = false
      statusCode = 302
      # Optional, defaults to empty map
      headers = {
         Location = "https://127.0.0.1/",
         X-Custom = "something"
      }
      #Optional, defaults to empty string
      body = "302, redirecting"
   }

  streams {
    # Events which have successfully been collected will be stored in the good stream/topic
    good = stream-${AWS_REGION}-${PRODUCTION_ENV}-collector-to-enricher

    # Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
    bad = stream-${AWS_REGION}-${PRODUCTION_ENV}-bad-data-to-s3

    # Whether to use the incoming event's ip as the partition key for the good stream/topic
    # Note: Nsq does not make use of partition key.
    useIpAddressAsPartitionKey = false

    # Enable the chosen sink by uncommenting the appropriate configuration
    sink {
      # Choose between kinesis, googlepubsub, kafka, nsq, or stdout.
      # To use stdout, comment or remove everything in the "collector.streams.sink" section except
      # "enabled" which should be set to "stdout".
      ## enabled = kinesis
      enabled = kinesis

      # Region where the streams are located
      region = ${AWS_REGION}

      # Thread pool size for Kinesis API requests
      threadPoolSize = 10

      # The following are used to authenticate for the Amazon Kinesis sink.
      # If both are set to 'default', the default provider chain is used
      # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
         accessKey = "default"
         secretKey = "default"
      }

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
         minBackoff = 1000  #{{minBackoffMillis}}
         maxBackoff = 3000  #{{maxBackoffMillis}}
      }

      # Or Google Pubsub
      #googleProjectId = ID
      # Minimum, maximum and total backoff periods, in milliseconds
      # and multiplier between two backoff
      #backoffPolicy {
      #  minBackoff = {{minBackoffMillis}}
      #  maxBackoff = {{maxBackoffMillis}}
      #  totalBackoff = {{totalBackoffMillis}} # must be >= 10000
      #  multiplier = {{backoffMultiplier}}
      #}

      # Or Kafka
      #brokers = "{{kafkaBrokers}}"
      ## Number of retries to perform before giving up on sending a record
      #retries = 0

      # Or NSQ
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
    }

    # Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches record-limit or
    # - the combined size of the stored records reaches byte-limit or
    # - the time in milliseconds since the buffer was last emptied reaches time-limit
    buffer {
      byteLimit: 4000000
      recordLimit: 500
      timeLimit: 5000  # "" #{{bufferTimeThreshold}}
    }
  }
}

# Akka has a variety of possible configuration options defined at
# http://doc.akka.io/docs/akka/current/scala/general/configuration.html
akka {
  loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  # akka-http is the server the Stream collector uses and has configurable options defined at
  # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
  http.server {
    # To obtain the hostname in the collector, the 'remote-address' header
    # should be set. By default, this is disabled, and enabling it
    # adds the 'Remote-Address' header to every request automatically.
    remote-address-header = on

    raw-request-uri-header = on

    # Define the maximum request length (the default is 2048)
    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
    }
  }
}

enricher.conf

# Copyright (c) 2013-2018 Snowplow Analytics Ltd. All rights reserved.
#
# This program is licensed to you under the Apache License Version 2.0, and
# you may not use this file except in compliance with the Apache License
# Version 2.0.  You may obtain a copy of the Apache License Version 2.0 at
# http://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the Apache License Version 2.0 is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.  See the Apache License Version 2.0 for the specific language
# governing permissions and limitations there under.

# This file (application.conf.example) contains a template with
# configuration options for Stream Enrich.

enrich {

  streams {

    in {
      # Stream/topic where the raw events to be enriched are located
      raw = stream-${AWS_REGION}-${PRODUCTION_ENV}-collector-to-enricher
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = stream-${AWS_REGION}-${PRODUCTION_ENV}-enricher-to-s3
      # Stream/topic where the event that failed enrichment will be stored
      bad = stream-${AWS_REGION}-${PRODUCTION_ENV}-bad-data-to-s3
      # Stream/topic where the pii tranformation events will end up
      # pii = "" # {{outPii}}

      # How the output stream/topic will be partitioned.
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint.
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible parittion keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      # Note: Nsq does not make use of partition key.
      partitionKey = "user_id"
    }

    # Configuration shown is for Kafka, to use another uncomment the appropriate configuration
    # and comment out the other
    # To use stdin, comment or remove everything in the "enrich.streams.sourceSink" section except
    # "enabled" which should be set to "stdin".
    sourceSink {
      # Sources / sinks currently supported are:
      # 'kinesis' for reading Thrift-serialized records and writing enriched and bad events to a
      # Kinesis stream
      # 'googlepubsub' for reading / writing to a Google PubSub topic
      # 'kafka' for reading / writing to a Kafka topic
      # 'nsq' for reading / writing to a Nsq topic
      # 'stdin' for reading from stdin and writing to stdout and stderr
      enabled = kinesis

      # Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
      region = ${AWS_REGION}

      ## Optional endpoint url configuration to override aws kinesis endpoints,
      ## this can be used to specify local endpoints when using localstack
      # customEndpoint = {{kinesisEndpoint}}

      # AWS credentials
      # If both are set to 'default', use the default AWS credentials provider chain.
      # If both are set to 'iam', use AWS IAM Roles to provision credentials.
      # If both are set to 'env', use env variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
      aws {
        accessKey = default
        secretKey = default
      }

      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 10000

      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
      # Note: This only effects the first run of this application on a stream.
      # (pertinent to kinesis source type)
      initialPosition = TRIM_HORIZON

      # Need to be specified when initial-position is "AT_TIMESTAMP".
      # Timestamp format need to be in "yyyy-MM-ddTHH:mm:ssZ".
      # Ex: "2017-05-17T10:00:00Z"
      # Note: Time need to specified in UTC.
      initialTimestamp = "2018-08-27T10:00:00Z"

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 1000
        maxBackoff = 3000
      }

      # Or Google PubSub
      #googleProjectId = my-project-id
      ## Size of the subscriber thread pool
      #threadPoolSize = 4
      ## Minimum, maximum and total backoff periods, in milliseconds
      ## and multiplier between two backoffs
      #backoffPolicy {
      #  minBackoff = {{enrichStreamsOutMinBackoff}}
      #  maxBackoff = {{enrichStreamsOutMaxBackoff}}
      #  totalBackoff = {{enrichStreamsOutTotalBackoff}} # must be >= 10000
      #  multiplier = {{enrichStreamsOutTotalBackoff}}
      #}

      # Or Kafka (Comment out for other types)
      # brokers = "{{kafkaBrokers}}"
      # Number of retries to perform before giving up on sending a record
      # retries = 0

      # Or NSQ
      ## Channel name for nsq source
      ## If more than one application is reading from the same NSQ topic at the same time,
      ## all of them must have the same channel name
      #rawChannel = "{{nsqSourceChannelName}}"
      ## Host name for nsqd
      #host = "{{nsqHost}}"
      ## TCP port for nsqd, 4150 by default
      #port = {{nsqdPort}}
      ## Host name for lookupd
      #lookupHost = "{{lookupHost}}"
      ## HTTP port for nsqlookupd, 4161 by default
      #lookupPort = {{nsqlookupdPort}}
    }

    # After enrichment, events are accumulated in a buffer before being sent to Kinesis/Kafka.
    # Note: Buffering is not supported by NSQ.
    # The buffer is emptied whenever:
    # - the number of stored records reaches recordLimit or
    # - the combined size of the stored records reaches byteLimit or
    # - the time in milliseconds since it was last emptied exceeds timeLimit when
    #   a new event enters the buffer
    buffer {
      byteLimit: 4000000
      recordLimit: 500 # Not supported by Kafka; will be ignored
      timeLimit: 5000
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # Used as the Google PubSub subscription name.
    appName = kinesis-${AWS_REGION}-${PRODUCTION_ENV}-enricher-db
  }

  # Optional section for tracking endpoints
  #monitoring {
  #  snowplow {
  #    collectorUri = "{{collectorUri}}"
  #    collectorPort = 80
  #    appId = {{enrichAppName}}
  #    method = GET
  #  }
  #}
}

s3loader.conf

# Prod configuration for PIV s3 Loader

# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"

# Sink is used for sending events which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"

# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = "default"
  secretKey = "default"
}

# Config for NSQ
nsq {
  channelName = "{{dummy}}"

  host = "{{nsqHost}}"

  # TCP port for nsqd, 4150 by default
  port = 4150

  # Host name for lookupd
  lookupHost = "{{lookupHost}}"

  # HTTP port for nsqlookupd, 4161 by default
  lookupPort = 4161
}

kinesis {

  initialPosition = LATEST


  initialTimestamp = AT_TIMESTAMP

  # Maximum number of records to read per GetRecords call
  # Chosen per Josh recommendation at:
  # https://discourse.snowplow.io/t/350k-rpm-of-throughput-with-stream-collector-kinesis/103
  maxRecords = 500

  region = ${AWS_REGION}

  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = kinesis-${AWS_REGION}-${PRODUCTION_ENV}-s3loader-db
}

streams {
  # Input stream name
  inStreamName = stream-${AWS_REGION}-${PRODUCTION_ENV}-enricher-to-s3

  # Stream for events for which the storage process fails
  outStreamName = stream-${AWS_REGION}-${PRODUCTION_ENV}-bad-data-to-s3

  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = 45000 # Not supported by NSQ; will be ignored
    recordLimit = 200
    timeLimit = 10000 # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = ${AWS_REGION}
  bucket = piv-data-${AWS_REGION}-${PRODUCTION_ENV}-good/enriched

  # Format is one of lzo or gzip
  # Note, that you can use gzip only for enriched data stream.
  format = "gzip"

  # Maximum Timeout that the application is allowed to fail for
  maxTimeout = 1
}

# Optional section for tracking endpoints
#monitoring {
#  snowplow{
    # collectorUri = "{{collectorUri}}"
    # collectorPort = 80
    # appId = "{{appName}}"
    # method = "{{method}}"
#  }
#}

logging {
  level: "INFO"
}

rs-load

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data": {
    "region": "{{systemEnv "AWS_REGION"}}",
    "credentials": {
      "accessKeyId": "{{systemEnv "AWS_ACCESS_KEY_ID"}}",
      "secretAccessKey": "{{systemEnv "AWS_SECRET_ACCESS_KEY"}}"
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "S3DistCp Step: Enriched events -> staging S3",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src","s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/enriched/",
          "--dest","s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/staging/run={{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}/",
          "--deleteOnSuccess"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "rdb shred step",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "command-runner.jar",
        "arguments": [
          "spark-submit",
          "--class", "com.snowplowanalytics.snowplow.storage.spark.ShredJob",
          "--master", "yarn",
          "--deploy-mode", "cluster",
          "s3://snowplow-hosted-assets/4-storage/rdb-shredder/snowplow-rdb-shredder-0.14.0.jar",
          "--iglu-config",
          "{{base64File "/root/dataflow-runner_dir/configs/resolver.json"}}",
          "--input-folder",
          "s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/staging/run={{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}/",
          "--output-folder",
          "s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/shredded/good/run={{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}/",
          "--bad-folder",
          "s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/shredded/bad/run={{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}/"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "rdb load step",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "s3://snowplow-hosted-assets/4-storage/rdb-loader/snowplow-rdb-loader-0.14.0.jar",
        "arguments": [
          "--config",
          "{{base64File "/root/dataflow-runner_dir/configs/emr.yml"}}",
          "--target",
          "{{base64File "/root/dataflow-runner_dir/configs/targets/redshift.conf"}}",
          "--resolver",
          "{{base64File "/root/dataflow-runner_dir/configs/resolver.json"}}",
          "--folder",
          "s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/shredded/good/run={{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}/",
          "--logkey",
          "s3n://piv-data-{{systemEnv "AWS_REGION"}}-{{systemEnv "PRODUCTION_ENV"}}-good/log/rdb-loader-{{timeWithFormat "1540322909" "2006-01-02-15-04-05"}}",
          "--skip",
          "analyze"
        ]
      }
    ],
    "tags": [
    ]
  }
}

UPDATE: we were able to get this load into redshift in under 5 minutes. We switched to using compute instances for the shred.

1 Like