2023-02-28

Problem with Kafka Avro deserialization in Python

I am new to Kafka and Python, but I need to create a consumer :)

I created a simple consumer and got results, but the data in Kafka is stored in Avro format, which is why I need to deserialize it.

I tried a variant like this:

import os
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer


class test(object):
    """Plain container for the fields of one decoded ``klf`` record.

    Defined at module level rather than under the ``__main__`` guard so that
    ``dict_to_klf`` can resolve the name however this module is loaded
    (run as a script or imported).

    Every field is optional and defaults to ``None``.
    """

    def __init__(self, test_id=None, dep=None, descr=None,
                 stor_key=None, pos=None, time_dt=None):
        self.test_id = test_id
        self.dep = dep
        self.descr = descr
        self.stor_key = stor_key
        self.pos = pos
        self.time_dt = time_dt


def dict_to_klf(obj, ctx):
    """Convert a decoded record dict into a ``test`` instance.

    ``obj`` is the mapping to convert; ``None`` is passed through unchanged.
    ``ctx`` is accepted but not used here.
    A missing key raises ``KeyError``.
    """
    if obj is None:
        return None

    # Looked up in this order, each key is required to be present in obj.
    field_names = ('test_id', 'dep', 'descr', 'stor_key', 'pos', 'time_dt')
    values = {field: obj[field] for field in field_names}
    return test(**values)

# File name of the Avro schema; it is expected to sit next to this script.
schema = "descr.avsc"

# Resolve the schema relative to this script's directory.
path = os.path.realpath(os.path.dirname(__file__))
# os.path.join picks the platform's separator; a hard-coded "\\" only works
# on Windows.
with open(os.path.join(path, schema)) as f:
    schema_str = f.read()

sr_conf = {'url': ':8081'}
schema_registry_client = SchemaRegistryClient(sr_conf)

# dict_to_klf converts each decoded record dict into a `test` object.
avro_deserializer = AvroDeserializer(schema_registry_client,
                                     schema_str,
                                     dict_to_klf)

consumer_config = {
    "bootstrap.servers": "com:9092",
    "group.id": "descr_events",
    "auto.offset.reset": "earliest",
}

consumer = Consumer(consumer_config)

consumer.subscribe(['descr'])

# Poll until interrupted, and always close the consumer on the way out so it
# leaves its consumer group cleanly.
try:
    while True:
        msg = consumer.poll(1)
        if msg is None:
            # Nothing arrived within the poll timeout.
            continue
        if msg.error():
            # Error events carry no Avro payload; report and skip them
            # instead of handing them to the deserializer.
            print(f"Consumer error: {msg.error()}")
            continue

        # `user` is whatever dict_to_klf returned for the decoded record.
        user = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))

        print(msg.topic())
        print("-------------------------")
finally:
    consumer.close()

And I got this error:

fastavro._schema_common.SchemaParseException: Default value <undefined> must match schema type: long

The schema file (schema = "descr.avsc") looks like this:

{
    "type": "record",
    "name": "klf",
    "namespace": "test_ns",
    "fields": [ 
        {
            "name": "descr",
            "type": "string",
            "default": "undefined"
        },
        {
            "name": "test_id",
            "type": "long",
            "default": "undefined"
        },
        {
            "name": "dep",
            "type": "string",
            "default": "undefined"
        },
        {
            "name": "stor_key",
            "type": "string",
            "default": "undefined"
        },
        {
            "name": "time_dt",
            "type": "string",
            "default": "undefined"
        },
        {
            "name": "pos",
            "type": "string",
            "default": "undefined"
        }
    ]
    
}

What do I need to change to get the deserialized data as a result?



No comments:

Post a Comment