S3-loader not getting from enrich stream

Hi there,

I am trying to set up the following pipeline as a demo: tracker -> collector -> enrich -> load to S3, using Kinesis streams.

I can see that the collector stream (stream-good) and the enrich stream (stream-enrich-2) both have data, but when I run the s3-loader nothing gets logged to the terminal and no data is put into the S3 bucket. However, if I use the same loader config file and only change the input stream to the collector stream (stream-good), the process works. Any help would be appreciated.

To me, it seems like there is something I’m doing wrong in my enrich configuration because I seem to be able to get the flow to work if I set the source stream to my collector stream.

Thanks, Tim

Here are the conf files for the collector, enrich, and the S3 loader:

Collector


collector {
  # Address and port the collector listens on.
  interface = "0.0.0.0"
  port = 8000

  # SSL listener settings (disabled here).
  ssl {
    enable = false
    redirect = false
    port = 9543
  }

  # No custom path mappings are configured.
  paths {}

  # P3P policy header values.
  p3p {
    policyRef = "/w3c/p3p.xml"
    CP = "NOI DSP COR NID PSA OUR IND COM NAV STA"
  }

  # Cross-domain policy settings (disabled here).
  crossDomain {
    enabled = false
    domains = [ "*" ]
    secure = true
  }

  # Collector cookie settings.
  cookie {
    enabled = true
    expiration = "365 days" # e.g. "365 days"
    name = "sp"
    domains = [ "website.org" ]
    fallbackDomain = "website.org"
    secure = false
    httpOnly = false
  }

  # Do-not-track cookie (disabled here; name and value left empty).
  doNotTrackCookie {
    enabled = false
    name = ""
    value = ""
  }

  # Cookie bounce settings (disabled here).
  cookieBounce {
    enabled = false
    name = "n3pc"
    forwardedProtocolHeader = "X-Forwarded-Proto"
  }

  enableDefaultRedirect = true

  redirectMacro { enabled = false }

  # Response settings for the root route (disabled here).
  rootResponse {
    enabled = false
    statusCode = 302
    headers = {
      Location = "https://127.0.0.1/",
      X-Custom = "something"
    }
    body = "302, redirecting"
  }

  cors { accessControlMaxAge = 5 seconds }

  streams {
    # Stream/topic that receives successfully collected events
    good = "stream-good"

    # Stream/topic that receives events that are too big (w.r.t. the Kinesis 1MB limit)
    bad = "stream-bad"

    useIpAddressAsPartitionKey = false

    # Kinesis sink in us-west-2.
    sink {
      enabled = kinesis
      region = "us-west-2"
      threadPoolSize = 10

      # NOTE(review): `env` presumably means the keys are read from environment
      # variables - confirm against the collector documentation.
      aws {
        accessKey = env
        secretKey = env
      }

      backoffPolicy {
        minBackoff = 3000
        maxBackoff = 600000
      }
    }

    # Buffering limits applied before records are written to the sink.
    buffer {
      byteLimit = 500000
      recordLimit = 1000
      timeLimit = 10000
    }
  }
}

Enrich

enrich {

  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = "stream-good"
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = "stream-enrich-2"
      # Stream/topic where the event that failed enrichment will be stored
      bad = "stream-enrich-fail"
      # Stream/topic where the pii transformation events will end up
      pii = "stream-enrich-pii"

      partitionKey = "event_id"
    }

    # Kinesis source/sink settings.
    sourceSink {
      enabled = "kinesis"
      region = "us-west-2"

      # NOTE(review): "env" presumably means the keys are read from environment
      # variables - confirm against the Stream Enrich documentation.
      aws {
        accessKey = "env"
        secretKey = "env"
      }

      maxRecords = 10000
      initialPosition = "TRIM_HORIZON"

      backoffPolicy {
        minBackoff = 3000
        maxBackoff = 600000
      }
    }

    # NOTE(review): buffer and appName are placed at the streams level, as
    # siblings of sourceSink - verify against the reference config for the
    # Stream Enrich version in use.
    buffer {
      byteLimit = 500000
      recordLimit = 1000 # Not supported by Kafka; will be ignored
      timeLimit = 10000
    }

    appName = "myApp"
  }

}

S3-loader

# Where records are read from, and where records that fail are sent.
source = "kinesis"
sink = "kinesis"

# NOTE(review): "env" presumably means the keys are read from environment
# variables - confirm against the S3 loader documentation.
aws {
  accessKey = "env"
  secretKey = "env"
}

# NSQ settings; source and sink are both "kinesis" here, so these are placeholder values.
nsq {
  channelName = ""
  host = 0.0.0.0
  port = 0
  lookupPort = 0
}

# Kinesis consumer settings.
kinesis {
  initialPosition = "TRIM_HORIZON"
  initialTimestamp = "{{timestamp}}"
  maxRecords = 10000
  region = "us-west-2"
  appName = "myApp-s3"
}

# Input stream to load from and output stream for failures.
streams {
  inStreamName = "stream-enrich-2"
  outStreamName = "stream-storage-fail"

  buffer {
    byteLimit = 500000 # Not supported by NSQ; will be ignored
    recordLimit = 1000
    timeLimit = 10000 # Not supported by NSQ; will be ignored
  }
}

# Target S3 bucket and output format.
s3 {
  region = "us-west-2"
  bucket = "buckket-2"
  dateFormat = "{YYY}-{MM}-{dd}-{HH}"
  format = "gzip"
  maxTimeout = 30000
}



Ultimately, I just “rebooted”. I deleted the Dynamo table and Kinesis streams associated with the enrich and s3-load step and restarted the programs. Upon restarting these, it all worked.