2021-12-04

Drop a message Kafka Streams Topology

I would like to know if there is a way to drop a record/message from a stream Topology?

I have a setup like the following:

 builder.stream("my-source-topic")
                .map(CustomMapper)
                .mapValues(CustomValueMapper)
                .filterNot(CustomFilter)
                .transformValues(CustomValueTransformer)
                .toStream()

Each CustomMapper/CustomFilter etc overrides their respective apply/transform methods they could look like the following, as noted the error might be unrecoverable and this is an ok solution these messages will be handled manually and a respective log is written. Assuming the unrecoverable error happens during the first map how do i now prevent the later Stages from even processing the record, i would like to stop the processing of this record and move to the next record.

@Override
    public V transform(K readOnlyKey, V value) {
        try {
        // do some logic
        } catch(Exception e){
            // process error - this might be unrecoverable.
            
            dropRecord(); // this is what i would be looking for if possible
        }
    }

I could kill the thread and have a customUncaughtExceptionHandler reschedule the thread which would not commit the offset and therefor try to process the faulty record again.

Creating a wrapper for the objects passed would require to add a check in each proccessing step to see if the record is still valid.

Adding a .branch() before each processing step to would also require a decent amount of rework.



from Recent Questions - Stack Overflow https://ift.tt/3G46TsH
https://ift.tt/eA8V8J

No comments:

Post a Comment