Unknown error from Stream Enrich with Localstack

Hi,

I’m trying to get Collector and Enrich running with Localstack. The logs show that the KCL is catching an exception, but they don’t include any detail about it. Is there any way to get Enrich to log the error? Or, better yet, does anyone know why it is happening?

I’m running Kinesis Collector 2.3.0 and Kinesis Enrich 2.0.1.

Collector is picking up the event and sending it to the correct stream. Here are the Collector logs:

[main] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Creating thread pool of size 10
[main] WARN com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - No SQS buffer for surge protection set up (consider setting a SQS Buffer in config.hocon).
[main] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Creating thread pool of size 10
[main] WARN com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - No SQS buffer for surge protection set up (consider setting a SQS Buffer in config.hocon).
[scala-stream-collector-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
[scala-stream-collector-akka.actor.default-dispatcher-2] INFO com.snowplowanalytics.snowplow.collectors.scalastream.KinesisCollector$ - REST interface bound to /0.0.0.0:8080
[pool-1-thread-5] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Writing 1 Thrift records to Kinesis stream etl-raw-good.
[pool-1-thread-10] INFO com.snowplowanalytics.snowplow.collectors.scalastream.sinks.KinesisSink - Successfully wrote 1 out of 1 records to Kinesis stream etl-raw-good.

But Enrich is failing:

[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Using workerId: 2c67fc5478c9:dc1a6fa2-1765-4943-b1ea-afc5a8ff4be4
[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Running: my-stream-enrich-2.
[main] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Processing raw input stream: etl-raw-good
[main] WARN com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Received configuration for endpoint as http://host.docker.internal:4566, and region as ap-southeast-1.
[main] WARN com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Received configuration for endpoint as http://host.docker.internal:4566, and region as ap-southeast-1.
[main] WARN com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Received configuration for region as ap-southeast-1.
[main] INFO com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator - With failover time 10000 ms and epsilon 25 ms, LeaseCoordinator will renew leases every 3308 ms, takeleases every 20050 ms, process maximum of 2147483647 leases and steal 1 lease(s) at a time.
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Shard sync strategy determined as SHARD_END.
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization attempt 1
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initializing LeaseCoordinator
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Syncing Kinesis shard info
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Starting LeaseCoordinator
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Initialization complete. Starting worker loop.
[LeaseCoordinator-0001] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 2c67fc5478c9:dc1a6fa2-1765-4943-b1ea-afc5a8ff4be4 saw 1 total leases, 1 available leases, 1 workers. Target is 1 leases, I have 0 leases, I will take 1 leases
[LeaseCoordinator-0001] INFO com.amazonaws.services.kinesis.leases.impl.LeaseTaker - Worker 2c67fc5478c9:dc1a6fa2-1765-4943-b1ea-afc5a8ff4be4 successfully took 1 leases: shardId-000000000000
[main] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker - Created new shardConsumer for : ShardInfo [shardId=shardId-000000000000, concurrencyToken=b1fa51ce-3d15-42d8-b22a-73184b4bb2d4, parentShardIds=[], checkpoint={SequenceNumber: TRIM_HORIZON,SubsequenceNumber: 0}]
[RecordProcessor-0000] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.BlockOnParentShardTask - No need to block on parents [] of shard shardId-000000000000
[RecordProcessor-0000] INFO com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher - Initializing shard shardId-000000000000 with TRIM_HORIZON
[RecordProcessor-0000] INFO com.snowplowanalytics.snowplow.enrich.stream.sources.KinesisSource - Initializing record processor for shard: shardId-000000000000
[RecordProcessor-0000] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask - ShardId shardId-000000000000: Caught exception:
[RecordProcessor-0000] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask - ShardId shardId-000000000000: Caught exception:
[RecordProcessor-0000] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask - ShardId shardId-000000000000: Caught exception:
[RecordProcessor-0000] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask - ShardId shardId-000000000000: Caught exception:
[RecordProcessor-0000] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask - ShardId shardId-000000000000: Caught exception:
[RecordProcessor-0000] ERROR com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask - ShardId shardId-000000000000: Caught exception:

That just repeats until I kill it.

Enrich is configured via the following environment variables:

ENRICH_STREAMS_IN_RAW=etl-raw-good
ENRICH_STREAMS_OUT_ENRICHED=etl-enrich-good
ENRICH_STREAMS_OUT_BAD=etl-enrich-bad
ENRICH_STREAMS_OUT_PARTITION_KEY=event_id
ENRICH_STREAMS_SOURCE_SINK_REGION=ap-southeast-1
ENRICH_STREAMS_SOURCE_SINK_CUSTOM_ENDPOINT=http://host.docker.internal:4566
ENRICH_DYNAMODB_CUSTOM_ENDPOINT=http://host.docker.internal:4566
ENRICH_DISABLE_CLOUDWATCH=true
ENRICH_STREAMS_SOURCE_SINK_AWS_ACCESS_KEY=env
ENRICH_STREAMS_SOURCE_SINK_AWS_SECRET_KEY=env
ENRICH_STREAMS_BUFFER_BYTE_LIMIT=1
ENRICH_STREAMS_BUFFER_RECORD_LIMIT=1
ENRICH_STREAMS_BUFFER_TIME_LIMIT=10
ENRICH_STREAMS_APP_NAME=my-stream-enrich-2
AWS_ACCESS_KEY_ID=A....Z
AWS_SECRET_ACCESS_KEY=A....Z
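
For anyone cross-checking the names: these variables are picked up through HOCON substitutions in the enrich config file, roughly as in the shipped config.hocon.sample (if I read the sample right, the literal value env for the access/secret keys tells enrich to use the environment credentials, hence the AWS_* variables at the end). A trimmed sketch — exact field names may differ slightly between versions:

enrich {
  streams {
    in.raw = ${?ENRICH_STREAMS_IN_RAW}
    out {
      enriched = ${?ENRICH_STREAMS_OUT_ENRICHED}
      bad = ${?ENRICH_STREAMS_OUT_BAD}
      partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
    }
    sourceSink {
      aws {
        accessKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_ACCESS_KEY}
        secretKey = ${?ENRICH_STREAMS_SOURCE_SINK_AWS_SECRET_KEY}
      }
      region = ${?ENRICH_STREAMS_SOURCE_SINK_REGION}
      customEndpoint = ${?ENRICH_STREAMS_SOURCE_SINK_CUSTOM_ENDPOINT}
    }
    buffer {
      byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
    }
    appName = ${?ENRICH_STREAMS_APP_NAME}
  }
}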

The DynamoDB table is created and has a record:

$ aws --no-paginate --endpoint-url=http://0.0.0.0:4566 dynamodb list-tables

{
    "TableNames": [
        "Music",
        "my-stream-enrich",
        "my-stream-enrich-2"
    ]
}

$ aws --no-paginate --endpoint-url=http://0.0.0.0:4566 dynamodb scan \
    --table-name my-stream-enrich-2
{
    "Items": [
        {
            "checkpoint": {
                "S": "TRIM_HORIZON"
            },
            "ownerSwitchesSinceCheckpoint": {
                "N": "1"
            },
            "checkpointSubSequenceNumber": {
                "N": "0"
            },
            "leaseKey": {
                "S": "shardId-000000000000"
            },
            "leaseOwner": {
                "S": "2c67fc5478c9:dc1a6fa2-1765-4943-b1ea-afc5a8ff4be4"
            },
            "leaseCounter": {
                "N": "9"
            }
        }
    ],
    "Count": 1,
    "ScannedCount": 1
}
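
For completeness, the raw stream itself can also be read back from Localstack with the AWS CLI to double-check that the collector’s record is really there (stream and shard names taken from the logs above; the ITERATOR variable is just for readability):

$ ITERATOR=$(aws --endpoint-url=http://0.0.0.0:4566 kinesis get-shard-iterator \
    --stream-name etl-raw-good \
    --shard-id shardId-000000000000 \
    --shard-iterator-type TRIM_HORIZON \
    --query 'ShardIterator' --output text)
$ aws --endpoint-url=http://0.0.0.0:4566 kinesis get-records \
    --shard-iterator "$ITERATOR"

If the record comes back here, the data is in the stream and the failure is on the consumer (KCL) side.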

If it helps, I was able to get an older version of Enrich working, though I had to edit the source code to handle endpoint overrides and then rebuild the Docker image (there was a PR that showed how to do it). But since we are trying to upgrade our Snowplow versions, I figured I’d try to get the new version working with Localstack again.

Any help is appreciated!

I am also happy to provide more information if the above is insufficient.

Thank you!

Hi @chewlee,

Welcome to the Snowplow community!

Did you manage to get it to work?

> Is there any way to get Enrich to log the error?

Have you tried using the DEBUG log level?
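
For example, if the image’s logging backend is Logback (this may differ between versions), one option is to mount a logback.xml that raises the root level to DEBUG and point the JVM at it via JAVA_OPTS (launchers generated by sbt-native-packager usually honour that variable). A sketch — the /snowplow/logback.xml path and the image tag are purely illustrative; adjust to whatever you run:

<configuration>
  <!-- Send everything to stdout so docker logs picks it up -->
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>[%thread] %-5level %logger - %msg%n</pattern>
    </encoder>
  </appender>
  <!-- Raise the root level to DEBUG for all loggers -->
  <root level="DEBUG">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>

$ docker run ... \
    -v $PWD/logback.xml:/snowplow/logback.xml \
    -e "JAVA_OPTS=-Dlogback.configurationFile=/snowplow/logback.xml" \
    snowplow/stream-enrich-kinesis:2.0.1 ...

At DEBUG, the KCL loggers (com.amazonaws.services.kinesis.*) are usually chatty enough to surface the exception that ProcessTask is swallowing.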