2023-09-15

Confluent Schema Registry doesn't validate the schema for JSON data

I am trying to send JSON data to a topic that has a schema attached in Schema Registry. However, when I send data, the topic accepts everything, including messages that do not match the schema.

client.properties

# Producer settings that the test methods load from src/main/resources/client.properties.

# Broker endpoint and SASL/PLAIN authentication over TLS.
bootstrap.servers=pkc-xxx.northamerica-northeast1.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
# NOTE(review): credentials are written inline; presumably placeholders — confirm nothing real is committed.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='Y0LiwuvfvDts/iQEXntX/yyy';
sasl.mechanism=PLAIN
key.serializer=org.apache.kafka.common.serialization.StringSerializer
# Plain-string value serializer, disabled in favour of the JSON Schema serializer below.
#value.serializer=org.apache.kafka.common.serialization.StringSerializer

# Record values are serialized with the Confluent JSON Schema serializer.
value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
# NOTE(review): confluent.value.schema.validation looks like a broker-side topic setting rather than
# a producer setting; if so, it has no effect in this file — confirm against the Confluent docs.
# NOTE(review): the serializer presumably only rejects payloads that violate the schema when
# json.fail.invalid.schema=true is set, which is absent here — confirm.
confluent.value.schema.validation=true
latest.compatibility.strict=false
# NOTE(review): presumably "do not register schemas from this client; use the latest registered
# version for the subject instead" — verify against the serializer documentation.
auto.register.schemas=false
use.latest.version=true


# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://xxx.us-east1.gcp.confluent.cloud
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=xxx:YrE7Oztqrx7FvGKF6Ofk+yyy+YySi

Topic Schema

{
  "$id": "http://example.com/myURI.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "description": "Sample schema to help you get started.",
  "properties": {
    "age": {
      "description": "The string type is used for strings of text.",
      "type": "string"
    }
  },
  "required": [
    "age"
  ],
  "title": "SampleRecord",
  "type": "object"
}
  /**
   * Sends the JSON document in {@code src/test/resources/user.json} to the
   * {@code gcp.test_user_schema} topic as a map-valued record, using the producer settings
   * from {@code src/main/resources/client.properties}.
   *
   * <p>The record is sent exactly once. The properties stream, the JSON reader and the
   * producer are all closed before the method returns. A failure reported to the send
   * callback is rethrown so the test fails instead of only printing a stack trace.
   *
   * @throws IOException if either file cannot be read, or if the send callback reports a failure
   */
  @Test
    public void testMapUser() throws IOException {
        String topic = "gcp.test_user_schema";

        final Properties props = new Properties();
        try (InputStream inputStream = Files.newInputStream(Paths.get("src/main/resources/client.properties"))) {
            props.load(inputStream);
        }

        // Parse the payload before creating the producer so a bad file never leaves a producer open.
        final Map<?, ?> event;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                Files.newInputStream(Paths.get("src/test/resources/user.json")), StandardCharsets.UTF_8))) {
            event = new Gson().fromJson(reader, HashMap.class);
        }

        // The callback may be invoked on a different thread than the test, so its failure is
        // stored here and checked once the producer has been flushed and closed.
        final java.util.concurrent.atomic.AtomicReference<Exception> sendFailure =
                new java.util.concurrent.atomic.AtomicReference<>();

        try (Producer<String, Map<?, ?>> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, Map<?, ?>> kafkaRecord = new ProducerRecord<>(topic, event);

            // Send the record to Kafka topic (once; the callback reports the outcome)
            producer.send(kafkaRecord, (metadata, exception) -> {
                if (exception != null) {
                    sendFailure.set(exception);
                } else {
                    System.out.println("Message sent successfully!");
                    System.out.println("Partition: " + metadata.partition());
                    System.out.println("Offset: " + metadata.offset());
                }
            });

            producer.flush();
        }

        Exception failure = sendFailure.get();
        if (failure != null) {
            throw new IOException("Failed to send record to topic " + topic, failure);
        }
    }

user.json

{
  "age": 30,
  "name": "gaurang"
}

Topic Details

enter image description here

Update: I switched to a JsonNode object, as suggested in a comment, but the issue remains: the topic still accepts all messages.

   /**
    * Sends the JSON document in {@code src/test/resources/user.json} to the
    * {@code gcp.test_user_schema} topic as a {@code JsonNode}-valued record, using the
    * producer settings from {@code src/main/resources/client.properties}.
    *
    * <p>The properties stream, the JSON reader and the producer are all closed before the
    * method returns, including when reading, parsing or sending throws.
    *
    * @throws IOException if either file cannot be read or parsed
    * @throws ExecutionException if waiting on the send result reports a failed send
    * @throws InterruptedException if the thread is interrupted while waiting on the send result
    */
   @Test
    public void TestJsonNodeObject() throws IOException, ExecutionException, InterruptedException {
        String topic = "gcp.test_user_schema";

        final Properties props = new Properties();
        try (InputStream inputStream = Files.newInputStream(Paths.get("src/main/resources/client.properties"))) {
            props.load(inputStream);
        }

        // Parse the payload before creating the producer so a bad file never leaves a producer open.
        final JsonNode jsonNode;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                Files.newInputStream(Paths.get("src/test/resources/user.json")), StandardCharsets.UTF_8))) {
            jsonNode = new ObjectMapper().readTree(reader);
        }

        try (Producer<String, JsonNode> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, JsonNode> kafkaRecord = new ProducerRecord<>(topic, jsonNode);

            // Send the record to Kafka topic and wait for the result so a failure surfaces here
            producer.send(kafkaRecord).get();
            producer.flush();
        }
    }



No comments:

Post a Comment