Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-10-12 Thread Aljoscha Krettek
Still not nice, though, and it took a while to finalise discovery for 1.4. ;-)

If you need that now, you might be able to backport the 1.4 consumer to 1.3.


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-10-12 Thread Gyula Fóra
Ok, thanks for the clarification. :)

Gyula



Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-10-12 Thread Aljoscha Krettek
It might be old but it's not forgotten; the issue I created is actually marked
as a blocker, so we won't forget it when releasing 1.3.3 and 1.4.0.

Is the issue in the Kafka source about new topics/partitions not being
discovered, or something else? Not discovering them would be the expected
behaviour in Flink < 1.4.0.

Best,
Aljoscha


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-10-12 Thread Gyula Fóra
Hey,

I know it's an old discussion, but there also seems to be a problem with the
logic in the Kafka source alone regarding new topics added after a
checkpoint.

Maybe there is a ticket for this already and I just missed it.

Cheers,
Gyula



Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-14 Thread Gyula Fóra
Good job figuring this out!
This certainly seems to explain our problems.

Thanks!
Gyula



Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-14 Thread Aljoscha Krettek
After a bit more digging I found that the "isRestored" flag doesn't work 
correctly if there are operators chained to the sink that have state: 
https://issues.apache.org/jira/browse/FLINK-7623

Blocker issue for 1.3.3 and 1.4.0.

Best,
Aljoscha



Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Aljoscha Krettek
After discussing this, Stefan and I think that this should actually work.

Do you have the log output from restoring the Kafka Consumer? It would be
interesting to see whether either of these log statements prints:
 - https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L611
 - https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L554




Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Aljoscha Krettek
Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer, which
also has discovery of new partitions. Starting from 1.4-SNAPSHOT we store state
in a union state, i.e. all sources get all partitions on restore, and if they
didn't get any they know that they are new. There is no specific logic for
detecting this situation; the partition discoverer is simply seeded with this
information, so when it discovers a new partition it knows whether it can take
ownership of that partition.

I'm sure Gordon (cc'ed) could explain it better than I did.
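
As a rough illustration of the union-state idea described above, here is a
minimal plain-Java sketch with no Flink dependency. The class name
UnionStateSketch, the ownerOf hash rule and the string partition ids are all
hypothetical and are not taken from the actual 1.4 consumer. The only point is
that once every subtask sees the full set of restored partitions, it can tell a
known partition from a newly discovered one and decide locally whether to
claim it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only; names and the assignment rule are assumptions.
final class UnionStateSketch {

    // Hypothetical deterministic rule mapping a partition to exactly one subtask.
    static int ownerOf(String partitionId, int parallelism) {
        return Math.floorMod(partitionId.hashCode(), parallelism);
    }

    // Returns the discovered partitions this subtask should start reading as NEW:
    // those absent from the union of restored state and owned by this subtask.
    static List<String> claimNew(int subtask, int parallelism,
                                 Set<String> restoredUnion, List<String> discovered) {
        List<String> claimed = new ArrayList<>();
        for (String p : discovered) {
            boolean known = restoredUnion.contains(p);
            if (!known && ownerOf(p, parallelism) == subtask) {
                claimed.add(p);
            }
        }
        return claimed;
    }

    public static void main(String[] args) {
        // Every subtask receives the same union of restored partitions.
        Set<String> restoredUnion = new HashSet<>(Arrays.asList("topic-0", "topic-1"));
        List<String> discovered = Arrays.asList("topic-0", "topic-1", "topic-2");
        for (int subtask = 0; subtask < 2; subtask++) {
            System.out.println("subtask " + subtask + " claims "
                    + claimNew(subtask, 2, restoredUnion, discovered));
        }
    }
}
```

With an empty restored union, every discovered partition counts as new, which
matches the "if they didn't get any they know that they are new" case.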




Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Gyula Fóra
Wouldn't it be enough for the Kafka sources to store an empty container as
their state when it is empty, as opposed to null when it should be
bootstrapped again?

Gyula
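
The distinction suggested above can be sketched in a few lines of plain Java.
This is only an illustration of the proposal, not how FlinkKafkaConsumerBase is
written; the class EmptyVsMissingState, the Startup enum and the decide method
are hypothetical names. It assumes that a null value could be used to mean
"nothing was ever stored" while an empty list means "state existed but held no
partitions".

```java
import java.util.Collections;
import java.util.List;

// Illustrative sketch only; all names here are assumptions.
final class EmptyVsMissingState {

    enum Startup { BOOTSTRAP_FROM_KAFKA, RESUME_WITH_NO_PARTITIONS, RESUME_FROM_OFFSETS }

    // restoredPartitions == null  -> nothing was ever stored for this source
    // restoredPartitions is empty -> state existed, but held no partitions
    static Startup decide(List<String> restoredPartitions) {
        if (restoredPartitions == null) {
            return Startup.BOOTSTRAP_FROM_KAFKA;
        }
        if (restoredPartitions.isEmpty()) {
            return Startup.RESUME_WITH_NO_PARTITIONS;
        }
        return Startup.RESUME_FROM_OFFSETS;
    }

    public static void main(String[] args) {
        System.out.println(decide(null));
        System.out.println(decide(Collections.<String>emptyList()));
        System.out.println(decide(Collections.singletonList("topic-0")));
    }
}
```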



Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Aljoscha Krettek
The problem here is that context.isRestored() is a global flag and not local to 
each operator. It says "yes this job was restored" but the source would need to 
know that it is actually brand new and never had any state. This is quite 
tricky to do, since there is currently no way (if I'm correct) to differentiate 
between "I got empty state but others maybe got state" and "this source never 
had state and neither had other parallel instances".

Best,
Aljoscha
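
To make the ambiguity described above concrete, here is a small plain-Java model
(not Flink's API). The class RestoreAmbiguity, the Observation type and both
factory methods are hypothetical. The sketch assumes that all a source instance
can observe at initialization is a job-wide restored flag plus whatever state
it was handed, and shows that the two situations then look identical.

```java
import java.util.Collections;
import java.util.List;

// Illustrative sketch only; a plain-Java model, not Flink's API.
final class RestoreAmbiguity {

    // What a single parallel source instance can observe at initialization.
    static final class Observation {
        final boolean jobRestored;      // models the job-wide isRestored() flag
        final List<String> partitions;  // state handed to this instance

        Observation(boolean jobRestored, List<String> partitions) {
            this.jobRestored = jobRestored;
            this.partitions = partitions;
        }

        boolean sameAs(Observation other) {
            return jobRestored == other.jobRestored
                    && partitions.equals(other.partitions);
        }
    }

    // Case A: the source had state before, but this instance was handed none.
    static Observation instanceWithoutShare() {
        return new Observation(true, Collections.<String>emptyList());
    }

    // Case B: the source is brand new in a restored job (e.g. its uid changed).
    static Observation brandNewSource() {
        return new Observation(true, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        System.out.println("indistinguishable: "
                + instanceWithoutShare().sameAs(brandNewSource()));
    }
}
```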




Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Stefan Richter
Thanks for the report, I will take a look.




Empty state restore seems to be broken for Kafka source (1.3.2)

2017-09-06 Thread Gyula Fóra
Hi all,

We are running into some problems with the Kafka source after changing the
uid and restoring from the savepoint.
What we expect is for the partition state to be cleared and set up all
over again, but what seems to happen is that the consumer thinks that it
doesn't have any partitions assigned.

This was supposed to be fixed in
https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
but appears to have been reworked/reverted in the latest release:
https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547

What is the expected behaviour here?

Thanks!
Gyula