Hi @ihor,
My configuration for the EMR ETL Runner looks as follows:
aws:
  # Credentials can be hardcoded or set in environment variables
  access_key_id: <%= ENV['AWS_SNOWPLOW_ACCESS_KEY'] %>
  secret_access_key: <%= ENV['AWS_SNOWPLOW_SECRET_KEY'] %>
  s3:
    region: ap-south-1
    buckets:
      assets: s3://snowplow-hosted-assets # DO NOT CHANGE unless you are hosting the jarfiles etc yourself in your own bucket
      jsonpath_assets: s3://scrpbx-snwplw-hosted-assets/jsonpaths
      # https://stackoverflow.com/questions/10569455/differences-between-amazon-s3-and-s3n-in-hadoop
      log: s3n://logs/
      enriched:
        good: s3://output/enriched/good # e.g. s3://my-out-bucket/enriched/good
        bad: s3://output/enriched/bad # e.g. s3://my-out-bucket/enriched/bad
        stream: s3://collector-logs
      shredded:
        good: s3://output/shredded/good # e.g. s3://my-out-bucket/shredded/good
        bad: s3://output/shredded/bad # e.g. s3://my-out-bucket/shredded/bad
        errors: s3://output/shredded/error # Leave blank unless :continue_on_unexpected_error: set to true below
        archive: s3://output/shredded/archive # Where to archive shredded events to, e.g. s3://my-archive-bucket/shredded
  emr:
    ami_version: 5.9.0
    region: ap-south-1 # Always set this
    jobflow_role: EMR_EC2_DefaultRole # Created using $ aws emr create-default-roles
    service_role: EMR_DefaultRole # Created using $ aws emr create-default-roles
    autoscaling_role: EMR_AutoScaling_DefaultRole
    placement: # Set this if not running in VPC. Leave blank otherwise
    ec2_subnet_id: subnet-c9ed65a1 # Set this if running in VPC. Leave blank otherwise
    ec2_key_name: test@test.com
    bootstrap: [] # Set this to specify custom bootstrap actions. Leave empty otherwise
    software:
      hbase: # Optional. To launch on cluster, provide version, "0.92.0", keep quotes. Leave empty otherwise.
      lingual: # Optional. To launch on cluster, provide version, "1.1", keep quotes. Leave empty otherwise.
    # Adjust your Hadoop cluster below
    jobflow:
      job_name: Snowplow Stream ETL # Give your job a name
      master_instance_type: c4.xlarge
      core_instance_count: 2
      core_instance_type: c4.xlarge
      core_instance_ebs: # Optional. Attach an EBS volume to each core instance.
        volume_size: 100 # Gigabytes
        volume_type: "gp2"
        volume_iops: 400 # Optional. Will only be used if volume_type is "io1"
        ebs_optimized: false # Optional. Will default to true
      task_instance_count: 0 # Increase to use spot instances
      task_instance_type: c4.xlarge
      task_instance_bid: 0.015 # In USD. Adjust bid, or leave blank for non-spot-priced (i.e. on-demand) task instances
    bootstrap_failure_tries: 3 # Number of times to attempt the job in the event of bootstrap failures
    configuration:
      yarn-site:
        yarn.resourcemanager.am.max-attempts: "1"
      spark:
        maximizeResourceAllocation: "true"
    additional_info: # Optional JSON string for selecting additional features
enrich:
  versions:
    spark_enrich: 1.13.0 # Version of the Spark Enrichment process
  continue_on_unexpected_error: false # Set to 'true' (and set :out_errors: above) if you don't want any exceptions thrown from ETL
  output_compression: NONE # Compression only supported with Redshift, set to NONE if you have Postgres targets. Allowed formats: NONE, GZIP
storage:
  versions:
    rdb_loader: 0.14.0
    rdb_shredder: 0.13.0 # Version of the Spark Shredding process
    hadoop_elasticsearch: 0.1.0 # Version of the Hadoop to Elasticsearch copying process
monitoring:
  tags: {} # Name-value pairs describing this job
  logging:
    level: DEBUG # You can optionally switch to INFO for production
  snowplow:
    method: get
    app_id: snowplow # e.g. snowplow
    collector: collector.test.net # e.g. d3rkrsqld9gmqf.cloudfront.net
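Since the config is an ERB template, here is the quick sanity check I use to make sure it expands and parses the way I expect (just a sketch; it assumes Ruby is on the PATH and the same environment variables are exported):

export AWS_SNOWPLOW_ACCESS_KEY=key AWS_SNOWPLOW_SECRET_KEY=secret
ruby -rerb -ryaml -e '
  # Expand the ERB tags, then parse the YAML and print the resolved bucket map
  cfg = YAML.load(ERB.new(File.read("emr-etl-runner-stream-enrich.config.yml")).result)
  puts cfg["aws"]["s3"]["buckets"]
'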
I’m currently running R104 (Stoplesteinan) of the EMR ETL Runner. It is executed with the following command:
AWS_SNOWPLOW_ACCESS_KEY=key AWS_SNOWPLOW_SECRET_KEY=secret snowplow-emr-etl-runner-r104 run -c emr-etl-runner-stream-enrich.config.yml --resolver iglu_resolver.json -t targets -n enrichments -f staging_stream_enrich
The targets folder contains the Redshift target with the required configuration, and I’ve also added additional enrichments via the enrichments folder.
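For reference, the two directories are laid out roughly like this (the file names are just examples of mine; as far as I understand, EmrEtlRunner simply picks up every JSON file in each directory):

ls targets/       # redshift.json
ls enrichments/   # ip_lookups.json  ua_parser_config.json  ...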
When running with -f staging_stream_enrich, only one step is displayed in EMR: Elasticity S3DistCp Step: Shredded S3 -> S3 Shredded Archive. There are no error logs for this step, just the controller log and the syslog (both pasted below).
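For what it's worth, these are the checks I run to see whether the enriched stream bucket actually has anything for the staging step to pick up (bucket names taken from the config above):

# Anything waiting in the enriched stream bucket?
aws s3 ls s3://collector-logs/ --recursive | head
# Anything already staged into enriched/good?
aws s3 ls s3://output/enriched/good/ --recursive | head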
Controller logs:
2019-06-05T10:05:49.455Z INFO Ensure step 1 jar file /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar
2019-06-05T10:05:49.456Z INFO StepRunner: Created Runner for step 1
INFO startExec 'hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess'
INFO Environment:
PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
LESS_TERMCAP_md=[01;38;5;208m
LESS_TERMCAP_me=[0m
HISTCONTROL=ignoredups
LESS_TERMCAP_mb=[01;31m
AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
UPSTART_JOB=rc
LESS_TERMCAP_se=[0m
HISTSIZE=1000
HADOOP_ROOT_LOGGER=INFO,DRFA
JAVA_HOME=/etc/alternatives/jre
AWS_DEFAULT_REGION=ap-south-1
AWS_ELB_HOME=/opt/aws/apitools/elb
LESS_TERMCAP_us=[04;38;5;111m
EC2_HOME=/opt/aws/apitools/ec2
TERM=linux
XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
runlevel=3
LANG=en_US.UTF-8
AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
MAIL=/var/spool/mail/hadoop
LESS_TERMCAP_ue=[0m
LOGNAME=hadoop
PWD=/
LANGSH_SOURCED=1
HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-3OM7XL0P3NKQ/tmp
_=/etc/alternatives/jre/bin/java
CONSOLETYPE=serial
RUNLEVEL=3
LESSOPEN=||/usr/bin/lesspipe.sh %s
previous=N
UPSTART_EVENTS=runlevel
AWS_PATH=/opt/aws
USER=hadoop
UPSTART_INSTANCE=
PREVLEVEL=N
HADOOP_LOGFILE=syslog
PYTHON_INSTALL_LAYOUT=amzn
HOSTNAME=ip-172-31-50-196
NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-3OM7XL0P3NKQ
EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
SHLVL=5
HOME=/home/hadoop
HADOOP_IDENT_STRING=hadoop
INFO redirectOutput to /mnt/var/log/hadoop/steps/s-3OM7XL0P3NKQ/stdout
INFO redirectError to /mnt/var/log/hadoop/steps/s-3OM7XL0P3NKQ/stderr
INFO Working dir /mnt/var/lib/hadoop/steps/s-3OM7XL0P3NKQ
INFO ProcessRunner started child process 8578 :
hadoop 8578 4184 0 10:05 ? 00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess
2019-06-05T10:05:53.462Z INFO HadoopJarStepRunner.Runner: startRun() called for s-3OM7XL0P3NKQ Child Pid: 8578
INFO Synchronously wait child process to complete : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO waitProcessCompletion ended with exit code 0 : hadoop jar /usr/share/aws/emr/s3-dist-cp/lib/s3...
INFO total process run time: 30 seconds
2019-06-05T10:06:21.722Z INFO Step created jobs: job_1559729030926_0001
2019-06-05T10:06:21.722Z INFO Step succeeded with exitCode 0 and took 30 seconds
Syslog:
2019-06-05 10:05:51,788 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Running with args: --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess
2019-06-05 10:05:52,048 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): S3DistCp args: --src s3://output/shredded/good/run=2018-09-12-20-00-21/ --dest s3://output/shredded/archive/run=2018-09-12-20-00-21/ --s3Endpoint s3-ap-south-1.amazonaws.com --deleteOnSuccess
2019-06-05 10:05:52,067 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Using output path 'hdfs:/tmp/a1ae3c19-b66f-4eac-9f0a-c2cd80144d47/output'
2019-06-05 10:05:54,021 WARN com.amazon.ws.emr.hadoop.fs.rolemapping.RoleMappings (main): Found no mappings configured with 'fs.s3.authorization.roleMapping', credentials resolution may not work as expected
2019-06-05 10:05:55,230 WARN com.amazonaws.profile.path.cred.CredentialsLegacyConfigLocationProvider (main): Found the legacy config profiles file at [/home/hadoop/.aws/config]. Please move it to the latest default location [~/.aws/credentials].
2019-06-05 10:05:55,288 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): DefaultAWSCredentialsProviderChain is used to create AmazonS3Client. KeyId: ASIA2LHO7ZSGIVSQCLND
2019-06-05 10:05:55,288 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): AmazonS3Client setEndpoint s3-ap-south-1.amazonaws.com
2019-06-05 10:05:55,422 INFO com.amazon.elasticmapreduce.s3distcp.FileInfoListing (main): Opening new file: hdfs:/tmp/a1ae3c19-b66f-4eac-9f0a-c2cd80144d47/files/1
2019-06-05 10:05:55,500 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Created 1 files to copy 9 files
2019-06-05 10:05:56,241 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Reducer number: 3
2019-06-05 10:05:56,501 INFO org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl (main): Timeline service address: http://ip-172-31-50-196.ap-south-1.compute.internal:8188/ws/v1/timeline/
2019-06-05 10:05:56,508 INFO org.apache.hadoop.yarn.client.RMProxy (main): Connecting to ResourceManager at ip-172-31-50-196.ap-south-1.compute.internal/172.31.50.196:8032
2019-06-05 10:05:57,279 INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat (main): Total input paths to process : 1
2019-06-05 10:05:57,351 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): number of splits:1
2019-06-05 10:05:57,931 INFO org.apache.hadoop.mapreduce.JobSubmitter (main): Submitting tokens for job: job_1559729030926_0001
2019-06-05 10:05:58,466 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl (main): Submitted application application_1559729030926_0001
2019-06-05 10:05:58,504 INFO org.apache.hadoop.mapreduce.Job (main): The url to track the job: http://ip-172-31-50-196.ap-south-1.compute.internal:20888/proxy/application_1559729030926_0001/
2019-06-05 10:05:58,505 INFO org.apache.hadoop.mapreduce.Job (main): Running job: job_1559729030926_0001
2019-06-05 10:06:04,583 INFO org.apache.hadoop.mapreduce.Job (main): Job job_1559729030926_0001 running in uber mode : false
2019-06-05 10:06:04,584 INFO org.apache.hadoop.mapreduce.Job (main): map 0% reduce 0%
2019-06-05 10:06:10,625 INFO org.apache.hadoop.mapreduce.Job (main): map 100% reduce 0%
2019-06-05 10:06:17,654 INFO org.apache.hadoop.mapreduce.Job (main): map 100% reduce 33%
2019-06-05 10:06:18,658 INFO org.apache.hadoop.mapreduce.Job (main): map 100% reduce 67%
2019-06-05 10:06:19,662 INFO org.apache.hadoop.mapreduce.Job (main): map 100% reduce 100%
2019-06-05 10:06:20,673 INFO org.apache.hadoop.mapreduce.Job (main): Job job_1559729030926_0001 completed successfully
2019-06-05 10:06:20,748 INFO org.apache.hadoop.mapreduce.Job (main): Counters: 54
File System Counters
FILE: Number of bytes read=1133
FILE: Number of bytes written=518685
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=3577
HDFS: Number of bytes written=0
HDFS: Number of read operations=13
HDFS: Number of large read operations=0
HDFS: Number of write operations=6
S3: Number of bytes read=0
S3: Number of bytes written=0
S3: Number of read operations=0
S3: Number of large read operations=0
S3: Number of write operations=0
Job Counters
Launched map tasks=1
Launched reduce tasks=3
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=129976
Total time spent by all reduces in occupied slots (ms)=1425952
Total time spent by all map tasks (ms)=2954
Total time spent by all reduce tasks (ms)=16204
Total vcore-milliseconds taken by all map tasks=2954
Total vcore-milliseconds taken by all reduce tasks=16204
Total megabyte-milliseconds taken by all map tasks=4159232
Total megabyte-milliseconds taken by all reduce tasks=45630464
Map-Reduce Framework
Map input records=9
Map output records=9
Map output bytes=4421
Map output materialized bytes=1121
Input split bytes=170
Combine input records=0
Combine output records=0
Reduce input groups=9
Reduce shuffle bytes=1121
Reduce input records=9
Reduce output records=0
Spilled Records=18
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=625
CPU time spent (ms)=19230
Physical memory (bytes) snapshot=1539936256
Virtual memory (bytes) snapshot=16474624000
Total committed heap usage (bytes)=1281359872
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=3407
File Output Format Counters
Bytes Written=0
2019-06-05 10:06:20,750 INFO com.amazon.elasticmapreduce.s3distcp.S3DistCp (main): Try to recursively delete hdfs:/tmp/a1ae3c19-b66f-4eac-9f0a-c2cd80144d47/tempspace
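To make sure I'm not missing any other output for this step, I also list everything EMR wrote for it in the log bucket (step ID taken from the controller log above; this assumes the s3n://logs/ bucket from the config is reachable as s3://logs/ via the CLI):

aws s3 ls s3://logs/ --recursive | grep s-3OM7XL0P3NKQ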
Also, as a digression: would it be possible to sink the records from the raw Kinesis stream into the collector-logs S3 bucket and run the EMR ETL directly on them? That way I could remove the Stream Enrich part of my pipeline.