Hi Juho,

Thanks for reporting back, glad to know that it's not an issue :)

In general, connector-specific configuration should always be set at the
connector level, i.e. in the Properties passed to each connector instance.
The flink-conf.yaml file is for cluster-wide configuration.

And yes, it might be helpful to have a code snippet to demonstrate the
configuration for partition discovery.
Could you open a JIRA for that?
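
In the meantime, here is a rough sketch of what such a snippet might look
like. It only builds the Properties that would be handed to the consumer
constructor. The broker address and group id are placeholders I made up, and
the constructor call is left in a comment so that the example compiles
without Flink on the classpath.

```java
import java.util.Properties;

class PartitionDiscoveryExample {

    // Builds the connector-level Properties for a Kafka consumer.
    // "localhost:9092" and "my-group" are placeholder values (assumptions).
    static Properties consumerProperties(long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");
        // A non-negative value enables partition discovery.
        props.setProperty("flink.partition-discovery.interval-millis",
                Long.toString(discoveryIntervalMillis));
        return props;
    }

    public static void main(String[] args) {
        // Check for new partitions once per minute.
        Properties props = consumerProperties(60000L);

        // Pass the Properties to the consumer constructor, for example
        // (assuming FlinkKafkaConsumer010 and SimpleStringSchema are available):
        //   new FlinkKafkaConsumer010<>("my_topic", new SimpleStringSchema(), props);
        System.out.println(props.getProperty("flink.partition-discovery.interval-millis"));
    }
}
```

The key point is that the property travels with the consumer's Properties;
putting the same key into flink-conf.yaml is not read by the consumer.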

Cheers,
Gordon

On Tue, Apr 10, 2018, 8:44 AM Juho Autio <juho.au...@rovio.com> wrote:

> Ahhh looks like I had simply misunderstood where that property should go.
>
> The docs correctly say:
> > To enable it, set a non-negative value for
> > flink.partition-discovery.interval-millis in the __provided properties
> > config__
>
> So it should be set in the Properties that are passed in the constructor
> of FlinkKafkaConsumer!
>
> I had somehow assumed that this should go to flink-conf.yaml (maybe
> because it starts with "flink."?), and obviously the FlinkKafkaConsumer
> doesn't read that.
>
> Sorry for the trouble. If anything, I guess a piece of example code
> might've helped me avoid this mistake. The docs are clear though; I had
> just become blind to this detail as I thought I had already read it.
>
> On Thu, Apr 5, 2018 at 10:26 AM, Juho Autio <juho.au...@rovio.com> wrote:
>
>> Still not working after I had a fresh build from
>> https://github.com/apache/flink/tree/release-1.5.
>>
>> When the job starts this is logged:
>>
>> 2018-04-05 09:29:38,157 INFO
>> org.apache.flink.configuration.GlobalConfiguration            - Loading
>> configuration property: flink.partition-discovery.interval-millis, 60000
>>
>> So that's 1 minute.
>>
>> As before, I added one more partition to a topic that is being consumed.
>> Secor started consuming it as expected, but Flink didn't – or at least it
>> isn't reporting anything about doing so. The new partition is not shown in
>> Flink task metrics or consumer offsets committed by Flink.
>>
>> How could I investigate this further? How about that additional logging
>> for partition discovery?
>>
>> On Thu, Mar 22, 2018 at 3:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> I think you’ve made a good point: there are currently no logs that say
>>> anything about discovering a new partition. We should probably add this.
>>>
>>> And yes, it would be great if you can report back on this using either
>>> the latest master, release-1.5 or release-1.4 branches.
>>>
>>> On 22 March 2018 at 10:24:09 PM, Juho Autio (juho.au...@rovio.com)
>>> wrote:
>>>
>>> Thanks, that sounds promising. How can I check whether it's consuming
>>> all partitions? For example, I couldn't find any logs about discovering
>>> a new partition. However, did I understand correctly that this is also
>>> fixed in Flink dev? If yes, I could rebuild my 1.5-SNAPSHOT and try
>>> again.
>>>
>>> On Thu, Mar 22, 2018 at 4:18 PM, Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> Can you confirm that the new partition is being consumed, and that only
>>>> Flink’s reported metrics do not include it?
>>>> If yes, then I think your observations can be explained by this issue:
>>>> https://issues.apache.org/jira/browse/FLINK-8419
>>>>
>>>> This issue should have been fixed in the recently released 1.4.2
>>>> version.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> On 22 March 2018 at 8:02:40 PM, Juho Autio (juho.au...@rovio.com)
>>>> wrote:
>>>>
>>>> According to the docs*, flink.partition-discovery.interval-millis can
>>>> be set to enable automatic partition discovery.
>>>>
>>>> I'm testing this, but apparently it doesn't work.
>>>>
>>>> I'm using Flink Version: 1.5-SNAPSHOT Commit: 8395508
>>>> and FlinkKafkaConsumer010.
>>>>
>>>> I had my flink stream running, consuming an existing topic with 3
>>>> partitions, among some other topics.
>>>> I modified partitions of an existing topic: 3 -> 4**.
>>>> I checked the consumer offsets of Secor: it's now consuming all 4
>>>> partitions.
>>>> I checked the consumer offsets of my Flink stream: it's still consuming
>>>> only the 3 original partitions.
>>>>
>>>> I also checked the Task Metrics of this job from Flink UI and it only
>>>> offers Kafka related metrics to be added for 3 partitions (0,1 & 2).
>>>>
>>>> According to Flink UI > Job Manager > Configuration:
>>>> flink.partition-discovery.interval-millis=60000
>>>> – so that's just 1 minute. It's already more than 20 minutes since I
>>>> added the new partition, so Flink should've picked it up.
>>>>
>>>> How can I debug this?
>>>>
>>>>
>>>> Btw, this job has external checkpoints enabled, done once per minute.
>>>> Those are also succeeding.
>>>>
>>>> *)
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
>>>>
>>>> **)
>>>>
>>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --describe --topic my_topic
>>>> Topic:my_topic PartitionCount:3 ReplicationFactor:1 Configs:
>>>>
>>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic my_topic --partitions 4
>>>> Adding partitions succeeded!
>>>>
>>>>
>>>>
>>>
>>
>
