Hi,
You can “skip” the corrupted message by returning `null` from the `deserialize`
method of your user-provided `DeserializationSchema`.
The Kafka connector then treats the record as processed and advances the
offset, but emits nothing downstream for it.
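A minimal sketch of the parsing logic, assuming the messages are UTF-8 text holding one integer each. `SafeIntegerParser` and `parseOrNull` are hypothetical names used only for illustration; the sketch is plain Java so it runs without Flink on the classpath. The idea is that your `deserialize(byte[] message)` implementation would return the result of this helper.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Flink): the logic you would call from
// DeserializationSchema<Integer>.deserialize(byte[] message).
final class SafeIntegerParser {
    private SafeIntegerParser() {}

    // Returns the parsed integer, or null when the payload is missing
    // or is not a valid integer.
    static Integer parseOrNull(byte[] message) {
        if (message == null) {
            return null;
        }
        // Assumption: payloads are UTF-8 encoded decimal integers.
        String text = new String(message, StandardCharsets.UTF_8).trim();
        try {
            return Integer.valueOf(text);
        } catch (NumberFormatException e) {
            // Corrupted record: returning null from deserialize() is what
            // makes the connector skip it while still advancing the offset.
            return null;
        }
    }
}
```

Swallowing the exception here is deliberate: if it escaped from `deserialize`, the job would fail and keep hitting the same bad record after each restart.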
Hope this helps!
Cheers,
Gordon
Hi,
You can use FlatMap instead of Map, and only collect valid elements.
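A rough sketch of that pattern, assuming the stream elements are strings that should parse as integers. It is plain Java so it runs standalone: `java.util.function.Consumer` stands in for Flink's `Collector`, and `ValidIntegerFlatMap` is a hypothetical name. In a real job the same body would go in `FlatMapFunction<String, Integer>.flatMap(value, out)`, calling `out.collect(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a Flink FlatMapFunction<String, Integer>.
final class ValidIntegerFlatMap {
    private ValidIntegerFlatMap() {}

    // Same shape as flatMap(value, out): emits zero or one element per input.
    // Consumer<Integer> plays the role of Flink's Collector<Integer>.
    static void flatMap(String value, Consumer<Integer> out) {
        if (value == null) {
            return; // nothing to emit
        }
        Integer parsed;
        try {
            parsed = Integer.valueOf(value.trim());
        } catch (NumberFormatException e) {
            return; // invalid element: collect nothing
        }
        out.accept(parsed);
    }

    // Small driver that applies flatMap to each raw element in order.
    static List<Integer> collectValid(List<String> raw) {
        List<Integer> valid = new ArrayList<>();
        for (String element : raw) {
            flatMap(element, valid::add);
        }
        return valid;
    }
}
```

Unlike Map, which must return exactly one output per input, FlatMap may emit none, so invalid elements simply never reach the running sum.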
Regards,
Kien
On 6/20/2018 7:57 AM, chrisr123 wrote:
First time I'm trying to get this to work so bear with me. I'm trying to
learn checkpointing with Kafka and handling "bad" messages, restarting
without losing state.
Use Case:
Use checkpointing.
Read a stream of integers from Kafka, keep a running sum.
If a "bad" Kafka message is read, restart app, s