I have flink job and the current flow looks like below Source_Kafka->*operator-1*(key by partition)->*operator-2*(enrich the record)-*Sink1-Operator* & *Sink2-Operator *
With this flow the current problem is at operator-2, the core logic runs here is to lookup some reference status data from redis cache and enrich the stream, this works fine if job runs well but recently I saw if job failed at this operator or sink operators, entire jobs gets restarts and stream gets repossessed from source, that causes different reference status(if reference status in cache changes during this restart) in enrichment, as per the business requirement I need to enrich with reference status when stream received at my job. 1. Is there any way to just reprocess sink1,sink2 operators? 2. How to just resume Sink2 during some cases like Sink-1 was successful where Sink2 failed