Hi,

This is my code below.
As mentioned earlier, the rulesStream is used again in later processing.
Below you can see the rulesStream is connected a second time, to the result
stream of the first process function. Do you think this is the reason the
rules operator's state is getting overridden when the data in Kafka is
deleted? My question is: if the data is not present in Kafka, then no data
is read into the stream, so how is the existing state data being updated?

public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
        new MapStateDescriptor<>(
                "rules", BasicTypeInfo.INT_TYPE_INFO, TypeInformation.of(Rule.class));

// Sources for raw events, rules, and device events.
rawEventKafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
DataStream<RawEvent> rawEventStream =
        validateData(getRawEventStream(rawEventKafkaSource, env));

rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
DataStream<Rule> rulesDataStream = getRulesStream(rulesKafkaSource, env);

deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
DataStream<Device> deviceDataStream = getDeviceStream(deviceSource, env);

// The rules stream is broadcast once and connected twice below.
BroadcastStream<Rule> rulesStream = rulesDataStream.broadcast(rulesDescriptor);

// First connection: raw events + broadcast rules.
SingleOutputStreamOperator<Keyed<RawEvent, String, Integer>> keyedSingleOutputStream =
        rawEventStream
                .connect(rulesStream)
                .process(new DynamicKeyFunction())
                .name("keyed").uid("keyed").setParallelism(5);

// Second connection: the keyed result of the first process function + the
// same broadcast rules stream.
SingleOutputStreamOperator<RTEvent> rtEventDataStream =
        keyedSingleOutputStream
                .keyBy(keyed -> keyed.getKey())
                .connect(rulesStream)
                .process(new DynamicTransformFunction())
                .name("rt").uid("rt").setParallelism(5);
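For context, the broadcast state is written and read roughly along these
lines. This is a simplified sketch, not the real DynamicKeyFunction; it
assumes Rule exposes a getRuleId() returning an Integer and elides the
actual keying logic:

import java.util.Map;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RuleBroadcastSketch
        extends BroadcastProcessFunction<RawEvent, Rule, RawEvent> {

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<RawEvent> out)
            throws Exception {
        // Broadcast state is only written here, when a rule element actually
        // arrives on the rules stream. There is no remove()/clear() call, so
        // an empty Kafka topic by itself should not drop existing entries.
        ctx.getBroadcastState(rulesDescriptor).put(rule.getRuleId(), rule);
    }

    @Override
    public void processElement(RawEvent event, ReadOnlyContext ctx, Collector<RawEvent> out)
            throws Exception {
        // The data side has read-only access; iterating cannot modify state.
        for (Map.Entry<Integer, Rule> entry :
                ctx.getBroadcastState(rulesDescriptor).immutableEntries()) {
            // ... evaluate entry.getValue() against the event ...
        }
        out.collect(event);
    }
}

Given this pattern, my understanding is that entries could only disappear if
some processBroadcastElement removes or clears them, or if the job was
restarted without restoring from a checkpoint/savepoint, so that the state
had to be rebuilt by replaying the (now purged) rules topic.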


On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> Deletion of messages in Kafka shouldn't affect Flink state in general.
> Probably, some operator in your pipeline is re-reading the topic and
> overwriting the state, dropping what was deleted by Kafka.
> Could you share the code?
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 7:12 AM bat man <tintin0...@gmail.com> wrote:
>
>> Hi,
>>
>> I have 2 streams: one event data and the other rules. I broadcast the
>> rules stream and then key the data stream on event type. The connected
>> stream is processed thereafter.
>> We faced an issue where the rules data in the topic got deleted because
>> of Kafka retention policy.
>> After this, the existing rules data also got dropped from the broadcast
>> state and the processing stopped.
>>
>> As per my understanding, the rules which were present in broadcast state
>> should still exist even if the data was deleted in Kafka, as the rules data
>> was already processed and stored in the state map.
>>
>> PS: I’m reusing the rules stream as broadcast later in processing as
>> well. Could this be an issue?
>>
>> Thanks,
>> Hemant
>>
>
