Hello,
I’m testing the Kafka pipeline, and I’m stuck on moving enriched data from Kafka to Postgres using the kafka-jdbc-sink-connector.
The specific point I’m stuck on is data mapping, i.e. how to configure the connector to read the enriched Snowplow output from the Kafka topic so that it can sink it to Postgres. Some of the enriched data is in JSON and some is in TSV, so how do I get the connector to read that data? The JsonConverter that comes with Kafka Connect may be able to read the JSON data, but not the TSV.
Has anyone successfully set this up, and can share config details? @simplesteph @magaton
One idea is to use the Scala Analytics SDK to convert the enriched data into JSON and then feed the resulting JSON to the Kafka connector. If that’s possible, how exactly could I set that up? (I’m not an advanced programmer, but I can work with code examples.)
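Here’s a rough sketch of what I mean, in case it helps clarify the question. I’m assuming that EventTransformer.transform takes one enriched TSV line and gives back the event as a JSON string; depending on the SDK version the result might be an Either or a scalaz Validation, so the pattern match would need adjusting:

// build.sbt (versions are placeholders -- use whatever matches your setup):
//   libraryDependencies ++= Seq(
//     "org.apache.kafka"      %  "kafka-clients"                % "<version>",
//     "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % "<version>"
//   )

import java.util.Properties
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Assumption: transform(tsvLine) returns the event as a JSON string on success.
// Older SDK versions return a scalaz Validation instead of an Either.
import com.snowplowanalytics.snowplow.analytics.scalasdk.json.EventTransformer

object EnrichedTsvToJson extends App {

  val brokers     = "my-host:9092"          // same broker as in the Connect worker config
  val inputTopic  = "enriched-events"       // Snowplow enriched TSV
  val outputTopic = "enriched-events-json"  // JSON copy for the JDBC sink to consume

  val consumerProps = new Properties()
  consumerProps.put("bootstrap.servers", brokers)
  consumerProps.put("group.id", "tsv-to-json")
  consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val producerProps = new Properties()
  producerProps.put("bootstrap.servers", brokers)
  producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val consumer = new KafkaConsumer[String, String](consumerProps)
  val producer = new KafkaProducer[String, String](producerProps)
  consumer.subscribe(List(inputTopic).asJava)

  while (true) {
    val records = consumer.poll(1000)
    for (record <- records.asScala) {
      // One enriched TSV line in, one JSON string out
      EventTransformer.transform(record.value) match {
        case Right(json)  => producer.send(new ProducerRecord(outputTopic, record.key, json))
        case Left(errors) => System.err.println(s"Could not transform event: $errors")
      }
    }
  }
}

The idea would be to run this as a small standalone app between the enricher’s output topic and a new JSON topic; if that works, I suppose the sink config below would then point topics= at enriched-events-json instead of enriched-events.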
Here are the docs I’ve been referencing:
http://docs.confluent.io/current/connect/connect-jdbc/docs/sink_connector.html
http://docs.confluent.io/current/connect/connect-jdbc/docs/sink_config_options.html
My connect-json-standalone.properties file:
# Sample configuration for a standalone Kafka Connect worker. Based on the Confluent sample
# that uses Avro serialization and the Schema Registry, but with the converters switched to
# JSON and the Schema Registry settings commented out. This sample configuration assumes a
# local installation of Confluent Platform with all services running on their default ports.
# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=my-host:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter.schema.registry.url=http://my-host:8081
#value.converter.schema.registry.url=http://my-host:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
# Confluent Control Center Integration -- uncomment these lines to enable Kafka client interceptors
# that will report audit data that can be displayed and analyzed in Confluent Control Center
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
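One thing I noticed in the Connect docs: with value.converter.schemas.enable=true, the JsonConverter expects each message value to embed its schema next to the payload, roughly like this (the field names here are just an illustration, not the full enriched event):

{
  "schema": {
    "type": "struct",
    "name": "enriched_event",
    "optional": false,
    "fields": [
      { "field": "event_id", "type": "string", "optional": false },
      { "field": "app_id", "type": "string", "optional": true }
    ]
  },
  "payload": {
    "event_id": "<some uuid>",
    "app_id": "my-app"
  }
}

So if I go the Analytics SDK route above, I’d presumably also have to wrap the SDK’s JSON output in this schema/payload envelope (or supply a schema some other way), since the JDBC sink needs schema information to map the columns.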
My sink-postgresql.properties file:
name=sink-postgresql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=enriched-events
connection.url=jdbc:postgresql://my-host/snowplow-database
connection.user=$DATABASE_USER
connection.password=$DATABASE_PASSWORD
auto.create=false
table.name.format=atomic
# record_key=??  <- this is the part I can't figure out
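For that last line: I couldn’t find record_key in the sink config options doc, so my guess is that the key handling is actually configured with pk.mode and pk.fields, something like this (the column name here is just my guess):

pk.mode=record_value
pk.fields=event_id

Is that the right way to tell the connector which column(s) to use as the primary key?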