Spring - Apache Kafka
Apache Kafka Support
Apache Kafka is supported by providing auto-configuration of the spring-kafka project.Kafka configuration is controlled by external configuration properties in spring.kafka.*. For
example, you might declare the following section in application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
To create a topic on startup, add a bean of type NewTopic. If the topic already exists, the bean is ignored.
See KafkaProperties for more supported options.
Sending a Message
Spring’s KafkaTemplate is auto-configured, and you can autowire it directly in your own beans, asshown in the following example:
@Component
public class MyBean {
private final KafkaTemplate kafkaTemplate;
@Autowired
public MyBean(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
}
If the property configprop:spring.kafka.producer.transaction-id-prefix[] is defined,
a KafkaTransactionManager is automatically configured. Also, if a
RecordMessageConverter bean is defined, it is automatically associated to the autoconfigured
KafkaTemplate.
Receiving a Message
When the Apache Kafka infrastructure is present, any bean can be annotated with @KafkaListenerto create a listener endpoint. If no KafkaListenerContainerFactory has been defined, a default one is
automatically configured with keys defined in spring.kafka.listener.*.
The following component creates a listener endpoint on the someTopic topic:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
If a KafkaTransactionManager bean is defined, it is automatically associated to the container factory.
Similarly, if a ErrorHandler, AfterRollbackProcessor or ConsumerAwareRebalanceListener bean is
defined, it is automatically associated to the default factory.
Depending on the listener type, a RecordMessageConverter or BatchMessageConverter bean is
associated to the default factory. If only a RecordMessageConverter bean is present for a batch
listener, it is wrapped in a BatchMessageConverter.
A custom ChainedKafkaTransactionManager must be marked @Primary as it usually
references the auto-configured KafkaTransactionManager bean.
Kafka Streams
Spring for Apache Kafka provides a factory bean to create a StreamsBuilder object and manage thelifecycle of its streams. Spring Boot auto-configures the required KafkaStreamsConfiguration bean as
long as kafka-streams is on the classpath and Kafka Streams is enabled via the @EnableKafkaStreams annotation.
Enabling Kafka Streams means that the application id and bootstrap servers must be set. The
former can be configured using spring.kafka.streams.application-id, defaulting to
spring.application.name if not set. The latter can be set globally or specifically overridden just for
streams.
Several additional properties are available using dedicated properties; other arbitrary Kafka
properties can be set using the spring.kafka.streams.properties namespace. See also Additional
Kafka Properties for more information.
To use the factory bean, simply wire StreamsBuilder into your @Bean as shown in the following
example:
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public static class KafkaStreamsExampleConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
}
By default, the streams managed by the StreamBuilder object it creates are started automatically.
You can customize this behaviour using the configprop:spring.kafka.streams.auto-startup[]
property.
Additional Kafka Properties
The properties supported by auto configuration are shown in Common Application properties. Notethat, for the most part, these properties (hyphenated or camelCase) map directly to the Apache
Kafka dotted properties. Refer to the Apache Kafka documentation for details.
The first few of these properties apply to all components (producers, consumers, admins, and
streams) but can be specified at the component level if you wish to use different values. Apache
Kafka designates properties with an importance of HIGH, MEDIUM, or LOW. Spring Boot autoconfiguration
supports all HIGH importance properties, some selected MEDIUM and LOW
properties, and any properties that do not have a default value.
Only a subset of the properties supported by Kafka are available directly through the
KafkaProperties class. If you wish to configure the producer or consumer with additional properties
that are not directly supported, use the following properties:
spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth
This sets the common prop.one Kafka property to first (applies to producers, consumers and
admins), the prop.two admin property to second, the prop.three consumer property to third, the
prop.four producer property to fourth and the prop.five streams property to fifth.
You can also configure the Spring Kafka JsonDeserializer as follows:
spring.kafka.consumer.valuedeserializer=
org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
Similarly, you can disable the JsonSerializer default behavior of sending type information in
headers:
spring.kafka.producer.valueserializer=
org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
Properties set in this way override any configuration item that Spring Boot
explicitly supports.
Testing with Embedded Kafka
Spring for Apache Kafka provides a convenient way to test projects with an embedded ApacheKafka broker. To use this feature, annotate a test class with @EmbeddedKafka from the spring-kafkatest
module. For more information, please see the Spring for Apache Kafka reference manual.
To make Spring Boot auto-configuration work with the aforementioned embedded Apache Kafka
broker, you need to remap a system property for embedded broker addresses (populated by the
EmbeddedKafkaBroker) into the Spring Boot configuration property for Apache Kafka. There are
several ways to do that:
• Provide a system property to map embedded broker addresses into
configprop:spring.kafka.bootstrap-servers[] in the test class:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY,
"spring.kafka.bootstrap-servers");
}
• Configure a property name on the @EmbeddedKafka annotation:
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
• Use a placeholder in configuration properties:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Comments
Post a Comment