The current implementation in TLKafkaProducer provides only a fraction of the possible Producer configuration as typed configuration. In particular, e.g. settings for encryption etc. are missing. However, since TLKafkaProducer was designed to be non-extensible (the method that actually instantiates the KafkaProducer considers hard coded only very specific parameters from the configuration and cannot be overridden), this cannot be easily provided later.
Request
The implementation should be changed so that - analogous to KafkaConsumerService - the configuration can be extended. Either one would have to open private API(getProperties() and newKafkaProducer()) or make the configuration extensible (analogous to ConsumerDispatcherConfiguration with annotations).
Implementation
- The missing Kafka properties were added.
- Using CommonClientConfig.getUntypedProperties() any properties can be configured. These are treated by Top-Logic as Map<String, String> and overwrite the explicitly defined properties in case of conflicts. This allows them to be used by Kafka when the property type changes to bypass the then deprecated typing. These properties are not parsed and there is no check if such a property exists in Kafka and the value is valid. Example:
{{#!xml <producer class="com.top_logic.kafka.services.producer.TLKafkaProducer"
name="KBChange-Producer"
topic="%DATA_CHANGE_TOPIC_PRODUCER%"
>
<untyped-properties>
<property name="bootstrap.servers" value="%KAFKA_PRODUCER_SERVER%"/>
<property name="my.special.property" value="3.14.15"/>
</untyped-properties>
</producer> }}
- Several relevant methods have been made protected and documented so that subclasses can override them.
Code Migration
The names of the following Kafka consumer/producer properties have been unified. The relevant name is now always the name in Kafka itself, so you don't have to look up what a Kafka property is called in Top-Logic anymore.
- acknowledgeLevel => acks
- compressionType => compression.type
- maxRequestSize => max.request.size
- request-timeout => request.timeout.ms
- key.deserializer => key.deserializer.typed.config
- key-deserializer-class => key.deserializer
- key-serializer-class => key.serializer
- keySerializer => key.serializer.typed.config
- value-deserializer-class => value.deserializer
- value.deserializer => value.deserializer.typed.config
- value-serializer-class => value.serializer
- valueSerializer => value.serializer.typed.config
Some methods of CommonClientConfig and its derivatives have changed names. But these should not have been used in projects. Therefore this is only theoretically relevant.
ConsumerDispatcher.getProperties(...) has been replaced by getAllProperties().
Test
No special test for better configurability. For Kafka in general there are the tests in test.com.top_logic.kafka as well as test.com.top_logic.kafka.demo.