Can't deserialize data in Kafka Streams using Spring Cloud Stream
I am creating a simple Kafka Streams application. My producer writes protobuf-serialized messages to a topic, and the Kafka Streams application consumes messages from that topic. I'm trying to deserialize the messages by setting valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde in my application.yml file, but I'm getting the errors below.
Errors:
org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 2, 0, 10, 13, 84, 105, 109, 101, 114, 32, 109, 101, 115, 115, 97, 103, 101, 16, 1, 34, 12, 8, -126, -107, -127, -120, 6, 16, -12, -88, -117, -114, 2]] from topic [MYINPUT-TOPIC]
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x1ff0a0d (above 0x0010ffff) at char #1, byte #7)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195) ~[jackson-core-2.11.3.jar:2.11.3]
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158) ~[jackson-core-2.11.3.jar:2.11.3]
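The byte array in the exception can be inspected by hand. The sketch below assumes the record follows the Confluent Schema Registry wire format (one magic byte, a 4-byte big-endian schema ID, then a protobuf message-index section that here appears to be the single byte 0) and that field 1 of the message is a string. The class and method names are hypothetical and only illustrate the layout. If those assumptions hold, the payload looks like valid protobuf, and the Jackson frames in the stack trace suggest the bytes were handed to a JSON parser rather than to KafkaProtobufSerde.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of any library: it only peeks at the bytes
// printed in the SerializationException above.
public class WireFormatPeek {
    // Bytes copied verbatim from the exception message.
    static final byte[] RECORD = {
        0, 0, 0, 0, 2, 0, 10, 13, 84, 105, 109, 101, 114, 32, 109, 101,
        115, 115, 97, 103, 101, 16, 1, 34, 12, 8, -126, -107, -127, -120,
        6, 16, -12, -88, -117, -114, 2
    };

    // Byte 0: magic byte (assumed Confluent wire format, expected to be 0).
    static byte magicByte(byte[] record) {
        return record[0];
    }

    // Bytes 1..4: schema ID as a big-endian int (ByteBuffer's default order).
    static int schemaId(byte[] record) {
        return ByteBuffer.wrap(record, 1, 4).getInt();
    }

    // Assumption: byte 5 is the whole message-index section, byte 6 is the
    // protobuf tag for field 1 (length-delimited), byte 7 is its length.
    static String firstStringField(byte[] record) {
        int length = record[7];
        return new String(record, 8, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println("magic byte: " + magicByte(RECORD));
        System.out.println("schema id:  " + schemaId(RECORD));
        System.out.println("field 1:    " + firstStringField(RECORD));
    }
}
```

Reading the header this way only shows what the producer wrote; it does not replace the serde, which also looks up the schema in the registry.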
My application.yml configuration file:
spring:
  cloud:
    function:
      definition: process
    stream:
      bindings:
        process-in-0:
          consumer:
            max-attempts: 3
            valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
            back-off-initial-interval: 100
            retryable-exceptions:
              javax:
                validation:
                  ValidationException: false
          destination: MYINPUT-TOPIC
          group: consumer-group
          concurrency: 2
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            schema-registry-u-r-l: http://localhost:8081
            auto-offset-reset: "earliest"
            configuration:
              commit-interval-ms: 100
The process method, which should print the deserialized message to the log:
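import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ProcessStream {
    // Timer is Protobuf's auto-generated class (its import is omitted here);
    // I am using it as the type to deserialize messages into.
    // When I debug this method, the value arrives as a byte array instead.
    @Bean
    public Consumer<KStream<String, Timer>> process() {
        // Log the key and value of every record on the input stream.
        return input -> input.foreach((k, v) -> log.info("key: {}, value: {}", k, v));
    }
}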
Please help me resolve this issue. How can I deserialize protobuf messages in Kafka Streams?
from Recent Questions - Stack Overflow https://ift.tt/37628zh