We are planning to extent our infrastructure to enable real-time analyses and real-time recommendations.
Is there any documentation how to accomplish this or best practice infrastructures/tools how to do this?
Using the Snowplow Analytics SDKs will help you parse the TSV event from the enriched pubsub stream in real time. Using Cloud Functions is a good starting point, or building your own consumers directly as a pubsub subscription. I’d also look at the new pubsub attribute filtering that you can now do with pubsub subscriptions, this has a potential to reduce the load as you might not need all your events going to your real time app.
Hi @PaulBoocock
thank you for pointing me to this tutorial!
How would we detach the real-time pipeline from the batch pipeline? I guess we will need to push the enriched event to two separate Pubsubs and continue from there?
You shouldn’t need to detach it if I’m understanding your use case. A typical set up would involve adding a new subscription to the enriched pubsub topic, and then consuming from there. Much in the same way the BigQuery loader currently consumes the stream (although this wouldn’t have to be a dataflow job, it could be a cloud function or something else entirely).
Sorry I got confused - it’s Friday afternoon here
Of course, your completely right. Having another subscription will work just fine.
Do you suggest to use Cloud Functions to process the events? Other tools that might work could be Cloud Run / Dataflow / Dataproc. Which tool would you recommend ?
After processing the event we plan to serve the results via Cloud Run / Firestore.
I think this depends a fair bit on what your use cases are - if it’s real time applications that you can do atomically, and don’t require any windowing then cloud functions is a good bet. If however you need windowing, or better parallelism then often Dataflow or Cloud Run is a better bet here.
Depending on what your domain is for recommendations, given that you are on GCP I’d be tempted to look at Recommendations AI. It’s heavily ecommerce / product focused and you’ll need some data to bootstrap with but you’ll be able to feed and serve recommendations in real time and tweak optimisation criteria.
Hi there! Jumping in because we recently added a Cloud Function that uses the Python SDK to parse the enriched events and thought my experience might be helpful!
We used Terraform (Infrastructure As Code) to specify our resource setup, and in the process learned that Cloud Functions that listen on PubSub topics create their own subscription object, so you don’t have to set that up manually. You just tell the Function which PubSub topic its messages will be coming from. This shouldn’t interfere with your existing Loader’s subscription. Also, each enriched event is sent as a separate message in the pubsub queue so your Function script can assume 1 message = 1 event payload and process accordingly.
I have a sample script which we wrote before we could confirm that was 1 to 1, so you’ll notice some batch processing tasks in there, but I hope you will find this helpful! We wanted to ingest our events into Snowflake rather than BigQuery and there’s no existing GCP Loader for that, so our Function just reads the PubSub message, uses the SDK “transform” function to shred it into JSON, then outputs it as a JSON file to a Cloud Storage bucket path.
from snowplow_analytics_sdk.snowplow_event_transformation_exception import SnowplowEventTransformationException
from snowplow_analytics_sdk.event_transformer import transform
from google.cloud import storage
import logging
import base64
import json
bucket_name = ""
write_path = "transformed"
def format_timestamp_for_filename(timestamp):
""" Parses the timestamp into partition folder name syntax. Drops the millisecond term and timezone marker, if any."""
return timestamp.replace('-', '/').replace('T', '/').replace(':', '').split('.')[0]
def parse_payload(payload: str):
""" Parses the payload string into an array of individual records. """
decoded = base64.b64decode(payload).decode('utf-8')
records = decoded.split("\n")
print("Received " + str(len(records)) + " enriched records.")
return records
def transform_records(records: list):
""" Uses the Snowplow Analytics SDK to "shred"/transform each Snowplow
event record into a JSON object.
"""
transformed = []
for idx in range(len(records)):
print("Transforming record " + str(idx) + "/" + str(len(records)) + " into JSON.")
try:
json_record = transform(records[idx])
print("Successfully transformed record.")
except SnowplowEventTransformationException as sete:
logging.error(sete)
return
transformed.append(json_record)
return transformed
def write_output(context, batch, bucket, path):
""" Writes a batch of transformed records to a path in the Snowflake staging bucket. """
batch_id = context.event_id
timestamp = format_timestamp_for_filename(context.timestamp)
client = storage.Client()
bucket = client.get_bucket(bucket)
for idx in range(len(batch)):
filename = path + "/" + timestamp + "/" + batch_id + "_" + str(idx) + ".json"
blob = bucket.blob(filename)
print("Writing JSON record to file: " + filename)
byte_encoded_json_record = json.dumps(batch[idx])
try:
blob.upload_from_string(byte_encoded_json_record, content_type='application/json')
print("Successfully wrote the record.")
except Exception as e:
logging.error(" Problem writing record to the bucket: \n" + batch[idx])
logging.error(e)
return
def main(event, context):
""" Receives a PubSub message as event and context objects in the following formats:
event: {'@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage', 'attributes': {'app_id': '<app-id>'}, 'data': '...'}
context: {event_id: #######, timestamp: 2021-08-10T18:42:11.477Z, event_type: providers/cloud.pubsub/eventTypes/topic.publish, resource: projects/<project>/topics/<topic-name>}
Executes the payload parse, record transformation, and write to output steps.
"""
print("The function was triggered by messageId " + context.event_id + " published at " + context.timestamp + " to " + context.resource)
if 'data' not in event:
logging.error(" No data in this message: " + context.event_id)
return
records_tsv = parse_payload(event['data'])
transformed = transform_records(records_tsv)
write_output(context, transformed, bucket_name, write_path)
Thanks for sharing this @mariah_rogers I’m sure a lot of others will find this very useful.
This is my main annoyance with Cloud Functions - I’m hoping they will roll out batching at some stage otherwise having to use Dataflow / Cloud Run becomes a necessity.