Hello,
I have a Snowplow setup that tracks analytics events for an IoT application. We have an embedded Iglu repository set up with schemas for our events, and this is mostly working fine.
A little bit of context: AWS IoT Core has a concept called Device Shadows, which represent device state. AWS maintains the state and sends an event whenever it is updated. These update messages are forwarded to our Snowplow collector, and we have written a custom schema that represents their format.
However, this schema represents the entire message that AWS sends, which contains some extra properties on top of what the device itself sends to AWS. I want to extract a subschema for just the data the device sends, and then reference that subschema from the main schema that the collector sees. The reason is that the device firmware code can then use a schema specific to its own messages for code generation, etc.
So it is something like this:
main shadow schema:
{
  "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
  "self": {
    "vendor": "com.mycompany",
    "name": "shadow_update",
    "format": "jsonschema",
    "version": "1-0-0"
  },
  "type": "object",
  "properties": {
    ...
    "state": {
      "type": ["object", "null"],
      "properties": {
        "reported": {
          "oneOf": [
            { "$ref": "iglu:com.mycompany/shadow_state/jsonschema/1-0-0" },
            { "type": "null" }
          ]
        },
        ...
      }
    }
  },
  "additionalProperties": false
}
In the above snippet, the state.reported property references the separate schema file.
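Whether the validator can resolve an iglu: URI inside $ref is exactly what is in question here. As a possible fallback, assuming it cannot, the subschema could stay in its own file for the firmware team and be inlined into the main schema by a build step before publishing to the Iglu repository. The following is only a minimal sketch of that idea: the function name inline_iglu_refs and the subschemas mapping (iglu URI to parsed schema) are hypothetical and not part of any Snowplow tooling.

```python
import copy

IGLU_PREFIX = "iglu:"


def inline_iglu_refs(node, subschemas):
    """Return a copy of `node` in which every {"$ref": "iglu:..."} object
    is replaced by the body of the referenced schema.

    `subschemas` is a hypothetical dict mapping an iglu URI to its parsed schema.
    """
    if isinstance(node, dict):
        ref = node.get("$ref")
        if isinstance(ref, str) and ref.startswith(IGLU_PREFIX):
            body = copy.deepcopy(subschemas[ref])  # KeyError if the schema is unknown
            # Drop the self-describing envelope so only validation keywords remain.
            body.pop("$schema", None)
            body.pop("self", None)
            # The referenced schema may itself contain iglu references.
            return inline_iglu_refs(body, subschemas)
        return {key: inline_iglu_refs(value, subschemas) for key, value in node.items()}
    if isinstance(node, list):
        return [inline_iglu_refs(item, subschemas) for item in node]
    return node
```

The input schema is left untouched and a new structure is returned, so the same subschema file can still be handed to the firmware code generator unchanged. This does not handle circular references between schemas.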
With this setup I am getting errors during schema validation in the enricher. This is the error reported in the file written to the bad events Kinesis stream:
com.networknt.schema.JsonSchemaException: java.lang.NullPointerException
at com.networknt.schema.JsonMetaSchema.newValidator(JsonMetaSchema.java:286)
at com.networknt.schema.ValidationContext.newValidator(ValidationContext.java:57)
at com.networknt.schema.JsonSchema.read(JsonSchema.java:203)
at com.networknt.schema.JsonSchema.getValidators(JsonSchema.java:479)
at com.networknt.schema.JsonSchema.validate(JsonSchema.java:278)
at com.networknt.schema.PropertiesValidator.validate(PropertiesValidator.java:69)
at com.networknt.schema.JsonSchema.validate(JsonSchema.java:279)
at com.networknt.schema.PropertiesValidator.validate(PropertiesValidator.java:69)
at com.networknt.schema.JsonSchema.validate(JsonSchema.java:279)
at com.networknt.schema.JsonSchema.validate(JsonSchema.java:262)
at com.snowplowanalytics.iglu.client.validator.CirceValidator$.com$snowplowanalytics$iglu$client$validator$CirceValidator$$validateOnReadySchema(CirceValidator.scala:252)
at com.snowplowanalytics.iglu.client.validator.CirceValidator$WithCaching$.$anonfun$validate$4(CirceValidator.scala:304)
at scala.util.Either.flatMap(Either.scala:341)
at com.snowplowanalytics.iglu.client.validator.CirceValidator$WithCaching$.$anonfun$validate$3(CirceValidator.scala:303)
at delay @ scalacache.CatsEffect$$anon$2.delay(CatsEffect.scala:37)
at flatMap @ com.snowplowanalytics.iglu.client.validator.CirceValidator$WithCaching$.getFromCacheOrEvaluate(CirceValidator.scala:315)
at map @ com.snowplowanalytics.iglu.client.validator.CirceValidator$WithCaching$.validate(CirceValidator.scala:303)
at leftMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.readJson(FileSystem.scala:50)
at leftMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.readJson(FileSystem.scala:50)
at realTime @ com.snowplowanalytics.iglu.client.resolver.ResolverCache$.currentSeconds(ResolverCache.scala:219)
at map @ com.snowplowanalytics.iglu.client.resolver.ResolverCache$.currentSeconds(ResolverCache.scala:219)
at liftF @ com.snowplowanalytics.iglu.client.resolver.ResolverCache$.$anonfun$getTimestampedItem$1(ResolverCache.scala:167)
at map @ com.snowplowanalytics.iglu.client.resolver.ResolverCache$.$anonfun$getTimestampedItem$1(ResolverCache.scala:167)
at withFilter @ com.snowplowanalytics.iglu.client.resolver.ResolverCache$.$anonfun$getTimestampedItem$1(ResolverCache.scala:169)
at map @ com.snowplowanalytics.iglu.client.resolver.ResolverCache$.$anonfun$getTimestampedItem$1(ResolverCache.scala:167)
at delay @ scalacache.CatsEffect$$anon$2.delay(CatsEffect.scala:37)
at flatMap @ com.snowplowanalytics.iglu.client.resolver.ResolverCache$.com$snowplowanalytics$iglu$client$resolver$ResolverCache$$getTimestampedItem(ResolverCache.scala:166)
at flatMap @ com.snowplowanalytics.iglu.client.resolver.Resolver.lookupSchemaResult(Resolver.scala:137)
at flatMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.config.ParsedConfigs$.parse(ParsedConfigs.scala:70)
at leftMap @ com.snowplowanalytics.snowplow.enrich.common.fs2.io.FileSystem$.readJson(FileSystem.scala:50)
Caused by: java.lang.NullPointerException
at com.networknt.schema.OneOfValidator$ShortcutValidator.<init>(OneOfValidator.java:38)
at com.networknt.schema.OneOfValidator.<init>(OneOfValidator.java:123)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
at com.networknt.schema.ValidatorTypeCode.newValidator(ValidatorTypeCode.java:135)
at com.networknt.schema.JsonMetaSchema.newValidator(JsonMetaSchema.java:279)
at com.networknt.schema.ValidationContext.newValidator(ValidationContext.java:57)
at com.networknt.schema.JsonSchema.read(JsonSchema.java:203)
at com.networknt.schema.JsonSchema.getValidators(JsonSchema.java:479)
at com.networknt.schema.JsonSchema.validate(JsonSchema.java:278)
at com.networknt.schema.PropertiesValidator.validate(PropertiesValidator.java:69)
at com.networknt.schema.JsonSchema.validate(JsonSchema.java:279)
at com.networknt.schema.PropertiesValidator.validate(PropertiesValidator.java:69)
at com.networknt.schema.JsonSchema.validate(JsonSchema.java:279)
at com.networknt.schema.JsonSchema.validate(JsonSchema.java:262)
at com.snowplowanalytics.iglu.client.validator.CirceValidator$.com$snowplowanalytics$iglu$client$validator$CirceValidator$$validateOnReadySchema(CirceValidator.scala:252)
at com.snowplowanalytics.iglu.client.validator.CirceValidator$WithCaching$.$anonfun$validate$4(CirceValidator.scala:304)
at scala.util.Either.flatMap(Either.scala:341)
at com.snowplowanalytics.iglu.client.validator.CirceValidator$WithCaching$.$anonfun$validate$3(CirceValidator.scala:303)
at cats.effect.IO$Map.apply(IO.scala:1720)
at cats.effect.IO$Map.apply(IO.scala:1718)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:196)
at cats.effect.internals.IORunLoop$.restartCancelable(IORunLoop.scala:51)
at cats.effect.internals.IOBracket$BracketStart.run(IOBracket.scala:100)
at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:90)
at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:90)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:90)
at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:80)
at cats.effect.internals.IOBracket$BracketStart.apply(IOBracket.scala:58)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:192)
at cats.effect.internals.IORunLoop$.restart(IORunLoop.scala:41)
at cats.effect.internals.IOBracket$.$anonfun$apply$1(IOBracket.scala:48)
at cats.effect.internals.IOBracket$.$anonfun$apply$1$adapted(IOBracket.scala:34)
at cats.effect.internals.IOAsync$.$anonfun$apply$1(IOAsync.scala:37)
at cats.effect.internals.IOAsync$.$anonfun$apply$1$adapted(IOAsync.scala:37)
at cats.effect.internals.IORunLoop$RestartCallback.start(IORunLoop.scala:464)
at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:165)
at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:480)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:501)
at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:439)
at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Also, in these files the .data.payload property is a base64-encoded string. Normally the payload property is an object with enriched and raw properties.
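For anyone who wants to look inside such a payload, here is a minimal sketch of decoding it. It assumes each line of the bad events file is a JSON document with the payload at data.payload; the function name decode_payload is hypothetical. It is not confirmed that the decoded bytes are readable text (they may be a serialized binary record), so undecodable bytes are replaced rather than raising an error.

```python
import base64
import json


def decode_payload(bad_row_line):
    """Return the payload of one bad event record.

    If data.payload is a base64 string, decode it to text (lossily, if the
    bytes are not valid UTF-8). If it is already an object, return it as-is.
    """
    bad_row = json.loads(bad_row_line)
    payload = bad_row["data"]["payload"]
    if isinstance(payload, str):
        raw = base64.b64decode(payload)
        return raw.decode("utf-8", errors="replace")
    return payload
```

Calling decode_payload on each line of a downloaded file gives either the decoded string or the usual object with enriched and raw properties, which makes the two kinds of record easy to tell apart.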
What am I doing wrong here? What needs to be done to reference a separate schema from a self-describing Iglu schema? Also, is it possible to make the referenced schema a plain JSON Schema (i.e. not a self-describing schema), since these subschemas will never be sent to Snowplow as individual events?
Thanks for your help