I have implemented kafka stream application. Let's say one of the object's field which the stream is currently processing contains a number instead of a string value. Currently when there is an exception thrown in the processing logic eg. .transform() method, whole stream is killed and my application stops to process data.
I would like to skip such invalid record and keep processing next records available on a input topic. Additionally I don't want to implement any try-catch statements in my stream processing code.
To achieve this, I implemented StreamsUncaughtExceptionHandler so it returns StreamThreadExceptionResponse.REPLACE_THREAD enum in order to spawn new thread and keep on processing next records waiting on the input topic. However, it turned out that the stream consumer offset is not committed and when new a thread is started, it takes old record which just have killed the previous stream thread... Since the logic is the same, new thread will also fail to process the error record and again fail. Some kind of a loop spawning new thread and failing on a same record every time.
Is there any clean way of skipping failing record and keep the stream processing next records?
Please note, I am not asking about DeserializationExceptionHandler or ProductionExceptionHandler.
CodePudding user response:
When it comes to the application-level code, it is mostly up to the application how the exception is handled. This use case has come up before. See these previous Stack Overflow threads.
How to stop sending to kafka topic when control goes to catch block Functional kafka spring
Try to see if those answers can be applied to your scenario.
CodePudding user response:
You can filter the event that dont match a pattern or validate the events before you transform them
