After a bit more digging, I found that the "isRestored" flag doesn't work 
correctly if there are stateful operators chained to the sink: 
https://issues.apache.org/jira/browse/FLINK-7623 

This is a blocker issue for 1.3.3 and 1.4.0.

Best,
Aljoscha
> On 6. Sep 2017, at 16:05, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> After discussing this, Stefan and I think that this should 
> actually work.
> 
> Do you have the log output from restoring the Kafka consumer? It would be 
> interesting to see whether either of these lines is printed:
>  - https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L611
>  - https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L554
> 
>> On 6. Sep 2017, at 14:45, Aljoscha Krettek <aljos...@apache.org> wrote:
>> 
>> Yes, and that's essentially what happens in the 1.4-SNAPSHOT consumer, 
>> which also supports discovery of new partitions. Starting with 1.4-SNAPSHOT, 
>> we store the state as union state, i.e. all source instances get all 
>> partitions on restore, and if they didn't get any, they know that they are 
>> new. There is no specific logic for detecting this situation; the partition 
>> discoverer is simply seeded with this information, so when it discovers a 
>> new partition it knows whether it can take ownership of that partition.
>> 
>> I'm sure Gordon (cc'ed) could explain it better than I did.
>> 
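A minimal plain-Java sketch of the union-state restore described in the reply above, with no Flink dependency. Everything in it is a hypothetical illustration rather than the FlinkKafkaConsumerBase code: the class and method names, the hash-based ownership rule, and the two offset sentinels. It assumes every subtask receives the full union of snapshotted offsets, treats an empty union as a brand-new source, and starts any discovered partition that is missing from a non-empty union from the earliest offset.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of union-state restore; not Flink's actual API.
final class UnionStateRestoreSketch {

    // Hypothetical start-offset sentinels.
    static final long EARLIEST_OFFSET = -1L; // partition unknown to a non-empty union
    static final long STARTUP_MODE = -2L;    // brand-new source: use the configured startup mode

    // Hypothetical ownership rule: each partition maps to exactly one subtask.
    static boolean owns(String partition, int subtaskIndex, int parallelism) {
        return Math.floorMod(partition.hashCode(), parallelism) == subtaskIndex;
    }

    // Every subtask sees the whole union, so an empty union means that no
    // parallel instance ever owned a partition: the source is new.
    static boolean isFreshStart(Map<String, Long> unionState) {
        return unionState.isEmpty();
    }

    // Seeds one subtask: returns the partitions it owns and where each one starts.
    static Map<String, Long> seed(Map<String, Long> unionState, Set<String> discovered,
                                  int subtaskIndex, int parallelism) {
        boolean fresh = isFreshStart(unionState);
        Set<String> candidates = new HashSet<>(unionState.keySet());
        candidates.addAll(discovered);

        Map<String, Long> assigned = new HashMap<>();
        for (String partition : candidates) {
            if (!owns(partition, subtaskIndex, parallelism)) {
                continue; // another subtask takes ownership of this partition
            }
            if (fresh) {
                assigned.put(partition, STARTUP_MODE);
            } else {
                // Restored partitions resume; unknown ones appeared after the snapshot.
                assigned.put(partition, unionState.getOrDefault(partition, EARLIEST_OFFSET));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        Set<String> discovered = Set.of("topic-0", "topic-1");
        // With parallelism 1, subtask 0 owns every partition.
        System.out.println(seed(Map.of("topic-0", 42L), discovered, 0, 1));
        System.out.println(seed(Map.of(), discovered, 0, 1));
    }
}
```

The ownership rule only needs to be deterministic: every subtask sees the same union and the same discovered partitions, so each one independently reaches the same assignment without any coordination.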
>>> On 6. Sep 2017, at 14:36, Gyula Fóra <gyf...@apache.org> wrote:
>>> 
>>> Wouldn't it be enough for the Kafka sources to store some empty container 
>>> for their state when it is empty, as opposed to null when the source should 
>>> be bootstrapped again?
>>> 
>>> Gyula
>>> 
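A hypothetical sketch of the proposal in the message above, modelling one source instance's restored state as a nullable `List`: `null` means nothing was ever stored for this operator (for example after a uid change), so it bootstraps, while an empty list means state was stored but held no partitions. The class and enum names are illustrative only, not Flink API.

```java
import java.util.List;

// Hypothetical model of the "empty container vs. null" proposal; not Flink's API.
final class EmptyVersusNullSketch {

    enum StartAction { BOOTSTRAP, RESUME }

    // restored == null  -> nothing was ever stored for this operator: bootstrap.
    // restored is empty -> state was stored but held no partitions: resume as-is.
    static StartAction decide(List<String> restored) {
        return restored == null ? StartAction.BOOTSTRAP : StartAction.RESUME;
    }

    public static void main(String[] args) {
        System.out.println(decide(null));      // BOOTSTRAP
        System.out.println(decide(List.of())); // RESUME
    }
}
```

The decision is purely local to one instance; whether an empty list can be trusted still depends on how state is split across parallel instances on restore, which is the concern Aljoscha raises about `context.isRestored()`.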
>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Wed, 6 Sep 2017, 14:31):
>>> The problem here is that context.isRestored() is a global flag and not 
>>> local to each operator. It says "yes, this job was restored", but the source 
>>> would need to know that it is actually brand new and never had any state. 
>>> This is quite tricky to do, since there is currently no way (if I'm 
>>> correct) to differentiate between "I got empty state, but other instances 
>>> may have got state" and "this source never had state, and neither did the 
>>> other parallel instances".
>>> 
>>> Best,
>>> Aljoscha
>>> 
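A small plain-Java model of the ambiguity described above. It assumes a single job-wide restored flag and list state that is split evenly across parallel instances on restore (round-robin here, as a simplification), and compares what one source instance observes when (a) the job was restored and other instances received all the partitions, and (b) the source has a new uid, so no instance received anything. All names are hypothetical; this is not Flink's implementation of `context.isRestored()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of a job-wide restored flag; not Flink's implementation.
final class GlobalRestoreFlagSketch {

    // Even split of the snapshotted list over the parallel instances
    // (round-robin here as a simplifying assumption).
    static List<List<String>> redistribute(List<String> snapshot, int parallelism) {
        List<List<String>> slices = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            slices.add(new ArrayList<>());
        }
        for (int i = 0; i < snapshot.size(); i++) {
            slices.get(i % parallelism).add(snapshot.get(i));
        }
        return slices;
    }

    // Everything one instance sees at initialization: the job-wide flag
    // paired with its own slice of the restored state.
    static Map.Entry<Boolean, List<String>> view(boolean jobRestored, List<String> snapshot,
                                                 int parallelism, int subtask) {
        return Map.entry(jobRestored, redistribute(snapshot, parallelism).get(subtask));
    }

    public static void main(String[] args) {
        // (a) Restored job, two partitions, three instances: instance 2 gets nothing.
        Map.Entry<Boolean, List<String>> a = view(true, List.of("topic-0", "topic-1"), 3, 2);
        // (b) Source with a new uid: no state at all, but the job still counts as restored.
        Map.Entry<Boolean, List<String>> b = view(true, List.of(), 3, 2);
        // Identical observations, yet (a) should stay idle and (b) should bootstrap.
        System.out.println(a.equals(b)); // true
    }
}
```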
>>>> On 6. Sep 2017, at 13:56, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>> 
>>>> Thanks for the report, I will take a look.
>>>> 
>>>>> On 06.09.2017 at 11:48, Gyula Fóra <gyf...@apache.org> wrote:
>>>>> 
>>>>> Hi all,
>>>>> 
>>>>> We are running into some problems with the Kafka source after changing 
>>>>> its uid and restoring from the savepoint.
>>>>> We expected the partition state to be cleared and set up all over again, 
>>>>> but instead the consumer seems to think that it doesn't have any 
>>>>> partitions assigned.
>>>>> 
>>>>> This was supposed to be fixed in 
>>>>> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
>>>>> but it appears to have been reworked/reverted in the latest release:
>>>>> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>>>>> 
>>>>> What is the expected behaviour here?
>>>>> 
>>>>> Thanks!
>>>>> Gyula
>>>> 
>>> 
>> 
> 
