Thanks Gordon, here's the ticket: https://issues.apache.org/jira/browse/FLINK-9334
If you'd like me to have a stab at it, feel free to assign the ticket to me.

On Thu, Apr 12, 2018 at 10:28 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Juho,
>
> Thanks for reporting back, glad to know that it's not an issue :)
>
> In general, connector specific configurations should always happen at the
> connector level, per-connector.
> The flink-conf.yaml file is usually for cluster wide configurations.
>
> And yes, it might be helpful to have a code snippet to demonstrate the
> configuration for partition discovery.
> Could you open a JIRA for that?
>
> Cheers,
> Gordon
>
> On Tue, Apr 10, 2018, 8:44 AM Juho Autio <juho.au...@rovio.com> wrote:
>
>> Ahhh looks like I had simply misunderstood where that property should go.
>>
>> The docs correctly say:
>> > To enable it, set a non-negative value for
>> > flink.partition-discovery.interval-millis
>> in the __provided properties config__
>>
>> So it should be set in the Properties that are passed in the constructor
>> of FlinkKafkaConsumer!
>>
>> I had somehow assumed that this should go to flink-conf.yaml (maybe
>> because it starts with "flink."?), and obviously the FlinkKafkaConsumer
>> doesn't read that.
>>
>> Sorry for the trouble. If anything, I guess a piece of example code
>> might've helped me avoid this mistake. The docs are clear though, I just
>> had become blind to this detail as I thought I had already read it.
>>
>> On Thu, Apr 5, 2018 at 10:26 AM, Juho Autio <juho.au...@rovio.com> wrote:
>>
>>> Still not working after I had a fresh build from
>>> https://github.com/apache/flink/tree/release-1.5.
>>>
>>> When the job starts this is logged:
>>>
>>> 2018-04-05 09:29:38,157 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: flink.partition-discovery.interval-millis, 60000
>>>
>>> So that's 1 minute.
>>>
>>> As before, I added one more partition to a topic that is being consumed.
>>> Secor started consuming it as expected, but Flink didn't – or at least it
>>> isn't reporting anything about doing so. The new partition is not shown in
>>> Flink task metrics or consumer offsets committed by Flink.
>>>
>>> How could I investigate this further? How about that additional logging
>>> for partition discovery?
>>>
>>> On Thu, Mar 22, 2018 at 3:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think you’ve made a good point: there are currently no logs that tell
>>>> anything about discovering a new partition. We should probably add this.
>>>>
>>>> And yes, it would be great if you can report back on this using either
>>>> the latest master, release-1.5 or release-1.4 branches.
>>>>
>>>> On 22 March 2018 at 10:24:09 PM, Juho Autio (juho.au...@rovio.com) wrote:
>>>>
>>>> Thanks, that sounds promising. I don't know how to check if it's
>>>> consuming all partitions? For example I couldn't find any logs about
>>>> discovering a new partition. However, did I understand correctly that this
>>>> is also fixed in Flink dev? If yes, I could rebuild my 1.5-SNAPSHOT and try
>>>> again.
>>>>
>>>> On Thu, Mar 22, 2018 at 4:18 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>>
>>>>> Hi Juho,
>>>>>
>>>>> Can you confirm that the new partition is consumed, but only that
>>>>> Flink’s reported metrics do not include them?
>>>>> If yes, then I think your observations can be explained by this issue:
>>>>> https://issues.apache.org/jira/browse/FLINK-8419
>>>>>
>>>>> This issue should have been fixed in the recently released 1.4.2
>>>>> version.
>>>>>
>>>>> Cheers,
>>>>> Gordon
>>>>>
>>>>> On 22 March 2018 at 8:02:40 PM, Juho Autio (juho.au...@rovio.com) wrote:
>>>>>
>>>>> According to the docs*, flink.partition-discovery.interval-millis can
>>>>> be set to enable automatic partition discovery.
>>>>>
>>>>> I'm testing this, apparently it doesn't work.
>>>>>
>>>>> I'm using Flink Version: 1.5-SNAPSHOT Commit: 8395508
>>>>> and FlinkKafkaConsumer010.
>>>>>
>>>>> I had my flink stream running, consuming an existing topic with 3
>>>>> partitions, among some other topics.
>>>>> I modified partitions of an existing topic: 3 -> 4**.
>>>>> I checked consumer offsets by secor: it's now consuming all 4
>>>>> partitions.
>>>>> I checked consumer offset by my flink stream: it's still consuming
>>>>> only the 3 original partitions.
>>>>>
>>>>> I also checked the Task Metrics of this job from Flink UI and it only
>>>>> offers Kafka related metrics to be added for 3 partitions (0,1 & 2).
>>>>>
>>>>> According to Flink UI > Job Manager > Configuration:
>>>>> flink.partition-discovery.interval-millis=60000
>>>>> – so that's just 1 minute. It's already more than 20 minutes since I
>>>>> added the new partition, so Flink should've picked it up.
>>>>>
>>>>> How to debug?
>>>>>
>>>>>
>>>>> Btw, this job has external checkpoints enabled, done once per minute.
>>>>> Those are also succeeding.
>>>>>
>>>>> *) https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
>>>>>
>>>>> **)
>>>>>
>>>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --describe --topic my_topic
>>>>> Topic:my_topic PartitionCount:3 ReplicationFactor:1 Configs:
>>>>>
>>>>> ~/kafka$ bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST --alter --topic my_topic --partitions 4
>>>>> Adding partitions succeeded!
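
For reference, a minimal sketch of the kind of snippet discussed in this thread: the discovery interval goes into the Properties that are passed to the consumer constructor, not into flink-conf.yaml. The class name, helper method, broker address, and group id below are placeholders I made up for illustration. The constructor call itself is left as a comment so that the snippet compiles without the Flink Kafka connector on the classpath, so treat this as a rough illustration rather than the official docs example.

```java
import java.util.Properties;

public class PartitionDiscoveryConfig {

    // Property key as quoted from the Flink Kafka connector docs in this thread.
    static final String DISCOVERY_KEY = "flink.partition-discovery.interval-millis";

    // Hypothetical helper: builds the Properties handed to the Kafka consumer.
    static Properties buildConsumerProperties(long discoveryIntervalMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "my-group");                // placeholder consumer group
        // A non-negative value enables partition discovery (per the docs quoted above).
        props.setProperty(DISCOVERY_KEY, Long.toString(discoveryIntervalMillis));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProperties(60000L);
        System.out.println(DISCOVERY_KEY + "=" + props.getProperty(DISCOVERY_KEY));

        // With the Flink Kafka connector available, these properties would be
        // passed to the consumer constructor, roughly like this (assumed usage):
        //   new FlinkKafkaConsumer010<>("my_topic", new SimpleStringSchema(), props);
    }
}
```

The point is only where the key lives: it travels with the consumer's own Properties, so each consumer in a job can have its own discovery interval.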