Hi again,
I’m using dataflow-runner with v0.6.0 of the Snowflake transformer/loader.
If I specify the “events-manifest” argument in the playbook.json file, all the events end up in the snowflake-bad bucket with this error:
failures":[{"deduplicationError":{"message":"One or more parameter values were invalid: Missing the key RunId in the item (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ValidationException; Request ID: 8SBUTMLNRGRVQOMSN3118SDFFNVV4KQNSO5AEMVJF66Q9ASUAAJG)"}}],"processor":{"artifact":"snowplow-snowflake-loader","version":"0.6.0"}}}
If I take out the “events-manifest” argument from playbook.json, everything works fine and the events end up in the Snowflake atomic events table! It feels so good to finally see some events in Snowflake, but I really want to get this working properly with cross-batch deduplication.
This is what my events_manifest.json looks like (file is actually called dynamodb_config.json):
{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage/amazon_dynamodb_config/jsonschema/2-0-0",
  "data": {
    "name": "eventsManifest",
    "auth": {
      "accessKeyId": "xxxxxxxxxx",
      "secretAccessKey": "xxxxxxxxxx"
    },
    "awsRegion": "ap-southeast-2",
    "dynamodbTable": "cc-snowplow-crossbatch-dedupe",
    "id": "f0e840a9-5f94-4764-a64d-4d34d2361a1f",
    "purpose": "EVENTS_MANIFEST"
  }
}
The DynamoDB table correctly has “RunId” set as the partition key (I created it through the DynamoDB AWS web console).
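For reference, the console setup should be roughly equivalent to this CLI call (the throughput values are just placeholders on my side, nothing significant):

# CLI equivalent of what I set up in the console
aws dynamodb create-table \
  --table-name cc-snowplow-crossbatch-dedupe \
  --attribute-definitions AttributeName=RunId,AttributeType=S \
  --key-schema AttributeName=RunId,KeyType=HASH \
  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
  --region ap-southeast-2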
The only thing I can think of is that I’m not setting the “id” value correctly in the events manifest config (dynamodb_config.json).
I ran aws dynamodb describe-table --table-name cc-snowplow-crossbatch-dedupe
and grabbed the “TableId” value and set the “id” value to that. Is that correct?
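In other words, something like this (output trimmed to the bits I used; exact formatting is from memory):

# The table id I copied into "id", plus the key schema showing RunId as the partition key
aws dynamodb describe-table \
  --table-name cc-snowplow-crossbatch-dedupe \
  --query 'Table.[TableId, KeySchema]'
[
    "f0e840a9-5f94-4764-a64d-4d34d2361a1f",
    [
        {
            "AttributeName": "RunId",
            "KeyType": "HASH"
        }
    ]
]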
Here is what my playbook.json looks like:
{
  "schema":"iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data":{
    "region":"ap-southeast-2",
    "credentials":{
      "accessKeyId":"xxxxxxxxxx",
      "secretAccessKey":"xxxxxxxxxx"
    },
    "steps":[
      {
        "type":"CUSTOM_JAR",
        "name":"Snowflake Transformer",
        "actionOnFailure":"CANCEL_AND_WAIT",
        "jar":"command-runner.jar",
        "arguments":[
          "spark-submit",
          "--conf",
          "spark.hadoop.mapreduce.job.outputformat.class=com.snowplowanalytics.snowflake.transformer.S3OutputFormat",
          "--deploy-mode",
          "cluster",
          "--class",
          "com.snowplowanalytics.snowflake.transformer.Main",
          "s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-transformer-0.6.0.jar",
          "--config",
          "{{base64File "./config/self-describing-config.json"}}",
          "--resolver",
          "{{base64File "./config/iglu_resolver.json"}}",
          "--events-manifest",
          "{{base64File "./config/dynamodb_config.json"}}"
        ]
      },
      {
        "type":"CUSTOM_JAR",
        "name":"Snowflake Loader",
        "actionOnFailure":"CANCEL_AND_WAIT",
        "jar":"s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-loader-0.6.0.jar",
        "arguments":[
          "load",
          "--base64",
          "--config",
          "{{base64File "./config/self-describing-config.json"}}",
          "--resolver",
          "{{base64File "./config/iglu_resolver.json"}}"
        ]
      }
    ],
    "tags":[ ]
  }
}
I’m testing this with just one enriched/archived file (deleting the record from the processing manifest each time).
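(For reference, I’m clearing the processing manifest record between runs with something like the below; the table name and run id are placeholders, and I’m assuming the manifest table is keyed by “RunId”:)

# Placeholder table name and run id, just to show what I do between test runs
aws dynamodb delete-item \
  --table-name <my-processing-manifest-table> \
  --key '{"RunId": {"S": "enriched/archive/run=2019-01-01-00-00-00"}}'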
I have not back-populated the run manifest, which should be fine. My understanding is that back-populating the run manifest is only for when you want to stop certain files/events from being processed?
Cheers,
Ryan