[jira] [Comment Edited] (FLINK-22085) KafkaSourceLegacyITCase hangs/fails on azure
[ https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17372420#comment-17372420 ] Yun Gao edited comment on FLINK-22085 at 7/2/21, 11:51 AM:
---
Hi all, I think I have found the reason: with flush timeout = 0, the flush happens only after records are emitted and there is no standalone flush thread. Then after failover:
# The (source -> map -> map) tasks (parallelism = 8) start. Each first tries to restore the partition state, then broadcasts EndOfChannelStateEvent, which blocks the channel.
# For the records subsequently emitted by the (source -> map -> map) tasks, notifyDataAvailable is not called since isBlocked = true.
# After the (sink) task (parallelism = 1) has received all the EndOfChannelStateEvents, it resumes all the subpartitions. It then checks whether each subpartition is available; if so, it queues the corresponding local input channel.
# However, if a (source -> map -> map) task has emitted all of its 1000 records before step 3, those records are never notified: resuming sets isBlocked to false, but the availability check still returns isAvailable() = false since flushRequested = false. The data is therefore never notified afterwards.

The bug can be reproduced locally by adding a sleep in UpstreamRecoveryTrackImpl#handleEndOfRecovery to delay step 3.

> KafkaSourceLegacyITCase hangs/fails on azure
>
> Key: FLINK-22085
> URL: https://issues.apache.org/jira/browse/FLINK-22085
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.0, 1.14.0
> Reporter: Dawid Wysakowicz
> Assignee: Yun Gao
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.14.0
>
> 1) Observations
> a) The Azure pipeline would occasionally hang without printing any test error information.
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=8219]
> b) By running the test KafkaSourceLegacyITCase::testBrokerFailure() with INFO level logging, the test would hang with the following error message printed repeatedly:
> {code:java}
> 20451 [New I/O boss #50] ERROR org.apache.flink.networking.NetworkFailureHandler [] - Closing communication channel because of an exception
> java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_151]
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_151]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> {code}
> *2) Root cause explanations*
> The test would hang because it enters the following loop:
> - closeOnFlush() is called for a given channel
> - closeOnFlush() calls channel.write(..)
> - channel.write() triggers the exceptionCaught(...) callback
> - closeOnFlush() is called for the same channel again.
> *3) Solution*
> Update closeOnFlush() so that, if a channel is being closed by this method, then closeOnFlush() would not try to write to this channel again.
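The subpartition race that Yun Gao's comment above describes can be sketched in simplified Java. This is a hypothetical model, not Flink's actual PipelinedSubpartition: the names (isBlocked, flushRequested) mirror the comment, and the availability rule is reduced to the one condition that matters for the hang.

```java
// Hypothetical, simplified model of the recovery race described above.
// With flush timeout = 0 there is no flusher thread, so flushRequested
// stays false unless a flush is explicitly requested.
class SubpartitionSketch {
    private int queuedRecords = 0;
    private int notifications = 0;          // stands in for notifyDataAvailable() calls
    private boolean isBlocked = true;       // blocked after EndOfChannelStateEvent (step 1)
    private boolean flushRequested = false; // flush timeout = 0: never set here

    // Producer side (step 2): records emitted while blocked do not
    // trigger notifyDataAvailable().
    void add(String record) {
        queuedRecords++;
        if (!isBlocked && isAvailable()) {
            notifications++;
        }
    }

    // Consumer side (step 3): resuming unblocks the subpartition, then
    // checks availability to decide whether to queue the input channel.
    boolean resumeAndCheckAvailable() {
        isBlocked = false;
        return isAvailable();
    }

    // Step 4: with flushRequested == false the subpartition reports
    // unavailable even though records are queued.
    boolean isAvailable() {
        return !isBlocked && flushRequested;
    }

    int queued() { return queuedRecords; }
    int notifications() { return notifications; }
}
```

In this model, emitting all 1000 records before the resume leaves queued() > 0 while resumeAndCheckAvailable() returns false, so nothing ever notifies the consumer again, which matches the hang the comment describes.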
[ https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17371822#comment-17371822 ] Yun Gao edited comment on FLINK-22085 at 6/30/21, 6:00 AM:
---
Thanks [~xintongsong] for the review and for monitoring the tests~ Afterwards I tried to repeat this test on Azure: [https://github.com/apache/flink/pull/16317], and it seems the issue can be reproduced on Azure with some probability. From all these instances I found that:
# The failing identity map does process the required (namely 1000) records after failover.
# The sink might lose all the records from some map tasks, but the number of maps lost is random. I have not seen a case where only a part of the records from a single task is lost.

Overall, the case does not seem related to Kafka, but might have some relationship with the local network stack. One possibly relevant point is that the test sets the buffer timeout to 0. I'll try to further analyze the causes.
[ https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17367027#comment-17367027 ] Yun Gao edited comment on FLINK-22085 at 6/22/21, 6:13 AM:
---
Hi [~lindong], thanks for the explanation, it indeed helps a lot. First, sorry that I still have not found the root cause. I further checked the case; the rough process is:
1. A generator job starts up and generates 5 partitions, each with 1000 records.
2. A consumer job starts up with two tasks: the first is kafka source -> failing identity map -> validate map with parallelism 8, and the second is the validating exactly-once sink with parallelism 1.
3. Since there are only 5 partitions, 3 of the source tasks finish immediately after startup.
4. One of the failing identity maps triggers a failover in the middle.
5. After the restart, the job re-runs. Note that since no checkpoint was taken in the first run (due to the finished tasks), the job runs from scratch after failover. Similar to the first run, 3 of the 8 source tasks finish immediately.
6. By design, the sink task checks whether 5000 records have been received; if so, it throws a SuccessException() to terminate the job.

The main problem is that the debug thread prints that each of the 5 alive failing identity maps has processed 1000 records, but the sink still does not throw the exception. I cannot see another possible execution path for now. To make it clear how many records the sink has received, I first opened [a PR to also add a debug thread to print the sink status | https://github.com/apache/flink/pull/16233]. Could someone have a look at this PR first? Very thanks~
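The sink's termination check described in step 6 above can be sketched as follows. This is a hypothetical illustration; the class and field names are not the actual test code, only the SuccessException pattern is taken from the comment.

```java
// Hypothetical sketch of the validating sink's termination check:
// count incoming records and signal success once all
// 5 partitions * 1000 records have arrived.
class ValidatingSinkSketch {
    // The real test terminates the job by throwing a SuccessException
    // that the surrounding test harness catches.
    static class SuccessException extends RuntimeException {}

    private final int expectedRecords;
    private int receivedRecords = 0;

    ValidatingSinkSketch(int expectedRecords) {
        this.expectedRecords = expectedRecords;
    }

    // Called once per incoming record.
    void invoke(String record) {
        receivedRecords++;
        if (receivedRecords >= expectedRecords) {
            throw new SuccessException();
        }
    }

    int received() { return receivedRecords; }
}
```

The hang reported above corresponds to invoke() never reaching the 5000th record, even though the upstream maps report having processed all of theirs.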
[ https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17366357#comment-17366357 ] Yun Gao edited comment on FLINK-22085 at 6/21/21, 3:38 AM:
---
I checked the last several cases; it seems they all hang at testMultipleSourcesOnePartition(org.apache.flink.connector.kafka.source.KafkaSourceLegacyITCase), with the last log showing
{code:java}
01:34:47,793 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Failed to trigger checkpoint for job d9229593ea3e8c25751e0b146ba62ee1 since some tasks of job d9229593ea3e8c25751e0b146ba62ee1 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.
{code}
I'll first focus on this issue.
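The closeOnFlush() write/exceptionCaught loop described in the issue's root-cause section, and the guard its solution section proposes, can be sketched as follows. The shapes here are hypothetical (the real handler is Netty-based); only the loop and the "skip the write once the channel is already being closed" idea come from the issue.

```java
// Hypothetical sketch of the guarded closeOnFlush(). The original loop was:
// closeOnFlush() -> channel.write() -> exceptionCaught() -> closeOnFlush(),
// so the fix skips the write for a channel that is already being closed.
class FailureHandlerSketch {
    abstract static class Channel {
        boolean closing = false;          // guard flag set by closeOnFlush()
        abstract boolean isConnected();
        abstract void write(Object msg);  // may fail and re-enter exceptionCaught()
        abstract void close();
    }

    void closeOnFlush(Channel channel) {
        if (channel.closing) {
            return; // already being closed: do not write again, which breaks the loop
        }
        channel.closing = true;
        if (channel.isConnected()) {
            channel.write("flush remaining data");
        }
        channel.close();
    }

    // Netty-style callback invoked when a write on the channel fails.
    void exceptionCaught(Channel channel) {
        closeOnFlush(channel);
    }
}
```

Without the closing flag, a channel whose write always fails would recurse through exceptionCaught() forever; with it, the write and close each happen exactly once.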
[ https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17332957#comment-17332957 ] Dong Lin edited comment on FLINK-22085 at 4/27/21, 5:53 AM:
---
Thank you [~dwysakowicz] for the information.

For the first test failure [1], it could be because the Azure pipeline is very slow and it takes more than 60 seconds (due to long GC) to complete that test. Maybe we can see whether increasing the timeout to 120 seconds reduces the failure rate of this test.

For the second test failure [2], it appears that the test failed due to "OperatorEvent from an OperatorCoordinator to a task was lost". This is related to https://github.com/apache/flink/pull/15605, which was committed recently.

Given that KafkaSourceLegacyITCase no longer hangs and the comment history in this JIRA is already very long, I opened https://issues.apache.org/jira/browse/FLINK-22488 to track the issue of "OperatorEvent from an OperatorCoordinator to a task was lost". Maybe we can close this JIRA and continue the discussion in FLINK-22488.

[1] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17206=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6612
[2] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=17212=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=7062
[ https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17330815#comment-17330815 ]

Dong Lin edited comment on FLINK-22085 at 4/23/21, 2:29 PM:
---

[~dwysakowicz] FYI, after applying https://github.com/apache/flink/pull/15713, I am able to run KafkaSourceLegacyITCase 100 times without any failure.

The bug fixed by https://github.com/apache/flink/pull/15713 would cause the test to hang only if there is a Flink job that keeps running. After reading through the related code, I still could not find a full explanation of why there is such a Flink job in the first place. I will stop investigating this bug for now. Let's see if the test still fails and whether we can find more useful information from the Azure pipeline log.

> KafkaSourceLegacyITCase hangs/fails on azure
> --------------------------------------------
>
>                 Key: FLINK-22085
>                 URL: https://issues.apache.org/jira/browse/FLINK-22085
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0, 1.14.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Dong Lin
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>             Fix For: 1.13.0
>
> 1) Observations
> a) The Azure pipeline would occasionally hang without printing any test error information.
> [https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=15939=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=8219]
> b) By running the test KafkaSourceLegacyITCase::testBrokerFailure() with INFO level logging, the test would hang with the following error message printed repeatedly:
> {code:java}
> 20451 [New I/O boss #50] ERROR org.apache.flink.networking.NetworkFailureHandler [] - Closing communication channel because of an exception
> java.net.ConnectException: Connection refused: localhost/127.0.0.1:50073
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:1.8.0_151]
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[?:1.8.0_151]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) ~[flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at org.apache.flink.shaded.testutils.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [flink-test-utils_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_151]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_151]
> 	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> {code}
> *2) Root cause explanations*
> The test would hang because it enters the following loop:
> - closeOnFlush() is called for a given channel
> - closeOnFlush() calls channel.write(..)
> - channel.write() triggers the exceptionCaught(...) callback
> - closeOnFlush() is called for the same channel again.
> *3) Solution*
> Update closeOnFlush() so that, if a channel is being closed by this method, then closeOnFlush() would not try to write to this channel if it is called on this channel again.
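The re-entrancy guard described in the solution above can be sketched as follows. This is an illustrative, self-contained mock, not Flink's actual NetworkFailureHandler code: the class name CloseOnFlushGuard, the plain Object standing in for a Netty channel, and the write() method that directly simulates the failing write invoking the exceptionCaught path are all assumptions made for the sketch.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the fix: remember which channels are already being closed so a
// failed write cannot re-enter closeOnFlush() for the same channel forever.
public class CloseOnFlushGuard {
    // Channels currently being closed by closeOnFlush().
    private final Set<Object> closingChannels = ConcurrentHashMap.newKeySet();

    // Counts writes, to show the loop is broken after exactly one attempt.
    private int writeAttempts = 0;

    public void closeOnFlush(Object channel) {
        // If this channel is already being closed, skip the write: in the
        // buggy version the write failed, triggered exceptionCaught(...),
        // and called closeOnFlush() on the same channel again, looping.
        if (!closingChannels.add(channel)) {
            return;
        }
        write(channel);
    }

    private void write(Object channel) {
        writeAttempts++;
        // Simulate the failing write triggering the exceptionCaught(...)
        // callback, which calls closeOnFlush() on the same channel again.
        closeOnFlush(channel);
    }

    public int getWriteAttempts() {
        return writeAttempts;
    }

    public static void main(String[] args) {
        CloseOnFlushGuard handler = new CloseOnFlushGuard();
        Object channel = new Object();
        handler.closeOnFlush(channel);
        // With the guard, the channel is written to exactly once.
        System.out.println(handler.getWriteAttempts());
    }
}
```

Without the closingChannels check, the closeOnFlush -> write -> exceptionCaught -> closeOnFlush cycle would recurse indefinitely, which matches the repeated ConnectException log lines observed in the hanging test.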
[jira] [Comment Edited] (FLINK-22085) KafkaSourceLegacyITCase hangs/fails on azure
[ https://issues.apache.org/jira/browse/FLINK-22085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17325615#comment-17325615 ]

Matthias edited comment on FLINK-22085 at 4/20/21, 8:50 AM:
---

[~lindong] {{KafkaSourceLegacyITCase}} timed out in [that build|https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=16789=logs=c5f0071e-1851-543e-9a45-9ac140befc32=1fb1a56f-e8b5-5a82-00a0-a2db7757b4f5=6593]. The build itself included the fix [98424e6|https://github.com/apache/flink/commit/98424e6383bcce107844cbeecc2e9df4ffb4272a] provided by this issue. Could you have another look at it to double-check whether it's related or a completely different issue?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)