Problem with Kafka deserialization in Python
I am new to Kafka and Python, but I need to create a consumer :)
I created a simple consumer and got a result, but the data in Kafka is stored in Avro format, which is why I need to deserialize it.
I tried a variant like this:
import os
from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
class test(object):
    """Plain container for one decoded record of the ``klf`` Avro schema.

    Every attribute defaults to ``None`` so an instance can be built
    incrementally or from a partially populated record.
    """

    def __init__(self, test_id=None, dep=None, descr=None, stor_key=None,
                 pos=None, time_dt=None):
        self.test_id = test_id
        self.dep = dep
        self.descr = descr
        self.stor_key = stor_key
        self.pos = pos
        self.time_dt = time_dt

    def __repr__(self):
        # A readable repr lets the consumer loop print the decoded data.
        return (
            f"{type(self).__name__}("
            f"test_id={self.test_id!r}, dep={self.dep!r}, "
            f"descr={self.descr!r}, stor_key={self.stor_key!r}, "
            f"pos={self.pos!r}, time_dt={self.time_dt!r})"
        )


def dict_to_klf(obj, ctx):
    """Convert a decoded Avro record (a dict) into a ``test`` instance.

    Args:
        obj: Mapping with the keys ``test_id``, ``dep``, ``descr``,
            ``stor_key``, ``pos`` and ``time_dt``, or ``None``.
        ctx: Serialization context passed by the deserializer; unused here.

    Returns:
        A ``test`` instance, or ``None`` when ``obj`` is ``None``.

    Raises:
        KeyError: If ``obj`` lacks one of the expected keys.
    """
    if obj is None:
        return None
    return test(test_id=obj['test_id'],
                dep=obj['dep'],
                descr=obj['descr'],
                stor_key=obj['stor_key'],
                pos=obj['pos'],
                time_dt=obj['time_dt'])


def main():
    """Consume the ``descr`` topic and print every Avro-decoded record."""
    schema = "descr.avsc"
    path = os.path.realpath(os.path.dirname(__file__))
    # os.path.join instead of a hard-coded "\\" keeps this portable
    # beyond Windows.
    with open(os.path.join(path, schema), encoding="utf-8") as f:
        schema_str = f.read()

    # NOTE(review): the host looks redacted here; the registry client
    # presumably needs a full URL (scheme + host + port) -- confirm.
    sr_conf = {'url': ':8081'}
    schema_registry_client = SchemaRegistryClient(sr_conf)

    # The schema text is parsed by fastavro, which rejects a field whose
    # "default" does not match its declared type (e.g. a string default on
    # a "long" field raises SchemaParseException). Such a mismatch has to
    # be fixed in descr.avsc itself.
    avro_deserializer = AvroDeserializer(schema_registry_client,
                                         schema_str,
                                         dict_to_klf)

    consumer_config = {
        "bootstrap.servers": "com:9092",
        "group.id": "descr_events",
        "auto.offset.reset": "earliest"
    }
    consumer = Consumer(consumer_config)
    consumer.subscribe(['descr'])

    try:
        while True:
            msg = consumer.poll(1)
            if msg is None:
                continue
            if msg.error():
                # Error events carry no Avro payload; report and move on
                # instead of feeding them to the deserializer.
                print(f"Consumer error: {msg.error()}")
                continue
            user = avro_deserializer(
                msg.value(),
                SerializationContext(msg.topic(), MessageField.VALUE))
            print(msg.topic())
            if user is not None:
                # Show the decoded record, not just the topic name.
                print(user)
            print("-------------------------")
    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop the otherwise endless loop.
        pass
    finally:
        # Leave the consumer group cleanly.
        consumer.close()


if __name__ == "__main__":
    main()
And I got this error:
fastavro._schema_common.SchemaParseException: Default value <undefined> must match schema type: long
The schema file (SCHEMA_PATH = "descr.avsc") looks like this:
{
"type": "record",
"name": "klf",
"namespace": "test_ns",
"fields": [
{
"name": "descr",
"type": "string",
"default": "undefined"
},
{
"name": "test_id",
"type": "long",
"default": "undefined"
},
{
"name": "dep",
"type": "string",
"default": "undefined"
},
{
"name": "stor_key",
"type": "string",
"default": "undefined"
},
{
"name": "time_dt",
"type": "string",
"default": "undefined"
},
{
"name": "pos",
"type": "string",
"default": "undefined"
}
]
}
What do I need to change to get the deserialized data as a result?
Comments
Post a Comment