2022-09-16

ExecutionException:Due to: org.springframework.kafka.requestreply.KafkaReplyTimeoutException: Reply timed out using ReplyingKafkaTemplate

I am using kafka to publish both async and sync messages to the broker .One listener would listen to the topic and respond for both sync and async calls. I am using same request topic for both the templates .. When using fire and forget(Async) I don't see any issues since listener would listen to the messages randomly from topic.When using synchronous call I am getting timeout exception.

  1. Do I need to maintain multiple listeners for different templates ?
  2. With same topic for both synchronous and async operations would there be any issues?

KafkaConfig.java

//Template for synchornous call

@Bean 
public ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate (
    ProducerFactory<String, Model> pf,
    ConcurrentMessageListenerContainer<String, Model> repliesContainer)
{
    ReplyingKafkaTemplate<String, Model, Model> replyTemplate =
        new ReplyingKafkaTemplate<>(pf, repliesContainer);
    replyTemplate.setSharedReplyTopic(true);
    return replyTemplate;
}

@Bean //register ConcurrentMessageListenerContainer bean
public ConcurrentMessageListenerContainer<String, Model> repliesContainer (
    ConcurrentKafkaListenerContainerFactory<String, Model> containerFactory)
{
    ConcurrentMessageListenerContainer<String, Model> repliesContainer =
        containerFactory.createContainer("responseTopic");
    repliesContainer.getContainerProperties().setGroupId(UUID.randomUUID().toString());
    repliesContainer.setAutoStartup(false);
    return repliesContainer;
}

//Template for asynchronous call

@Bean

@Qualifier("kafkaTemplate")
public KafkaTemplate<String, Model> kafkaTemplate (
    ProducerFactory<String, Model> pf,
    ConcurrentKafkaListenerContainerFactory<String, Model> factory)
{
    KafkaTemplate<String, Model> kafkaTemplate = new KafkaTemplate<>(pf);
    factory.setReplyTemplate(kafkaTemplate);
    return kafkaTemplate;
}

Here is service class

@Service
public class KafkaService
{
    @Autowired
    private ReplyingKafkaTemplate<String, Model, Model> replyingKafkaTemplate;
    @Autowired
    private KafkaTemplate<String, Model> kafkaTemplate;
    @Autowired
    private KafkaConfig config;
    public Object sendAndReceive (Model model)
    {

        ProducerRecord<String, Model> producerRecord =
            new ProducerRecord("requestTopic", model);
        producerRecord.headers()
            .add(
                new RecordHeader(KafkaHeaders.REPLY_TOPIC, "replyTopic"));
        RequestReplyFuture<String, Model, Model> replyFuture =
            replyingKafkaTemplate.sendAndReceive(producerRecord, Duration.ofSeconds(timeout));
        ConsumerRecord<String, Model> consumerRecord =
            replyFuture.get(timeout, TimeUnit.SECONDS);
        return consumerRecord.value();

    }
    public ResponseEntity<Object> send (final Model model)
    {

        final ProducerRecord<String, Model> producerRecord =
            new ProducerRecord("requestTopic", model);
        final ListenableFuture<SendResult<String, Model>> future =
            kafkaTemplate.send(producerRecord);
        final SendResult<String, Model> sendResult = future.get(timeout, TimeUnit.SECONDS);
        return new ResponseEntity<>(sendResult, HttpStatus.ACCEPTED);
    }

}

Here is the listener class.

@Slf4j
@Service
public class MessageListener
{
    @KafkaListener(groupId = "${group.id}", topics = "requestTopic", containerFactory = "kafkaListenerContainerFactory")
    @SendTo
    public Model consumer (Model model)
    {
        switch (model.getType()) {
        case "async":
            System.out.println("Async messages are retrieved");
        case "sync":
            System.out.println("Sync messages are retrieved");
            return model;
        }
        return model;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory (
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory)
    {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =                new ConcurrentKafkaListenerContainerFactory<>();           
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }
}


No comments:

Post a Comment