SendGrid + Snowplow + AWS S3 & Redshift

Hi Team,

Thanks for taking the time to read this; any help would be much appreciated!

I have set up the Scala Stream Collector and linked it to SendGrid and a Kinesis stream, and I can now get raw SendGrid event data into S3. What I want to do next is load it into Redshift. I have already created 11 tables using the DDLs that Snowplow provides.

The problem is that the raw (Thrift) data files contain all 11 event types in a single file. How can I load them into the 11 different tables? Do I need to split the data into 11 pieces and run a COPY command for each?

Thank you very much!

Hi @AllenWeieiei,

The next component you need is Stream Enrich, which validates the raw data and transforms it into a canonical TSV format. That is still not the end if you want to load data into Redshift, though: you'll also need RDB Loader and, optionally, EmrEtlRunner to orchestrate it.
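
For reference, launching Stream Enrich from the jar looks roughly like this (a sketch, not the exact invocation for your release; the jar version and the enrich.conf / iglu_resolver.json file names here are placeholders):

java -jar snowplow-stream-enrich-kinesis-<version>.jar \
  --config enrich.conf \
  --resolver file:iglu_resolver.json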

What you want to build is the so-called Lambda Architecture; you can read more about it in its dedicated article:

@anton

Thanks, will try!

Hi, @anton

I installed the jar and put the conf file and resolver JSON there. When I run it, I get this error:

[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization attempt 1

[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initializing LeaseCoordinator

[main] ERROR com.amazonaws.services.kinesis.leases.impl.LeaseManager - Failed to get table status for kinesis

Could you please help? Thanks!

@AllenWeieiei, I suppose this has something to do with DynamoDB. Stream Enrich uses the KCL (Kinesis Client Library) under the hood, and the KCL manages the state of its stream processors through DynamoDB. When you first launch Stream Enrich, it tries to create a DynamoDB table named after your application (the enrich.appName config option). Make sure your instance has permissions to create and access this table, and that the table does not already exist.
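
If it does turn out to be a permissions problem, the KCL needs roughly the following DynamoDB actions on its lease table (a sketch of an IAM policy statement; the region, account id and table name are placeholders, and the table name must match your enrich.appName):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KclLeaseTable",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:UpdateItem",
        "dynamodb:DeleteItem",
        "dynamodb:Scan"
      ],
      "Resource": "arn:aws:dynamodb:us-east-1:<account-id>:table/<enrich.appName>"
    }
  ]
}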

@anton Thank you so much!

Hi @anton,

I have a new question. Stream Enrich has successfully processed data into my enriched stream. Now I think I need to set up the S3 Loader to get that data into an S3 bucket. I have created a bucket for it and filled in the configuration file:

# Default configuration for s3-loader

# Sources currently supported are:
# 'kinesis' for reading records from a Kinesis stream
# 'nsq' for reading records from a NSQ topic
source = "kinesis"

# Sink is used for sending events which processing failed.
# Sinks currently supported are:
# 'kinesis' for writing records to a Kinesis stream
# 'nsq' for writing records to a NSQ topic
sink = "kinesis"

# The following are used to authenticate for the Amazon Kinesis sink.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
  accessKey = "xxx"
  secretKey = "xxx/"
}

# Config for NSQ
# nsq {
  # Channel name for NSQ source
  # If more than one application reading from the same NSQ topic at the same time,
  # all of them must have unique channel name for getting all the data from the same topic
  # channelName = "{{nsqSourceChannelName}}"

  # Host name for NSQ tools
  # host = "{{nsqHost}}"

  # HTTP port for nsqd
  # port = {{nsqdPort}}

  # HTTP port for nsqlookupd
  # lookupPort = {{nsqlookupdPort}}
# }

kinesis {
  # LATEST: most recent data.
  # TRIM_HORIZON: oldest available data.
  # "AT_TIMESTAMP": Start from the record at or after the specified timestamp
  # Note: This only affects the first run of this application on a stream.
  initialPosition = TRIM_HORIZON

  # Need to be specified when initialPosition is "AT_TIMESTAMP".
  # Timestamp format need to be in "yyyy-MM-ddTHH:mm:ssZ".
  # Ex: "2017-05-17T10:00:00Z"
  # Note: Time need to specified in UTC.
  # initialTimestamp = "{{timestamp}}"

  # Maximum number of records to read per GetRecords call
  maxRecords = 1000

  region = "us-east-1"

  # "appName" is used for a DynamoDB table to maintain stream state.
  appName = "s3loader-test"
}

streams {
  # Input stream name
  inStreamName = "Stream-Enriched-Good"

  # Stream for events for which the storage process fails
  outStreamName = "S3-Process-Fail"

  # Events are accumulated in a buffer before being sent to S3.
  # The buffer is emptied whenever:
  # - the combined size of the stored records exceeds byteLimit or
  # - the number of stored records exceeds recordLimit or
  # - the time in milliseconds since it was last emptied exceeds timeLimit
  buffer {
    byteLimit = 1048576 # Not supported by NSQ; will be ignored
    recordLimit = 100
    timeLimit = 60000 # Not supported by NSQ; will be ignored
  }
}

s3 {
  region = "us-east-1"
  bucket = "enrich-s3-loader"

  # Format is one of lzo or gzip
  # Note, that you can use gzip only for enriched data stream.
  format = "gzip"

  # Maximum Timeout that the application is allowed to fail for
  maxTimeout = 120000
}

# Optional section for tracking endpoints
# monitoring {
#  snowplow{
#    collectorUri = "{{collectorUri}}"
#    collectorPort = 80
#    appId = "{{appName}}"
#    method = "{{method}}"
#  }
# }

I double-checked all the values, and the same access keys are able to connect to the streams and create tables in DynamoDB.

When I run it, I get this error:

configuration error: ConfigReaderFailures(KeyNotFound(nsq,Some(ConfigValueLocation(file:/home/ec2-user/loader2.conf,1)),Set()),List())

Do you have any advice?

Thank you!

Hi @anton,

Never mind, I figured it out: the nsq section still needs configuration values even when both source and sink are Kinesis.
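
For anyone hitting the same KeyNotFound(nsq, ...) error: the loader's config parser apparently expects the nsq block to be present even when both source and sink are "kinesis". Filling it in with placeholder values along these lines (a sketch) resolved it for me:

nsq {
  # Dummy values; not actually used when source and sink are "kinesis"
  channelName = "dummy"
  host = "127.0.0.1"
  # HTTP port for nsqd
  port = 4151
  # HTTP port for nsqlookupd
  lookupPort = 4161
}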

@anton
Hi, I have some questions about the EmrEtlRunner configuration.

I am using Stream Enrich mode and editing the configuration file now. For the S3 buckets, I created a couple for enriched and shredded data. I just need to provide something like s3://emr-etl-enrich/good, right? I created a bucket named “emr-etl-enrich” and a folder “good”; there is no other link/URL for the bucket that I need to provide, right?

The other question is about the EC2 key and subnet_id. There is a link in the instructions, but I believe it has been redirected; when I go to that page there is no useful information. So what do I need to do for this step? Create a key pair and configure its name? And what about the subnet ID?

All your help is much appreciated!

Yes, that’s right. I’m not even sure what other options we could have.

Yup, this is just an EC2 SSH key. You need to create one via the EC2 console and specify its name. Using this key you'll be able to log in to the EMR master node (though I'm 99.9% sure you won't ever need to do this). You can get the list of existing SSH keys (or create a new one) in AWS Console → EC2 → Key pairs.

subnet_id

Same here: it can be any EC2 subnet in your account. You can get the list of existing subnets (or create a new one) in AWS Console → VPC → Subnets. IIRC pretty much any default configuration will work.
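
To make that concrete, the relevant parts of config.yml would look roughly like this (a partial sketch with placeholder bucket names, key name and subnet id; check the sample config shipped with your EmrEtlRunner release for the full set of keys):

aws:
  s3:
    region: us-east-1
    buckets:
      enriched:
        stream: s3://enrich-s3-loader          # output bucket of the S3 Loader
        good: s3://emr-etl-enrich/good
        bad: s3://emr-etl-enrich/bad
        archive: s3://emr-etl-enrich/archive
      shredded:
        good: s3://emr-etl-shred/good
        bad: s3://emr-etl-shred/bad
        archive: s3://emr-etl-shred/archive
  emr:
    region: us-east-1
    ec2_key_name: my-emr-key                   # name of the EC2 key pair
    ec2_subnet_id: subnet-0123456789abcdef0    # any subnet in your VPC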

UPD: let's use a single thread, either this one or RDB Loader, Storage Loader, EmrEtlRunner - #2 by ihor. Otherwise it's hard to track what's going on.

Much appreciated!
Thank you!

Hello @anton,

After creating all of these and running the runner with the config file and resolver JSON, the process completed (it shows “completed successfully”). I did not use redshift.json, because I don't understand what its real role is and it's optional.

Here is my problem: the S3 shredded bucket gets files every time I run the runner, but these files are 0 B and contain nothing. Consequently, no data is being loaded into Redshift. Do I need to do further configuration to fix this issue?

Thanks!

@anton More information about this: after the S3 Loader runs, files with data (around 500 B to 1 KB) land in its S3 bucket. But after the runner runs, everything is 0 B.

Hi @AllenWeieiei,

You said:

Do you mean the enriched bucket, the one you specified as aws.s3.buckets.enriched.stream in config.yml? If there's data in the enriched bucket, then the Shredder must produce something. Maybe it just sends data into the aws.s3.buckets.shredded.bad bucket. Can you check that?

Can you also provide us with the output of EmrEtlRunner? It should show which steps were run.
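
A quick way to check is the AWS CLI, with your own shredded bad bucket path substituted:

aws s3 ls s3://your-shredded-bad-bucket/ --recursive --human-readable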

It is optional only if you don't need Redshift. It is the configuration file without which it is not possible to establish a connection to the Redshift cluster, and RDB Loader won't even be launched if you don't have a redshift.json config file. So if you need your data in Redshift, it's pretty much mandatory.

@anton. Thanks!

When I said “files are getting into the S3 bucket with data (around 500 B to 1 KB)”, I meant that after the S3 Loader runs, there are some files in the loader's S3 bucket.

I took some screenshots of the shredded and enriched folders. I do have some files in the shredded bad folder, but all of them are 0 B as well.

So I have data in the Kinesis stream (from the collector), the enriched stream (from enrich), and the S3 bucket (from the S3 Loader), but I don't have data in the emr-etl-enrich or emr-etl-shredded buckets (from EmrEtlRunner). By “don't have data” I mean there are files there, but they are all 0 B.

How do I generate the output of EmrEtlRunner? I would like to provide it here.

Thank you!

How do I generate the output of EmrEtlRunner? I would like to provide it here.

EmrEtlRunner generates output when you launch it in a terminal.
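
If you want to capture it to share, something like this works (a sketch; the executable name and flags can differ between releases, and targets/ is only needed once you have a redshift.json to put in it):

# Run the pipeline and keep a copy of everything it prints
./snowplow-emr-etl-runner run \
  -c config.yml \
  -r iglu_resolver.json \
  -t targets/ \
  2>&1 | tee emr-etl-runner.log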

It seems that emr-etl-shred/good is your aws.s3.buckets.shredded.good bucket. What about aws.s3.buckets.shredded.archive? I bet all the data is there. So EmrEtlRunner took the data, shredded it and moved it into the archive. It should also have loaded it into Redshift, but it didn't because you don't have the redshift.json config. Add a Redshift config and it will start the RDB Loader.

Yep, the files in the archive folder are not empty.

Two questions about the configuration file:

  1. I got a warning like this:
     I, [2019-10-23T13:35:24.475406 #20287]  INFO -- : Sending GET request to http://sgwebhook.moneymappress.com:443/i
     W, [2019-10-23T13:36:24.483339 #20287]  WARN -- : Failed to open TCP connection to sgwebhook.moneymappress.com:443 (execution expired) (Net::OpenTimeout)
     I'm pretty sure I am using the URL my collector is serving right now, and the port is the number from my collector configuration file. Why does it fail? Even though it fails, I still get files in the shredded S3 bucket.

  2. For redshift.json, should the value of schema be “atomic.events”? I created this table manually before, with schema atomic and table name events. Also, there are 11 types of data (subscribe, click, and so on); can the loader put each type into its associated table?

Thank you!

@AllenWeieiei,

Indeed, when I try that (collector?) endpoint it does not respond: sgwebhook.moneymappress.com doesn't appear to be operational.

The target configuration file expects just the schema name, so it should be just “atomic”. You need to create all the tables for the corresponding events you track; their DDL scripts are in Iglu Central.
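
A rough sketch of what redshift.json looks like, with placeholder values (the exact fields and schema version are defined by the redshift_config JSON Schema in Iglu Central, so check them against your release):

{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage/redshift_config/jsonschema/2-1-0",
  "data": {
    "name": "My Redshift target",
    "host": "my-cluster.xxxxxxxxxxxx.us-east-1.redshift.amazonaws.com",
    "database": "snowplow",
    "port": 5439,
    "sslMode": "DISABLE",
    "username": "storageloader",
    "password": "secret",
    "schema": "atomic",
    "maxError": 1,
    "compRows": 20000,
    "purpose": "ENRICHED_EVENTS"
  }
}

Drop the file into the targets directory you point EmrEtlRunner at, and RDB Loader will pick it up.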


Ohhh, thanks!

Yeah, I have created all the tables using these DDLs. So will providing redshift.json make EmrEtlRunner load data into all of these tables, or just into atomic.events?

All tables that have corresponding folders in the shredded bucket.
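
For example (an illustration only; the actual vendor/name/version values come from whatever folders your shredder produces), a run folder like

shredded/good/run=2019-10-23-13-35-24/
  atomic-events/part-...
  shredded-types/vendor=com.sendgrid/name=delivered/format=jsonschema/version=1-0-0/part-...
  shredded-types/vendor=com.sendgrid/name=open/format=jsonschema/version=1-0-0/part-...

would be loaded into atomic.events, atomic.com_sendgrid_delivered_1 and atomic.com_sendgrid_open_1 respectively.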