[jira] [Commented] (FLINK-4500) Cassandra sink can lose messages
[ https://issues.apache.org/jira/browse/FLINK-4500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248266#comment-16248266 ] ASF GitHub Bot commented on FLINK-4500: --- Github user mcfongtw commented on the issue: https://github.com/apache/flink/pull/4605 Hi @zentol, thanks for reviewing this PR. I recall that I put a [caveat](https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html#checkpointing-and-fault-tolerance) about this potential data loss in the latest C* connector documentation. Since this fix is committed, would you like me to open another PR just to remove that warning message from the documentation? > Cassandra sink can lose messages > > > Key: FLINK-4500 > URL: https://issues.apache.org/jira/browse/FLINK-4500 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector > Affects Versions: 1.1.0 > Reporter: Elias Levy > Assignee: Michael Fong > Fix For: 1.4.0 > > > The problem is the same as I pointed out with the Kafka producer sink > (FLINK-4027). The CassandraTupleSink's send() and CassandraPojoSink's send() > both send data asynchronously to Cassandra and record whether an error occurs > via a future callback. But CassandraSinkBase does not implement > Checkpointed, so it cannot stop a checkpoint from happening even though there are > Cassandra queries in flight from the checkpoint that may fail. If they do > fail, they would subsequently not be replayed when the job recovers, and > would thus be lost. > In addition, > CassandraSinkBase's close() should check whether there is a pending exception > and throw it, rather than silently closing. It should also wait for any > pending async queries to complete and check their status before closing. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
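The fix under discussion tracks in-flight Cassandra futures so that a checkpoint or close() cannot complete while queries are still pending. A minimal sketch of that bookkeeping (the class and field names here are illustrative, not the actual CassandraSinkBase members):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: every async send increments a counter, every future callback
// decrements it (recording any failure), and flush() blocks until the
// counter drops to zero, then rethrows any recorded error instead of
// letting it be silently dropped.
class PendingTracker {
    private final AtomicInteger updatesPending = new AtomicInteger();
    private volatile Throwable asyncError;

    void onSend() {
        updatesPending.incrementAndGet();
    }

    void onCallback(Throwable error) {
        if (error != null) {
            asyncError = error;
        }
        // the AtomicInteger doubles as the monitor for wait/notify here
        synchronized (updatesPending) {
            if (updatesPending.decrementAndGet() == 0) {
                updatesPending.notifyAll();
            }
        }
    }

    // Would be called from snapshotState() and close(): wait for all
    // in-flight queries, then surface any failure before proceeding.
    void flush() {
        synchronized (updatesPending) {
            while (updatesPending.get() > 0) {
                try {
                    updatesPending.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        if (asyncError != null) {
            throw new RuntimeException("error while sending to Cassandra", asyncError);
        }
    }
}
```

With this in place, a failed async write surfaces before the checkpoint completes, so the record can be replayed on recovery rather than lost.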
[jira] [Commented] (FLINK-4575) DataSet aggregate methods should support POJOs
[ https://issues.apache.org/jira/browse/FLINK-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248142#comment-16248142 ] Chuyang Wan commented on FLINK-4575: [~ggevay], if you are OK with converting the POJO to a Tuple via getFlatFields, I can take the task. The idea is to accept a POJO field expression as the aggregation operators' argument, convert it to a tuple field, and then use the tuple field just as the current implementation does. > DataSet aggregate methods should support POJOs > -- > > Key: FLINK-4575 > URL: https://issues.apache.org/jira/browse/FLINK-4575 > Project: Flink > Issue Type: Improvement > Components: DataSet API > Reporter: Gabor Gevay > Priority: Minor > Labels: starter > > The aggregate methods of DataSets (aggregate, sum, min, max) currently only > support Tuples, with the fields specified by indices. With > https://issues.apache.org/jira/browse/FLINK-3702 resolved, adding support for > POJOs and field expressions would be easy: {{AggregateOperator}} would create > {{FieldAccessors}} instead of just storing field positions, and > {{AggregateOperator.AggregatingUdf}} would use these {{FieldAccessors}} > instead of the Tuple field access methods. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
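The FieldAccessor idea can be illustrated with plain reflection: resolve the POJO field once by name, then use the accessor wherever the aggregation currently uses a tuple index. This is only a sketch with hypothetical names, not Flink's actual {{FieldAccessor}} API:

```java
import java.lang.reflect.Field;
import java.util.List;

// Illustrative only: sum an int field of arbitrary POJOs, addressed by a
// field-expression string instead of a tuple index.
class PojoSum {
    static int sumField(List<?> pojos, String fieldName) throws Exception {
        int sum = 0;
        for (Object p : pojos) {
            // resolve the field by name (a real FieldAccessor would cache this)
            Field f = p.getClass().getDeclaredField(fieldName);
            f.setAccessible(true);
            sum += (Integer) f.get(p);
        }
        return sum;
    }
}
```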
[jira] [Commented] (FLINK-7702) Javadocs are not being built
[ https://issues.apache.org/jira/browse/FLINK-7702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248136#comment-16248136 ] ASF GitHub Bot commented on FLINK-7702: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/4998 [FLINK-7702] Add maven-bundle-plugin to root pom Before, we had it in the places that require it. This doesn't work when running mvn javadoc:aggregate because this will only run for the root pom and can then not find the "bundle" dependencies. R: @zentol, could you maybe have a look? Before this change you couldn't run `mvn javadoc:aggregate -DadditionalJOption="-Xdoclint:none" -Dmaven.javadoc.failOnError=false` successfully, meaning that we never actually built `1.4-SNAPSHOT` Javadocs. With this change it seems to work, but the `genjavadoc` tool that generates fake Java sources from Scala code, so that we can aggregate Javadocs also for Scala code, doesn't seem to work anymore on Scala 2.11. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-7702-fix-javadoc-build-with-genjava Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4998.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4998 commit caba0cbc36e8dd588e57c91c102c4fa097e6b5f6 Author: Aljoscha Krettek Date: 2017-11-10T17:13:26Z [FLINK-7702] Add maven-bundle-plugin to root pom Before, we had it in the places that require it. This doesn't work when running mvn javadoc:aggregate because this will only run for the root pom and can then not find the "bundle" dependencies.
> Javadocs are not being built > > > Key: FLINK-7702 > URL: https://issues.apache.org/jira/browse/FLINK-7702 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Gabor Gevay >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > The "Javadocs" link in the left side menu of this page doesn't work: > https://ci.apache.org/projects/flink/flink-docs-master/ > Note that it works in 1.3: > https://ci.apache.org/projects/flink/flink-docs-release-1.3/ -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails
[ https://issues.apache.org/jira/browse/FLINK-7488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-7488: -- Description: {code} compareNetworkBufShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest) Time elapsed: 0.239 sec <<< FAILURE! org.junit.ComparisonFailure: Different network buffer memory sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]104857600> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600> at org.junit.Assert.assertEquals(Assert.java:115) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:235) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:81) compareHeapSizeShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest) Time elapsed: 0.16 sec <<< FAILURE! 
org.junit.ComparisonFailure: Different heap sizes with configuration: {taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: /usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000> at org.junit.Assert.assertEquals(Assert.java:115) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:275) at org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:110) {code} $HADOOP_CONF_DIR was not set prior to running the test.
[jira] [Created] (FLINK-8047) Create HTTP Sink
Jessica Negara created FLINK-8047: - Summary: Create HTTP Sink Key: FLINK-8047 URL: https://issues.apache.org/jira/browse/FLINK-8047 Project: Flink Issue Type: Improvement Components: Batch Connectors and Input/Output Formats Reporter: Jessica Negara Fix For: 1.4.0 Implement an HTTP output connector + sink, to allow users to send Flink output to an HTTP web server. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
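As a rough illustration of what the core of such a connector could look like, a sink would essentially POST each record to the configured endpoint. The helper below is a hypothetical sketch, not an existing Flink API; a real implementation would wrap it in a RichSinkFunction and add batching, retries, and error handling:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper a Flink HTTP sink could delegate to.
class HttpPoster {
    // Serialize one record to bytes (UTF-8 here; a real sink would take
    // a pluggable serialization schema).
    static byte[] encode(String record) {
        return record.getBytes(StandardCharsets.UTF_8);
    }

    // POST one record to the endpoint and return the HTTP status code;
    // the caller decides how to treat non-2xx responses.
    static int post(String endpoint, String record) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(encode(record));
        }
        return conn.getResponseCode();
    }
}
```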
[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247846#comment-16247846 ] Juan Miguel Cejuela commented on FLINK-8046: While we are at it, it is in my humble opinion also strange that, when computing the file splits as in `format.createInputSplits(readerParallelism)`, the given `readerParallelism` is used, but not the format's `unstoppable` field or `.getNumSplits()` method. I don't know whether this should go into a separate issue. > ContinuousFileMonitoringFunction wrongly ignores files with exact same > timestamp > > > Key: FLINK-8046 > URL: https://issues.apache.org/jira/browse/FLINK-8046 > Project: Flink > Issue Type: Bug > Components: Streaming > Affects Versions: 1.3.2 > Reporter: Juan Miguel Cejuela > Labels: stream > Fix For: 1.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > The current monitoring of files sets the internal variable > `globalModificationTime` to filter out files that are "older". However, the > current test (to check "older") does > `boolean shouldIgnore = modificationTime <= globalModificationTime;` (from > `shouldIgnore`) > The comparison should strictly be SMALLER (NOT smaller or equal). The method > documentation also states "This happens if the modification time of the file > is _smaller_ than...". > The equality acceptance for "older" causes some files with the exact same > timestamp to be ignored. The behavior is also non-deterministic, as the first > file to be accepted ("first" being pretty much random) causes the rest of the > files with the exact same timestamp to be ignored. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
[ https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247825#comment-16247825 ] ASF GitHub Bot commented on FLINK-8046: --- GitHub user juanmirocks opened a pull request: https://github.com/apache/flink/pull/4997 [FLINK-8046] [flink-streaming-java] Have filter of timestamp compare with strictly SMALLER (NOT smaller or equal) ## What is the purpose of the change This change fixes the incorrect ignoring of files with the exact same timestamp. It also matches the doc header of the method (`shouldIgnore`): "...if the modification time of the file is smaller than...". Without this change, some files with the exact same timestamp (because they were written at the same exact long time) will be ignored, which is unexpected by the user. You would also find the odd log line `Ignoring file:/XXX, with mod time= 1510321363000 and global mod time= 1510321363000` ## Brief change log * Comparison is done with strictly SMALLER (<) ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented?
JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/tagtog/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4997.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4997 commit 2db52989fef2455413d42286893c5227983ee74b Author: Juan Miguel Cejuela Date: 2017-11-10T16:57:09Z compare as strictly SMALLER (not SMALLER OR EQUAL) (as per the doc header "if the modification time of the file is smaller than") Otherwise, some files with the exact same timestamp (because they were written at the same exact long time) will be ignored. > ContinuousFileMonitoringFunction wrongly ignores files with exact same > timestamp > > > Key: FLINK-8046 > URL: https://issues.apache.org/jira/browse/FLINK-8046 > Project: Flink > Issue Type: Bug > Components: Streaming > Affects Versions: 1.3.2 > Reporter: Juan Miguel Cejuela > Labels: stream > Fix For: 1.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > The current monitoring of files sets the internal variable > `globalModificationTime` to filter out files that are "older". However, the > current test (to check "older") does > `boolean shouldIgnore = modificationTime <= globalModificationTime;` (from > `shouldIgnore`) > The comparison should strictly be SMALLER (NOT smaller or equal). The method > documentation also states "This happens if the modification time of the file > is _smaller_ than...". > The equality acceptance for "older" causes some files with the exact same > timestamp to be ignored. The behavior is also non-deterministic, as the first > file to be accepted ("first" being pretty much random) causes the rest of the > files with the exact same timestamp to be ignored. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp
Juan Miguel Cejuela created FLINK-8046: -- Summary: ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp Key: FLINK-8046 URL: https://issues.apache.org/jira/browse/FLINK-8046 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.3.2 Reporter: Juan Miguel Cejuela Fix For: 1.5.0 The current monitoring of files sets the internal variable `globalModificationTime` to filter out files that are "older". However, the current test (to check "older") does `boolean shouldIgnore = modificationTime <= globalModificationTime;` (from `shouldIgnore`) The comparison should strictly be SMALLER (NOT smaller or equal). The method documentation also states "This happens if the modification time of the file is _smaller_ than...". The equality acceptance for "older" causes some files with the exact same timestamp to be ignored. The behavior is also non-deterministic, as the first file to be accepted ("first" being pretty much random) causes the rest of the files with the exact same timestamp to be ignored. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
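The check under discussion reduces to a single comparison. Extracted into a standalone helper (the class itself is illustrative; only the operator change mirrors the proposed fix):

```java
// Before the fix the check was `modificationTime <= globalModificationTime`,
// which also drops files sharing the exact timestamp of the last processed
// file. Strictly smaller keeps them.
class ModTimeFilter {
    static boolean shouldIgnore(long modificationTime, long globalModificationTime) {
        return modificationTime < globalModificationTime;
    }
}
```

With `<=`, a file whose modification time equals `globalModificationTime` would be ignored; with `<` it is processed, matching the method's documented behavior.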
[jira] [Updated] (FLINK-7702) Javadocs are not being built
[ https://issues.apache.org/jira/browse/FLINK-7702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7702: Summary: Javadocs are not being built (was: Javadocs link broken) > Javadocs are not being built > > > Key: FLINK-7702 > URL: https://issues.apache.org/jira/browse/FLINK-7702 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Gabor Gevay >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > The "Javadocs" link in the left side menu of this page doesn't work: > https://ci.apache.org/projects/flink/flink-docs-master/ > Note that it works in 1.3: > https://ci.apache.org/projects/flink/flink-docs-release-1.3/ -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8045) Add Internal DATE/TIME/TIMESTAMP as internal representation of DATE/TIME/TIMESTAMP
[ https://issues.apache.org/jira/browse/FLINK-8045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhenghua Gao updated FLINK-8045: Description: Currently DATE/TIME/TIMESTAMP have an internal representation; for example, Date is represented internally as an int. This may improve performance when processing DATE/TIME/TIMESTAMP data. But I found there is a LIMITATION: the internal representation exists only within one operator. We transfer DATE/TIME/TIMESTAMP objects between operators. I think we could treat DATE/TIME/TIMESTAMP as the internal representation throughout the whole job, and cast them to java.sql.* only as needed (UDF/UDTF/OUTPUT) was: Currently DATE/TIME/TIMESTAMP have an internal representation; for example, Date is represented internally as an int. This may improve performance when processing DATE/TIME/TIMESTAMP data. But I found there is a LIMITATION: the internal representation exists only within one operator. We transfer DATE/TIME/TIMESTAMP objects between operators. I think we could treat DATE/TIME/TIMESTAMP as the internal representation throughout the whole job, and cast them to java.sql.* only as needed (UDF/UDTF/OUTPUT) > Add Internal DATE/TIME/TIMESTAMP as internal representation of > DATE/TIME/TIMESTAMP > -- > > Key: FLINK-8045 > URL: https://issues.apache.org/jira/browse/FLINK-8045 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Zhenghua Gao > Assignee: Zhenghua Gao > > Currently DATE/TIME/TIMESTAMP have an internal representation; for example, Date is > represented internally as an int. > This may improve performance when processing DATE/TIME/TIMESTAMP data. > But I found there is a LIMITATION: the internal representation exists only within > one operator. > We transfer DATE/TIME/TIMESTAMP objects between operators. > I think we could treat DATE/TIME/TIMESTAMP as the internal representation throughout the > whole job, and cast them to java.sql.* only as needed (UDF/UDTF/OUTPUT) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8045) Add Internal DATE/TIME/TIMESTAMP as internal representation of DATE/TIME/TIMESTAMP
Zhenghua Gao created FLINK-8045: --- Summary: Add Internal DATE/TIME/TIMESTAMP as internal representation of DATE/TIME/TIMESTAMP Key: FLINK-8045 URL: https://issues.apache.org/jira/browse/FLINK-8045 Project: Flink Issue Type: Improvement Components: Table API & SQL Reporter: Zhenghua Gao Assignee: Zhenghua Gao Currently DATE/TIME/TIMESTAMP have an internal representation; for example, Date is represented internally as an int. This may improve performance when processing DATE/TIME/TIMESTAMP data. But I found there is a LIMITATION: the internal representation exists only within one operator. We transfer DATE/TIME/TIMESTAMP objects between operators. I think we could treat DATE/TIME/TIMESTAMP as the internal representation throughout the whole job, and cast them to java.sql.* only as needed (UDF/UDTF/OUTPUT) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
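The int representation mentioned here is conventionally the number of days since the Unix epoch. A simplified sketch of the round trip (illustrative helper names; this deliberately ignores the local-time-zone adjustment the real conversion paths have to make):

```java
// Sketch: DATE as an int holding days since 1970-01-01 (UTC), with
// conversion back to java.sql.Date only at the boundaries (UDF/OUTPUT).
class DateIntRepr {
    static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    // external -> internal: millis since epoch, truncated to whole days
    static int toInternal(java.sql.Date date) {
        return (int) (date.getTime() / MILLIS_PER_DAY);
    }

    // internal -> external: expand the day count back to millis
    static java.sql.Date toExternal(int days) {
        return new java.sql.Date(days * MILLIS_PER_DAY);
    }
}
```

Keeping the int form across operators avoids repeated object allocation and conversion at every operator boundary, which is the performance argument made in the issue.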
[GitHub] flink pull request #4996: [hotfix][docs] Change mailing list link in quickst...
Github user pnowojski closed the pull request at: https://github.com/apache/flink/pull/4996 ---
[GitHub] flink issue #4996: [hotfix][docs] Change mailing list link in quickstart to ...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4996 Thanks @aljoscha for merging! ---
[jira] [Commented] (FLINK-7845) Netty Exception when submitting batch job repeatedly
[ https://issues.apache.org/jira/browse/FLINK-7845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247785#comment-16247785 ] Piotr Nowojski commented on FLINK-7845: --- Could you strip down your example to some minimal code showing the problem? This IllegalAccessError is almost surely unrelated. It's probably caused by a dependency convergence error between Netty 3.8 and 4.0, which should be fixed by https://issues.apache.org/jira/browse/FLINK-7013. You can try it out with Flink 1.4-SNAPSHOT or wait until the release of 1.4. > Netty Exception when submitting batch job repeatedly > > > Key: FLINK-7845 > URL: https://issues.apache.org/jira/browse/FLINK-7845 > Project: Flink > Issue Type: Bug > Components: Core, Network > Affects Versions: 1.3.2 > Reporter: Flavio Pompermaier > > We had some problems with Flink and Netty, so we wrote a small unit test to > reproduce the memory issues we have in production. It happens that we have to > restart the Flink cluster because the memory keeps increasing from job to > job. > The github project is https://github.com/okkam-it/flink-memory-leak and the > JUnit test is contained in the MemoryLeakTest class (within src/main/test). > I don't know if this is the root of our problems, but at some point, usually > around the 28th loop, the job fails with the following exception (actually we > never faced that in production, but maybe it is related to the memory issue > somehow...): > {code:java} > Caused by: java.lang.IllegalAccessError: > org/apache/flink/runtime/io/network/netty/NettyMessage > at > io.netty.util.internal.__matchers__.org.apache.flink.runtime.io.network.netty.NettyMessageMatcher.match(NoOpTypeParameterMatcher.java) > at > io.netty.channel.SimpleChannelInboundHandler.acceptInboundMessage(SimpleChannelInboundHandler.java:95) > at > io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102) > ... 
16 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-7880: --- Assignee: Kostas Kloudas (was: Aljoscha Krettek) > flink-queryable-state-java fails with core-dump > --- > > Key: FLINK-7880 > URL: https://issues.apache.org/jira/browse/FLINK-7880 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Kostas Kloudas >Priority: Blocker > Labels: test-stability > Fix For: 1.4.0 > > > The {{flink-queryable-state-java}} module fails on Travis with a core dump. > https://travis-ci.org/tillrohrmann/flink/jobs/289949829 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7880) flink-queryable-state-java fails with core-dump
[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-7880: --- Assignee: Aljoscha Krettek (was: Kostas Kloudas) > flink-queryable-state-java fails with core-dump > --- > > Key: FLINK-7880 > URL: https://issues.apache.org/jira/browse/FLINK-7880 > Project: Flink > Issue Type: Bug > Components: Queryable State, Tests >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Aljoscha Krettek >Priority: Blocker > Labels: test-stability > Fix For: 1.4.0 > > > The {{flink-queryable-state-java}} module fails on Travis with a core dump. > https://travis-ci.org/tillrohrmann/flink/jobs/289949829 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7919) Join with Solution Set fails with NPE if Solution Set has no entry
[ https://issues.apache.org/jira/browse/FLINK-7919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247736#comment-16247736 ] Fabian Hueske commented on FLINK-7919: -- Hi [~greghogan], yes that would be an option as well. This would have the benefit of being more aligned with the existing API (outer joins have their own API calls). The fact, that this issue hasn't been reported earlier also indicates that it is usually used as an inner join. > Join with Solution Set fails with NPE if Solution Set has no entry > -- > > Key: FLINK-7919 > URL: https://issues.apache.org/jira/browse/FLINK-7919 > Project: Flink > Issue Type: Bug > Components: DataSet API, Local Runtime >Affects Versions: 1.4.0, 1.3.2 >Reporter: Fabian Hueske > > A job with a delta iteration fails hard with a NPE in the solution set join, > if the solution set has no entry for the join key of the probe side. > The following program reproduces the problem: > {code} > DataSet> values = env.fromElements( > Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1)); > DeltaIteration , Tuple2 > di = values > .iterateDelta(values, 5,0); > DataSet > loop = di.getWorkset() > .map(new MapFunction , Tuple2 >() { > @Override > public Tuple2 map(Tuple2 value) throws > Exception { > // modifying the key to join on a non existing solution set key > return Tuple2.of(value.f0 + 1, 1); > } > }) > .join(di.getSolutionSet()).where(0).equalTo(0) > .with(new JoinFunction , Tuple2 , > Tuple2 >() { > @Override > public Tuple2 join( > Tuple2 first, > Tuple2 second) throws Exception { > > return Tuple2.of(first.f0, first.f1 + second.f1); > } > }); > DataSet > result = di.closeWith(loop, loop); > result.print(); > {code} > It doesn't matter whether the solution set is managed or not. > The problem is cause because the solution set hash table prober returns a > {{null}} value if the solution set does not contain a value for the probe > side key. 
> The join operator does not check whether the return value is {{null}} but immediately tries to create a copy using a {{TypeSerializer}}. This copy fails with an NPE. > I propose to check for {{null}} and call the join function with {{null}} on the solution set side. This gives OUTER JOIN semantics for the join. > Since the code was previously failing with an NPE, it is safe to forward the {{null}} into the {{JoinFunction}}. > However, users must be aware that the solution set value may be {{null}}, and we need to update the documentation (JavaDocs + website) to describe the behavior. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
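The proposed fix can be sketched in plain Java. The map-backed prober and the class/method names below are illustrative stand-ins for Flink's solution-set hash table, not its actual classes: on a probe-side miss the driver forwards {{null}} into the join function instead of handing it to the serializer, which is what gives the OUTER JOIN semantics described above.

```java
import java.util.HashMap;
import java.util.Map;

public class SolutionSetJoinSketch {
    // Stand-in for the solution-set hash table prober: returns null on a miss.
    static final Map<Long, Integer> SOLUTION_SET = new HashMap<>();
    static {
        SOLUTION_SET.put(1L, 1);
        SOLUTION_SET.put(2L, 1);
        SOLUTION_SET.put(3L, 1);
    }

    /** Probe-side join; a null result means "no solution-set entry for this key". */
    static Integer join(long key, int probeValue) {
        Integer solutionValue = SOLUTION_SET.get(key); // may be null
        if (solutionValue == null) {
            // Previously this null went straight into TypeSerializer#copy and NPE'd;
            // the proposal instead forwards it to the JoinFunction (outer-join style).
            return null;
        }
        return probeValue + solutionValue;
    }

    public static void main(String[] args) {
        System.out.println(join(2L, 1)); // matched key
        System.out.println(join(4L, 1)); // shifted key with no solution-set entry
    }
}
```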
[jira] [Commented] (FLINK-7973) Fix service shading relocation for S3 file systems
[ https://issues.apache.org/jira/browse/FLINK-7973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247727#comment-16247727 ] ASF GitHub Bot commented on FLINK-7973: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4961 I did include your end-to-end tests (with some fixes) and the fixes for the errors they found. Should be fine now, let's see what travis says... > Fix service shading relocation for S3 file systems > -- > > Key: FLINK-7973 > URL: https://issues.apache.org/jira/browse/FLINK-7973 > Project: Flink > Issue Type: Bug >Reporter: Stephan Ewen >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.4.0 > > > The shade plugin relocates services incorrectly currently, applying > relocation patterns multiple times. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4961: [FLINK-7973] fix shading and relocating Hadoop for the S3...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4961 I did include your end-to-end tests (with some fixes) and the fixes for the errors they found. Should be fine now, let's see what travis says... ---
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247724#comment-16247724 ] ASF GitHub Bot commented on FLINK-4228: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r150275387
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
+	<profile>
+		<id>include_hadoop_aws</id>
+		<activation>
+			<property>
+				<name>include_hadoop_aws</name>
+			</property>
+		</activation>
+		<dependencies>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-aws</artifactId>
+				<version>${hadoop.version}</version>
+				<scope>test</scope>
+				<exclusions>
+					<exclusion>
+						<groupId>org.apache.avro</groupId>
+						<artifactId>avro</artifactId>
+					</exclusion>
--- End diff --
makes sense, I guess
> YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured FileSystem. If so, the Flink session will fail on staging itself because it tries to copy the flink/lib directory to S3 and the S3aFileSystem does not support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) leads to an Exception when uploading the snapshot to S3 when using the {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)}
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)
> 	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
> 	at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
> 	at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)
> 	at java.io.FileInputStream.open0(Native Method)
> 	at java.io.FileInputStream.open(FileInputStream.java:195)
> 	at java.io.FileInputStream.<init>(FileInputStream.java:138)
> 	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
> 	... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created automatically. We might need to manually create folders and copy only actual files for {{S3AFileSystem}}. More investigation is required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
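The workaround suggested in the issue above — manually creating folders and copying only actual files — can be sketched in plain Java. The class and method names are illustrative, and the actual S3 put is left out; the sketch only shows the directory walk that yields one relative key per regular file, so no directory is ever handed to the filesystem copy (which is what fails on S3A with "Is a directory"):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class RecursiveUploadSketch {
    /**
     * Collects the relative keys of all regular files under {@code root}, so each
     * file can be uploaded individually instead of passing a whole directory to a
     * recursive filesystem copy.
     */
    static List<String> collectFileKeys(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)          // copy only actual files
                        .map(p -> root.relativize(p).toString()
                                      .replace('\\', '/'))     // normalize separators
                        .sorted()
                        .collect(Collectors.toList());
        }
    }
}
```

The caller would then create each parent "folder" key (or rely on the object store's flat namespace) and issue one put per key.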
[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r150275387
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
+	<profile>
+		<id>include_hadoop_aws</id>
+		<activation>
+			<property>
+				<name>include_hadoop_aws</name>
+			</property>
+		</activation>
+		<dependencies>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-aws</artifactId>
+				<version>${hadoop.version}</version>
+				<scope>test</scope>
+				<exclusions>
+					<exclusion>
+						<groupId>org.apache.avro</groupId>
+						<artifactId>avro</artifactId>
+					</exclusion>
--- End diff --
makes sense, I guess ---
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247722#comment-16247722 ] ASF GitHub Bot commented on FLINK-4228: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r150275161
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
+	<profile>
+		<id>include_hadoop_aws</id>
+		<activation>
+			<property>
+				<name>include_hadoop_aws</name>
+			</property>
+		</activation>
+		<dependencies>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-aws</artifactId>
+				<version>${hadoop.version}</version>
+				<scope>test</scope>
+				<exclusions>
+					<exclusion>
+						<groupId>org.apache.avro</groupId>
+						<artifactId>avro</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-annotations</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-core</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-databind</artifactId>
+					</exclusion>
--- End diff --
this dependency is, however, somehow related to `com.amazonaws`, not our code...
> YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured FileSystem. If so, the Flink session will fail on staging itself because it tries to copy the flink/lib directory to S3 and the S3aFileSystem does not support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) leads to an Exception when uploading the snapshot to S3 when using the {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)}
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)
> 	at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
> 	at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
> 	at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
> 	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException:
[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4939#discussion_r150275161
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
+	<profile>
+		<id>include_hadoop_aws</id>
+		<activation>
+			<property>
+				<name>include_hadoop_aws</name>
+			</property>
+		</activation>
+		<dependencies>
+			<dependency>
+				<groupId>org.apache.hadoop</groupId>
+				<artifactId>hadoop-aws</artifactId>
+				<version>${hadoop.version}</version>
+				<scope>test</scope>
+				<exclusions>
+					<exclusion>
+						<groupId>org.apache.avro</groupId>
+						<artifactId>avro</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-annotations</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-core</artifactId>
+					</exclusion>
+					<exclusion>
+						<groupId>com.fasterxml.jackson.core</groupId>
+						<artifactId>jackson-databind</artifactId>
+					</exclusion>
--- End diff --
this dependency is, however, somehow related to `com.amazonaws`, not our code... ---
[GitHub] flink issue #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3a defau...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4939 1) yes, I have tested the changes on a YARN cluster as described in the PR comment (before the changes on the unit tests) 2) regarding the `pom.xml` changes: I actually took those from 991af3652479f85f732cbbade46bed7df1c5d819 and 36b663f4561458408ee68902a1db6a5cd539e1c2 and can only guess why @StephanEwen and @zentol chose this approach over what you proposed. Maybe they can clarify. ---
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247721#comment-16247721 ] ASF GitHub Bot commented on FLINK-4228: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4939 1) yes, I have tested the changes on a YARN cluster as described in the PR comment (before the changes on the unit tests) 2) regarding the `pom.xml` changes: I actually took those from 991af3652479f85f732cbbade46bed7df1c5d819 and 36b663f4561458408ee68902a1db6a5cd539e1c2 and can only guess why @StephanEwen and @zentol chose this approach over what you proposed. Maybe they can clarify. > YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured FileSystem. If so, the Flink session will fail on staging itself because it tries to copy the flink/lib directory to S3 and the S3aFileSystem does not support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) leads to an Exception when uploading the snapshot to S3 when using the {{S3AFileSystem}}.
> {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at 
java.io.FileInputStream.(FileInputStream.java:138) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294) > ... 9 more > {code} > Running with S3NFileSystem, the error does not occur. The problem might be > due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created > automatically. We might need to manually create folders and copy only actual > files for {{S3AFileSystem}}. More investigation is required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7999) Variable Join Window Boundaries
[ https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247674#comment-16247674 ] Seth Wiesman edited comment on FLINK-7999 at 11/10/17 3:47 PM: --- Sure, I will use the example of my use case. I am using Flink to aggregate spend for campaigns that I am running. Each record in the main stream contains both a campaign id and information about a single financial transaction; this stream is analogous to the fact table in a traditional data warehouse. However, each campaign runs under a different currency, so I need to join with metadata containing the currency code for that campaign. Campaigns have both start and end dates, which can span any amount of time from one day to several months.

Events:
timestamp        | id | spend
-----------------|----|------
2017-11-11 03:00 | 1  | 0.25
2017-11-11 03:02 | 2  | 0.03
2017-11-11 03:05 | 1  | 0.11

Metadata:
id | start_date | end_date   | currency_code
---|------------|------------|--------------
1  | 2017-11-10 | 2017-11-12 | "USD"
2  | 2017-04-02 | 2019-12-31 | "EUR"

I have a valid window in which each event can be joined, but it varies by id; today I implement this join with the following `CoProcessFunction`.
{code:java}
class CampaignJoin(allowedLateness: Long) extends CoProcessFunction[Event, CampaignMetadata, Event] {

  @transient private lazy val descriptor =
    new ValueStateDescriptor[CampaignMetadata]("campaign", createTypeInformation[CampaignMetadata])

  override def processElement1(value: Event, ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context, out: Collector[Event]): Unit = {
    val campaign = getRuntimeContext.getState(descriptor).value()
    if (campaign != null && campaign.start <= ctx.timestamp()) {
      out.collect(value.copy(meta = campaign))
    }
  }

  override def processElement2(value: CampaignMetadata, ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context, out: Collector[Event]): Unit = {
    val end = value.end.getTime + allowedLateness
    if (end < ctx.timerService().currentWatermark()) {
      return
    }
    ctx.timerService().registerEventTimeTimer(end)
    getRuntimeContext.getState(descriptor).update(value)
  }

  override def onTimer(timestamp: Long, ctx: CoProcessFunction[Event, CampaignMetadata, Event]#OnTimerContext, out: Collector[Event]): Unit = {
    val state = getRuntimeContext.getState(descriptor)
    val campaign = state.value()
    if (campaign != null) {
      val end = campaign.end.getTime + allowedLateness
      if (end == timestamp) {
        state.clear()
      }
    }
  }
}
{code}
was (Author: sjwiesman): Sure, I will use the example of my use case. I am using Flink to aggregate spend for campaigns that I am running. Each record in the main stream contains both a campaign id and information about a single financial transaction; this stream is analogous to the fact table in a traditional data warehouse. However, each campaign runs under a different currency, so I need to join with metadata containing the currency code for that campaign. Campaigns have both start and end dates, which can span any amount of time from one day to several months.
Events:                            Metadata:
timestamp        | id | spend      id | start_date | end_date   | currency_code
---------------- | -- | -----      -- | ---------- | ---------- | --------------
2017-11-11 03:00 | 1  | 0.25       1  | 2017-11-10 | 2017-11-12 | "USD"
2017-11-11 03:02 | 2  | 0.03       2  | 2017-04-02 | 2019-12-31 | "EUR"
2017-11-11 03:05 | 1  | 0.11

I have a valid window in which each event can be joined, but it varies by id; today I implement this join with the following `CoProcessFunction`.
{code:java}
class CampaignJoin(allowedLateness: Long) extends CoProcessFunction[Event, CampaignMetadata, Event] {

  @transient private lazy val descriptor =
    new ValueStateDescriptor[CampaignMetadata]("campaign", createTypeInformation[CampaignMetadata])

  override def processElement1(value: Event, ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context, out: Collector[Event]): Unit = {
    val campaign = getRuntimeContext.getState(descriptor).value()
    if (campaign != null && campaign.start <= ctx.timestamp()) {
      out.collect(value.copy(meta = campaign))
    }
  }

  override def processElement2(value: CampaignMetadata, ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context, out: Collector[Event]): Unit = {
    val end = value.end.getTime + allowedLateness
    if (end < ctx.timerService().currentWatermark()) {
      return
    }
    ctx.timerService().registerEventTimeTimer(end)
[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries
[ https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247674#comment-16247674 ] Seth Wiesman commented on FLINK-7999: - Sure, I will use the example of my use case. I am using Flink to aggregate spend for campaigns that I am running. Each record in the main stream contains both a campaign id and information about a single financial transaction; this stream is analogous to the fact table in a traditional data warehouse. However, each campaign runs under a different currency, so I need to join with metadata containing the currency code for that campaign. Campaigns have both start and end dates, which can span any amount of time from one day to several months.

Events:                            Metadata:
timestamp        | id | spend      id | start_date | end_date   | currency_code
---------------- | -- | -----      -- | ---------- | ---------- | --------------
2017-11-11 03:00 | 1  | 0.25       1  | 2017-11-10 | 2017-11-12 | "USD"
2017-11-11 03:02 | 2  | 0.03       2  | 2017-04-02 | 2019-12-31 | "EUR"
2017-11-11 03:05 | 1  | 0.11

I have a valid window in which each event can be joined, but it varies by id; today I implement this join with the following `CoProcessFunction`.
{code:java}
class CampaignJoin(allowedLateness: Long) extends CoProcessFunction[Event, CampaignMetadata, Event] {

  @transient private lazy val descriptor =
    new ValueStateDescriptor[CampaignMetadata]("campaign", createTypeInformation[CampaignMetadata])

  override def processElement1(value: Event, ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context, out: Collector[Event]): Unit = {
    val campaign = getRuntimeContext.getState(descriptor).value()
    if (campaign != null && campaign.start <= ctx.timestamp()) {
      out.collect(value.copy(meta = campaign))
    }
  }

  override def processElement2(value: CampaignMetadata, ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context, out: Collector[Event]): Unit = {
    val end = value.end.getTime + allowedLateness
    if (end < ctx.timerService().currentWatermark()) {
      return
    }
    ctx.timerService().registerEventTimeTimer(end)
    getRuntimeContext.getState(descriptor).update(value)
  }

  override def onTimer(timestamp: Long, ctx: CoProcessFunction[Event, CampaignMetadata, Event]#OnTimerContext, out: Collector[Event]): Unit = {
    val state = getRuntimeContext.getState(descriptor)
    val campaign = state.value()
    if (campaign != null) {
      val end = campaign.end.getTime + allowedLateness
      if (end == timestamp) {
        state.clear()
      }
    }
  }
}
{code}
> Variable Join Window Boundaries > --- > > Key: FLINK-7999 > URL: https://issues.apache.org/jira/browse/FLINK-7999 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Seth Wiesman > > Allow window joins with variable length based on row attributes. > Consider two streams joined on an id, where one has start and end dates; it would be useful to be able to join each row during its live duration. Today this can be expressed in the DataStream API using a CoProcessFunction. > left.id = right.id AND (left.time > right.start and left.time < right.end) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-6163) Document per-window state in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-6163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-6163. --- Resolution: Fixed Documented on master in d98ba08a7a73dc93c839c77d11245aba70869ab5 Documented on release-1.4 in da435f121821fd1107c41352a54ee804f10cf7e3 > Document per-window state in ProcessWindowFunction > -- > > Key: FLINK-6163 > URL: https://issues.apache.org/jira/browse/FLINK-6163 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Documentation >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > The current windowing documentation mostly describes {{WindowFunction}} and > treats {{ProcessWindowFunction}} as an afterthought. We should reverse that > and also document the new per-key state that is only available to > {{ProcessWindowFunction}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6163) Document per-window state in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-6163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247603#comment-16247603 ] ASF GitHub Bot commented on FLINK-6163: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/4992 > Document per-window state in ProcessWindowFunction > -- > > Key: FLINK-6163 > URL: https://issues.apache.org/jira/browse/FLINK-6163 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Documentation >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > The current windowing documentation mostly describes {{WindowFunction}} and > treats {{ProcessWindowFunction}} as an afterthought. We should reverse that > and also document the new per-key state that is only available to > {{ProcessWindowFunction}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4992: [FLINK-6163] Document per-window state in ProcessW...
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/4992 ---
[jira] [Commented] (FLINK-6163) Document per-window state in ProcessWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-6163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247601#comment-16247601 ] ASF GitHub Bot commented on FLINK-6163: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4992 Thanks for the review!  > Document per-window state in ProcessWindowFunction > -- > > Key: FLINK-6163 > URL: https://issues.apache.org/jira/browse/FLINK-6163 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Documentation >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > The current windowing documentation mostly describes {{WindowFunction}} and > treats {{ProcessWindowFunction}} as an afterthought. We should reverse that > and also document the new per-key state that is only available to > {{ProcessWindowFunction}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4992: [FLINK-6163] Document per-window state in ProcessWindowFu...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4992 Thanks for the review! 😃 ---
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247539#comment-16247539 ] ASF GitHub Bot commented on FLINK-7974: --- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150243346
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
 	/**
 	 * Shuts down the server and all related thread pools.
 	 */
-	public void shutdown() {
-		LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-		if (handler != null) {
-			handler.shutdown();
-			handler = null;
-		}
-
-		if (queryExecutor != null) {
-			queryExecutor.shutdown();
-		}
+	public CompletableFuture<Boolean> shutdownServer(Time timeout) throws InterruptedException {
+		log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+		final CompletableFuture<Boolean> queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+			try {
+				if (queryExecutor != null && !queryExecutor.isShutdown()) {
+					queryExecutor.shutdown();
+					queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+					return true;
+				}
+			} catch (InterruptedException e) {
+				log.warn("Failed to shutdown {}: ", serverName, e);
+				return false;
+			}
+			return false;
--- End diff --
We want to avoid pulling in the dependency on `runtime` here. Does it make sense to move the `Executors` to another module, e.g. `core`?
> AbstractServerBase#shutdown does not wait for shutdown completion > - > > Key: FLINK-7974 > URL: https://issues.apache.org/jira/browse/FLINK-7974 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Kostas Kloudas >Priority: Critical > > The {{AbstractServerBase}} does not wait for the completion of its shutdown when calling {{AbstractServerBase#shutdown}}. This is problematic since it leads to resource leaks and unstable tests such as the {{AbstractServerTest}}. I propose to let {{AbstractServerBase#shutdown}} return a termination future which is completed upon shutdown completion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4996: [hotfix][docs] Change mailing list link in quickst...
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4996 [hotfix][docs] Change mailing list link in quickstart to flink-user Change mailing list link in quickstart to flink-user. Previously it was pointing to flink-dev. This change only fixes a minor bug in the documentation and doesn't touch source code. You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink docs Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4996.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4996
commit 8c5ae23445850d090bb713bc3db0ccbd32493fe9
Author: Piotr Nowojski
Date: 2017-11-10T14:15:11Z
[hotfix][docs] Change mailing list link in quickstart to flink-user
Previously it was pointing to flink-dev ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150243346
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
 	/**
 	 * Shuts down the server and all related thread pools.
 	 */
-	public void shutdown() {
-		LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-		if (handler != null) {
-			handler.shutdown();
-			handler = null;
-		}
-
-		if (queryExecutor != null) {
-			queryExecutor.shutdown();
-		}
+	public CompletableFuture<Boolean> shutdownServer(Time timeout) throws InterruptedException {
+		log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+		final CompletableFuture<Boolean> queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+			try {
+				if (queryExecutor != null && !queryExecutor.isShutdown()) {
+					queryExecutor.shutdown();
+					queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+					return true;
+				}
+			} catch (InterruptedException e) {
+				log.warn("Failed to shutdown {}: ", serverName, e);
+				return false;
+			}
+			return false;
--- End diff --
We want to avoid pulling in the dependency on `runtime` here. Does it make sense to move the `Executors` to another module, e.g. `core`? ---
[jira] [Commented] (FLINK-7811) Add support for Scala 2.12
[ https://issues.apache.org/jira/browse/FLINK-7811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247518#comment-16247518 ] ASF GitHub Bot commented on FLINK-7811: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/4995 [hotfix] [docs] Fix broken link to FLINK-7811 This fixes a broken hyperlink on page https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/building.html#scala-versions ![image](https://user-images.githubusercontent.com/1681921/32661415-3205d53a-c627-11e7-8f15-3ff6c52c7bf0.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink hotfix-building-fix-url Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4995.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4995 > Add support for Scala 2.12 > -- > > Key: FLINK-7811 > URL: https://issues.apache.org/jira/browse/FLINK-7811 > Project: Flink > Issue Type: Sub-task > Components: Scala API >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4995: [hotfix] [docs] Fix broken link to FLINK-7811
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/4995 [hotfix] [docs] Fix broken link to FLINK-7811 This fixes a broken hyperlink on page https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/building.html#scala-versions ![image](https://user-images.githubusercontent.com/1681921/32661415-3205d53a-c627-11e7-8f15-3ff6c52c7bf0.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink hotfix-building-fix-url Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4995.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4995 ---
[GitHub] flink pull request #4994: [hotfix][build] Disable dependency convergence in ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4994 ---
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247487#comment-16247487 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150232000

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {

        /**
         * Shuts down the server and all related thread pools.
         */
    -   public void shutdown() {
    -       LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
    -
    -       if (handler != null) {
    -           handler.shutdown();
    -           handler = null;
    -       }
    -
    -       if (queryExecutor != null) {
    -           queryExecutor.shutdown();
    -       }
    +   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
    --- End diff --

    Ideally we would get rid of the `timeout` parameter, because that is something the user specifies when calling `get()` on the returned termination future.

> AbstractServerBase#shutdown does not wait for shutdown completion
> -----------------------------------------------------------------
>
>                 Key: FLINK-7974
>                 URL: https://issues.apache.org/jira/browse/FLINK-7974
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Kostas Kloudas
>            Priority: Critical
>
> The {{AbstractServerBase}} does not wait for the completion of its shutdown when
> calling {{AbstractServerBase#shutdown}}. This is problematic since it leads to
> resource leaks and unstable tests such as the {{AbstractServerTest}}. I propose
> to let {{AbstractServerBase#shutdown}} return a termination future which is
> completed upon shutdown completion.
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150231729

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
    +   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
    +       log.info("Shutting down {} @ {}", serverName, serverAddress);
    +
    +       final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
    +           try {
    +               if (queryExecutor != null && !queryExecutor.isShutdown()) {
    +                   queryExecutor.shutdown();
    +                   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    --- End diff --

    What if `awaitTermination` returns `false`? Then we should call `queryExecutor.shutdownNow`.

---
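The fallback the reviewer asks for — force a `shutdownNow()` when `awaitTermination` times out — can be sketched with plain `java.util.concurrent` types. This is a minimal illustration of the pattern, not Flink's actual `AbstractServerBase` code; the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdown {

    // Graceful-then-forceful shutdown: stop accepting tasks, wait up to the
    // timeout, and only if the pool has not terminated by then, force-cancel
    // the remaining tasks with shutdownNow().
    public static boolean shutdown(ExecutorService executor, long timeoutMillis) {
        executor.shutdown(); // no new tasks; running tasks may finish
        try {
            if (!executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow(); // interrupt tasks that did not finish in time
                return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> { });
        boolean terminated = shutdown(pool, 1000L);
        assert terminated && pool.isTerminated();
    }
}
```

The returned boolean tells the caller whether termination actually completed, which is the piece of information the reviewed code silently dropped.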
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247473#comment-16247473 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150228196

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -419,20 +440,27 @@ void close() {
         * @param cause The cause to close the channel with.
         * @return Channel close future
         */
    -   private boolean close(Throwable cause) {
    +   private CompletableFuture close(Throwable cause) {
    +       final CompletableFuture shutdownFuture = new CompletableFuture<>();
            if (failureCause.compareAndSet(null, cause)) {
    -           channel.close();
    -           stats.reportInactiveConnection();
    +           final CompletableFuture tmp = new CompletableFuture<>();
    +           channel.close().addListener(finished -> tmp.complete(null));
    --- End diff --

    The result of the channel future is ignored.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247472#comment-16247472 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150228244

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -419,20 +440,27 @@ void close() {
    +           final CompletableFuture tmp = new CompletableFuture<>();
    +           channel.close().addListener(finished -> tmp.complete(null));

    -           for (long requestId : pendingRequests.keySet()) {
    -               TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
    -               if (pending != null && pending.completeExceptionally(cause)) {
    -                   stats.reportFailedRequest();
    +           tmp.thenRun(() -> {
    --- End diff --

    Here as well.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247459#comment-16247459 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150223433

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
    +       final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
    +       final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
    +
    +       queryExecShutdownFuture.thenRun(() -> {
    --- End diff --

    Is it strictly necessary to first shut down the query executor before shutting down the netty server?
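The ordering question above is exactly the difference between chaining shutdown stages with `thenCompose` and combining them with `allOf`. A generic sketch of the two options (hypothetical stage names, not Flink's code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ShutdownOrdering {

    // Sequential: stage B is only started after stage A has completed.
    static CompletableFuture<Void> sequential(CompletableFuture<Void> stageA,
                                              Supplier<CompletableFuture<Void>> stageB) {
        return stageA.thenCompose(ignored -> stageB.get());
    }

    // Parallel: both stages run independently; the combined future
    // completes once both have completed.
    static CompletableFuture<Void> parallel(CompletableFuture<Void> stageA,
                                            CompletableFuture<Void> stageB) {
        return CompletableFuture.allOf(stageA, stageB);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        sequential(done, () -> done).join();
        parallel(done, done).join();
    }
}
```

If the two resources are independent, the parallel form shuts down faster; the sequential form is only needed when stage B must not start while stage A is still running.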
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247470#comment-16247470 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150226636

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -166,27 +168,39 @@ public String getClientName() {
         *
         * After a call to this method, all returned futures will be failed.
         */
    -   public void shutdown() {
    +   public CompletableFuture shutdown() {
    +       final CompletableFuture shutdownFuture = new CompletableFuture<>();
    +
            if (shutDown.compareAndSet(false, true)) {
    +           final List connectionFutures = new ArrayList<>();
                for (Map.Entry conn : establishedConnections.entrySet()) {
                    if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
    -                   conn.getValue().close();
    +                   connectionFutures.add(conn.getValue().close());
                    }
                }
                for (Map.Entry conn : pendingConnections.entrySet()) {
                    if (pendingConnections.remove(conn.getKey()) != null) {
    -                   conn.getValue().close();
    +                   connectionFutures.add(conn.getValue().close());
                    }
                }
    -           if (bootstrap != null) {
    -               EventLoopGroup group = bootstrap.group();
    -               if (group != null) {
    -                   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
    -               }
    -           }
    +           CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun(
    +               () -> {
    +                   if (bootstrap != null) {
    +                       EventLoopGroup group = bootstrap.group();
    +                       if (group != null) {
    +                           group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS)
    +                               .addListener(finished -> shutdownFuture.complete(true));
    --- End diff --

    The result should be considered.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247456#comment-16247456 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150223049

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
    +       final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
    +           try {
    +               if (queryExecutor != null && !queryExecutor.isShutdown()) {
    +                   queryExecutor.shutdown();
    +                   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +                   return true;
    +               }
    +           } catch (InterruptedException e) {
    +               log.warn("Failed to shutdown {}: ", serverName, e);
    +               return false;
    +           }
    +           return false;
    --- End diff --

    Better to use `Executors.gracefulShutdown` for this.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247458#comment-16247458 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150224310

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {

        /**
         * Shuts down the server and all related thread pools.
         */
    --- End diff --

    The JavaDocs should be updated.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247479#comment-16247479 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150228810

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
        @Override
        public void onRequestResult(long requestId, RESP response) {
            TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
    -       if (pending != null && pending.complete(response)) {
    +       if (pending != null && !pending.isDone()) {
                long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
                stats.reportSuccessfulRequest(durationMillis);
    +           pending.complete(response);
            }
        }

        @Override
        public void onRequestFailure(long requestId, Throwable cause) {
            TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
    -       if (pending != null && pending.completeExceptionally(cause)) {
    +       if (pending != null && !pending.isDone()) {
                stats.reportFailedRequest();
    +           pending.completeExceptionally(cause);
    --- End diff --

    Same here?
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247460#comment-16247460 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150225136

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
    +       queryExecShutdownFuture.thenRun(() -> {
    +           if (bootstrap != null) {
    +               EventLoopGroup group = bootstrap.group();
    +               if (group != null) {
    +                   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
    +                       .addListener(finished -> groupShutdownFuture.complete(null));
    +               } else {
    +                   groupShutdownFuture.complete(null);
    +               }
    +           }

    +           if (handler != null) {
    +               handler.shutdown().thenRun(() -> {
    --- End diff --

    Same here: what if an exception happened while shutting the `handler` down? Shouldn't we complete the `handlerShutdownFuture` with it?
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247467#comment-16247467 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150226886

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -166,27 +168,39 @@ public String getClientName() {
    +           CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun(
    --- End diff --

    Also here we shouldn't ignore the result of the `allOf` future, because it might contain an exception.
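The hazard the reviewer points at is that `thenRun` silently skips its action when the upstream future failed. A small sketch of the fix — forwarding the failure with `whenComplete` instead (generic illustration, not Flink's `Client` code):

```java
import java.util.concurrent.CompletableFuture;

public class AllOfPropagation {

    // CompletableFuture.allOf completes exceptionally if any input fails.
    // Using whenComplete (rather than thenRun) lets us forward that failure
    // into the result future instead of dropping it on the floor.
    public static CompletableFuture<Void> closeAll(CompletableFuture<?>... futures) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        CompletableFuture.allOf(futures).whenComplete((ignored, error) -> {
            if (error != null) {
                result.completeExceptionally(error); // propagate the first failure
            } else {
                result.complete(null); // all inputs finished successfully
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<?> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("close failed"));
        assert closeAll(failed).isCompletedExceptionally();
        assert closeAll(CompletableFuture.completedFuture(null)).isDone();
    }
}
```

With `thenRun`, the caller of `shutdown()` would wait forever on a future that is never completed after a connection fails to close; `whenComplete` guarantees the result future terminates either way.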
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247463#comment-16247463 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150224448

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
    +           if (handler != null) {
    +               handler.shutdown().thenRun(() -> {
    +                   handler = null;
    +                   handlerShutdownFuture.complete(null);
    +               });
    +           } else {
    +               handlerShutdownFuture.complete(null);
    +           }
    --- End diff --

    Why do we execute the handler shutdown in the callback of the `queryExecShutdownFuture`?
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247480#comment-16247480 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150230620

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -166,27 +168,39 @@ public String getClientName() {
         *
         * After a call to this method, all returned futures will be failed.
         */
    -   public void shutdown() {
    +   public CompletableFuture shutdown() {
    --- End diff --

    When called twice, this method will give you a wrong result: the second call immediately returns an already completed future.
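The standard remedy for the double-call problem above is to store the termination future and hand the same instance to every caller, so a second `shutdown()` waits for the real shutdown instead of receiving a fresh, immediately completed future. A minimal sketch (hypothetical class, not Flink's `Client`):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class IdempotentShutdown {

    // Holds the single termination future; null until shutdown() is first called.
    private final AtomicReference<CompletableFuture<Void>> shutdownFuture =
            new AtomicReference<>();

    public CompletableFuture<Void> shutdown() {
        CompletableFuture<Void> newFuture = new CompletableFuture<>();
        if (shutdownFuture.compareAndSet(null, newFuture)) {
            // Only the first caller actually triggers the shutdown work.
            doShutdown(newFuture);
            return newFuture;
        }
        // Every later caller shares the first caller's termination future.
        return shutdownFuture.get();
    }

    private void doShutdown(CompletableFuture<Void> future) {
        // Placeholder for the real resource release; complete when done.
        future.complete(null);
    }
}
```

Because all callers hold the same future, each of them observes the true completion (or failure) of the one shutdown that actually ran.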
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247461#comment-16247461 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150224362

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
     	/**
     	 * Shuts down the server and all related thread pools.
     	 */
    -	public void shutdown() {
    -		LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
    -
    -		if (handler != null) {
    -			handler.shutdown();
    -			handler = null;
    -		}
    -
    -		if (queryExecutor != null) {
    -			queryExecutor.shutdown();
    -		}
    +	public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
    +		log.info("Shutting down {} @ {}", serverName, serverAddress);
    +
    +		final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
    +			try {
    +				if (queryExecutor != null && !queryExecutor.isShutdown()) {
    +					queryExecutor.shutdown();
    +					queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +					return true;
    +				}
    +			} catch (InterruptedException e) {
    +				log.warn("Failed to shutdown {}: ", serverName, e);
    +				return false;
    +			}
    +			return false;
    +		});
    +
    +		final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
    +		final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
    +
    +		queryExecShutdownFuture.thenRun(() -> {
    +			if (bootstrap != null) {
    +				EventLoopGroup group = bootstrap.group();
    +				if (group != null) {
    +					group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
    +						.addListener(finished -> groupShutdownFuture.complete(null));
    --- End diff --

    Why do we complete a `boolean` future with `null`?
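The point behind this question is that a listener carries a real outcome that can be forwarded into the future instead of being discarded. A hedged sketch of the idea follows; the `Outcome` type is a stand-in I defined for illustration, not Netty's real listener API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CallbackBridge {

    /** Stand-in for a Netty-style future listener result: success flag plus optional cause. */
    interface Outcome {
        boolean isSuccess();
        Throwable cause();
    }

    /** Bridges a callback-style async operation into a CompletableFuture, forwarding the outcome. */
    static CompletableFuture<Boolean> bridge(Consumer<Consumer<Outcome>> asyncOp) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        asyncOp.accept(outcome -> {
            if (outcome.isSuccess()) {
                result.complete(true);                         // forward the real result
            } else {
                result.completeExceptionally(outcome.cause()); // forward the failure, too
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // Simulate an operation that succeeds immediately.
        CompletableFuture<Boolean> ok = bridge(cb -> cb.accept(new Outcome() {
            public boolean isSuccess() { return true; }
            public Throwable cause() { return null; }
        }));
        System.out.println(ok.join()); // true
    }
}
```

Completing with `null` as in the diff loses both the success value and any exception; forwarding the outcome keeps failures visible to whoever waits on the future.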
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247482#comment-16247482 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150228785

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
     	@Override
     	public void onRequestResult(long requestId, RESP response) {
     		TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
    -		if (pending != null && pending.complete(response)) {
    +		if (pending != null && !pending.isDone()) {
     			long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
     			stats.reportSuccessfulRequest(durationMillis);
    --- End diff --

    Shouldn't we only increase `stats.reportSuccessfulRequest` if we could complete `pending`?
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247477#comment-16247477 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150230125

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -419,20 +440,27 @@ void close() {
     	 * @param cause The cause to close the channel with.
     	 * @return Channel close future
     	 */
    -	private boolean close(Throwable cause) {
    +	private CompletableFuture close(Throwable cause) {
    --- End diff --

    I think this method can give you wrong results when called twice: the second call will give you a completed future even though the first call could still be running.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247471#comment-16247471 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150225215

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
     	/**
     	 * Shuts down the server and all related thread pools.
     	 */
    -	public void shutdown() {
    -		LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
    -
    -		if (handler != null) {
    -			handler.shutdown();
    -			handler = null;
    -		}
    -
    -		if (queryExecutor != null) {
    -			queryExecutor.shutdown();
    -		}
    +	public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
    +		log.info("Shutting down {} @ {}", serverName, serverAddress);
    +
    +		final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
    +			try {
    +				if (queryExecutor != null && !queryExecutor.isShutdown()) {
    +					queryExecutor.shutdown();
    +					queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +					return true;
    +				}
    +			} catch (InterruptedException e) {
    +				log.warn("Failed to shutdown {}: ", serverName, e);
    +				return false;
    +			}
    +			return false;
    +		});
    +
    +		final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
    +		final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
    +
    +		queryExecShutdownFuture.thenRun(() -> {
    +			if (bootstrap != null) {
    +				EventLoopGroup group = bootstrap.group();
    +				if (group != null) {
    +					group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
    +						.addListener(finished -> groupShutdownFuture.complete(null));
    +				} else {
    +					groupShutdownFuture.complete(null);
    +				}
    +			}
    
    -		if (bootstrap != null) {
    -			EventLoopGroup group = bootstrap.group();
    -			if (group != null) {
    -				group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
    +			if (handler != null) {
    +				handler.shutdown().thenRun(() -> {
    +					handler = null;
    --- End diff --

    I would actually refrain from state changes from an asynchronous callback thread.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247476#comment-16247476 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150228000

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -309,31 +323,38 @@ private void handInChannel(Channel channel) {
     	/**
     	 * Close the connecting channel with a ClosedChannelException.
     	 */
    -	private void close() {
    -		close(new ClosedChannelException());
    +	private CompletableFuture close() {
    +		return close(new ClosedChannelException());
     	}
    
     	/**
     	 * Close the connecting channel with an Exception (can be {@code null})
     	 * or forward to the established channel.
     	 */
    -	private void close(Throwable cause) {
    +	private CompletableFuture close(Throwable cause) {
     		synchronized (connectLock) {
    -			if (!closed) {
    +			final CompletableFuture shutdownFuture;
    +			if (closed) {
    +				shutdownFuture = new CompletableFuture<>();
    +				shutdownFuture.complete(null);
    +			} else {
     				if (failureCause == null) {
     					failureCause = cause;
     				}
    
     				if (established != null) {
    -					established.close();
    +					shutdownFuture = established.close();
     				} else {
     					PendingRequest pending;
     					while ((pending = queuedRequests.poll()) != null) {
     						pending.completeExceptionally(cause);
     					}
    +					shutdownFuture = new CompletableFuture<>();
    +					shutdownFuture.complete(null);
     				}
    -				closed = true;
    +				shutdownFuture.thenRun(() -> closed = true);
    --- End diff --

    Why do we set `closed` to `true` only after the shutdown completed? Shouldn't `close` directly set it to true such that subsequent operations won't be executed anymore?
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247462#comment-16247462 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150225343

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java ---
    @@ -185,7 +185,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
     	/**
     	 * Shuts down any handler specific resources, e.g. thread pools etc.
     	 */
    --- End diff --

    JavaDocs missing.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247478#comment-16247478 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150230379

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -309,31 +323,38 @@ private void handInChannel(Channel channel) {
     	/**
     	 * Close the connecting channel with a ClosedChannelException.
     	 */
    -	private void close() {
    -		close(new ClosedChannelException());
    +	private CompletableFuture close() {
    +		return close(new ClosedChannelException());
     	}
    
     	/**
     	 * Close the connecting channel with an Exception (can be {@code null})
     	 * or forward to the established channel.
     	 */
    -	private void close(Throwable cause) {
    +	private CompletableFuture close(Throwable cause) {
    --- End diff --

    This method is blocking even though it shouldn't be. Imagine that two close calls happen concurrently. One of them should trigger the closing operation and the other should immediately return with the termination future.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247452#comment-16247452 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150222505

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -250,8 +252,8 @@ private boolean attemptToBind(final int port) throws Throwable {
     			throw future.cause();
     		} catch (BindException e) {
    -			LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage());
    -			shutdown();
    +			log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
    +			shutdownServer(Time.seconds(10L)).join();
    --- End diff --

    I would use `get()` because it makes the thrown exceptions explicit.
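The distinction behind this suggestion: `CompletableFuture.get()` declares the checked `InterruptedException`/`ExecutionException`, so the compiler forces the caller to handle failure, while `join()` wraps the cause in an unchecked `CompletionException` that is easy to overlook. A small standalone demonstration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class GetVsJoin {
    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("bind failed"));

        try {
            failed.get(); // checked: the compiler makes us deal with this
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("get(): " + e.getCause().getMessage());
        }

        try {
            failed.join(); // unchecked: silently propagates if not caught
        } catch (CompletionException e) {
            System.out.println("join(): " + e.getCause().getMessage());
        }
    }
}
```

Both paths surface the same cause; the difference is only whether handling it is enforced at compile time.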
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247468#comment-16247468 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150228589

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -419,20 +440,27 @@ void close() {
     	 * @param cause The cause to close the channel with.
     	 * @return Channel close future
     	 */
    -	private boolean close(Throwable cause) {
    +	private CompletableFuture close(Throwable cause) {
    +		final CompletableFuture shutdownFuture = new CompletableFuture<>();
     		if (failureCause.compareAndSet(null, cause)) {
    -			channel.close();
    -			stats.reportInactiveConnection();
    +			final CompletableFuture tmp = new CompletableFuture<>();
    +			channel.close().addListener(finished -> tmp.complete(null));
    
    -			for (long requestId : pendingRequests.keySet()) {
    -				TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
    -				if (pending != null && pending.completeExceptionally(cause)) {
    -					stats.reportFailedRequest();
    +			tmp.thenRun(() -> {
    --- End diff --

    Instead of creating the additional indirection with `shutdownFuture`, couldn't we simply return the result here as the shutdown future?
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247475#comment-16247475 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150229482

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
     	@Override
     	public void onRequestResult(long requestId, RESP response) {
     		TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
    -		if (pending != null && pending.complete(response)) {
    +		if (pending != null && !pending.isDone()) {
     			long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
     			stats.reportSuccessfulRequest(durationMillis);
    +			pending.complete(response);
     		}
     	}
    
     	@Override
     	public void onRequestFailure(long requestId, Throwable cause) {
     		TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
    -		if (pending != null && pending.completeExceptionally(cause)) {
    +		if (pending != null && !pending.isDone()) {
     			stats.reportFailedRequest();
    +			pending.completeExceptionally(cause);
     		}
     	}
    
     	@Override
     	public void onFailure(Throwable cause) {
    -		if (close(cause)) {
    -			// Remove from established channels, otherwise future
    -			// requests will be handled by this failed channel.
    -			establishedConnections.remove(serverAddress, this);
    -		}
    +		close(cause).thenAccept(cancelled -> {
    +			if (cancelled) {
    --- End diff --

    Why do we only remove `this` from `establishedConnections` if `cancelled` is true?
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247469#comment-16247469 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150226027

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -166,27 +168,39 @@ public String getClientName() {
     	 *
     	 * After a call to this method, all returned futures will be failed.
     	 */
    -	public void shutdown() {
    +	public CompletableFuture shutdown() {
    +		final CompletableFuture shutdownFuture = new CompletableFuture<>();
    +		if (shutDown.compareAndSet(false, true)) {
    +			final List connectionFutures = new ArrayList<>();
    --- End diff --

    Could be initialized with the number of established + pending connections.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247481#comment-16247481 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150227018

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -309,31 +323,38 @@ private void handInChannel(Channel channel) {
     	/**
     	 * Close the connecting channel with a ClosedChannelException.
     	 */
    -	private void close() {
    -		close(new ClosedChannelException());
    +	private CompletableFuture close() {
    +		return close(new ClosedChannelException());
     	}
    
     	/**
     	 * Close the connecting channel with an Exception (can be {@code null})
     	 * or forward to the established channel.
     	 */
    -	private void close(Throwable cause) {
    +	private CompletableFuture close(Throwable cause) {
     		synchronized (connectLock) {
    -			if (!closed) {
    +			final CompletableFuture shutdownFuture;
    +			if (closed) {
    +				shutdownFuture = new CompletableFuture<>();
    --- End diff --

    Shorter: `CompletableFuture.completedFuture(null)`.
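For readers unfamiliar with the shorthand suggested here: `CompletableFuture.completedFuture(value)` is the standard-library one-liner replacing the two-step construct-then-complete pattern in the diff.

```java
import java.util.concurrent.CompletableFuture;

public class CompletedFutureShorthand {
    public static void main(String[] args) {
        // Verbose form from the diff:
        CompletableFuture<Void> verbose = new CompletableFuture<>();
        verbose.complete(null);

        // Equivalent one-liner:
        CompletableFuture<Void> concise = CompletableFuture.completedFuture(null);

        System.out.println(verbose.isDone() && concise.isDone()); // true
    }
}
```

Both futures are already done at creation time; the shorthand just avoids exposing a mutable intermediate state.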
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247451#comment-16247451 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150221485

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
    @@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
     				new DisabledKvStateRequestStats());
     	}
    
    -	/** Shuts down the client. */
    +	/**
    +	 * Shuts down the client and waits for 10 seconds
    +	 * for the shutdown to be completed.
    +	 *
    +	 * If this expires, or an exception is thrown for
    +	 * any reason, then a warning is printed containing the
    +	 * exception.
    +	 */
     	public void shutdown() {
    -		client.shutdown();
    +		try {
    +			client.shutdown().get(10L, TimeUnit.SECONDS);
    +			LOG.info("The Queryable State Client was shutdown successfully.");
    +		} catch (Exception e) {
    +			LOG.warn("The Queryable State Client shutdown failed: ", e);
    +		}
    +	}
    +
    +	/**
    +	 * Shuts down the client and waits until shutdown is completed.
    +	 *
    +	 * If an exception is thrown for any reason, then this exception
    +	 * is further propagated upwards.
    +	 */
    +	public void shutdownAndWait() throws Throwable {
    +		try {
    +			client.shutdown().join();
    --- End diff --

    `get()` is better here.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247465#comment-16247465 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150224857

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
    @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
     	/**
     	 * Shuts down the server and all related thread pools.
     	 */
    -	public void shutdown() {
    -		LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
    -
    -		if (handler != null) {
    -			handler.shutdown();
    -			handler = null;
    -		}
    -
    -		if (queryExecutor != null) {
    -			queryExecutor.shutdown();
    -		}
    +	public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
    +		log.info("Shutting down {} @ {}", serverName, serverAddress);
    +
    +		final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
    +			try {
    +				if (queryExecutor != null && !queryExecutor.isShutdown()) {
    +					queryExecutor.shutdown();
    +					queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    +					return true;
    +				}
    +			} catch (InterruptedException e) {
    +				log.warn("Failed to shutdown {}: ", serverName, e);
    +				return false;
    +			}
    +			return false;
    +		});
    +
    +		final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
    +		final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
    +
    +		queryExecShutdownFuture.thenRun(() -> {
    +			if (bootstrap != null) {
    +				EventLoopGroup group = bootstrap.group();
    +				if (group != null) {
    +					group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
    +						.addListener(finished -> groupShutdownFuture.complete(null));
    --- End diff --

    Shouldn't we take the `finished` value to complete the `groupShutdownFuture`?
    An exception might have been thrown; we should forward it as well.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247474#comment-16247474 ]

ASF GitHub Bot commented on FLINK-7974:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4993#discussion_r150226661

    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
    @@ -166,27 +168,39 @@ public String getClientName() {
     	 *
     	 * After a call to this method, all returned futures will be failed.
     	 */
    -	public void shutdown() {
    +	public CompletableFuture shutdown() {
    +		final CompletableFuture shutdownFuture = new CompletableFuture<>();
    +		if (shutDown.compareAndSet(false, true)) {
    +			final List connectionFutures = new ArrayList<>();
     			for (Map.Entry conn : establishedConnections.entrySet()) {
     				if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
    -					conn.getValue().close();
    +					connectionFutures.add(conn.getValue().close());
     				}
     			}
    
     			for (Map.Entry conn : pendingConnections.entrySet()) {
     				if (pendingConnections.remove(conn.getKey()) != null) {
    -					conn.getValue().close();
    +					connectionFutures.add(conn.getValue().close());
     				}
     			}
    
    -			if (bootstrap != null) {
    -				EventLoopGroup group = bootstrap.group();
    -				if (group != null) {
    -					group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
    -				}
    -			}
    +			CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun(
    +				() -> {
    +					if (bootstrap != null) {
    +						EventLoopGroup group = bootstrap.group();
    +						if (group != null) {
    +							group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS)
    --- End diff --

    Why not set the timeout to `0`?
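The ordering pattern in this diff — close every connection first, and only once all close futures finish, release the shared resource — maps directly onto `CompletableFuture.allOf`. A minimal sketch with already-completed placeholder futures standing in for real connection closes (the event loop group is replaced by a simple flag here):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllOfShutdown {
    public static void main(String[] args) {
        // Placeholder close futures; in the real code these come from Connection#close().
        List<CompletableFuture<Void>> connectionFutures = new ArrayList<>();
        connectionFutures.add(CompletableFuture.completedFuture(null));
        connectionFutures.add(CompletableFuture.completedFuture(null));

        // allOf completes only when every contained future has completed.
        CompletableFuture<Void> allClosed = CompletableFuture.allOf(
                connectionFutures.toArray(new CompletableFuture[0]));

        // Only then is it safe to tear down the shared resource.
        allClosed.thenRun(() -> System.out.println("all connections closed"));
    }
}
```

Note that `allOf` completes exceptionally if any input future does, so a failed connection close still unblocks — but flags — the final teardown step.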
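The ordering discussed in the diff above — close every connection first, and only when all of those close futures have completed shut down the shared resource (the Netty `EventLoopGroup` in the real code) — can be sketched with `CompletableFuture.allOf`. The names here are illustrative stand-ins, not Flink's:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch: release the shared resource only after all connections are closed.
class OrderedShutdown {
    static CompletableFuture<Void> shutdownAll(
            List<CompletableFuture<Void>> connectionCloseFutures,
            Runnable releaseSharedResource) {
        return CompletableFuture
            .allOf(connectionCloseFutures.toArray(new CompletableFuture[0]))
            .thenRun(releaseSharedResource);
    }
}
```

`allOf` completes when every input future completes, so `releaseSharedResource` cannot run while any connection is still closing.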
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247454#comment-16247454 ] ASF GitHub Bot commented on FLINK-7974: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150221248 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java --- @@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor new DisabledKvStateRequestStats()); } - /** Shuts down the client. */ + /** +* Shuts down the client and waits for 10 seconds +* for the shutdown to be completed. +* +* If this expires, or an exception is thrown for +* any reason, then a warning is printed containing the +* exception. +*/ public void shutdown() { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); --- End diff -- Not sure whether we should hard code the timeout here.
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247466#comment-16247466 ] ASF GitHub Bot commented on FLINK-7974: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150225368 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,27 +168,39 @@ public String getClientName() { * * After a call to this method, all returned futures will be failed. */ --- End diff -- JavaDocs need to be adapted
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247455#comment-16247455 ] ASF GitHub Bot commented on FLINK-7974: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150221573 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java --- @@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor new DisabledKvStateRequestStats()); } - /** Shuts down the client. */ + /** +* Shuts down the client and waits for 10 seconds +* for the shutdown to be completed. +* +* If this expires, or an exception is thrown for +* any reason, then a warning is printed containing the +* exception. +*/ public void shutdown() { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); + LOG.info("The Queryable State Client was shutdown successfully."); + } catch (Exception e) { + LOG.warn("The Queryable State Client shutdown failed: ", e); + } + } + + /** +* Shuts down the client and waits until shutdown is completed. +* +* If an exception is thrown for any reason, then this exception +* is further propagated upwards. +*/ + public void shutdownAndWait() throws Throwable { + try { + client.shutdown().join(); + } catch (CompletionException e) { + throw e.getCause(); --- End diff -- What if `getCause` is `null`? There is `ExceptionUtils.stripFromCompletionException`.
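The null-safe unwrapping the reviewer asks for can be sketched as follows: only strip the `CompletionException` wrapper when there actually is a cause, otherwise rethrow the wrapper itself. Flink ships a utility for this; the helper below is an illustrative stand-in, not the Flink API:

```java
import java.util.concurrent.CompletionException;

// Sketch: unwrap a CompletionException without risking a null throw.
class Unwrap {
    static Throwable stripCompletionException(CompletionException e) {
        // A CompletionException may carry no cause; fall back to the wrapper.
        return e.getCause() != null ? e.getCause() : e;
    }
}
```

Without the null check, `throw e.getCause()` would itself fail with a `NullPointerException` when the wrapper carries no cause.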
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247457#comment-16247457 ] ASF GitHub Bot commented on FLINK-7974: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150223999 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) --- End diff -- why not setting the timeout here to `0`? 
> AbstractServerBase#shutdown does not wait for shutdown completion > - > > Key: FLINK-7974 > URL: https://issues.apache.org/jira/browse/FLINK-7974 > Project: Flink > Issue Type: Bug > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Kostas Kloudas >Priority: Critical > > The {{AbstractServerBase}} does not wait for the completion of its shutdown > when calling {{AbstractServerBase#shutdown}}. This is problematic since it > leads to resource leaks and instable tests such as the > {{AbstractServerTest}}. I propose to let the {{AbstractServerBase#shutdown}} > return a termination future which is completed upon shutdown completion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247464#comment-16247464 ] ASF GitHub Bot commented on FLINK-7974: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150225411 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,27 +168,39 @@ public String getClientName() { * * After a call to this method, all returned futures will be failed. */ - public void shutdown() { + public CompletableFuture shutdown() { --- End diff -- What does the boolean future value tell us about the `shutdown`?
[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion
[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247453#comment-16247453 ] ASF GitHub Bot commented on FLINK-7974: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150221458 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java --- @@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor new DisabledKvStateRequestStats()); } - /** Shuts down the client. */ + /** +* Shuts down the client and waits for 10 seconds +* for the shutdown to be completed. +* +* If this expires, or an exception is thrown for +* any reason, then a warning is printed containing the +* exception. +*/ public void shutdown() { - client.shutdown(); + try { + client.shutdown().get(10L, TimeUnit.SECONDS); + LOG.info("The Queryable State Client was shutdown successfully."); + } catch (Exception e) { + LOG.warn("The Queryable State Client shutdown failed: ", e); + } + } + + /** +* Shuts down the client and waits until shutdown is completed. +* +* If an exception is thrown for any reason, then this exception +* is further propagated upwards. +*/ + public void shutdownAndWait() throws Throwable { --- End diff -- We should not throw `Throwable`.
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150230620 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -166,27 +168,39 @@ public String getClientName() { * * After a call to this method, all returned futures will be failed. */ - public void shutdown() { + public CompletableFuture shutdown() { --- End diff -- When called twice, this method will give you a wrong result: the second call will immediately return a completed future. ---
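One common way to address the double-call problem raised above is to cache the shutdown future: the first caller creates it and runs the actual shutdown work, and every later call returns the same future, so a second caller waits on the real shutdown instead of receiving an instantly-completed result. This is an illustrative sketch, not Flink's code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: idempotent shutdown that always hands out the same future.
class IdempotentShutdown {
    private final AtomicReference<CompletableFuture<Void>> shutdownFuture =
        new AtomicReference<>();

    public CompletableFuture<Void> shutdown() {
        CompletableFuture<Void> newFuture = new CompletableFuture<>();
        if (shutdownFuture.compareAndSet(null, newFuture)) {
            // Only the first caller runs the actual shutdown work.
            doShutdown(newFuture);
        }
        // Everyone, including later callers, gets the one cached future.
        return shutdownFuture.get();
    }

    private void doShutdown(CompletableFuture<Void> done) {
        // ... release connections, executors, etc., then:
        done.complete(null);
    }
}
```

The `compareAndSet` keeps the shut-down-once guarantee of the original `AtomicBoolean`, while the cached future fixes the second call's result.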
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150224448 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .addListener(finished -> groupShutdownFuture.complete(null)); + } else { + groupShutdownFuture.complete(null); + } + } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + if (handler != null) { + handler.shutdown().thenRun(() -> { + handler = null; + handlerShutdownFuture.complete(null); + 
}); + } else { + handlerShutdownFuture.complete(null); } --- End diff -- Why do we execute the handler shutdown in the callback of the `queryExecShutdownFuture`? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150228196 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -419,20 +440,27 @@ void close() { * @param cause The cause to close the channel with. * @return Channel close future */ - private boolean close(Throwable cause) { + private CompletableFuture close(Throwable cause) { + final CompletableFuture shutdownFuture = new CompletableFuture<>(); if (failureCause.compareAndSet(null, cause)) { - channel.close(); - stats.reportInactiveConnection(); + final CompletableFuture tmp = new CompletableFuture<>(); + channel.close().addListener(finished -> tmp.complete(null)); --- End diff -- result of channel future is ignored. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150229482 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -483,27 +511,31 @@ private boolean close(Throwable cause) { @Override public void onRequestResult(long requestId, RESP response) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.complete(response)) { + if (pending != null && !pending.isDone()) { long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L; stats.reportSuccessfulRequest(durationMillis); + pending.complete(response); } } @Override public void onRequestFailure(long requestId, Throwable cause) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { + if (pending != null && !pending.isDone()) { stats.reportFailedRequest(); + pending.completeExceptionally(cause); } } @Override public void onFailure(Throwable cause) { - if (close(cause)) { - // Remove from established channels, otherwise future - // requests will be handled by this failed channel. - establishedConnections.remove(serverAddress, this); - } + close(cause).thenAccept(cancelled -> { + if (cancelled) { --- End diff -- Why do we only remove `this` from `establishedConnections` if `cancelled` is true? ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150225343 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java --- @@ -185,7 +185,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E /** * Shuts down any handler specific resources, e.g. thread pools etc. */ --- End diff -- JavaDocs missing ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150224857 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .addListener(finished -> groupShutdownFuture.complete(null)); --- End diff -- Shouldn't we take the `finished` value to complete the `groupShutdownFuture`? An exception might have been thrown, and we should forward it as well. ---
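Forwarding the listener's outcome instead of discarding it can be sketched like this: when the asynchronous operation failed, complete the bridge future exceptionally so downstream stages actually see the failure. The `Result` type below is a hypothetical stand-in for the Netty future a `GenericFutureListener` receives:

```java
import java.util.concurrent.CompletableFuture;

// Sketch: bridge a callback outcome into a CompletableFuture, failures included.
class ListenerBridge {
    // Hypothetical stand-in for Netty's Future<? super Void>.
    interface Result {
        boolean isSuccess();
        Throwable cause();
    }

    static void forward(Result finished, CompletableFuture<Void> target) {
        if (finished.isSuccess()) {
            target.complete(null);
        } else {
            // Forward the failure instead of completing with null unconditionally.
            target.completeExceptionally(finished.cause());
        }
    }
}
```

With the unconditional `complete(null)` from the diff, a failed group shutdown would look successful to every stage chained on `groupShutdownFuture`.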
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150228244 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -419,20 +440,27 @@ void close() { * @param cause The cause to close the channel with. * @return Channel close future */ - private boolean close(Throwable cause) { + private CompletableFuture close(Throwable cause) { + final CompletableFuture shutdownFuture = new CompletableFuture<>(); if (failureCause.compareAndSet(null, cause)) { - channel.close(); - stats.reportInactiveConnection(); + final CompletableFuture tmp = new CompletableFuture<>(); + channel.close().addListener(finished -> tmp.complete(null)); - for (long requestId : pendingRequests.keySet()) { - TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { - stats.reportFailedRequest(); + tmp.thenRun(() -> { --- End diff -- here as well. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150228810 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java --- @@ -483,27 +511,31 @@ private boolean close(Throwable cause) { @Override public void onRequestResult(long requestId, RESP response) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.complete(response)) { + if (pending != null && !pending.isDone()) { long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L; stats.reportSuccessfulRequest(durationMillis); + pending.complete(response); } } @Override public void onRequestFailure(long requestId, Throwable cause) { TimestampedCompletableFuture pending = pendingRequests.remove(requestId); - if (pending != null && pending.completeExceptionally(cause)) { + if (pending != null && !pending.isDone()) { stats.reportFailedRequest(); + pending.completeExceptionally(cause); --- End diff -- Same here? ---
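The point behind these two comments is that the original guard was the safer pattern: `CompletableFuture.complete()` atomically returns `true` for exactly one caller, whereas checking `isDone()` first and completing afterwards leaves a window in which two threads can both decide to report statistics. A minimal sketch of the race-free form:

```java
import java.util.concurrent.CompletableFuture;

// Sketch: use complete()'s return value as the one-shot guard.
class CompleteOnce {
    static boolean tryComplete(CompletableFuture<String> pending, String response) {
        // Returns true only for the single call that transitioned the
        // future to done; report stats exactly when this returns true.
        return pending.complete(response);
    }
}
```

The same applies to `completeExceptionally`, which also returns a boolean with the same exactly-once semantics.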
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150225215 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; + }); + + final CompletableFuture groupShutdownFuture = new CompletableFuture<>(); + final CompletableFuture handlerShutdownFuture = new CompletableFuture<>(); + + queryExecShutdownFuture.thenRun(() -> { + if (bootstrap != null) { + EventLoopGroup group = bootstrap.group(); + if (group != null) { + group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .addListener(finished -> groupShutdownFuture.complete(null)); + } else { + groupShutdownFuture.complete(null); + } + } - if (bootstrap != null) { - EventLoopGroup group = bootstrap.group(); - if (group != null) { - group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS); + if (handler != null) { + handler.shutdown().thenRun(() -> { + handler = null; --- End diff -- I would actually refrain 
from making state changes from an asynchronous callback thread. ---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4993#discussion_r150223049 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java --- @@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable { /** * Shuts down the server and all related thread pools. */ - public void shutdown() { - LOG.info("Shutting down server {} @ {}", serverName, serverAddress); - - if (handler != null) { - handler.shutdown(); - handler = null; - } - - if (queryExecutor != null) { - queryExecutor.shutdown(); - } + public CompletableFuture shutdownServer(Time timeout) throws InterruptedException { + log.info("Shutting down {} @ {}", serverName, serverAddress); + + final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> { + try { + if (queryExecutor != null && !queryExecutor.isShutdown()) { + queryExecutor.shutdown(); + queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + return true; + } + } catch (InterruptedException e) { + log.warn("Failed to shutdown {}: ", serverName, e); + return false; + } + return false; --- End diff -- Better to use `Executors.gracefulShutdown` for this. ---
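The graceful-shutdown pattern the reviewer points to boils down to: order an orderly shutdown, wait a bounded time, and fall back to `shutdownNow()` if tasks did not finish. Flink bundles this as a utility; the standalone version below is an illustrative sketch of the same idea:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: bounded, interruption-safe executor shutdown.
class GracefulShutdown {
    static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown();  // stop accepting new tasks
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow();  // cancel tasks that overran the budget
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();  // preserve the interrupt status
            return false;
        }
    }
}
```

This also avoids hand-rolling the `InterruptedException` handling inside the `supplyAsync` lambda shown in the diff.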
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221458

--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---

@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePort
                 new DisabledKvStateRequestStats());
     }
 
-    /** Shuts down the client. */
+    /**
+     * Shuts down the client and waits for 10 seconds
+     * for the shutdown to be completed.
+     *
+     * If this expires, or an exception is thrown for
+     * any reason, then a warning is printed containing the
+     * exception.
+     */
     public void shutdown() {
-        client.shutdown();
+        try {
+            client.shutdown().get(10L, TimeUnit.SECONDS);
+            LOG.info("The Queryable State Client was shutdown successfully.");
+        } catch (Exception e) {
+            LOG.warn("The Queryable State Client shutdown failed: ", e);
+        }
+    }
+
+    /**
+     * Shuts down the client and waits until shutdown is completed.
+     *
+     * If an exception is thrown for any reason, then this exception
+     * is further propagated upwards.
+     */
+    public void shutdownAndWait() throws Throwable {

--- End diff --

We should not throw `Throwable`.

---
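One way to avoid `throws Throwable` is to wrap whatever the wait fails with in a dedicated checked exception, so callers handle a concrete type. A hedged sketch with hypothetical names (this is not the API the PR ended up with):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ClientShutdownSketch {
    /** Hypothetical checked exception narrowing `Throwable` to one concrete type. */
    public static class ClientShutdownException extends Exception {
        public ClientShutdownException(Throwable cause) {
            super("Client shutdown failed.", cause);
        }
    }

    private final CompletableFuture<Void> shutdownFuture;

    public ClientShutdownSketch(CompletableFuture<Void> shutdownFuture) {
        this.shutdownFuture = shutdownFuture;
    }

    /** Waits for shutdown; any failure surfaces as ClientShutdownException. */
    public void shutdownAndWait() throws ClientShutdownException {
        try {
            shutdownFuture.get(10L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible
            throw new ClientShutdownException(e);
        } catch (Exception e) { // ExecutionException, TimeoutException, ...
            throw new ClientShutdownException(e);
        }
    }
}
```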
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150230125

--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---

@@ -419,20 +440,27 @@ void close() {
      * @param cause The cause to close the channel with.
      * @return Channel close future
      */
-    private boolean close(Throwable cause) {
+    private CompletableFuture close(Throwable cause) {

--- End diff --

I think this method can give you wrong results when being called twice. The second call will give you a completed future even though the first call could still be running.

---
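The double-call hazard can be avoided by letting the first caller install the future and having every later caller observe that same instance. A minimal sketch (hypothetical class, not the PR's code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: close() is safe to call twice. The first caller installs the
// future and triggers the close work; later callers get the SAME future,
// which completes only when the original close actually finishes.
public class IdempotentClose {
    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();

    public CompletableFuture<Void> close() {
        CompletableFuture<Void> fresh = new CompletableFuture<>();
        if (closeFuture.compareAndSet(null, fresh)) {
            doClose(fresh); // only the first caller triggers the work
        }
        return closeFuture.get(); // everyone observes the same future
    }

    private void doClose(CompletableFuture<Void> future) {
        // ... release resources here, then signal completion ...
        future.complete(null);
    }
}
```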
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226027

--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---

@@ -166,27 +168,39 @@ public String getClientName() {
      *
      * After a call to this method, all returned futures will be failed.
      */
-    public void shutdown() {
+    public CompletableFuture shutdown() {
+        final CompletableFuture shutdownFuture = new CompletableFuture<>();
+        if (shutDown.compareAndSet(false, true)) {
+            final List connectionFutures = new ArrayList<>();

--- End diff --

could be initialized with the number of established + pending connections.

---
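The sizing suggestion amounts to passing an initial capacity, since the loop adds at most one close future per established and per pending connection. A small sketch with illustrative names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch of the pre-sizing suggestion: the list capacity is known up
// front, so intermediate array growth can be avoided. Names are
// hypothetical, not the PR's fields.
public class PreSizedList {
    public static List<CompletableFuture<Void>> collectCloseFutures(
            Map<Long, CompletableFuture<Void>> established,
            Map<Long, CompletableFuture<Void>> pending) {
        List<CompletableFuture<Void>> futures =
                new ArrayList<>(established.size() + pending.size()); // exact capacity
        futures.addAll(established.values());
        futures.addAll(pending.values());
        return futures;
    }
}
```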
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224310

--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---

@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
     /**
     * Shuts down the server and all related thread pools.
     */

--- End diff --

JavaDocs should be updated.

---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224362

--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---

@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
     /**
      * Shuts down the server and all related thread pools.
      */
-    public void shutdown() {
-        LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-        if (handler != null) {
-            handler.shutdown();
-            handler = null;
-        }
-
-        if (queryExecutor != null) {
-            queryExecutor.shutdown();
-        }
+    public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+        log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+        final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+            try {
+                if (queryExecutor != null && !queryExecutor.isShutdown()) {
+                    queryExecutor.shutdown();
+                    queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                    return true;
+                }
+            } catch (InterruptedException e) {
+                log.warn("Failed to shutdown {}: ", serverName, e);
+                return false;
+            }
+            return false;
+        });
+
+        final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+        final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+        queryExecShutdownFuture.thenRun(() -> {
+            if (bootstrap != null) {
+                EventLoopGroup group = bootstrap.group();
+                if (group != null) {
+                    group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+                        .addListener(finished -> groupShutdownFuture.complete(null));

--- End diff --

why do we complete a `boolean` future with `null`?

---
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228785

--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---

@@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
     @Override
     public void onRequestResult(long requestId, RESP response) {
         TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-        if (pending != null && pending.complete(response)) {
+        if (pending != null && !pending.isDone()) {
             long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
             stats.reportSuccessfulRequest(durationMillis);

--- End diff --

Shouldn't we only increase the `stats.reportSuccessfulRequest` if we could complete `pending`?

---
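The fix the reviewer suggests hinges on `CompletableFuture.complete` returning `true` only for the call that actually transitions the future to done. A minimal sketch (hypothetical tracker class, plain counter instead of Flink's stats object):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: report a successful request only if WE completed the pending
// future. If it was already done (e.g. failed during shutdown), the
// complete() call returns false and the stat is not touched, so the
// counter can neither double-count nor count a lost race.
public class RequestTracker {
    public final AtomicInteger successfulRequests = new AtomicInteger();

    public void onRequestResult(CompletableFuture<String> pending, String response) {
        if (pending != null && pending.complete(response)) {
            successfulRequests.incrementAndGet();
        }
    }
}
```

Checking `!pending.isDone()` instead, as in the diff, is racy: the future can complete between the check and the stats update.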
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226886

--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---

@@ -166,27 +168,39 @@ public String getClientName() {
      *
      * After a call to this method, all returned futures will be failed.
      */
-    public void shutdown() {
+    public CompletableFuture shutdown() {
+        final CompletableFuture shutdownFuture = new CompletableFuture<>();
+        if (shutDown.compareAndSet(false, true)) {
+            final List connectionFutures = new ArrayList<>();
             for (Map.Entry conn : establishedConnections.entrySet()) {
                 if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-                    conn.getValue().close();
+                    connectionFutures.add(conn.getValue().close());
                 }
             }
             for (Map.Entry conn : pendingConnections.entrySet()) {
                 if (pendingConnections.remove(conn.getKey()) != null) {
-                    conn.getValue().close();
+                    connectionFutures.add(conn.getValue().close());
                 }
             }
-            if (bootstrap != null) {
-                EventLoopGroup group = bootstrap.group();
-                if (group != null) {
-                    group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
-                }
-            }
+            CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun(

--- End diff --

Also here we shouldn't ignore the result of the `allOf` future because it might contain an exception.

---
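The hazard is that chaining `thenRun` and dropping the returned stage swallows any failure from the combined futures: the action is simply skipped when `allOf` fails, and nobody observes the exception. One way to propagate it, sketched with hypothetical names:

```java
import java.util.concurrent.CompletableFuture;

// Sketch: use whenComplete on the allOf future so a failure in ANY of the
// combined futures is observed and forwarded to the result, instead of
// being dropped when the ignored thenRun stage fails silently.
public class AllOfExample {
    public static CompletableFuture<Void> shutdownAll(CompletableFuture<?>... futures) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        CompletableFuture.allOf(futures).whenComplete((ignored, error) -> {
            if (error != null) {
                result.completeExceptionally(error); // propagate, don't swallow
            } else {
                result.complete(null);
            }
        });
        return result;
    }
}
```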
[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150227018

--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---

@@ -309,31 +323,38 @@ private void handInChannel(Channel channel) {
     /**
      * Close the connecting channel with a ClosedChannelException.
      */
-    private void close() {
-        close(new ClosedChannelException());
+    private CompletableFuture close() {
+        return close(new ClosedChannelException());
     }
 
     /**
      * Close the connecting channel with an Exception (can be {@code null})
      * or forward to the established channel.
      */
-    private void close(Throwable cause) {
+    private CompletableFuture close(Throwable cause) {
         synchronized (connectLock) {
-            if (!closed) {
+            final CompletableFuture shutdownFuture;
+            if (closed) {
+                shutdownFuture = new CompletableFuture<>();

--- End diff --

shorter: `CompletableFuture.completedFuture(null)`

---
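The shortcut suggested here applies whenever a branch has nothing left to do: return an already-completed future in one step rather than allocating an empty one and completing it later. A tiny sketch with a hypothetical class:

```java
import java.util.concurrent.CompletableFuture;

// Sketch: for the already-closed branch, CompletableFuture.completedFuture
// yields a done future directly, which is both shorter and makes the
// "nothing to wait for" intent explicit.
public class CompletedFutureExample {
    private boolean closed = true;

    public CompletableFuture<Void> close() {
        if (closed) {
            return CompletableFuture.completedFuture(null); // nothing left to do
        }
        closed = true;
        // ... actual close work would complete this future asynchronously ...
        return new CompletableFuture<>();
    }
}
```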