A second command-line tool has been added to ktools. The command kafka-console-consumer-filter allows for filtering JSON, JSON Schema, and Avro serialized messages by JSON Path expressions. In addition to filtering messages, JSON Path expressions can also be used to highlight elements within the JSON.
Introduction
This tool is designed around questions like:
- Did an event with a given JSON Element make it to Kafka?
- I want to quickly check events on the topic over the past 2 hours w/out having to write an ISO 8601 string.
- I want to stream data as it is happening, highlighting what I’m most interested in.
Why
As a developer, I embrace command-line tools.
Having the ability to use a CLI is extremely helpful.
Being able to see and display the content of a topic is important for troubleshooting, validation, and for new developers.
In the past, I would chain kafka-console-consumer with various Unix commands (e.g., grep), but this approach had its struggles.
I also found date-time navigation with the existing CLI tools quite frustrating. In addition, not being able to stop consumption at a given time made it quite challenging to answer a simple question: “Was a message created two to three hours ago?”
Overview
The best way to give an overview of this tool is through examples. Here are a few to help you decide whether this CLI tool fits your needs.
Find all messages that have a price attribute where the price is less than 1.00.
specifically
- Filter out any message that doesn’t have a price element less than 1.00.
- Highlight the items in blue, the SKU in yellow, prices less than 1.00 in red, and any price greater than 30.00 in green.
- Search the entire topic, exiting when it reaches the offsets associated with “right now”.
- Include the topic key and metadata as part of the output.
syntax
kafka-console-consumer-filter --bootstrap-server localhost:9092 --topic orders-pickup \
--filter "$..[?(@.price < 1.0)]" \
--highlight "BLUE=$..items" \
--highlight "RED=$..[?(@.price < 1.0)].price" \
--highlight "GREEN=$..[?(@.price > 30.0)].price" \
--highlight "YELLOW=$..sku" \
--include-key \
--include-metadata \
--start earliest \
--end now
Find all messages that have 4 or more line items.
specifically
- Filter only orders that have at least 4 elements in the “items” array.
- Highlight the items array in blue, prices in the default color (red), and all name attributes in purple.
- Start from 1 day ago, and stop once messages from after the time this command started executing are reached.
syntax
kafka-console-consumer-filter --bootstrap-server localhost:9092 --topic orders-pickup \
--filter "$..[?( @.items.length() >= 4 )]" \
--highlight "BLUE=$..items" \
--highlight "$..price" \
--highlight "Purple=$..name" \
--rewind 1d \
--end now
Display Avro in JSON format, do not filter anything out, but highlight orderIds in blue and prices in red.
specifically
- Select avro as the format, providing the URL to the Schema Registry.
- Start from March 11th, 2024 at 17:00 UTC, and stop at the time when the command started.
- Highlight the orderId in blue and all price elements in red.
syntax
kafka-console-consumer-filter --bootstrap-server localhost:9092 --topic purchase-orders \
--format avro \
--schema-registry http://localhost:8081 \
--highlight "BLUE=$..orderId" \
--highlight "$..price" \
--start "2024-03-11T17Z" \
--end now
Configuration
Connection
The connection parameters are similar to those of the standard Kafka CLI tools, with a few exceptions:
- While the bootstrap.servers Kafka consumer property is required, it can be provided either in the --consumer.config property file or with the --bootstrap-server option.
- Since Apache Kafka uses --command-config for tools like kafka-topics, and --consumer.config for tools like kafka-console-consumer, this tool supports both --consumer.config and --consumer-config. Why? Because I always forget whether the middle delimiter is . or -.
- The connection with Apache Kafka is through the assign() API, not subscribe(). So no consumer offsets are ever committed to the cluster, and an internal group.id is created only to aid in metric gathering.
- Schema Registry configurations, when required, are maintained in their own configuration file and can be supplied with --schema-registry.config (or --schema-registry-config).
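The option handling above can be sketched in Python. This is not the tool's code: the function names (build_parser, parse_properties, consumer_properties) are hypothetical, the properties reader is deliberately minimal (no line continuations, escapes, or ':' separators), and letting --bootstrap-server override a bootstrap.servers value from the file is an assumption. argparse lets one option carry several spellings, which is all that supporting both --consumer.config and --consumer-config requires.

```python
import argparse
from typing import Callable, Dict, List


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="kafka-console-consumer-filter")
    parser.add_argument("--bootstrap-server")
    # Both spellings are registered as aliases of one option, so either works.
    parser.add_argument("--consumer.config", "--consumer-config", dest="consumer_config")
    parser.add_argument("--schema-registry.config", "--schema-registry-config",
                        dest="schema_registry_config")
    return parser


def parse_properties(text: str) -> Dict[str, str]:
    """Minimal reader for key=value properties lines; # and ! start comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def read_file(path: str) -> str:
    with open(path, encoding="utf-8") as handle:
        return handle.read()


def consumer_properties(argv: List[str],
                        read_text: Callable[[str], str] = read_file) -> Dict[str, str]:
    """Merge the property file with command-line options (hypothetical helper)."""
    args = build_parser().parse_args(argv)
    props = parse_properties(read_text(args.consumer_config)) if args.consumer_config else {}
    if args.bootstrap_server:
        # Assumption: the command-line option takes precedence over the file.
        props["bootstrap.servers"] = args.bootstrap_server
    if "bootstrap.servers" not in props:
        raise SystemExit("bootstrap.servers is required (--bootstrap-server or --consumer.config)")
    return props
```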
Topic
--topic
- The name of the topic to consume.
--partitions
- All partitions are assigned unless a list of partitions is provided.
- Partitions that do not exist (e.g., a partition number that is too large) are ignored.
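A minimal Python sketch of the --partitions rule, assuming the caller already knows the topic's partition count (in Kafka's Java client that would come from partitionsFor(topic)). select_partitions is a hypothetical helper, and how the tool actually parses the list from the command line is not shown.

```python
from typing import Iterable, List, Optional


def select_partitions(partition_count: int,
                      requested: Optional[Iterable[int]] = None) -> List[int]:
    """Return the partitions to assign for a topic with partition_count partitions."""
    if requested is None:
        # No list given: assign every partition.
        return list(range(partition_count))
    # Keep only partitions that actually exist; silently drop the rest.
    return sorted({p for p in requested if 0 <= p < partition_count})
```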
Navigation
- Navigation is done by setting an optional start time, end time, and maximum message count. To make date selection easier, the start and end times are each controlled by two settings: an absolute date-time and a relative duration.
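Because the tool uses assign() rather than a consumer group, each partition needs an explicit starting offset. Kafka's consumer provides offsetsForTimes(), which returns the earliest offset whose timestamp is greater than or equal to the requested time; the Python sketch below emulates that lookup over an in-memory log. That the tool seeks this way is an assumption, and offset_for_time and seek_positions are hypothetical names.

```python
from typing import Dict, Sequence, Tuple

# A partition's log as (offset, timestamp_ms) pairs, in offset order.
Log = Sequence[Tuple[int, int]]


def offset_for_time(log: Log, target_ms: int) -> int:
    """Earliest offset whose timestamp is >= target_ms, like offsetsForTimes()."""
    for offset, timestamp in log:
        if timestamp >= target_ms:
            return offset
    # Nothing at or after the target: position at the end of the log.
    return log[-1][0] + 1 if log else 0


def seek_positions(logs: Dict[int, Log], start_ms: int) -> Dict[int, int]:
    """Starting offset for every assigned partition."""
    return {partition: offset_for_time(log, start_ms) for partition, log in logs.items()}
```

A start time of 0 lands on the first retained offset of each partition. offsetsForTimes() returns null for a partition when no such message exists; falling back to the end of the log, as the sketch does, is an assumption about how a tool would handle that case.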
Start Time
--start
- Provides a start time in a flexible format, yyyy-MM-dd(T, )HH:mm:ss.SSSZ.
- 3 token-based values are allowed: now, earliest, and latest. now and latest are the same thing, using the current timestamp as the starting point. earliest will start at the earliest offset in each partition. now is the default.
- The ‘T’ delimiter can also be a space, provided it is interpreted as a single argument by your shell (e.g., quote it).
- Hours, minutes, seconds, milliseconds, and timezone are optional and will be defaulted if not provided.
- Timezone defaults to the system timezone, so use Z if you mean UTC.
- Keep in mind that a date without hours is still interpreted in the default timezone, so add a timezone if needed.
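The start-time rules can be sketched as a small Python parser that resolves a --start value to epoch milliseconds. It illustrates the rules listed above and is not the tool's parser: the regular expression, the requirement of exactly three millisecond digits, the accepted zone forms (Z, ±HH:MM, ±HHMM), and the name parse_start are assumptions.

```python
import re
from datetime import datetime, timedelta, timezone

# yyyy-MM-dd, then optional (T or space)HH, :mm, :ss, .SSS, then an optional zone.
_DATE_TIME = re.compile(
    r"^(\d{4})-(\d{2})-(\d{2})"
    r"(?:[T ](\d{2})(?::(\d{2})(?::(\d{2})(?:\.(\d{3}))?)?)?)?"
    r"(Z|[+-]\d{2}:?\d{2})?$"
)
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_start(value: str, now_ms: int) -> int:
    """Resolve a --start style value to epoch milliseconds."""
    text = value.strip()
    token = text.lower()
    if token in ("now", "latest"):
        return now_ms
    if token == "earliest":
        return 0  # epoch 0 seeks to the earliest offset in each partition
    match = _DATE_TIME.match(text)
    if match is None:
        raise ValueError(f"unrecognized date-time: {value!r}")
    year, month, day, hh, mm, ss, millis, zone = match.groups()
    # Missing time fields default to zero.
    local = datetime(int(year), int(month), int(day), int(hh or 0), int(mm or 0),
                     int(ss or 0), int(millis or 0) * 1000)
    if zone is None:
        moment = local.astimezone()  # naive value: interpret in the system timezone
    elif zone == "Z":
        moment = local.replace(tzinfo=timezone.utc)
    else:
        digits = zone[1:].replace(":", "")
        offset = timedelta(hours=int(digits[:2]), minutes=int(digits[2:]))
        moment = local.replace(tzinfo=timezone(-offset if zone[0] == "-" else offset))
    return (moment - _EPOCH) // timedelta(milliseconds=1)
```

For example, the value 2024-03-11T17Z from the Avro example resolves to 1710176400000.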
--rewind
- Quickly changes the start time by providing a duration.
- Units: ms - milliseconds (default), s - seconds, m - minutes, h - hours, d - days.
- The default is 0ms. To rewind 2 hours from now is simply --rewind 2h.
- Both --start and --rewind can be on the command line together; rewind is applied to whatever start time is given. Keep in mind that rewind goes backward, so if you want to go forward from the provided start time, negate it.
- earliest uses an epoch of 0 for seeking, so combining earliest and rewind is not something I expect anyone to do.
- Keep in mind that rewind is typically used without --start, as a quick way to move backward from now. This is why it is a positive value that is subtracted from the start time. So 3d is 3 days earlier than the start time.
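Duration handling for --rewind, and its mirror image for --forward (described under End Time), can be sketched in Python. The unit table follows the list above; parse_duration_ms, apply_rewind, and apply_forward are hypothetical helpers, and negative values are accepted so that a start time can be pushed forward.

```python
import re

_DURATION = re.compile(r"^(-?\d+)(ms|s|m|h|d)?$")
_UNIT_MS = {"ms": 1, "s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}


def parse_duration_ms(value: str) -> int:
    """Parse a duration such as 2h, -30m, or 500 (no unit means milliseconds)."""
    match = _DURATION.match(value.strip())
    if match is None:
        raise ValueError(f"unrecognized duration: {value!r}")
    amount, unit = match.groups()
    return int(amount) * _UNIT_MS[unit or "ms"]


def apply_rewind(start_ms: int, rewind: str = "0ms") -> int:
    # A positive rewind moves the start earlier; a negative one moves it later.
    return start_ms - parse_duration_ms(rewind)


def apply_forward(end_ms: int, forward: str) -> int:
    # A positive forward moves the end later.
    return end_ms + parse_duration_ms(forward)
```

Keeping rewind positive-means-earlier matches the common case of --rewind 2h with no --start.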
End Time
End time is optional; if not provided, the consumer will not end by time (it could still end by reaching max messages).
To process the end time, the command-line tool needs to process at least 1 message after the end time in every partition, since event time != processing time. These extra messages are not displayed.
--end
- Provides an end time in a flexible format, yyyy-MM-dd(T, )HH:mm:ss.SSSZ.
- Same formatting rules as --start.
- There is no default.
--forward
- Changes the end time by providing a duration.
- Same formatting rules as --rewind.
- There is no default, meaning if both --end and --forward are missing, the consumer will not end based on a date-time.
- Where a positive --rewind moves the start time earlier, a positive --forward moves the end time later. This all makes sense for my use cases, but I can see how the positive/negative behavior could be confusing (I hope the names of the command-line switches make it easier).
Max Messages
--max-messages
- The number of messages to consume before quitting.
- When repeating a run for a given start time, the results may not be identical, since the number of messages pulled from each partition is non-deterministic.
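The end-time and max-message rules can be simulated in Python over an in-memory stream of records. In this sketch a partition counts as finished once it yields a record whose timestamp is after the end time; that record is read but not displayed, and consumption stops when every assigned partition has finished or max_messages records have been displayed. Skipping later records from a finished partition is an assumption (a real consumer would more likely pause it), and Record and consume_until are hypothetical names.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional, Set


@dataclass
class Record:
    partition: int
    offset: int
    timestamp: int  # epoch milliseconds
    value: str


def consume_until(records: Iterable[Record], partitions: Set[int],
                  end_ms: Optional[int] = None,
                  max_messages: Optional[int] = None) -> Iterator[Record]:
    """Yield displayable records until every partition passes end_ms or the max is hit."""
    finished: Set[int] = set()
    emitted = 0
    for record in records:
        if record.partition in finished:
            continue  # assumption: a partition is done once it has passed the end time
        if end_ms is not None and record.timestamp > end_ms:
            finished.add(record.partition)  # read, but never displayed
            if finished >= partitions:
                return  # every assigned partition has passed the end time
            continue
        yield record
        emitted += 1
        if max_messages is not None and emitted >= max_messages:
            return
```

If some partition never receives a record after the end time, the loop above only stops when its input runs out; a live consumer in the same situation would keep waiting for one.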
Filtering and Display
These are the core options of this tool. They determine which data is displayed and how it is displayed.
--format
- json - the value is JSON serialized, as with the standard Apache Kafka JSON Serializer.
- json-schema - the value is serialized with the Confluent JSON Schema Serializers that leverage the Schema Registry.
- avro - the value is serialized with the Confluent Avro Serializers that leverage the Schema Registry.
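The json-schema and avro formats differ from plain json on the wire: Confluent's Schema Registry serializers prefix each value with a magic byte (0) and a 4-byte big-endian schema ID, followed by the JSON text or Avro binary body. A minimal Python sketch of splitting that header; split_confluent_header is a hypothetical name, and resolving the schema ID against the registry (needed to decode Avro) is not shown.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0


def split_confluent_header(payload: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, body) for a Schema Registry serialized value."""
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        raise ValueError("not in the Schema Registry wire format")
    (schema_id,) = struct.unpack(">I", payload[1:5])  # 4-byte big-endian schema ID
    return schema_id, payload[5:]
```

This is also why choosing the wrong --format fails: a plain JSON parser sees the five header bytes before the JSON text.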
--filter
- A JSON Path expression that must match the message value for the message to be output to the screen.
- For Avro, the JSON Path is applied to how the Avro record is rendered as JSON; this means that the type names included for union types could cause the JSON to not be quite as expected.
- The Jayway open-source project is used for parsing JSON Path expressions, so if a JSON Path expression isn’t working as expected, keep in mind that the implementation does have its nuances.
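Jayway evaluates a path to a list of matching nodes; reading “must match” as “returns at least one node” is an assumption. The Python sketch below does not parse JSON Path at all. It hand-codes a rough equivalent of the first example's filter, $..[?(@.price < 1.0)], as a recursive walk over every nested value, to show why a deep-scan filter matches a price at any depth. descendants, matches, and price_below_one are hypothetical names.

```python
import json
from typing import Any, Callable, Iterator


def descendants(node: Any) -> Iterator[Any]:
    """Yield every value nested below node, roughly like JSON Path's .. scan."""
    if isinstance(node, dict):
        children = list(node.values())
    elif isinstance(node, list):
        children = node
    else:
        return
    for child in children:
        yield child
        yield from descendants(child)


def matches(value_json: str, predicate: Callable[[Any], bool]) -> bool:
    """The message passes when at least one nested node satisfies the predicate."""
    return any(predicate(node) for node in descendants(json.loads(value_json)))


def price_below_one(node: Any) -> bool:
    # Hand-written stand-in for the filter [?(@.price < 1.0)].
    price = node.get("price") if isinstance(node, dict) else None
    return isinstance(price, (int, float)) and price < 1.0
```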
--highlight (or --highlights)
- If your terminal honors ANSI coloring, highlights can be used to make those elements stand out in the output.
- syntax: COLOR=Path
- If COLOR is omitted, then the default color is used for highlighting.
- JSON Path expressions can overlap, as shown in one of the examples above.
--default-color
- The default color is red, but any of the available ANSI colors can be used.
- The color options are: black, red, green, yellow, blue, purple, cyan, and white.
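The COLOR=Path syntax can be sketched in Python along with the standard ANSI foreground codes (30 to 37, where purple corresponds to ANSI magenta). Because JSON Path filters can themselves contain =, this hypothetical parse_highlight only treats the text before the first = as a color when it is one of the known color names; matching names case-insensitively is an assumption suggested by the examples using both BLUE and Purple.

```python
from typing import NamedTuple

# Standard ANSI SGR foreground codes; "purple" is ANSI magenta (35).
ANSI_FG = {"black": 30, "red": 31, "green": 32, "yellow": 33,
           "blue": 34, "purple": 35, "cyan": 36, "white": 37}


class Highlight(NamedTuple):
    color: str
    path: str


def parse_highlight(spec: str, default_color: str = "red") -> Highlight:
    """Split COLOR=Path; without a recognized COLOR prefix, the whole spec is the path."""
    prefix, sep, rest = spec.partition("=")
    color = prefix.strip().lower()
    if sep and color in ANSI_FG:
        return Highlight(color, rest)
    # No known color before the first '=', e.g. "$..price" or "$..[?(@.sku == 'A')]".
    return Highlight(default_color, spec)


def colorize(text: str, color: str) -> str:
    """Wrap text in an ANSI color escape followed by a reset."""
    return f"\x1b[{ANSI_FG[color]}m{text}\x1b[0m"
```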
--include-key
- Include the key in the output; the key is represented in String form and added to the top-level JSON.
- By putting the key into the output JSON, JSON Path expressions can be used to filter by key.
- Keep in mind that the value is then nested in a value element, which can impact JSON Path expressions.
--include-metadata
- Include the message metadata in the output; it is added to a top-level object called metadata with topic, partition, offset, and timestamp elements within it.
- By putting the metadata into the output JSON, JSON Path expressions can be used to filter by anything in the metadata.
- Keep in mind that the value is then nested in a value element, which can impact JSON Path expressions.
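The effect of --include-key and --include-metadata on the document that JSON Path sees can be sketched in Python. The metadata and value names come from the descriptions above; the key field name, the UTF-8 decoding of the key, and the field order are assumptions, and build_output is a hypothetical helper.

```python
from typing import Any, Dict, Optional


def build_output(value: Any, key: Optional[bytes], topic: str, partition: int,
                 offset: int, timestamp: int,
                 include_key: bool = False, include_metadata: bool = False) -> Any:
    """Wrap the deserialized value as --include-key / --include-metadata describe."""
    if not (include_key or include_metadata):
        return value  # the value itself is the top-level JSON
    envelope: Dict[str, Any] = {}
    if include_key:
        # "key" is a hypothetical field name; the key is rendered as a string.
        envelope["key"] = None if key is None else key.decode("utf-8", errors="replace")
    if include_metadata:
        envelope["metadata"] = {"topic": topic, "partition": partition,
                                "offset": offset, "timestamp": timestamp}
    envelope["value"] = value  # the value moves down one level
    return envelope
```

With the envelope in place, a path such as $.items has to become $.value.items, while deep-scan paths such as $..price keep working because they search every level.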
Project
This CLI tool is the second tool to be added to the KTools open-source project. For more details on this project, please see the release page for the first tool, kafka-topic-truncated. This is the initial release of this command, so please be patient and let me know if there are issues, enhancements, or other questions.
Reach out
Please contact us if you have ideas or suggestions. Use the project’s GitHub Issues page for anything specific to the project.