NodeJS Unstructured Events - http://iglucentral.com/

Hey,

I’m using the NodeJS Event Tracker Library and I’m running into schema errors: my events end up in bad rows with an adapter/version mismatch. I’m currently using Iglu schemas hosted on Iglu Central. It’s not clear to me from the docs how to use adapters such as Google, Segment, or Mailchimp.

If we want to use the out-of-the-box schemas hosted on Iglu Central, how would we construct the payload, and what extra files do we need to add to the Snowplow enrichment config files?

My end goal is to use iglu:com.google.tag-manager.server-side/purchase/jsonschema/1-0-0 to send events to the Snowplow enrichment app on the server and have them pass validation.

My Code

function initialiseEmitter (url) {
  const e = snowplow.gotEmitter({
    endpoint: url, // http://localhost:8080 => running locally
    protocol: snowplow.HttpProtocol.HTTPS,
    port: 8080,
    method: snowplow.HttpMethod.POST,
    bufferSize: 5,
    callback: (err, res) => {
      if (err) {
        log.info('error.emitting.snowplow.event', { err });
      } else {
        log.info('snowplow.running');
      }
    }
  });
  return e;
}
const snowplowTracker = snowplow.initialise(snowplowUrl);
snowplowTracker.setUserId(userData.customerId);
snowplowTracker.track(buildSelfDescribingEvent({
  event: {
    schema: 'iglu:com.google.analytics.measurement-protocol/page_view/jsonschema/1-0-0',
    data: {
      documentLocationUrl: 'test',
      documentHostName: 'test',
      documentPath: 'test',
      documentTitle: 'test'
    }
  }
}));

My Error

{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/adapter_failures/jsonschema/1-0-0","data":{"processor":{"artifact":"snowplow-enrich-kinesis","version":"3.2.3"},"failure":{"timestamp":"2022-10-23T15:35:17.003134Z","vendor":"config","version":"getuser","messages":[{"field":"vendor/version","value":"config/getuser","expectation":"vendor/version combination is not supported"}]},"payload":{"vendor":"config","version":"getuser","querystring":[{"name":"index","value":"0"}],"contentType":null,"body":null,"collector":"ssc-2.8.0-kinesis","encoding":"UTF-8","hostname":"18.168.88.230","timestamp":"2022-10-23T15:35:14.934Z","ipAddress":"45.61.185.198","useragent":"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:76.0) Gecko/20100101 Firefox/76.0","refererUri":null,"headers":["Timeout-Access: <function1>","X-Forwarded-For: 45.61.185.198","X-Forwarded-Proto: http","X-Forwarded-Port: 80","Host: 18.168.88.230","X-Amzn-Trace-Id: Root=1-63555f32-1a8595d273547e2d2e75ab00","User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:76.0) Gecko/20100101 Firefox/76.0","Accept: text/html, application/xhtml+xml, application/xml;q=0.9, image/webp, */*;q=0.8","Accept-Language: en-GB, en;q=0.5","Accept-Encoding: gzip, deflate","Upgrade-Insecure-Requests: 1"],"networkUserId":"aadf37d1-ebd7-48c4-a849-7378106be458"}}}

This seems to be the logical place to find out how to do this, but it’s an empty page: Tutorial: define, track and query your own custom event | Snowplow Documentation

Hi @sFrampton
Thanks for pointing this out. I’ve raised it internally. Will update you when I know more.
Cheers,
Eddie

@EddieM - I’m wondering if someone on the Snowplow team could help with an issue I’m facing with the Node.js tracker.

This curl request is processed by the enrichment app fine:

{
    "schema": "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4",
    "data": [
        {
            "e": "ue",
            "tv": "curl",
            "p": "srv",
            "ue_pr": "{\"schema\":\"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0\",\"data\":{\"schema\":\"iglu:com.google.analytics.measurement-protocol/page_view/jsonschema/1-0-0\",\"data\":{\"documentLocationUrl\":\"test\",\"documentHostName\":\"test\",\"documentPath\":\"test\",\"documentTitle\":\"test\"}}}"
        }
    ]
}
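For reference, the request above can be replayed against the collector roughly like this (the collector host/port and the payload file name are assumptions for illustration):

curl 'http://localhost:8080/com.snowplowanalytics.snowplow/tp2' \
  -X POST \
  -H 'Content-Type: application/json; charset=utf-8' \
  --data @payload.json   # payload.json holds the JSON document above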

The NodeJS event emitted in the app is rejected. As far as I can see these requests are identical, but the Node npm package fails while the curl request works fine.

const res = snowplowTracker.track(buildSelfDescribingEvent({
  event: {
    schema: 'iglu:com.google.analytics.measurement-protocol/page_view/jsonschema/1-0-0',
    // iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4
    data: {
      documentLocationUrl: 'test',
      documentHostName: 'test',
      documentPath: 'test',
      documentTitle: 'test'
    }
  }
}));
{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/adapter_failures/jsonschema/1-0-0","data":{"processor":{"artifact":"snowplow-enrich-kinesis","version":"3.2.3"},"failure":{"timestamp":"2022-10-23T17:48:03.502222Z","vendor":".git","version":"config","messages":[{"field":"vendor/version","value":".git/config","expectation":"vendor/version combination is not supported"}]},"payload":{"vendor":".git","version":"config","querystring":[],"contentType":null,"body":null,"collector":"ssc-2.8.0-kinesis","encoding":"UTF-8","hostname":"13.40.253.245","timestamp":"2022-10-23T17:48:01.377Z","ipAddress":"41.142.47.1","useragent":"python-httpx/0.23.0","refererUri":null,"headers":["Timeout-Access: <function1>","X-Forwarded-For: 41.142.47.1","X-Forwarded-Proto: http","X-Forwarded-Port: 80","Host: 13.40.253.245","X-Amzn-Trace-Id: Root=1-63557e51-6bd663e973cd03d11777efbe","Accept: */*","Accept-Encoding: gzip, deflate, br","User-Agent: python-httpx/0.23.0"],"networkUserId":"4d6eac11-9f01-49bf-9a70-659cc745e847"}}}

Hey @sFrampton,

Regarding the tracking code: I think it looks good, but I suspect that you are running into validation errors with the event body. If you check the login schema here, you can see that the only property it contains is method, and it doesn’t allow additional properties. Since you are tracking documentLocationUrl and other properties, I expect the event is not passing validation, and that’s why it’s going to bad rows.
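For context, a schema of that shape would look roughly like this (a sketch only; the actual login schema is the one hosted on Iglu Central):

{
  "type": "object",
  "properties": {
    "method": { "type": "string" }
  },
  "additionalProperties": false
}

With "additionalProperties": false, any payload carrying extra fields such as documentLocationUrl would be rejected at validation.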

I am a bit confused by the error message that you posted and will let others comment on it if they recognize it, but wanted to flag the above problem anyway in case it helps.


Never mind my last suggestion; I posted it just after your last message, in which you used a different schema that does contain the tracked properties.

@matus no worries, I think the body is okay because it works in the curl request.

The main issue, when I decode the base64 response, is the vendor mismatch error, and it only happens when events are sent from the NodeJS npm package.

Hi @sFrampton and @matus ,

The error shows:

{
  "field": "vendor/version",
  "value": "config/getuser",
  "expectation": "vendor/version combination is not supported"
}

It seems that the events are sent to the config/getuser path in the URI of the collector, whereas they should be sent to com.snowplowanalytics.snowplow/tp2.
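In other words, a tracker POST should arrive at a path like the following (collector host is hypothetical):

http://collector.example.com/com.snowplowanalytics.snowplow/tp2

whereas the bad row shows the request arrived at /config/getuser.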

@matus is the path automatically inferred depending on the event type, or could it be misconfigured somehow?


@BenB Thanks for explaining the error better. I’ve shared my enrichment & collector configs below. I’m using "@snowplow/node-tracker": "^3.6.0"

As far as I can see, there’s nowhere I could be setting the path or URI to config/getuser. I’ll play around with my init config and see if I’ve done something irregular.

collector {
  interface = "0.0.0.0"
  port = 8080
  ssl {
    enable = false
    redirect = false
    port = 8443
  }
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }
  crossDomain {
    enabled = false
    # Domains that are granted access, *.acme.com will match http://acme.com and http://sub.acme.com
    domains = [ "*" ]
    # Whether to only grant access to HTTPS or both HTTPS and HTTP sources
    secure = true
  }
  cookie {
    enabled = true
    expiration = "365 days"
    name = sp
    domains = []
    #fallbackDomain = ""
    secure = true
    httpOnly = false
    sameSite = "None"
  }
  doNotTrackCookie {
    enabled = false
    name = ""
    value = ""
  }
  cookieBounce {
    enabled = false
    name = "n3pc"
    fallbackNetworkUserId = "00000000-0000-4000-A000-000000000000"
    forwardedProtocolHeader = "X-Forwarded-Proto"
  }
  enableDefaultRedirect = false
  redirectMacro {
    enabled = false
    placeholder = "[TOKEN]"
  }
  rootResponse {
    enabled = false
    statusCode = 302
    headers = {}
    body = "302, redirecting"
  }
  cors {
    accessControlMaxAge = "5 seconds"
  }
  prometheusMetrics {
    enabled = false
  }
  streams {
    good = "collector-payloads"
    bad = "bad-collector-payloads"
    useIpAddressAsPartitionKey = false
    sink {
      enabled = kinesis
      region = "eu-west-2"
      threadPoolSize = 10
      aws {
        accessKey = env
        secretKey = env
      }
      backoffPolicy {
        minBackoff = 3000
        maxBackoff = 10000
      }
    }
    buffer {
      byteLimit = 3145728
      recordLimit = 500
      timeLimit = 5000
    }
  }
  telemetry {
     disable = false
     interval = 60 minutes
     # Connection properties for the receiving pipeline
     method = POST
     url = telemetry-g.snowplowanalytics.com
     port = 443
     secure = true
  }
}
akka {
  loglevel = WARNING
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  http.server {
    remote-address-header = on
    raw-request-uri-header = on
    parsing {
      max-uri-length = 32768
      uri-parsing-mode = relaxed
    }
    max-connections = 2048
  }
}

{
  # Where to read collector payloads from
  "input": {
    "type": "Kinesis"

    # Optional. Name of the application which the KCL daemon should assume
    "appName": "snowplow-enrich-kinesis"

    # Name of the Kinesis stream to read from
    "streamName": "collector-payloads"

    # Optional. Region where the Kinesis stream is located
    # This field is optional if it can be resolved with AWS region provider chain.
    # It checks places like env variables, system properties, AWS profile file.
    # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
    "region": "eu-west-2"

    # Optional, set the initial position to consume the Kinesis stream
    # Must be TRIM_HORIZON, LATEST or AT_TIMESTAMP
    # LATEST: most recent data.
    # TRIM_HORIZON: oldest available data.
    # AT_TIMESTAMP: start from the record at or after the specified timestamp
    "initialPosition": {
      "type": "TRIM_HORIZON"
    }
    # "initialPosition": {
    #   "type": "AT_TIMESTAMP"
    #   "timestamp": "2020-07-17T10:00:00Z" # Required for AT_TIMESTAMP
    # }

    # Optional, set the mode for retrieving records.
    "retrievalMode": {
      "type": "Polling"

      # Maximum size of a batch returned by a call to getRecords.
      # Records are checkpointed after a batch has been fully processed,
      # thus the smaller maxRecords, the more often records can be checkpointed
      # into DynamoDb, but possibly reducing the throughput.
      "maxRecords": 10000
    }
    # "retrievalMode": {
    #   "type": "FanOut"
    # }

    # Optional. Size of the internal buffer used when reading messages from Kinesis,
    # each buffer holding up to maxRecords from above
    "bufferSize": 3

    # Optional. Settings for backoff policy for checkpointing.
    # Records are checkpointed after all the records of the same chunk have been enriched
    "checkpointBackoff": {
      "minBackoff": 100 milliseconds
      "maxBackoff": 10 seconds
      "maxRetries": 10
    }

    # Optional, endpoint url configuration to override aws kinesis endpoints
    # Can be used to specify local endpoint when using localstack
    # "customEndpoint": "http://localhost:4566"

    # Optional, endpoint url configuration to override aws dynamodb endpoint for Kinesis checkpoints lease table
    # Can be used to specify local endpoint when using localstack
    # "dynamodbCustomEndpoint": "http://localhost:4569"

    # Optional, endpoint url configuration to override aws cloudwatch endpoint for metrics
    # Can be used to specify local endpoint when using localstack
    # "cloudwatchCustomEndpoint": "http://localhost:4582"
  }

  "output": {
    # Enriched events output
    "good": {
      "type": "Kinesis"

      # Name of the Kinesis stream to write to
      "streamName": "enriched"

      # Optional. Region where the Kinesis stream is located
      # This field is optional if it can be resolved with AWS region provider chain.
      # It checks places like env variables, system properties, AWS profile file.
      # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
      "region": "eu-west-2"

      # Optional. How the output stream/topic will be partitioned in Kinesis
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      # "partitionKey": "user_id"

      # Optional. Policy to retry if writing to kinesis fails.
      # This policy is used in 2 places:
      # - When the PutRecords request errors
      # - When the requests succeeds but some records couldn't get inserted
      "backoffPolicy": {
        "minBackoff": 100 milliseconds
        "maxBackoff": 10 seconds
        "maxRetries": 10
      }

      # Optional. Limits the number of events in a single PutRecords request.
      # Several requests are made in parallel
      # Maximum allowed: 500
      "recordLimit": 500

      # Optional. Limits the number of bytes in a single PutRecords request,
      # including records and partition keys.
      # Several requests are made in parallel
      # Maximum allowed: 5 MB
      "byteLimit": 5242880

      # Optional. Use a custom Kinesis endpoint.
      # Can be used for instance to work locally with localstack
      # "customEndpoint": "https://localhost:4566"
    }

    # Pii events output
    "pii": {
      "type": "Kinesis"

      # Name of the Kinesis stream to write to
      "streamName": "enriched"

      # Optional. Region where the Kinesis stream is located
      # This field is optional if it can be resolved with AWS region provider chain.
      # It checks places like env variables, system properties, AWS profile file.
      # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
      "region": "eu-west-2"

      # Optional. How the output stream/topic will be partitioned in Kinesis
      # Possible partition keys are: event_id, event_fingerprint, domain_userid, network_userid,
      # user_ipaddress, domain_sessionid, user_fingerprint
      # Refer to https://github.com/snowplow/snowplow/wiki/canonical-event-model to know what the
      # possible partition keys correspond to.
      # Otherwise, the partition key will be a random UUID.
      # "partitionKey": "user_id"

      # Optional. Policy to retry if writing to kinesis fails.
      # This policy is used in 2 places:
      # - When the PutRecords request errors
      # - When the requests succeeds but some records couldn't get inserted
      "backoffPolicy": {
        "minBackoff": 100 milliseconds
        "maxBackoff": 10 seconds
        "maxRetries": 10
      }

      # Optional. Limits the number of events in a single PutRecords request.
      # Several requests are made in parallel
      # Maximum allowed: 500
      "recordLimit": 500

      # Optional. Limits the number of bytes in a single PutRecords request,
      # including records and partition keys.
      # Several requests are made in parallel
      # Maximum allowed: 5 MB
      "byteLimit": 5242880

      # Optional. Use a custom Kinesis endpoint.
      # Can be used for instance to work locally with localstack
      # "customEndpoint": "https://localhost:4566"
    }

    # Bad rows output
    "bad": {
      "type": "Kinesis"

      # Name of the Kinesis stream to write to
      "streamName": "enriched-bad"

      # Optional. Region where the Kinesis stream is located
      # This field is optional if it can be resolved with AWS region provider chain.
      # It checks places like env variables, system properties, AWS profile file.
      # https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/regions/providers/DefaultAwsRegionProviderChain.html
      "region": "eu-west-2"

      # Optional. Policy to retry if writing to kinesis fails.
      # This policy is used in 2 places:
      # - When the PutRecords request errors
      # - When the requests succeeds but some records couldn't get inserted
      "backoffPolicy": {
        "minBackoff": 100 milliseconds
        "maxBackoff": 10 seconds
        "maxRetries": 10
      }

      # Optional. Limits the number of events in a single PutRecords request.
      # Several requests are made in parallel
      # Maximum allowed: 500
      "recordLimit": 500

      # Optional. Limits the number of bytes in a single PutRecords request,
      # including records and partition keys.
      # Several requests are made in parallel
      # Maximum allowed: 5 MB
      "byteLimit": 5242880

      # Optional. Use a custom Kinesis endpoint.
      # Can be used for instance to work locally with localstack
      # "customEndpoint": "https://localhost:4566"
    }
  }

  # Optional. Concurrency of the app
  "concurrency" : {
    # Number of events that can get enriched at the same time within a chunk
    "enrich": 256
    # Number of chunks that can get sunk at the same time
    # WARNING: if greater than 1, records can get checkpointed before they are sunk
    "sink": 1
  }

  # Optional, period after which enrich assets should be checked for updates
  # no assets will be updated if the key is absent
  "assetsUpdatePeriod": "7 days"

  # Optional, configuration of remote adapters
  "remoteAdapters": {
    # how long enrich waits to establish a connection to remote adapters
    "connectionTimeout": "10 seconds",
    # how long enrich waits to get a response from remote adapters
    "readTimeout": "45 seconds",
    # how many connections enrich opens at maximum for remote adapters
    # increasing this could help with throughput in case of adapters with high latency
    "maxConnections": 10,
    # a list of remote adapter configs
    "configs": [
      {
        "vendor": "com.example",
        "version": "v1",
        "url": "https://remote-adapter.com"
      }
    ]
  }

  "monitoring": {

    # Optional, configure how metrics are reported
    "metrics": {

      # Optional. Send metrics to a StatsD server on localhost
      "statsd": {
        "hostname": "localhost"
        "port": 8125

        # Required, how frequently to report metrics
        "period": "10 seconds"

        # Any key-value pairs to be tagged on every StatsD metric
        "tags": {
          "app": enrich
        }

        # Optional, override the default metric prefix
        # "prefix": "snowplow.enrich."
      }

      # Optional. Log to stdout using Slf4j
      "stdout": {
        "period": "10 seconds"

        # Optional, override the default metric prefix
        # "prefix": "snowplow.enrich."
      }

      # Optional. Send KCL and KPL metrics to Cloudwatch
      "cloudwatch": true
    }
  }

  # Optional, configure telemetry
  # All the fields are optional
  "telemetry": {

    # Set to true to disable telemetry
    "disable": false

    # Interval for the heartbeat event
    "interval": 15 minutes

    # HTTP method used to send the heartbeat event
    "method": POST

    # URI of the collector receiving the heartbeat event
    "collectorUri": collector-g.snowplowanalytics.com

    # Port of the collector receiving the heartbeat event
    "collectorPort": 443

    # Whether to use https or not
    "secure": true

    # Identifier intended to tie events together across modules,
    # infrastructure and apps when used consistently
    "userProvidedId": my_pipeline

    # ID automatically generated upon running a modules deployment script
    # Intended to identify each independent module, and the infrastructure it controls
    "autoGeneratedId": hfy67e5ydhtrd

    # Unique identifier for the VM instance
    # Unique for each instance of the app running within a module
    "instanceId": 665bhft5u6udjf

    # Name of the terraform module that deployed the app
    "moduleName": enrich-kinesis-ce

    # Version of the terraform module that deployed the app
    "moduleVersion": 1.0.0
  }

  # Optional. To activate/deactivate enrich features that are still in beta
  # or that are here for transition.
  # This section might change in future versions
  "featureFlags" : {

    # Enrich 3.0.0 introduces the validation of the enriched events against atomic schema
    # before emitting.
    # If set to false, a bad row will be emitted instead of the enriched event
    # if validation fails.
    # If set to true, invalid enriched events will be emitted, as before.
    # WARNING: this feature flag will be removed in a future version
    # and it will become impossible to emit invalid enriched events.
    # More details: https://github.com/snowplow/enrich/issues/517#issuecomment-1033910690
    "acceptInvalid": false

    # In early versions of enrich-kinesis and enrich-pubsub (pre-3.1.4), the Javascript enrichment
    # incorrectly ran before the currency, weather, and IP Lookups enrichments. Set this flag to true
    # to keep the erroneous behaviour of those previous versions. This flag will be removed in a
    # future version.
    # More details: https://github.com/snowplow/enrich/issues/619
    "legacyEnrichmentOrder": false
  }
}

Good point, the path to the collector is automatically set to /com.snowplowanalytics.snowplow/tp2 for POST requests. So the endpoint in snowplow.gotEmitter should be just the collector domain. It shouldn’t even contain the protocol, as that is automatically prepended as well (https:// in this case).

@sFrampton could you please make sure that the url that you set when creating the emitter only contains the domain, without the protocol and path? So it should be something like localhost:8080 or any other domain.

@matus - thanks for the help, this is my emitter config. I’ve hard-coded the URL for now; unfortunately I still get the same vendor issue.

function initialiseEmitter (url) {
  const e = gotEmitter({
    endpoint: 'localhost:8080',
    port: 8080,
    method: snowplow.HttpMethod.POST,
    bufferSize: 1,
    callback: (err, res) => {
      if (err) {
        log.info('error.emitting.snowplow.event', { err });
      } else {
        log.info('snowplow.emitted.event');
      }
    }
  });
  return e;
}
{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/adapter_failures/jsonschema/1-0-0","data":{"processor":{"artifact":"snowplow-enrich-kinesis","version":"3.2.3"},"failure":{"timestamp":"2022-10-23T15:35:17.003134Z","vendor":"config","version":"getuser","messages":[{"field":"vendor/version","value":"config/getuser","expectation":"vendor/version combination is not supported"}]},"payload":{"vendor":"config","version":"getuser","querystring":[{"name":"index","value":"0"}],"contentType":null,"body":null,"collector":"ssc-2.8.0-kinesis","encoding":"UTF-8","hostname":"18.168.88.230","timestamp":"2022-10-23T15:35:14.934Z","ipAddress":"45.61.185.198","useragent":"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:76.0) Gecko/20100101 Firefox/76.0","refererUri":null,"headers":["Timeout-Access: <function1>","X-Forwarded-For: 45.61.185.198","X-Forwarded-Proto: http","X-Forwarded-Port: 80","Host: 18.168.88.230","X-Amzn-Trace-Id: Root=1-63555f32-1a8595d273547e2d2e75ab00","User-Agent: Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:76.0) Gecko/20100101 Firefox/76.0","Accept: text/html, application/xhtml+xml, application/xml;q=0.9, image/webp, */*;q=0.8","Accept-Language: en-GB, en;q=0.5","Accept-Encoding: gzip, deflate","Upgrade-Insecure-Requests: 1"],"networkUserId":"aadf37d1-ebd7-48c4-a849-7378106be458"}}}

Okay @matus @BenB, I checked out the source code of gotEmitter in the NodeJS tracker.

By passing localhost:8080 I was getting the URL below:

"targetUrl" "https://localhost:8080:8080/com.snowplowanalytics.snowplow/tp2"

Passing just localhost:

"targetUrl" "https://localhost:8080/com.snowplowanalytics.snowplow/tp2"

The working emitter config:

function initialiseEmitter (url) {
  const e = gotEmitter({
    endpoint: 'localhost',
    port: 8080,
    protocol: snowplow.HttpProtocol.HTTPS,
    method: snowplow.HttpMethod.POST,
    bufferSize: 1,
    callback: (err, res) => {
      if (err) {
        log.info('error.emitting.snowplow.event', { err });
      } else {
        log.info('snowplow.emitted.event');
      }
    }
  });
  return e;
}
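In other words, the emitter appears to assemble its target URL roughly like this (a sketch inferred from the observed behaviour above, not the library’s actual source):

// Inferred URL assembly (sketch, not the actual gotEmitter code):
const targetUrl = `${protocol}://${endpoint}:${port}/com.snowplowanalytics.snowplow/tp2`;
// endpoint = 'localhost:8080', port = 8080  =>  https://localhost:8080:8080/... (broken)
// endpoint = 'localhost',      port = 8080  =>  https://localhost:8080/...      (works)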

Great that you could get it to work, @sFrampton!