[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997757#comment-16997757 ]

Rong Rong commented on FLINK-9630:
----------------------------------

Hi all, is this related to FLINK-8497? We are investigating some similar issues with partition discovery. Also, based on the bug report, this does not seem to affect Kafka 0.11 and up, correct?

--
Rong

> Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
> -------------------------------------------------------------------------------
>
> Key: FLINK-9630
> URL: https://issues.apache.org/jira/browse/FLINK-9630
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.4.2, 1.5.0
> Environment: Linux 2.6, java 8, Kafka broker 0.10.x
> Reporter: Youjun Yuan
> Priority: Major
> Labels: pull-request-available
>
> When the Kafka topic gets deleted during the task startup process,
> Kafka09PartitionDiscoverer gets a *TopicAuthorizationException* in
> getAllPartitionsForTopics() and has no chance to close the kafkaConsumer,
> resulting in a TCP connection leak (to the Kafka broker).
>
> *This issue can bring down the whole Flink cluster*, because in a default
> setup (fixedDelay with INT.MAX restart attempts), the job manager randomly
> schedules the job to any TaskManager that has a free slot, and each attempt
> causes the TaskManager to leak a TCP connection. Eventually almost every
> TaskManager runs out of file handles, so no TaskManager can make a snapshot
> or accept a new job. This effectively stops the whole cluster.
>
> The leak happens when StreamTask.invoke() calls openAllOperators(), then
> FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions().
> When kafkaConsumer.partitionsFor(topic) in
> KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a
> *TopicAuthorizationException*, nothing catches it.
> Though StreamTask.open catches Exception and invokes the dispose() method of
> each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(),
> it does not close the kafkaConsumer in partitionDiscoverer, and does not even
> invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.
>
> Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
>
> public void cancel() {
>     // set ourselves as not running;
>     // this would let the main discovery loop escape as soon as possible
>     running = false;
>
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null) {
>             // we cannot close the discoverer here, as it is error-prone to concurrent access;
>             // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
>             partitionDiscoverer.wakeup();
>         }
>
>         // the discovery loop may currently be sleeping in-between
>         // consecutive discoveries; interrupt to shutdown faster
>         discoveryLoopThread.interrupt();
>     }
>
>     // abort the fetcher, if there is one
>     if (kafkaFetcher != null) {
>         kafkaFetcher.cancel();
>     }
> }

--
This message was sent by Atlassian Jira (v8.3.4#803005)
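The failure path described in the issue can be reproduced in miniature. The sketch below is not Flink code: StubConsumer, StubSource and simulateRestarts are hypothetical stand-ins, and an IllegalStateException plays the role of the TopicAuthorizationException. It only illustrates why a cancel() that is guarded by discoveryLoopThread != null never reaches the consumer when open() fails before the thread is created, so each restart attempt leaves one connection open.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CancelGuardSketch {
    // Counts "connections" that were opened and never closed.
    static final AtomicInteger OPEN_CONNECTIONS = new AtomicInteger();

    /** Hypothetical stand-in for a Kafka consumer holding one TCP connection. */
    static final class StubConsumer implements AutoCloseable {
        StubConsumer() {
            OPEN_CONNECTIONS.incrementAndGet();
        }

        void partitionsFor(String topic) {
            // Stand-in for the authorization failure on a deleted topic.
            throw new IllegalStateException("not authorized to access topic: " + topic);
        }

        @Override
        public void close() {
            OPEN_CONNECTIONS.decrementAndGet();
        }
    }

    /** Hypothetical stand-in for the source operator's open()/cancel() lifecycle. */
    static final class StubSource {
        private StubConsumer consumer;
        private Thread discoveryLoopThread; // stays null when open() fails early

        void open(String topic) {
            consumer = new StubConsumer();
            consumer.partitionsFor(topic);              // throws here
            discoveryLoopThread = new Thread(() -> { }); // never reached
        }

        void cancel() {
            // Same guard shape as the cancel() quoted in the issue:
            // with a null thread, the consumer is neither woken up nor closed.
            if (discoveryLoopThread != null) {
                discoveryLoopThread.interrupt();
            }
        }
    }

    /** Simulates repeated restart attempts and returns the number of leaked connections. */
    static int simulateRestarts(int attempts) {
        OPEN_CONNECTIONS.set(0);
        for (int i = 0; i < attempts; i++) {
            StubSource source = new StubSource();
            try {
                source.open("deleted-topic");
            } catch (IllegalStateException e) {
                source.cancel(); // cleanup path taken after the failed open()
            }
        }
        return OPEN_CONNECTIONS.get();
    }

    public static void main(String[] args) {
        System.out.println("leaked connections after 5 restarts: " + simulateRestarts(5));
    }
}
```

With an unbounded restart strategy the counter grows by one per attempt, which matches the file-handle exhaustion reported above.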
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997755#comment-16997755 ]

Rong Rong commented on FLINK-9630:
----------------------------------

Hi All, Is this a duplicate of FLINK-8497 ?
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559146#comment-16559146 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

yanghua commented on issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection …
URL: https://github.com/apache/flink/pull/6336#issuecomment-408288375

hi @ubyyj Till and Chesnay are busy releasing Flink 1.6 and 1.5.2, just wait~

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
> -------------------------------------------------------------------------------
>
> Key: FLINK-9630
> URL: https://issues.apache.org/jira/browse/FLINK-9630
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.5.0, 1.4.2
> Environment: Linux 2.6, java 8, Kafka broker 0.10.x
> Reporter: Youjun Yuan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.5.3
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559076#comment-16559076 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

ubyyj commented on issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection …
URL: https://github.com/apache/flink/pull/6336#issuecomment-408274936

@tillrohrmann @zentol would you please take a look, and if everything looks good to you, can you please get this merged? Thanks
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556893#comment-16556893 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

yanghua commented on issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection …
URL: https://github.com/apache/flink/pull/6336#issuecomment-407974746

To merge PR, you can ping @tillrohrmann and @zentol
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556892#comment-16556892 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

ubyyj commented on issue #6336: [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection …
URL: https://github.com/apache/flink/pull/6336#issuecomment-407974446

@zhangminglei would you please help me get this merged? thanks
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553487#comment-16553487 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

Github user ubyyj commented on the issue:

https://github.com/apache/flink/pull/6336

@tzulitai is there anything else we need to do before we get this merged?

> Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
> -------------------------------------------------------------------------------
>
> Key: FLINK-9630
> URL: https://issues.apache.org/jira/browse/FLINK-9630
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.5.0, 1.4.2
> Environment: Linux 2.6, java 8, Kafka broker 0.10.x
> Reporter: Youjun Yuan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.5.2
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551496#comment-16551496 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/6336

Hi, @ubyyj Thanks for your contribution! It looks good to me.
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551463#comment-16551463 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

Github user ubyyj commented on the issue:

https://github.com/apache/flink/pull/6336

@zhangminglei @tzulitai would you please take a look?
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544812#comment-16544812 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6336

@ubyyj you can use this command to trigger Travis rebuild :

```
git commit --allow-empty -m ""
```
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544722#comment-16544722 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

GitHub user ubyyj reopened a pull request:

    https://github.com/apache/flink/pull/6336

    [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection …

    …leak on TopicAuthorizationException

    **(The sections below can be removed for hotfixes of typos)**

    ## What is the purpose of the change

    Fix the bug that Kafka09PartitionDiscoverer can leak a TCP connection if getAllPartitionsForTopics() gets a TopicAuthorizationException.

    ## Brief change log

    Catch TopicAuthorizationException and close the kafkaConsumer in getAllPartitionsForTopics().

    ## Verifying this change

    This change added tests and can be verified as follows:

      - *Manually verified the change by running a job that consumes from a non-existent Kafka topic, and verified that the number of open TCP connections and the number of file handles of the task manager process did not increase. The fix has been running in our production for weeks now without problems.*

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ubyyj/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6336.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6336

commit 0aa8d75af085c2465e8cfd9e5a572770a5d95738
Author: yuanyoujun
Date:   2018-07-15T13:07:49Z

    [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
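The change log in the pull request description (catch the exception and close the consumer in getAllPartitionsForTopics()) can be sketched roughly as follows. This is not the code from the pull request: `MetadataClient`, `AuthorizationFailure` and `FailingClient` are hypothetical stand-ins for the Kafka consumer and TopicAuthorizationException, so that the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DiscovererCloseSketch {

    /** Hypothetical stand-in for TopicAuthorizationException. */
    static final class AuthorizationFailure extends RuntimeException {
        AuthorizationFailure(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for the Kafka consumer; not the real KafkaConsumer API. */
    interface MetadataClient {
        List<Integer> partitionsFor(String topic);

        void close();
    }

    /** A client whose metadata lookup is always rejected, as for a deleted topic. */
    static final class FailingClient implements MetadataClient {
        boolean closed = false;

        @Override
        public List<Integer> partitionsFor(String topic) {
            throw new AuthorizationFailure("not authorized to access topic: " + topic);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Collects "topic-partition" names; closes the client before rethrowing a failure. */
    static List<String> getAllPartitionsForTopics(MetadataClient client, List<String> topics) {
        List<String> partitions = new ArrayList<>();
        try {
            for (String topic : topics) {
                for (int partition : client.partitionsFor(topic)) {
                    partitions.add(topic + "-" + partition);
                }
            }
        } catch (AuthorizationFailure e) {
            // Release the connection here, because no later cleanup path will do it.
            client.close();
            throw e;
        }
        return partitions;
    }

    public static void main(String[] args) {
        FailingClient client = new FailingClient();
        try {
            getAllPartitionsForTopics(client, Collections.singletonList("deleted-topic"));
        } catch (AuthorizationFailure expected) {
            // prints "closed after failure: true"
            System.out.println("closed after failure: " + client.closed);
        }
    }
}
```

The exception is rethrown after the close so that the task still fails and the restart strategy still applies; the only difference in this sketch is that each failed attempt no longer leaves a connection open.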
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544723#comment-16544723 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

Github user ubyyj commented on the issue:

    https://github.com/apache/flink/pull/6336

    trigger travis build
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544721#comment-16544721 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

Github user ubyyj commented on the issue:

    https://github.com/apache/flink/pull/6336

    Unrelated build failure; closing and will reopen to trigger the Travis build again.
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544720#comment-16544720 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

Github user ubyyj closed the pull request at:

    https://github.com/apache/flink/pull/6336
[jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
[ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16544541#comment-16544541 ]

ASF GitHub Bot commented on FLINK-9630:
---------------------------------------

GitHub user ubyyj opened a pull request:

    https://github.com/apache/flink/pull/6336

    [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection …

    …leak on TopicAuthorizationException