Delay in S3 Loader

I have set up Snowplow in AWS EKS pods, with Kinesis streams in between. The issue I am facing while doing a load test is that the S3 Loader takes a long time to load the data to S3.

Collector Config

collector {
  interface = "0.0.0.0"
  port = 8080
  
  paths {
    "/com.acme/track"    = "/com.snowplowanalytics.snowplow/tp2"
  }
  
  doNotTrackCookie {
    enabled = false
    #enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
    # name = {{doNotTrackCookieName}}
    name = collector-do-not-track-cookie
    # value = {{doNotTrackCookieValue}}
    value = collector-do-not-track-cookie-value
  }
 

  streams {
    good = ${collector_good}
    bad = ${collector_bad}
    sink {
      enabled = "kinesis"
      threadPoolSize = 10
      region = ${envregion}
      aws {
        accessKey = default
        secretKey = default
      }

      backoffPolicy {
        minBackoff = 3000
        maxBackoff = 600000
      }
    }

    buffer {
      byteLimit = 2097152
      recordLimit = 500
      timeLimit = 5000
    }
   }
   
  telemetry {
    disable = true
  }
}


akka {
  loglevel = WARNING
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  http.server {
    remote-address-header = on
    raw-request-uri-header = on

    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
      illegal-header-warnings = off
    }

    max-connections = 2048
  }

  coordinated-shutdown {
    run-by-jvm-shutdown-hook = off
  }
}

Enricher Config

{
  "input": {
    "type": "Kinesis"

    # Optional. Name of the application which the KCL daemon should assume
    "appName": "snowplow-enrich-kinesis"

    # Name of the Kinesis stream to read from
    "streamName": ${collector_good}

    # Optional. Region where the Kinesis stream is located
    # This field is optional if it can be resolved with AWS region provider chain.
    # It checks places like env variables, system properties, AWS profile file.
    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
    "region": ${envregion}

    # Optional, set the initial position to consume the Kinesis stream
    # Must be TRIM_HORIZON, LATEST or AT_TIMESTAMP
    # LATEST: most recent data.
    # TRIM_HORIZON: oldest available data.
    # AT_TIMESTAMP: start from the record at or after the specified timestamp
    "initialPosition": {
      "type": "TRIM_HORIZON"
    }
    # "initialPosition": {
    #   "type": "AT_TIMESTAMP"
    #   "timestamp": "2020-07-17T10:00:00Z" # Required for AT_TIMESTAMP
    # }

    # Optional, set the mode for retrieving records.
    "retrievalMode": {
      "type": "Polling"

      # Maximum size of a batch returned by a call to getRecords.
      # Records are checkpointed after a batch has been fully processed,
      # thus the smaller maxRecords, the more often records can be checkpointed
      # into DynamoDb, but possibly reducing the throughput.
      "maxRecords": 10000
    }
    # "retrievalMode": {
    #   "type": "FanOut"
    # }

    # Optional. Size of the internal buffer used when reading messages from Kinesis,
    # each buffer holding up to maxRecords from above
    "bufferSize": 3

    # Optional. Settings for backoff policy for checkpointing.
    # Records are checkpointed after all the records of the same chunk have been enriched
    "checkpointBackoff": {
      "minBackoff": 100 milliseconds
      "maxBackoff": 10 seconds
      "maxRetries": 10
    }

    # Optional, endpoint url configuration to override aws kinesis endpoints
    # Can be used to specify local endpoint when using localstack
    # "customEndpoint": "http://localhost:4566"

    # Optional, endpoint url configuration to override aws dynamodb endpoint for Kinesis checkpoints lease table
    # Can be used to specify local endpoint when using localstack
    #"dynamodbCustomEndpoint": "http://dynamodb:us-east-1:180648733583:table/snowplow-enrich-kinesis:8000"

    # Optional, endpoint url configuration to override aws cloudwatch endpoint for metrics
    # Can be used to specify local endpoint when using localstack
    # "cloudwatchCustomEndpoint": "http://localhost:4582"
  }

  "output": {
    "good": {
      "streamName": ${enrich_good}
	  
	  # Optional. Limits the number of events in a single PutRecords request.
      # Several requests are made in parallel
      # Maximum allowed: 500
      "recordLimit": 500

      # Optional. Limits the number of bytes in a single PutRecords request,
      # including records and partition keys.
      # Several requests are made in parallel
      # Maximum allowed: 5 MB
      "byteLimit": 5242880
    }

    "bad": {
      "streamName": ${enrich_bad}
	
      "recordLimit": 500
    }
  }
}

S3 Loader Config

{
  # Optional, but recommended
  "region": ${envregion},
  # Options are: RAW, ENRICHED_EVENTS, SELF_DESCRIBING
  # RAW simply sinks data 1:1
  # ENRICHED_EVENTS work with monitoring.statsd to report metrics (identical to RAW otherwise)
  # SELF_DESCRIBING partitions self-describing data (such as JSON) by its schema
  "purpose": "ENRICHED_EVENTS",
  # Input Stream config
  "input": {
    # Kinesis Client Lib app name (corresponds to DynamoDB table name)
    "appName": "snowplow-s3-loader-kinesis",
    # Kinesis stream name
    "streamName": ${enrich_good},
    # Options are: LATEST, TRIM_HORIZON, AT_TIMESTAMP
    "position": "LATEST",
    # Max batch size to pull from Kinesis
    "maxRecords": 10000
  },
  "output": {
    "s3": {
      # Full path to output data
      "path": ${s3path},
      # Partitioning format; Optional
      # Valid substitutions are {vendor}, {schema}, {format}, {model} for self-describing jsons
      # and {yy}, {mm}, {dd}, {hh} for year, month, day, hour
      #partitionFormat: "date={yy}-{mm}-{dd}"
      # Prefix for all file names; Optional
      "filenamePrefix": ${prefix},
      # Maximum Timeout that the application is allowed to fail for, e.g. in case of S3 outage
      "maxTimeout": 2000,
      # Output format; Options: GZIP, LZO
      "compression": "GZIP"
    },
    # Kinesis Stream to output failures
    "bad": {
      "streamName": ${enrich_bad}
    }
  },
  # Flush control. A first limit the KCL worker hits will trigger flushing
  "buffer": {
    # Maximum bytes to read before flushing
    "byteLimit": 4096,
    # Maximum records to read before flushing
    "recordLimit": 500,
    # Maximum time between flushes
    "timeLimit": 5000
  }
}

Hi @Sreenath - could you elaborate a little more on the issue you are seeing? The S3 Loader is designed to batch data together before pushing to S3, as it is generally used as an archive before the data is transformed and loaded into data warehouses like Redshift, Databricks and Snowflake.

We would generally set a time window of 3 minutes or more in the S3 Loader so that you do not end up with a very large number of small files on S3; fewer, larger files make the downstream transformation operations much more cost- and performance-effective.

If you want to use the data in real time, I would recommend instead consuming directly from the Kinesis stream and writing a Lambda or similar consumer process to pull down the information you are interested in.
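As an illustration only, a minimal sketch of such a consumer, assuming a Lambda function wired to the enriched Kinesis stream through an event source mapping and the standard tab-separated enriched event format (the handler name and the filtering step are placeholders):

import base64

def handler(event, context):
    # Kinesis payloads arrive base64-encoded in the Lambda event
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        # Each enriched event is a single tab-separated line
        fields = payload.split("\t")
        # ... filter or forward only the fields you are interested in ...
        print(f"received enriched event with {len(fields)} fields")
    return {"records_processed": len(event["Records"])}

What you pull out of each record depends entirely on your use case; the point is that the stream gives you the events with no batching delay.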

Hi @josh - we are doing a load test of 15 minutes, but even after the load test completes the loader still runs for another 45 minutes.
I have one more question: is the buffer byteLimit evaluated against compressed or uncompressed data?

What volume of data are you processing (number of events, for example) in those 15 minutes, and what instance size/count/scaling have you provisioned for the S3 Loader?

@Colm
Input throughput at the collector is about 720 records/sec (this may vary slightly).
The S3 Loader is deployed in EKS pods; memory and CPU of the pods stay normal during the load test.

@Sreenath my hunch would be that, as your byte limit is so small (only 4096 bytes), you are being throttled on how fast you can write files out to Amazon S3. Again, the S3 Loader is designed around writing fewer, larger files rather than trying to write 4 KB blocks!
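To put a rough number on that (a sketch only; the ~1 KB average enriched event size below is an assumption, not a figure from this thread):

# Back-of-the-envelope estimate of how often a 4096-byte buffer flushes.
records_per_sec = 720        # reported collector throughput
avg_event_bytes = 1024       # assumed average size of one enriched event
byte_limit = 4096            # current S3 Loader buffer byteLimit

events_per_flush = max(1, byte_limit // avg_event_bytes)   # ~4 events per flush
flushes_per_sec = records_per_sec / events_per_flush       # ~180 S3 writes/sec
print(f"~{events_per_flush} events per flush, ~{flushes_per_sec:.0f} S3 writes/sec")

Hundreds of tiny S3 writes per second is exactly the pattern the loader struggles with.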

The configuration we use internally for the S3 Loader buffers:

"byteLimit": 67108864,
"recordLimit": 100000,
"timeLimit": 180000

This will generally rotate one file per shard every 3 minutes, or sooner once a file reaches about 64 MB in size. This means far fewer writes to S3, which should improve the throughput of your loader. You don't need to go quite to this level if you want it to be more responsive, but I would recommend increasing the limits somewhat!
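For a rough sense of which of these limits would fire first at the throughput reported above (again assuming ~1 KB per event; the shard count below is hypothetical):

# Estimate, per shard, which buffer limit the KCL worker hits first.
records_per_sec = 720            # reported collector throughput
avg_event_bytes = 1024           # assumed average enriched event size
shards = 2                       # hypothetical shard count on the enriched stream

byte_limit, record_limit, time_limit_s = 67108864, 100000, 180

per_shard_rate = records_per_sec / shards
secs_to_byte_limit = byte_limit / (per_shard_rate * avg_event_bytes)
secs_to_record_limit = record_limit / per_shard_rate
first_trigger_s = min(secs_to_byte_limit, secs_to_record_limit, time_limit_s)
print(f"first flush trigger after ~{first_trigger_s:.0f}s per shard")

Under these assumptions the 3-minute timeLimit is what triggers the flush, which matches the one-file-per-shard-every-3-minutes behaviour described above.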
