2021-07-30

Can't deserialize data in the Kafka Stream using Spring Cloud Streams

I am creating a simple Kafka Streams application. My producer publishes protobuf-serialized messages to a topic, and my Kafka Streams application consumes from that topic. I'm trying to deserialize the messages by setting valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde in my application.yml file, but I'm getting the errors below.

Errors:

org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 2, 0, 10, 13, 84, 105, 109, 101, 114, 32, 109, 101, 115, 115, 97, 103, 101, 16, 1, 34, 12, 8, -126, -107, -127, -120, 6, 16, -12, -88, -117, -114, 2]] from topic [MYINPUT-TOPIC]
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x1ff0a0d (above 0x0010ffff) at char #1, byte #7)
    at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195) ~[jackson-core-2.11.3.jar:2.11.3]
    at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158) ~[jackson-core-2.11.3.jar:2.11.3] 

My application.yml configuration file:

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-in-0:
          consumer:
            max-attempts: 3
            valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
            back-off-initial-interval: 100
            retryable-exceptions:
              javax:
                validation:
                  ValidationException: false
          destination: MYINPUT-TOPIC
          group: consumer-group
          concurrency: 2
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            schema-registry-u-r-l: http://localhost:8081
            auto-offset-reset: "earliest"
            configuration:
              commit-interval-ms: 100

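For comparison, here is a rough sketch of where the Kafka Streams binder seems to expect the serde and schema registry settings, i.e. under spring.cloud.stream.kafka.streams rather than under the plain bindings section. I have not verified this, and the specific.protobuf.value.type key and the com.example.Timer package name are guesses on my part:

spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-in-0:
          destination: MYINPUT-TOPIC
          group: consumer-group
      kafka:
        streams:
          bindings:
            process-in-0:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
          binder:
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
              # Guess: fully qualified name of the generated Timer class
              specific.protobuf.value.type: com.example.Timer
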
Process method that should print the actual deserialized message in the log:

import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class ProcessStream {

    // Timer is Protobuf's auto-generated class; I use it as the value type so the
    // binder can deserialize incoming messages into it.
    // When I debug this method, the value arrives as a byte array instead of a Timer.
    @Bean
    public Consumer<KStream<String, Timer>> process() {
        return input -> input.foreach((k, v) -> log.info(String.format("key: %s, value: %s", k, v)));
    }
}
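
In case it matters, here is a minimal sketch of configuring the serde in code instead of YAML. My understanding (unverified) is that the binder can pick up a Serde bean matched by the value type, and that KafkaProtobufSerde can be constructed with the generated Timer class; the schema registry URL below is assumed to be the same one as in my application.yml:

import java.util.Map;

import org.apache.kafka.common.serialization.Serde;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde;

@Configuration
public class SerdeConfig {

    // Sketch only: a typed protobuf serde so values are deserialized into Timer instead of raw bytes.
    @Bean
    public Serde<Timer> timerSerde() {
        KafkaProtobufSerde<Timer> serde = new KafkaProtobufSerde<>(Timer.class);
        // Assumed to point at the same schema registry as in application.yml
        serde.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);
        return serde;
    }
}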

Please help me resolve this issue. How can I deserialize protobuf messages in a Kafka Streams application?


