If you are an Apache Kafka developer looking to write stream-processing applications in Flink, the initial setup isn’t so obvious. Apache Flink has its own opinions on consuming from and producing to Kafka, as well as on integrating with Confluent’s Schema Registry. Here are the steps, along with a working example, to get an Apache Kafka and Apache Flink streaming platform up in no time.
Introduction
Apache Flink is a major platform in stream processing, especially among managed services. Here is a look at standing up Apache Flink and integrating it with Apache Kafka, Confluent’s Schema Registry, and Avro serialization.
When both the Kafka key and value are part of the streaming pipeline (not just the value), nested generics come into play and type handling gets tricky.
Resources
A full container-based ecosystem is available at dev-local. It is based on docker-compose and provides container instances of Apache Kafka, Apache Flink, Kafka Connect, and more.
A demonstration of this article is provided in the flink folder of the dev-local-demos project.
Challenges
The key challenges uncovered:
- Committing Offsets
- Using Offsets
- Serialization and Deserialization
- Confluent’s Schema Registry Integration
- Java Generics and Type Erasure
The Details
Committing Offsets
By default, Flink does not commit Kafka consumer offsets. This means that when the application restarts, it will consume from either the earliest or the latest offset, depending on the default setting. With the proper configuration, offsets are committed when Flink creates a checkpoint. Just don’t forget to do so when setting up the Kafka source: set commit.offsets.on.checkpoint to true and also add a Kafka group.id to your consumer.
KafkaSource<Tuple2<EventKey, EventValue>> source = KafkaSource.<Tuple2<EventKey, EventValue>>builder()
...
.setProperty("commit.offsets.on.checkpoint", "true")
.setProperty("group.id", "flink-processor")
...
.build();
Additionally, checkpointing must be enabled. It is very simple to do, but also very easy to forget.
StreamExecutionEnvironment.getExecutionEnvironment().enableCheckpointing(100L);
Using Offsets
Committing offsets doesn’t mean they will be used; the KafkaSource instance has to be configured to use them. The committedOffsets method of OffsetsInitializer instructs the source instance to start from them. Be sure also to define the behavior for when offsets do not yet exist by selecting the appropriate OffsetResetStrategy.
KafkaSource<Tuple2<EventKey, EventValue>> source = KafkaSource.<Tuple2<EventKey, EventValue>>builder()
...
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
...
.build();
Serialization and Deserialization
The Kafka Producer and Consumer used by Flink leverage the byte[] serializer and deserializer and leave the marshaling of data to Flink. If you only need to work with message values, setup is easier. However, in stream processing, leveraging the Kafka message key is usually necessary, so it is best to understand how that works.
Implement the KafkaRecordDeserializationSchema and KafkaRecordSerializationSchema interfaces and use them in the KafkaSource and KafkaSink builders, respectively. Note that the method names within KafkaSource and KafkaSink are not consistent: for KafkaSource it is setDeserializer, which takes the KafkaRecordDeserializationSchema; for KafkaSink it is setRecordSerializer, which takes the KafkaRecordSerializationSchema.
KafkaSource<Tuple2<EventKey, EventValue>> source = KafkaSource.<Tuple2<EventKey, EventValue>>builder()
...
.setDeserializer(new MyAvroDeserialization<>(EventKey.class, EventValue.class, "http://schema-registry:8081"))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
...
.build();
KafkaSink<Tuple2<EventKey, EventValue>> sink = KafkaSink.<Tuple2<EventKey, EventValue>>builder()
...
.setRecordSerializer(new MyAvroSerialization<>(EventKey.class, EventValue.class, topic, "http://schema-registry:8081"))
...
.build();
Confluent’s Schema Registry Integration
Flink provides its own Confluent Schema Registry support, ConfluentRegistryAvroDeserializationSchema and ConfluentRegistryAvroSerializationSchema, as part of the flink-avro-confluent-registry library. Leverage these within your MyAvroDeserialization and MyAvroSerialization implementations, and configure the Schema Registry URL and subject naming conventions accordingly.
ConfluentRegistryAvroDeserializationSchema.forSpecific(pojoClass, schemaRegistryUrl)
For topic-based serialization, the subjects would be {topic}-key and {topic}-value.
ConfluentRegistryAvroSerializationSchema.forSpecific(pojoClass, subject, schemaRegistryUrl);
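To make this concrete, here is a minimal sketch of what a MyAvroSerialization implementation might look like. It assumes the constructor arguments shown in the earlier KafkaSink snippet (key class, value class, topic, Schema Registry URL) and the default topic-based subject naming; the demo project’s actual implementation may differ in its details.

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyAvroSerialization<K extends SpecificRecord, V extends SpecificRecord>
        implements KafkaRecordSerializationSchema<Tuple2<K, V>> {

    private final String topic;
    private final SerializationSchema<K> keySchema;
    private final SerializationSchema<V> valueSchema;

    public MyAvroSerialization(Class<K> keyClass, Class<V> valueClass, String topic, String schemaRegistryUrl) {
        this.topic = topic;
        // Topic-based subject naming: {topic}-key and {topic}-value.
        this.keySchema = ConfluentRegistryAvroSerializationSchema.forSpecific(keyClass, topic + "-key", schemaRegistryUrl);
        this.valueSchema = ConfluentRegistryAvroSerializationSchema.forSpecific(valueClass, topic + "-value", schemaRegistryUrl);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<K, V> element, KafkaSinkContext context, Long timestamp) {
        // The Kafka producer only ever sees byte arrays; all Avro and Schema Registry work happens here.
        return new ProducerRecord<>(topic, keySchema.serialize(element.f0), valueSchema.serialize(element.f1));
    }
}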
Java Generics and Type Erasure
Kafka Streams makes both the key and the value part of the Processor API and domain-specific language (DSL), which reduces the complexities of leveraging generics. In Flink, the record is a single object, so capturing both the key and value objects requires more nuance with generics.
Fortunately, Flink’s Tuple (and its 26 concrete implementations Tuple0, Tuple1, …, Tuple25) makes it possible to write serializers and deserializers generically. The core concept is that the serializer and deserializer need to provide type information in a way where the nested types are not erased. This is done through TypeInformation.
Creating a TypeHint works similarly to TypeReference in the Jackson JSON library: an anonymous subclass captures the generic parameters, allowing the code to handle types that would otherwise be erased.
TypeInformation<Tuple2<EventKey, EventValue>> typeInformation = TypeInformation.of(
new TypeHint<Tuple2<EventKey, EventValue>>() {
}
);
With the use of TupleTypeInfo, a single implementation of KafkaRecordDeserializationSchema and KafkaRecordSerializationSchema can be created to handle marshaling Avro to SpecificRecord implementations.
new TupleTypeInfo<>(TypeInformation.of(keyClass), TypeInformation.of(valueClass));
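For example, a MyAvroDeserialization implementation might look like the minimal sketch below. It assumes the constructor arguments used in the earlier KafkaSource snippet (key class, value class, Schema Registry URL); the demo project’s actual implementation may differ in its details.

import java.io.IOException;

import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MyAvroDeserialization<K extends SpecificRecord, V extends SpecificRecord>
        implements KafkaRecordDeserializationSchema<Tuple2<K, V>> {

    private final Class<K> keyClass;
    private final Class<V> valueClass;
    private final DeserializationSchema<K> keySchema;
    private final DeserializationSchema<V> valueSchema;

    public MyAvroDeserialization(Class<K> keyClass, Class<V> valueClass, String schemaRegistryUrl) {
        this.keyClass = keyClass;
        this.valueClass = valueClass;
        // Delegate the Avro decoding of key and value to Flink's Confluent-aware schemas.
        this.keySchema = ConfluentRegistryAvroDeserializationSchema.forSpecific(keyClass, schemaRegistryUrl);
        this.valueSchema = ConfluentRegistryAvroDeserializationSchema.forSpecific(valueClass, schemaRegistryUrl);
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Tuple2<K, V>> out) throws IOException {
        // Assumes every record carries a non-null Avro key and value.
        out.collect(new Tuple2<>(keySchema.deserialize(record.key()), valueSchema.deserialize(record.value())));
    }

    @Override
    public TypeInformation<Tuple2<K, V>> getProducedType() {
        // TupleTypeInfo keeps the nested key and value types from being erased.
        return new TupleTypeInfo<>(TypeInformation.of(keyClass), TypeInformation.of(valueClass));
    }
}

The getProducedType method is where TupleTypeInfo earns its keep: it tells Flink the concrete key and value types that the generic signature alone cannot convey at runtime.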
Putting it all Together
This demonstration is available in the flink folder of the dev-local-demos project. A simple ETL-based processor that transforms an Order into a PurchaseOrder (as shown in the demonstration code) is achieved with a straightforward functional transformation. In this example, the business logic is the convert method within the map operation.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
        .enableCheckpointing(100L);

// Source: key/value Avro records from the "input" topic, starting from committed offsets.
KafkaSource<Tuple2<OrderKey, Order>> source = KafkaSource.<Tuple2<OrderKey, Order>>builder()
        .setBootstrapServers(bootStrapServers)
        .setTopics("input")
        .setDeserializer(new AvroDeserialization<>(OrderKey.class, Order.class, schemaRegistryUrl))
        .setProperty("commit.offsets.on.checkpoint", "true")
        .setProperty("group.id", "FLINK")
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .build();

// Sink: key/value Avro records written to the "output" topic.
KafkaSink<Tuple2<OrderKey, PurchaseOrder>> sink = KafkaSink.<Tuple2<OrderKey, PurchaseOrder>>builder()
        .setBootstrapServers(bootStrapServers)
        .setRecordSerializer(new AvroSerialization<>(OrderKey.class, PurchaseOrder.class, "output", schemaRegistryUrl))
        .build();

// The pipeline: read, transform each value with convert(), and write.
// TupleTypeInfo preserves the key and value types through the map operation.
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
        .map(kv -> {
            return new Tuple2<>(kv.f0, convert(kv.f1));
        }, new TupleTypeInfo<>(TypeInformation.of(OrderKey.class), TypeInformation.of(PurchaseOrder.class)))
        .sinkTo(sink);

env.execute();
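The convert method carries the business logic and is ordinary Java. A sketch follows; the Order and PurchaseOrder accessors used here are hypothetical placeholders, since the real fields come from the Avro schemas in the demo project.

// Hypothetical sketch of the business logic; the builder calls and field names
// are placeholders, not the demo's actual Avro-generated accessors.
private static PurchaseOrder convert(Order order) {
    return PurchaseOrder.newBuilder()
            .setId(order.getId())
            .setAmount(order.getAmount())
            .build();
}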
Running your application
Simply build the application with dependencies bundled, and run it with the flink run command.
Parallelism is set with the -p option.
flink run -p 4 --detached application-all-dependencies.jar
Conclusion
With this overview of how to use Apache Kafka and Confluent’s Schema Registry with Apache Flink, it is time to explore.