Hi Juho,

Thanks for reporting back, glad to know that it's not an issue :)
In general, connector specific configurations should always happen at the connector level, per-connector. The flink-conf.yaml file is usually for cluster wide configurations.

And yes, it might be helpful to have a code snippet to demonstrate the configuration for partition discovery. Could you open a JIRA for that?

Cheers,
Gordon

On Tue, Apr 10, 2018, 8:44 AM Juho Autio <juho.au...@rovio.com> wrote:

> Ahhh looks like I had simply misunderstood where that property should go.
>
> The docs correctly say:
>
> To enable it, set a non-negative value for
> flink.partition-discovery.interval-millis in the __provided properties
> config__
>
> So it should be set in the Properties that are passed in the constructor
> of FlinkKafkaConsumer!
>
> I had somehow assumed that this should go to flink-conf.yaml (maybe
> because it starts with "flink."?), and obviously the FlinkKafkaConsumer
> doesn't read that.
>
> Sorry for the trouble. If anything, I guess a piece of example code
> might've helped me avoid this mistake. The docs are clear though, I just
> had become blind to this detail as I thought I had already read it.
>
> On Thu, Apr 5, 2018 at 10:26 AM, Juho Autio <juho.au...@rovio.com> wrote:
>
>> Still not working after I had a fresh build from
>> https://github.com/apache/flink/tree/release-1.5.
>>
>> When the job starts this is logged:
>>
>> 2018-04-05 09:29:38,157 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: flink.partition-discovery.interval-millis, 60000
>>
>> So that's 1 minute.
>>
>> As before, I added one more partition to a topic that is being consumed.
>> Secor started consuming it as expected, but Flink didn't – or at least it
>> isn't reporting anything about doing so. The new partition is not shown in
>> Flink task metrics or consumer offsets committed by Flink.
>>
>> How could I investigate this further? How about that additional logging
>> for partition discovery?
>>
>> On Thu, Mar 22, 2018 at 3:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> I think you’ve made a good point: there are currently no logs that tell
>>> anything about discovering a new partition. We should probably add this.
>>>
>>> And yes, it would be great if you can report back on this using either
>>> the latest master, release-1.5 or release-1.4 branches.
>>>
>>> On 22 March 2018 at 10:24:09 PM, Juho Autio (juho.au...@rovio.com) wrote:
>>>
>>> Thanks, that sounds promising. I don't know how to check if it's
>>> consuming all partitions? For example I couldn't find any logs about
>>> discovering a new partition. However, did I understand correctly that this
>>> is also fixed in Flink dev? If yes, I could rebuild my 1.5-SNAPSHOT and try
>>> again.
>>>
>>> On Thu, Mar 22, 2018 at 4:18 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> Can you confirm that the new partition is consumed, but only that
>>>> Flink’s reported metrics do not include them?
>>>> If yes, then I think your observations can be explained by this issue:
>>>> https://issues.apache.org/jira/browse/FLINK-8419
>>>>
>>>> This issue should have been fixed in the recently released 1.4.2
>>>> version.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> On 22 March 2018 at 8:02:40 PM, Juho Autio (juho.au...@rovio.com) wrote:
>>>>
>>>> According to the docs*, flink.partition-discovery.interval-millis can
>>>> be set to enable automatic partition discovery.
>>>>
>>>> I'm testing this, apparently it doesn't work.
>>>>
>>>> I'm using Flink Version: 1.5-SNAPSHOT Commit: 8395508
>>>> and FlinkKafkaConsumer010.
>>>>
>>>> I had my flink stream running, consuming an existing topic with 3
>>>> partitions, among some other topics.
>>>> I modified partitions of an existing topic: 3 -> 4**.
>>>> I checked consumer offsets by secor: it's now consuming all 4
>>>> partitions.
>>>> I checked consumer offset by my flink stream: it's still consuming only
>>>> the 3 original partitions.
>>>>
>>>> I also checked the Task Metrics of this job from Flink UI and it only
>>>> offers Kafka related metrics to be added for 3 partitions (0,1 & 2).
>>>>
>>>> According to Flink UI > Job Manager > Configuration:
>>>> flink.partition-discovery.interval-millis=60000
>>>> – so that's just 1 minute. It's already more than 20 minutes since I
>>>> added the new partition, so Flink should've picked it up.
>>>>
>>>> How to debug?
>>>>
>>>> Btw, this job has external checkpoints enabled, done once per minute.
>>>> Those are also succeeding.
>>>>
>>>> *)
>>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
>>>>
>>>> **)
>>>>
>>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --describe --topic my_topic
>>>> Topic:my_topic PartitionCount:3 ReplicationFactor:1 Configs:
>>>>
>>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic my_topic --partitions 4
>>>> Adding partitions succeeded!
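
Since the thread ends with a request for example code, here is a minimal sketch of where the setting goes. It only builds the java.util.Properties object, so it runs without Flink on the classpath. The class name PartitionDiscoveryProps, the helper consumerProperties, and the broker address, group id and topic name are made-up placeholders, not anything from the Flink docs. The consumer construction is shown only as a comment and assumes the FlinkKafkaConsumer010 connector mentioned earlier in the thread.

```java
import java.util.Properties;

// Hypothetical helper class, for illustration only.
class PartitionDiscoveryProps {

    // The property key quoted in this thread. It is read from the Properties
    // given to the Kafka consumer, not from flink-conf.yaml.
    static final String DISCOVERY_INTERVAL_KEY =
            "flink.partition-discovery.interval-millis";

    // Builds the Properties that would be handed to the consumer constructor.
    static Properties consumerProperties(String bootstrapServers,
                                         String groupId,
                                         long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // A non-negative value enables partition discovery.
        props.setProperty(DISCOVERY_INTERVAL_KEY,
                Long.toString(discoveryIntervalMillis));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values: adjust to your own cluster.
        Properties props = consumerProperties("localhost:9092", "my-group", 60_000L);

        // With the Flink Kafka connector dependency available, the same object
        // would be passed to the consumer, roughly like this (assumed usage):
        //   new FlinkKafkaConsumer010<>("my_topic", new SimpleStringSchema(), props);

        System.out.println(DISCOVERY_INTERVAL_KEY + "="
                + props.getProperty(DISCOVERY_INTERVAL_KEY));
    }
}
```

The point of the sketch is only the location of the key: it travels with the connector's own Properties, which matches the resolution at the top of this thread that flink-conf.yaml is not read by the FlinkKafkaConsumer.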