Ok, thanks for the clarification. :)

Gyula

On Thu, Oct 12, 2017, 17:04 Aljoscha Krettek <aljos...@apache.org> wrote:

> It might be old but it's not forgotten, the issue I created is actually
> marked as a blocker so we won't forget it when releasing 1.3.3 and 1.4.0.
>
> The issue in Kafka is about new topics/partitions not being discovered or
> something else? That would be the expected behaviour in Flink < 1.4.0.
>
> Best,
> Aljoscha
>
> On 12. Oct 2017, at 16:40, Gyula Fóra <gyf...@apache.org> wrote:
>
> Hey,
>
> I know it's old discussion but there also seems to be a problem with the
> logic in the kafka source alone regarding new topics added after a
> checkpoint.
>
> Maybe there is a ticket for this already and I just missed it.
>
> Cheers,
> Gyula
>
> Gyula Fóra <gyf...@apache.org> ezt írta (időpont: 2017. szept. 14., Cs,
> 14:53):
>
>> Good job for figuring this out!
>> This certainly seems to explain our problems.
>>
>> Thanks!
>> Gyula
>>
>> Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2017. szept.
>> 14., Cs, 14:46):
>>
>>> After a bit more digging I found that the "isRestored" flag doesn't work
>>> correctly if there are operators chained to the sink that have state:
>>> https://issues.apache.org/jira/browse/FLINK-7623
>>>
>>> Blocker issue for 1.3.3 and 1.4.0.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 6. Sep 2017, at 16:05, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> After discussing this between Stefan and me we think that this should
>>> actually work.
>>>
>>> Do you have the log output from restoring the Kafka Consumer? It would
>>> be interesting to see whether any of those print:
>>>  -
>>> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L611
>>>  -
>>> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L554
>>>
>>> On 6. Sep 2017, at 14:45, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> Yes, and that's essentially what's happening in the 1.4-SNAPSHOT
>>> consumer which also has discovery of new partitions. Starting from
>>> 1.4-SNAPSHOT we store state in a union state, i.e. all sources get all
>>> partition on restore and if they didn't get any they know that they are
>>> new. There is no specific logic for detecting this situation, it's just
>>> that the partition discoverer will be seeded with this information and it
>>> will know if it discovers a new partition whether it can take ownership of
>>> that partition.
>>>
>>> I'm sure Gordon (cc'ed) could explain it better than I did.
>>>
>>> On 6. Sep 2017, at 14:36, Gyula Fóra <gyf...@apache.org> wrote:
>>>
>>> Wouldnt it be enough that Kafka sources store some empty container for
>>> there state if it is empty, compared to null when it should be bootstrapped
>>> again?
>>>
>>> Gyula
>>>
>>> Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2017. szept.
>>> 6., Sze, 14:31):
>>>
>>>> The problem here is that context.isRestored() is a global flag and not
>>>> local to each operator. It says "yes this job was restored" but the source
>>>> would need to know that it is actually brand new and never had any state.
>>>> This is quite tricky to do, since there is currently no way (if I'm
>>>> correct) to differentiate between "I got empty state but others maybe got
>>>> state" and "this source never had state and neither had other parallel
>>>> instances".
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 6. Sep 2017, at 13:56, Stefan Richter <s.rich...@data-artisans.com>
>>>> wrote:
>>>>
>>>> Thanks for the report, I will take a look.
>>>>
>>>> Am 06.09.2017 um 11:48 schrieb Gyula Fóra <gyf...@apache.org>:
>>>>
>>>> Hi all,
>>>>
>>>> We are running into some problems with the kafka source after changing
>>>> the uid and restoring from the savepoint.
>>>> What we are expecting is to clear the partition state, and set it up
>>>> all over again, but what seems to happen is that the consumer thinks that
>>>> it doesnt have any partitions assigned.
>>>>
>>>> This was supposed to be fixed in
>>>> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
>>>> but appears to be reworked/reverted in the latest release :
>>>> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>>>>
>>>> What is the expected behaviour here?
>>>>
>>>> Thanks!
>>>> Gyula
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>>
>

Reply via email to