[jira] [Commented] (FLINK-4500) Cassandra sink can lose messages

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248266#comment-16248266
 ] 

ASF GitHub Bot commented on FLINK-4500:
---

Github user mcfongtw commented on the issue:

https://github.com/apache/flink/pull/4605
  
Hi, @zentol, thanks for reviewing this PR. I recall that I added a [caveat](https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html#checkpointing-and-fault-tolerance) about 
this potential data loss to the latest C* connector docs. Now that this fix 
is committed, would you like me to open another PR just to remove that warning 
from the docs?


> Cassandra sink can lose messages
> 
>
> Key: FLINK-4500
> URL: https://issues.apache.org/jira/browse/FLINK-4500
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>Assignee: Michael Fong
> Fix For: 1.4.0
>
>
> The problem is the same as I pointed out with the Kafka producer sink 
> (FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
> both send data asynchronously to Cassandra and record whether an error occurs 
> via a future callback.  But CassandraSinkBase does not implement 
> Checkpointed, so it can't stop a checkpoint from happening even though there are 
> Cassandra queries in flight from that checkpoint that may fail.  If they do 
> fail, they would subsequently not be replayed when the job recovered, and 
> would thus be lost.
> In addition, 
> CassandraSinkBase's close should check whether there is a pending exception 
> and throw it, rather than silently close.  It should also wait for any 
> pending async queries to complete and check their status before closing.
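The remedy the issue asks for (let a checkpoint, and `close()`, wait until every in-flight asynchronous write has finished, and rethrow an asynchronous failure instead of closing silently) can be sketched in plain Java. This is a hypothetical helper, not the `CassandraSinkBase` code from the PR: `CompletableFuture` stands in for the Cassandra driver's future type, and `AsyncWriteTracker`, `track()` and `flush()` are illustrative names. A sink would call `flush()` from its checkpoint hook and from `close()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper: counts in-flight asynchronous writes so that a sink can
 * block a checkpoint (or close()) until all of them are acknowledged, and can
 * surface the first asynchronous failure instead of losing it.
 */
final class AsyncWriteTracker {
    private final Object lock = new Object();
    private int pending = 0; // guarded by lock
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    /** Registers one asynchronous write, e.g. the future returned by an async query. */
    void track(CompletableFuture<?> write) throws Exception {
        checkFailure(); // fail fast if an earlier write already failed
        synchronized (lock) {
            pending++;
        }
        write.whenComplete((result, error) -> {
            if (error != null) {
                failure.compareAndSet(null, error); // remember only the first failure
            }
            synchronized (lock) {
                pending--;
                lock.notifyAll(); // wake up a waiting flush()
            }
        });
    }

    /** Waits until no write is in flight, then rethrows any asynchronous failure. */
    void flush() throws Exception {
        synchronized (lock) {
            while (pending > 0) {
                lock.wait();
            }
        }
        checkFailure();
    }

    int pending() {
        synchronized (lock) {
            return pending;
        }
    }

    private void checkFailure() throws Exception {
        Throwable t = failure.get();
        if (t != null) {
            throw new Exception("asynchronous write failed", t);
        }
    }
}
```

Blocking in `flush()` delays the checkpoint until every write is acknowledged; that is the cost of not losing records that were still in flight when the checkpoint was taken.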



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4575) DataSet aggregate methods should support POJOs

2017-11-10 Thread Chuyang Wan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248142#comment-16248142
 ] 

Chuyang Wan commented on FLINK-4575:


[~ggevay], if you are ok with converting the POJO to a Tuple via getFlatFields, 
I can take the task. The idea is to take the POJO field given as the aggregation 
operator's argument, convert it to the corresponding tuple field, and then use that 
tuple field just as the current code does. 

> DataSet aggregate methods should support POJOs
> --
>
> Key: FLINK-4575
> URL: https://issues.apache.org/jira/browse/FLINK-4575
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Reporter: Gabor Gevay
>Priority: Minor
>  Labels: starter
>
> The aggregate methods of DataSets (aggregate, sum, min, max) currently only 
> support Tuples, with the fields specified by indices. With 
> https://issues.apache.org/jira/browse/FLINK-3702 resolved, adding support for 
> POJOs and field expressions would be easy: {{AggregateOperator}} would create 
> {{FieldAccessors}} instead of just storing field positions, and 
> {{AggregateOperator.AggregatingUdf}} would use these {{FieldAccessors}} 
> instead of the Tuple field access methods.
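The description suggests resolving field expressions into accessors instead of storing tuple positions. A minimal stdlib-only sketch of that idea, assuming POJOs with public numeric fields and using plain reflection (this is not Flink's `FieldAccessor` API; `WordCount` and `PojoFieldSum` are hypothetical names):

```java
import java.lang.reflect.Field;
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical example POJO: public fields and a public no-arg constructor. */
class WordCount {
    public String word;
    public long count;

    public WordCount() {}

    public WordCount(String word, long count) {
        this.word = word;
        this.count = count;
    }
}

/** Resolves a field name once into an accessor, then aggregates with it, like sum("count"). */
final class PojoFieldSum {
    static <T> ToLongFunction<T> accessor(Class<T> type, String fieldName) throws NoSuchFieldException {
        Field field = type.getField(fieldName); // looked up once, not per record
        return record -> {
            try {
                // Primitive fields come back boxed (e.g. long -> Long), so Number covers them.
                return ((Number) field.get(record)).longValue();
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        };
    }

    static <T> long sum(List<T> records, ToLongFunction<T> field) {
        long total = 0;
        for (T record : records) {
            total += field.applyAsLong(record);
        }
        return total;
    }
}
```

Building the accessor once and reusing it for every element mirrors the proposal of having `AggregateOperator` create accessors up front rather than looking fields up per record.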





[jira] [Commented] (FLINK-7702) Javadocs are not being built

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16248136#comment-16248136
 ] 

ASF GitHub Bot commented on FLINK-7702:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4998

[FLINK-7702] Add maven-bundle-plugin to root pom

Before, we only had it in the modules that require it. This doesn't work when
running `mvn javadoc:aggregate`, because that only runs for the root
pom, which then cannot find the "bundle" dependencies.

R: @zentol, could you maybe have a look.

Before this change you couldn't run `mvn javadoc:aggregate -DadditionalJOption="-Xdoclint:none" -Dmaven.javadoc.failOnError=false` 
successfully, meaning that we never actually built `1.4-SNAPSHOT` Javadocs. 
With this change it seems to work, but the `genjavadoc` tool, which generates fake 
Java sources from Scala code so that we can also aggregate Javadocs for Scala 
code, doesn't seem to work anymore on Scala 2.11.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-7702-fix-javadoc-build-with-genjava

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4998.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4998


commit caba0cbc36e8dd588e57c91c102c4fa097e6b5f6
Author: Aljoscha Krettek 
Date:   2017-11-10T17:13:26Z

[FLINK-7702] Add maven-bundle-plugin to root pom

Before, we had it in places that require it. This doesn't work when
running mvn javadoc:aggregate because this will only run for the root
pom and can then not find the "bundle" dependencies.




> Javadocs are not being built
> 
>
> Key: FLINK-7702
> URL: https://issues.apache.org/jira/browse/FLINK-7702
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Gabor Gevay
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The "Javadocs" link in the left side menu of this page doesn't work:
> https://ci.apache.org/projects/flink/flink-docs-master/
> Note that it works in 1.3:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/





[jira] [Updated] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails

2017-11-10 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7488:
--
Description: 
{code}
compareNetworkBufShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)
  Time elapsed: 0.239 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different network buffer memory sizes with 
configuration: {taskmanager.network.memory.fraction=0.1, 
taskmanager.memory.off-heap=false, taskmanager.memory.fraction=0.7, 
taskmanager.memory.size=-1, taskmanager.network.memory.max=1073741824, 
taskmanager.heap.mb=1000, taskmanager.network.memory.min=67108864} 
expected:<[]104857600> but was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf 
because no HADOOP_CONF_DIR was set.Using the result of 'hadoop classpath' to 
augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600>
  at org.junit.Assert.assertEquals(Assert.java:115)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:235)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:81)

compareHeapSizeShellScriptWithJava(org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest)
  Time elapsed: 0.16 sec  <<< FAILURE!
org.junit.ComparisonFailure: Different heap sizes with configuration: 
{taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, 
taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, 
taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, 
taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting 
HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the 
result of 'hadoop classpath' to augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000>
  at org.junit.Assert.assertEquals(Assert.java:115)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeJavaVsScript(TaskManagerHeapSizeCalculationJavaBashTest.java:275)
  at 
org.apache.flink.dist.TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava(TaskManagerHeapSizeCalculationJavaBashTest.java:110)
{code}

$HADOOP_CONF_DIR was not set prior to running the test.
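The failing assertions compare the script's captured output with the Java-computed value, and the captured output also contains the `HADOOP_CONF_DIR` and `hadoop classpath` messages. One way to make such a comparison robust is to extract only the trailing number. The helper below is hypothetical (`ScriptResult` is not an existing Flink class) and assumes the script prints its result as the last token of its output:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical test helper: returns the number at the very end of a script's output,
 * ignoring any diagnostic text (such as "Setting HADOOP_CONF_DIR=...") printed before it.
 */
final class ScriptResult {
    // A run of digits followed only by optional whitespace up to the end of the input.
    private static final Pattern TRAILING_NUMBER = Pattern.compile("(\\d+)\\s*$");

    static long trailingNumber(String scriptOutput) {
        Matcher m = TRAILING_NUMBER.matcher(scriptOutput);
        if (!m.find()) {
            throw new IllegalArgumentException("no trailing number in: " + scriptOutput);
        }
        return Long.parseLong(m.group(1));
    }
}
```

This works whether or not the captured output keeps its line breaks, but it would misread the result if diagnostic text were printed after the value.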


[jira] [Created] (FLINK-8047) Create HTTP Sink

2017-11-10 Thread Jessica Negara (JIRA)
Jessica Negara created FLINK-8047:
-

 Summary: Create HTTP Sink
 Key: FLINK-8047
 URL: https://issues.apache.org/jira/browse/FLINK-8047
 Project: Flink
  Issue Type: Improvement
  Components: Batch Connectors and Input/Output Formats
Reporter: Jessica Negara
 Fix For: 1.4.0


Implement an HTTP output connector + sink, to allow users to send Flink output 
to an HTTP web server.
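As a rough illustration of what the sending side of such a sink might look like, here is a stdlib-only sketch using `java.net.http.HttpClient` (Java 11+). `HttpRecordSender` and the one-POST-per-record design are assumptions, not an existing Flink connector; in Flink the client would typically be created in a `RichSinkFunction`'s `open()` and used from `invoke()`.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

/**
 * Hypothetical sketch: POSTs each record to a fixed endpoint and treats any
 * non-2xx response as a failure that the calling sink should propagate.
 */
final class HttpRecordSender {
    private final HttpClient client;
    private final URI endpoint;

    HttpRecordSender(URI endpoint) {
        this.endpoint = endpoint;
        this.client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1) // plain HTTP/1.1 keeps the sketch simple
                .connectTimeout(Duration.ofSeconds(5))
                .build();
    }

    /** Sends one record as the request body; throws if the server does not answer with 2xx. */
    void send(String record) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(endpoint)
                .header("Content-Type", "text/plain; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(record))
                .build();
        HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
        int status = response.statusCode();
        if (status < 200 || status >= 300) {
            throw new IOException("HTTP sink got status " + status + " for record: " + record);
        }
    }
}
```

Sending one request per record keeps the sketch simple; a production sink would likely batch records and, as discussed for the Cassandra sink in FLINK-4500, make sure pending requests are flushed before a checkpoint completes.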





[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp

2017-11-10 Thread Juan Miguel Cejuela (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247846#comment-16247846
 ] 

Juan Miguel Cejuela commented on FLINK-8046:


While we are at it, in my humble opinion it is also strange that, when 
computing the file splits as in `format.createInputSplits(readerParallelism)`, 
only the given `readerParallelism` is used, but not the format's `unsplittable` 
field or its `.getNumSplits()` method.

I don't know if this should be a separate issue.

> ContinuousFileMonitoringFunction wrongly ignores files with exact same 
> timestamp
> 
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Juan Miguel Cejuela
>  Labels: stream
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The current file monitoring sets the internal variable 
> `globalModificationTime` to filter out files that are "older". However, the 
> current check (for "older") does 
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (from 
> `shouldIgnore`)
> The comparison should be strictly SMALLER (NOT smaller or equal). The method 
> documentation also states "This happens if the modification time of the file 
> is _smaller_ than...".
> Treating equal timestamps as "older" causes some files with exactly the same 
> timestamp to be ignored. The behavior is also non-deterministic, as the first 
> file to be accepted ("first" being pretty much random) causes the remaining 
> files with exactly the same timestamp to be ignored.





[jira] [Commented] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247825#comment-16247825
 ] 

ASF GitHub Bot commented on FLINK-8046:
---

GitHub user juanmirocks opened a pull request:

https://github.com/apache/flink/pull/4997

[FLINK-8046] [flink-streaming-java] Have filter of timestamp compare with 
strictly SMALLER (NOT smaller or equal)

## What is the purpose of the change

This change stops files that share exactly the same timestamp from being 
wrongly ignored. It also makes the code match the doc header of the method 
(`shouldIgnore`): "...if the modification time of the file is smaller than...".

Without this change, some files with exactly the same timestamp (because they 
were written with exactly the same `long` modification time) are ignored, which 
the user does not expect.

This also explains the odd log message `Ignoring file:/XXX, with mod time= 1510321363000 and global mod time= 1510321363000`.

## Brief change log

* Comparison is done with strictly SMALLER (<)

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tagtog/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4997.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4997


commit 2db52989fef2455413d42286893c5227983ee74b
Author: Juan Miguel Cejuela 
Date:   2017-11-10T16:57:09Z

compare as strictly SMALLER (not SMALLER OR EQUAL) (as per the doc header 
"if the modification time of the file is smaller than")

Otherwise, some files with same exact timestamp (because they were written 
at the same exact long time) will be ignored.




> ContinuousFileMonitoringFunction wrongly ignores files with exact same 
> timestamp
> 
>
> Key: FLINK-8046
> URL: https://issues.apache.org/jira/browse/FLINK-8046
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.2
>Reporter: Juan Miguel Cejuela
>  Labels: stream
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The current file monitoring sets the internal variable 
> `globalModificationTime` to filter out files that are "older". However, the 
> current check (for "older") does 
> `boolean shouldIgnore = modificationTime <= globalModificationTime;` (from 
> `shouldIgnore`)
> The comparison should be strictly SMALLER (NOT smaller or equal). The method 
> documentation also states "This happens if the modification time of the file 
> is _smaller_ than...".
> Treating equal timestamps as "older" causes some files with exactly the same 
> timestamp to be ignored. The behavior is also non-deterministic, as the first 
> file to be accepted ("first" being pretty much random) causes the remaining 
> files with exactly the same timestamp to be ignored.





[jira] [Created] (FLINK-8046) ContinuousFileMonitoringFunction wrongly ignores files with exact same timestamp

2017-11-10 Thread Juan Miguel Cejuela (JIRA)
Juan Miguel Cejuela created FLINK-8046:
--

 Summary: ContinuousFileMonitoringFunction wrongly ignores files 
with exact same timestamp
 Key: FLINK-8046
 URL: https://issues.apache.org/jira/browse/FLINK-8046
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.2
Reporter: Juan Miguel Cejuela
 Fix For: 1.5.0


The current file monitoring sets the internal variable 
`globalModificationTime` to filter out files that are "older". However, the 
current check (for "older") does 

`boolean shouldIgnore = modificationTime <= globalModificationTime;` (from 
`shouldIgnore`)

The comparison should be strictly SMALLER (NOT smaller or equal). The method 
documentation also states "This happens if the modification time of the file is 
_smaller_ than...".

Treating equal timestamps as "older" causes some files with exactly the same 
timestamp to be ignored. The behavior is also non-deterministic, as the first file 
to be accepted ("first" being pretty much random) causes the remaining files with 
exactly the same timestamp to be ignored.
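Switching only to a strict `<` accepts files that tie with `globalModificationTime`, but on its own it would likely also re-accept, on the next scan, files whose modification time equals the current maximum, because nothing filters them out any more. A hypothetical way to get both properties is to also remember which paths were already accepted at the maximum timestamp. The sketch below (`ModTimeFilter` is illustrative, not Flink code, and in a real source its state would also have to be checkpointed) takes one directory listing as a path-to-modification-time map:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch: a file is ignored only if it is strictly older than the
 * high-water mark, or if it sits exactly at the mark and was already accepted.
 */
final class ModTimeFilter {
    private long globalModificationTime = Long.MIN_VALUE;
    private final Set<String> acceptedAtGlobalTime = new HashSet<>();

    /** Returns the paths from this listing that have not been accepted before. */
    List<String> accept(Map<String, Long> listing) {
        List<String> accepted = new ArrayList<>();
        long newMax = globalModificationTime;
        for (Map.Entry<String, Long> file : listing.entrySet()) {
            long modTime = file.getValue();
            boolean shouldIgnore = modTime < globalModificationTime
                    || (modTime == globalModificationTime && acceptedAtGlobalTime.contains(file.getKey()));
            if (!shouldIgnore) {
                accepted.add(file.getKey());
                newMax = Math.max(newMax, modTime);
            }
        }
        if (newMax > globalModificationTime) {
            // The mark moved forward, so entries remembered at the old mark can never match again.
            acceptedAtGlobalTime.clear();
            globalModificationTime = newMax;
        }
        for (String path : accepted) {
            if (listing.get(path) == globalModificationTime) {
                acceptedAtGlobalTime.add(path);
            }
        }
        return accepted;
    }
}
```

The extra set only ever holds the files at the current maximum timestamp, so it stays small unless many files share that exact timestamp.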





[jira] [Updated] (FLINK-7702) Javadocs are not being built

2017-11-10 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7702:

Summary: Javadocs are not being built  (was: Javadocs link broken)

> Javadocs are not being built
> 
>
> Key: FLINK-7702
> URL: https://issues.apache.org/jira/browse/FLINK-7702
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Gabor Gevay
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The "Javadocs" link in the left side menu of this page doesn't work:
> https://ci.apache.org/projects/flink/flink-docs-master/
> Note that it works in 1.3:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/





[jira] [Updated] (FLINK-8045) Add Internal DATE/TIME/TIMESTAMP as internal representation of DATE/TIME/TIMESTAMP

2017-11-10 Thread Zhenghua Gao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhenghua Gao updated FLINK-8045:

Description: 
Currently DATE/TIME/TIMESTAMP have an internal representation; for example, a DATE 
is represented internally as an Int. 
This may improve performance when processing DATE/TIME/TIMESTAMP data. 
However, I found a LIMITATION: the internal representation exists only within 
one operator. 
Between operators, we transfer DATE/TIME/TIMESTAMP objects.

I think we could use the internal representation of DATE/TIME/TIMESTAMP throughout 
the whole job, and convert to java.sql.* types only where needed (UDF/UDTF/OUTPUT).





> Add Internal DATE/TIME/TIMESTAMP as internal representation of 
> DATE/TIME/TIMESTAMP
> --
>
> Key: FLINK-8045
> URL: https://issues.apache.org/jira/browse/FLINK-8045
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Zhenghua Gao
>Assignee: Zhenghua Gao
>
> Currently DATE/TIME/TIMESTAMP have an internal representation; for example, a DATE 
> is represented internally as an Int. 
> This may improve performance when processing DATE/TIME/TIMESTAMP data. 
> However, I found a LIMITATION: the internal representation exists only within 
> one operator. 
> Between operators, we transfer DATE/TIME/TIMESTAMP objects.
> I think we could use the internal representation of DATE/TIME/TIMESTAMP throughout 
> the whole job, and convert to java.sql.* types only where needed (UDF/UDTF/OUTPUT).





[jira] [Created] (FLINK-8045) Add Internal DATE/TIME/TIMESTAMP as internal representation of DATE/TIME/TIMESTAMP

2017-11-10 Thread Zhenghua Gao (JIRA)
Zhenghua Gao created FLINK-8045:
---

 Summary: Add Internal DATE/TIME/TIMESTAMP as internal 
representation of DATE/TIME/TIMESTAMP
 Key: FLINK-8045
 URL: https://issues.apache.org/jira/browse/FLINK-8045
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Zhenghua Gao
Assignee: Zhenghua Gao


Currently DATE/TIME/TIMESTAMP have an internal representation; for example, a DATE 
is represented internally as an Int. This may improve performance when processing 
DATE/TIME/TIMESTAMP data. However, I found a LIMITATION: the internal 
representation exists only within one operator. 
Between operators, we transfer DATE/TIME/TIMESTAMP objects.
I think we could use the internal representation of DATE/TIME/TIMESTAMP throughout 
the whole job, and convert to java.sql.* types only where needed (UDF/UDTF/OUTPUT).
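To make the proposal concrete, here is a minimal stdlib sketch of keeping the primitive encoding inside the job and converting only at the edges. It assumes a Calcite-style encoding (DATE as days since 1970-01-01, TIME as milliseconds of the day); the helper name `InternalTemporal` is hypothetical, not a Flink class.

```java
import java.time.LocalDate;
import java.time.LocalTime;

/**
 * Hypothetical helpers: DATE/TIME stay primitive ints between operators and are
 * converted to object types only at the boundaries (UDFs, UDTFs, outputs).
 * Assumed encoding: DATE = days since 1970-01-01, TIME = milliseconds of the day.
 */
final class InternalTemporal {
    static int toInternalDate(LocalDate date) {
        return Math.toIntExact(date.toEpochDay());
    }

    static LocalDate fromInternalDate(int days) {
        return LocalDate.ofEpochDay(days);
    }

    /** Boundary conversion to the java.sql type that a UDF or output format might expect. */
    static java.sql.Date toSqlDate(int days) {
        return java.sql.Date.valueOf(fromInternalDate(days));
    }

    static int toInternalTime(LocalTime time) {
        return (int) (time.toNanoOfDay() / 1_000_000L);
    }

    static LocalTime fromInternalTime(int millisOfDay) {
        return LocalTime.ofNanoOfDay(millisOfDay * 1_000_000L);
    }
}
```

For example, 2017-11-10 is day 17480 under this encoding, so an operator would ship a single `int` instead of a `java.sql.Date` object.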






[GitHub] flink pull request #4996: [hotfix][docs] Change mailing list link in quickst...

2017-11-10 Thread pnowojski
Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4996


---


[GitHub] flink issue #4996: [hotfix][docs] Change mailing list link in quickstart to ...

2017-11-10 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4996
  
Thanks @aljoscha for merging!


---


[jira] [Commented] (FLINK-7845) Netty Exception when submitting batch job repeatedly

2017-11-10 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247785#comment-16247785
 ] 

Piotr Nowojski commented on FLINK-7845:
---

Could you strip down your example to some minimal code that shows the problem? 

This IllegalAccessError is almost certainly unrelated. It's probably caused by a 
dependency convergence error between Netty 3.8 and 4.0, which should be fixed 
by https://issues.apache.org/jira/browse/FLINK-7013. You can try it out with 
Flink 1.4-SNAPSHOT or wait until the 1.4 release.


> Netty Exception when submitting batch job repeatedly
> 
>
> Key: FLINK-7845
> URL: https://issues.apache.org/jira/browse/FLINK-7845
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Network
>Affects Versions: 1.3.2
>Reporter: Flavio Pompermaier
>
> We had some problems with Flink and Netty, so we wrote a small unit test to 
> reproduce the memory issues we have in production. We have to 
> restart the Flink cluster because memory usage keeps increasing from job to 
> job. 
> The GitHub project is https://github.com/okkam-it/flink-memory-leak and the 
> JUnit test is contained in the MemoryLeakTest class (within src/main/test).
> I don't know if this is the root of our problems, but at some point, usually 
> around the 28th loop, the job fails with the following exception (we have 
> never actually seen it in production, but maybe it is related to the memory 
> issue somehow...):
> {code:java}
> Caused by: java.lang.IllegalAccessError: 
> org/apache/flink/runtime/io/network/netty/NettyMessage
>   at 
> io.netty.util.internal.__matchers__.org.apache.flink.runtime.io.network.netty.NettyMessageMatcher.match(NoOpTypeParameterMatcher.java)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.acceptInboundMessage(SimpleChannelInboundHandler.java:95)
>   at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:102)
>   ... 16 more
> {code}





[jira] [Assigned] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-10 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-7880:
---

Assignee: Kostas Kloudas  (was: Aljoscha Krettek)

> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829





[jira] [Assigned] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-11-10 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-7880:
---

Assignee: Aljoscha Krettek  (was: Kostas Kloudas)

> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Aljoscha Krettek
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829





[jira] [Commented] (FLINK-7919) Join with Solution Set fails with NPE if Solution Set has no entry

2017-11-10 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247736#comment-16247736
 ] 

Fabian Hueske commented on FLINK-7919:
--

Hi [~greghogan], yes that would be an option as well.

This would have the benefit of being more aligned with the existing API (outer 
joins have their own API calls). The fact that this issue hasn't been reported 
earlier also indicates that the solution set join is usually used as an inner join.
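The fix proposed in the issue description (check the prober's result for `null` before copying it, and pass `null` on the solution-set side to the `JoinFunction`) can be illustrated with a stdlib-only sketch. The `Map` stands in for the solution-set hash table and `copier` for the `TypeSerializer` copy; `SolutionSetJoinSketch` and its parameters are hypothetical, not Flink's runtime classes.

```java
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

/** Hypothetical sketch of a null-safe probe against a solution set. */
final class SolutionSetJoinSketch {
    static <K, P, S, O> O join(P probeRecord, K key, Map<K, S> solutionSet,
                               UnaryOperator<S> copier, BiFunction<P, S, O> joinFunction) {
        // Like the prober in the report, a missing key yields null.
        S match = solutionSet.get(key);
        // Copying null is what caused the NPE, so only copy an actual match.
        S solutionSide = (match == null) ? null : copier.apply(match);
        // OUTER JOIN semantics: the user function must handle a null solution-set side.
        return joinFunction.apply(probeRecord, solutionSide);
    }
}
```

The trade-off Fabian mentions is visible here: every user `JoinFunction` now has to expect a `null` second argument, whereas a dedicated outer-join API call would make that contract explicit.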

> Join with Solution Set fails with NPE if Solution Set has no entry
> --
>
> Key: FLINK-7919
> URL: https://issues.apache.org/jira/browse/FLINK-7919
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Local Runtime
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>
> A job with a delta iteration fails hard with an NPE in the solution set join 
> if the solution set has no entry for the join key of the probe side.
> The following program reproduces the problem:
> {code}
> DataSet<Tuple2<Long, Integer>> values = env.fromElements(
>   Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));
> DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
>   .iterateDelta(values, 5, 0);
> DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
>   .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
>     @Override
>     public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
>       // modifying the key to join on a non-existing solution set key
>       return Tuple2.of(value.f0 + 1, 1);
>     }
>   })
>   .join(di.getSolutionSet()).where(0).equalTo(0)
>   .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
>     @Override
>     public Tuple2<Long, Integer> join(
>         Tuple2<Long, Integer> first,
>         Tuple2<Long, Integer> second) throws Exception {
>       return Tuple2.of(first.f0, first.f1 + second.f1);
>     }
>   });
> DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
> result.print();
> {code}
> It doesn't matter whether the solution set is managed or not. 
> The problem is caused by the solution set hash table prober returning a 
> {{null}} value if the solution set does not contain a value for the probe 
> side key. 
> The join operator does not check whether the returned value is {{null}} but 
> immediately tries to create a copy of it using a {{TypeSerializer}}. This copy 
> fails with an NPE.
> I propose to check for {{null}} and call the join function with {{null}} on 
> the solution set side. This gives OUTER JOIN semantics for the join.
> Since the code previously failed with an NPE, it is safe to forward the 
> {{null}} into the {{JoinFunction}}. 
> However, users must be aware that the solution set value may be {{null}}, and 
> we need to update the documentation (JavaDocs + website) to describe the 
> behavior.
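A minimal plain-Java sketch of the proposed fix, with no Flink dependency: the solution set is modelled as a `HashMap` keyed by the join key, and a missing entry is forwarded to the join function as `null` instead of being copied. `SolutionSetJoinSketch`, its `join` helper, and the use of `java.util.function.BiFunction` in place of Flink's `JoinFunction` are hypothetical stand-ins, not the actual operator code.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of the solution set join: the solution set is a hash map keyed
// by the join key, the probe side is a list of (key, value) pairs.
final class SolutionSetJoinSketch {

    static <K, P, S, O> List<O> join(
            List<Map.Entry<K, P>> probeSide,
            Map<K, S> solutionSet,
            BiFunction<P, S, O> joinFunction) {
        List<O> out = new ArrayList<>();
        for (Map.Entry<K, P> probe : probeSide) {
            // get() returns null if the solution set has no entry for this key.
            // Instead of copying that null (where the NPE came from), hand it to
            // the join function: OUTER JOIN semantics on the solution set side.
            S match = solutionSet.get(probe.getKey());
            out.add(joinFunction.apply(probe.getValue(), match));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, Integer> solutionSet = new HashMap<>();
        solutionSet.put(1L, 1);
        solutionSet.put(2L, 1);
        solutionSet.put(3L, 1);

        // Keys shifted by +1 as in the reproducer, so key 4 has no solution set entry.
        List<Map.Entry<Long, Integer>> probe = new ArrayList<>();
        probe.add(new AbstractMap.SimpleEntry<>(2L, 1));
        probe.add(new AbstractMap.SimpleEntry<>(3L, 1));
        probe.add(new AbstractMap.SimpleEntry<>(4L, 1));

        // The user function now has to tolerate a null solution set value.
        List<Integer> result = join(probe, solutionSet,
                (first, second) -> first + (second == null ? 0 : second));
        System.out.println(result); // prints [2, 2, 1]
    }
}
```

Running `main` prints `[2, 2, 1]`: the first two probe keys find an entry, while key 4 reaches the user function with `null`, which is exactly the case users would have to handle once the `null` is forwarded.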





[jira] [Commented] (FLINK-7973) Fix service shading relocation for S3 file systems

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247727#comment-16247727
 ] 

ASF GitHub Bot commented on FLINK-7973:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4961
  
I did include your end-to-end tests (with some fixes) and the fixes for the 
errors they found. Should be fine now, let's see what Travis says...


> Fix service shading relocation for S3 file systems
> --
>
> Key: FLINK-7973
> URL: https://issues.apache.org/jira/browse/FLINK-7973
> Project: Flink
>  Issue Type: Bug
>Reporter: Stephan Ewen
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The shade plugin currently relocates services incorrectly, applying 
> relocation patterns multiple times.
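To illustrate how a relocation pattern can end up being applied more than once, here is a plain-Java sketch that models relocation as a package-prefix rewrite. The rules are made up so that the output of the first rule matches the pattern of the second; `RelocationSketch` is hypothetical and is not how the Maven Shade Plugin is implemented.

```java
import java.util.List;

// Hypothetical model of package relocation: each rule rewrites a package prefix.
final class RelocationSketch {

    static final class Rule {
        final String pattern;       // e.g. "com.amazonaws"
        final String shadedPattern; // e.g. "org.apache.flink.shaded.com.amazonaws"

        Rule(String pattern, String shadedPattern) {
            this.pattern = pattern;
            this.shadedPattern = shadedPattern;
        }

        boolean matches(String className) {
            return className.startsWith(pattern + ".");
        }

        String apply(String className) {
            return shadedPattern + className.substring(pattern.length());
        }
    }

    // Buggy variant: every matching rule is applied in turn, so the output of one
    // rule can be matched (and rewritten again) by a later rule.
    static String relocateChained(String className, List<Rule> rules) {
        String result = className;
        for (Rule rule : rules) {
            if (rule.matches(result)) {
                result = rule.apply(result);
            }
        }
        return result;
    }

    // Fixed variant: only the first rule matching the original name is applied.
    static String relocateOnce(String className, List<Rule> rules) {
        for (Rule rule : rules) {
            if (rule.matches(className)) {
                return rule.apply(className);
            }
        }
        return className;
    }
}
```

With these rules, `relocateChained` turns `com.amazonaws.auth.AWSCredentials` into a name that carries the shaded prefix twice, while `relocateOnce` applies exactly one rewrite.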





[GitHub] flink issue #4961: [FLINK-7973] fix shading and relocating Hadoop for the S3...

2017-11-10 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4961
  
I did include your end-to-end tests (with some fixes) and the fixes for the 
errors they found. Should be fine now, let's see what Travis says...


---


[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247724#comment-16247724
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r150275387
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+
+   
+   
+   include_hadoop_aws
+   
+   
+   include_hadoop_aws
+   
+   
+   
+   
+   
+   org.apache.hadoop
+   hadoop-aws
+   ${hadoop.version}
+   test
+   
+   
+   
org.apache.avro
+   
avro
+   
--- End diff --

makes sense, I guess


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now occurs only when running on YARN with s3a:// as the configured 
> FileSystem. In that case, the Flink session fails during staging because it 
> tries to copy the flink/lib directory to S3, and the S3AFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)}
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)
>   at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.
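The workaround suggested in the last paragraph of the issue (create folders explicitly and copy only actual files) can be sketched with plain `java.nio.file`, using a local target directory as a stand-in for the remote file system. `FilesOnlyCopySketch.copyFilesOnly` is a hypothetical helper; it does not use Hadoop's `FileSystem` API or talk to S3, and it assumes `source` is a directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FilesOnlyCopySketch {

    // Copies every regular file below `source` (assumed to be a directory) to the
    // same relative path below `target`. Parent directories are created explicitly
    // instead of relying on the target file system to create them, and directories
    // themselves are never handed to the copy call.
    // Returns the relative paths of the copied files.
    static List<Path> copyFilesOnly(Path source, Path target) throws IOException {
        List<Path> regularFiles;
        try (Stream<Path> walk = Files.walk(source)) {
            regularFiles = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        List<Path> copied = new ArrayList<>();
        for (Path file : regularFiles) {
            Path relative = source.relativize(file);
            Path destination = target.resolve(relative);
            Files.createDirectories(destination.getParent());
            Files.copy(file, destination, StandardCopyOption.REPLACE_EXISTING);
            copied.add(relative);
        }
        return copied;
    }
}
```

Empty directories are skipped in this sketch; on object stores a "directory" is usually just a key prefix of the files below it.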





[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-10 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r150275387
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+
+   
+   
+   include_hadoop_aws
+   
+   
+   include_hadoop_aws
+   
+   
+   
+   
+   
+   org.apache.hadoop
+   hadoop-aws
+   ${hadoop.version}
+   test
+   
+   
+   
org.apache.avro
+   
avro
+   
--- End diff --

makes sense, I guess


---


[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247722#comment-16247722
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r150275161
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+
+   
+   
+   include_hadoop_aws
+   
+   
+   include_hadoop_aws
+   
+   
+   
+   
+   
+   org.apache.hadoop
+   hadoop-aws
+   ${hadoop.version}
+   test
+   
+   
+   
org.apache.avro
+   
avro
+   
+   
+   
+   
com.fasterxml.jackson.core
+   
jackson-annotations
+   
+   
+   
com.fasterxml.jackson.core
+   
jackson-core
+   
+   
+   
com.fasterxml.jackson.core
+   
jackson-databind
+   
--- End diff --

this dependency is, however, somehow related to `com.amazonaws`, not our 
code...



[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-10 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r150275161
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+
+   
+   
+   include_hadoop_aws
+   
+   
+   include_hadoop_aws
+   
+   
+   
+   
+   
+   org.apache.hadoop
+   hadoop-aws
+   ${hadoop.version}
+   test
+   
+   
+   
org.apache.avro
+   
avro
+   
+   
+   
+   
com.fasterxml.jackson.core
+   
jackson-annotations
+   
+   
+   
com.fasterxml.jackson.core
+   
jackson-core
+   
+   
+   
com.fasterxml.jackson.core
+   
jackson-databind
+   
--- End diff --

this dependency is, however, somehow related to `com.amazonaws`, not our 
code...


---


[GitHub] flink issue #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3a defau...

2017-11-10 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4939
  
1) yes, I have tested the changes on a YARN cluster as described in the PR 
comment (before the changes on the unit tests)
2) regarding the `pom.xml` changes: I actually took those from 
991af3652479f85f732cbbade46bed7df1c5d819 and 
36b663f4561458408ee68902a1db6a5cd539e1c2 and can only guess why @StephanEwen 
and @zentol chose this approach instead of what you proposed. Maybe they can clarify.


---


[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247721#comment-16247721
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4939
  
1) yes, I have tested the changes on a YARN cluster as described in the PR 
comment (before the changes on the unit tests)
2) regarding the `pom.xml` changes: I actually took those from 
991af3652479f85f732cbbade46bed7df1c5d819 and 
36b663f4561458408ee68902a1db6a5cd539e1c2 and can only guess why @StephanEwen 
and @zentol chose this approach instead of what you proposed. Maybe they can clarify.







[jira] [Comment Edited] (FLINK-7999) Variable Join Window Boundaries

2017-11-10 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247674#comment-16247674
 ] 

Seth Wiesman edited comment on FLINK-7999 at 11/10/17 3:47 PM:
---

Sure, I will use the example of my use case. 

I am using Flink to aggregate spend for campaigns that I am running. Each 
record in the main stream contains both a campaign id and information about 
a single financial transaction; this stream is analogous to the fact table in 
a traditional data warehouse. However, each campaign runs under a different 
currency, so I need to join with metadata containing the currency code for 
that campaign. Campaigns have both start and end dates, which can span 
anywhere from one day to several months. 

Events:
timestamp        | id | spend
-----------------+----+------
2017-11-11 03:00 | 1  | 0.25
2017-11-11 03:02 | 2  | 0.03
2017-11-11 03:05 | 1  | 0.11

Metadata:
id | start_date | end_date   | currency_code
---+------------+------------+--------------
1  | 2017-11-10 | 2017-11-12 | "USD"
2  | 2017-04-02 | 2019-12-31 | "EUR"

Each event has a valid window during which it can be joined, but the window 
varies by id. Today I implement this join with the following 
`CoProcessFunction`. 

{code:java}
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class CampaignJoin(allowedLateness: Long)
    extends CoProcessFunction[Event, CampaignMetadata, Event] {

  @transient private lazy val descriptor = new ValueStateDescriptor[CampaignMetadata](
    "campaign", createTypeInformation[CampaignMetadata])

  // Event side: emit the event enriched with its campaign if the campaign has started.
  override def processElement1(
      value: Event,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val campaign = getRuntimeContext.getState(descriptor).value()

    if (campaign != null && campaign.start.getTime <= ctx.timestamp()) {
      out.collect(value.copy(meta = campaign))
    }
  }

  // Metadata side: drop metadata whose end (plus allowed lateness) is already behind
  // the watermark; otherwise store it and schedule its cleanup.
  override def processElement2(
      value: CampaignMetadata,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val end = value.end.getTime + allowedLateness
    if (end < ctx.timerService().currentWatermark()) {
      return
    }

    ctx.timerService().registerEventTimeTimer(end)
    getRuntimeContext.getState(descriptor).update(value)
  }

  // Clear the campaign once end + allowedLateness has passed, unless it was
  // replaced in the meantime by metadata with a different end date.
  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#OnTimerContext,
      out: Collector[Event]): Unit = {
    val state = getRuntimeContext.getState(descriptor)
    val campaign = state.value()

    if (campaign != null) {
      val end = campaign.end.getTime + allowedLateness

      if (end == timestamp) {
        state.clear()
      }
    }
  }
}
{code}


was (Author: sjwiesman):
Sure, I will use the example of my use case. 

I am using Flink to aggregate spend for campaigns that I am running. Each 
record in the main stream contains both a campaign id and information about 
a single financial transaction; this stream is analogous to the fact table in 
a traditional data warehouse. However, each campaign runs under a different 
currency, so I need to join with metadata containing the currency code for 
that campaign. Campaigns have both start and end dates, which can span 
anywhere from one day to several months. 

Events:
timestamp        | id | spend
-----------------+----+------
2017-11-11 03:00 | 1  | 0.25
2017-11-11 03:02 | 2  | 0.03
2017-11-11 03:05 | 1  | 0.11

Metadata:
id | start_date | end_date   | currency_code
---+------------+------------+--------------
1  | 2017-11-10 | 2017-11-12 | "USD"
2  | 2017-04-02 | 2019-12-31 | "EUR"

Each event has a valid window during which it can be joined, but the window 
varies by id. Today I implement this join with the following 
`CoProcessFunction`. 

{code:java}
class CampaignJoin(allowedLateness: Long)
    extends CoProcessFunction[Event, CampaignMetadata, Event] {

  @transient private lazy val descriptor = new ValueStateDescriptor[CampaignMetadata](
    "campaign", createTypeInformation[CampaignMetadata])

  override def processElement1(
      value: Event,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val campaign = getRuntimeContext.getState(descriptor).value()

    if (campaign != null && campaign.start.getTime <= ctx.timestamp()) {
      out.collect(value.copy(meta = campaign))
    }
  }

  override def processElement2(
      value: CampaignMetadata,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val end = value.end.getTime + allowedLateness
    if (end < ctx.timerService().currentWatermark()) {
      return
    }

    ctx.timerService().registerEventTimeTimer(end)
  

[jira] [Commented] (FLINK-7999) Variable Join Window Boundaries

2017-11-10 Thread Seth Wiesman (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247674#comment-16247674
 ] 

Seth Wiesman commented on FLINK-7999:
-

Sure, I will use the example of my use case. 

I am using Flink to aggregate spend for campaigns that I am running. Each 
record in the main stream contains both a campaign id and information about 
a single financial transaction; this stream is analogous to the fact table in 
a traditional data warehouse. However, each campaign runs under a different 
currency, so I need to join with metadata containing the currency code for 
that campaign. Campaigns have both start and end dates, which can span 
anywhere from one day to several months. 

Events:
timestamp        | id | spend
-----------------+----+------
2017-11-11 03:00 | 1  | 0.25
2017-11-11 03:02 | 2  | 0.03
2017-11-11 03:05 | 1  | 0.11

Metadata:
id | start_date | end_date   | currency_code
---+------------+------------+--------------
1  | 2017-11-10 | 2017-11-12 | "USD"
2  | 2017-04-02 | 2019-12-31 | "EUR"

Each event has a valid window during which it can be joined, but the window 
varies by id. Today I implement this join with the following 
`CoProcessFunction`. 

{code:java}
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

class CampaignJoin(allowedLateness: Long)
    extends CoProcessFunction[Event, CampaignMetadata, Event] {

  @transient private lazy val descriptor = new ValueStateDescriptor[CampaignMetadata](
    "campaign", createTypeInformation[CampaignMetadata])

  // Event side: emit the event enriched with its campaign if the campaign has started.
  override def processElement1(
      value: Event,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val campaign = getRuntimeContext.getState(descriptor).value()

    if (campaign != null && campaign.start.getTime <= ctx.timestamp()) {
      out.collect(value.copy(meta = campaign))
    }
  }

  // Metadata side: drop metadata whose end (plus allowed lateness) is already behind
  // the watermark; otherwise store it and schedule its cleanup.
  override def processElement2(
      value: CampaignMetadata,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#Context,
      out: Collector[Event]): Unit = {
    val end = value.end.getTime + allowedLateness
    if (end < ctx.timerService().currentWatermark()) {
      return
    }

    ctx.timerService().registerEventTimeTimer(end)
    getRuntimeContext.getState(descriptor).update(value)
  }

  // Clear the campaign once end + allowedLateness has passed, unless it was
  // replaced in the meantime by metadata with a different end date.
  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[Event, CampaignMetadata, Event]#OnTimerContext,
      out: Collector[Event]): Unit = {
    val state = getRuntimeContext.getState(descriptor)
    val campaign = state.value()

    if (campaign != null) {
      val end = campaign.end.getTime + allowedLateness

      if (end == timestamp) {
        state.clear()
      }
    }
  }
}
{code}

> Variable Join Window Boundaries
> ---
>
> Key: FLINK-7999
> URL: https://issues.apache.org/jira/browse/FLINK-7999
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Seth Wiesman
>
> Allow window joins with variable length based on row attributes. 
> Consider two streams joined on an id, where one has start and end dates. It 
> would be useful to be able to join each row during its live duration. Today 
> this can be expressed in the DataStream API using a CoProcessFunction. 
>  left.id = right.id AND (left.time > right.start and left.time < right.end)
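The join condition from the issue can be illustrated with a batch-style plain-Java sketch over the sample data from the comment. `IntervalJoinSketch`, `Event` and `Campaign` are hypothetical types, each date is assumed to mean midnight at the start of that day, and the sketch ignores watermarks, late data and state cleanup, which the `CoProcessFunction` in the comment has to handle.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a join whose window is defined per row by the
// metadata side's start and end dates.
final class IntervalJoinSketch {

    static final class Event {
        final LocalDateTime time;
        final int id;
        final double spend;

        Event(LocalDateTime time, int id, double spend) {
            this.time = time;
            this.id = id;
            this.spend = spend;
        }
    }

    static final class Campaign {
        final int id;
        final LocalDate start;
        final LocalDate end;
        final String currency;

        Campaign(int id, LocalDate start, LocalDate end, String currency) {
            this.id = id;
            this.start = start;
            this.end = end;
            this.currency = currency;
        }
    }

    // Emits "id:currency" for every pair satisfying
    //   left.id = right.id AND left.time > right.start AND left.time < right.end,
    // treating each date as midnight at the start of that day (an assumption).
    static List<String> join(List<Event> events, List<Campaign> campaigns) {
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            for (Campaign c : campaigns) {
                boolean sameId = e.id == c.id;
                boolean inWindow = e.time.isAfter(c.start.atStartOfDay())
                        && e.time.isBefore(c.end.atStartOfDay());
                if (sameId && inWindow) {
                    out.add(e.id + ":" + c.currency);
                }
            }
        }
        return out;
    }
}
```

For the three sample events the result is `[1:USD, 2:EUR, 1:USD]`. The nested loop is only for clarity; a streaming implementation would keep the metadata in keyed state, as the `CoProcessFunction` does.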





[jira] [Closed] (FLINK-6163) Document per-window state in ProcessWindowFunction

2017-11-10 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-6163.
---
Resolution: Fixed

Documented on master in
d98ba08a7a73dc93c839c77d11245aba70869ab5

Documented on release-1.4 in
da435f121821fd1107c41352a54ee804f10cf7e3

> Document per-window state in ProcessWindowFunction
> --
>
> Key: FLINK-6163
> URL: https://issues.apache.org/jira/browse/FLINK-6163
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Documentation
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The current windowing documentation mostly describes {{WindowFunction}} and 
> treats {{ProcessWindowFunction}} as an afterthought. We should reverse that 
> and also document the new per-key state that is only available to 
> {{ProcessWindowFunction}}.
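The two state scopes that `ProcessWindowFunction` offers through its context, `windowState()` (per key and window) and `globalState()` (per key, across windows), can be modelled in plain Java as two maps with different keys. This is a conceptual sketch with hypothetical names, not Flink's state backend; windows are identified by their start timestamp here.

```java
import java.util.HashMap;
import java.util.Map;

final class WindowStateScopesSketch {

    // Per-key ("global") state: one counter per key, shared by all windows of that key.
    private final Map<String, Integer> globalFirings = new HashMap<>();

    // Per-window state: one counter per (key, window).
    private final Map<String, Integer> windowFirings = new HashMap<>();

    private static String scope(String key, long windowStart) {
        return key + "@" + windowStart;
    }

    // Called once per window evaluation (e.g. on every early or late firing).
    void onFiring(String key, long windowStart) {
        globalFirings.merge(key, 1, Integer::sum);
        windowFirings.merge(scope(key, windowStart), 1, Integer::sum);
    }

    // Per-window state must be dropped when the window is purged.
    void clearWindow(String key, long windowStart) {
        windowFirings.remove(scope(key, windowStart));
    }

    int windowCount(String key, long windowStart) {
        return windowFirings.getOrDefault(scope(key, windowStart), 0);
    }

    int globalCount(String key) {
        return globalFirings.getOrDefault(key, 0);
    }
}
```

Per-window state has to be cleaned up when the window is purged (in Flink, `ProcessWindowFunction#clear` is the hook for this), whereas global state outlives individual windows of the same key.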





[jira] [Commented] (FLINK-6163) Document per-window state in ProcessWindowFunction

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247603#comment-16247603
 ] 

ASF GitHub Bot commented on FLINK-6163:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4992







[GitHub] flink pull request #4992: [FLINK-6163] Document per-window state in ProcessW...

2017-11-10 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4992


---


[jira] [Commented] (FLINK-6163) Document per-window state in ProcessWindowFunction

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247601#comment-16247601
 ] 

ASF GitHub Bot commented on FLINK-6163:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4992
  
Thanks for the review!  








[GitHub] flink issue #4992: [FLINK-6163] Document per-window state in ProcessWindowFu...

2017-11-10 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4992
  
Thanks for the review! 😃 



---


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247539#comment-16247539
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150243346
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws 
Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, 
serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws 
InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = 
CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && 
!queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   
queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", 
serverName, e);
+   return false;
+   }
+   return false;
--- End diff --

We want to avoid pulling in a dependency on `runtime` here. Does it make 
sense to move the `Executors` to another module, e.g. `core`?


> AbstractServerBase#shutdown does not wait for shutdown completion
> -
>
> Key: FLINK-7974
> URL: https://issues.apache.org/jira/browse/FLINK-7974
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>
> The {{AbstractServerBase}} does not wait for the completion of its shutdown 
> when calling {{AbstractServerBase#shutdown}}. This is problematic since it 
> leads to resource leaks and unstable tests such as the 
> {{AbstractServerTest}}. I propose to let {{AbstractServerBase#shutdown}} 
> return a termination future which is completed upon shutdown completion.
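A sketch of the proposed termination future, assuming only an `ExecutorService` needs to be shut down: `shutdownAsync` (a hypothetical helper, not the `shutdownServer` method from the PR) returns a `CompletableFuture<Boolean>` that completes with `true` once all tasks have terminated and with `false` if the timeout elapses first. The PR diff uses `CompletableFuture.supplyAsync`; this sketch waits on a dedicated daemon thread instead, so that the blocking `awaitTermination` call does not occupy a thread of the common `ForkJoinPool`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ShutdownFutureSketch {

    // Initiates shutdown and returns a future that completes with true once all
    // tasks have terminated, or false if the timeout elapses first. The waiting
    // happens on a separate thread, so the caller is not blocked.
    static CompletableFuture<Boolean> shutdownAsync(ExecutorService executor, long timeoutMillis) {
        executor.shutdown();
        CompletableFuture<Boolean> terminated = new CompletableFuture<>();
        Thread waiter = new Thread(() -> {
            try {
                terminated.complete(executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                terminated.completeExceptionally(e);
            }
        }, "shutdown-waiter");
        waiter.setDaemon(true);
        waiter.start();
        return terminated;
    }
}
```

Callers can then `join()` on or chain from the returned future; for example, a test can wait until the server is fully shut down before asserting that its resources were released.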





[GitHub] flink pull request #4996: [hotfix][docs] Change mailing list link in quickst...

2017-11-10 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4996

[hotfix][docs] Change mailing list link in quickstart to flink-user

Change mailing list link in quickstart to flink-user. Previously it was 
pointing to flink-dev.

This change only fixes a minor bug in the documentation and doesn't touch 
source code.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink docs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4996.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4996


commit 8c5ae23445850d090bb713bc3db0ccbd32493fe9
Author: Piotr Nowojski 
Date:   2017-11-10T14:15:11Z

[hotfix][docs] Change mailing list link in quickstart to flink-user

Previously it was pointing to flink-dev




---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150243346
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
--- End diff --

We want to avoid pulling in a dependency on `runtime` here. Does it make 
sense to move `Executors` to another module, e.g. `core`?


---


[jira] [Commented] (FLINK-7811) Add support for Scala 2.12

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247518#comment-16247518
 ] 

ASF GitHub Bot commented on FLINK-7811:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/4995

[hotfix] [docs] Fix broken link to FLINK-7811

This fixes a broken hyperlink on page 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/building.html#scala-versions


![image](https://user-images.githubusercontent.com/1681921/32661415-3205d53a-c627-11e7-8f15-3ff6c52c7bf0.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink hotfix-building-fix-url

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4995.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4995






> Add support for Scala 2.12
> --
>
> Key: FLINK-7811
> URL: https://issues.apache.org/jira/browse/FLINK-7811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.5.0
>
>






[GitHub] flink pull request #4995: [hotfix] [docs] Fix broken link to FLINK-7811

2017-11-10 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/4995

[hotfix] [docs] Fix broken link to FLINK-7811

This fixes a broken hyperlink on page 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/start/building.html#scala-versions


![image](https://user-images.githubusercontent.com/1681921/32661415-3205d53a-c627-11e7-8f15-3ff6c52c7bf0.png)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink hotfix-building-fix-url

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4995.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4995






---


[GitHub] flink pull request #4994: [hotfix][build] Disable dependency convergence in ...

2017-11-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4994


---


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247487#comment-16247487
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150232000
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
--- End diff --

Ideally we would get rid of the `timeout` parameter because that is 
something the user specifies when calling `get()` on the returned termination 
future.
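A minimal sketch of the suggestion, assuming the termination future is a plain `CompletableFuture<Void>`: the shutdown method takes no timeout, and the caller bounds the wait with `get(timeout, unit)`. `CallerSideTimeout`, `shutdownServer(long)` and the simulated sleep are hypothetical stand-ins, not the PR's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CallerSideTimeout {
    // Hypothetical shutdown with no timeout parameter: it only signals completion.
    static CompletableFuture<Void> shutdownServer(long simulatedShutdownMillis) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(simulatedShutdownMillis); // pretend to release resources
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // The caller picks the bound; returns true if shutdown finished in time.
    static boolean awaitShutdown(CompletableFuture<Void> termination, long timeoutMillis) {
        try {
            termination.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // still shutting down: log, keep waiting, or force
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("shutdown failed", e.getCause());
        }
    }
}
```

One consequence of this design is that different callers (a test, a cluster shutdown hook) can choose different bounds without the server hard-coding one.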




[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150232000
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
--- End diff --

Ideally we would get rid of the `timeout` parameter because that is 
something the user specifies when calling `get()` on the returned termination 
future.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150231729
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
--- End diff --

What if `awaitTermination` returns `false`? Then we should call 
`queryExecutor.shutdownNow()`.
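The fallback being asked for follows the two-phase pattern described in the `ExecutorService` Javadoc: `shutdown()`, a bounded `awaitTermination`, then `shutdownNow()` if tasks are still running. A sketch, assuming a caller-supplied grace period; `ShutdownWithFallback` is a hypothetical helper name, not code from the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class ShutdownWithFallback {
    // Orderly shutdown first; if tasks are still running after the grace period,
    // interrupt them with shutdownNow(). Returns true if the pool terminated.
    static boolean shutdown(ExecutorService executor, long graceMillis) {
        executor.shutdown(); // reject new tasks, let queued and running ones finish
        try {
            if (executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            executor.shutdownNow(); // interrupt stragglers, drop queued tasks
            return executor.awaitTermination(graceMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }
}
```

`shutdownNow()` only interrupts running tasks, so a task that ignores interrupts can still keep the pool alive; the `false` return lets the caller log that case.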


---


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247484#comment-16247484
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150231729
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
--- End diff --

What if `awaitTermination` returns `false`? Then we should call 
`queryExecutor.shutdownNow()`.




[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247473#comment-16247473
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228196
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -419,20 +440,27 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
if (failureCause.compareAndSet(null, cause)) {
-   channel.close();
-   stats.reportInactiveConnection();
+   final CompletableFuture tmp = new CompletableFuture<>();
+   channel.close().addListener(finished -> tmp.complete(null));
--- End diff --

The result of the channel future is ignored.
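The issue is that `addListener(finished -> tmp.complete(null))` completes `tmp` even when closing the channel failed. Netty futures expose `isSuccess()` and `cause()`, so the listener can forward the real outcome. To keep the sketch runnable without Netty, `ListenableResult` below is a hypothetical interface with the same shape; it is not a Netty type, and `FutureBridge` is likewise illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in for a listener-based future such as Netty's ChannelFuture.
interface ListenableResult {
    boolean isSuccess();
    Throwable cause();
    void addListener(Consumer<ListenableResult> listener);
}

class FutureBridge {
    // Converts a listener-based result into a CompletableFuture without dropping failures.
    static CompletableFuture<Void> toCompletableFuture(ListenableResult result) {
        CompletableFuture<Void> bridged = new CompletableFuture<>();
        result.addListener(finished -> {
            if (finished.isSuccess()) {
                bridged.complete(null);
            } else {
                bridged.completeExceptionally(finished.cause()); // propagate, don't ignore
            }
        });
        return bridged;
    }

    // Already-completed test double; calls listeners immediately.
    static ListenableResult completed(Throwable failure) {
        return new ListenableResult() {
            public boolean isSuccess() { return failure == null; }
            public Throwable cause() { return failure; }
            public void addListener(Consumer<ListenableResult> listener) { listener.accept(this); }
        };
    }
}
```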




[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247472#comment-16247472
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228244
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -419,20 +440,27 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
if (failureCause.compareAndSet(null, cause)) {
-   channel.close();
-   stats.reportInactiveConnection();
+   final CompletableFuture tmp = new CompletableFuture<>();
+   channel.close().addListener(finished -> tmp.complete(null));
 
-   for (long requestId : pendingRequests.keySet()) {
-   TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
-   stats.reportFailedRequest();
+   tmp.thenRun(() -> {
--- End diff --

here as well.




[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247459#comment-16247459
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150223433
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
--- End diff --

Is it strictly necessary to first shut down the query executor before 
shutting down the netty server?
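If the two steps turn out to be independent (the excerpt does not settle this), they could be started together and joined with `CompletableFuture.allOf`. A sketch under that assumption, using two `ExecutorService`s as hypothetical stand-ins for the query executor and the Netty event loop group; for brevity the blocking waits run on `runAsync`'s default executor rather than a dedicated one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class ParallelShutdown {
    // Shuts one executor down and waits for it; fails the future on timeout or interrupt.
    static CompletableFuture<Void> terminate(ExecutorService executor, long timeoutMillis) {
        return CompletableFuture.runAsync(() -> {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    throw new IllegalStateException("executor did not terminate in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    // Starts both shutdowns at once; completes when both are done, or exceptionally if either fails.
    static CompletableFuture<Void> shutdownBoth(ExecutorService queryExecutor,
                                                ExecutorService networkExecutor,
                                                long timeoutMillis) {
        return CompletableFuture.allOf(
                terminate(queryExecutor, timeoutMillis),
                terminate(networkExecutor, timeoutMillis));
    }
}
```

The sequential version is only needed if in-flight queries still require the network layer while they drain; otherwise running both shutdowns concurrently bounds the total time by the slower of the two.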




[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247470#comment-16247470
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226636
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
+
if (shutDown.compareAndSet(false, true)) {
+   final List connectionFutures = new ArrayList<>();
for (Map.Entry conn : establishedConnections.entrySet()) {
if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
for (Map.Entry conn : pendingConnections.entrySet()) {
if (pendingConnections.remove(conn.getKey()) != null) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
-   }
-   }
+   CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun(
+   () -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS)
+   .addListener(finished -> shutdownFuture.complete(true));
--- End diff --

The result should be considered.




[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247456#comment-16247456
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150223049
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
--- End diff --

Better to use `Executors.gracefulShutdown` for this.
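The signature of Flink's `Executors.gracefulShutdown` is not shown in this thread, so the sketch below does not assume it. It is a hypothetical, JDK-only helper illustrating what such a utility typically does for several executors: start every shutdown, wait against one shared deadline, then call `shutdownNow()` on any executor still running.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

class GracefulShutdownSketch {
    // Hypothetical helper: shut down every executor, wait against one shared
    // deadline, and force-stop whichever ones are still running afterwards.
    static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService... executors) {
        for (ExecutorService executor : executors) {
            executor.shutdown(); // start all shutdowns first so they overlap
        }
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        boolean interrupted = false;
        for (ExecutorService executor : executors) {
            try {
                long remaining = Math.max(deadline - System.nanoTime(), 0L);
                if (!executor.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                    executor.shutdownNow(); // grace period used up
                }
            } catch (InterruptedException e) {
                interrupted = true;
                executor.shutdownNow();
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the caller's interrupt status
        }
    }
}
```

Using a single deadline keeps the total wait bounded by `timeout` regardless of how many executors are passed in.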




[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247458#comment-16247458
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224310
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
--- End diff --

JavaDocs should be updated.




[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247479#comment-16247479
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228810
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.complete(response)) {
+   if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
+   pending.complete(response);
}
}
 
@Override
public void onRequestFailure(long requestId, Throwable cause) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
+   if (pending != null && !pending.isDone()) {
stats.reportFailedRequest();
+   pending.completeExceptionally(cause);
--- End diff --

Same here?
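This excerpt does not say which concern "Same here?" refers to. One property of the change worth noting: `CompletableFuture.complete(...)` returns `true` only for the call that actually completed the future, while `!pending.isDone()` followed by `complete(...)` is a check-then-act sequence in which another thread (for example a timeout or a close) can complete the future between the two steps, so the statistics could count a request whose future was already failed. A hypothetical sketch of the difference; `CompletionAccounting` is illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class CompletionAccounting {
    // Counts a success only if this very call completed the future (single atomic step).
    static boolean completeAndCount(CompletableFuture<String> pending, String response, AtomicInteger successes) {
        if (pending.complete(response)) { // false if someone else completed it first
            successes.incrementAndGet();
            return true;
        }
        return false;
    }

    // Check-then-act variant: isDone() and complete() are two separate steps,
    // so another thread may fail the future in between and the count is still bumped.
    static void checkThenComplete(CompletableFuture<String> pending, String response, AtomicInteger successes) {
        if (!pending.isDone()) {
            successes.incrementAndGet();
            pending.complete(response);
        }
    }
}
```

The trade-off: with the atomic form the statistic is recorded after the future completes, so code woken by the future may briefly observe the old count.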




[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247460#comment-16247460
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225136
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
+   } else {
+   groupShutdownFuture.complete(null);
+   }
+   }
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+   if (handler != null) {
+   handler.shutdown().thenRun(() -> {
--- End diff --

Same here: what if an exception happened while shutting `handler` down? 
Shouldn't we complete the `handlerShutdownFuture` with it?
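`thenRun` only fires on normal completion, so if `handler.shutdown()` fails, a future that is completed only inside `thenRun` never completes at all. A sketch of forwarding both outcomes with `whenComplete`; `ForwardOutcome.forward` is a hypothetical helper, not part of the PR.

```java
import java.util.concurrent.CompletableFuture;

class ForwardOutcome {
    // Completes target with the same value or failure as source.
    static <T> void forward(CompletableFuture<T> source, CompletableFuture<T> target) {
        source.whenComplete((value, failure) -> {
            if (failure != null) {
                target.completeExceptionally(failure);
            } else {
                target.complete(value);
            }
        });
    }
}
```

Applied to the diff, this would mean forwarding the outcome of `handler.shutdown()` into `handlerShutdownFuture` instead of completing it from a `thenRun` callback.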




[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247467#comment-16247467
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226886
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
+
if (shutDown.compareAndSet(false, true)) {
+   final List connectionFutures = new ArrayList<>();
for (Map.Entry conn : establishedConnections.entrySet()) {
if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
for (Map.Entry conn : pendingConnections.entrySet()) {
if (pendingConnections.remove(conn.getKey()) != null) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
-   }
-   }
+   CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun(
--- End diff --

Also here we shouldn't ignore the result of the `allOf` future because it 
might contain an exception.
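`CompletableFuture.allOf` completes exceptionally if any input does, but `thenRun` skips its action in that case, so the failure is silently dropped and the outer `shutdownFuture` is never completed by this chain. A sketch that observes the result with `whenComplete`, assuming cleanup (here a hypothetical `followUp` runnable standing in for the event loop shutdown) should still run after a failed close; `AllOfPropagation` is illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AllOfPropagation {
    // Waits for all connection-close futures, always runs the follow-up cleanup,
    // and then reports success or the close failure instead of dropping it.
    static CompletableFuture<Boolean> closeAllThen(List<CompletableFuture<Void>> connectionFutures,
                                                   Runnable followUp) {
        CompletableFuture<Boolean> shutdownFuture = new CompletableFuture<>();
        CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture<?>[0]))
                .whenComplete((ignored, failure) -> {
                    try {
                        followUp.run(); // e.g. shut down the event loop group
                    } catch (RuntimeException e) {
                        shutdownFuture.completeExceptionally(e);
                        return;
                    }
                    if (failure != null) {
                        shutdownFuture.completeExceptionally(failure); // propagate the close failure
                    } else {
                        shutdownFuture.complete(true);
                    }
                });
        return shutdownFuture;
    }
}
```

Whether to skip or still run the cleanup after a failed close is a judgment call; running it keeps the event loop threads from leaking.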


> AbstractServerBase#shutdown does not wait for shutdown completion
> -
>
> Key: FLINK-7974
> URL: https://issues.apache.org/jira/browse/FLINK-7974
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>
> The {{AbstractServerBase}} does not wait for the completion of its shutdown 
> when calling {{AbstractServerBase#shutdown}}. This is problematic since it 
> leads to resource leaks and unstable tests such as the 
> {{AbstractServerTest}}. I propose to let the {{AbstractServerBase#shutdown}} 
> return a termination future which is completed upon shutdown completion.



[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247463#comment-16247463 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224448
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
+   } else {
+   groupShutdownFuture.complete(null);
+   }
+   }
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+   if (handler != null) {
+   handler.shutdown().thenRun(() -> {
+   handler = null;
+   handlerShutdownFuture.complete(null);
+   });
+   } else {
+   handlerShutdownFuture.complete(null);
}
--- End diff --

Why do we execute the handler shutdown in the callback of the 
`queryExecShutdownFuture`?


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247480#comment-16247480 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150230620
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
--- End diff --

When called twice, this method gives you a wrong result: the second call 
immediately returns a completed future, even though the first shutdown may still be in progress.


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247461#comment-16247461 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224362
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
--- End diff --

Why do we complete a `boolean` future with `null`?
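A sketch of keeping the future types honest, with a hypothetical `ExecutorShutdown` helper standing in for the server code: a future that reports an outcome is typed `Boolean` and completed with the real outcome (here the return value of `ExecutorService#awaitTermination`, which the diff above discards before returning `true`), while a pure completion signal is typed `Void` and completed with `null`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class ExecutorShutdown {

    private ExecutorShutdown() {}

    /**
     * Completes with true if the executor terminated within the timeout,
     * false if the timeout elapsed or the waiting thread was interrupted.
     * Note: supplyAsync runs the blocking wait on the default async executor
     * (normally the common pool); a dedicated executor may be preferable.
     */
    static CompletableFuture<Boolean> shutdown(ExecutorService executor, long timeoutMillis) {
        return CompletableFuture.supplyAsync(() -> {
            executor.shutdown();
            try {
                // awaitTermination returns false if the timeout elapsed first.
                return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        });
    }

    /** A signal that carries no value is typed Void and completed with null. */
    static CompletableFuture<Void> doneSignal() {
        return CompletableFuture.completedFuture(null);
    }
}
```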


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247482#comment-16247482 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228785
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.complete(response)) {
+   if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
--- End diff --

Shouldn't we only call `stats.reportSuccessfulRequest` if we could actually 
complete `pending`?
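A sketch of the pattern the question points at, using a hypothetical `RequestTracker` with plain counters in place of Flink's request stats and `String` responses: `CompletableFuture#complete` returns `true` only for the call that actually completes the future, so relying on its return value avoids the check-then-act race of `!pending.isDone()` followed by a later `complete(...)`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class RequestTracker {

    final Map<Long, CompletableFuture<String>> pendingRequests = new ConcurrentHashMap<>();
    final AtomicLong successfulRequests = new AtomicLong();
    final AtomicLong failedRequests = new AtomicLong();

    void onRequestResult(long requestId, String response) {
        CompletableFuture<String> pending = pendingRequests.remove(requestId);
        // complete() returns true only if this call moved the future to the
        // completed state, so an already cancelled or failed future is not counted.
        if (pending != null && pending.complete(response)) {
            successfulRequests.incrementAndGet();
        }
    }

    void onRequestFailure(long requestId, Throwable cause) {
        CompletableFuture<String> pending = pendingRequests.remove(requestId);
        // Same reasoning: count the failure only if this call failed the future.
        if (pending != null && pending.completeExceptionally(cause)) {
            failedRequests.incrementAndGet();
        }
    }
}
```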


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247477#comment-16247477 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150230125
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -419,20 +440,27 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
--- End diff --

I think this method can give you wrong results when called twice: the 
second call returns a completed future even though the first call could 
still be running.


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247471#comment-16247471 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225215
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
+   } else {
+   groupShutdownFuture.complete(null);
+   }
+   }
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+   if (handler != null) {
+   handler.shutdown().thenRun(() -> {
+   handler = null;
--- End diff --

I would actually refrain from changing state from an asynchronous callback 
thread.
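One way to follow this advice, sketched with hypothetical `ShutdownableHandler` and `ServerSketch` types (not Flink classes): the handler reference is detached on the calling thread with `AtomicReference#getAndSet`, and the asynchronous part only produces the termination future, never writing to the server's fields.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical handler whose shutdown finishes asynchronously. */
interface ShutdownableHandler {
    CompletableFuture<Void> shutdown();
}

final class ServerSketch {

    private final AtomicReference<ShutdownableHandler> handler;

    ServerSketch(ShutdownableHandler handler) {
        this.handler = new AtomicReference<>(handler);
    }

    CompletableFuture<Void> shutdownHandler() {
        // Detach the handler on the calling thread; callbacks on the returned
        // future never touch this server's fields.
        final ShutdownableHandler current = handler.getAndSet(null);
        if (current == null) {
            // Already shut down (or never set): nothing left to wait for.
            return CompletableFuture.completedFuture(null);
        }
        return current.shutdown();
    }

    boolean hasHandler() {
        return handler.get() != null;
    }
}
```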


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247476#comment-16247476 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228000
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -309,31 +323,38 @@ private void handInChannel(Channel channel) {
/**
 * Close the connecting channel with a ClosedChannelException.
 */
-   private void close() {
-   close(new ClosedChannelException());
+   private CompletableFuture close() {
+   return close(new ClosedChannelException());
}
 
/**
 * Close the connecting channel with an Exception (can be {@code null})
 * or forward to the established channel.
 */
-   private void close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
synchronized (connectLock) {
-   if (!closed) {
+   final CompletableFuture shutdownFuture;
+   if (closed) {
+   shutdownFuture = new CompletableFuture<>();
+   shutdownFuture.complete(null);
+   } else {
if (failureCause == null) {
failureCause = cause;
}
 
if (established != null) {
-   established.close();
+   shutdownFuture = established.close();
} else {
PendingRequest pending;
while ((pending = queuedRequests.poll()) != null) {
pending.completeExceptionally(cause);
}
+   shutdownFuture = new CompletableFuture<>();
+   shutdownFuture.complete(null);
}
-   closed = true;
+   shutdownFuture.thenRun(() -> closed = true);
--- End diff --

Why do we set `closed` to `true` only after the shutdown completed? 
Shouldn't `close` set it to `true` directly, so that all subsequent operations 
won't be executed anymore?


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247462#comment-16247462 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225343
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java ---
@@ -185,7 +185,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
/**
 * Shuts down any handler specific resources, e.g. thread pools etc.
 */
--- End diff --

JavaDocs missing


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247478#comment-16247478 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150230379
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -309,31 +323,38 @@ private void handInChannel(Channel channel) {
/**
 * Close the connecting channel with a ClosedChannelException.
 */
-   private void close() {
-   close(new ClosedChannelException());
+   private CompletableFuture close() {
+   return close(new ClosedChannelException());
}
 
/**
 * Close the connecting channel with an Exception (can be {@code null})
 * or forward to the established channel.
 */
-   private void close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
--- End diff --

This method is blocking even though it shouldn't be. Imagine that two close 
calls happen concurrently. One of them should trigger the closing operation and 
the other should immediately return with the termination future.
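A minimal lock-free sketch of that behaviour, using a hypothetical `NonBlockingConnection` whose `releaseResources` supplier stands in for closing the Netty channel: the first caller wins a compare-and-set on the failure cause, which also marks the connection as closed immediately, while every caller, including concurrent ones, gets the same termination future back without blocking.

```java
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class NonBlockingConnection {

    // Non-null once the connection is closed; doubles as the "closed" flag.
    private final AtomicReference<Throwable> failureCause = new AtomicReference<>();

    // Created once, so all callers observe the same termination future.
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    // Hypothetical hook that releases the underlying channel asynchronously.
    private final Supplier<CompletableFuture<Void>> releaseResources;

    NonBlockingConnection(Supplier<CompletableFuture<Void>> releaseResources) {
        this.releaseResources = releaseResources;
    }

    boolean isClosed() {
        return failureCause.get() != null;
    }

    CompletableFuture<Void> close(Throwable cause) {
        // A null cause would leave the state unchanged, so substitute a default.
        final Throwable effectiveCause = cause != null ? cause : new ClosedChannelException();
        if (failureCause.compareAndSet(null, effectiveCause)) {
            // Only the winner starts releasing resources; nobody waits on a lock.
            releaseResources.get().whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    terminationFuture.completeExceptionally(throwable);
                } else {
                    terminationFuture.complete(null);
                }
            });
        }
        return terminationFuture;
    }
}
```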


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247452#comment-16247452 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150222505
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -250,8 +252,8 @@ private boolean attemptToBind(final int port) throws Throwable {
 
throw future.cause();
} catch (BindException e) {
-   LOG.debug("Failed to start server {} on port {}: {}.", serverName, port, e.getMessage());
-   shutdown();
+   log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
+   shutdownServer(Time.seconds(10L)).join();
--- End diff --

I would use `get()` because it makes the thrown exceptions explicit.
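A small sketch of the difference, using a hypothetical `GetVersusJoin` helper: `get()` declares the checked `InterruptedException` and `ExecutionException`, so the compiler forces callers to handle them, whereas `join()` reports the same failure as an unchecked `CompletionException` that is easy to overlook.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class GetVersusJoin {

    private GetVersusJoin() {}

    static String viaGet(CompletableFuture<?> future) {
        try {
            future.get(); // declares checked InterruptedException and ExecutionException
            return "ok";
        } catch (ExecutionException e) {
            return "ExecutionException: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String viaJoin(CompletableFuture<?> future) {
        try {
            future.join(); // throws only unchecked exceptions; handling is optional
            return "ok";
        } catch (CompletionException e) {
            return "CompletionException: " + e.getCause().getMessage();
        }
    }
}
```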


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247468#comment-16247468 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228589
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -419,20 +440,27 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
if (failureCause.compareAndSet(null, cause)) {
-   channel.close();
-   stats.reportInactiveConnection();
+   final CompletableFuture tmp = new CompletableFuture<>();
+   channel.close().addListener(finished -> tmp.complete(null));
 
-   for (long requestId : pendingRequests.keySet()) {
-   TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
-   stats.reportFailedRequest();
+   tmp.thenRun(() -> {
--- End diff --

Instead of creating the additional indirection with `shutdownFuture`, 
couldn't we simply return the result here as the shutdown future?
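A sketch of the suggested simplification, assuming the channel's close notification has already been adapted to a `CompletableFuture<Void>` (the hypothetical parameter `channelClose`) and using a hypothetical `DirectFuture` helper: the stage returned by `thenRun` already completes after the cleanup has run, and it completes exceptionally if either the close or the cleanup fails, so no hand-completed `shutdownFuture` is needed.

```java
import java.util.concurrent.CompletableFuture;

final class DirectFuture {

    private DirectFuture() {}

    static CompletableFuture<Void> close(CompletableFuture<Void> channelClose, Runnable failPendingRequests) {
        // The dependent stage itself serves as the shutdown future: no extra indirection.
        return channelClose.thenRun(failPendingRequests);
    }
}
```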


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247475#comment-16247475 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150229482
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.complete(response)) {
+   if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
+   pending.complete(response);
}
}
 
@Override
public void onRequestFailure(long requestId, Throwable cause) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
+   if (pending != null && !pending.isDone()) {
stats.reportFailedRequest();
+   pending.completeExceptionally(cause);
}
}
 
@Override
public void onFailure(Throwable cause) {
-   if (close(cause)) {
-   // Remove from established channels, otherwise future
-   // requests will be handled by this failed channel.
-   establishedConnections.remove(serverAddress, this);
-   }
+   close(cause).thenAccept(cancelled -> {
+   if (cancelled) {
--- End diff --

Why do we only remove `this` from `establishedConnections` if `cancelled` 
is true?
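A sketch of why the removal can be unconditional, with a hypothetical `ConnectionRegistry`: `ConcurrentMap#remove(key, value)` deletes the entry only while it still maps to the failed connection, so calling it on every failure cannot evict a replacement connection that was registered in the meantime.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ConnectionRegistry<K, C> {

    final ConcurrentMap<K, C> establishedConnections = new ConcurrentHashMap<>();

    /**
     * Called when a connection fails. remove(key, value) is atomic and only
     * removes the entry if it still maps to failedConnection, so it is safe
     * to call regardless of which close call "won".
     */
    void onFailure(K serverAddress, C failedConnection) {
        establishedConnections.remove(serverAddress, failedConnection);
    }
}
```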


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247469#comment-16247469 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226027
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
+
if (shutDown.compareAndSet(false, true)) {
+   final List connectionFutures = new ArrayList<>();
--- End diff --

The list could be initialized with the combined number of established and pending connections as its capacity.


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247481#comment-16247481 ]

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150227018
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -309,31 +323,38 @@ private void handInChannel(Channel channel) {
/**
 * Close the connecting channel with a ClosedChannelException.
 */
-   private void close() {
-   close(new ClosedChannelException());
+   private CompletableFuture close() {
+   return close(new ClosedChannelException());
}
 
/**
 * Close the connecting channel with an Exception (can be {@code null})
 * or forward to the established channel.
 */
-   private void close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
synchronized (connectLock) {
-   if (!closed) {
+   final CompletableFuture shutdownFuture;
+   if (closed) {
+   shutdownFuture = new CompletableFuture<>();
--- End diff --

shorter: `CompletableFuture.completedFuture(null)`


> AbstractServerBase#shutdown does not wait for shutdown completion
> -
>
> Key: FLINK-7974
> URL: https://issues.apache.org/jira/browse/FLINK-7974
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Critical
>
> The {{AbstractServerBase}} does not wait for the completion of its shutdown 
> when calling {{AbstractServerBase#shutdown}}. This is problematic since it 
> leads to resource leaks and unstable tests such as the
> {{AbstractServerTest}}. I propose to let the {{AbstractServerBase#shutdown}} 
> return a termination future which is completed upon shutdown completion.



[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247451#comment-16247451
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221485
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 ---
@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}
 
-   /** Shuts down the client. */
+   /**
+* Shuts down the client and waits for 10 seconds
+* for the shutdown to be completed.
+*
+* If this expires, or an exception is thrown for
+* any reason, then a warning is printed containing the
+* exception.
+*/
public void shutdown() {
-   client.shutdown();
+   try {
+   client.shutdown().get(10L, TimeUnit.SECONDS);
+   LOG.info("The Queryable State Client was shutdown successfully.");
+   } catch (Exception e) {
+   LOG.warn("The Queryable State Client shutdown failed: ", e);
+   }
+   }
+
+   /**
+* Shuts down the client and waits until shutdown is completed.
+*
+* If an exception is thrown for any reason, then this exception
+* is further propagated upwards.
+*/
+   public void shutdownAndWait() throws Throwable {
+   try {
+   client.shutdown().join();
--- End diff --

`get()` is better here
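The comment does not give a reason. One plausible reason, shown in the hypothetical sketch below, is that `get()` reports a failure as a checked `ExecutionException` (and declares `InterruptedException`), so the compiler forces the caller to deal with it, whereas `join()` throws an unchecked `CompletionException` that is easy to overlook. In both cases the original failure is available as the cause. The class and method names are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class GetVsJoinExample {

    // A failed shutdown future, standing in for client.shutdown().
    static CompletableFuture<Void> failedShutdown() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("shutdown failed"));
        return future;
    }

    // join(): the failure surfaces as an unchecked CompletionException.
    static RuntimeException viaJoin() {
        try {
            failedShutdown().join();
            return null;
        } catch (CompletionException e) {
            return e;
        }
    }

    // get(): the failure surfaces as a checked ExecutionException; get() also
    // declares InterruptedException, so callers must handle both explicitly.
    static Exception viaGet() {
        try {
            failedShutdown().get();
            return null;
        } catch (ExecutionException e) {
            return e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println(viaJoin().getClass().getSimpleName()); // CompletionException
        System.out.println(viaGet().getClass().getSimpleName());  // ExecutionException
    }
}
```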


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247465#comment-16247465
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224857
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
--- End diff --

Shouldn't we use the `finished` value to complete the
`groupShutdownFuture`? An exception might have been thrown; if so, we
should forward it as well.
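A sketch of forwarding the listener's outcome instead of always completing with `null`. It assumes the `finished` object exposes `isSuccess()` and `cause()`, as Netty futures do. To stay dependency-free, the sketch uses a hypothetical stand-in interface, `ListenerResult`, rather than a Netty type; `ForwardResultExample` and `SimpleResult` are likewise illustrative names.

```java
import java.util.concurrent.CompletableFuture;

final class ForwardResultExample {

    // Hypothetical stand-in for the object passed to the listener. Netty futures
    // expose isSuccess() and cause(); this interface only mirrors that pair.
    interface ListenerResult {
        boolean isSuccess();
        Throwable cause();
    }

    // Simple implementation for the example: a null cause means success.
    static final class SimpleResult implements ListenerResult {
        private final Throwable cause;
        SimpleResult(Throwable cause) { this.cause = cause; }
        @Override public boolean isSuccess() { return cause == null; }
        @Override public Throwable cause() { return cause; }
    }

    // Completes the target with the listener's outcome instead of always with null.
    static void forward(ListenerResult finished, CompletableFuture<Void> target) {
        if (finished.isSuccess()) {
            target.complete(null);
        } else {
            Throwable cause = finished.cause();
            // completeExceptionally(null) would throw a NullPointerException, so guard it.
            target.completeExceptionally(
                cause != null ? cause : new IllegalStateException("shutdown failed without a cause"));
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
        forward(new SimpleResult(new RuntimeException("boom")), groupShutdownFuture);
        System.out.println(groupShutdownFuture.isCompletedExceptionally()); // true
    }
}
```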


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247474#comment-16247474
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226661
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
+
if (shutDown.compareAndSet(false, true)) {
+   final List connectionFutures = new ArrayList<>();
for (Map.Entry conn : establishedConnections.entrySet()) {
if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
for (Map.Entry conn : pendingConnections.entrySet()) {
if (pendingConnections.remove(conn.getKey()) != null) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
-   }
-   }
+   CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun(
+   () -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS)
--- End diff --

Why not set the timeout to `0`?


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247454#comment-16247454
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221248
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 ---
@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}
 
-   /** Shuts down the client. */
+   /**
+* Shuts down the client and waits for 10 seconds
+* for the shutdown to be completed.
+*
+* If this expires, or an exception is thrown for
+* any reason, then a warning is printed containing the
+* exception.
+*/
public void shutdown() {
-   client.shutdown();
+   try {
+   client.shutdown().get(10L, TimeUnit.SECONDS);
--- End diff --

Not sure whether we should hard code the timeout here.
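One possible alternative, sketched under the assumption that callers should be able to choose the timeout: keep a no-argument `shutdown()` that uses a default, and add a hypothetical `shutdown(Duration)` overload. The class name, the `DEFAULT_SHUTDOWN_TIMEOUT` constant and the `Supplier` standing in for `client.shutdown()` are all illustrative and not part of the PR. The PR logs the outcome; to keep the sketch short, this version returns a boolean instead.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class ConfigurableShutdownExample {

    // Hypothetical default that keeps the 10 seconds from the diff for existing callers.
    static final Duration DEFAULT_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);

    // Hypothetical stand-in for client.shutdown(), injected for the sketch.
    private final Supplier<CompletableFuture<Void>> clientShutdown;

    ConfigurableShutdownExample(Supplier<CompletableFuture<Void>> clientShutdown) {
        this.clientShutdown = clientShutdown;
    }

    boolean shutdown() {
        return shutdown(DEFAULT_SHUTDOWN_TIMEOUT);
    }

    // Returns true if the shutdown finished successfully within the timeout.
    boolean shutdown(Duration timeout) {
        try {
            clientShutdown.get().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return false;
        } catch (Exception e) { // ExecutionException or TimeoutException
            return false;
        }
    }

    public static void main(String[] args) {
        ConfigurableShutdownExample neverDone = new ConfigurableShutdownExample(CompletableFuture::new);
        System.out.println(neverDone.shutdown(Duration.ofMillis(50))); // false (timed out)
    }
}
```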


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247466#comment-16247466
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225368
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
--- End diff --

JavaDocs need to be adapted


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247455#comment-16247455
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221573
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 ---
@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}
 
-   /** Shuts down the client. */
+   /**
+* Shuts down the client and waits for 10 seconds
+* for the shutdown to be completed.
+*
+* If this expires, or an exception is thrown for
+* any reason, then a warning is printed containing the
+* exception.
+*/
public void shutdown() {
-   client.shutdown();
+   try {
+   client.shutdown().get(10L, TimeUnit.SECONDS);
+   LOG.info("The Queryable State Client was shutdown successfully.");
+   } catch (Exception e) {
+   LOG.warn("The Queryable State Client shutdown failed: ", e);
+   }
+   }
+
+   /**
+* Shuts down the client and waits until shutdown is completed.
+*
+* If an exception is thrown for any reason, then this exception
+* is further propagated upwards.
+*/
+   public void shutdownAndWait() throws Throwable {
+   try {
+   client.shutdown().join();
+   } catch (CompletionException e) {
+   throw e.getCause();
--- End diff --

What if `getCause` is `null`? There is 
`ExceptionUtils.stripFromCompletionException`.
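If the cause is `null`, `throw e.getCause()` amounts to `throw null`, which raises a `NullPointerException` and hides the original error. The helper below is a hypothetical local stand-in written in the spirit of the `ExceptionUtils` method the reviewer mentions; it is not Flink's implementation. It unwraps nested `CompletionException`s and falls back to the wrapper itself when there is no cause.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class StripCompletionExample {

    // Hypothetical helper (not Flink's ExceptionUtils): unwraps nested
    // CompletionExceptions and keeps the wrapper itself when it has no cause,
    // so callers never end up executing "throw null".
    static Throwable stripCompletionException(Throwable throwable) {
        Throwable current = throwable;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            failed.join();
        } catch (CompletionException e) {
            System.out.println(stripCompletionException(e)); // java.lang.IllegalStateException: boom
        }
    }
}
```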


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247457#comment-16247457
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150223999
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
--- End diff --

Why not set the timeout here to `0`?


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247464#comment-16247464
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225411
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
--- End diff --

What does the boolean future value tell us about the `shutdown`?
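One option is to make the termination future a `CompletableFuture<Void>`. Such a future only signals "finished" or "failed", so there is no boolean whose meaning callers would have to guess. `CompletableFuture.allOf` already returns such a future when combining the per-connection close futures, and it completes exceptionally if any of them failed. A minimal sketch under that assumption; `TerminationFutureExample` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class TerminationFutureExample {

    // Combines per-connection close futures into a single termination future.
    // CompletableFuture<Void> only says "finished" (normally or exceptionally);
    // it carries no boolean that callers would have to interpret.
    static CompletableFuture<Void> terminationFuture(List<CompletableFuture<Void>> closeFutures) {
        return CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> first = new CompletableFuture<>();
        CompletableFuture<Void> second = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> termination = terminationFuture(Arrays.asList(first, second));
        System.out.println(termination.isDone()); // false: first connection still closing
        first.complete(null);
        System.out.println(termination.isDone()); // true
    }
}
```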


[jira] [Commented] (FLINK-7974) AbstractServerBase#shutdown does not wait for shutdown completion

2017-11-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16247453#comment-16247453
 ] 

ASF GitHub Bot commented on FLINK-7974:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221458
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 ---
@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}
 
-   /** Shuts down the client. */
+   /**
+* Shuts down the client and waits for 10 seconds
+* for the shutdown to be completed.
+*
+* If this expires, or an exception is thrown for
+* any reason, then a warning is printed containing the
+* exception.
+*/
public void shutdown() {
-   client.shutdown();
+   try {
+   client.shutdown().get(10L, TimeUnit.SECONDS);
+   LOG.info("The Queryable State Client was shutdown successfully.");
+   } catch (Exception e) {
+   LOG.warn("The Queryable State Client shutdown failed: ", e);
+   }
+   }
+
+   /**
+* Shuts down the client and waits until shutdown is completed.
+*
+* If an exception is thrown for any reason, then this exception
+* is further propagated upwards.
+*/
+   public void shutdownAndWait() throws Throwable {
--- End diff --

We should not throw `Throwable`.
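A sketch of a narrower signature, assuming the method waits on a `CompletableFuture<Void>`. It declares `throws Exception`, rethrows the unwrapped cause when that cause is an `Exception` or an `Error`, and otherwise rethrows the `ExecutionException` itself. `NarrowThrowsExample` is a hypothetical name, and this is one possible design rather than what the PR settled on.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class NarrowThrowsExample {

    // Waits for shutdown and declares Exception rather than Throwable.
    static void shutdownAndWait(CompletableFuture<Void> shutdownFuture) throws Exception {
        try {
            shutdownFuture.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause; // original failure, unwrapped
            }
            if (cause instanceof Error) {
                throw (Error) cause; // errors stay unchecked
            }
            throw e; // null or unusual Throwable cause: keep the wrapper
        }
    }

    public static void main(String[] args) throws Exception {
        shutdownAndWait(CompletableFuture.completedFuture(null));
        System.out.println("shutdown completed");
    }
}
```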


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150230620
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
--- End diff --

When this method is called twice, it gives a wrong result: the second
call immediately returns a completed future.
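A minimal sketch of an idempotent shutdown. It assumes the actual teardown can be expressed as a function returning a future; the `Supplier` below is a hypothetical stand-in for it. The first caller installs the termination future and starts the teardown. Every later caller receives that same future, which completes only when the teardown does. `IdempotentShutdownExample` is an illustrative name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class IdempotentShutdownExample {

    // Set once by the first shutdown() call; later calls return the same future.
    private final AtomicReference<CompletableFuture<Void>> terminationFuture = new AtomicReference<>();

    // Hypothetical stand-in for the real teardown (closing connections, event loop, ...).
    private final Supplier<CompletableFuture<Void>> teardown;

    IdempotentShutdownExample(Supplier<CompletableFuture<Void>> teardown) {
        this.teardown = teardown;
    }

    CompletableFuture<Void> shutdown() {
        CompletableFuture<Void> candidate = new CompletableFuture<>();
        if (terminationFuture.compareAndSet(null, candidate)) {
            // Only the first caller starts the teardown and mirrors its outcome.
            try {
                teardown.get().whenComplete((ignored, error) -> {
                    if (error != null) {
                        candidate.completeExceptionally(error);
                    } else {
                        candidate.complete(null);
                    }
                });
            } catch (RuntimeException e) {
                candidate.completeExceptionally(e);
            }
        }
        // Every caller, first or not, gets the same termination future.
        return terminationFuture.get();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        IdempotentShutdownExample client = new IdempotentShutdownExample(() -> pending);
        CompletableFuture<Void> first = client.shutdown();
        CompletableFuture<Void> second = client.shutdown();
        System.out.println((first == second) + " " + second.isDone()); // true false
        pending.complete(null);
        System.out.println(second.isDone()); // true
    }
}
```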


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224448
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
+   } else {
+   groupShutdownFuture.complete(null);
+   }
+   }
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+   if (handler != null) {
+   handler.shutdown().thenRun(() -> {
+   handler = null;
+   handlerShutdownFuture.complete(null);
+   });
+   } else {
+   handlerShutdownFuture.complete(null);
}
--- End diff --

Why do we execute the handler shutdown in the callback of the 
`queryExecShutdownFuture`?
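If the handler shutdown does not depend on the query executor having terminated, both steps could be started right away and then combined. The diff does not show such a dependency, but that independence is an assumption here. A hypothetical sketch; `ParallelShutdownExample` and `shutdownAll` are illustrative names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class ParallelShutdownExample {

    // Starts every shutdown step immediately instead of starting one step in the
    // callback of another, and returns a future that completes once all are done.
    // Note: if a supplier throws synchronously, the remaining steps are not started.
    @SafeVarargs
    static CompletableFuture<Void> shutdownAll(Supplier<CompletableFuture<Void>>... steps) {
        CompletableFuture<?>[] started = new CompletableFuture<?>[steps.length];
        for (int i = 0; i < steps.length; i++) {
            started[i] = steps[i].get();
        }
        return CompletableFuture.allOf(started);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> queryExecutorDone = new CompletableFuture<>();
        CompletableFuture<Void> handlerDone = new CompletableFuture<>();
        CompletableFuture<Void> serverDone = shutdownAll(() -> queryExecutorDone, () -> handlerDone);
        handlerDone.complete(null); // the handler may finish first
        queryExecutorDone.complete(null);
        System.out.println(serverDone.isDone()); // true
    }
}
```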


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225368
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
--- End diff --

JavaDocs need to be adapted


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228196
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -419,20 +440,27 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
if (failureCause.compareAndSet(null, cause)) {
-   channel.close();
-   stats.reportInactiveConnection();
+   final CompletableFuture tmp = new CompletableFuture<>();
+   channel.close().addListener(finished -> tmp.complete(null));
--- End diff --

The result of the channel future is ignored.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221573
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 ---
@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}
 
-   /** Shuts down the client. */
+   /**
+* Shuts down the client and waits for 10 seconds
+* for the shutdown to be completed.
+*
+* If this expires, or an exception is thrown for
+* any reason, then a warning is printed containing the
+* exception.
+*/
public void shutdown() {
-   client.shutdown();
+   try {
+   client.shutdown().get(10L, TimeUnit.SECONDS);
+   LOG.info("The Queryable State Client was shutdown successfully.");
+   } catch (Exception e) {
+   LOG.warn("The Queryable State Client shutdown failed: ", e);
+   }
+   }
+
+   /**
+* Shuts down the client and waits until shutdown is completed.
+*
+* If an exception is thrown for any reason, then this exception
+* is further propagated upwards.
+*/
+   public void shutdownAndWait() throws Throwable {
+   try {
+   client.shutdown().join();
+   } catch (CompletionException e) {
+   throw e.getCause();
--- End diff --

What if `getCause` is `null`? There is 
`ExceptionUtils.stripFromCompletionException`.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150229482
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 ---
@@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.complete(response)) {
+   if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
+   pending.complete(response);
}
}
 
@Override
public void onRequestFailure(long requestId, Throwable cause) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
+   if (pending != null && !pending.isDone()) {
stats.reportFailedRequest();
+   pending.completeExceptionally(cause);
}
}
 
@Override
public void onFailure(Throwable cause) {
-   if (close(cause)) {
-   // Remove from established channels, otherwise future
-   // requests will be handled by this failed channel.
-   establishedConnections.remove(serverAddress, this);
-   }
+   close(cause).thenAccept(cancelled -> {
+   if (cancelled) {
--- End diff --

Why do we only remove `this` from `establishedConnections` if `cancelled`
is true?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225343
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
 ---
@@ -185,7 +185,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
/**
 * Shuts down any handler specific resources, e.g. thread pools etc.
 */
--- End diff --

JavaDocs missing


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224857
  
--- Diff: 
flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
--- End diff --

Shouldn't we use the `finished` value to complete the `groupShutdownFuture`?
An exception might have been thrown, and we should forward it as well.
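A minimal sketch of forwarding the outcome, assuming the listener receives Netty's completed future, which exposes `isSuccess()` and `cause()`. To keep the example runnable without Netty, `OperationOutcome` below is a hypothetical stand-in for that future, and `FutureBridge` is an illustrative helper, not code from the PR:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the completed Netty future handed to a listener. */
interface OperationOutcome {
    boolean isSuccess();

    Throwable cause();
}

final class FutureBridge {
    private FutureBridge() {}

    /** Completes the target with the operation's real outcome instead of always with null. */
    static <T> void forward(OperationOutcome outcome, CompletableFuture<T> target) {
        if (outcome.isSuccess()) {
            target.complete(null);
        } else {
            // Propagate the failure so callers waiting on the shutdown future see it.
            target.completeExceptionally(outcome.cause());
        }
    }

    /** Builds a finished outcome for illustration; a null cause means success. */
    static OperationOutcome finished(Throwable cause) {
        return new OperationOutcome() {
            @Override
            public boolean isSuccess() {
                return cause == null;
            }

            @Override
            public Throwable cause() {
                return cause;
            }
        };
    }
}
```

Written inline against the real Netty future, the listener would branch the same way: `complete(null)` when `finished.isSuccess()`, otherwise `completeExceptionally(finished.cause())`.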


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228244
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -419,20 +440,27 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
if (failureCause.compareAndSet(null, cause)) {
-   channel.close();
-   stats.reportInactiveConnection();
+   final CompletableFuture tmp = new CompletableFuture<>();
+   channel.close().addListener(finished -> tmp.complete(null));
 
-   for (long requestId : pendingRequests.keySet()) {
-   TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
-   stats.reportFailedRequest();
+   tmp.thenRun(() -> {
--- End diff --

Here as well.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225411
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
--- End diff --

What does the boolean future value tell us about the `shutdown`?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228810
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.complete(response)) {
+   if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
+   pending.complete(response);
}
}
 
@Override
public void onRequestFailure(long requestId, Throwable cause) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.completeExceptionally(cause)) {
+   if (pending != null && !pending.isDone()) {
stats.reportFailedRequest();
+   pending.completeExceptionally(cause);
--- End diff --

Same here?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150223999
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
--- End diff --

Why not set the timeout here to `0`?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221248
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}
 
-   /** Shuts down the client. */
+   /**
+* Shuts down the client and waits for 10 seconds
+* for the shutdown to be completed.
+*
+* If this expires, or an exception is thrown for
+* any reason, then a warning is printed containing the
+* exception.
+*/
public void shutdown() {
-   client.shutdown();
+   try {
+   client.shutdown().get(10L, TimeUnit.SECONDS);
--- End diff --

Not sure whether we should hard-code the timeout here.
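One way to avoid the hard-coded value is an overload that takes the timeout, with the no-argument method delegating to a default. A sketch, assuming `java.time.Duration` for the parameter (Flink's own `Time` type would work the same way); `TimeoutAwareClient` is hypothetical, and it returns a `boolean` instead of logging so the behaviour is easy to check:

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical wrapper around a client whose shutdown() returns a future. */
final class TimeoutAwareClient {
    /** Assumed default; mirrors the 10 seconds hard-coded in the diff. */
    static final Duration DEFAULT_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10L);

    private final Supplier<CompletableFuture<Void>> shutdownAction;

    TimeoutAwareClient(Supplier<CompletableFuture<Void>> shutdownAction) {
        this.shutdownAction = shutdownAction;
    }

    /** Keeps a no-argument variant for existing callers. */
    boolean shutdown() {
        return shutdown(DEFAULT_SHUTDOWN_TIMEOUT);
    }

    /** Returns true if the shutdown completed normally within the caller's timeout. */
    boolean shutdown(Duration timeout) {
        try {
            shutdownAction.get().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false; // the real client would log a warning here
        }
    }
}
```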


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150225215
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
+   } else {
+   groupShutdownFuture.complete(null);
+   }
+   }
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+   if (handler != null) {
+   handler.shutdown().thenRun(() -> {
+   handler = null;
--- End diff --

I would refrain from changing state from an asynchronous callback thread.
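A sketch of the alternative, assuming a hypothetical `AsyncShutdownable` handler: the field is cleared on the calling thread before the asynchronous shutdown starts, so the callback thread never writes to the server's state. `ServerSketch` is illustrative only:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical handler whose shutdown completes asynchronously. */
interface AsyncShutdownable {
    CompletableFuture<Void> shutdown();
}

/** Hypothetical server; only the handling of the handler field is shown. */
final class ServerSketch {
    private AsyncShutdownable handler;

    ServerSketch(AsyncShutdownable handler) {
        this.handler = handler;
    }

    /**
     * Detaches the handler on the calling thread, then returns the handler's own
     * shutdown future. No callback writes to this object's fields.
     */
    synchronized CompletableFuture<Void> shutdownHandler() {
        final AsyncShutdownable current = handler;
        handler = null; // state change happens synchronously, under the lock
        if (current == null) {
            return CompletableFuture.completedFuture(null); // already shut down
        }
        return current.shutdown();
    }

    synchronized boolean hasHandler() {
        return handler != null;
    }
}
```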


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150223049
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
--- End diff --

Better to use `Executors.gracefulShutdown` for this.
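The reviewer points to Flink's own helper, whose exact signature is not shown in this thread. The sketch below instead spells out the standard JDK idiom that a helper of this kind would typically wrap: shut down, wait, then force with `shutdownNow()` and restore the interrupt flag. Unlike the diff, it does not ignore the `boolean` returned by `awaitTermination`. `GracefulShutdown.shutdownAndAwait` is a hypothetical name:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class GracefulShutdown {
    private GracefulShutdown() {}

    /**
     * Returns true if the executor terminated within the timeout; otherwise
     * cancels the remaining tasks with shutdownNow() and returns false.
     */
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // stop accepting new tasks
        try {
            if (executor.awaitTermination(timeout, unit)) {
                return true;
            }
            executor.shutdownNow(); // interrupt tasks that are still running
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }
}
```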


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150221458
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java ---
@@ -108,9 +110,35 @@ public QueryableStateClient(final InetAddress remoteAddress, final int remotePor
new DisabledKvStateRequestStats());
}
 
-   /** Shuts down the client. */
+   /**
+* Shuts down the client and waits for 10 seconds
+* for the shutdown to be completed.
+*
+* If this expires, or an exception is thrown for
+* any reason, then a warning is printed containing the
+* exception.
+*/
public void shutdown() {
-   client.shutdown();
+   try {
+   client.shutdown().get(10L, TimeUnit.SECONDS);
+   LOG.info("The Queryable State Client was shutdown successfully.");
+   } catch (Exception e) {
+   LOG.warn("The Queryable State Client shutdown failed: ", e);
+   }
+   }
+
+   /**
+* Shuts down the client and waits until shutdown is completed.
+*
+* If an exception is thrown for any reason, then this exception
+* is further propagated upwards.
+*/
+   public void shutdownAndWait() throws Throwable {
--- End diff --

We should not throw `Throwable`.
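A sketch of a narrower signature, assuming `throws Exception` is acceptable for callers: the `ExecutionException` from `get()` is unwrapped, exceptions are rethrown as they are, and `Error`s propagate unchanged. `ShutdownAndWait.await` is a hypothetical helper, not the PR's code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class ShutdownAndWait {
    private ShutdownAndWait() {}

    /** Waits for shutdown and rethrows the original failure instead of a raw Throwable. */
    static void await(CompletableFuture<Void> shutdownFuture) throws Exception {
        try {
            shutdownFuture.get(); // InterruptedException propagates as-is
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause; // unwrap to the failure the caller cares about
            } else if (cause instanceof Error) {
                throw (Error) cause; // never wrap JVM errors
            }
            throw e; // no usable cause: rethrow the wrapper itself
        }
    }
}
```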


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150230125
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -419,20 +440,27 @@ void close() {
 * @param cause The cause to close the channel with.
 * @return Channel close future
 */
-   private boolean close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
--- End diff --

I think this method can return a wrong result when called twice: the second
call returns an already completed future even though the close started by the
first call may still be running.
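One way to make repeated calls consistent is to remember the first close future and hand it out on every call. A minimal sketch, assuming a hypothetical `closeAction` that starts the channel close and returns its future; `IdempotentClose` is illustrative:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical wrapper: the first close() starts the close, later calls share its future. */
final class IdempotentClose {
    private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
    private final Supplier<CompletableFuture<Void>> closeAction;

    IdempotentClose(Supplier<CompletableFuture<Void>> closeAction) {
        this.closeAction = closeAction;
    }

    CompletableFuture<Void> close() {
        final CompletableFuture<Void> candidate = new CompletableFuture<>();
        if (closeFuture.compareAndSet(null, candidate)) {
            // Only the winning caller starts the close and mirrors its outcome.
            try {
                closeAction.get().whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        candidate.completeExceptionally(failure);
                    } else {
                        candidate.complete(null);
                    }
                });
            } catch (RuntimeException e) {
                candidate.completeExceptionally(e); // the close could not even be started
            }
        }
        // Every caller, including late ones, observes the same in-flight future.
        return closeFuture.get();
    }
}
```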


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226027
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
+
if (shutDown.compareAndSet(false, true)) {
+   final List connectionFutures = new ArrayList<>();
--- End diff --

This list could be initialized with the number of established plus pending connections.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224310
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
--- End diff --

JavaDocs should be updated.


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150224362
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -260,25 +262,47 @@ private boolean attemptToBind(final int port) throws Throwable {
/**
 * Shuts down the server and all related thread pools.
 */
-   public void shutdown() {
-   LOG.info("Shutting down server {} @ {}", serverName, serverAddress);
-
-   if (handler != null) {
-   handler.shutdown();
-   handler = null;
-   }
-
-   if (queryExecutor != null) {
-   queryExecutor.shutdown();
-   }
+   public CompletableFuture shutdownServer(Time timeout) throws InterruptedException {
+   log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+   final CompletableFuture queryExecShutdownFuture = CompletableFuture.supplyAsync(() -> {
+   try {
+   if (queryExecutor != null && !queryExecutor.isShutdown()) {
+   queryExecutor.shutdown();
+   queryExecutor.awaitTermination(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+   return true;
+   }
+   } catch (InterruptedException e) {
+   log.warn("Failed to shutdown {}: ", serverName, e);
+   return false;
+   }
+   return false;
+   });
+
+   final CompletableFuture groupShutdownFuture = new CompletableFuture<>();
+   final CompletableFuture handlerShutdownFuture = new CompletableFuture<>();
+
+   queryExecShutdownFuture.thenRun(() -> {
+   if (bootstrap != null) {
+   EventLoopGroup group = bootstrap.group();
+   if (group != null) {
+   group.shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+   .addListener(finished -> groupShutdownFuture.complete(null));
--- End diff --

Why do we complete a `boolean` future with `null`?


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150228785
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -483,27 +511,31 @@ private boolean close(Throwable cause) {
@Override
public void onRequestResult(long requestId, RESP response) {
TimestampedCompletableFuture pending = pendingRequests.remove(requestId);
-   if (pending != null && pending.complete(response)) {
+   if (pending != null && !pending.isDone()) {
long durationMillis = (System.nanoTime() - pending.getTimestamp()) / 1_000_000L;
stats.reportSuccessfulRequest(durationMillis);
--- End diff --

Shouldn't we only call `stats.reportSuccessfulRequest` if we actually managed
to complete `pending`?
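A sketch of the suggested ordering, using the `boolean` returned by `complete()` and `completeExceptionally()` as the gate, so a future that was already completed or cancelled elsewhere is not counted. The check-then-act in the diff (`!pending.isDone()` followed by `complete(...)`) leaves a window in which another thread can complete the future in between. `RequestTracker` and its plain counters are hypothetical stand-ins for the client's `pendingRequests` map and `stats` object; the duration measurement is omitted:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical request bookkeeping; counters stand in for the stats object. */
final class RequestTracker<RESP> {
    private final ConcurrentMap<Long, CompletableFuture<RESP>> pendingRequests = new ConcurrentHashMap<>();
    final AtomicLong successes = new AtomicLong();
    final AtomicLong failures = new AtomicLong();

    CompletableFuture<RESP> register(long requestId) {
        CompletableFuture<RESP> future = new CompletableFuture<>();
        pendingRequests.put(requestId, future);
        return future;
    }

    void onRequestResult(long requestId, RESP response) {
        CompletableFuture<RESP> pending = pendingRequests.remove(requestId);
        // complete() is true only for the call that actually transitioned the future.
        if (pending != null && pending.complete(response)) {
            successes.incrementAndGet();
        }
    }

    void onRequestFailure(long requestId, Throwable cause) {
        CompletableFuture<RESP> pending = pendingRequests.remove(requestId);
        if (pending != null && pending.completeExceptionally(cause)) {
            failures.incrementAndGet();
        }
    }
}
```

The same gate covers the failure path in `onRequestFailure`.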


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150226886
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -166,27 +168,39 @@ public String getClientName() {
 *
 * After a call to this method, all returned futures will be failed.
 */
-   public void shutdown() {
+   public CompletableFuture shutdown() {
+   final CompletableFuture shutdownFuture = new CompletableFuture<>();
+
if (shutDown.compareAndSet(false, true)) {
+   final List connectionFutures = new ArrayList<>();
for (Map.Entry conn : establishedConnections.entrySet()) {
if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
for (Map.Entry conn : pendingConnections.entrySet()) {
if (pendingConnections.remove(conn.getKey()) != null) {
-   conn.getValue().close();
+   connectionFutures.add(conn.getValue().close());
}
}
 
-   if (bootstrap != null) {
-   EventLoopGroup group = bootstrap.group();
-   if (group != null) {
-   group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
-   }
-   }
+   CompletableFuture.allOf(connectionFutures.toArray(new CompletableFuture[connectionFutures.size()])).thenRun(
--- End diff --

Also here we shouldn't ignore the result of the `allOf` future because it 
might contain an exception.
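A sketch of forwarding the `allOf` outcome instead of discarding it, assuming the per-connection close futures are `CompletableFuture<Void>`; `ShutdownComposition.closeAll` is a hypothetical helper. With `thenRun`, the action is skipped when any input fails, so if the shutdown future is only completed inside that action, a failed connection close would leave it incomplete:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ShutdownComposition {
    private ShutdownComposition() {}

    /**
     * Completes the returned future once all connection futures are done,
     * forwarding the failure (if any) instead of dropping it.
     */
    static CompletableFuture<Void> closeAll(List<CompletableFuture<Void>> connectionFutures) {
        final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
        CompletableFuture
                .allOf(connectionFutures.toArray(new CompletableFuture<?>[0]))
                .whenComplete((ignored, failure) -> {
                    if (failure != null) {
                        shutdownFuture.completeExceptionally(failure); // wrapped in CompletionException by allOf
                    } else {
                        shutdownFuture.complete(null);
                    }
                });
        return shutdownFuture;
    }
}
```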


---


[GitHub] flink pull request #4993: [FLINK-7974][FLINK-7975][QS] Wait for shutdown in ...

2017-11-10 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4993#discussion_r150227018
  
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -309,31 +323,38 @@ private void handInChannel(Channel channel) {
/**
 * Close the connecting channel with a ClosedChannelException.
 */
-   private void close() {
-   close(new ClosedChannelException());
+   private CompletableFuture close() {
+   return close(new ClosedChannelException());
}
 
/**
 * Close the connecting channel with an Exception (can be {@code null})
 * or forward to the established channel.
 */
-   private void close(Throwable cause) {
+   private CompletableFuture close(Throwable cause) {
synchronized (connectLock) {
-   if (!closed) {
+   final CompletableFuture shutdownFuture;
+   if (closed) {
+   shutdownFuture = new CompletableFuture<>();
--- End diff --

shorter: `CompletableFuture.completedFuture(null)`


---

