RDB loader wrong file path

I am in the process of setting up RDB Loader (post R35) and got stuck on the shredding part.
In my error log I can see that the file location is wrong:

ERROR Client: Application diagnostics message: User class threw exception: java.io.IOException: Not a file: s3://XXX/enriched/archive/run=2021-03-15-18-29-34/2021
as it reads the year as well. However, I am not sure where to change this. So far I have:

  1. Made sure the date format is correct; after an enrich run I got files in the following structure:
    “s3://xxx/enriched/archive/run=2021-03-15-18-29-34/2021/03/15/”
  2. I don’t use any custom dateFormat in the S3 sink

config.hocon looks like

{
  "name": "{{client}}",
  "id": "24cda775-ea2d-4cfd-b4f8-b580670cb465",

  "region": "{{aws_region}}",
  "messageQueue": "{{fifo_que}}",

  "shredder": {
    "input": "s3://{{s3_shredded}}/enriched/archive/",
    "output": "s3://{{s3_shredded}}/good/",
    "outputBad": "s3://{{s3_shredded}}/bad/",
    "compression": "GZIP"
  },

  "formats": {
    "default": "TSV",
    "json": {{shredded_as_jsons}},
    "tsv": {{shredded_as_tsvs}},
    "skip": {{skip_schemas}}
  },

  "storage": {
    "type": "redshift",
    "host": "{{redshift_hostname}}",
    "database": "{{snowplow_database_name}}",
    "port": {{db_port}},
    "roleArn": "{{roleArn}}",
    "schema": "{{schema_name}}",
    "username": "{{username}}",
    "password": "{{password}}",
    "jdbc": {"ssl": true},
    "maxError": 10,
    "compRows": 100000
  },

  "steps": {{steps}},

  "monitoring": {
    "snowplow": {
      "collector": "{{collectorUri}}",
      "appId": "{{appName}}",
      "method": "get"
    },
    "sentry": null
  }
}

and playbook.json is

    {
      "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
      "data": {
        "region": "{{aws_region}}",
        "credentials": {
          "accessKeyId": "default",
          "secretAccessKey": "default"
        },
        "steps": [
          {
            "type": "CUSTOM_JAR",
            "name": "S3DistCp enriched data archiving",
            "actionOnFailure": "CANCEL_AND_WAIT",
            "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
            "arguments": [
                "--src", "s3://{{s3_enriched_bucket}}/enriched/good/",
                "--dest", "s3://{{s3_shredded}}/enriched/archive/run={{nowWithFormat "2006-01-02-15-04-05"}}/",
                "--s3Endpoint", "s3-{{aws_region}}.amazonaws.com",
                "--srcPattern", ".*",
                "--outputCodec", "gz",
                "--deleteOnSuccess"
            ]
          },

          {
            "type": "CUSTOM_JAR",
            "name": "RDB Shredder",
            "actionOnFailure": "CANCEL_AND_WAIT",
            "jar": "command-runner.jar",
            "arguments": [
                "spark-submit",
                "--class", "com.snowplowanalytics.snowplow.shredder.Main",
                "--master", "yarn",
                "--deploy-mode", "cluster",
                "s3://snowplow-hosted-assets-{{aws_region}}/4-storage/rdb-shredder/snowplow-rdb-shredder-0.19.0.jar",
                "--iglu-config", "{{base64File "/config/iglu_resolver.json"}}",
                "--config", "{{base64File "/config/config.hocon"}}"
            ]
          }
        ],
        "tags": [ ]
      }
    }

Input welcome.
Best
f

Hi @fwahlqvist,

It appears the location isn’t correct. I would rather expect .../archive/run=2021-03-15-18-29-34 instead of .../archive/run=2021-03-15-18-29-34/2021/03/15/. Could you try to relocate the files and see if it helps?

Thanks,

Hey @egor, thanks for getting back to me…

This is the current config for the enrich and archive step (according to the docs). Could you kindly point me in the right direction on what I should update?

"arguments": [
    "--src", "s3://{{s3_enriched_bucket}}/enriched/good/",
    "--dest", "s3://{{s3_shredded}}/enriched/archive/run={{nowWithFormat "2006-01-02-15-04-05"}}/",
    "--s3Endpoint", "s3-{{aws_region}}.amazonaws.com",
    "--srcPattern", ".*",
    "--outputCodec", "gz",
    "--deleteOnSuccess"
]

Thanks
F

@fwahlqvist,

These arguments look correct. I think what happens here is that the shredder expects to find files with enriched data directly in the run=YYYY-MM-DD-hh-mm-ss folder, but instead it finds a sub-folder 2021, which in turn contains a few more sub-folders.

It’s a bit unclear where these sub-folders come from. I suspect the structure of the files in the src location is incorrect: it should contain only files and no sub-folders. If you move all files from s3://xxx/enriched/archive/run=2021-03-15-18-29-34/2021/03/15/ to s3://{{s3_enriched_bucket}}/enriched/good/ and ensure that there are no sub-folders, it will likely work as expected.
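
To check for this kind of layout before running the shredder, a small helper along these lines could flag keys that sit below a sub-folder of a run folder. This is only a sketch: the function name `nested_keys` and the sample listing are hypothetical, and in practice the key list would come from an S3 listing of your own bucket.

```python
from typing import Iterable, List


def nested_keys(keys: Iterable[str], run_prefix: str) -> List[str]:
    """Return the keys under run_prefix that are not directly inside it."""
    prefix = run_prefix.rstrip("/") + "/"
    offenders = []
    for key in keys:
        if not key.startswith(prefix):
            continue  # key belongs to a different run folder
        relative = key[len(prefix):]
        if not relative:
            continue  # the run folder marker itself
        if "/" in relative:
            # Anything with a further "/" lives in (or is) a sub-folder.
            offenders.append(key)
    return offenders


# Hypothetical listing that mirrors the layout from the error message.
listing = [
    "enriched/archive/run=2021-03-15-18-29-34/2021/03/15/part-0001.gz",
    "enriched/archive/run=2021-03-15-18-29-34/part-0002.gz",
]
for key in nested_keys(listing, "enriched/archive/run=2021-03-15-18-29-34"):
    print("nested:", key)
```

An empty result would mean the run folder contains only files, which is the layout described above.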

Could you also clarify how you produce the enriched data and upload it on S3?

Hey @egor
thanks for getting back to me

The folder structure of the enriched data comes from the S3 sink and, as you say, it has the structure yyyy/mm/dd/hh as per the documentation.

However, to get the S3 sink to work with EMR I needed a custom format, so I have now changed the setting below:

  # optional date format prefix for directory pattern
  # eg: {YYYY}/{MM}/{dd}/{HH}
  # dateFormat = "{YYYY}/{MM}/{dd}/{HH}"

I have now updated it to use a custom date format:
dateFormat = "{YYYY}-{MM}-{dd}-{HH}"

(Hopefully useful for someone)
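
To illustrate why the two formats behave differently, here is a rough sketch that renders both patterns for the same timestamp and counts the folder levels each one introduces. The token mapping (`{YYYY}`, `{MM}`, `{dd}`, `{HH}` to year, month, day, hour) and the helper name `render_prefix` are assumptions made for illustration; how the loader combines the rendered string with the output directory and file name is not modelled here.

```python
from datetime import datetime

# Assumed meaning of the placeholders used in the dateFormat setting.
TOKENS = {"{YYYY}": "%Y", "{MM}": "%m", "{dd}": "%d", "{HH}": "%H"}


def render_prefix(date_format: str, when: datetime) -> str:
    """Render a dateFormat-style pattern for one timestamp."""
    pattern = date_format
    for token, directive in TOKENS.items():
        pattern = pattern.replace(token, directive)
    return when.strftime(pattern)


when = datetime(2021, 3, 15, 18)
nested = render_prefix("{YYYY}/{MM}/{dd}/{HH}", when)
flat = render_prefix("{YYYY}-{MM}-{dd}-{HH}", when)

# Every "/" in the rendered prefix is one more folder level on S3.
print(nested, "adds", nested.count("/"), "extra levels")
print(flat, "adds", flat.count("/"), "extra levels")
```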

Best
F

Thanks for the additional details and the hint, @fwahlqvist.

As mentioned in the example configuration for the S3 Loader, dateFormat is an optional argument. Based on your goals, you need to make a call whether it should be used or not, and which format to use.

If you are going to consume data directly from outputDirectory (e.g. via Athena), then it’s better to define this argument to take advantage of partitioned data. The same applies if you want a separate folder for every data dump and more control over the size of run folders.

If you are going to load the data into Redshift or Snowflake then this argument isn’t required (at least for the standard playbooks).

We will see if we can make this clearer. I have passed your feedback on to the relevant team members.

Thanks @egor,
To be clear, I had to use a custom dateFormat for EMR, as otherwise the default behaviour was to create a folder structure with yyyy/mm/dd.

I can now see the files being moved to a folder structure of

run=2021-03-17-11-43-42/

but they are not being shredded.
Of the EMR steps, the first one succeeds but the second one seems to get stuck in state running, with the following cluster.json config and log output (PS: the instance type is m4.large):

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
  "data": {
    "name": "dataflow-runner - RDB Shredder",
    "logUri": "s3://{{emr_logs}}",
    "region": "{{aws_region}}",
    "credentials": {
      "accessKeyId": "default",
      "secretAccessKey": "default"
    },
    "roles": {
      "jobflow": "EMR_EC2_DefaultRole",
      "service": "EMR_DefaultRole"
    },
    "ec2": {
      "amiVersion": "6.1.0",
      "keyName": "XXXX",
      "location": {
        "vpc": {
          "subnetId": "{{subnets}}"
        }
      },
      "instances": {
        "master": {
          "type": "{{instance_type_master}}"
        },
        "core": {
          "type": "{{instance_type_core}}",
          "count": 1
        },
        "task": {
          "type": "{{instance_type_task}}",
          "count": 0,
          "bid": "0.0015"
        }
      }
    },
    "tags": [
      {
        "key": "client",
        "value": "com.engineering"
      },
      {
        "key": "job",
        "value": "main"
      }
    ],
    "bootstrapActionConfigs": [
      {
        "name": "Elasticity Bootstrap Action",
        "scriptBootstrapAction": {
          "path": "s3://snowplow-hosted-assets-eu-west-2/common/emr/snowplow-ami4-bootstrap-0.2.0.sh",
          "args": ["1.5"]
        }
      }
    ],
    "configurations": [
      {
        "classification": "core-site",
        "properties": {
          "Io.file.buffer.size": "65536"
        }
      },
      {
        "classification": "mapred-site",
        "properties": {
          "Mapreduce.user.classpath.first": "true"
        }
      }
    ],
    "applications": ["Hadoop", "Spark"]
  }
}



21/03/17 11:51:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/03/17 11:51:58 WARN DependencyUtils: Skip remote jar s3://snowplow-hosted-assets-eu-west-2/4-storage/rdb-shredder/snowplow-rdb-shredder-0.19.0.jar.
21/03/17 11:51:58 INFO RMProxy: Connecting to ResourceManager at ip-xxxx.eu-west-2.compute.internal/xxxx:8032
21/03/17 11:51:59 INFO Client: Requesting a new application from cluster with 1 NodeManagers
21/03/17 11:51:59 INFO Configuration: resource-types.xml not found
21/03/17 11:51:59 INFO ResourceUtils: Unable to find 'resource-types.xml'.
21/03/17 11:51:59 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (6144 MB per container)
21/03/17 11:51:59 INFO Client: Will allocate AM container, with 2432 MB memory including 384 MB overhead
21/03/17 11:51:59 INFO Client: Setting up container launch context for our AM
21/03/17 11:51:59 INFO Client: Setting up the launch environment for our AM container
21/03/17 11:51:59 INFO Client: Preparing resources for our AM container
21/03/17 11:51:59 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
21/03/17 11:52:02 INFO Client: Uploading resource file:/mnt/tmp/spark-6a2731aa-3229-4ad0-bc0d-b6b868197bf0/__spark_libs__513458978147608271.zip -> hdfs://ip-10-0-101-238.eu-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1615981741937_0002/__spark_libs__513458978147608271.zip
21/03/17 11:52:06 INFO ClientConfigurationFactory: Set initial getObject socket timeout to 2000 ms.
21/03/17 11:52:06 INFO Client: Uploading resource s3://snowplow-hosted-assets-eu-west-2/4-storage/rdb-shredder/snowplow-rdb-shredder-0.19.0.jar -> hdfs://ip-10-0-101-238.eu-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1615981741937_0002/snowplow-rdb-shredder-0.19.0.jar
21/03/17 11:52:07 INFO S3NativeFileSystem: Opening 's3://snowplow-hosted-assets-eu-west-2/4-storage/rdb-shredder/snowplow-rdb-shredder-0.19.0.jar' for reading
21/03/17 11:52:10 INFO Client: Uploading resource file:/mnt/tmp/spark-6a2731aa-3229-4ad0-bc0d-b6b868197bf0/__spark_conf__6123137051064315275.zip -> hdfs://ip-10-0-101-238.eu-west-2.compute.internal:8020/user/hadoop/.sparkStaging/application_1615981741937_0002/__spark_conf__.zip
21/03/17 11:52:10 INFO SecurityManager: Changing view acls to: hadoop
21/03/17 11:52:10 INFO SecurityManager: Changing modify acls to: hadoop
21/03/17 11:52:10 INFO SecurityManager: Changing view acls groups to: 
21/03/17 11:52:10 INFO SecurityManager: Changing modify acls groups to: 
21/03/17 11:52:10 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
21/03/17 11:52:10 INFO Client: Submitting application application_1615981741937_0002 to ResourceManager
21/03/17 11:52:10 INFO YarnClientImpl: Submitted application application_1615981741937_0002
21/03/17 11:52:11 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:11 INFO Client: 
	 client token: N/A
	 diagnostics: AM container is launched, waiting for AM container to Register with RM
	 ApplicationMaster host: N/A
	 ApplicationMaster RPC port: -1
	 queue: default
	 start time: 1615981930499
	 final status: UNDEFINED
	 tracking URL: http://ip-xxxx.eu-west-2.compute.internal:20888/proxy/application_1615981741937_0002/
	 user: hadoop
21/03/17 11:52:12 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:13 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:14 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:15 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:16 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:17 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:18 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:19 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:20 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:21 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:22 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:23 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:24 INFO Client: Application report for application_1615981741937_0002 (state: ACCEPTED)
21/03/17 11:52:25 INFO Client: Application report for application_1615981741937_0002 (state: RUNNING)
21/03/17 11:52:25 INFO Client: 
	 client token: N/A
	 diagnostics: N/A
	 ApplicationMaster host: ip-xxxx.eu-west-2.compute.internal
	 ApplicationMaster RPC port: 34039
	 queue: default
	 start time: 1615981930499
	 final status: UNDEFINED
	 tracking URL: http://ip-xxxx.eu-west-2.compute.internal:20888/proxy/application_1615981741937_0002/
	 user: hadoop
21/03/17 11:52:26 INFO Client: Application report for application_1615981741937_0002 (state: RUNNING)
21/03/17 11:52:27 INFO Client: Application report for application_1615981741937_0002 (state: RUNNING)
21/03/17 11:52:28 INFO Client: Application report for application_1615981741937_0002 (state: RUNNING)

Hi @fwahlqvist,

It’s likely that the instance type is too small for the amount of data. If that’s the case, you will need to use a bigger cluster (with more core nodes or with more powerful compute nodes). There are some instructions here which can help you find an optimal cluster size.
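
As a rough sanity check on sizing, the application master figure in the log above (2432 MB including 384 MB overhead) is consistent with Spark's default overhead rule on YARN: the larger of 384 MB and 10% of the requested memory. The sketch below reproduces that arithmetic. The 2048 MB request is inferred from the log (2432 minus 384), not taken from the posted configuration, and the helper name is hypothetical.

```python
def container_memory_mb(requested_mb: int,
                        factor: float = 0.10,
                        minimum_mb: int = 384) -> int:
    """Requested memory plus the default YARN memory overhead, in MB."""
    overhead_mb = max(minimum_mb, int(requested_mb * factor))
    return requested_mb + overhead_mb


am_container_mb = container_memory_mb(2048)  # request inferred from the log
max_container_mb = 6144  # "maximum memory capability" reported in the log

print("AM container:", am_container_mb, "MB")
print("fits in one container:", am_container_mb <= max_container_mb)
```

This only shows that the application master itself fits within the reported container limit; it says nothing about whether enough memory is left for executors on a single core node.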

How many runs do you have in .../enriched/archive/? What is the total size, and the size of each run folder? It might still be useful to see a tree of these (to ensure it’s the expected structure now).

Best,

Hey @egor,
This is a test set up with 9 runs and about 10 rows of data in each run, and it runs on m4.large, so one would hope it’s not that :)

Hi @fwahlqvist,

Sorry for taking some time to get back to you.

We updated our docs website to make it clear that enriched events on S3 should not be partitioned by date.

We also added a diagram that explains the new architecture, as well as details about the algorithm that the shredder uses to infer which folders it needs to shred. Please have a look and let us know if anything is unclear.
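
As a loose illustration of what such folder discovery can look like, the sketch below treats it as a set difference: run folders present in the enriched archive but without a counterpart in the shredded output are the ones left to shred. This is an assumption made for illustration only and not a description of the shredder's actual code; the function name and folder lists are hypothetical, and the docs remain the authority on the real algorithm.

```python
from typing import Iterable, List


def folders_to_shred(enriched: Iterable[str], shredded: Iterable[str]) -> List[str]:
    """Run folders that exist in the enriched archive but not in the output."""
    done = set(shredded)
    return sorted(run for run in enriched if run not in done)


# Hypothetical folder names in the run=YYYY-MM-DD-hh-mm-ss format.
enriched_runs = ["run=2021-03-17-11-43-42", "run=2021-03-15-18-29-34"]
shredded_runs = ["run=2021-03-15-18-29-34"]

for run in folders_to_shred(enriched_runs, shredded_runs):
    print("pending:", run)
```

Under this assumption, a fresh environment with an empty shredded location would have every run folder in the archive pending.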

We have also released 1.0.0, which is the latest production release.

“get stuck in state running”

Your configuration looks correct. What is the Spark UI showing? Is there work being done? Can you show the content of stdout for the driver, please?

“This is a test set up with 9 runs”

You’re testing on a new env with only 10 folders in enriched/archive/ and nothing in shredded/?
