How to write Junit5 test cases in Springboot for apache camel Producer Template sendBodyAndHeader() to publish a json event to kafka
Need a help to write junit5 test case in springboot for a post api where it uses apache camel producer template to send message to kafka. Please find the controller class details for which junit test cases are required. Note-I don't have any service/repository layer for this.This is standalone controller which is responsible to publish message to kafka by using camel producer template.Thanks is advance.
Controller Class-->
`
`import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rms.inventory.savr.audit.model.AuditInfo;
import java.util.Map;
import javax.annotation.PostConstruct;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MockKafkaProducerController {
@Autowired ProducerTemplate producerTemplate;
@Value("${audit.inbound.endpoint.kafka.uri:audit-json-topic}")
private String auditTopicKafka;
@Value("${camel.component.kafka.consumer.supply}")
private String supplyLineUpdateKafkaTopic;
@Value("${camel.component.kafka.consumer.demand}")
private String demandLineUpdateKafkaTopic;
@Value("${camel.component.kafka.consumer.supply-bucket}")
private String supplybucketTopicKafka;
@Value("${camel.component.kafka.consumer.demand-bucket}")
private String demandbucketTopicKafka;
@Value("${camel.component.kafka.consumer.availability}")
private String availabilityTopicKafka;
private Map<String, String> dataTypeTopicMap;
@PostConstruct
void init() {
dataTypeTopicMap =
Map.of(
"SUPPLY_LINE",
supplyLineUpdateKafkaTopic,
"SUPPLY_BUCKET",
supplybucketTopicKafka,
"DEMAND_LINE",
demandLineUpdateKafkaTopic,
"DEMAND_BUCKET",
demandbucketTopicKafka,
"AVAILABILITY",
availabilityTopicKafka);
}
@PostMapping("/api/mock/producer")
public ResponseEntity<Boolean> saveAuditInfo(@RequestBody AuditInfo auditInfo)
public ResponseEntity<Boolean> saveAuditInfo(
@RequestBody AuditInfo auditInfo, @RequestHeader("AUDIT_TYPE") String auditType)
throws JsonProcessingException {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.findAndRegisterModules();
if (auditType == null || auditType.isEmpty()) {
auditType = "SUPPLY_LINE";
}
String topicName = dataTypeTopicMap.get(auditType);
// producerTemplate.
producerTemplate.sendBodyAndHeader(
auditTopicKafka, objectMapper.writeValueAsString(auditInfo), "messageFormat", "CANONICAL");
topicName, objectMapper.writeValueAsString(auditInfo), "messageFormat", "CANONICAL");
return ResponseEntity.ok(Boolean.TRUE);
}
@PostMapping("/api/inventory/audit/mock/producer")
public ResponseEntity<Boolean> publishInventoryAudit(@RequestBody String auditInfo)
public ResponseEntity<Boolean> publishInventoryAudit(
@RequestBody String auditInfo, @RequestHeader("AUDIT_TYPE") String auditType)
throws JsonProcessingException {
producerTemplate.sendBody(auditTopicKafka, auditInfo);
if (auditType == null || auditType.isEmpty()) {
auditType = "SUPPLY_LINE";
}
String topicName = dataTypeTopicMap.get(auditType);
producerTemplate.sendBody(topicName, auditInfo);
return ResponseEntity.ok(Boolean.TRUE);
}
}`
`
I tried to mock producer template but not able to fix.
Comments
Post a Comment