RDB shredder failed?

I’m running RDB shredder and it’s failing at step 2.

I followed this doc to bring up the service. My config looks like the following.

cluster.json

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
  "data": {
    "name": "RDB Shredder",
    "logUri": "s3://snowplow-s3-emr-logs/",
    "region": "ap-southeast-1",
    "credentials": {
      "accessKeyId": "123",
      "secretAccessKey": "123"
    },
    "roles": {
      "jobflow": "EMR_EC2_DefaultRole",
      "service": "EMR_DefaultRole"
    },
    "ec2": {
      "amiVersion": "6.2.0",
      "keyName": "xyz-ap",
      "location": {
        "vpc": {
          "subnetId": "subnet-123"
        }
      },
      "instances": {
        "master": {
          "type": "m4.large",
          "ebsConfiguration": {
            "ebsOptimized": true,
            "ebsBlockDeviceConfigs": [

            ]
          }
        },
        "core": {
          "type": "r4.xlarge",
          "count": 1
        },
        "task": {
          "type": "m4.large",
          "count": 1,
          "bid": "0.015"
        }
      }
    },
    "tags": [],
    "bootstrapActionConfigs": [],
    "configurations": [{
        "classification": "core-site",
        "properties": {
          "Io.file.buffer.size": "65536"
        },
        "configurations": [

        ]
      },
      {
        "classification": "yarn-site",
        "properties": {
          "yarn.nodemanager.resource.memory-mb": "57344",
          "yarn.scheduler.maximum-allocation-mb": "57344",
          "yarn.nodemanager.vmem-check-enabled": "false"
        },
        "configurations": [

        ]
      },
      {
        "classification": "spark",
        "properties": {
          "maximizeResourceAllocation": "false"
        },
        "configurations": [

        ]
      },
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.executor.memory": "7G",
          "spark.driver.memory": "7G",
          "spark.driver.cores": "3",
          "spark.default.parallelism": "24",
          "spark.executor.cores": "1",
          "spark.executor.instances": "6",
          "spark.dynamicAllocation.enabled": "false"
        },
        "configurations": [

        ]
      }
    ],
    "applications": ["Hadoop", "Spark"]
  }
}

playbook.json

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data": {
    "region": "ap-southeast-1",
    "credentials": {
      "accessKeyId": "123",
      "secretAccessKey": "123"
    },
    "steps": [{
        "type": "CUSTOM_JAR",
        "name": "S3DistCp enriched data archiving",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src", "s3://snowplow-s3-sink/enriched/",
          "--dest", "s3://snowplow-s3-shred-test/shredded/archive/run=2006-01-02-15-04-05",
          "--s3Endpoint", "s3-ap-southeast-1.amazonaws.com",
          "--srcPattern", ".*",
          "--outputCodec", "gz",
          "--deleteOnSuccess"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "RDB Shredder",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "command-runner.jar",
        "arguments": [
          "spark-submit",
          "--class",
          "com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main",
          "--master", "yarn",
          "--deploy-mode", "cluster",
          "s3://snowplow-hosted-assets-eu-central-1/4-storage/rdb-shredder/snowplow-rdb-shredder-1.1.0.jar",
          "--iglu-config", "Z2x1OmNvbS5zbm93cGxvd2FuYWVyLWNvbmZpZewogICAgInNjaGVtYSI6ICJpy9qc29uc2NoZW1hLzEtMC0xIiwKICAgICJkYXRhIjogewogICAgICAiY2FjaGVTaXplIjogNTAwLAogICAgICAicmVwb3NpdG9yaWVzIjogWwogICAgICAgIHsKICAgICAgICAgICJuYW1lIjogIklnbHUgQ2VudHJhbCIsCiAgICAgICAgICAicHJpb3JpdHkiOiAwLAogICAgICAgICAgInZlbmRvclByZWZpeGVzIjogWyAiY29tLnNub3dwbG93YW5hbHl0aWNzIiBdLAogICAgICAgICAgImNvbm5lY3Rpb24iOiB7CiAgICAgICAgICAgICJodHRwIjogewogICAgICAgICAgICAgICJ1cmkiOiAiaHR0cDovL2lnbHVjZW50cmFsLmNvbSIKICAgICAgICAgICAgfQogICAgICAgICAgfQogICAgICAgIH0KICAgICAgXQogICAgfQp9",
          "--config", "ewogICMgUGF0aCB0byBlGlyZWN0b3JpZXMpIGZvciBTMyBpbnB1dAogICJpbnB1dCI6ICJzMzoewogICAgInNjaGVtYSI6ICJpvL251Y2xldewogICAgInNjaGVtYSI6ICJpXMtZGF0YS10ZWFtLXNub3dwbG93LWthZmthLewogICAgInNjaGVtYSI6ICJpXMzLXNpbmstdGVzdC9lbnJpY2hlZC8iLAoKICAjIFBhdGggdG8gc2hyZWRkZWQgYXJjaGl2ZQogICJvdXRwdXQiOiB7CiAgICAjIFBhdGggdG8gc2hyZWRkZWQgb3V0cHV0CiAgICAicGF0aCI6ICJzMzovL251Y2xldXMtZGF0YS10ZWFtLXNub3dwbG93LWthZmthLXMzLXNocmVkLXRlc3Qvc2hyZWRkZWQvIiwKICAgICMgU2hyZWRkZXIgb3V0cHV0IGNvbXByZXNzaW9uLCBHWklQIG9yIE5PTkUKICAgICMgT3B0aW9uYWwsIGRlZmF1bHQgdmFsdWUgR1pJUAogICAgImNvbXByZXNzaW9uIjogIkdaSVAiLAogICAgIyBUaGlzIGZpZWxkIGlzIG9wdGlvbmFsIGlmIGl0IGNhbiBiZSByZXNvbHZlZCB3aXRoIEFXUyByZWdpb24gcHJvdmlkZXIgY2hhaW4uCiAgICAjIEl0IGNoZWNrcyBwbGFjZXMgbGlrZSBlbnYgdmFyaWFibGVzLCBzeXN0ZW0gcHJvcGVydGllcywgQVdTIHByb2ZpbGUgZmlsZS4KICAgICMgaHR0cHM6Ly9zZGsuYW1hem9uYXdzLmNvbS9qYXZhL2FwaS9sYXRlc3Qvc29mdHdhcmUvYW1hem9uL2F3c3Nkay9yZWdpb25zL3Byb3ZpZGVycy9EZWZhdWx0QXdzUmVnaW9uUHJvdmlkZXJDaGFpbi5odG1sCiAgICAicmVnaW9uIjogImFwLXNvdXRoZWFzdC0xIgogIH0KCiAgIyBRdWV1ZSB1c2VkIHRvIGNvbW11bmljYXRlIHdpdGggTG9hZGVyCiAgInF1ZXVlIjogewogICAgIyBUeXBlIG9mIHRoZSBxdWV1ZS4gSXQgY2FuIGJlIGVpdGhlciBzcXMgb3Igc25zCiAgICAidHlwZSI6ICJzcXMiLAogICAgIyBOYW1lIG9mIHRoZSBzcXMgcXVldWUKICAgICJxdWV1ZU5hbWUiOiAic25vd3Bsb3ctYmlrcm95LXNocmVkZGVyLWxvYWRlciIsCiAgICAjIFJlZ2lvbiBvZiB0aGUgU1FTIHF1ZXVlLgogICAgIyBPcHRpb25hbCBpZiBpdCBjYW4gYmUgcmVzb2x2ZWQgd2l0aCBBV1MgcmVnaW9uIHByb3ZpZGVyIGNoYWluLgogICAgInJlZ2lvbiI6ICJhcC1zb3V0aGVhc3QtMSIKICB9CiAgIyBTTlMgZXhhbXBsZToKICAjInF1ZXVlIjogewogICMgICMgVHlwZSBvZiB0aGUgcXVldWUuIEl0IGNhbiBiZSBlaXRoZXIgc3FzIG9yIHNucwogICMgICJ0eXBlIjogInNucyIsCiAgIyAgIyBBUk4gb2YgU05TIHRvcGljCiAgIyAgInRvcGljQXJuIjogImFybjphd3M6c25zOmV1LWNlbnRyYWwtMToxMjM0NTY3ODk6dGVzdC1zbnMtdG9waWMiLAogICMgICMgUmVnaW9uIG9mIHRoZSBTTlMgdG9waWMKICAjICAicmVnaW9uIjogImV1LWNlbnRyYWwtMSIKICAjfQoKICAjIFNjaGVtYS1zcGVjaWZpYyBmb3JtYXQgc2V0dGluZ3MgKHJlY29tbWVuZGVkIHRvIGxlYXZlIGFsbCB0aHJlZSBncm91cHMgZW1wdHkgYW5kIHVzZSBUU1YgYXMgZGVmYXVsdCkKICAiZm9ybWF0cyI6IHsKICAgICMgRm9ybWF0IHVzZWQgYnkgZGVmYXVsdCAoVFNWIG9yIEpTT04pCiAgICAjIE9wdGlvbmFsLCBkZWZhdWx0IHZhbHVlIFRTVgogICAgImRlZmF1bHQiOiAiVFNWIiwKICAgICMgU2NoZW1hcyB0byBiZSBzaHJlZGRlZCBhcyBKU09OcywgY29ycmVzcG9uZGluZyBKU09OUGF0aCBmaWxlcyBtdXN0IGJlIHByZXNlbnQuIEF1dG9taWdyYXRpb25zIHdpbGwgYmUgZGlzYWJsZWQKICAgICMgT3B0aW9uYWwsIGRlZmF1bHQgdmFsdWUgW10KICAgICJqc29uIjogWwogICAgICAiaWdsdTpjb20uYWNtZS9qc29uLWV2ZW50L2pzb25zY2hlbWEvMS0wLTAiLAogICAgICAiaWdsdTpjb20uYWNtZS9qc29uLWV2ZW50L2pzb25zY2hlbWEvMi0qLSoiCiAgICBdLAogICAgIyBTY2hlbWFzIHRvIGJlIHNocmVkZGVkIGFzIFRTVnMsIHByZXNlbmNlIG9mIHRoZSBzY2hlbWEgb24gSWdsdSBTZXJ2ZXIgaXMgbmVjZXNzYXJ5LiBBdXRvbWlnYXJ0aW9ucyBlbmFibGVkCiAgICAjIE9wdGlvbmFsLCBkZWZhdWx0IHZhbHVlIFtdCiAgICAidHN2IjogWyBdLAogICAgIyBTY2hlbWFzIHRoYXQgd29uJ3QgYmUgbG9hZGVkCiAgICAjIE9wdGlvbmFsLCBkZWZhdWx0IHZhbHVlIFtdCiAgICAic2tpcCI6IFsKICAgICAgImlnbHU6Y29tLmFjbWUvc2tpcC1ldmVudC9qc29uc2NoZW1hLzEtKi0qIgogICAgXQogIH0sCgogICMgT2JzZXJ2YWJpbGl0eSBhbmQgcmVwb3J0aW5nIG9wdGlvbnMKICAibW9uaXRvcmluZyI6IHsKICAgICMgT3B0aW9uYWwsIGZvciB0cmFja2luZyBydW50aW1lIGV4Y2VwdGlvbnMKICAgICJzZW50cnkiOiB7CiAgICAgICJkc24iOiAiaHR0cDovL3NlbnRyeS5hY21lLmNvbSIKICAgIH0KICB9Cn0K"
        ]
      }
    ],
    "tags": []
  }
}

command to run

dataflow-runner run-transient --emr-config ./config/cluster.json --emr-playbook ./config/playbook.json

Interesting logs (where I feel the issue is)

22/01/01 08:31:47 INFO ApplicationMaster: Waiting for spark context initialization...
- Unable to parse the configuration: DecodingFailure at .name: Attempt to decode value on failed cursor.

Usage: snowplow-rdb-shredder-1.1.0 --iglu-config <<base64>> [--duplicate-storage-config <<base64>>] --config <config.hocon>

Spark job to shred event and context JSONs from Snowplow enriched events

Options and flags:
    --help
        Display this help text.
    --iglu-config <<base64>>
        Base64-encoded Iglu Client JSON config
    --duplicate-storage-config <<base64>>
        Base64-encoded Events Manifest JSON config
    --config <config.hocon>, -c <config.hocon>
        base64-encoded config HOCON
22/01/01 08:31:51 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Shutdown hook called before final status was reported.)
22/01/01 08:31:51 INFO ApplicationMaster: Deleting staging directory hdfs://ip-x-x-x-x.ap-southeast-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1641025694876_0002
22/01/01 08:31:52 INFO ShutdownHookManager: Shutdown hook called
For more detailed output, check the application tracking page: http://ip-x-x-x-x.ap-southeast-1.compute.internal:8088/cluster/app/application_1641025694876_0002 Then click on links to logs of each attempt.
. Failing the application.
	 ApplicationMaster host: N/A
	 ApplicationMaster RPC port: -1
	 queue: default
	 start time: 1641025896332
	 final status: FAILED
	 tracking URL: http://ip-x-x-x-x.ap-southeast-1.compute.internal:8088/cluster/app/application_1641025694876_0002
	 user: hadoop
22/01/01 08:31:53 ERROR Client: Application diagnostics message: Application application_1641025694876_0002 failed 2 times due to AM Container for appattempt_1641025694876_0002_000002 exited with  exitCode: 2
Failing this attempt.Diagnostics: [2022-01-01 08:31:52.508]Exception from container-launch.
Container id: container_1641025694876_0002_02_000001
Exit code: 2
[2022-01-01 08:31:52.511]Container exited with a non-zero exit code 2. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :

Now observe this error:
- Unable to parse the configuration: DecodingFailure at .name: Attempt to decode value on failed cursor
from the error logs above. I'm not sure what's wrong with the .name config (playbook.json) or somewhere around it in the 2nd step.

Versions.

DATAFLOW_RUNNER_VERSION=0.6.0-rc2
snowplow-s3-loader:2.1.3
stream-enrich-kinesis:2.0.5
scala-stream-collector-kinesis:2.4.5

Help is appreciated, thanks.

Hi @pramod.niralakeri,

In your playbook.json file, the --config argument should be a valid base64-encoded configuration. But your config is definitely mangled - you can see this by copying your base64-encoded string into this decoder. The decoded version looks like this:

{
  # Path to e\XeܚY\H܈[][]·'66V#&۝X]{
    "schema": "i\Y]K]X[K\ۛeeZYK{
    "schema": "i\\[]\[XYȋ]eYY\]B]]ˆ]eYYe]]]΋ ...
    (rest of the decoded output is similarly garbled)

I suggest you revisit how you generated the base64-encoded config.
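
For reference, one way to sanity-check the encoding locally (a rough sketch assuming a Linux box with GNU coreutils; the file path is just an example) is to produce a single, unwrapped base64 string and decode it straight back:

base64 -w 0 ./config/config.hocon > config.hocon.b64
base64 -d config.hocon.b64   # should print your original HOCON unchanged

If the decode step does not reproduce the file exactly, the string that ends up in the playbook will not parse either.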

To post here publicly I just mangled it a bit; it's actually correct.

I tried passing it like this:

          "--iglu-config", "{{base64File "/snowplow/config/iglu_resolver.json"}}",
          "--config", "{{base64File "/snowplow/config/config.hocon"}}"

Still no luck.

My actual config looks something like this:

{
  # Path to enriched archive (must be populated separately with run=YYYY-MM-DD-hh-mm-ss directories) for S3 input
  "input": "s3://snowplow-s3-sink/enriched/",

  # Path to shredded archive
  "output": {
    # Path to shredded output
    "path": "s3://snowplow-s3-shred-test/shredded/",
    # Shredder output compression, GZIP or NONE
    # Optional, default value GZIP
    "compression": "GZIP",
    # This field is optional if it can be resolved with AWS region provider chain.
    # It checks places like env variables, system properties, AWS profile file.
    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
    "region": "ap-southeast-1"
  }

  # Queue used to communicate with Loader
  "queue": {
    # Type of the queue. It can be either sqs or sns
    "type": "sqs",
    # Name of the sqs queue
    "queueName": "snowplow-shredder-loader",
    # Region of the SQS queue.
    # Optional if it can be resolved with AWS region provider chain.
    "region": "ap-southeast-1"
  }

  # Schema-specific format settings (recommended to leave all three groups empty and use TSV as default)
  "formats": {
    # Format used by default (TSV or JSON)
    # Optional, default value TSV
    "default": "TSV",
    # Schemas to be shredded as JSONs, corresponding JSONPath files must be present. Automigrations will be disabled
    # Optional, default value []
    "json": [
      "iglu:com.acme/json-event/jsonschema/1-0-0",
      "iglu:com.acme/json-event/jsonschema/2-*-*"
    ],
    # Schemas to be shredded as TSVs, presence of the schema on Iglu Server is necessary. Automigartions enabled
    # Optional, default value []
    "tsv": [ ],
    # Schemas that won't be loaded
    # Optional, default value []
    "skip": [
      "iglu:com.acme/skip-event/jsonschema/1-*-*"
    ]
  }
}

And one more thing to note: after step 1 (S3DistCp enriched data archiving) transfers the S3-sinked enriched files to the shredded archive folder, it also creates a file ending in _$folder$.

Example, under the shredded archive:

run=2006-01-02-15-04-05/ (folder)
run=2006-01-02-15-04-05_$folder$ (file)

Hi @pramod.niralakeri, the config you shared is valid for version 2.0.0 of the shredder (the latest version). But you are currently using version 1.1.0, which expects a slightly different configuration format.

So the change you need to make is in your playbook.json file. Change this line:

          "s3://snowplow-hosted-assets-eu-central-1/4-storage/rdb-shredder/snowplow-rdb-shredder-1.1.0.jar",

to

          "s3://snowplow-hosted-assets-eu-central-1/4-storage/rdb-shredder/snowplow-rdb-shredder-2.0.0.jar",

That fixes the problem, thank you. One last issue.

And one more thing to note: after step 1 (S3DistCp enriched data archiving) transfers the S3-sinked enriched files to the shredded archive folder, it also creates a file ending in _$folder$.
Example, under the shredded archive:
run=2006-01-02-15-04-05/ (folder)
run=2006-01-02-15-04-05_$folder$ (file)

Also, could this doc please be updated for the 2.0 version?

This is an expected side-effect of some keys that EMR creates as part of the run.
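
If those placeholder objects bother you, they are safe to delete; something along these lines (using the archive path from your playbook, and with --dryrun so you can check the listing first) should work:

aws s3 rm s3://snowplow-s3-shred-test/shredded/archive/ --recursive --exclude '*' --include '*_$folder$' --dryrun

Drop --dryrun once the listed keys look right.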

Got it, mike, thanks.

One last question about the shredder: how do I use ENV variables in playbook.json or cluster.json?

Hi @pramod.niralakeri ,

Unfortunately this is not possible yet.

You probably want to take a look here.

If that does not work, please share the logs of the EMR cluster. You can find them in the EMR UI (stdout and stderr below):

Hi @BenB, following are the EMR YARN Spark logs:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/10/__spark_libs__685388673399952971.zip/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
22/01/03 16:37:46 INFO SignalUtils: Registered signal handler for TERM
22/01/03 16:37:46 INFO SignalUtils: Registered signal handler for HUP
22/01/03 16:37:46 INFO SignalUtils: Registered signal handler for INT
22/01/03 16:37:46 INFO SecurityManager: Changing view acls to: yarn,hadoop
22/01/03 16:37:46 INFO SecurityManager: Changing modify acls to: yarn,hadoop
22/01/03 16:37:46 INFO SecurityManager: Changing view acls groups to: 
22/01/03 16:37:46 INFO SecurityManager: Changing modify acls groups to: 
22/01/03 16:37:46 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users  with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
22/01/03 16:37:46 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:46 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:47 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:47 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:47 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:47 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:47 INFO ApplicationMaster: ApplicationAttemptId: appattempt_1641227632978_0002_000001
22/01/03 16:37:47 INFO ApplicationMaster: Starting the user application in a separate Thread
22/01/03 16:37:47 INFO ApplicationMaster: Waiting for spark context initialization...
22/01/03 16:37:52 INFO SparkContext: Running Spark version 3.0.1-amzn-0
22/01/03 16:37:52 INFO ResourceUtils: ==============================================================
22/01/03 16:37:52 INFO ResourceUtils: Resources for spark.driver:

22/01/03 16:37:52 INFO ResourceUtils: ==============================================================
22/01/03 16:37:52 INFO SparkContext: Submitted application: Main$
22/01/03 16:37:52 INFO SecurityManager: Changing view acls to: yarn,hadoop
22/01/03 16:37:52 INFO SecurityManager: Changing modify acls to: yarn,hadoop
22/01/03 16:37:52 INFO SecurityManager: Changing view acls groups to: 
22/01/03 16:37:52 INFO SecurityManager: Changing modify acls groups to: 
22/01/03 16:37:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn, hadoop); groups with view permissions: Set(); users  with modify permissions: Set(yarn, hadoop); groups with modify permissions: Set()
22/01/03 16:37:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:52 INFO Utils: Successfully started service 'sparkDriver' on port 46509.
22/01/03 16:37:52 INFO SparkEnv: Registering MapOutputTracker
22/01/03 16:37:52 INFO SparkEnv: Registering BlockManagerMaster
22/01/03 16:37:52 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/01/03 16:37:52 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/01/03 16:37:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:52 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:52 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
22/01/03 16:37:53 INFO DiskBlockManager: Created local directory at /mnt1/yarn/usercache/hadoop/appcache/application_1641227632978_0002/blockmgr-4b0c2fe4-104f-4b1b-a5f1-a5c1464b7d5a
22/01/03 16:37:53 INFO DiskBlockManager: Created local directory at /mnt/yarn/usercache/hadoop/appcache/application_1641227632978_0002/blockmgr-189d5979-f4ce-4cc4-b7f4-643bf1a1f81e
22/01/03 16:37:53 INFO MemoryStore: MemoryStore started with capacity 4.0 GiB
22/01/03 16:37:53 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:53 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:53 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:53 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:53 INFO SparkEnv: Registering OutputCommitCoordinator
22/01/03 16:37:53 INFO log: Logging initialized @8681ms to org.sparkproject.jetty.util.log.Slf4jLog
22/01/03 16:37:53 INFO Server: jetty-9.4.20.v20190813; built: 2019-08-13T21:28:18.144Z; git: 84700530e645e812b336747464d6fbbf370c9a20; jvm 1.8.0_312-b07
22/01/03 16:37:53 INFO Server: Started @8864ms
22/01/03 16:37:53 INFO AbstractConnector: Started ServerConnector@ed1d786{HTTP/1.1,[http/1.1]}{0.0.0.0:39979}
22/01/03 16:37:53 INFO Utils: Successfully started service 'SparkUI' on port 39979.
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /jobs: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@46e2ca96{/jobs,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /jobs/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@78adc08f{/jobs/json,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /jobs/job: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@421dfe54{/jobs/job,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /jobs/job/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@68a6a670{/jobs/job/json,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /stages: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@54a6dc83{/stages,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /stages/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2aad36a8{/stages/json,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /stages/stage: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@61505b39{/stages/stage,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /stages/stage/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@356f4843{/stages/stage/json,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /stages/pool: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@72f699c4{/stages/pool,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /stages/pool/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@29d10d4f{/stages/pool/json,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /storage: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6bd786d6{/storage,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /storage/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@7a47ca9{/storage/json,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /storage/rdd: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@626890ca{/storage/rdd,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /storage/rdd/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6a1345cc{/storage/rdd/json,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /environment: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5d61b4ea{/environment,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /environment/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@63a3d1dd{/environment/json,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /executors: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@18287104{/executors,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /executors/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@3ca7b378{/executors/json,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /executors/threadDump: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@20610b0{/executors/threadDump,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /executors/threadDump/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@437c4ebc{/executors/threadDump/json,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /static: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@577d2c77{/static,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6e4247e3{/,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /api: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@3b7a73d1{/api,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /jobs/job/kill: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@e9b8fba{/jobs/job/kill,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO ServerInfo: Adding filter to /stages/stage/kill: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:53 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@614533e3{/stages/stage/kill,null,AVAILABLE,@Spark}
22/01/03 16:37:53 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://ip-10-0-0-243.ap-southeast-1.compute.internal:39979
22/01/03 16:37:53 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:53 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:53 INFO YarnClusterScheduler: Created YarnClusterScheduler
22/01/03 16:37:53 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45117.
22/01/03 16:37:53 INFO NettyBlockTransferService: Server created on ip-10-0-0-243.ap-southeast-1.compute.internal:45117
22/01/03 16:37:53 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/01/03 16:37:54 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, ip-10-0-0-243.ap-southeast-1.compute.internal, 45117, None)
22/01/03 16:37:54 INFO BlockManagerMasterEndpoint: Registering block manager ip-10-0-0-243.ap-southeast-1.compute.internal:45117 with 4.0 GiB RAM, BlockManagerId(driver, ip-10-0-0-243.ap-southeast-1.compute.internal, 45117, None)
22/01/03 16:37:54 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, ip-10-0-0-243.ap-southeast-1.compute.internal, 45117, None)
22/01/03 16:37:54 INFO BlockManager: external shuffle service port = 7337
22/01/03 16:37:54 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, ip-10-0-0-243.ap-southeast-1.compute.internal, 45117, None)
22/01/03 16:37:54 INFO ServerInfo: Adding filter to /metrics/json: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
22/01/03 16:37:54 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@36c0eb73{/metrics/json,null,AVAILABLE,@Spark}
22/01/03 16:37:54 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:54 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:55 INFO SingleEventLogFileWriter: Logging events to hdfs:/var/log/spark/apps/application_1641227632978_0002_1.inprogress
22/01/03 16:37:55 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:55 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:55 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: fs.s3.buffer.dir;  Ignoring.
22/01/03 16:37:55 WARN Configuration: __spark_hadoop_conf__.xml:an attempt to override final parameter: yarn.nodemanager.local-dirs;  Ignoring.
22/01/03 16:37:55 INFO RMProxy: Connecting to ResourceManager at ip-10-0-0-107.ap-southeast-1.compute.internal/10.0.0.107:8030
22/01/03 16:37:55 INFO YarnRMClient: Registering the ApplicationMaster
22/01/03 16:37:55 INFO ApplicationMaster: Preparing Local resources
22/01/03 16:37:56 INFO ApplicationMaster: 
===============================================================================
Default YARN executor launch context:
  env:
    CLASSPATH -> /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/docker/usr/lib/hadoop-lzo/lib/*:/docker/usr/lib/hadoop/hadoop-aws.jar:/docker/usr/share/aws/aws-java-sdk/*:/docker/usr/share/aws/emr/emrfs/conf:/docker/usr/share/aws/emr/emrfs/lib/*:/docker/usr/share/aws/emr/emrfs/auxlib/*:/docker/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/docker/usr/share/aws/emr/security/conf:/docker/usr/share/aws/emr/security/lib/*:/docker/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/docker/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/docker/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/docker/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar<CPS>{{PWD}}<CPS>{{PWD}}/__spark_conf__<CPS>{{PWD}}/__spark_libs__/*<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/*<CPS>$HADOOP_COMMON_HOME/lib/*<CPS>$HADOOP_HDFS_HOME/*<CPS>$HADOOP_HDFS_HOME/lib/*<CPS>$HADOOP_MAPRED_HOME/*<CPS>$HADOOP_MAPRED_HOME/lib/*<CPS>$HADOOP_YARN_HOME/*<CPS>$HADOOP_YARN_HOME/lib/*<CPS>/usr/lib/hadoop-lzo/lib/*<CPS>/usr/share/aws/emr/emrfs/conf<CPS>/usr/share/aws/emr/emrfs/lib/*<CPS>/usr/share/aws/emr/emrfs/auxlib/*<CPS>/usr/share/aws/emr/lib/*<CPS>/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar<CPS>/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar<CPS>/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar<CPS>/usr/share/aws/emr/cloudwatch-sink/lib/*<CPS>/usr/share/aws/aws-java-sdk/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*<CPS>/usr/lib/hadoop-lzo/lib/*<CPS>/usr/share/aws/emr/emrfs/conf<CPS>/usr/share/aws/emr/emrfs/lib/*<CPS>/usr/share/aws/emr/emrfs/auxlib/*<CPS>/usr/share/aws/emr/lib/*<CPS>/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar<CPS>/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar<CPS>/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar<CPS>/usr/share/aws/emr/cloudwatch-sink/lib/*<CPS>/usr/share/aws/aws-java-sdk/*<CPS>{{PWD}}/__spark_conf__/__hadoop_conf__
    SPARK_YARN_STAGING_DIR -> hdfs://ip-10-0-0-107.ap-southeast-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1641227632978_0002
    SPARK_USER -> hadoop
    SPARK_PUBLIC_DNS -> ip-10-0-0-243.ap-southeast-1.compute.internal

  command:
    LD_LIBRARY_PATH=\"/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native:$LD_LIBRARY_PATH\" \ 
      {{JAVA_HOME}}/bin/java \ 
      -server \ 
      -Xmx7168m \ 
      '-verbose:gc' \ 
      '-XX:+PrintGCDetails' \ 
      '-XX:+PrintGCDateStamps' \ 
      '-XX:OnOutOfMemoryError=kill -9 %p' \ 
      '-XX:+UseParallelGC' \ 
      '-XX:InitiatingHeapOccupancyPercent=70' \ 
      -Djava.io.tmpdir={{PWD}}/tmp \ 
      '-Dspark.driver.port=46509' \ 
      '-Dspark.history.ui.port=18080' \ 
      '-Dspark.ui.port=0' \ 
      -Dspark.yarn.app.container.log.dir=<LOG_DIR> \ 
      org.apache.spark.executor.YarnCoarseGrainedExecutorBackend \ 
      --driver-url \ 
      spark://CoarseGrainedScheduler@ip-10-0-0-243.ap-southeast-1.compute.internal:46509 \ 
      --executor-id \ 
      <executorId> \ 
      --hostname \ 
      <hostname> \ 
      --cores \ 
      1 \ 
      --app-id \ 
      application_1641227632978_0002 \ 
      --resourceProfileId \ 
      0 \ 
      --user-class-path \ 
      file:$PWD/__app__.jar \ 
      1><LOG_DIR>/stdout \ 
      2><LOG_DIR>/stderr

  resources:
    __app__.jar -> resource { scheme: "hdfs" host: "ip-10-0-0-107.ap-southeast-1.compute.internal" port: 8020 file: "/user/hadoop/.sparkStaging/application_1641227632978_0002/snowplow-rdb-shredder-2.0.0.jar" } size: 57506726 timestamp: 1641227861538 type: FILE visibility: PRIVATE
    __spark_libs__ -> resource { scheme: "hdfs" host: "ip-10-0-0-107.ap-southeast-1.compute.internal" port: 8020 file: "/user/hadoop/.sparkStaging/application_1641227632978_0002/__spark_libs__685388673399952971.zip" } size: 224748239 timestamp: 1641227850839 type: ARCHIVE visibility: PRIVATE
    __spark_conf__ -> resource { scheme: "hdfs" host: "ip-10-0-0-107.ap-southeast-1.compute.internal" port: 8020 file: "/user/hadoop/.sparkStaging/application_1641227632978_0002/__spark_conf__.zip" } size: 290917 timestamp: 1641227861960 type: ARCHIVE visibility: PRIVATE

===============================================================================
22/01/03 16:37:56 INFO Configuration: resource-types.xml not found
22/01/03 16:37:56 INFO ResourceUtils: Unable to find 'resource-types.xml'.
22/01/03 16:37:56 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark://YarnAM@ip-10-0-0-243.ap-southeast-1.compute.internal:46509)
22/01/03 16:37:56 INFO YarnAllocator: Will request 6 executor container(s), each with 1 core(s) and 8512 MB memory (including 1344 MB of overhead)
22/01/03 16:37:56 INFO YarnAllocator: Submitted 6 unlocalized container requests.
22/01/03 16:37:56 INFO ApplicationMaster: Started progress reporter thread with (heartbeat : 3000, initial allocation : 200) intervals
22/01/03 16:37:56 INFO YarnClusterSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
22/01/03 16:37:56 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done
22/01/03 16:37:56 INFO YarnAllocator: Launching container container_1641227632978_0002_01_000002 on host ip-10-0-0-243.ap-southeast-1.compute.internal for executor with ID 1
22/01/03 16:37:56 INFO YarnAllocator: Launching container container_1641227632978_0002_01_000003 on host ip-10-0-0-243.ap-southeast-1.compute.internal for executor with ID 2
22/01/03 16:37:56 INFO YarnAllocator: Received 2 containers from YARN, launching executors on 2 of them.
22/01/03 16:37:57 INFO YarnAllocator: Launching container container_1641227632978_0002_01_000004 on host ip-10-0-0-243.ap-southeast-1.compute.internal for executor with ID 3
22/01/03 16:37:57 INFO YarnAllocator: Launching container container_1641227632978_0002_01_000005 on host ip-10-0-0-243.ap-southeast-1.compute.internal for executor with ID 4
22/01/03 16:37:57 INFO YarnAllocator: Launching container container_1641227632978_0002_01_000006 on host ip-10-0-0-243.ap-southeast-1.compute.internal for executor with ID 5
22/01/03 16:37:57 INFO YarnAllocator: Received 3 containers from YARN, launching executors on 3 of them.
22/01/03 16:38:02 INFO AbstractConnector: Stopped Spark@ed1d786{HTTP/1.1,[http/1.1]}{0.0.0.0:0}
22/01/03 16:38:02 INFO SparkUI: Stopped Spark web UI at http://ip-10-0-0-243.ap-southeast-1.compute.internal:39979
22/01/03 16:38:02 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
22/01/03 16:38:02 INFO YarnAllocator: Canceling requests for 1 executor container(s) to have a new desired total 0 executors.
22/01/03 16:38:02 INFO YarnClusterSchedulerBackend: Shutting down all executors
22/01/03 16:38:02 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
22/01/03 16:38:03 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/01/03 16:38:03 INFO MemoryStore: MemoryStore cleared
22/01/03 16:38:03 INFO BlockManager: BlockManager stopped
22/01/03 16:38:03 INFO BlockManagerMaster: BlockManagerMaster stopped
22/01/03 16:38:03 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/01/03 16:38:03 INFO SparkContext: Successfully stopped SparkContext
22/01/03 16:38:03 INFO ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
22/01/03 16:38:03 INFO ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED
22/01/03 16:38:03 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
22/01/03 16:38:03 INFO ApplicationMaster: Deleting staging directory hdfs://ip-10-0-0-107.ap-southeast-1.compute.internal:8020/user/hadoop/.sparkStaging/application_1641227632978_0002
22/01/03 16:38:03 INFO ShutdownHookManager: Shutdown hook called
22/01/03 16:38:03 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1641227632978_0002/spark-499d48a3-d7d6-44b9-87df-2cdf01e61956
22/01/03 16:38:03 INFO ShutdownHookManager: Deleting directory /mnt1/yarn/usercache/hadoop/appcache/application_1641227632978_0002/spark-821d0f9f-b45c-4be0-bac0-060df5c2156c

So this means all our secret keys will live in Git? That is scary.

Is there any safe way to get this done?

What secrets do you need in playbook.json and cluster.json?

You can use the usual AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY with:

"credentials": {
      "accessKeyId": "env",
      "secretAccessKey": "env"
},

in both files, please see the docs.
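
For example, something along these lines before invoking Dataflow Runner (the key values are placeholders):

export AWS_ACCESS_KEY_ID=AKIA...
export AWS_SECRET_ACCESS_KEY=...
dataflow-runner run-transient --emr-config ./config/cluster.json --emr-playbook ./config/playbook.json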

Can you show the stdout and stderr for the Spark master please? They are the ones that should contain an error.

We have a different problem: the same piece of code runs in different countries, and all these bucket names, subnets, key names and so on need to be read from the environment so that we can set them per country. CI/CD will take care of that for us. Hope the problem is clearer now.
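
To illustrate, our CI/CD would roughly have to do something like this today (a sketch; envsubst, the .tpl file names and the variable are placeholders for how we template things per country):

export SHRED_BUCKET=snowplow-s3-shred-test
envsubst < ./config/cluster.json.tpl > ./config/cluster.json
envsubst < ./config/playbook.json.tpl > ./config/playbook.json
dataflow-runner run-transient --emr-config ./config/cluster.json --emr-playbook ./config/playbook.json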

@BenB sorry this is getting dragged out, but I've provided the logs. I don't see a Spark master anywhere in EMR (screenshot for reference).

The shredder has just 2 steps:

  1. S3 sink to shred archive folder
  2. shred

And that's all the log output. Not sure what I'm missing, @BenB.


By Spark master I was talking about the driver that we can see in your screenshot. Can you check its stdout and stderr please?

Are there files in the run= folders in archive/enriched/ copied by S3DistCp?

It seems weird to me that we don't see any Spark executor apart from the driver in your screenshot.

@BenB what I observed is that the S3 loader has uploaded data from the enriched Kinesis stream to S3 in the following structure:

s3://snowplow-s3-sink/enriched/unknown.unknown/model=-1/date=2022-01-05/

I'm wondering if this is a valid structure or path. My S3 sink path is s3://snowplow-s3-sink/enriched, and I would expect a date=<date> folder to be created immediately under it, but I'm not sure why the /unknown.unknown/model=-1 folder structure sits in between.

So what I did was move the date=2022-01-05 folder two levels up, delete unknown.unknown/model=-1, and run the EMR shred. The shredder moves the files from enriched to shred/archive (i.e. step 1) and then runs the shredder (step 2). The EMR job completes successfully, without any error, and that's all the logging I see in the EMR UI (screenshot shared earlier/above).
Note that the date=2022-01-05 folder has around 100-200 gz files.
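
What I did is roughly equivalent to the following (a sketch, using the path structure above):

aws s3 mv s3://snowplow-s3-sink/enriched/unknown.unknown/model=-1/date=2022-01-05/ s3://snowplow-s3-sink/enriched/date=2022-01-05/ --recursive
aws s3 rm s3://snowplow-s3-sink/enriched/unknown.unknown/ --recursive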

Also note: these events were generated by triggering this sample event from the Snowplow example multiple times.

And if I run it as is (without moving the date=<date> files two levels up), step 2 fails with the following error:

	 client token: N/A
	 diagnostics: User class threw exception: java.io.IOException: Not a file: s3://nucleus-data-team-snowplow-kafka-s3-sink-test/enriched/unknown.unknown/model=-1
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:303)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:292)
	at org.apache.spark.scheduler.DAGScheduler.createShuffleMapStage(DAGScheduler.scala:450)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$getOrCreateShuffleMapStage$1(DAGScheduler.scala:415)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:38)
	at scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:38)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateShuffleMapStage(DAGScheduler.scala:408)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$getOrCreateParentStages$1(DAGScheduler.scala:531)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:48)
	at scala.collection.SetLike.map(SetLike.scala:104)
	at scala.collection.SetLike.map$(SetLike.scala:104)
	at scala.collection.mutable.AbstractSet.map(Set.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:530)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:517)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1049)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2344)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2333)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:815)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.ShredJob.run(ShredJob.scala:139)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.ShredJob$.$anonfun$run$35(ShredJob.scala:226)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.ShredJob$.$anonfun$run$35$adapted(ShredJob.scala:223)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.ShredJob$.run(ShredJob.scala:223)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main$.$anonfun$main$2(Main.scala:45)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at shadecats.syntax.EitherObjectOps$.catchNonFatal$extension(either.scala:370)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main$.main(Main.scala:45)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:728)

	 ApplicationMaster host: ip-10-0-0-27.ap-southeast-1.compute.internal
	 ApplicationMaster RPC port: 41395
	 queue: default
	 start time: 1641352009512
	 final status: FAILED
	 tracking URL: http://ip-10-0-0-92.ap-southeast-1.compute.internal:20888/proxy/application_1641351772500_0002/
	 user: hadoop
22/01/05 03:08:12 ERROR Client: Application diagnostics message: User class threw exception: java.io.IOException: Not a file: s3://nucleus-data-team-snowplow-kafka-s3-sink-test/enriched/unknown.unknown/model=-1
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:303)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.RDD.getNumPartitions(RDD.scala:292)
	at org.apache.spark.scheduler.DAGScheduler.createShuffleMapStage(DAGScheduler.scala:450)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$getOrCreateShuffleMapStage$1(DAGScheduler.scala:415)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:38)
	at scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:38)
	at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateShuffleMapStage(DAGScheduler.scala:408)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$getOrCreateParentStages$1(DAGScheduler.scala:531)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:48)
	at scala.collection.SetLike.map(SetLike.scala:104)
	at scala.collection.SetLike.map$(SetLike.scala:104)
	at scala.collection.mutable.AbstractSet.map(Set.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.getOrCreateParentStages(DAGScheduler.scala:530)
	at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:517)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1049)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2344)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2333)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:815)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.ShredJob.run(ShredJob.scala:139)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.ShredJob$.$anonfun$run$35(ShredJob.scala:226)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.ShredJob$.$anonfun$run$35$adapted(ShredJob.scala:223)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.ShredJob$.run(ShredJob.scala:223)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main$.$anonfun$main$2(Main.scala:45)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at shadecats.syntax.EitherObjectOps$.catchNonFatal$extension(either.scala:370)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main$.main(Main.scala:45)
	at com.snowplowanalytics.snowplow.rdbloader.shredder.batch.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:728)

Exception in thread "main" org.apache.spark.SparkException: Application application_1641351772500_0002 finished with failed status
	at org.apache.spark.deploy.yarn.Client.run(Client.scala:1196)
	at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1587)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:936)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1015)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1024)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)