Iglu schema with nested properties? (Segment webhook)

Hi there,

I’m trying to use the Iglu webhook to receive data from Segment.

Here is an example of data from their “page” event webhook:

{
    "version"       : 1,
    "type"          : "page",
    "userId"        : "019mr8mf4r",
    "properties"    : {
        "path"      : "/pricing",
        "referrer"  : "https://segment.com",
        "title"     : "Segment Pricing",
        "url"       : "https://segment.com/pricing"
    },
    "timestamp" : "2012-12-02T00:30:08.276Z",
    "name": "Test page"
}

As you can see, the details of the event (path, referrer, title, url) are nested inside properties. From what I’ve seen in their docs and their UI, I don’t think I can change the way they return the data.

Is there a way to describe such nested properties in the Iglu jsonschema?

I tested with the following jsonschema found on iglucentral and an autogenerated Redshift DDL, but only the name column has been filled in Redshift (i.e. Test page). All the other columns (path, referrer, search, title, url) are empty. The jsonschema is 7 years old so maybe Segment changed their webhook data since then (was flat before and now it’s nested)?

Yes, fields can be nested in the JSON schema (under properties, for example), and you are correct that the old JSON schema no longer conforms to what appears to be the current webhook format.

If you are just tracking page events I’d be tempted to define your own schema so that you can override this and then specify that as part of the webhook URL in the Iglu webhook adapter.

If you want to capture more complex events (e.g., track calls), then I think it’s probably worth considering Segment Functions (or equivalent) to transform and map the events to their Snowplow equivalents. These can’t easily be described by a single Iglu schema, because track will not have the same schema for all events.

Thanks @mike. So would something like this work?

{
    "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
    "description": "Schema for Segment page entity (via webhook), https://segment.com/docs/spec/page/#properties",
    "self": {
        "vendor": "com.segment",
        "name": "page",
        "format": "jsonschema",
        "version": "1-0-0"
    },
    "type": "object",
    "properties": {
        "name": {
            "type": "string"
        },
        "properties": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "maxLength": 8192
                },
                "referrer": {
                    "type": "string"
                },
                "search": {
                    "type": "string"
                },
                "title": {
                    "type": "string"
                },
                "url": {
                    "type": "string",
                    "format": "uri",
                    "maxLength": 8192
                }
            }
        }
    },
    "additionalProperties": true
}

and

CREATE TABLE IF NOT EXISTS atomic.com_segment_page_1 (
    "schema_vendor"                        VARCHAR(128)  ENCODE ZSTD NOT NULL,
    "schema_name"                          VARCHAR(128)  ENCODE ZSTD NOT NULL,
    "schema_format"                        VARCHAR(128)  ENCODE ZSTD NOT NULL,
    "schema_version"                       VARCHAR(128)  ENCODE ZSTD NOT NULL,
    "root_id"                              CHAR(36)      ENCODE RAW  NOT NULL,
    "root_tstamp"                          TIMESTAMP     ENCODE ZSTD NOT NULL,
    "ref_root"                             VARCHAR(255)  ENCODE ZSTD NOT NULL,
    "ref_tree"                             VARCHAR(1500) ENCODE ZSTD NOT NULL,
    "ref_parent"                           VARCHAR(255)  ENCODE ZSTD NOT NULL,
    "name"                                 VARCHAR(8192) ENCODE ZSTD,
    "properties.path"                      VARCHAR(8192) ENCODE ZSTD,
    "properties.referrer"                  VARCHAR(8192) ENCODE ZSTD,
    "properties.search"                    VARCHAR(8192) ENCODE ZSTD,
    "properties.title"                     VARCHAR(8192) ENCODE ZSTD,
    "properties.url"                       VARCHAR(8192) ENCODE ZSTD,
    FOREIGN KEY (root_id) REFERENCES atomic.events(event_id)
)
DISTSTYLE KEY
DISTKEY (root_id)
SORTKEY (root_tstamp);

COMMENT ON TABLE atomic.com_segment_page_1 IS 'iglu:com.segment/page/jsonschema/1-0-0';

I would make the following changes:

  • additionalProperties => false
  • referrer => allow nullable ("type": ["string", "null"])
  • search => allow nullable

I would also include the other meta fields from your original payload (e.g., version, type, userId).

For the Redshift table definition, Snowplow (the RDB Loader specifically) will generate and run this for you automatically, so you shouldn’t need to execute it yourself, assuming you are on a reasonably recent version of the pipeline.
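
To make the suggested schema changes concrete, here is a minimal Python sketch that applies them to a trimmed copy of the proposed schema. The types chosen for the meta fields (version, type, userId, timestamp) are assumptions guessed from the sample payload in the first post, not taken from Segment's spec, and the helper name apply_suggestions is purely illustrative.

```python
import copy
import json

# Trimmed copy of the schema proposed above ("$schema", "description" and
# "self" are omitted here for brevity).
proposed_schema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "properties": {
            "type": "object",
            "properties": {
                "path": {"type": "string", "maxLength": 8192},
                "referrer": {"type": "string"},
                "search": {"type": "string"},
                "title": {"type": "string"},
                "url": {"type": "string", "format": "uri", "maxLength": 8192},
            },
        },
    },
    "additionalProperties": True,
}


def apply_suggestions(schema):
    """Return a modified copy of the schema; the input is left untouched."""
    revised = copy.deepcopy(schema)

    # 1. Reject fields that the schema does not declare.
    revised["additionalProperties"] = False

    # 2. Allow null for the optional nested fields.
    nested = revised["properties"]["properties"]["properties"]
    for field in ("referrer", "search"):
        nested[field]["type"] = ["string", "null"]

    # 3. Declare the top-level meta fields.
    # Assumption: these types are inferred from the sample payload only.
    revised["properties"].update({
        "version": {"type": "integer"},
        "type": {"type": "string"},
        "userId": {"type": "string"},
        "timestamp": {"type": "string", "format": "date-time"},
    })
    return revised


revised_schema = apply_suggestions(proposed_schema)
print(json.dumps(revised_schema, indent=4))
```

One thing to keep in mind with additionalProperties set to false: every top-level field that the webhook actually sends has to be declared, otherwise the event fails validation. Real payloads may carry more fields than the sample shown here, so it is probably worth checking a few live ones first.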

Thanks @Mike. I’m trying to add the modified jsonschema in my registry.

As I already had "iglu:com.segment/page/jsonschema/1-0-0" in my registry and because I couldn’t delete it ({"message":"DELETE is forbidden on production registry"}) or replace it ({"message":"Schema already exists"}), I created another one with a different version: "iglu:com.segment/page/jsonschema/2-0-0". If I do a GET on my registry I see both.

Now I’m sending events against both schemas, but I only see the events for 1-0-0 in the logs of the shredder and the loader. I don’t see any events for 2-0-0.

Any idea what could cause this?

The URLs:

  • [COLLECTOR HOST]/com.snowplowanalytics.iglu/v1?schema=iglu%3Acom.segment%2Fpage%2Fjsonschema%2F1-0-0&aid=[APP_ID]
  • [COLLECTOR HOST]/com.snowplowanalytics.iglu/v1?schema=iglu%3Acom.segment%2Fpage%2Fjsonschema%2F2-0-0&aid=[APP_ID]
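
For reference, the URLs above follow the pattern of the Iglu webhook path plus a percent-encoded schema URI and an app ID. A small Python sketch of how such a URL could be assembled is shown here; the collector host and app ID are made-up placeholders, and iglu_webhook_url is a hypothetical helper, not part of any Snowplow library.

```python
from urllib.parse import urlencode


def iglu_webhook_url(collector_host, schema_uri, app_id):
    """Build an Iglu webhook URL with a percent-encoded schema URI."""
    # urlencode percent-encodes ":" and "/" in the schema URI.
    query = urlencode({"schema": schema_uri, "aid": app_id})
    return f"{collector_host}/com.snowplowanalytics.iglu/v1?{query}"


# Placeholder host and app ID, for illustration only.
url = iglu_webhook_url(
    "https://collector.example.com",
    "iglu:com.segment/page/jsonschema/2-0-0",
    "my-app",
)
print(url)
```

Generating the URL this way mainly guards against typos in the hand-encoded schema parameter, which would otherwise surface only as failed events downstream.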

This is my iglu resolver:

{
  "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
  "data": {
    "cacheSize": 500,
    "repositories": [
      {
        "name": "my iglu server",
        "priority": 0,
        "vendorPrefixes": ["com.snowplowanalytics"],
        "connection": {
          "http": {
            "uri": "[IGLU SERVER HOST]/api",
            "apikey": "[API KEY]"
          }
        }
      }
    ]
  }
}

and my config:

{
  # Human-readable identifier, can be random
  "name": "Acme Redshift",
  # Machine-readable unique identifier, must be UUID
  "id": "123e4567-e89b-12d3-a456-426655440000",

  # Data Lake (S3) region
  "region": "us-west-2",
  # SQS topic name used by Shredder and Loader to communicate
  "messageQueue": ${SQS_TOPIC},

  # Shredder-specific configs
  "shredder": {
    # "batch" for Spark job and "stream" for fs2 streaming app
    "type" : "stream",
    # For batch: path to enriched archive (must be populated separately with run=YYYY-MM-DD-hh-mm-ss directories) for S3 input
    # "input": "s3://bucket/input/",
    # For stream: appName, streamName, region triple for kinesis
    "input": {
     # kinesis and file are the only options for stream shredder
     "type": "kinesis",
     # KCL app name - a DynamoDB table will be created with the same name
     "appName": ${DYNAMO_TABLE},
     # Kinesis Stream name
     "streamName": ${KINESIS_STREAM_ENRICHER_GOOD},
     # Kinesis region
     "region": "us-west-2",
     # Kinesis position: LATEST or TRIM_HORIZON
     "position": "TRIM_HORIZON"
    },
    # For stream shredder : frequency to emit loading finished message - 5,10,15,20,30,60 etc minutes
    "windowing": "5 minutes",
    # Path to shredded archive
    "output": {
      # Path to shredded output
      "path": ${S3_BUCKET_SCHREDDED},
      # Shredder output compression, GZIP or NONE
      "compression": "GZIP"
    }
  },

  # Schema-specific format settings (recommended to leave all three groups empty and use TSV as default)
  "formats": {
    # Format used by default (TSV or JSON)
    "default": "TSV",
    # Schemas to be shredded as JSONs, corresponding JSONPath files must be present. Automigrations will be disabled
    "json": [
      "iglu:com.acme/json-event/jsonschema/1-0-0",
      "iglu:com.acme/json-event/jsonschema/2-*-*"
    ],
    # Schemas to be shredded as TSVs, presence of the schema on Iglu Server is necessary. Automigrations enabled
    "tsv": [ ],
    # Schemas that won't be loaded
    "skip": [
      "iglu:com.acme/skip-event/jsonschema/1-*-*"
    ]
  },

  # Optional. S3 path that holds JSONPaths
  #"jsonpaths": "s3://bucket/jsonpaths/",

  # Warehouse connection details
  "storage" : {
    # Database, redshift is the only acceptable option
    "type": "redshift",
    # Redshift hostname
    "host": ${REDSHIFT_HOST},
    # Database name
    "database": ${REDSHIFT_DATABASE},
    # Database port
    "port": 5439,
    # AWS Role ARN allowing Redshift to load data from S3
    "roleArn": ${REDSHIFT_IAM_ROLE_ARN},
    # DB schema name
    "schema": ${REDSHIFT_SCHEMA},
    # DB user with permissions to load data
    "username": ${REDSHIFT_USER},
    # DB password
    "password": ${REDSHIFT_PASSWORD},
    # Custom JDBC configuration
    "jdbc": {"ssl": true},
    # MAXERROR, amount of acceptable loading errors
    "maxError": 10
  },

  # Additional steps. analyze, vacuum and transit_load are valid values
  "steps": ["analyze"],

  # Observability and reporting options
  "monitoring": {
  #   # Snowplow tracking (optional)
  #   "snowplow": {
  #     "appId": "redshift-loader",
  #     "collector": "snplow.acme.com",
  #   }

  #   # Optional, for tracking runtime exceptions
  #   "sentry": {
  #     "dsn": "http://sentry.acme.com"
  #   },

  #   # Optional, configure how metrics are reported
  #   "metrics": {
  #     # Optional, send metrics to StatsD server
  #     "statsd": {
  #       "hostname": "localhost",
  #       "port": 8125,
  #       # Any key-value pairs to be tagged on every StatsD metric
  #       "tags": {
  #         "app": "rdb-loader"
  #       }
  #       # Optional, override the default metric prefix
  #       # "prefix": "snowplow.rdbloader."
  #     },

  #     # Optional, print metrics on stdout (with slf4j)
  #     "stdout": {
  #       # Optional, override the default metric prefix
  #       # "prefix": "snowplow.rdbloader."
  #     }
  #   }
  }
}

This largely looks correct to me - are the events being successfully enriched / validated? If not they may not be making it to the shredding / loading stage.

Hard to tell from the enricher logs, we receive dozens of events per second and the logs are generic.

I tried something else:

I added a version 3-0-0 to the registry, which is a copy of 1-0-0 (the one that works), and sent the same event to both 1-0-0 and 3-0-0, and again only the 1-0-0 makes it to the shredding / loading stage.

Makes me wonder if the shredder & loader actually use my Iglu server. Could it be that they use Iglu Central even if I passed my Iglu server as resolver? Or maybe I’m not passing the resolver correctly? Wouldn’t it exit with an error if the resolver wasn’t passed correctly?

These are the Dockerfiles:

FROM snowplow/snowplow-rdb-stream-shredder:1.2.0

ENV JAVA_TOOL_OPTIONS="-Xms6G -Xmx6G"

WORKDIR /app

EXPOSE 8000

CMD [\
  "--config", \
  # Must be in base 64. To generate it: base64 config.hocon
  "[BASE 64]", \
  "--iglu-config", \
  # Must be in base 64. To generate it: base64 iglu-resolver.json
  "[BASE 64]" \
]

FROM snowplow/snowplow-rdb-loader:1.2.0

ENV JAVA_TOOL_OPTIONS="-Xms6G -Xmx6G"

WORKDIR /app

EXPOSE 8000

CMD [\
  "--config", \
  # Must be in base 64. To generate it: base64 config.hocon
  "[BASE 64]", \
  "--iglu-config", \
  # Must be in base 64. To generate it: base64 iglu-resolver.json
  "[BASE 64]" \
]

Your resolver file looks largely ok - the rdb shredder / loader will check the format of the file but it won’t error out if the repository can’t be resolved or if the schema can’t be found. Iglu Central should only be used if it’s present in the resolver file.
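
Since a wrong registry won't make the apps exit, one way to rule out a mistake in the base64 argument is to decode it and list the registries it really contains. The Python sketch below assumes the resolver layout shown earlier in this thread; the registry URI and API key in the example are placeholders, and registry_uris is a hypothetical helper.

```python
import base64
import json


def registry_uris(encoded_resolver):
    """Decode a base64 resolver config and return (name, uri) pairs."""
    resolver = json.loads(base64.b64decode(encoded_resolver))
    found = []
    for repo in resolver["data"]["repositories"]:
        # Non-HTTP registries have no "uri", so None is reported for them.
        uri = repo.get("connection", {}).get("http", {}).get("uri")
        found.append((repo["name"], uri))
    return found


# Example resolver mirroring the one posted above, with placeholder values.
example_resolver = {
    "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-1",
    "data": {
        "cacheSize": 500,
        "repositories": [
            {
                "name": "my iglu server",
                "priority": 0,
                "vendorPrefixes": ["com.snowplowanalytics"],
                "connection": {
                    "http": {"uri": "https://iglu.example.com/api", "apikey": "secret"}
                },
            }
        ],
    },
}
encoded = base64.b64encode(json.dumps(example_resolver).encode("utf-8"))

for name, uri in registry_uris(encoded):
    print(f"{name}: {uri}")
```

Running the same check against the resolver passed to every component (enricher included) shows quickly whether they all point at the same registries.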

If 1-0-0 is making it through and 3-0-0 isn’t, I’d say it’s probably a schema validation or lookup issue, in which case looking through bad rows (and filtering to schema violation errors, i.e. the schema_violations bad row type) is likely to be your best bet. These won’t be emitted to stdout (unless you are using the stdout sink) but should instead be available wherever your enricher configuration sends its bad output (i.e., Kinesis / PubSub / Kafka / RabbitMQ).
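
As a rough illustration of that filtering step, here is a Python sketch that keeps only schema violation bad rows. It assumes the bad rows have already been exported from the bad stream as newline-delimited, self-describing JSON, and that the bad row type appears in the top-level "schema" key; the two sample rows are invented and heavily simplified.

```python
import json


def schema_violations(lines):
    """Yield parsed bad rows whose schema key is of the schema_violations type."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        row = json.loads(line)
        # Assumption: the bad row type is part of the self-describing schema key.
        if "/schema_violations/" in row.get("schema", ""):
            yield row


# Invented, simplified sample rows (real bad rows contain much more detail).
sample_lines = [
    json.dumps({
        "schema": "iglu:com.snowplowanalytics.snowplow.badrows/schema_violations/jsonschema/2-0-0",
        "data": {"note": "placeholder for failure details"},
    }),
    json.dumps({
        "schema": "iglu:com.snowplowanalytics.snowplow.badrows/enrichment_failures/jsonschema/2-0-0",
        "data": {"note": "placeholder for failure details"},
    }),
]

violations = list(schema_violations(sample_lines))
print(len(violations), "schema violation row(s)")
```

The failure details inside each matching row should then indicate whether the schema failed validation or simply could not be found in any configured registry.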

Just an update in case it could help someone else.

The issue was actually in my enricher: its Iglu resolver pointed at Iglu Central rather than at my own Iglu server.

That’s why I couldn’t see my custom events in the shredder and loader’s logs.

No idea why I thought about editing the shredder and loader’s resolvers but not the one for the enricher…

I guess the confusing part was that my first custom event (com.segment/page/jsonschema/1-0-0) was correctly processed but not the other ones (com.segment/page/jsonschema/2-0-0, com.segment/page/jsonschema/3-0-0).

But I found the explanation for that too: com.segment/page/jsonschema/1-0-0 is actually defined on Iglu Central. That’s why the enricher was able to process this event but not the other ones.
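
For anyone who wants the behaviour spelled out, here is a toy Python model of what happened. It is not the real Iglu client, just a lookup over sets of schema keys, and the registry contents are limited to the three versions mentioned in this thread.

```python
# Toy model: each registry is simply the set of schema keys it can serve.
IGLU_CENTRAL = {"iglu:com.segment/page/jsonschema/1-0-0"}
MY_IGLU_SERVER = {
    "iglu:com.segment/page/jsonschema/1-0-0",
    "iglu:com.segment/page/jsonschema/2-0-0",
    "iglu:com.segment/page/jsonschema/3-0-0",
}


def resolve(schema_key, registries):
    """Return the name of the first registry that has the schema, else None."""
    for name, schemas in registries:
        if schema_key in schemas:
            return name
    return None


# What my enricher was doing: only Iglu Central in its resolver.
enricher_resolver = [("Iglu Central", IGLU_CENTRAL)]
# What the shredder and loader were doing: only my own server.
loader_resolver = [("my iglu server", MY_IGLU_SERVER)]

for version in ("1-0-0", "2-0-0", "3-0-0"):
    key = f"iglu:com.segment/page/jsonschema/{version}"
    print(version, "enricher:", resolve(key, enricher_resolver),
          "| loader:", resolve(key, loader_resolver))
```

Events for 2-0-0 and 3-0-0 never got past the enricher, so the shredder and loader never had a chance to look them up on my server.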
