Many applications leverage environment variables for configuration, especially when deployed in a container. With just a little bit of code, you can get the same behavior in your Java Kafka clients.
Introduction
The Kafka configuration convention makes it easy to use environment variables for configuration. Because all property names are lowercase and separated by periods, they can be mapped safely from the environment.
Overrides
- Loop through all environment variables, keeping only those with a prefix, such as “KAFKA_CLIENT_”.
- Replace all “_” characters with “.” and lowercase everything. (If “_” ever becomes valid within a client property name, use an escape, e.g. “__” -> “_”.)
- Abstract out System.getenv() to allow for an alternate source for unit testing (implementation details left as an exercise).
// requires java.util.Map and java.util.stream.Collectors
private static Map<String, String> environment() {
    return System.getenv(); // note: getenv, not getEnv
}

public static Map<String, String> environmentProperties(String prefix) {
    return environment().entrySet().stream()
            .filter(e -> e.getKey().startsWith(prefix))
            .collect(Collectors.toMap(
                    e -> e.getKey().substring(prefix.length()).replace("_", ".").toLowerCase(),
                    Map.Entry::getValue));
}
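To see the mapping in action, here is a minimal, self-contained sketch that applies the same transformation to an injected map rather than System.getenv(), in the spirit of the testability point above (the variable names are hypothetical):

```java
import java.util.Map;
import java.util.stream.Collectors;

public class EnvMappingDemo {

    // Same transformation as environmentProperties, but over an injected map
    // so it can be exercised without touching the real process environment.
    static Map<String, String> mapKeys(Map<String, String> env, String prefix) {
        return env.entrySet().stream()
                .filter(e -> e.getKey().startsWith(prefix))
                .collect(Collectors.toMap(
                        e -> e.getKey().substring(prefix.length()).replace("_", ".").toLowerCase(),
                        Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of(
                "KAFKA_CLIENT_MAX_REQUEST_SIZE", "1048576",
                "KAFKA_CLIENT_ACKS", "all",
                "PATH", "/usr/bin");
        // KAFKA_CLIENT_MAX_REQUEST_SIZE -> max.request.size; PATH is filtered out.
        System.out.println(mapKeys(env, "KAFKA_CLIENT_"));
    }
}
```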
That is the heart of it; but in practice it isn’t quite that simple.
Connection (& Secrets)
Now, the handling of secrets typically requires more restrictions within an organization.
Typically, I prefer to load all connection settings from a shared-secret mounted in the container.
By putting all connection settings in this file — e.g. bootstrap.servers, security.protocol, sasl.mechanism, and any secrets — the settings that change based on where the application is deployed become part of container initialization, not application configuration.
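Such a mounted secrets file might look like the following (all values here are placeholders, not real endpoints or credentials):

```properties
# /secrets/connection.properties -- mounted by the container platform
bootstrap.servers=broker-1.example.com:9093
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
# credential-bearing settings live here too, never in application config
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<user>" password="<password>";
```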
// requires java.io.*, java.util.*, and java.util.stream.Collectors
public static Map<String, Object> load(String propertyFile) {
    try {
        File file = new File(propertyFile);
        if (file.exists() && file.isFile()) {
            Properties properties = new Properties();
            try (InputStream is = new FileInputStream(file)) {
                properties.load(is);
                return properties.entrySet().stream()
                        .collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue));
            }
        } else {
            throw new RuntimeException("unable to read property file: " + propertyFile);
        }
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}
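To exercise load without a real mounted secret, here is a self-contained sketch that repeats the method above and feeds it a temporary file standing in for /secrets/connection.properties:

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

public class LoadDemo {

    // Copy of the load method above, repeated so this sketch compiles on its own.
    public static Map<String, Object> load(String propertyFile) {
        try {
            File file = new File(propertyFile);
            if (file.exists() && file.isFile()) {
                Properties properties = new Properties();
                try (InputStream is = new FileInputStream(file)) {
                    properties.load(is);
                    return properties.entrySet().stream()
                            .collect(Collectors.toMap(e -> e.getKey().toString(), Map.Entry::getValue));
                }
            }
            throw new RuntimeException("unable to read property file: " + propertyFile);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Temporary file standing in for the mounted /secrets/connection.properties.
        Path file = Files.createTempFile("connection", ".properties");
        Files.writeString(file, "bootstrap.servers=broker:9093\nsecurity.protocol=SASL_SSL\n");
        System.out.println(load(file.toString()).get("bootstrap.servers")); // broker:9093
    }
}
```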
With connection information now extracted into an actual property file, there are also more secure ways of pulling that data into the container.
Putting it all Together
Now, when developing an application, the development team would know the more ideal defaults and which settings cannot be modified (if any). Use a four-step process:
private Map<String, Object> properties() {
    Map<String, Object> map = new HashMap<>();

    // 1. Defaults
    map.putAll(Map.ofEntries(
            Map.entry(ProducerConfig.BATCH_SIZE_CONFIG, 200_000), // batch.size is a 32-bit int, not a long
            Map.entry(ProducerConfig.LINGER_MS_CONFIG, 200L),
            Map.entry(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy")
    ));

    // 2. Overrides
    map.putAll(KafkaUtil.environmentProperties("KAFKA_CLIENT_"));

    // 3. Connection Properties (Secrets)
    map.putAll(PropertiesUtil.load("/secrets/connection.properties"));

    // 4. Immutables
    map.putAll(Map.ofEntries(
            Map.entry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()),
            Map.entry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())
    ));

    return map;
}
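The ordering of the putAll calls is what makes this work: each later layer silently wins over the one before it. A minimal sketch of that precedence with plain maps (property names illustrative, no Kafka dependency required):

```java
import java.util.HashMap;
import java.util.Map;

public class LayeredConfigDemo {

    // Precedence, lowest to highest:
    // defaults < environment overrides < connection secrets < immutables.
    static Map<String, Object> layered() {
        Map<String, Object> map = new HashMap<>();
        map.putAll(Map.of("linger.ms", 200L, "acks", "1"));     // 1. defaults
        map.putAll(Map.of("acks", "all"));                      // 2. override replaces the default
        map.putAll(Map.of("bootstrap.servers", "broker:9093")); // 3. connection (secrets)
        map.putAll(Map.of("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer")); // 4. immutable
        return map;
    }

    public static void main(String[] args) {
        System.out.println(layered().get("acks")); // "all" -- the override, not the default
    }
}
```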
Summary
With just a little bit of code, it is easy to dynamically configure your Kafka clients while keeping connection and secret properties secure.