Snowflake Transformer failing due to 'Timeout waiting for connection from pool'

Hello,

I am using dataflow-runner to execute the Snowflake transformer and loader.

The ‘Snowflake Transformer’ EMR step is failing. I looked in the container stderr for the corresponding job and can see it is failing with the following:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 106 in stage 0.0 failed 4 times, most recent failure: Lost task 106.3 in stage 0.0 (TID 124, ip-172-31-38-253.ap-southeast-2.compute.internal, executor 1): com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool

What does this mean?

I checked the DynamoDB table and can see a RunId with the name of the only “run=xxxx” archive folder, so it doesn’t seem like an issue with DynamoDB at least…

Here is my self-describing-config.json:

{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage/snowflake_config/jsonschema/1-0-2",
  "data": {
    "name": "Compareclub Snowflake Storage Target",
    "awsRegion": "ap-southeast-2",
    "auth": {
        "accessKeyId": "xxxxxxxxxx",
        "secretAccessKey": "xxxxxxxxxx",
        "sessionDuration": 900
    },
    "manifest": "cc-snowplow-snowflake-manifest",
    "snowflakeRegion": "us-west-2",
    "database": "snowplow",
    "input": "s3://cc-snowplow-enriched/archive/",
    "stage": "snowplow_stage",
    "badOutputUrl": "s3://cc-snowplow-snowflake/bad/",
    "stageUrl": "s3://cc-snowplow-snowflake/transformed/",
    "warehouse": "snowplow_wh",
    "schema": "atomic",
    "account": "xxxxxxxxx",
    "username": "snowplow_user",
    "password": "xxxxxxxxxx",
    "maxError": 1,
    "jdbcHost": "xxxxxxxx.snowflakecomputing.com",
    "purpose": "ENRICHED_EVENTS"
  }
}

This is my cluster.json file:

{
   "schema":"iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
   "data":{
      "name":"dataflow-runner - snowflake transformer",
      "logUri":"s3://cc-snowplow-snowflake-logs/",
      "region":"ap-southeast-2",
      "credentials":{
         "accessKeyId":"xxxxxxxxxx",
         "secretAccessKey":"xxxxxxxxxx"
      },
      "roles":{
         "jobflow":"EMR_EC2_DefaultRole",
         "service":"EMR_DefaultRole"
      },
      "ec2":{
         "amiVersion":"5.9.0",
         "keyName":"snowplow.etl.runner",
         "location":{
            "vpc":{
               "subnetId":"subnet-51d0ce18"
            }
         },
         "instances":{
            "master":{
               "type":"m2.xlarge"
            },
            "core":{
               "type":"m2.xlarge",
               "count":1
            },
            "task":{
               "type":"m1.medium",
               "count":0,
               "bid":"0.04"
            }
         }
      },
      "tags":[ ],
      "bootstrapActionConfigs":[ ],
      "configurations":[
         {
            "classification":"core-site",
            "properties":{
               "Io.file.buffer.size":"65536"
            }
         },
         {
            "classification":"mapred-site",
            "properties":{
               "Mapreduce.user.classpath.first":"true"
            }
         },
         {
            "classification":"yarn-site",
            "properties":{
               "yarn.resourcemanager.am.max-attempts":"1"
            }
         },
         {
            "classification":"spark",
            "properties":{
               "maximizeResourceAllocation":"true"
            }
         }
      ],
      "applications":[ "Hadoop", "Spark" ]
   }
}

This is my playbook.json:

{
   "schema":"iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
   "data":{
      "region":"ap-southeast-2",
      "credentials":{
         "accessKeyId":"xxxxxxxxxx",
         "secretAccessKey":"xxxxxxxxxx"
      },
      "steps":[
         {
            "type":"CUSTOM_JAR",
            "name":"Snowflake Transformer",
            "actionOnFailure":"CANCEL_AND_WAIT",
            "jar":"command-runner.jar",
            "arguments":[
               "spark-submit",
               "--conf",
               "spark.hadoop.mapreduce.job.outputformat.class=com.snowplowanalytics.snowflake.transformer.S3OutputFormat",
               "--deploy-mode",
               "cluster",
               "--class",
               "com.snowplowanalytics.snowflake.transformer.Main",

               "s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-transformer-0.6.0.jar",

               "--config",
               "{{base64File "./config/self-describing-config.json"}}",
               "--resolver",
               "{{base64File "./config/iglu_resolver.json"}}",
               "--events-manifest",
               "{{base64File "./config/dynamodb_config.json"}}"
            ]
         },

         {
            "type":"CUSTOM_JAR",
            "name":"Snowflake Loader",
            "actionOnFailure":"CANCEL_AND_WAIT",
            "jar":"s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-loader-0.6.0.jar",
            "arguments":[
               "load",
               "--base64",
               "--config",
               "{{base64File "./config/self-describing-config.json"}}",
               "--resolver",
               "{{base64File "./config/iglu_resolver.json"}}"
            ]
         }
      ],
      "tags":[ ]
   }
}

I am executing dataflow-runner using the following command:

./dataflow-runner run-transient --emr-config ./config/cluster.json --emr-playbook ./config/playbook.json
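
As an aside, for debugging it can help to keep the cluster alive between attempts instead of using run-transient each time. This is only a rough sketch, assuming the standard up / run / down subcommands (the exact flags are worth double-checking against ./dataflow-runner --help; j-XXXXXXXXXXXX is a placeholder for the cluster ID that up prints out):

./dataflow-runner up --emr-config ./config/cluster.json
./dataflow-runner run --emr-playbook ./config/playbook.json --emr-cluster j-XXXXXXXXXXXX
./dataflow-runner down --emr-config ./config/cluster.json --emr-cluster j-XXXXXXXXXXXX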

Where am I going wrong? I feel like I am so close to getting the data into Snowflake!

Appreciate your help,
Ryan

Some further info:

I’ve tried running this multiple times, and each time the ‘Snowflake Transformer’ step fails after ~21 minutes.

In the EMR console, if I go to the “Application history” tab for the “Snowflake Transformer” step, the “aggregated metrics by executor” section shows 97 total tasks: 85 succeeded, 12 failed.

Looking at the stderr of the executors listed under “aggregated metrics by executor”, I notice errors such as the following:

20/05/18 08:40:44 INFO HadoopRDD: Input split: s3a://cc-snowplow-enriched/archive/run=2020-05-14-05-42-14/part-786a4e86-7081-4aea-b14f-a32324915ce5-c000.csv113:134217728+4192000
20/05/18 08:40:49 WARN BlockManager: Putting block rdd_2_99 failed due to an exception
20/05/18 08:40:49 WARN BlockManager: Block rdd_2_99 could not be removed as it was not found on disk or in memory
20/05/18 08:40:49 ERROR Executor: Exception in task 99.3 in stage 0.0 (TID 111)

It kind of seems like a communication issue between S3 and Hadoop?

I just ran it with only one enriched file and it worked (files are now making it to the snowflake bad folder… which is a separate issue).

This is the first load, so there is about 30 GB of enriched data to transform.

Could it be a bandwidth issue with the EMR instances or S3 itself?
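
One thing I was planning to try next, since the SdkClientException talks about the HTTP connection pool rather than bandwidth, is raising the S3A connection pool size via the core-site classification in cluster.json. This is just a sketch I haven't tested, and the value of 100 is an arbitrary guess:

{
   "classification":"core-site",
   "properties":{
      "io.file.buffer.size":"65536",
      "fs.s3a.connection.maximum":"100"
   }
}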

Just another update on this… I ran it again last night for the big batch of enriched data and it worked!

No idea why it wasn’t working before. The only difference in my JSON files is that I removed the --events-manifest argument from playbook.json, so perhaps it was something to do with that (I created the events manifest DynamoDB table manually because the automatic creation was failing).

The transform runs much slower with the deduplication process enabled anyway, so I’m going to just dedupe the data in Snowflake with a scheduled task / stored proc.
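
For reference, this is the rough shape of the dedupe I have in mind for the scheduled task. It is just a sketch, assuming event_id plus event_fingerprint identifies duplicates and collector_tstamp decides which copy to keep (the warehouse, schedule and dedupe key would all need adjusting):

-- rewrite atomic.events without duplicates on a daily schedule
create or replace task atomic.dedupe_events
  warehouse = snowplow_wh
  schedule = 'USING CRON 0 2 * * * UTC'
as
  insert overwrite into atomic.events
  select *
  from atomic.events
  qualify row_number() over (
    partition by event_id, event_fingerprint
    order by collector_tstamp
  ) = 1;

-- tasks are created suspended, so it needs resuming
alter task atomic.dedupe_events resume;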