Recover bad events

Hi, we had a situation with our self-hosted Snowplow pipeline.
For context, here is our pipeline:
Java tracker > Stream Collector > NSQ > Stream Enrich > NSQ > Elasticsearch Loader > Elasticsearch
Due to a server problem, our schema wasn't accessible to the Stream Enrich pod.
Now all events from this period are stored in the Elasticsearch bad index, and we see this JSON in them:

{
  "_index": "good-reindex",
  "_type": "bad",
  "_id": "3193PYsBdCB_N_98Cnkp",
  "_score": 1,
  "_source": {
    "line": "CwBkAAAADTQxLjE5OS4xNDgu......................pZnQvMS0wLTAA",
    "errors": [
      {
        "level": "error",
        "message": "error: Could not find schema with key iglu:com.***/finishOrder/jsonschema/4-0-0 in any repository, tried:\n    level: \"error\"\n    repositories: [\"Iglu Central [HTTP]\",\"Iglu Client Embedded [embedded]\",\"*** Repo [HTTP]\"]\n"
      },
      {
        "level": "error",
        "message": "error: Unexpected exception fetching iglu:com.***/finishOrder/jsonschema/4-0-0 in HTTP Iglu repository *** Repo: java.net.ConnectException: Connection refused (Connection refused)\n    level: \"error\"\n"
      },
      {
        "level": "error",
        "message": "error: Unexpected exception fetching iglu:com.***/finishOrder/jsonschema/4-0-0 in HTTP Iglu repository *** Repo: java.net.ConnectException: Connection refused (Connection refused)\n    level: \"error\"\n"
      },
      {
        "level": "error",
        "message": "error: Unexpected exception fetching iglu:com.***/finishOrder/jsonschema/4-0-0 in HTTP Iglu repository *** Repo: java.net.ConnectException: Connection refused (Connection refused)\n    level: \"error\"\n"
      }
    ],
    "failure_tstamp": ""
  }
}

My question is: can we re-feed those events/records into Snowplow so they can be stored in the good index?

The short answer is yes - failed events are lossless, so it is possible to replay these events into the pipeline to recover them.

The longer answer is that this is going to be more difficult, as we don't have a process to replay rows into NSQ, or to take rows from Elasticsearch and recover them from there. It is doable, but you'll likely need custom code to:

  1. Query Elasticsearch and store the failed lines on disk
  2. Run a recovery process to play these back into NSQ in the raw format (you will need to base64-decode the "line" field back to the raw collector payload; see the sketch after this list)
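Something like the following Python sketch could glue those two steps together. It's a minimal sketch, not tested against your cluster: it assumes Elasticsearch is on http://localhost:9200, the bad rows live in an index named "bad", nsqd's HTTP API is on http://localhost:4151, and Stream Enrich consumes raw payloads from a topic named "raw-events" - all of these names are illustrative and need adjusting to your setup. The key detail is that the "line" field is the base64-encoded raw (Thrift-serialized) collector payload, so decoding it gives you back the bytes the collector originally wrote to NSQ.

import base64
import requests

ES_URL = "http://localhost:9200"    # illustrative Elasticsearch endpoint
BAD_INDEX = "bad"                   # illustrative bad-events index name
NSQD_URL = "http://localhost:4151"  # illustrative nsqd HTTP endpoint
RAW_TOPIC = "raw-events"            # illustrative raw topic Stream Enrich reads

def iter_bad_lines():
    # Step 1: scroll through the bad index and yield each base64 "line"
    resp = requests.post(
        f"{ES_URL}/{BAD_INDEX}/_search?scroll=2m",
        json={"size": 500, "query": {"match_all": {}}},
    )
    resp.raise_for_status()
    body = resp.json()
    while body["hits"]["hits"]:
        for hit in body["hits"]["hits"]:
            yield hit["_source"]["line"]
        resp = requests.post(
            f"{ES_URL}/_search/scroll",
            json={"scroll": "2m", "scroll_id": body["_scroll_id"]},
        )
        resp.raise_for_status()
        body = resp.json()

def replay(line_b64):
    # Step 2: decode back to the raw collector payload bytes and publish
    # them to the raw NSQ topic via nsqd's HTTP /pub endpoint
    payload = base64.b64decode(line_b64)
    resp = requests.post(f"{NSQD_URL}/pub?topic={RAW_TOPIC}", data=payload)
    resp.raise_for_status()

if __name__ == "__main__":
    count = 0
    for line in iter_bad_lines():
        replay(line)
        count += 1
    print(f"replayed {count} events")

Two cautions: only run this once the Iglu repository is reachable again (otherwise the events will simply fail enrichment a second time), and if any of these events also made it into the good index you may end up with duplicates.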

This also looks like quite an old pipeline, as it is using the older bad events format. Unfortunately, all the newer tooling only works with the new format, which makes recovery significantly easier.