Re: BroadcastState dropped when data deleted in Kafka

2021-03-03 Thread bat man
I created a new descriptor and rules stream, used them in the second process
function, and this works fine.

public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
        new MapStateDescriptor<>(
                "rules", BasicTypeInfo.INT_TYPE_INFO,
                TypeInformation.of(Rule.class));

public static final MapStateDescriptor<Integer, Rule> rulesDescriptor2 =
        new MapStateDescriptor<>(
                "rules", BasicTypeInfo.INT_TYPE_INFO,
                TypeInformation.of(Rule.class));


BroadcastStream<Rule> rulesStream =
        rulesDataStream.broadcast(TransformDescriptors.Descriptors.rulesDescriptor);

BroadcastStream<Rule> rulesStream2 =
        rulesDataStream.broadcast(TransformDescriptors.Descriptors.rulesDescriptor2);


SingleOutputStreamOperator> keyedSingleOutputStream =
        rawEventStream
                .connect(rulesStream)
                .process(new DynamicKeyFunction())
                .name("keyed").uid("keyed").setParallelism(5);

SingleOutputStreamOperator rtEventDataStream =
        keyedSingleOutputStream
                .keyBy((keyed) -> keyed.getKey())
                .connect(rulesStream2)
                .process(new DynamicTransformFunction())
                .name("rt").uid("rt").setParallelism(5);
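
For completeness, a minimal sketch of how the second function can then read
and update the state registered with rulesDescriptor2. The Keyed/Rule/Event
classes, their getters, and the generic parameters below are assumptions, not
the actual classes of this job; the point is only that the descriptor used
inside the function should be the same one that was passed to broadcast().

import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicTransformFunction
        extends KeyedBroadcastProcessFunction<Integer, Keyed, Rule, Event> {

    @Override
    public void processBroadcastElement(Rule rule, Context ctx, Collector<Event> out)
            throws Exception {
        // every parallel instance stores the rule in the broadcast state
        // registered with rulesDescriptor2
        ctx.getBroadcastState(TransformDescriptors.Descriptors.rulesDescriptor2)
                .put(rule.getRuleId(), rule);
    }

    @Override
    public void processElement(Keyed keyed, ReadOnlyContext ctx, Collector<Event> out)
            throws Exception {
        // the data side only gets a read-only view of the same state
        Rule rule = ctx.getBroadcastState(TransformDescriptors.Descriptors.rulesDescriptor2)
                .get(keyed.getRuleId());
        if (rule != null) {
            out.collect(keyed.toEvent(rule)); // placeholder for the real transformation
        }
    }
}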


On Fri, Feb 26, 2021 at 3:38 PM Arvid Heise  wrote:

> Hi,
>
> I have no idea what's going on. There is no mechanism in DataStream to
> react to deleted records.
>
> Can you reproduce it locally and debug through it?
>
>
>
> On Wed, Feb 24, 2021 at 5:21 PM bat man  wrote:
>
>> Hi Arvid,
>>
>> The Flink application was not re-started. I had checked on that.
>> By adding rules to the state of the process function, do you mean the state
>> which is local to the keyed process function?
>> From [1], what is being done here -
>>
>> final MapState> state = getRuntimeContext().
>> getMapState(mapStateDesc);
>>
>> state.put(ruleName, stored);
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>
>> Thanks.
>>
>>
>> On Wed, Feb 24, 2021 at 7:52 PM Arvid Heise  wrote:
>>
>>> Could you double-check if your Flink application was restarted between the
>>> time the Kafka topic was cleared and the time you saw that the rules had been lost?
>>>
>>> I suspect that you deleted the Kafka topic and the Flink application
>>> then failed and restarted. Upon restart it read the empty rule topic.
>>>
>>> To solve it, you probably want to add the rules to the state of your
>>> process function [1]. If you have done that, I'm a bit lost.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>
>>> On Wed, Feb 24, 2021 at 7:30 AM bat man  wrote:
>>>
 Hi,

 This is my code below -
 As mentioned earlier, the rulesStream is again used in later processing.
 Below you can see the rulesStream is again connected with the result stream
 of the first process function. Do you think this is the reason the rules
 operator's state is getting overridden when the data in Kafka is deleted?
 My question is: if the data is not present in Kafka then no data is read
 into the stream, so how is the existing state data being updated?

 public static final MapStateDescriptor rulesDescriptor =
 new MapStateDescriptor<>(
 "rules", BasicTypeInfo.INT_TYPE_INFO, 
 TypeInformation.of(Rule.class));

 KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
 DataStream rawEventStream = 
 validateData(getRawEventStream(rawEventKafkaSource,env));

  rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
  DataStream rulesDataStream = getRulesStream(rulesKafkaSource,env);

  deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
  DataStream deviceDataStream = getDeviceStream(deviceSource,env);

  BroadcastStream rulesStream = 
 rulesDataStream.broadcast(rulesDescriptor);

  SingleOutputStreamOperator> 
 keyedSingleOutputStream =
  rawEventStream.
  connect(rulesStream).
  process(new 
 DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);

  SingleOutputStreamOperator rtEventDataStream =
  keyedSingleOutputStream.
  keyBy((keyed) -> keyed.getKey()).
  connect(rulesStream).
  process(new 
 DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);


 On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
 khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> Deletion of messages in Kafka shouldn't affect Flink state in general.
> Probably, some operator in your pipeline is re-reading the topic
> and overwriting the state, dropping what was deleted by Kafka.
> Could you share the code?
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:
>
>> Hi,
>>
>> I have 2 streams, one with event data and the other with rules. I broadcast
>> the rules stream and then key the data stream on event type.

Re: BroadcastState dropped when data deleted in Kafka

2021-02-26 Thread Arvid Heise
Hi,

I have no idea what's going on. There is no mechanism in DataStream to
react to deleted records.

Can you reproduce it locally and debug through it?
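
For example, by swapping the Kafka sources for in-memory ones and stepping
through processBroadcastElement()/processElement() in the IDE. A rough sketch,
assuming the Rule/Event classes and the two process functions from your
earlier snippets (constructor arguments are placeholders):

import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BroadcastStateRepro {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // in-memory stand-ins for the Kafka topics
        DataStream<Event> events = env.fromElements(new Event(1, "e1"), new Event(2, "e2"));
        DataStream<Rule> rules = env.fromElements(new Rule(1, "r1"), new Rule(2, "r2"));

        BroadcastStream<Rule> rulesStream = rules.broadcast(rulesDescriptor); // your descriptor

        events.connect(rulesStream)
                .process(new DynamicKeyFunction())
                .keyBy(keyed -> keyed.getKey())
                .connect(rulesStream)
                .process(new DynamicTransformFunction())
                .print(); // breakpoints in the two functions show what touches the state

        env.execute("broadcast-state-repro");
    }
}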



On Wed, Feb 24, 2021 at 5:21 PM bat man  wrote:

> Hi Arvid,
>
> The Flink application was not re-started. I had checked on that.
> By adding rules to the state of the process function, do you mean the state
> which is local to the keyed process function?
> From [1], what is being done here -
>
> final MapState> state = getRuntimeContext().getMapState
> (mapStateDesc);
>
> state.put(ruleName, stored);
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> Thanks.
>
>
> On Wed, Feb 24, 2021 at 7:52 PM Arvid Heise  wrote:
>
>> Could you double-check if your Flink application was restarted between the
>> time the Kafka topic was cleared and the time you saw that the rules had been lost?
>>
>> I suspect that you deleted the Kafka topic and the Flink application then
>> failed and restarted. Upon restart it read the empty rule topic.
>>
>> To solve it, you probably want to add the rules to the state of your
>> process function [1]. If you have done that, I'm a bit lost.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>
>> On Wed, Feb 24, 2021 at 7:30 AM bat man  wrote:
>>
>>> Hi,
>>>
>>> This is my code below -
>>> As mentioned earlier, the rulesStream is again used in later processing.
>>> Below you can see the rulesStream is again connected with the result stream
>>> of the first process function. Do you think this is the reason the rules
>>> operator's state is getting overridden when the data in Kafka is deleted?
>>> My question is: if the data is not present in Kafka then no data is read
>>> into the stream, so how is the existing state data being updated?
>>>
>>> public static final MapStateDescriptor rulesDescriptor =
>>> new MapStateDescriptor<>(
>>> "rules", BasicTypeInfo.INT_TYPE_INFO, 
>>> TypeInformation.of(Rule.class));
>>>
>>> KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
>>> DataStream rawEventStream = 
>>> validateData(getRawEventStream(rawEventKafkaSource,env));
>>>
>>>  rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
>>>  DataStream rulesDataStream = getRulesStream(rulesKafkaSource,env);
>>>
>>>  deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
>>>  DataStream deviceDataStream = getDeviceStream(deviceSource,env);
>>>
>>>  BroadcastStream rulesStream = 
>>> rulesDataStream.broadcast(rulesDescriptor);
>>>
>>>  SingleOutputStreamOperator> 
>>> keyedSingleOutputStream =
>>>  rawEventStream.
>>>  connect(rulesStream).
>>>  process(new 
>>> DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);
>>>
>>>  SingleOutputStreamOperator rtEventDataStream =
>>>  keyedSingleOutputStream.
>>>  keyBy((keyed) -> keyed.getKey()).
>>>  connect(rulesStream).
>>>  process(new 
>>> DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);
>>>
>>>
>>> On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
 Hi,

 Deletion of messages in Kafka shouldn't affect Flink state in general.
 Probably, some operator in your pipeline is re-reading the topic
 and overwriting the state, dropping what was deleted by Kafka.
 Could you share the code?

 Regards,
 Roman


 On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:

> Hi,
>
> I have 2 streams, one with event data and the other with rules. I broadcast
> the rules stream and then key the data stream on event type. The connected
> stream is processed thereafter.
> We faced an issue where the rules data in the topic got deleted because of
> the Kafka retention policy.
> Post this, the existing rules data also got dropped from the broadcast state
> and the processing stopped.
>
> As per my understanding, the rules which were present in broadcast state
> should still exist even if the data was deleted in Kafka, as the rules data
> was already processed and stored in the state map.
>
> PS: I’m reusing the rules stream as broadcast later in processing as
> well. Could this be an issue?
>
> Thanks,
> Hemant
>



Re: BroadcastState dropped when data deleted in Kafka

2021-02-24 Thread bat man
Hi Arvid,

The Flink application was not re-started. I had checked on that.
By adding rules to the state of the process function, do you mean the state
which is local to the keyed process function?
From [1], what is being done here -

final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);

state.put(ruleName, stored);
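
If I read the example right, that snippet is the *keyed* state of the function
(one map per key, obtained from the runtime context), which is separate from
the *broadcast* state that holds the rules. A rough sketch of the difference,
using the names from the docs example [1] (Item/Rule and the descriptors are
the classes from that page, not from my job):

@Override
public void processBroadcastElement(Rule value, Context ctx, Collector<String> out)
        throws Exception {
    // rules go into *broadcast* state, replicated to every parallel instance
    ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
}

@Override
public void processElement(Item value, ReadOnlyContext ctx, Collector<String> out)
        throws Exception {
    // *keyed* state, a separate map per key; this is the snippet quoted above
    final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);

    // the rules are read here through a read-only view of the broadcast state
    for (Map.Entry<String, Rule> entry :
            ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
        // ... match value against entry.getValue() ...
    }
}

Is that the distinction you meant?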


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html

Thanks.


On Wed, Feb 24, 2021 at 7:52 PM Arvid Heise  wrote:

> Could you double-check if your Flink application was restarted between the
> time the Kafka topic was cleared and the time you saw that the rules had been lost?
>
> I suspect that you deleted the Kafka topic and the Flink application then
> failed and restarted. Upon restart it read the empty rule topic.
>
> To solve it, you probably want to add the rules to the state of your
> process function [1]. If you have done that, I'm a bit lost.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> On Wed, Feb 24, 2021 at 7:30 AM bat man  wrote:
>
>> Hi,
>>
>> This is my code below -
>> As mentioned earlier, the rulesStream is again used in later processing.
>> Below you can see the rulesStream is again connected with the result stream
>> of the first process function. Do you think this is the reason the rules
>> operator's state is getting overridden when the data in Kafka is deleted?
>> My question is: if the data is not present in Kafka then no data is read
>> into the stream, so how is the existing state data being updated?
>>
>> public static final MapStateDescriptor rulesDescriptor =
>> new MapStateDescriptor<>(
>> "rules", BasicTypeInfo.INT_TYPE_INFO, 
>> TypeInformation.of(Rule.class));
>>
>> KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
>> DataStream rawEventStream = 
>> validateData(getRawEventStream(rawEventKafkaSource,env));
>>
>>  rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
>>  DataStream rulesDataStream = getRulesStream(rulesKafkaSource,env);
>>
>>  deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
>>  DataStream deviceDataStream = getDeviceStream(deviceSource,env);
>>
>>  BroadcastStream rulesStream = 
>> rulesDataStream.broadcast(rulesDescriptor);
>>
>>  SingleOutputStreamOperator> 
>> keyedSingleOutputStream =
>>  rawEventStream.
>>  connect(rulesStream).
>>  process(new 
>> DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);
>>
>>  SingleOutputStreamOperator rtEventDataStream =
>>  keyedSingleOutputStream.
>>  keyBy((keyed) -> keyed.getKey()).
>>  connect(rulesStream).
>>  process(new 
>> DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);
>>
>>
>> On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Deletion of messages in Kafka shouldn't affect Flink state in general.
>>> Probably, some operator in your pipeline is re-reading the topic
>>> and overwriting the state, dropping what was deleted by Kafka.
>>> Could you share the code?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:
>>>
 Hi,

 I have 2 streams, one with event data and the other with rules. I broadcast
 the rules stream and then key the data stream on event type. The connected
 stream is processed thereafter.
 We faced an issue where the rules data in the topic got deleted because of
 the Kafka retention policy.
 Post this, the existing rules data also got dropped from the broadcast state
 and the processing stopped.

 As per my understanding, the rules which were present in broadcast state
 should still exist even if the data was deleted in Kafka, as the rules data
 was already processed and stored in the state map.

 PS: I’m reusing the rules stream as broadcast later in processing as
 well. Could this be an issue?

 Thanks,
 Hemant

>>>


Re: BroadcastState dropped when data deleted in Kafka

2021-02-24 Thread Arvid Heise
Could you double-check if your Flink application was restarted between the
time the Kafka topic was cleared and the time you saw that the rules had been lost?

I suspect that you deleted the Kafka topic and the Flink application then
failed and restarted. Upon restart it read the empty rule topic.

To solve it, you probably want to add the rules to the state of your
process function [1]. If you have done that, I'm a bit lost.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
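
As a side note (a sketch, not a recommendation specific to this job): broadcast
state and the Kafka source offsets are both part of checkpoints, so with
checkpointing enabled an automatic failover restores the rules from the last
checkpoint instead of rebuilding them from the possibly already expired topic.
The intervals below are placeholders.

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint every 60s -- broadcast state and Kafka offsets are included
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

// on failure, restart and restore from the latest checkpoint instead of
// starting fresh against a topic that Kafka may already have expired
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10_000L));

A manual resubmission of the job, on the other hand, starts with empty state
unless it is started from a savepoint or a retained checkpoint.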

On Wed, Feb 24, 2021 at 7:30 AM bat man  wrote:

> Hi,
>
> This is my code below -
> As mentioned earlier, the rulesStream is again used in later processing.
> Below you can see the rulesStream is again connected with the result stream
> of the first process function. Do you think this is the reason the rules
> operator's state is getting overridden when the data in Kafka is deleted?
> My question is: if the data is not present in Kafka then no data is read
> into the stream, so how is the existing state data being updated?
>
> public static final MapStateDescriptor rulesDescriptor =
> new MapStateDescriptor<>(
> "rules", BasicTypeInfo.INT_TYPE_INFO, 
> TypeInformation.of(Rule.class));
>
> KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
> DataStream rawEventStream = 
> validateData(getRawEventStream(rawEventKafkaSource,env));
>
>  rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
>  DataStream rulesDataStream = getRulesStream(rulesKafkaSource,env);
>
>  deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
>  DataStream deviceDataStream = getDeviceStream(deviceSource,env);
>
>  BroadcastStream rulesStream = 
> rulesDataStream.broadcast(rulesDescriptor);
>
>  SingleOutputStreamOperator> 
> keyedSingleOutputStream =
>  rawEventStream.
>  connect(rulesStream).
>  process(new 
> DynamicKeyFunction()).name("keyed").uid("keyed").setParallelism(5);
>
>  SingleOutputStreamOperator rtEventDataStream =
>  keyedSingleOutputStream.
>  keyBy((keyed) -> keyed.getKey()).
>  connect(rulesStream).
>  process(new 
> DynamicTransformFunction()).name("rt").uid("rt").setParallelism(5);
>
>
> On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi,
>>
>> Deletion of messages in Kafka shouldn't affect Flink state in general.
>> Probably, some operator in your pipeline is re-reading the topic
>> and overwrites the state, dropping what was deleted by Kafka.
>> Could you share the code?
>>
>> Regards,
>> Roman
>>
>>
>> On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:
>>
>>> Hi,
>>>
>>> I have 2 streams, one with event data and the other with rules. I broadcast
>>> the rules stream and then key the data stream on event type. The connected
>>> stream is processed thereafter.
>>> We faced an issue where the rules data in the topic got deleted because of
>>> the Kafka retention policy.
>>> Post this, the existing rules data also got dropped from the broadcast state
>>> and the processing stopped.
>>>
>>> As per my understanding, the rules which were present in broadcast state
>>> should still exist even if the data was deleted in Kafka, as the rules data
>>> was already processed and stored in the state map.
>>>
>>> PS: I’m reusing the rules stream as broadcast later in processing as
>>> well. Could this be an issue?
>>>
>>> Thanks,
>>> Hemant
>>>
>>


Re: BroadcastState dropped when data deleted in Kafka

2021-02-23 Thread bat man
Hi,

This is my code below -
As mentioned earlier, the rulesStream is again used in later processing.
Below you can see the rulesStream is again connected with the result stream
of the first process function. Do you think this is the reason the rules
operator's state is getting overridden when the data in Kafka is deleted?
My question is: if the data is not present in Kafka then no data is read
into the stream, so how is the existing state data being updated?

public static final MapStateDescriptor<Integer, Rule> rulesDescriptor =
        new MapStateDescriptor<>(
                "rules", BasicTypeInfo.INT_TYPE_INFO,
                TypeInformation.of(Rule.class));

KafkaSource = getKafkaSource(config.get(RAW_EVENT_TOPIC));
DataStream rawEventStream =
        validateData(getRawEventStream(rawEventKafkaSource, env));

rulesKafkaSource = getKafkaSource(config.get(RULES_TOPIC));
DataStream rulesDataStream = getRulesStream(rulesKafkaSource, env);

deviceSource = getKafkaSource(config.get(DEVICE_EVENT_TOPIC));
DataStream deviceDataStream = getDeviceStream(deviceSource, env);

BroadcastStream<Rule> rulesStream = rulesDataStream.broadcast(rulesDescriptor);

SingleOutputStreamOperator> keyedSingleOutputStream =
        rawEventStream
                .connect(rulesStream)
                .process(new DynamicKeyFunction())
                .name("keyed").uid("keyed").setParallelism(5);

SingleOutputStreamOperator rtEventDataStream =
        keyedSingleOutputStream
                .keyBy((keyed) -> keyed.getKey())
                .connect(rulesStream)
                .process(new DynamicTransformFunction())
                .name("rt").uid("rt").setParallelism(5);
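
One way I could check what (if anything) rewrites that state would be to log
every element that reaches the broadcast side of the two functions. A sketch
only, assuming the Rule class (with a getRuleId() getter) and an Event output
type as placeholders:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.util.Collector;

// inside DynamicKeyFunction / DynamicTransformFunction
private static final Logger LOG = LoggerFactory.getLogger(DynamicKeyFunction.class);

@Override
public void processBroadcastElement(Rule rule, Context ctx, Collector<Event> out)
        throws Exception {
    // if the state is really being overwritten, something has to show up here
    // after the retention kicked in; otherwise the state itself was untouched
    LOG.info("broadcast element received: {}", rule);
    ctx.getBroadcastState(rulesDescriptor).put(rule.getRuleId(), rule);
}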


On Tue, Feb 23, 2021 at 10:32 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi,
>
> Deletion of messages in Kafka shouldn't affect Flink state in general.
> Probably, some operator in your pipeline is re-reading the topic
> and overwriting the state, dropping what was deleted by Kafka.
> Could you share the code?
>
> Regards,
> Roman
>
>
> On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:
>
>> Hi,
>>
>> I have 2 streams, one with event data and the other with rules. I broadcast
>> the rules stream and then key the data stream on event type. The connected
>> stream is processed thereafter.
>> We faced an issue where the rules data in the topic got deleted because of
>> the Kafka retention policy.
>> Post this, the existing rules data also got dropped from the broadcast state
>> and the processing stopped.
>>
>> As per my understanding, the rules which were present in broadcast state
>> should still exist even if the data was deleted in Kafka, as the rules data
>> was already processed and stored in the state map.
>>
>> PS: I’m reusing the rules stream as broadcast later in processing as
>> well. Could this be an issue?
>>
>> Thanks,
>> Hemant
>>
>


Re: BroadcastState dropped when data deleted in Kafka

2021-02-23 Thread Khachatryan Roman
Hi,

Deletion of messages in Kafka shouldn't affect Flink state in general.
Probably, some operator in your pipeline is re-reading the topic
and overwriting the state, dropping what was deleted by Kafka.
Could you share the code?

Regards,
Roman


On Tue, Feb 23, 2021 at 7:12 AM bat man  wrote:

> Hi,
>
> I have 2 streams, one with event data and the other with rules. I broadcast
> the rules stream and then key the data stream on event type. The connected
> stream is processed thereafter.
> We faced an issue where the rules data in the topic got deleted because of
> the Kafka retention policy.
> Post this, the existing rules data also got dropped from the broadcast state
> and the processing stopped.
>
> As per my understanding, the rules which were present in broadcast state
> should still exist even if the data was deleted in Kafka, as the rules data
> was already processed and stored in the state map.
>
> PS: I’m reusing the rules stream as broadcast later in processing as well.
> Could this be an issue?
>
> Thanks,
> Hemant
>


BroadcastState dropped when data deleted in Kafka

2021-02-22 Thread bat man
Hi,

I have 2 streams, one with event data and the other with rules. I broadcast
the rules stream and then key the data stream on event type. The connected
stream is processed thereafter.
We faced an issue where the rules data in the topic got deleted because of
the Kafka retention policy.
Post this, the existing rules data also got dropped from the broadcast state
and the processing stopped.

As per my understanding, the rules which were present in broadcast state
should still exist even if the data was deleted in Kafka, as the rules data
was already processed and stored in the state map.

PS: I’m reusing the rules stream as broadcast later in processing as well.
Could this be an issue?

Thanks,
Hemant