SendGrid + Snowplow + AWS S3 & Redshift

@anton Thank you so much!

Hi @anton,

I have a new question. Stream Enrich has successfully processed data into my enriched stream. Now I think I need to set up the S3 Loader to get the data into an S3 bucket. I have created a bucket for that and put the values into the configuration file:

# Default configuration for s3-loader

# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"

# Sink is used for sending events for which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"

# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = "xxx"
  secretKey = "xxx/"
}

# Config for NSQ
# nsq {
  # Channel name for NSQ source
  # If more than one application is reading from the same NSQ topic at the same time,
  # each of them must have a unique channel name to get all the data from that topic
  # channelName = "{{nsqSourceChannelName}}"

  # Host name for NSQ tools
  # host = "{{nsqHost}}"

  # HTTP port for nsqd
  # port = {{nsqdPort}}

  # HTTP port for nsqlookupd
  # lookupPort = {{nsqlookupdPort}}
# }

kinesis {
  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = TRIM_HORIZON

  # Needs to be specified when initialPosition is "AT_TIMESTAMP".
  # The timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: the time needs to be specified in UTC.
  # initialTimestamp = "{{timestamp}}"

  # Maximum number of records to read per GetRecords call
  maxRecords = 1000

  region = "us-east-1"

  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = "s3loader-test"
}

streams {
  # Input stream name
  inStreamName = "Stream-Enriched-Good"

  # Stream for events for which the storage process fails
  outStreamName = "S3-Process-Fail"

  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = 1048576
    # Not supported by NSQ; will be ignored
    recordLimit = 100
    timeLimit = 60000
    # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = "us-east-1"
  bucket = "enrich-s3-loader"

  # Format is one of lzo or gzip
  # Note that you can use gzip only for the enriched data stream.
  format = "gzip"

  # Maximum timeout that the application is allowed to fail for
  maxTimeout = 120000
}

# Optional section for tracking endpoints
# monitoring {
#  snowplow{
#    collectorUri = "{{collectorUri}}"
#    collectorPort = 80
#    appId = "{{appName}}"
#    method = "{{method}}"
#  }
# }
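The buffer comment in the config above describes three flush triggers. A minimal Python sketch of that rule, assuming the limits are compared with a strict "exceeds" (>) as the comment says and are checked each time a record is added. The class and method names are hypothetical; this is not the S3 Loader's actual implementation:

```python
import time
from typing import Callable, List, Optional


class RecordBuffer:
    """Hypothetical buffer mirroring the byteLimit / recordLimit / timeLimit rule."""

    def __init__(self, byte_limit: int, record_limit: int, time_limit_ms: int,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.byte_limit = byte_limit
        self.record_limit = record_limit
        self.time_limit_ms = time_limit_ms
        self.clock = clock  # injectable so the timing can be tested
        self.records: List[bytes] = []
        self.size = 0
        self.last_flush = clock()

    def should_flush(self) -> bool:
        elapsed_ms = (self.clock() - self.last_flush) * 1000
        return (self.size > self.byte_limit
                or len(self.records) > self.record_limit
                or elapsed_ms > self.time_limit_ms)

    def add(self, record: bytes) -> Optional[List[bytes]]:
        """Store a record; return the emptied batch if any limit was exceeded."""
        self.records.append(record)
        self.size += len(record)
        if self.should_flush():
            return self.flush()
        return None

    def flush(self) -> List[bytes]:
        # In the real loader this batch would be written to S3.
        batch, self.records, self.size = self.records, [], 0
        self.last_flush = self.clock()
        return batch
```

In this sketch the time trigger is only evaluated when a record arrives; a real loader would presumably also check it on a timer so that a quiet stream still gets flushed.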

I double-checked all the values, and the same access keys can connect to the streams and create tables in DynamoDB.

When I run it, I get this error:

configuration error: ConfigReaderFailures(KeyNotFound(nsq,Some(ConfigValueLocation(file:/home/ec2-user/loader2.conf,1)),Set()),List())

Do you have any advice?

Thank you!

Hi @anton,

Never mind, I figured it out: the loader needs the nsq section to be filled in with values, even though the source is Kinesis.
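The KeyNotFound(nsq, ...) error above is consistent with the nsq block being commented out in the pasted config. A rough Python sketch for spotting missing top-level sections before launching the loader. The REQUIRED set is an assumption based on this sample config and on the error, not an official list, and the scanner ignores HOCON features such as quoted keys, dotted paths, '#' inside strings and includes:

```python
import re
from typing import List, Set

# Assumed set of top-level sections, taken from the sample config in this thread.
REQUIRED = {"source", "sink", "aws", "nsq", "kinesis", "streams", "s3"}


def top_level_keys(hocon_text: str) -> Set[str]:
    """Return keys defined at brace depth 0, ignoring '#' comments (rough scan)."""
    keys: Set[str] = set()
    depth = 0
    for raw_line in hocon_text.splitlines():
        line = raw_line.split("#", 1)[0].strip()  # drop comments
        if depth == 0:
            match = re.match(r"([A-Za-z_][\w-]*)\s*[={:]", line)
            if match:
                keys.add(match.group(1))
        depth += line.count("{") - line.count("}")
    return keys


def missing_sections(hocon_text: str) -> List[str]:
    """List required sections that are absent (e.g. commented out)."""
    return sorted(REQUIRED - top_level_keys(hocon_text))
```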

@anton
Hi, I have some questions about EmrEtlRunner configuration.

I am using Stream Enrich mode and am now editing the configuration file. For the S3 buckets, I created a couple for enriched and shredded data. I just need to provide something like s3://emr-etl-enrich/good, right? I created a bucket named “emr-etl-enrich” and a folder “good”; I don't need to provide any other link/URL for the bucket, right?

My other question is about the EC2 key and subnet_id. There is a link in the instructions, but I believe it has been redirected: the page it leads to has no useful information. So what do I need to do for this step? Create a key pair and put its name in the config? And what about the subnet ID?

Much appreciated all your help!

Yes, that’s right. I’m not even sure what other options we could have.
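In S3, a "folder" is just a key prefix, so a value like s3://emr-etl-enrich/good carries only a bucket name and a prefix, with no endpoint URL. A small Python sketch (the helper name is hypothetical) showing how such a value splits:

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/prefix' into (bucket, prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an s3:// URI: {uri!r}")
    # netloc is the bucket; the path (minus the leading '/') is the key prefix
    return parsed.netloc, parsed.path.lstrip("/")
```

For example, split_s3_uri("s3://emr-etl-enrich/good") gives ("emr-etl-enrich", "good").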

Yup, this is just an EC2 SSH key. You need to create one via the EC2 console and specify its name. With this key you’ll be able to log in to the EMR master node (but I’m 99.9% sure you won’t ever need to do this). You can get a list of existing SSH keys (or create a new one) in AWS Console → EC2 → Key pairs.

subnet_id

Same here: it can be any EC2 subnet in your account. You can get a list of existing subnets (or create a new one) in AWS Console → VPC → Subnets. IIRC pretty much any default configuration would work.
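Both values can also be looked up programmatically. A minimal sketch, assuming boto3 and credentials that are allowed to call the EC2 DescribeKeyPairs and DescribeSubnets APIs. The helper names are hypothetical, and the client is passed in so the functions can be tried without AWS:

```python
from typing import Any, List


def key_pair_names(ec2_client: Any) -> List[str]:
    """Names of existing EC2 key pairs (candidates for the EC2 key setting)."""
    response = ec2_client.describe_key_pairs()
    return [kp["KeyName"] for kp in response.get("KeyPairs", [])]


def subnet_ids(ec2_client: Any) -> List[str]:
    """IDs of existing subnets (candidates for subnet_id); reads one response only."""
    response = ec2_client.describe_subnets()
    return [subnet["SubnetId"] for subnet in response.get("Subnets", [])]
```

For example, with ec2 = boto3.client("ec2", region_name="us-east-1"), key_pair_names(ec2) lists the key names and subnet_ids(ec2) the subnet IDs. This sketch does not follow pagination.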

UPD: let’s use a single thread, either this one or “RDB Loader, Storage Loader, EmrEtlRunner” (post #2 by ihor). Otherwise it’s hard to track what’s going on.

Much appreciated!
Thank you!

Hello @anton,

After creating all of these and running the runner with the config file and resolver JSON, the process completed (it reported success). I did not use redshift.json, because I don't understand its real role and it’s optional.

Here is my problem: the S3 shredded bucket gets new files every time I run the runner, but these files are 0 B and contain nothing. Naturally, no data is being loaded into Redshift. Do I need further configuration to fix this?

Thanks!

@anton More information about this: after the S3 Loader runs, files with data (around 500 B to 1 KB) get into the S3 bucket. But after the runner runs, everything is 0 B.

Hi @AllenWeieiei,

You said: “files are getting into the S3 bucket with data (around 500b to 1kb)”.

Do you mean the enriched bucket, i.e. the one you specified as aws.s3.buckets.enriched.stream in config.yml? If there’s data in the enriched bucket, then the Shredder must produce something. Maybe it just sends the data into the aws.s3.buckets.shredded.bad bucket. Can you check?

Can you also provide us with the output of EmrEtlRunner? It should show which steps were run.

Regarding redshift.json being optional: it is optional only if you don’t need Redshift. It is the configuration file without which it is not possible to establish a connection to the Redshift cluster. Also, RDB Loader won’t be launched if you don’t have a redshift.json config file. So if you need your data in Redshift, it's pretty much mandatory.

@anton, thanks!

When I said “files are getting into the S3 bucket with data (around 500b to 1kb)”, I meant that after the S3 Loader runs, there are some files in the loader's S3 bucket.

I took some screenshots of the shredded and enriched folders. I do have some files in the shredded bad folder, but all of them are 0 B as well.

So I have data in the Kinesis stream (from the collector), the enriched stream (from Stream Enrich) and the S3 bucket (from the S3 Loader), but I don’t have data in the emr-etl-enrich or emr-etl-shredded buckets (from EmrEtlRunner). By “no data” I mean that there are files there, but they are all 0 B.

How do I generate the output of EmrEtlRunner? I want to post it here.

Thank you!


EmrEtlRunner generates output when you launch it in a terminal.
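When the output is long, it can help to save a copy while still watching it live. A minimal Python sketch (run_and_log and the log file name are hypothetical) that streams a command's merged stdout/stderr to the terminal and to a file:

```python
import subprocess
from typing import List


def run_and_log(cmd: List[str], log_path: str) -> int:
    """Run cmd, echo its combined stdout/stderr line by line, save a copy to log_path.

    Returns the command's exit code.
    """
    with open(log_path, "w") as log, subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    ) as proc:
        for line in proc.stdout:  # a pipe, because stdout=PIPE
            print(line, end="")
            log.write(line)
    # Leaving the Popen context waits for the process, so returncode is set here.
    return proc.returncode
```

For example: run_and_log(["./snowplow-emr-etl-runner", "run", "-c", "eer5-1.conf", "-r", "resolver1.json"], "emr-etl-runner.log"), then attach the log file to the post.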

It seems that emr-etl-shred/good is your aws.s3.buckets.shredded.good bucket. What about aws.s3.buckets.shredded.archive? I bet all the data is there. So EmrEtlRunner took the data, shredded it and moved it into the archive. It should also have loaded it into Redshift, but didn’t, because you don’t have a redshift.json config. Add a Redshift config and it will start RDB Loader.
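To confirm where the non-empty files ended up (for example under the shredded archive location), one can list the objects and keep only those larger than 0 B. A minimal sketch, assuming pages shaped like boto3's list_objects_v2 responses; the helper is hypothetical and takes the pages as input so it can be tried offline:

```python
from typing import Any, Dict, Iterable, List, Tuple


def non_empty_objects(pages: Iterable[Dict[str, Any]]) -> List[Tuple[str, int]]:
    """Collect (key, size) for objects larger than 0 bytes from list_objects_v2 pages."""
    found: List[Tuple[str, int]] = []
    for page in pages:
        # Pages with no matching objects have no "Contents" key
        for obj in page.get("Contents", []):
            if obj["Size"] > 0:
                found.append((obj["Key"], obj["Size"]))
    return found
```

For example: non_empty_objects(boto3.client("s3").get_paginator("list_objects_v2").paginate(Bucket="<your-bucket>", Prefix="<archive-prefix>/")), with the bucket and prefix taken from aws.s3.buckets.shredded.archive.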

Yep, the files in the archive folder are not empty.

A question about the configuration file:

  1. I got a warning like this:
    I, [2019-10-23T13:35:24.475406 #20287] INFO -- : Sending GET request to http://sgwebhook.moneymappress.com:443/i
    W, [2019-10-23T13:36:24.483339 #20287] WARN -- : Failed to open TCP connection to sgwebhook.moneymappress.com:443 (execution expired) (Net::OpenTimeout)
    I'm pretty sure I am using the URL that my collector is using right now, and the port is the number in my collector configuration file. Why does it fail? Even though it fails, I still get files in the shredded S3 bucket.

  2. For redshift.json, the value of schema would be “atomic.events”; is this correct? I created this table manually earlier, with schema atomic and table name events. Also, there are 11 types of data (subscribe, click, and so on); can we load the data into the associated tables?

Thank you!

@AllenWeieiei,

Indeed, when I try that (collector?) endpoint, it does not respond: sgwebhook.moneymappress.com doesn’t appear to be operational.
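Net::OpenTimeout means the TCP connection itself could not be opened within the timeout, before any HTTP request was sent, so the issue is reachability of the host and port rather than the /i path. A minimal Python sketch (the helper name is hypothetical) to run the same kind of check from the machine where EmrEtlRunner runs:

```python
import socket


def can_open_tcp(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers timeouts, refused connections and DNS failures
        return False
```

For example, can_open_tcp("sgwebhook.moneymappress.com", 443) returning False would match the timeout in the log above.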

The target configuration file requires the schema name, so it should be just “atomic”. You need to create all the tables for the corresponding events you track. Their DDL scripts are in Iglu Central.
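In Snowplow's self-describing target configs, the top-level "schema" key holds the Iglu URI of the config itself, while the Redshift schema goes under "data", which makes the two easy to mix up. A minimal Python sketch, assuming that self-describing layout (all other redshift_config fields omitted; the helper names are hypothetical), that checks the Redshift schema is a bare name such as "atomic" rather than a table path:

```python
import json
from typing import Any, Dict


def redshift_schema_name(target: Dict[str, Any]) -> str:
    """Return data.schema from a target config, rejecting 'schema.table' values."""
    # target["schema"] is the Iglu URI of the config; the Redshift schema is under data
    name = target["data"]["schema"]
    if "." in name:
        raise ValueError(
            f"data.schema should be a schema such as 'atomic', not a table path like {name!r}"
        )
    return name


def load_target(path: str) -> Dict[str, Any]:
    """Read a target JSON file such as redshift.json."""
    with open(path) as f:
        return json.load(f)
```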


Ohhh, thanks!

Yeah, I have created all the tables using these DDLs. So will providing redshift.json make EmrEtlRunner load data into these tables, or just into atomic.events?

Into all tables that have corresponding folders in the shredded bucket.
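The mapping from shredded folders to tables can be sketched as below, assuming the shredded output is laid out as vendor=.../name=.../format=.../version=MODEL-REVISION-ADDITION and that table names follow the vendor_name_model convention of the Iglu Central DDLs (for example com_snowplowanalytics_snowplow_link_click_1). The helper name and the example paths are hypothetical:

```python
import re


def table_for_shredded_path(path: str) -> str:
    """Map '.../vendor=V/name=N/format=F/version=M-R-A/...' to a table name (assumed convention)."""
    # Collect key=value path segments (run=..., vendor=..., name=..., etc.)
    parts = dict(seg.split("=", 1) for seg in path.strip("/").split("/") if "=" in seg)
    # Dots in the vendor become underscores: com.sendgrid -> com_sendgrid
    vendor = parts["vendor"].replace(".", "_")
    # camelCase event names become snake_case, e.g. linkClick -> link_click
    name = re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", parts["name"]).lower()
    # Only the MODEL part of MODEL-REVISION-ADDITION goes into the table name
    model = parts["version"].split("-", 1)[0]
    return f"{vendor}_{name}_{model}".lower()
```

For example, a hypothetical folder vendor=com.sendgrid/name=click/format=jsonschema/version=1-0-0 would map to com_sendgrid_click_1 in the atomic schema.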

ok!

For the target file, redshift.json: I have it there, as you can see in the screenshot. Then I use the following command to run the runner:
./snowplow-emr-etl-runner run -c eer5-1.conf -r resolver1.json -t redshift3.json

Without the Redshift part, it runs fine. But with it, I get the error shown in the screenshot. Do I need to do something different from what I do for the configuration file and the resolver JSON file?

Thanks!

@AllenWeieiei, -t should be pointing to the directory where your Redshift configuration file is.

Yeah, that’s what I am doing. redshift3.json is my Redshift configuration file (as I understand it, it’s similar to the resolver JSON file, am I right?). You can see it’s there, redshift3.json. resolver1.json is there too, and -r finds it correctly.

@ihor Or do I have to set up something different for the Redshift JSON?

Thanks!

@AllenWeieiei, as per the EmrEtlRunner usage wiki, -t points to a directory containing the target configurations, while -r is a single resolver file.
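A minimal Python sketch of that layout, assuming a hypothetical targets directory that holds only target configs (the resolver stays outside it and is passed with -r); the helper names are hypothetical:

```python
import shutil
from pathlib import Path
from typing import List


def prepare_targets_dir(target_file: str, targets_dir: str) -> Path:
    """Copy a target config (e.g. redshift3.json) into a directory of its own."""
    directory = Path(targets_dir)
    directory.mkdir(parents=True, exist_ok=True)
    shutil.copy2(target_file, directory / Path(target_file).name)
    return directory


def runner_args(config: str, resolver: str, targets_dir: Path) -> List[str]:
    """Build the EmrEtlRunner command line: -t takes the directory, not the file."""
    return ["./snowplow-emr-etl-runner", "run",
            "-c", config, "-r", resolver, "-t", str(targets_dir)]
```

With a directory named targets, the resulting command corresponds to ./snowplow-emr-etl-runner run -c eer5-1.conf -r resolver1.json -t targets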