Shredding & loading enriched events in near-real-time

Hi all,

After some research on this forum and on GitHub I know there are plans to create a loader able to load stream-enriched events into Redshift, but currently I would like to achieve much the same thing with the Plain of Jars release (R89).

This is our current pipeline (we don't currently use the output of Stream Enrich):

This is what I would like to do:

Basically, since we are enriching in near real time, I would just like to shred and load the stream-enriched data into Redshift.

I managed to do it by using the GZIP sink on the Stream Enrich good stream; nevertheless it required some tweaks, because by default EmrEtlRunner expects enriched event files to follow a pattern containing part-, which is not the case if you use one of the Kinesis S3 sinks.
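For illustration, one possible workaround for the naming mismatch (not necessarily the exact tweak you'd end up with) is a rename pass over the sink output so that the file names contain part-; a rough sketch with made-up bucket names:

```
# Hypothetical example: prefix the Kinesis S3 sink output keys with "part-"
# so EmrEtlRunner's file-pattern check is satisfied. Buckets/prefixes are made up.
for key in $(aws s3 ls s3://my-bucket/stream-enriched/ | awk '{print $4}'); do
  aws s3 mv "s3://my-bucket/stream-enriched/${key}" \
            "s3://my-bucket/stream-enriched-staged/part-${key}"
done
```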

Is there any plan to support this flow, or will we see a stream shredder which could then have its own sink to load the data into Redshift afterwards?

Thanks in advance!


Hey @AcidFlow - that’s great that you got this working. We have managed to set up something similar internally, but we used Dataflow Runner rather than EmrEtlRunner, which sidestepped the EmrEtlRunner incompatibilities you found.

Other changes we made:

  1. Reading/writing direct to/from S3 (i.e. skipping HDFS and S3DistCp)
  2. Running the job on a persistent cluster (saving on the cluster bootstrap times)
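To give a feel for the persistent-cluster flow, Dataflow Runner splits the cluster lifecycle from playbook runs; a rough sketch below (cluster.json and playbook.json stand in for your own configs, and flag names should be double-checked against your Dataflow Runner version):

```
# Bring the cluster up once (prints a jobflow id such as j-XXXXXXXXXXXXX)
./dataflow-runner up --emr-config cluster.json

# Run the playbook against the persistent cluster as often as you like
./dataflow-runner run --emr-playbook playbook.json --emr-cluster j-XXXXXXXXXXXXX

# Tear it down when finished
./dataflow-runner down --emr-config cluster.json --emr-cluster j-XXXXXXXXXXXXX
```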

We’re continuing to tweak and improve this setup, with the goal of getting Redshift load frequency down to every 10 minutes or so. We will keep you posted!

Hi @alex !

Thanks for your answer and for pointing me to the dataflow-runner repository :slight_smile:
I didn't think of it and it fulfills my needs!

I found it invaluable to use Kinesis Firehose and Lambda to read data straight from Kinesis when creating the same type of real-time system.

I remember seeing a Snowplow Lambda event shredder somewhere but I can't find it now. We created our own in-house solution to forward events to third-party systems and to AWS Firehose, which puts things into S3 and then Redshift (but it can do Elasticsearch as well).

The only downside is that Firehose will only write to Redshift at a minimum of every 1 MB or 60 seconds (whichever comes first); however, this problem disappears with volume.
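For illustration, those buffering knobs live on the delivery stream's intermediate S3 destination; a hedged AWS CLI sketch (stream name and ids below are placeholders):

```
# Look up the current version id and destination id first
aws firehose describe-delivery-stream --delivery-stream-name enriched-to-redshift

# Then tighten buffering towards the service minimums (1 MB / 60 s)
aws firehose update-destination \
  --delivery-stream-name enriched-to-redshift \
  --current-delivery-stream-version-id <version-id-from-describe> \
  --destination-id <destination-id-from-describe> \
  --redshift-destination-update '{"S3Update": {"BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60}}}'
```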

I hope this helps
Bruce

Hi there,

I have also been trying to set up this kind of configuration (enriched stream to Redshift), but the StorageLoader is obviously not reading the files from the Kinesis S3 sink (on the enriched stream). What @alex suggests here is to use Dataflow Runner, but I am stuck on how to configure it. Could you give me some more clues on configuring this?

Thank you in advance!

If you want to shred stream-enriched events, first your Kinesis S3 sink would need to store the enriched events in GZIP format (from what I understood from the source code).

Then I think you can do the following:

  • Spawn a cluster with the same bootstrap actions as the normal EmrEtlRunner
  • EMR step 1: s3distcp the enriched events from their current location to a staging one (so you don’t process your events twice)
  • EMR step 2 (optional?): copy events from the staging location to HDFS using s3distcp
  • EMR step 3: run the shredding process on the events and write them either to HDFS or S3
  • EMR step 4 (if you write the shredded events to HDFS): s3distcp the events back to S3

Now you should be able to run the storage loader on the output of the shredding job :slight_smile:

I tried this without using Dataflow Runner, by creating an EMR cluster manually, and the shredded events looked okay, so then it’s just a matter of automating it :wink:
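For reference, a manual spin-up can look roughly like the sketch below (release label, instance sizes and the bootstrap script path are placeholders; reuse whatever your EmrEtlRunner config references). Once the cluster is up you can attach the s3distcp and shred steps with `aws emr add-steps`.

```
# Rough sketch only: create a cluster with Hadoop and your usual bootstrap script.
aws emr create-cluster \
  --name "stream-enriched-shredding" \
  --release-label emr-5.5.0 \
  --applications Name=Hadoop \
  --instance-type m3.xlarge \
  --instance-count 3 \
  --ec2-attributes KeyName=my-key-pair \
  --log-uri s3://my-working-bucket/emr-logs/ \
  --bootstrap-actions Path=s3://path-to-your-emr-etl-runner-bootstrap.sh \
  --use-default-roles \
  --no-auto-terminate
```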

Thanks a lot for your answer. I am quite new to EMR, so I am going to work through what you are saying. If you have tips, or pointers to helpful pieces of source code, please let me know!

Here’s the beginnings of a Dataflow Runner playbook that should help you guys:

{
  "schema": "iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
  "data": {
    "region": "eu-west-1",
    "credentials": {
      "accessKeyId": "...",
      "secretAccessKey": "..."
    },
    "steps": [
      {
        "type": "CUSTOM_JAR",
        "name": "S3DistCp Step: Enriched events -> staging S3",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar",
        "arguments": [
          "--src","s3n://path-to-kinesis-s3-sink-output/",
          "--dest","s3n://my-working-bucket/enriched/good/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/",
          "--s3Endpoint","s3-eu-west-1.amazonaws.com",
          "--deleteOnSuccess"
        ]
      },
      {
        "type": "CUSTOM_JAR",
        "name": "Hadoop Shred: shred enriched events for Redshift",
        "actionOnFailure": "CANCEL_AND_WAIT",
        "jar": "s3://snowplow-hosted-assets/3-enrich/scala-hadoop-shred/snowplow-hadoop-shred-0.11.0-rc2.jar",
        "arguments": [
          "com.snowplowanalytics.snowplow.enrich.hadoop.ShredJob",
          "--hdfs",
          "--iglu_config",
          "{{base64File "/snowplow-config/iglu_resolver.json"}}",
          "--duplicate_storage_config",
          "{{base64File "/snowplow-config/targets/duplicate_dynamodb.json"}}",
          "--input_folder","s3n://my-working-bucket/enriched/good/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/",
          "--output_folder","s3n://my-working-bucket/shredded/good/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/",
          "--bad_rows_folder","s3n://my-working-bucket/shredded/bad/run={{timeWithFormat .epochTime "2006-01-02-15-04-05"}}/"
        ]
      }
    ],
    "tags": [
    ]
  }
}

Invoked with:

```
--vars epochTime,`date +%s`
```
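(For completeness, assuming the transient-cluster workflow, the full invocation would look something like the following, with cluster.json defining the transient EMR cluster; treat the file names as placeholders.)

```
./dataflow-runner run-transient \
  --emr-config cluster.json \
  --emr-playbook playbook.json \
  --vars epochTime,`date +%s`
```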

Thanks a lot! I am going to experiment with this. Will come back with results or questions :wink:

My second step is failing at the moment and I can’t figure out why. It happens after about 20 minutes. This is what the error tells me:

Exception in thread "main" cascading.flow.FlowException: step failed: (2/7), with job id: job_1501XXXXXXXX, please see cluster logs for failure messages.

Finding the cluster logs with more information is kind of troublesome; suggestions on where to look / how to solve this are welcome!
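(For reference, EMR step and YARN container logs normally end up under the cluster's log URI in S3; a rough sketch with placeholder bucket and cluster id:)

```
# Step-level logs (controller, stderr, syslog) for the failing step
aws s3 ls --recursive s3://my-emr-logs/j-XXXXXXXXXXXXX/steps/

# The underlying Cascading/Hadoop failure is usually in the YARN container logs
aws s3 ls --recursive s3://my-emr-logs/j-XXXXXXXXXXXXX/containers/ | grep -E 'stderr|syslog'
```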

Thanks in advance!

Hi @alex,

Your Dataflow Runner example made me think about how EmrEtlRunner, and now Dataflow Runner, relate to workflow/orchestration systems like Luigi or Airflow.

We have been orchestrating our Snowplow pipelines by invoking EmrEtlRunner for each atomic step from Airflow, achieved by --skipping every other step. This gives us visibility into each step in the pipeline and also allows us to just use aws s3 mv and aws s3 rm --recursive where Sluice fails us.
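For illustration, each such Airflow task boils down to an invocation along these lines (the exact set of skippable step names varies by release, so treat the --skip lists below as assumptions):

```
# Run only the staging step
./snowplow-emr-etl-runner --config config.yml --resolver resolver.json \
  --skip emr,elasticsearch,archive_raw

# Run only the EMR (enrich + shred) job
./snowplow-emr-etl-runner --config config.yml --resolver resolver.json \
  --skip staging,elasticsearch,archive_raw
```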

Dataflow Runner steps look an awful lot like the Airflow tasks in our Snowplow DAGs. Airflow also has rudimentary EMR support to create ephemeral clusters and add steps to them.

Is it fair to say that Dataflow Runner is a lightweight orchestrator (“runner”) for (not just) Snowplow pipelines? Does Dataflow Runner try to solve anything else that I might’ve missed?

Thanks!
Gabor

Hi @rgabo - it sounds like we are thinking about all this in the same way.

Note that Sluice is no more - it was removed in Snowplow R91.

Yes, that’s correct. Although we still use EmrEtlRunner internally for all core Snowplow pipelines, we use Dataflow Runner for our R&D and non-standard/non-Snowplow jobs on EMR.

Dataflow Runner is built around the Unix philosophy - all it does is run jobflows, currently on EMR only. You can schedule it any way you like. And it’s fully declarative - it’s just Avro, so you can generate, lint or visualise a dataflow however you like. (We are also planning a native integration between Factotum and Dataflow Runner in the future, so you get that “view-through” that you described between Airflow and EmrEtlRunner.)

Note that a future release of EmrEtlRunner will generate Dataflow Runner playbooks, and a later release will then remove the EMR orchestration functionality from EmrEtlRunner altogether.

Hi @alex - we do think alike, that’s why we are happy, long-time Snowplow users :slight_smile:

Sluice is out, but our production pipeline is R87. We are putting together a new pipeline that has both streaming and batch components: Scala Stream Collector and Stream Enrich, with the S3 sink for the enriched events and the Elasticsearch sink for real-time feedback on events and bad rows. We would still like to shred and load enriched events into Redshift for data modeling and analysis, and we will most likely limit the data in Elasticsearch to a couple of weeks to keep storage requirements low.

Working out exactly how to orchestrate the batch steps of such a pipeline is why I am looking at Dataflow Runner and the new components. I really like that Sluice is no more and that both the staging and loading steps were moved to EMR.

I would imagine that it makes sense to stick with EmrEtlRunner for the time being, as it already delivers benefits such as staging logs and loading into Redshift from EMR. Dataflow Runner will just become part of that story, correct?

EmrEtlRunner doesn’t quite work with the enriched events from the S3 sink - so I’d probably use Dataflow Runner in your case.

We will start working on RT loading of Redshift from Kinesis relatively soon, which should simplify things further.
