ExecutionException:Due to: org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out using ReplyingKafkaTemplate

I am using Kafka to publish both asynchronous and synchronous messages to the broker. A single listener listens to the topic and responds to both sync and async calls. I am using the same request topic for both templates. With fire-and-forget (async) sends I don't see any issues, since the listener simply picks up the messages from the topic as they arrive. With the synchronous call, however, I get a timeout exception.

  1. Do I need to maintain separate listeners for the different templates?
  2. Would using the same topic for both synchronous and asynchronous operations cause any issues?


//Template for synchronous call

// Builds the ReplyingKafkaTemplate that the service uses for request/reply
// (sendAndReceive) calls. It is constructed from the producer factory `pf`
// and the reply listener container `repliesContainer`, then returned with
// no further configuration.
// NOTE(review): the method braces (and possibly a @Bean annotation) appear to
// have been lost from this snippet — confirm against the original code.
public ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate (
    ProducerFactory<String, Model> pf,
    ConcurrentMessageListenerContainer<String, Model> repliesContainer)
    ReplyingKafkaTemplate<String, Model, Model> replyTemplate =
        new ReplyingKafkaTemplate<>(pf, repliesContainer);
    return replyTemplate;

// Declares the ConcurrentMessageListenerContainer bean; presumably this is the
// container injected into replyingKafkaTemplate(...) to receive replies — confirm.
@Bean //register ConcurrentMessageListenerContainer bean
public ConcurrentMessageListenerContainer<String, Model> repliesContainer (
    ConcurrentKafkaListenerContainerFactory<String, Model> containerFactory)
    ConcurrentMessageListenerContainer<String, Model> repliesContainer =
    // NOTE(review): the right-hand side of this assignment is missing from the
    // snippet. Presumably the container is created from `containerFactory` for
    // the reply topic (and given its own consumer group id) — confirm against
    // the original code; a container that is not subscribed to the reply topic
    // would never see a reply.
    return repliesContainer;

//Template for asynchronous call


// Builds the plain KafkaTemplate used for fire-and-forget (asynchronous) sends.
// Only the producer factory `pf` is used; the `factory` parameter is accepted
// but never referenced in the visible body.
// NOTE(review): the method braces (and possibly a @Bean annotation) appear to
// have been lost from this snippet — confirm against the original code.
public KafkaTemplate<String, Model> kafkaTemplate (
    ProducerFactory<String, Model> pf,
    ConcurrentKafkaListenerContainerFactory<String, Model> factory)
    KafkaTemplate<String, Model> kafkaTemplate = new KafkaTemplate<>(pf);
    return kafkaTemplate;

Here is the service class:

// Service that publishes Model messages to "requestTopic" in two ways:
// a request/reply call through ReplyingKafkaTemplate (sendAndReceive) and a
// plain send through KafkaTemplate (send).
// NOTE(review): braces and some statement lines appear to have been lost from
// this snippet, and `timeout` is used below but is not declared among the
// visible fields — confirm against the original code.
public class KafkaService
    private ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate;
    private KafkaTemplate<String, Model> kafkaTemplate;
    private KafkaConfig config;
    // Sends `model` to "requestTopic" and blocks until the reply record arrives
    // or `timeout` seconds elapse; returns the value of the reply record.
    public Object sendAndReceive (Model model)

        // NOTE(review): ProducerRecord is constructed as a raw type here.
        ProducerRecord<String, Model> producerRecord =
            new ProducerRecord("requestTopic", model);
                // NOTE(review): orphaned fragment — the enclosing call (presumably
                // producerRecord.headers().add(...)) is missing. It sets the
                // REPLY_TOPIC header to "replyTopic". RecordHeader in kafka-clients
                // takes a byte[] value, so a String literal presumably needs
                // .getBytes() — confirm.
                new RecordHeader(KafkaHeaders.REPLY_TOPIC, "replyTopic"));
        // The same `timeout` is applied twice: once as the template's reply
        // timeout and once when waiting on the future.
        RequestReplyFuture<String, Model, Model> replyFuture =
            replyingKafkaTemplate.sendAndReceive(producerRecord, Duration.ofSeconds(timeout));
        ConsumerRecord<String, Model> consumerRecord =
            replyFuture.get(timeout, TimeUnit.SECONDS);
        return consumerRecord.value();

    // Sends `model` to "requestTopic", waits up to `timeout` seconds for the
    // send result, and wraps that result in a 202 ACCEPTED response.
    public ResponseEntity<Object> send (final Model model)

        final ProducerRecord<String, Model> producerRecord =
            new ProducerRecord("requestTopic", model);
        final ListenableFuture<SendResult<String, Model>> future =
        // NOTE(review): the right-hand side of the assignment above is missing;
        // presumably kafkaTemplate.send(producerRecord) — confirm.
        final SendResult<String, Model> sendResult = future.get(timeout, TimeUnit.SECONDS);
        return new ResponseEntity<>(sendResult, HttpStatus.ACCEPTED);


Here is the listener class.

// Listener for "requestTopic": receives every Model published there, logs
// which kind of message it was based on model.getType(), and returns the model.
// NOTE(review): braces appear to have been lost from this snippet.
public class MessageListener
    // NOTE(review): no @SendTo annotation is visible on this method. In Spring
    // for Apache Kafka, a @KafkaListener's return value is normally published as
    // a reply only when the method is annotated with @SendTo (and the container
    // factory has a reply template). If it is truly absent, no reply would reach
    // "replyTopic", which would explain the KafkaReplyTimeoutException — confirm.
    @KafkaListener(groupId = "${group.id}", topics = "requestTopic", containerFactory = "kafkaListenerContainerFactory")
    public Model consumer (Model model)
        switch (model.getType()) {
        case "async":
            System.out.println("Async messages are retrieved");
        // NOTE(review): no break/return is visible after the "async" case, so as
        // written it would fall through and also print the "sync" message —
        // confirm whether a break was lost in extraction.
        case "sync":
            System.out.println("Sync messages are retrieved");
            return model;
        // Any other type: the model is returned unchanged.
        return model;

    // Creates the listener container factory and lets the supplied `configurer`
    // apply its settings using `kafkaConsumerFactory`. The method name matches the
    // containerFactory value "kafkaListenerContainerFactory" used by the listener.
    // NOTE(review): braces (and possibly a @Bean annotation) appear to have been
    // lost; for request/reply the factory presumably also needs a reply template
    // if the configurer does not set one — confirm.
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory (
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory)
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =                new ConcurrentKafkaListenerContainerFactory<>();           
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;


Popular posts from this blog

Today Walkin 14th-Sept

Spring Elasticsearch Operations

Hibernate Search - Elasticsearch with JSON manipulation