I would like to know whether there is a way to drop a record/message from within a stream topology.
I have a setup like the following:
builder.stream("my-source-topic")
    .map(CustomMapper)
    .mapValues(CustomValueMapper)
    .filterNot(CustomFilter)
    .transformValues(CustomValueTransformer)
    .toStream()
Each CustomMapper/CustomFilter etc. overrides its respective apply/transform method. As noted in the code below, the error might be unrecoverable, and that is acceptable: these messages will be handled manually and a corresponding log entry is written. Assuming the unrecoverable error happens during the first map, how do I prevent the later stages from processing the record at all? I would like to stop processing this record and move on to the next one. The methods could look like the following:
@Override
public V transform(K readOnlyKey, V value) {
    try {
        // do some logic
    } catch (Exception e) {
        // process error - this might be unrecoverable.
        dropRecord(); // this is what I would be looking for, if possible
    }
}
I could kill the thread and have a custom UncaughtExceptionHandler reschedule it, but then the offset would not be committed and the faulty record would therefore be processed again.
Creating a wrapper for the objects passed along would require adding a check in each processing step to see whether the record is still valid.
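To make the wrapper idea concrete, here is a minimal sketch in plain Java, deliberately without the Kafka Streams API. The names `Droppable`, `then`, `isValid` and `run` are hypothetical and only illustrate the pattern: every step has to go through the validity check, and a final filter removes the invalid records.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical sketch of the wrapper approach; none of these names
// come from Kafka Streams.
class RecordWrapperSketch {

    // Carries either a value or an "invalid" marker.
    static final class Droppable<V> {
        private final V value;       // null when the record is invalid
        private final boolean valid;

        private Droppable(V value, boolean valid) {
            this.value = value;
            this.valid = valid;
        }

        static <T> Droppable<T> of(T value) {
            return new Droppable<>(value, true);
        }

        static <T> Droppable<T> invalid() {
            return new Droppable<>(null, false);
        }

        boolean isValid() {
            return valid;
        }

        V value() {
            return value;
        }

        // Runs one processing step unless the record is already invalid.
        // An exception thrown by the step marks the record as invalid.
        <R> Droppable<R> then(Function<V, R> step) {
            if (!valid) {
                return invalid();
            }
            try {
                return of(step.apply(value));
            } catch (Exception e) {
                // This is where the error would be logged for manual handling.
                return invalid();
            }
        }
    }

    // Stand-in for the topology: two steps, then the invalid records are filtered out.
    static List<Integer> run(List<String> inputs) {
        return inputs.stream()
                .map(Droppable::of)
                .map(d -> d.then(s -> Integer.parseInt(s)))  // step 1 may throw
                .map(d -> d.then(n -> n * 2))                // skipped for invalid records
                .filter(Droppable::isValid)
                .map(Droppable::value)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("1", "oops", "3"))); // prints [2, 6]
    }
}
```

In an actual topology, the `then` logic would presumably live inside each mapper/transformer, and a single `filter` at the end would remove the invalid records. That is exactly the per-step rework described above.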
Adding a .branch() before each processing step would also require a decent amount of rework.
from Recent Questions - Stack Overflow https://ift.tt/3G46TsH