Snowflake loader - cross-batch deduplication configuration issues

Hi again,

I’m using dataflow-runner with v0.6 of the Snowflake transformer/loader.

If I specify the “events-manifest” argument in the playbook.json file, all the events end up in the snowflake-bad bucket due to this error:

failures":[{"deduplicationError":{"message":"One or more parameter values were invalid: Missing the key RunId in the item (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: 8SBUTMLNRGRVQOMSN3118SDFFNVV4KQNSO5AEMVJF66Q9ASUAAJG)"}}],"processor":{"artifact":"snowplow-snowflake-loader","version":"0.6.0"}}}

If I take out the “events-manifest” argument from playbook.json, everything works fine and events end up in the Snowflake atomic events table! It feels so good to finally see some events in Snowflake, but I really want to get this working properly with the deduplication process.

This is what my events_manifest.json looks like (file is actually called dynamodb_config.json):

{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage/amazon_dynamodb_config/jsonschema/2-0-0",
  "data": {
    "name": "eventsManifest",
    "auth": {
      "accessKeyId": "xxxxxxxxxx",
      "secretAccessKey": "xxxxxxxxxx"
    },
    "awsRegion": "ap-southeast-2",
    "dynamodbTable": "cc-snowplow-crossbatch-dedupe",
    "id": "f0e840a9-5f94-4764-a64d-4d34d2361a1f",
    "purpose": "EVENTS_MANIFEST"
  }
}

The DynamoDB table correctly has “RunId” set as the partition key (I created it through the DynamoDB AWS web console)

The only thing I can think of is that I am not setting the “id” value correctly in the events-manifest.json.
I ran aws dynamodb describe-table --table-name cc-snowplow-crossbatch-dedupe and grabbed the “TableId” value and set “id” value as that. Is that correct?

Here is what my playbook.json looks like:

{
   "schema":"iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
   "data":{
      "region":"ap-southeast-2",
      "credentials":{
         "accessKeyId":"xxxxxxxxxx",
         "secretAccessKey":"xxxxxxxxxx"
      },
      "steps":[
         {
            "type":"CUSTOM_JAR",
            "name":"Snowflake Transformer",
            "actionOnFailure":"CANCEL_AND_WAIT",
            "jar":"command-runner.jar",
            "arguments":[
               "spark-submit",
               "--conf",
               "spark.hadoop.mapreduce.job.outputformat.class=com.snowplowanalytics.snowflake.transformer.S3OutputFormat",
               "--deploy-mode",
               "cluster",
               "--class",
               "com.snowplowanalytics.snowflake.transformer.Main",
               "s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-transformer-0.6.0.jar",
               "--config",
               "{{base64File "./config/self-describing-config.json"}}",
               "--resolver",
               "{{base64File "./config/iglu_resolver.json"}}",
               "--events-manifest",
               "{{base64File "./config/dynamodb_config.json"}}"
            ]
         },

         {
            "type":"CUSTOM_JAR",
            "name":"Snowflake Loader",
            "actionOnFailure":"CANCEL_AND_WAIT",
            "jar":"s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-loader-0.6.0.jar",
            "arguments":[
               "load",
               "--base64",
               "--config",
               "{{base64File "./config/self-describing-config.json"}}",
               "--resolver",
               "{{base64File "./config/iglu_resolver.json"}}"
            ]
         }
      ],
      "tags":[ ]
   }
}

I’m testing this with just 1 enriched/archived file (deleting the record from processing manifest each time).

I have not backpopulated the run manifest, which should be fine. My understanding is that backpopulating the run manifest is for when you want to stop files/events from being processed?

Cheers,
Ryan

Managed to figure this out after looking through the Scala code: https://github.com/snowplow-incubator/snowplow-events-manifest/blob/master/src/main/scala/com/snowplowanalytics/snowplow/eventsmanifest/DynamoDbManifest.scala

The partition key for the event manifest DynamoDB table needs to be “eventId” not “RunId”. I was getting my wires crossed with the run manifest DynamoDB table.
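
For anyone hitting the same error, here is a minimal Python sketch of a DynamoDB create-table request with that key layout. Only the “eventId” partition key comes from the fix described above. The string attribute type, the helper names, the optional sort key parameter, and the 20/100 capacity values (taken from the Scala lines quoted further down) are assumptions for illustration, so check DynamoDbManifest.scala for the full key schema before relying on this.

```python
def build_events_manifest_table_request(table_name, read_capacity=20,
                                        write_capacity=100, sort_key=None):
    """Build keyword arguments for boto3's DynamoDB create_table call.

    "eventId" as the partition (HASH) key is taken from this thread.
    Assumptions: the key is a string ("S"); sort_key is a hypothetical
    optional parameter in case the library also defines a RANGE key.
    """
    attributes = [{"AttributeName": "eventId", "AttributeType": "S"}]
    key_schema = [{"AttributeName": "eventId", "KeyType": "HASH"}]
    if sort_key is not None:
        attributes.append({"AttributeName": sort_key, "AttributeType": "S"})
        key_schema.append({"AttributeName": sort_key, "KeyType": "RANGE"})
    return {
        "TableName": table_name,
        "AttributeDefinitions": attributes,
        "KeySchema": key_schema,
        "ProvisionedThroughput": {
            "ReadCapacityUnits": read_capacity,
            "WriteCapacityUnits": write_capacity,
        },
    }


def create_events_manifest_table(table_name, region):
    """Send the request to AWS (needs boto3 and valid credentials)."""
    import boto3  # imported here so the request builder works without it

    client = boto3.client("dynamodb", region_name=region)
    return client.create_table(**build_events_manifest_table_request(table_name))


if __name__ == "__main__":
    # Inspect the request without calling AWS.
    print(build_events_manifest_table_request("cc-snowplow-crossbatch-dedupe"))
```

Calling `create_events_manifest_table("cc-snowplow-crossbatch-dedupe", "ap-southeast-2")` would create the table; the builder on its own is handy for comparing against what `aws dynamodb describe-table` reports for an existing table.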

Reading through the code, it also seems that you don’t actually need to create the event manifest DynamoDB table yourself, and that the script will create it if it doesn’t exist? Maybe I am not understanding the code correctly, though, because if I don’t create the event manifest table, I get the following error:
java.lang.RuntimeException (Cannot initialize duplicate storage
Amazon DynamoDB table for event manifest is unavailable)

Anyway it’s working now so happy days.

I just have 1 question on the DynamoDB table configuration for event manifest. Is it correct that the read capacity should be set to 20 and write capacity set to 100? I am thinking this because of the following lines in the code:
val readCapacityUnits: Long = readCapacity.getOrElse(20L)
val writeCapacityUnits: Long = writeCapacity.getOrElse(100L)
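
In Scala, `Option.getOrElse` returns the wrapped value when one is present and the fallback otherwise, so those two lines read as defaults rather than required settings: 20 and 100 apply only when no capacity is passed in. A rough Python equivalent is sketched below; the function name `resolve_capacity` is made up for illustration and is not part of the library.

```python
def resolve_capacity(read_capacity=None, write_capacity=None):
    """Mimic Option.getOrElse: use the given value, else the fallback."""
    read_units = read_capacity if read_capacity is not None else 20
    write_units = write_capacity if write_capacity is not None else 100
    return read_units, write_units


print(resolve_capacity())         # (20, 100)
print(resolve_capacity(5, None))  # (5, 100)
```

Whether 20/100 is the right provisioning for a given pipeline is a separate question that this sketch does not answer; it only shows where those numbers come from.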

Apologies if these questions seem stupid, I’m not a coder.


Thanks for posting! I had the exact same problem and changing partition key solved it.

Maybe this could be made clearer in the documentation, @anton?