RDB Shredder step fails in Dataflow Runner

Hi everyone! I’m new to Snowplow and am working on the RDB Shredder piece of the AWS real-time pipeline.

When I run the Dataflow Runner job via ./dataflow-runner run-transient --emr-config dataflow-runner-cluster.json --emr-playbook playbook.json, the cluster starts up successfully and the S3DistCp step of the playbook succeeds. However, the second step fails, causing the cluster to terminate.

I’m not really sure what the problem is, and I’m at a bit of a loss because I believe I’ve used the standard template versions of the playbook and cluster files and only customized the fields called out in the documentation. For reference, all of my related config files are below. Please let me know if anything looks off or if there are any known configuration issues that might cause the shredder step to fail.

Playbook.json:

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data": {
    "region": "us-east-1",
    "credentials": {
      "accessKeyId": "{{my access key}}",
      "secretAccessKey": "{{my secret key}}"
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "S3DistCp enriched data archiving",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
            "--src", "s3://snowplow/enriched/",
            "--dest", "s3://snowplow/archive/enriched/run={{nowWithFormat "2006-01-02-15-04-05"}}/",
            "--s3Endpoint", "s3.us-east-1.amazonaws.com",
            "--srcPattern", ".*",
            "--outputCodec", "gz",
            "--deleteOnSuccess"
        ]
      },

      {
        "type": "CUSTOM_JAR",
        "name": "RDB Shredder",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "command-runner.jar",
        "arguments": [
            "spark-submit",
            "--class", "com.snowplowanalytics.snowplow.shredder.batch.Main",
            "--master", "yarn",
            "--deploy-mode", "cluster","s3://snowplow-hosted-assets-us-east-1/4-storage/rdb-shredder/snowplow-rdb-shredder-1.0.0.jar", 
			"--iglu-config", "{{base64File "./iglu_resolver.json"}}",
            "--config", "{{base64File "./rdb-loader-config.hocon"}}"
        ]
      }
    ],
    "tags": [ ]
  }
}

Cluster.json:

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
  "data": {
    "name": "RDB Shredder",
    "logUri": "s3://snowplow/logs/",
    "region":"us-east-1",
    "credentials": {
      "accessKeyId": "{{my access key}}",
      "secretAccessKey": "{{my secret key}}"
    },
    "roles": {
      "jobflow": "EMR_EC2_DefaultRole",
      "service": "EMR_DefaultRole"
    },
    "ec2": {
      "amiVersion": "6.2.0",
      "keyName": "{{my key name}}",
      "location": {
        "vpc": {
          "subnetId": "{{my subnet ID}}"
        }
      },
      "instances": {
        "master": {
          "type": "m4.large",
          "ebsConfiguration": {
            "ebsOptimized": true,
            "ebsBlockDeviceConfigs": [

            ]
          }
        },
        "core": {
          "type": "r4.xlarge",
          "count": 1
        },
        "task": {
          "type": "m4.large",
          "count": 0,
          "bid": "0.015"
        }
      }
    },
    "tags": [ ],
    "bootstrapActionConfigs": [ ],
    "configurations": [
      {
         "classification":"core-site",
         "properties":{
            "Io.file.buffer.size":"65536"
         },
         "configurations":[
   
         ]
      },
      {
         "classification":"yarn-site",
         "properties":{
            "yarn.nodemanager.resource.memory-mb":"57344",
            "yarn.scheduler.maximum-allocation-mb":"57344",
            "yarn.nodemanager.vmem-check-enabled":"false"
         },
         "configurations":[
   
         ]
      },
      {
         "classification":"spark",
         "properties":{
            "maximizeResourceAllocation":"false"
         },
         "configurations":[
   
         ]
      },
      {
         "classification":"spark-defaults",
         "properties":{
            "spark.executor.memory":"7G",
            "spark.driver.memory":"7G",
            "spark.driver.cores":"3",
            "spark.yarn.driver.memoryOverhead":"1024",
            "spark.default.parallelism":"24",
            "spark.executor.cores":"1",
            "spark.executor.instances":"6",
            "spark.yarn.executor.memoryOverhead":"1024",
            "spark.dynamicAllocation.enabled":"false"
         },
         "configurations":[
   
         ]
      }
   ],
    "applications": [ "Hadoop", "Spark" ]
  }
}

Standard iglu_resolver.json file:

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "Iglu Central",
        "priority": 0,
        "vendorPrefixes": [ "com.snowplowanalytics" ],
        "connection": {
          "http": {
            "uri": "http://iglucentral.com"
          }
        }
      }
    ]
  }
}

rdb-loader-config.hocon:

{
  # Human-readable identifier, can be random
  "name": "SP-Redshift",
  # Machine-readable unique identifier, must be UUID
  "id": "{{my uuid}}",

  # Data Lake (S3) region
  "region": "us-east-1",
  # SQS topic name used by Shredder and Loader to communicate
  "messageQueue": "{{my queue name}}",

   # Shredder-specific configs
  "shredder": {
    "type": "batch",
    # Path to enriched archive (must be populated separately with run=YYYY-MM-DD-hh-mm-ss directories)
    "input": "s3://snowplow/archive/enriched/",
    # Path to shredded output
    "output": {
      "path": "s3://snowplow/archive/shredded/",
      # Shredder output compression, GZIP or NONE
      "compression": "GZIP"
    }
  },

  # Schema-specific format settings (recommended to leave all three groups empty and use TSV as default)
  "formats": {
    # Format used by default (TSV or JSON)
    "default": "TSV",
    # Schemas to be shredded as JSONs, corresponding JSONPath files must be present. Automigrations will be disabled
    "json": [ ],
    # Schemas to be shredded as TSVs, presence of the schema on Iglu Server is necessary. Automigrations enabled
    "tsv": [ ],
    # Schemas that won't be loaded
    "skip": [ ]
  },

  # Warehouse connection details
  "storage" = {
    # Database, redshift is the only acceptable option
    "type": "redshift",
    # Redshift hostname
    "host": "{{my redshift endpoint}}",
    # Database name
    "database": "{{database name}}",
    # Database port
    "port": 5439,
    # AWS Role ARN allowing Redshift to load data from S3
    "roleArn": "arn:aws:iam::490975147635:role/aws-service-role/redshift.amazonaws.com/AWSServiceRoleForRedshift",
    # DB schema name
    "schema": "atomic",
    # DB user with permissions to load data
    "username": "{{storage loader username}}",
    # DB password
    "password": "{{storage loader password}}",
    # Custom JDBC configuration
    "jdbc": {"ssl": true},
    # MAXERROR, amount of acceptable loading errors
    "maxError": 10
  },

  # Additional steps. analyze, vacuum and transit_load are valid values
  "steps": ["analyze"],

  # Observability and logging options
  "monitoring": {
    # Snowplow tracking (optional)
    "snowplow": null,
    # Sentry (optional)
    "sentry": null
  }
}

Hi @samurijv2,

You can usually see the EMR logs in the Steps tab of your EMR cluster.

Sometimes the error is not there; to find it you need to go to the Spark UI and look at the logs of the driver/workers there.
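
If you prefer the command line, you can also pull a step's logs straight from the log bucket. A minimal sketch, assuming the logUri from your Cluster.json and placeholder cluster/step IDs (EMR writes each step's logs under <logUri>/<cluster-id>/steps/<step-id>/):

# List the cluster's steps to find the ID of the failed step
aws emr list-steps --cluster-id j-XXXXXXXXXXXXX --region us-east-1

# Stream the step's stderr from the log bucket (logs can take a few minutes to appear)
aws s3 cp s3://snowplow/logs/j-XXXXXXXXXXXXX/steps/s-XXXXXXXXXXXXX/stderr.gz - | gunzip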

In your case I think the error is this line in your playbook:

"--class", "com.snowplowanalytics.snowplow.shredder.batch.Main",

It needs to be :

"--class", "com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main",

There was an error on our docs website; we've fixed it. Sorry about that.

Thanks for the help!

The correction to that line did resolve the initial error, but unfortunately now I’m seeing a different one. Upon review of the stderr log file for the RDB Shredder step, this line jumps out at me:

21/05/19 13:24:17 ERROR Client: Application diagnostics message: User class threw exception: java.io.IOException: Not a file: s3://{{my-bucket}}/archive/enriched/run=2021-05-18-17-26-10/2021

I’m not really sure how to interpret this. When I navigate to that bucket, I do see the run=2021-05-18-17-26-10 folder. However, the “2021” object is not a file; it is itself a folder, with its own subfolders for month, day, and hour. Is that the expected structure?

Thanks again for the assistance!!

Not a file: s3://{{my-bucket}}/archive/enriched/run=2021-05-18-17-26-10/2021

Indeed, the issue is that only files are expected inside the run=2021-05-18-17-26-10/ folder.

As you can see at the end of this section, partitioning must not be activated for the S3 Loader in order for the shredder to work.
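
In other words, the shredder expects the enriched files to sit directly under each run folder rather than under nested year/month/day/hour partitions. Something like this (the file names below are only illustrative):

s3://{{my-bucket}}/archive/enriched/run=2021-05-18-17-26-10/part-00000.gz
s3://{{my-bucket}}/archive/enriched/run=2021-05-18-17-26-10/part-00001.gz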


@BenB thank you! That resolved the problem and allowed the shredder step to succeed. I appreciate the help.
