Not all Kafka integration tools are the same. Some integration systems only produce JSON data without a schema. The JDBC Sink Connector requires a schema. Here are steps showcasing a low-code option to push events into a relational database when the source data is schema-less JSON.
Real-Time Ecosystem
This demo is available in the postgres-sink folder of the dev-local-demos project. It leverages applications available through containers in dev-local. All you need is a Kafka cluster with the Confluent Schema Registry, two KSQL queries per topic running on ksqlDB, and a JDBC Sink Connector running on a Connect cluster.
Challenges
Relational databases require a schema. The JDBC Sink connector relies on the JDBC API and database-specific drivers to write data from a Kafka topic into a table on the database. These drivers need to know the data types of the fields being written.
Steps
KSQL is code, but it is code executed for you on the ksqlDB platform. This makes it a low-code and elegant way to integrate with systems that do not provide a schema. With two queries, you can provide a transformation that makes it easy for the JDBC Sink Connector to consume the data.
Work with streams, not tables
If the goal is to stream the events into the relational database table, not to persist the data within ksqlDB for stream processing, use streams (not tables) within ksqlDB. This allows the retention of the Kafka topics to be only as long as the applications need to process the events.
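For example, once the connector is the only consumer that needs a transformed topic, that topic's retention can be kept short. A sketch with the kafka-configs CLI (the broker address, topic name, and retention value are illustrative):

kafka-configs --bootstrap-server <broker>:9092 --alter --entity-type topics --entity-name <topic> --add-config retention.ms=86400000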
Read Data into a Stream
The goal of the first statement is to capture the data in ksqlDB with no transformation.
This query needs to align with how the data is coming from the source system.
This example shows a client that publishes the complete message as JSON, with the order_id also serving as the key of the message.
create or replace stream ORDERS (
"order_id" varchar key,
"user_id" varchar,
"store_id" varchar,
"quantity" bigint,
"amt" decimal(4,2),
"ts" string
) with (kafka_topic='ORDERS', value_format='json', key_format='kafka');
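For illustration, a source message that matches this stream definition could look like the following (the values are made up); the key carries the order_id and the value is the schema-less JSON payload:

key:   "o-1001"
value: {"order_id": "o-1001", "user_id": "u-42", "store_id": "s-7", "quantity": 2, "amt": 10.50, "ts": "2024-01-15 12:34:56"}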
Transform and publish with schema
Specify a key and value format that has a schema; Avro is used for this demonstration. Also, renaming fields to align with the database table’s schema will minimize the number of single message transforms needed in the Connect configuration.
create stream ORDERS_WITH_SCHEMA with(KEY_FORMAT='avro', VALUE_FORMAT='avro')
as
select
"order_id" as ORDER_ID,
"user_id" as USER_ID,
"store_id" as STORE_ID,
"quantity" as QUANTITY,
"amt" as AMT,
PARSE_TIMESTAMP("ts", 'yyyy-MM-dd HH:mm:ss', 'UTC') as TS
from ORDERS;
While this query is good, it can be better. By making the key itself a record (struct), you simplify the JDBC Sink Connector configuration and allow a single connector instance to handle multiple topics.
create or replace stream ORDERS_WITH_SCHEMA with(KEY_FORMAT='avro', VALUE_FORMAT='avro')
as
SELECT
STRUCT(ORDER_ID:=`order_id`) as PK,
"user_id" as USER_ID,
"store_id" as STORE_ID,
"quantity" as QUANTITY,
"amt" as AMT,
PARSE_TIMESTAMP("ts", 'yyyy-MM-dd HH:mm:ss', 'UTC') as TS
FROM ORDERS
PARTITION BY STRUCT(ORDER_ID:=`order_id`);
When the key is a primitive type, the schema specification only captures the data type, not the field name. By making the key a record, a schema is associated with the key, which is then accessible to the JDBC Sink Connector through the Schema Registry. This eliminates the need to set pk.fields in the connector’s configuration.
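For reference, the Avro key schema registered in the Schema Registry is then a record with a single ORDER_ID field, roughly like the sketch below (the record and namespace names are generated by ksqlDB and may differ in your environment):

{
  "type": "record",
  "name": "OrdersKey",
  "fields": [
    { "name": "ORDER_ID", "type": ["null", "string"], "default": null }
  ]
}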
Foreign Keys
With a topic-per-table setup, disabling foreign-key constraints in the destination database is required, since ordering guarantees are not achievable between tables. This is a big architectural discussion that needs to be settled before designing a solution that migrates data to a relational database.
If this is not possible, one option is a single topic for parent/child events, where the message key is the parent’s primary key and the JDBC Sink Connector is configured to pull the primary key from the value; this involves a fair amount more development and configuration. If the source topic is an aggregate with parents and their children together, then a custom consumer could be the right solution. These discussions are outside the scope of this article and tutorial.
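If disabling the constraints is the chosen path, it is a one-time change in the destination database. A minimal Postgres sketch, with a hypothetical table and constraint name:

-- hypothetical names; adjust to the actual schema
ALTER TABLE orders DROP CONSTRAINT orders_user_id_fkey;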
Connector Configuration
When it comes to a specific Apache Kafka connector, read the configuration documentation closely, as there are numerous differences between connector configurations. With the JDBC Sink Connector, a few specific settings should be called out.
The dialect.name setting tells the connector the type of database, which is important for knowing how to construct the SQL sent to the database. When it comes to Postgres, pay close attention to the name: PostgresDatabaseDialect and PostgresSqlDatabaseDialect are incorrect.
"dialect.name": "PostgreSqlDatabaseDialect"
For connection information, put these in a secret and reference them with a configuration provider. The file provider, FileConfigProvider, ships with the Apache Kafka distribution and is easy to enable.
"connection.url": "${file:/etc/kafka-connect/secrets/postgres.properties:CONNECTION_URL}",
"connection.user": "${file:/etc/kafka-connect/secrets/postgres.properties:CONNECTION_USER}",
"connection.password": "${file:/etc/kafka-connect/secrets/postgres.properties:CONNECTION_PASSWORD}"
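Enabling the file provider takes two properties in the Connect worker configuration, and the referenced secrets file simply holds the keys used above. A minimal sketch, with illustrative connection values:

# Connect worker properties
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# /etc/kafka-connect/secrets/postgres.properties (illustrative values)
CONNECTION_URL=jdbc:postgresql://postgres:5432/postgres
CONNECTION_USER=postgres
CONNECTION_PASSWORD=changeme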
By putting the connection.url value in a secret, it becomes easier to reuse the configuration across environments.

Do not change the default of quote.sql.identifiers from always to never, as invalid JDBC statements may result.
"quote.sql.identifiers": "always"
In the referenced demonstration, changing this to never results in invalid syntax because of the AMT field.

Understand the behavior of the insert mode. If you have an idempotent system and the database isn’t generating sequence numbers as part of the insert, upsert will be the typical setting.
"insert.mode": "upsert"
This setting is the easiest to use and understand in a streaming platform. However, the merge (upsert) SQL statements can cause some unexpected database performance issues. Enabling trace logging will show the prepared statements in the logs, and running an explain-plan analysis on those queries can help address performance concerns.
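As an illustration, with the PostgreSql dialect the upsert for the demo's orders topic is generated roughly in this form (table and column names assume the defaults derived from the topic and schema above; the exact statement appears in the trace logs):

-- illustrative shape only; the real statement comes from the connector's trace logs
INSERT INTO "ORDERS_WITH_SCHEMA" ("ORDER_ID","USER_ID","STORE_ID","QUANTITY","AMT","TS")
VALUES (?,?,?,?,?,?)
ON CONFLICT ("ORDER_ID") DO UPDATE SET
  "USER_ID"=EXCLUDED."USER_ID",
  "STORE_ID"=EXCLUDED."STORE_ID",
  "QUANTITY"=EXCLUDED."QUANTITY",
  "AMT"=EXCLUDED."AMT",
  "TS"=EXCLUDED."TS"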
pk.mode and pk.fields
Setting pk.mode to record_key is the easiest way to deploy a single connector for multiple tables with different primary key field names. If the key schema is a primitive, pk.fields is required. However, if the key is a structure, this setting can (and should) be omitted, as all the fields in the key’s structure are used as the primary key.

Understand and leverage partition.assignment.strategy
If you are configuring multiple topics for one connector, you need to understand partition assignment.
The default strategy is the RangeAssignor. This means the same partition numbers across all topics are assigned to the same consumer instance. Not only are you limited to as many tasks as the partition count of the topic with the most partitions, but you will also end up with an uneven distribution if the partition counts are not the same.
In the provided demonstration, ORDERS_WITH_SCHEMA has 4 partitions and USERS_WITH_SCHEMA has 2 partitions, which yields an uneven distribution across tasks and limits the number of tasks to 4, even if tasks.max=6.

Range Assignor
kafka-consumer-groups --bootstrap-server <broker>:9092 --describe --group connect-postgres
GROUP              TOPIC                PARTITION  CLIENT-ID
connect-postgres   ORDERS_WITH_SCHEMA   0          connector-consumer-postgres-0
connect-postgres   USERS_WITH_SCHEMA    0          connector-consumer-postgres-0
connect-postgres   ORDERS_WITH_SCHEMA   1          connector-consumer-postgres-1
connect-postgres   USERS_WITH_SCHEMA    1          connector-consumer-postgres-1
connect-postgres   ORDERS_WITH_SCHEMA   2          connector-consumer-postgres-2
connect-postgres   ORDERS_WITH_SCHEMA   3          connector-consumer-postgres-3

Only 4 working tasks are created (unique client-ids), since the maximum partition count of the topics is 4. Because USERS_WITH_SCHEMA has 2 partitions, 2 tasks have 2 partitions each, and 2 tasks have 1 partition each.
Round Robin Assignor
"consumer.override.partition.assignment.strategy": "org.apache.kafka.clients.consumer.RoundRobinAssignor",
kafka-consumer-groups --bootstrap-server <broker>:9092 --describe --group connect-postgres
tasks.max = 6
GROUP              TOPIC                PARTITION  CLIENT-ID
connect-postgres   ORDERS_WITH_SCHEMA   0          connector-consumer-postgres-0
connect-postgres   ORDERS_WITH_SCHEMA   1          connector-consumer-postgres-1
connect-postgres   ORDERS_WITH_SCHEMA   2          connector-consumer-postgres-2
connect-postgres   ORDERS_WITH_SCHEMA   3          connector-consumer-postgres-3
connect-postgres   USERS_WITH_SCHEMA    0          connector-consumer-postgres-4
connect-postgres   USERS_WITH_SCHEMA    1          connector-consumer-postgres-5

The maximum number of possible tasks is the total number of partitions across all topics. If the setting is lower, such as tasks.max=5, the partitions are assigned to tasks in a round-robin fashion (hence the name).

tasks.max = 5
GROUP              TOPIC                PARTITION  CLIENT-ID
connect-postgres   ORDERS_WITH_SCHEMA   0          connector-consumer-postgres-0
connect-postgres   USERS_WITH_SCHEMA    1          connector-consumer-postgres-0
connect-postgres   ORDERS_WITH_SCHEMA   1          connector-consumer-postgres-1
connect-postgres   ORDERS_WITH_SCHEMA   2          connector-consumer-postgres-2
connect-postgres   ORDERS_WITH_SCHEMA   3          connector-consumer-postgres-3
connect-postgres   USERS_WITH_SCHEMA    0          connector-consumer-postgres-4
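Pulling these settings together, a connector configuration for the demonstration might look roughly like the following sketch (the Schema Registry URL, secrets path, and topic list are illustrative and should match your environment):

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "6",
  "topics": "ORDERS_WITH_SCHEMA,USERS_WITH_SCHEMA",
  "dialect.name": "PostgreSqlDatabaseDialect",
  "connection.url": "${file:/etc/kafka-connect/secrets/postgres.properties:CONNECTION_URL}",
  "connection.user": "${file:/etc/kafka-connect/secrets/postgres.properties:CONNECTION_USER}",
  "connection.password": "${file:/etc/kafka-connect/secrets/postgres.properties:CONNECTION_PASSWORD}",
  "insert.mode": "upsert",
  "pk.mode": "record_key",
  "quote.sql.identifiers": "always",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "consumer.override.partition.assignment.strategy": "org.apache.kafka.clients.consumer.RoundRobinAssignor"
}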
Takeaways
It is not always possible to go to the source system and change how it publishes data. Having a streaming platform, like ksqlDB, available in your enterprise is important.
Existing Single Message Transforms (SMTs) do not bridge the gap from schema-less data to data with a schema. Writing a custom SMT is an option, but it would not be available for use in a Confluent Cloud managed connector.
Building a very simple Kafka Streams application would achieve the same result. While a Kafka Streams solution gives more flexibility, it increases development and operational effort.
While the goal of streaming systems is not just a pass-through from a source system into RDBMS tables, many enterprise systems need some level of this functionality.
Foreign key constraints lead to challenges, and disabling them for an “eventually consistent” database may not be an option. A more advanced solution is needed if they exist in the source tables.
Check out the dev-local project and the postgres-sink demo in dev-local-demos, and try it out yourself.
Reach out
Please contact us if you have improvements, want clarification, or just want to talk streaming.