Many blog posts suggest using a Lambda function to transform TSV to JSON before saving to an S3 bucket. They link to the function's source code but give no clear steps for implementing the Lambda function with Snowplow. Can anyone help me with how to implement Lambda in Snowplow?
You would create the Lambda function using AWS Lambda. https://aws.amazon.com/lambda/
However, if your goal is to write to S3, you will probably have more luck using the Snowplow S3 Loader: https://docs.snowplowanalytics.com/docs/setup-snowplow-on-aws/setup-destinations/
For future reference, you might find this an interesting read on consuming the Kinesis stream using an AWS Lambda in realtime: Real-time reporting using AWS Lambda and DynamoDB: a tutorial to compute the number of players in a game level on the Snowplow event stream (1/2)
@PaulBoocock At first I used the Snowplow S3 Loader, but it did nothing, so I was forced to use AWS Data Firehose. When I try to use AWS Lambda in Data Firehose, it gives this error:
{"attemptsMade":4,"arrivalTimestamp":1600154159619,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result.","attemptEndingTimestamp":1600154235846,"rawData":"****","lambdaArn":"arn:aws:lambda:ap-southeast-2:573188294151:function:snowplow-json-transformer-lambda:$LATEST"}
{"attemptsMade":4,"arrivalTimestamp":1600154161523,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result.","attemptEndingTimestamp":1600154235846,"rawData":"*****=","lambdaArn":"arn:aws:lambda:ap-southeast-2:573188294151:function:snowplow-json-transformer-lambda:$LATEST"}
If I don't use Lambda, a log is created in the S3 good enriched bucket for the page view event, but at the same time a log is created in the S3 bad enriched bucket for the same page view event, saying:
{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/collector_payload_format_violation/jsonschema/1-0-0","data":{"processor":{"artifact":"snowplow-stream-enrich","version":"1.0.0"},"failure":{"timestamp":"2020-09-15T07:16:02.488Z","loader":"thrift","message":{"error":"error deserializing raw event: Cannot read. Remote side has closed. Tried to read 2 bytes, but only got 1 bytes. (This is often indicative of an internal error on the server side. Please check your server logs.)"}},"payload":"****="}}
I have followed your documentation repeatedly, but I am confused about the setup for Stream Enrich. What I do not understand is: do we need to set up a database for Stream Enrich if we are not using a custom schema? Since I am only testing with a page view event from the JavaScript Tracker, I have not set up any database. I have just granted access to create DynamoDB tables because, strangely, it is calling DynamoDB even though I have not used a DynamoDB prefix in the command. If this does not help to find the issue, I can show you all my configurations.
Please help me set up Snowplow. @PaulBoocock
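On the Firehose error quoted above: a Firehose data-transformation Lambda has to return every incoming record with its `recordId`, a `result` of `Ok`, `Dropped` or `ProcessingFailed`, and base64-encoded `data`. A `Lambda.FunctionError` can appear when the function raises instead of returning that structure. The following is a minimal sketch of such a handler, not the transformer referenced in the blog posts. The `FIELD_NAMES` list is a deliberately truncated, illustrative subset of enriched-event columns, and the function names are made up for this example.

```python
import base64
import json

# Illustrative, truncated list of enriched-event columns (an assumption for
# this sketch; a real enriched event has many more columns than these).
FIELD_NAMES = [
    "app_id", "platform", "etl_tstamp", "collector_tstamp",
    "dvce_created_tstamp", "event", "event_id",
]


def tsv_to_json(line):
    """Map the leading tab-separated values onto FIELD_NAMES, skipping empty ones."""
    values = line.rstrip("\n").split("\t")
    return {name: value for name, value in zip(FIELD_NAMES, values) if value != ""}


def handler(event, context):
    """Firehose transformation entry point: one output record per input record."""
    output = []
    for record in event["records"]:
        try:
            line = base64.b64decode(record["data"]).decode("utf-8")
            payload = json.dumps(tsv_to_json(line)) + "\n"
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(payload.encode("utf-8")).decode("ascii"),
            })
        except Exception:
            # Flag the record instead of raising, so the whole batch is not rejected.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```

Catching per-record failures and marking them `ProcessingFailed` keeps one undecodable record from failing the whole invocation. Note that this only makes sense on a stream that actually carries enriched TSV; raw collector payloads are Thrift-encoded and would not decode this way.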
@PaulBoocock here are my configurations:
collector.conf
collector {
interface = "0.0.0.0"
interface = ${?COLLECTOR_INTERFACE}
port = 8181
port = ${?COLLECTOR_PORT}
# optional SSL/TLS configuration
ssl {
enable = false
enable = ${?COLLECTOR_SSL}
# whether to redirect HTTP to HTTPS
redirect = false
redirect = ${?COLLECTOR_SSL_REDIRECT}
port = 9543
port = ${?COLLECTOR_SSL_PORT}
}
paths {
# "/com.acme/track" = "/com.snowplowanalytics.snowplow/tp2"
# "/com.acme/redirect" = "/r/tp2"
# "/com.acme/iglu" = "/com.snowplowanalytics.iglu/v1"
}
# Configure the P3P policy header.
p3p {
policyRef = "/w3c/p3p.xml"
CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
}
crossDomain {
enabled = false
# Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
enabled = ${?COLLECTOR_CROSS_DOMAIN_ENABLED}
domains = [ "*" ]
domains = [ ${?COLLECTOR_CROSS_DOMAIN_DOMAIN} ]
# Whether to only grant access to HTTPS or both HTTPS and HTTP sources
secure = true
secure = ${?COLLECTOR_CROSS_DOMAIN_SECURE}
}
cookie {
enabled = true
enabled = ${?COLLECTOR_COOKIE_ENABLED}
expiration = "365 days"
expiration = ${?COLLECTOR_COOKIE_EXPIRATION}
# Network cookie name
name = zanui_collector_cookie
name = ${?COLLECTOR_COOKIE_NAME}
domains = [
"{{cookieDomain1}}" # e.g. "domain.com" -> any origin domain ending with this will be matched and domain.com will be returned
"{{cookieDomain2}}" # e.g. "secure.anotherdomain.com" -> any origin domain ending with this will be matched and secure.anotherdomain.com will be returned
# ... more domains
]
domains += ${?COLLECTOR_COOKIE_DOMAIN_1}
domains += ${?COLLECTOR_COOKIE_DOMAIN_2}
fallbackDomain = ""
fallbackDomain = ${?FALLBACK_DOMAIN}
secure = false
secure = ${?COLLECTOR_COOKIE_SECURE}
httpOnly = false
httpOnly = ${?COLLECTOR_COOKIE_HTTP_ONLY}
sameSite = "{{cookieSameSite}}"
sameSite = ${?COLLECTOR_COOKIE_SAME_SITE}
}
doNotTrackCookie {
enabled = false
enabled = ${?COLLECTOR_DO_NOT_TRACK_COOKIE_ENABLED}
# name = {{doNotTrackCookieName}}
name = zanui-collector-do-not-track-cookie
# value = {{doNotTrackCookieValue}}
value = zanui-collector-do-not-track-cookie-value
}
cookieBounce {
enabled = false
enabled = ${?COLLECTOR_COOKIE_BOUNCE_ENABLED}
name = "n3pc"
name = ${?COLLECTOR_COOKIE_BOUNCE_NAME}
fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
fallbackNetworkUserId = ${?COLLECTOR_COOKIE_BOUNCE_FALLBACK_NETWORK_USER_ID}
forwardedProtocolHeader = "X-Forwarded-Proto"
forwardedProtocolHeader = ${?COLLECTOR_COOKIE_BOUNCE_FORWARDED_PROTOCOL_HEADER}
}
enableDefaultRedirect = true
enableDefaultRedirect = ${?COLLECTOR_ALLOW_REDIRECTS}
redirectMacro {
enabled = false
enabled = ${?COLLECTOR_REDIRECT_MACRO_ENABLED}
# Optional custom placeholder token (defaults to the literal `${SP_NUID}`)
placeholder = "[TOKEN]"
placeholder = ${?COLLECTOR_REDIRECT_REDIRECT_MACRO_PLACEHOLDER}
}
rootResponse {
enabled = false
enabled = ${?COLLECTOR_ROOT_RESPONSE_ENABLED}
statusCode = 302
statusCode = ${?COLLECTOR_ROOT_RESPONSE_STATUS_CODE}
# Optional, defaults to empty map
headers = {
Location = "https://127.0.0.1/",
Location = ${?COLLECTOR_ROOT_RESPONSE_HEADERS_LOCATION},
X-Custom = "something"
}
# Optional, defaults to empty string
body = "302, redirecting"
body = ${?COLLECTOR_ROOT_RESPONSE_BODY}
}
cors {
accessControlMaxAge = 5 seconds
accessControlMaxAge = ${?COLLECTOR_CORS_ACCESS_CONTROL_MAX_AGE}
}
# Configuration of prometheus http metrics
prometheusMetrics {
enabled = false
}
streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = snowplow-collected-good-events-stream
good = ${?COLLECTOR_STREAMS_GOOD}
# Events that are too big (w.r.t Kinesis 1MB limit) will be stored in the bad stream/topic
bad = snowplow-collected-bad-events-stream
bad = ${?COLLECTOR_STREAMS_BAD}
useIpAddressAsPartitionKey = false
useIpAddressAsPartitionKey = ${?COLLECTOR_STREAMS_USE_IP_ADDRESS_AS_PARTITION_KEY}
sink {
enabled = kinesis
enabled = ${?COLLECTOR_STREAMS_SINK_ENABLED}
# Region where the streams are located
region = ap-southeast-2
region = ${?COLLECTOR_STREAMS_SINK_REGION}
threadPoolSize = 10
threadPoolSize = ${?COLLECTOR_STREAMS_SINK_THREAD_POOL_SIZE}
aws {
accessKey = env
accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
secretKey = env
secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
}
# Minimum and maximum backoff periods, in milliseconds
backoffPolicy {
#minBackoff = {{minBackoffMillis}}
minBackoff = 10
#maxBackoff = {{maxBackoffMillis}}
maxBackoff = 10
}
}
buffer {
byteLimit = 4500000
byteLimit = ${?COLLECTOR_STREAMS_BUFFER_BYTE_LIMIT}
recordLimit = 500 # Not supported by Kafka; will be ignored
recordLimit = ${?COLLECTOR_STREAMS_BUFFER_RECORD_LIMIT}
timeLimit = 5000
timeLimit = ${?COLLECTOR_STREAMS_BUFFER_TIME_LIMIT}
}
}
}
akka {
loglevel = DEBUG # 'OFF' for no logging, 'DEBUG' for all logging.
loglevel = ${?AKKA_LOGLEVEL}
loggers = ["akka.event.slf4j.Slf4jLogger"]
loggers = [${?AKKA_LOGGERS}]
http.server {
remote-address-header = on
remote-address-header = ${?AKKA_HTTP_SERVER_REMOTE_ADDRESS_HEADER}
raw-request-uri-header = on
raw-request-uri-header = ${?AKKA_HTTP_SERVER_RAW_REQUEST_URI_HEADER}
# Define the maximum request length (the default is 2048)
parsing {
max-uri-length = 32768
max-uri-length = ${?AKKA_HTTP_SERVER_PARSING_MAX_URI_LENGTH}
uri-parsing-mode = relaxed
uri-parsing-mode = ${?AKKA_HTTP_SERVER_PARSING_URI_PARSING_MODE}
}
}
}
Run Command:
java -Dcom.amazonaws.sdk.disableCbor -jar snowplow-stream-collector-kinesis-1.0.0.jar --config collector.conf
================================
enricher.conf
enrich {
streams {
in {
# Stream/topic where the raw events to be enriched are located
raw = snowplow-collected-good-events-stream
raw = ${?ENRICH_STREAMS_IN_RAW}
}
out {
# Stream/topic where the events that were successfully enriched will end up
enriched = snowplow-collected-good-events-stream
# Stream/topic where the event that failed enrichment will be stored
bad = snowplow-collected-bad-events-stream
bad = ${?ENRICH_STREAMS_OUT_BAD}
# Stream/topic where the pii transformation events will end up
# pii = {{outPii}}
# pii = ${?ENRICH_STREAMS_OUT_PII}
partitionKey = event_id
partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
}
sourceSink {
enabled = kinesis
enabled = ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED}
region = ap-southeast-2
aws {
accessKey = env
accessKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_ACCESS_KEY}
secretKey = env
secretKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_SECRET_KEY}
}
maxRecords = 10000
initialPosition = TRIM_HORIZON
initialTimestamp = "2020-09-10T10:00:00Z"
backoffPolicy {
minBackoff = 1000
minBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MIN_BACKOFF}
maxBackoff = 5000
maxBackoff = ${?ENRICH_STREAMS_SOURCE_SINK_BACKOFF_POLICY_MAX_BACKOFF}
}
}
buffer {
byteLimit = 1000000000
byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
recordLimit = 10 # Not supported by Kafka; will be ignored
recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
timeLimit = 5000
timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
}
appName = "zanui-enricher-app"
appName = ${?ENRICH_STREAMS_APP_NAME}
}
}
Run Command:
java -jar snowplow-stream-enrich-kinesis-1.0.0.jar --config enricher.conf --resolver file:resolver.json
====================
S3 Loader Config
source = "kinesis"
sink = "kinesis"
aws {
accessKey = "env"
secretKey = "env"
}
# Config for NSQ
nsq {
channelName = "nsqSourceChannelName"
# Host name for NSQ tools
host = "127.0.0.1"
# HTTP port for nsqd
port = 4150
# HTTP port for nsqlookupd
lookupPort = 4161
}
kinesis {
initialPosition = "TRIM_HORIZON"
initialTimestamp = "2017-05-17T10:00:00Z"
maxRecords = 10000
region = "ap-southeast-2"
appName = "zanui-enricher-app"
}
streams {
inStreamName = "snowplow-collected-good-events-stream"
outStreamName = "snowplow-collected-bad-events-stream"
buffer {
byteLimit = 1000000000 # Not supported by NSQ; will be ignored
recordLimit = 10
timeLimit = 5000 # Not supported by NSQ; will be ignored
}
}
s3 {
region = "ap-southeast-2"
bucket = "snowplow-enriched-good-events"
partitionedBucket = "snowplow-enriched-good-events/partitioned"
dateFormat = "{YYYY}/{MM}/{dd}/{HH}"
outputDirectory = "zanui-enriched/good"
filenamePrefix = "zanui-output"
format = "gzip"
# Maximum Timeout that the application is allowed to fail for (in milliseconds)
maxTimeout = 300000 # 5 minutes
}
Run Command:
java -jar snowplow-s3-loader-0.6.0.jar --config my.conf
But the Snowplow S3 Loader is not doing anything, so I used Data Firehose to transfer the stream to the S3 bucket.
@PaulBoocock Can you help me now?
You might find this post useful, as others have faced similar questions around Enrich, S3 Loader and DynamoDB: Does Stream Enrich need DynamoDB connection?
Also, searching for DynamoDB on this forum might yield some help.
Unfortunately I’m not really able to help you much further, but this does seem to be an issue with how you have configured things on AWS, perhaps permissions.
@PaulBoocock I have gone through all those posts and have given permission to create, write to and read from DynamoDB. I think that is not the issue.
OK, can you confirm whether we need to set up a database if we are not using a custom schema? If not, where is that data saved?
Yes you do need DynamoDB as Stream Enrich uses the KCL (Kinesis Client Library) under the hood which in turn uses DynamoDB to manage its state.
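A side effect of this is that the KCL names its DynamoDB lease table after the application name, so any two KCL-based applications configured with the same `appName` end up sharing one table and one set of checkpoints. As far as I understand, the S3 Loader is also KCL-based, so its `kinesis.appName` matters in the same way. Below is a small sketch that flags shared names. The component labels and the `shared_app_names` helper are made up for this example; the `appName` values are the ones from the configurations posted above.

```python
from collections import defaultdict


def shared_app_names(app_names):
    """Return {appName: [components]} for every name used by more than one component."""
    by_name = defaultdict(list)
    for component, name in app_names.items():
        by_name[name].append(component)
    return {name: sorted(users) for name, users in by_name.items() if len(users) > 1}


# appName values as posted in enricher.conf and the S3 Loader config above.
posted = {
    "stream-enrich": "zanui-enricher-app",
    "s3-loader": "zanui-enricher-app",
}

print(shared_app_names(posted))
```

With the posted values this reports `zanui-enricher-app` as shared by both components, which may be worth changing so that each application gets its own lease table.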
@PaulBoocock, that table was already created in DynamoDB. It is working fine.
So whilst you might not be using custom schemas in your pipeline, the pipeline will still use schemas that are available on iglucentral.com, and it will cache those in the pipeline. Generally speaking, a custom schema works the same way as the core schemas that are publicly available for standard Snowplow events, so you’ll need to make sure your pipeline is set up to work with these.
For example, when you track a page view using the JavaScript tracker, then it will use the web_page schema.
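To make the lookup concrete: to my knowledge, a static HTTP Iglu repository serves each schema under `/schemas/<vendor>/<name>/<format>/<version>`, so a schema key can be turned into a URL and fetched in a browser to confirm the repository is reachable from the pipeline's network. The helper below is a hypothetical sketch of that mapping, not part of any Snowplow library, and the `web_page` version used in the example is an assumption.

```python
def iglu_uri_to_url(iglu_uri, repo_uri="http://iglucentral.com"):
    """Build the HTTP URL at which a static Iglu repository would serve a schema key."""
    prefix = "iglu:"
    if not iglu_uri.startswith(prefix):
        raise ValueError("not an Iglu URI: " + iglu_uri)
    parts = iglu_uri[len(prefix):].split("/")
    if len(parts) != 4:
        raise ValueError("expected vendor/name/format/version in: " + iglu_uri)
    vendor, name, schema_format, version = parts
    return "{}/schemas/{}/{}/{}/{}".format(
        repo_uri.rstrip("/"), vendor, name, schema_format, version
    )


# Schema key for the web_page context (version assumed for this example).
print(iglu_uri_to_url("iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0"))
```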
So do we need to register with iglucentral.com to set up the pipeline? I have used the default resolver.json.
You don’t need to register, all the schemas are public, but you should make sure iglucentral.com is listed in your resolver.json.
{
"schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
"data": {
"cacheSize": 500,
"repositories": [
{
"name": "Iglu Central",
"priority": 0,
"vendorPrefixes": [ "com.snowplowanalytics" ],
"connection": {
"http": {
"uri": "http://iglucentral.com"
}
}
},
{
"name": "Iglu Central - GCP Mirror",
"priority": 1,
"vendorPrefixes": [ "com.snowplowanalytics" ],
"connection": {
"http": {
"uri": "http://mirror01.iglucentral.com"
}
}
}
]
}
}
That all looks fine. I can’t be sure what the Firehose error is regarding as I’ve never used that with Snowplow data. However, when I consider that error along with the error in the S3 loader, it does look like something is wrong with the payloads that they are trying to read. I’m not quite sure where the error is unfortunately as I’ve never seen that before.
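One way to narrow this down is to check whether every bad row fails in the same way. Bad rows are newline-delimited JSON, so they can be tallied with a few lines of Python. This is a rough sketch that assumes the `schema` / `data.failure` layout seen in the bad row quoted earlier; other bad row types may be structured differently, and `summarise_bad_rows` is a name made up for this example.

```python
import json
from collections import Counter


def summarise_bad_rows(lines):
    """Count bad rows by (failure type, loader, error message)."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        row = json.loads(line)
        # "iglu:<vendor>/<name>/<format>/<version>" -> <name>
        failure_type = row["schema"].split("/")[1]
        failure = row["data"]["failure"]
        message = failure.get("message", {})
        # The message is an object with an "error" key in the row quoted above;
        # fall back to its string form for any other shape.
        error = message.get("error", "") if isinstance(message, dict) else str(message)
        counts[(failure_type, failure.get("loader", ""), error)] += 1
    return counts
```

If a single `(failure type, loader, error)` combination accounts for all rows, that points to a systematic problem with what is being written to the stream rather than to occasional malformed requests.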
I don’t see anything that is glaringly wrong with the configurations so my guess is this is something to do with how Kinesis is configured (for raw events) or something to do with the server configurations you are running the JARs on (either the collector or enrich) but if I’m honest, I’m really not sure at this point. You could attempt to set it up with the docker images (available on Docker Hub) instead but I appreciate that is quite a change from the approach you have taken so far.
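One check that can be done mechanically from the posted configurations is whether any component writes back into the stream it reads from. The sketch below lists the stream names exactly as they appear in collector.conf, enricher.conf and the S3 Loader config above; the component labels and the `self_loops` helper are made up for this example.

```python
# Stream names as posted above. "reads" is None for the collector because it
# receives events over HTTP rather than from a stream.
WIRING = {
    "collector": {
        "reads": None,
        "writes": "snowplow-collected-good-events-stream",
    },
    "stream-enrich": {
        "reads": "snowplow-collected-good-events-stream",   # streams.in.raw
        "writes": "snowplow-collected-good-events-stream",  # streams.out.enriched
    },
    "s3-loader": {
        "reads": "snowplow-collected-good-events-stream",   # streams.inStreamName
        "writes": "s3://snowplow-enriched-good-events",
    },
}


def self_loops(wiring):
    """Return the components whose output stream is the same as their input stream."""
    return [
        name for name, io in wiring.items()
        if io["reads"] is not None and io["reads"] == io["writes"]
    ]


print(self_loops(WIRING))
```

With the posted values this flags `stream-enrich`, since `streams.in.raw` and `streams.out.enriched` name the same stream. If that is also how it is deployed, Stream Enrich would read its own tab-separated output back as Thrift-encoded collector payloads, which could explain deserialization failures like the one in the bad row. That is only a hypothesis, but a separate enriched stream, with the S3 Loader reading from it, would be the usual layout.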
@PaulBoocock I had set up the Snowplow S3 Loader, but when I trigger the JavaScript tracker, nothing happens in the console where the S3 Loader is running. So I had to use AWS Firehose.
My Snowplow structure:
And the configuration of Snowplow S3 loader:
source = "kinesis"
sink = "kinesis"
aws {
accessKey = "env"
secretKey = "env"
}
nsq {
channelName = "nsqSourceChannelName"
host = "127.0.0.1"
port = 4150
lookupPort = 4161
}
kinesis {
initialPosition = "TRIM_HORIZON"
initialTimestamp = "2017-05-17T10:00:00Z"
maxRecords = 10000
region = "ap-southeast-2"
appName = "application-enricher-app"
}
streams {
inStreamName = "snowplow-collected-good-events-stream"
outStreamName = "snowplow-collected-bad-events-stream"
buffer {
byteLimit = 1000000000 # Not supported by NSQ; will be ignored
recordLimit = 10
timeLimit = 5000 # Not supported by NSQ; will be ignored
}
}
s3 {
region = "ap-southeast-2"
bucket = "snowplow-enriched-good-events"
partitionedBucket = "snowplow-enriched-good-events/partitioned"
dateFormat = "{YYYY}/{MM}/{dd}/{HH}"
outputDirectory = "application-enriched/good"
filenamePrefix = "application-output"
format = "gzip"
maxTimeout = 300000 # 5 minutes
}