- I have created a Scala Stream Collector on port 8080.
- Then I created a Python Flask application and initialized the tracker like this:
e = Emitter("http://localhost:8080/", port=8080)
- I have created good and bad streams in my AWS account.
- I started my Flask application and the Scala Stream Collector locally, and both are running.
- But when I check my streams in AWS, they are not receiving any data.
Can you please help?
Thanks.
Hey @ank29,
Your tracker should take the endpoint address without the protocol and port:
e = Emitter("localhost", port=8080)
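For reference, a minimal end-to-end sketch with the snowplow-tracker Python package might look like the following (the namespace, app_id, and page URL are hypothetical placeholders):

from snowplow_tracker import Emitter, Tracker

# Endpoint address without the protocol; the port goes in the port argument
e = Emitter("localhost", port=8080)

# namespace and app_id are hypothetical placeholders
t = Tracker(e, namespace="flask-tracker", app_id="my-flask-app")

# Send a test event to the collector
t.track_page_view("http://localhost:5000/")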
Best,
Thank you Colm. It's working fine now. But when I look at the data in S3 it's unreadable. So how can I validate it before the collector puts it into the Kinesis stream?
The short answer is that normally you don't. The raw stream is a very unfriendly format: it's Thrift-encoded, and when dumped to S3 it is LZO-compressed.
It is possible to write something to read this and spit out a more readable format, but the work involved would be quite a lot more than I would consider worth it.
You can think of the collector, enrich, and their associated Kinesis streams as one system. If the data is somehow in a format that enrich can't deal with, it will end up in the bad stream with a failure message that says as much. But I would be very, very surprised if that were to happen, so I would say the best path forward is to just plug it into enrich and work from there.
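If you do want to eyeball what lands in the bad stream, a minimal boto3 sketch could look like this (it assumes the stream is named bad, sits in ap-southeast-1, and only reads the first shard for a quick spot check):

import boto3

# Assumptions: bad stream is named "bad", region is ap-southeast-1,
# and we only read the first shard for a quick spot check
kinesis = boto3.client("kinesis", region_name="ap-southeast-1")

shard_id = kinesis.describe_stream(StreamName="bad")["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName="bad",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

for record in kinesis.get_records(ShardIterator=iterator, Limit=10)["Records"]:
    # Bad rows are JSON, so unlike the raw stream they are human-readable
    print(record["Data"].decode("utf-8"))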
Totally agreed. Thank you so much Colm for your help.
- I am trying to implement the enricher now.
- I have already configured my tracker and collector, and events are getting into the Kinesis streams.
- But it's throwing an error at "Initializing LeaseCoordinator". Is it compulsory to create a DynamoDB table for the enricher to work?
Below is my config file for the enricher:
enrich {
  streams {
    in {
      # Stream/topic where the raw events to be enriched are located
      raw = good
      raw = ${?ENRICH_STREAMS_IN_RAW}
    }

    out {
      # Stream/topic where the events that were successfully enriched will end up
      enriched = enriched
      enriched = ${?ENRICH_STREAMS_OUT_ENRICHED}
      # Stream/topic where the events that failed enrichment will be stored
      bad = bad
      bad = ${?ENRICH_STREAMS_OUT_BAD}
      # Stream/topic where the pii transformation events will end up
      pii = outpii
      pii = ${?ENRICH_STREAMS_OUT_PII}

      partitionKey = event_id
      partitionKey = ${?ENRICH_STREAMS_OUT_PARTITION_KEY}
    }

    sourceSink {
      enabled = kinesis
      enabled = ${?ENRICH_STREAMS_SOURCE_SINK_ENABLED}

      # Region where the streams are located (AWS region, pertinent to kinesis sink/source type)
      region = ap-southeast-1
      region = ${?ENRICH_STREAMS_SOURCE_SINK_REGION}

      aws {
        accessKey = "***********"
        accessKey = ${?COLLECTOR_STREAMS_SINK_AWS_ACCESS_KEY}
        secretKey = "**********"
        secretKey = ${?COLLECTOR_STREAMS_SINK_AWS_SECRET_KEY}
      }

      # Maximum number of records to get from Kinesis per call to GetRecords
      maxRecords = 10
      maxRecords = ${?ENRICH_MAX_RECORDS}

      # LATEST: most recent data.
      # TRIM_HORIZON: oldest available data.
      # AT_TIMESTAMP: start from the record at or after the specified timestamp.
      # Note: this only affects the first run of this application on a stream.
      # (pertinent to kinesis source type)
      initialPosition = AT_TIMESTAMP
      initialPosition = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_POSITION}

      # Needs to be specified when initialPosition is "AT_TIMESTAMP".
      # The timestamp format needs to be "yyyy-MM-ddTHH:mm:ssZ",
      # e.g. "2017-05-17T10:00:00Z".
      # Note: the time needs to be specified in UTC.
      initialTimestamp = "2020-06-23T10:00:00Z"
      initialTimestamp = ${?ENRICH_STREAMS_SOURCE_SINK_INITIAL_TIMESTAMP}

      # Minimum and maximum backoff periods, in milliseconds
      backoffPolicy {
        minBackoff = 3000
        minBackoff = ${?COLLECTOR_STREAMS_SINK_MIN_BACKOFF}
        maxBackoff = 600000
        maxBackoff = ${?COLLECTOR_STREAMS_SINK_MAX_BACKOFF}
      }
    }

    buffer {
      byteLimit = 5000
      byteLimit = ${?ENRICH_STREAMS_BUFFER_BYTE_LIMIT}
      recordLimit = 1000 # Not supported by Kafka; will be ignored
      recordLimit = ${?ENRICH_STREAMS_BUFFER_RECORD_LIMIT}
      timeLimit = 10000
      timeLimit = ${?ENRICH_STREAMS_BUFFER_TIME_LIMIT}
    }

    # Used for a DynamoDB table to maintain stream state.
    # Used as the Kafka consumer group ID.
    # Used as the Google PubSub subscription name.
    appName = ""
    appName = ${?ENRICH_STREAMS_APP_NAME}
  }
}
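As a side note, a quick way to sanity-check the stream side of this config is a boto3 sketch like the following (hypothetical; it takes the stream names and region from the config above and assumes default AWS credentials):

import boto3

# Hypothetical sanity check: confirm the streams referenced in the
# enrich config exist and are ACTIVE in the configured region
kinesis = boto3.client("kinesis", region_name="ap-southeast-1")

for name in ["good", "enriched", "bad", "outpii"]:
    summary = kinesis.describe_stream_summary(StreamName=name)
    print(name, summary["StreamDescriptionSummary"]["StreamStatus"])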
Enrich will create the DynamoDB table for you if it doesn't exist; you don't need to create it manually. DynamoDB is used for the KCL (Kinesis Client Library) checkpointing, which is why enrich needs this table.
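To confirm the lease table was created, a minimal boto3 sketch could be used (hypothetical; the table is named after appName in the config above, assumed here to be set to a non-empty value such as "snowplow-enrich"):

import boto3

# Hypothetical check: the KCL lease table is named after appName in the
# enrich config (assumed here to be "snowplow-enrich"), in the same region
ddb = boto3.client("dynamodb", region_name="ap-southeast-1")

table = ddb.describe_table(TableName="snowplow-enrich")
print(table["Table"]["TableStatus"])  # ACTIVE once the KCL has created it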
Thanks Mike. It's working fine now.