Enrich Raw Events fails due to "Not a file: hdfs" -- Clojure connector -- EMR ETL Runner

Collector: Clojure-Connector

EMR ETL Runner is failing at the “Elasticity Spark Step: Enrich Raw Events” step. I have reviewed the container logs associated with this job, and it appears to be failing with the error below. Any help will be greatly appreciated.
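
In case it is useful to anyone else, the container logs can be pulled from the EMR master node with something along these lines (the application id below is a placeholder; take the real one from the YARN ResourceManager UI or the EMR console):

# Dump the aggregated YARN container logs for the failed enrich application
# (placeholder application id)
yarn logs -applicationId application_XXXXXXXXXXXXX_XXXX > enrich-step.log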

Error

ERROR ApplicationMaster: User class threw exception: java.io.IOException: Not a file: hdfs://ip-10-97-58-5.ec2.internal:8020/local/snowplow/raw-events/resources/environments
java.io.IOException: Not a file: hdfs://ip-10-97-58-5.ec2.internal:8020/local/snowplow/raw-events/resources/environments

Command line

./snowplow-emr-etl-runner run -c ./snowplow/3-enrich/emr-etl-runner/config/config.yml -r ./resolver.json --enrichments ./snowplow/3-enrich/config/enrichments/ --targets snowplow/4-storage/ --debug

Conf file

aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: ***********
  secret_access_key: **********
  s3:
    region: us-east-1
    buckets:
      assets: s3n://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: # If you have defined your own JSON Schemas, add the s3:// path to your own JSON Path files in your own bucket here
      log: s3n://******-snowplow-etl/logs
      raw:
        in:                  # This is a YAML array of one or more in buckets - you MUST use hyphens before each entry in the array, as below
          - s3n://elasticbeanstalk-us-east-1-**********         # e.g. s3://my-old-collector-bucket
        processing: s3://*******-snowplow-etl-archive/processing
        archive: s3://******-snowplow-etl-archive/raw    # e.g. s3://my-archive-bucket/raw
      enriched:
        good: s3://********-snowplow-etl-data/enriched/good       # e.g. s3://my-out-bucket/enriched/good
        bad: s3://********-snowplow-etl-data/enriched/bad        # e.g. s3://my-out-bucket/enriched/bad
        errors:  s3://********-snowplow-etl-data/enriched/errors
        archive: s3://********-snowplow-etl-data/enriched/archive    # Where to archive enriched events to, e.g. s3://my-archive-bucket/enriched
      shredded:
        good: s3://********-snowplow-etl-data/shredded/good       # e.g. s3://my-out-bucket/shredded/good
        bad: s3://********-snowplow-etl-data/shredded/bad        # e.g. s3://my-out-bucket/shredded/bad
        errors:  s3://********-snowplow-etl-data/shredded/errors   # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://********-snowplow-etl-data/shredded/archive   # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 5.8.0
    region: us-east-1        # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole     # Created using $ aws emr create-default-roles
    placement:      # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-********* # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: ************
    bootstrap: []           # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase:                # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual:              # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow # Give your job a name
      master_instance_type: c4.large
      core_instance_count: 2
      core_instance_type: c4.large
      core_instance_ebs:    # Optional. Attach an EBS volume to each core instance.
        volume_size: 200    # Gigabytes
        volume_type: "gp2"
        volume_iops: 400    # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: c4.large
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info:        # Optional JSON string for selecting additional features
collectors:
  format: clj-tomcat # For example: 'clj-tomcat' for the Clojure Collector, 'thrift' for Thrift records, 'tsv/com.amazon.aws.cloudfront/wd_access_log' for Cloudfront access logs or 'ndjson/urbanairship.connect/v1' for UrbanAirship Connect events
enrich:
  versions:
    spark_enrich: 1.9.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: true  # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.12.0
    rdb_shredder: 0.12.0        # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: snowplow # e.g. snowplow
    collector: ********.us-east-1.elasticbeanstalk.com # e.g. d3rkrsqld9gmqf.cloudfront.net

Could you give us the details of the step prior to the enrich one?
That should be a step named “Elasticity S3DistCp Step: Raw S3 -> Raw HDFS”? I’m particularly interested in the arguments.
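
If it is easier, the step definitions (including their arguments) can usually be pulled with the AWS CLI, something like the following (placeholder cluster id):

# Show every step of the cluster, including its jar and arguments
# (placeholder cluster id; take the real one from the EMR console)
aws emr list-steps --cluster-id j-XXXXXXXXXXXXX --region us-east-1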

Thank you. By cloning the terminated cluster, I was able to confirm that the files are indeed being copied over to the cluster.

Elasticity S3DistCp Step: Raw S3 -> Raw HDFS

JAR location : /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar
Main class : None
Arguments : --src s3://********-snowplow-etl-archive/processing/ --dest hdfs:///local/snowplow/raw-events/ --s3Endpoint s3.amazonaws.com
Action on failure: Terminate cluster

Could you check the contents of the s3://********-snowplow-etl-archive/processing/ bucket or, alternatively, the hdfs:///local/snowplow/raw-events/ HDFS folder?

I find it weird that there would be a /local/snowplow/raw-events/resources/environments folder in there.
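
Something along these lines should show what is actually in there (paths taken from your config, bucket name redacted):

# List what S3DistCp staged, first on S3, then on HDFS
# (run the second command on the EMR master node)
aws s3 ls --recursive s3://********-snowplow-etl-archive/processing/
hdfs dfs -ls -R /local/snowplow/raw-events/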

I confirmed the logs are in both the S3 archive/processing bucket and HDFS. I checked the format in the S3 bucket, and it matches the example in the documentation for the Tomcat Clojure Collector.

HDFS

hdfs dfs -ls /local/snowplow/raw-events/resources/environments/logs/publish/e-******/i-******
A single log file as an example: var_log_tomcat8_rotated_localhost_access_log.txt1506348061.gz

Hi Ben, here is the stack trace from the error as well. Oh, and Nick Chammas says hi!

17/09/26 14:31:08 INFO FileInputFormat: Total input paths to process : 26
17/09/26 14:31:08 ERROR ApplicationMaster: User class threw exception: java.io.IOException: Not a file: hdfs://ip-10-97-58-243.ec2.internal:8020/local/snowplow/raw-events/resources/environments
java.io.IOException: Not a file: hdfs://ip-10-97-58-243.ec2.internal:8020/local/snowplow/raw-events/resources/environments
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:288)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2075)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1151)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1096)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1070)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:961)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:960)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1489)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
	at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1468)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob.run(EnrichJob.scala:198)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$.run(EnrichJob.scala:84)
	at com.snowplowanalytics.snowplow.enrich.spark.SparkJob$class.main(SparkJob.scala:32)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob$.main(EnrichJob.scala:51)
	at com.snowplowanalytics.snowplow.enrich.spark.EnrichJob.main(EnrichJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:635)

Ah, the problem comes from the fact that you should specify your in buckets as:

raw:
  in:
    - s3n://elasticbeanstalk-us-east-1-**********/resources/environments/logs/publish/e-******

for each of your collectors, which can be identified by e-*****.

Indeed, the folder hierarchy needs to be mostly flat to pick up every log file.
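
For a bit of context (my understanding of the error, so treat it as such): the enrich job reads the raw-events folder non-recursively, so any directory sitting directly under it, such as resources/, makes Hadoop’s FileInputFormat throw “Not a file”. A quick way to spot offending directories:

# On the EMR master node: any entry whose permission string starts with "d"
# is a directory the enrich step will trip over instead of reading
hdfs dfs -ls hdfs:///local/snowplow/raw-events/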

Say hi to Nick from me as well! Small world!

Will do! I should have mentioned that I had actually made that change to the config after I submitted my original post. I wonder if this could be tied to the S3 bucket layout. This might not be correct, but the processing bucket has two directories in it:

******-snowplow-etl-archive/processing/resources/environments/logs/publish
  e-*******q
  e-*******a

and my in bucket is currently set to:

 - s3n://elasticbeanstalk-us-east-1-*******/resources/environments/logs/publish/e-*****
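
For reference, a listing along these lines is how the two collector directories show up (bucket name redacted):

# Enumerate the collector directories that ended up under processing/
aws s3 ls s3://******-snowplow-etl-archive/processing/resources/environments/logs/publish/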

Then you’ll need something like:

raw:
  in:
    - "s3n://elasticbeanstalk-us-east-1-**********/resources/environments/logs/publish/e-******q"
    - "s3n://elasticbeanstalk-us-east-1-**********/resources/environments/logs/publish/e-******a"

I ran the job again, but unfortunately it produces the same stack trace.

17/09/26 14:06:54 ERROR ApplicationMaster: User class threw exception: java.io.IOException: Not a file: hdfs://ip-10-97-58-60.ec2.internal:8020/local/snowplow/raw-events/resources/environments
17/09/26 14:06:54 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.io.IOException: Not a file: hdfs://ip-10-97-58-60.ec2.internal:8020/local/snowplow/raw-events/resources/environments)

It made it past the enrichment step. I added the extra bucket to the config, cleaned up my buckets, reran it, and it worked. Thanks for all the help! Much appreciated.
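
In case it helps anyone else hitting this, the clean-up amounted to something along these lines before rerunning (paths are illustrative, bucket names redacted; adding --dryrun first is a good idea):

# Illustrative only: move the stranded batch out of processing/ back under the
# collector bucket so the next run can stage it again
aws s3 mv --recursive \
  s3://******-snowplow-etl-archive/processing/resources/environments/logs/publish/ \
  s3://elasticbeanstalk-us-east-1-**********/resources/environments/logs/publish/

# then rerun EmrEtlRunner with the same command as before
./snowplow-emr-etl-runner run -c ./snowplow/3-enrich/emr-etl-runner/config/config.yml -r ./resolver.json --enrichments ./snowplow/3-enrich/config/enrichments/ --targets snowplow/4-storage/ --debug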

No problem!