2023-03-29

How can I configure AMQ broker on Openshift 4 to ensure messages are consumed by only one pod in a publish-subscriber model?

I am using Openshift 4 to deploy multiple microservices which are connected via an AMQ broker using a publish-subscribe messaging model. However, when I increase number of pod to 2, I am running into an issue where all pods are consuming the same message, rather than just one.

Can someone suggest how to configure java code below or the AMQ broker to ensure that messages are consumed by only one pod in a publish-subscriber model? Are there any specific settings I should be aware of or changes I need to make to my configuration? Thank you.

Configuration:

@Bean
public JmsListenerContainerFactory<DefaultMessageListenerContainer> jmsListenerContainerPublisherFactory(ConnectionFactory connectionFactory,
                                                                                                         DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(throwable -> {
        log.info("An error has occurred in the transaction: " + throwable.getMessage());
        log.error("Error: ", throwable);
    });
    configurer.configure(factory, connectionFactory);
    factory.setPubSubDomain(true);
    return factory;
}

Listener:

@JmsListener(destination = "${queue.dummyObject}",
        containerFactory = "jmsListenerContainerPublisherFactory")
public void onConsumePublishedMessage(String message) throws JsonProcessingException {
    DummyObjectDTO dummyObjectDTO = mapper.readValue(message, DummyObjectDTO.class);
    LOG.info(" Received onConsumePublishedMessage message : " + dummyObjectDTO);
}

Producer:

private  <T> T  sendFeatured(T value, String queue, boolean publish, Selector selector) {
    *
    *
    jmsTemplate.setPubSubDomain(true);
    jmsTemplate.convertAndSend(queue, objectAsJson);
    *
    *
    return value;
}


No comments:

Post a Comment