Confluent Schema Registry doesn't validate schema for JSON data
I am trying to send JSON data to a topic that has a JSON schema attached in Schema Registry. However, whatever data I send is accepted, even when it does not match the schema.
client.properties
bootstrap.servers=pkc-xxx.northamerica-northeast1.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='Y0LiwuvfvDts/iQEXntX/yyy';
sasl.mechanism=PLAIN
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#value.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
confluent.value.schema.validation=true
latest.compatibility.strict=false
auto.register.schemas=false
use.latest.version=true
# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://xxx.us-east1.gcp.confluent.cloud
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=xxx:YrE7Oztqrx7FvGKF6Ofk+yyy+YySi
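For reference, the Schema Registry / serializer part of this configuration could also be set in code on top of the Properties loaded from the file; this is only a sketch, with the same placeholder endpoint and credentials as above:
// Sketch: Schema Registry / serializer settings from client.properties, set programmatically
// on the Properties object that is later passed to the KafkaProducer.
props.put("value.serializer", "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
props.put("schema.registry.url", "https://xxx.us-east1.gcp.confluent.cloud");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "xxx:yyy");
props.put("auto.register.schemas", "false");
props.put("use.latest.version", "true");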
Topic Schema
{
  "$id": "http://example.com/myURI.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "Sample schema to help you get started.",
  "properties": {
    "age": {
      "description": "The string type is used for strings of text.",
      "type": "string"
    }
  },
  "required": [
    "age"
  ],
  "title": "SampleRecord",
  "type": "object"
}
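For reference, a typed value matching this schema is just a class with a single String field (a sketch only; the class name follows the schema title, and with additionalProperties set to false no other fields are allowed):
// Sketch: a POJO that conforms to the SampleRecord schema above ("age" must be a string).
public class SampleRecord {

    private String age;

    public SampleRecord() {
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }
}
With such a class the producer could be declared as Producer<String, SampleRecord> instead of a raw Producer<String, Map>.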
@Test
public void testMapUser() throws IOException {
    String topic = "gcp.test_user_schema";

    final Properties props = new Properties();
    InputStream inputStream = Files.newInputStream(Paths.get("src/main/resources/client.properties"));
    props.load(inputStream);

    Producer<String, Map> producer = new KafkaProducer<>(props);

    Gson gson = new Gson();
    File initialFile = new File("src/test/resources/user.json");
    InputStream in = Files.newInputStream(initialFile.toPath());
    BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    HashMap event = gson.fromJson(reader, HashMap.class);

    ProducerRecord<String, Map> kafkaRecord = new ProducerRecord<>(topic, event);

    // Send the record to the Kafka topic and report the result
    producer.send(kafkaRecord, (metadata, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("Message sent successfully!");
            System.out.println("Partition: " + metadata.partition());
            System.out.println("Offset: " + metadata.offset());
        }
    });
    producer.flush();
}
user.json
{
  "age": 30,
  "name": "gaurang"
}
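Note that this payload has age as a number (30) plus an extra name field, while the schema requires age to be a string and sets additionalProperties to false, so I would expect it to be rejected. As a sanity check, the payload can be validated against the same schema locally, e.g. with the everit json-schema library (a sketch only; the local path src/test/resources/schema.json where I keep a copy of the schema is an assumption):
@Test
public void validateUserJsonLocally() throws IOException {
    // Sketch: validate user.json against the draft-07 schema with the everit json-schema library.
    // Assumes a local copy of the registered schema at src/test/resources/schema.json.
    try (InputStream schemaStream = Files.newInputStream(Paths.get("src/test/resources/schema.json"));
         InputStream dataStream = Files.newInputStream(Paths.get("src/test/resources/user.json"))) {
        Schema schema = SchemaLoader.load(new JSONObject(new JSONTokener(schemaStream)));
        try {
            schema.validate(new JSONObject(new JSONTokener(dataStream)));
            System.out.println("payload is valid");
        } catch (ValidationException e) {
            // Expected for this payload: "age" is a number instead of a string, and "name" is not allowed.
            e.getAllMessages().forEach(System.out::println);
        }
    }
}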
Topic Details
Update: I used a JsonNode object as suggested in the comments, but the issue is the same: the topic accepts all the messages.
@Test
public void testJsonNodeObject() throws IOException, ExecutionException, InterruptedException {
    String topic = "gcp.test_user_schema";

    final Properties props = new Properties();
    InputStream inputStream = Files.newInputStream(Paths.get("src/main/resources/client.properties"));
    props.load(inputStream);

    Producer<String, JsonNode> producer = new KafkaProducer<>(props);

    File initialFile = new File("src/test/resources/user.json");
    InputStream in = Files.newInputStream(initialFile.toPath());
    BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    ObjectMapper objectMapper = new ObjectMapper();
    JsonNode jsonNode = objectMapper.readTree(reader);

    ProducerRecord<String, JsonNode> kafkaRecord = new ProducerRecord<>(topic, jsonNode);

    // Send the record to the Kafka topic and wait for the ack
    producer.send(kafkaRecord).get();
    producer.flush();
}
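To rule out that the subject simply holds a different schema than the one shown above, the registered schema can be fetched and printed; a sketch using CachedSchemaRegistryClient from the kafka-schema-registry-client artifact (credentials are placeholders, and the subject name gcp.test_user_schema-value assumes the default TopicNameStrategy):
@Test
public void printRegisteredSchema() throws Exception {
    // Sketch: fetch whatever schema is actually registered for the value subject.
    Map<String, Object> srConfig = new HashMap<>();
    srConfig.put("basic.auth.credentials.source", "USER_INFO");
    srConfig.put("basic.auth.user.info", "xxx:yyy");

    SchemaRegistryClient client = new CachedSchemaRegistryClient(
            "https://xxx.us-east1.gcp.confluent.cloud", 10, srConfig);

    // Default TopicNameStrategy: the value subject is "<topic>-value".
    SchemaMetadata latest = client.getLatestSchemaMetadata("gcp.test_user_schema-value");
    System.out.println(latest.getSchemaType());
    System.out.println(latest.getSchema());
}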