ExecutionException:Due to: org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out using ReplyingKafkaTemplate
I am using kafka to publish both async and sync messages to the broker .One listener would listen to the topic and respond for both sync and async calls. I am using same request topic for both the templates .. When using fire and forget(Async) I don't see any issues since listener would listen to the messages randomly from topic.When using synchronous call I am getting timeout exception.
- Do I need to maintain multiple listeners for different templates ?
- With same topic for both synchronous and async operations would there be any issues?
KafkaConfig.java
//Template for synchornous call
@Bean
public ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate (
ProducerFactory<String, Model> pf,
ConcurrentMessageListenerContainer<String, Model> repliesContainer)
{
ReplyingKafkaTemplate<String, Model, Model> replyTemplate =
new ReplyingKafkaTemplate<>(pf, repliesContainer);
replyTemplate.setSharedReplyTopic(true);
return replyTemplate;
}
@Bean //register ConcurrentMessageListenerContainer bean
public ConcurrentMessageListenerContainer<String, Model> repliesContainer (
ConcurrentKafkaListenerContainerFactory<String, Model> containerFactory)
{
ConcurrentMessageListenerContainer<String, Model> repliesContainer =
containerFactory.createContainer("responseTopic");
repliesContainer.getContainerProperties().setGroupId(UUID.randomUUID().toString());
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
//Template for asynchronous call
@Bean
@Qualifier("kafkaTemplate")
public KafkaTemplate<String, Model> kafkaTemplate (
ProducerFactory<String, Model> pf,
ConcurrentKafkaListenerContainerFactory<String, Model> factory)
{
KafkaTemplate<String, Model> kafkaTemplate = new KafkaTemplate<>(pf);
factory.setReplyTemplate(kafkaTemplate);
return kafkaTemplate;
}
Here is service class
@Service
public class KafkaService
{
@Autowired
private ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate;
@Autowired
private KafkaTemplate<String, Model> kafkaTemplate;
@Autowired
private KafkaConfig config;
public Object sendAndReceive (Model model)
{
ProducerRecord<String, Model> producerRecord =
new ProducerRecord("requestTopic", model);
producerRecord.headers()
.add(
new RecordHeader(KafkaHeaders.REPLY_TOPIC, "replyTopic"));
RequestReplyFuture<String, Model, Model> replyFuture =
replyingKafkaTemplate.sendAndReceive(producerRecord, Duration.ofSeconds(timeout));
ConsumerRecord<String, Model> consumerRecord =
replyFuture.get(timeout, TimeUnit.SECONDS);
return consumerRecord.value();
}
public ResponseEntity<Object> send (final Model model)
{
final ProducerRecord<String, Model> producerRecord =
new ProducerRecord("requestTopic", model);
final ListenableFuture<SendResult<String, Model>> future =
kafkaTemplate.send(producerRecord);
final SendResult<String, Model> sendResult = future.get(timeout, TimeUnit.SECONDS);
return new ResponseEntity<>(sendResult, HttpStatus.ACCEPTED);
}
}
Here is the listener class.
@Slf4j
@Service
public class MessageListener
{
@KafkaListener(groupId = "${group.id}", topics = "requestTopic", containerFactory = "kafkaListenerContainerFactory")
@SendTo
public Model consumer (Model model)
{
switch (model.getType()) {
case "async":
System.out.println("Async messages are retrieved");
case "sync":
System.out.println("Sync messages are retrieved");
return model;
}
return model;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory (
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory)
{
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
}
Comments
Post a Comment