[jira] [Updated] (FLINK-8257) Unify the value checks for setParallelism()

2017-12-19 Thread Xingcan Cui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-8257:
---
Description: The {{setParallelism()}} method exists in many components at 
different levels. Some of the methods require the input value to be at least 
{{1}} (e.g., {{StreamTransformation.setParallelism()}}), while some of 
them also allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which 
is {{-1}} (e.g., {{DataSink.setParallelism()}}). We need to unify 
the value checks for these methods.  (was: The {{setParallelism()}} method 
exists in many components at different levels. Some of the methods require the 
input value to be at least {{1}} (e.g., 
{{StreamTransformation.setParallelism()}}), while some of also allow the value 
to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is {{-1}} 
(e.g., {{DataSink.setParallelism()}}). We need to unify the value checks for 
these methods.)

> Unify the value checks for setParallelism()
> ---
>
> Key: FLINK-8257
> URL: https://issues.apache.org/jira/browse/FLINK-8257
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> The {{setParallelism()}} method exists in many components at different 
> levels. Some of the methods require the input value to be at least {{1}} 
> (e.g., {{StreamTransformation.setParallelism()}}), while some of them also 
> allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is 
> {{-1}} (e.g., {{DataSink.setParallelism()}}). We need to unify the 
> value checks for these methods.
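
A minimal sketch of what a single shared check could look like, assuming the unified rule is "a positive value, or the default marker". The class `ParallelismChecks` and its method are hypothetical and not part of Flink; the constant is a local stand-in for `ExecutionConfig.PARALLELISM_DEFAULT`, which the issue gives as -1. Which rule the project settles on is not stated in this thread.

```java
/** Hypothetical helper for illustration; not part of Flink. */
final class ParallelismChecks {

    /** Local stand-in for ExecutionConfig.PARALLELISM_DEFAULT (-1 according to the issue). */
    static final int PARALLELISM_DEFAULT = -1;

    private ParallelismChecks() {}

    /**
     * Accepts any positive parallelism or the default marker and rejects everything else.
     * Returns the value unchanged so the check can be used inline in a setter.
     */
    static int checkParallelism(int parallelism) {
        if (parallelism > 0 || parallelism == PARALLELISM_DEFAULT) {
            return parallelism;
        }
        throw new IllegalArgumentException(
            "The parallelism must be at least 1, or " + PARALLELISM_DEFAULT
                + " to use the default, but was " + parallelism);
    }
}
```

Each `setParallelism()` implementation could then delegate to the same helper, for example `this.parallelism = ParallelismChecks.checkParallelism(parallelism);`, so that all components accept and reject the same values.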



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-8257) Unify the value checks for setParallelism()

2017-12-19 Thread Xingcan Cui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui reassigned FLINK-8257:
--

Assignee: Xingcan Cui

> Unify the value checks for setParallelism()
> ---
>
> Key: FLINK-8257
> URL: https://issues.apache.org/jira/browse/FLINK-8257
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>
> The {{setParallelism()}} method exists in many components at different 
> levels. Some of the methods require the input value to be at least {{1}} 
> (e.g., {{StreamTransformation.setParallelism()}}), while some of them also 
> allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is 
> {{-1}} (e.g., {{DataSink.setParallelism()}}). We need to unify the value 
> checks for these methods.





[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-19 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297920#comment-16297920
 ] 

shuai.xu commented on FLINK-8289:
-

Hi [~eronwright], yes, what we need is the advertised address. But for the Yarn 
Proxy / Mesos Admin Router, the advertised address is not the proxy address 
but the ip:port. This address is registered with the proxy, so the router can 
redirect requests for the proxy address to the real address of the REST server.

> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently, RestServerEndpoint.getRestAddress returns the value of the config 
> option rest.address. The default is 127.0.0.1:9067, which cannot be accessed 
> from another machine. Because the IPs of the Dispatcher and JobMaster are 
> usually assigned dynamically, users configure the address as 0.0.0.0, so 
> getRestAddress returns 0.0.0.0:9067. This address is registered with YARN or 
> Mesos, but it cannot be accessed from another machine either. getRestAddress 
> therefore needs to return the real ip:port so that users can access the web 
> monitor from anywhere.
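
The behaviour asked for here can be sketched as follows: when the configured bind address is a wildcard or loopback address, substitute a host that other machines can reach. The class `AdvertisedAddress`, its method, and the `externalHost` parameter are hypothetical names for illustration, not Flink API; how the external host is determined is outside this sketch.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical helper for illustration; not part of Flink. */
final class AdvertisedAddress {

    private AdvertisedAddress() {}

    /**
     * Returns a "host:port" string that is usable from other machines.
     * A wildcard (0.0.0.0, ::) or loopback bind address is replaced by externalHost.
     */
    static String resolve(String bindAddress, int port, String externalHost) {
        final InetAddress bind;
        try {
            // For an IP literal this only parses the text; a host name would trigger a lookup.
            bind = InetAddress.getByName(bindAddress);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Cannot resolve bind address: " + bindAddress, e);
        }
        boolean notReachableRemotely = bind.isAnyLocalAddress() || bind.isLoopbackAddress();
        return (notReachableRemotely ? externalHost : bindAddress) + ":" + port;
    }
}
```

With this rule, `resolve("0.0.0.0", 9067, "10.0.0.5")` and `resolve("127.0.0.1", 9067, "10.0.0.5")` both yield `10.0.0.5:9067`, while an explicitly configured routable address is returned unchanged.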





[jira] [Commented] (FLINK-8295) Netty shading does not work properly

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297820#comment-16297820
 ] 

ASF GitHub Bot commented on FLINK-8295:
---

Github user mcfongtw commented on the issue:

https://github.com/apache/flink/pull/5183
  
+1 for this change, @NicoK . I did not see this error as I was working on 
FLINK-6805 - my bad :(. Also, you might want to remove those comments to avoid 
future confusion. 


> Netty shading does not work properly
> 
>
> Key: FLINK-8295
> URL: https://issues.apache.org/jira/browse/FLINK-8295
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Core
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Nico Kruber
>
> Multiple users complained that the Cassandra connector is not usable in Flink 
> 1.4.0 due to wrong/insufficient shading of Netty.
> See:
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E





[GitHub] flink issue #5183: [FLINK-8295][cassandra][build] properly shade netty for t...

2017-12-19 Thread mcfongtw
Github user mcfongtw commented on the issue:

https://github.com/apache/flink/pull/5183
  
+1 for this change, @NicoK . I did not see this error as I was working on 
FLINK-6805 - my bad :(. Also, you might want to remove those comments to avoid 
future confusion. 


---


[GitHub] flink pull request #5187: Merge pull request #1 from apache/master

2017-12-19 Thread laolang113
GitHub user laolang113 opened a pull request:

https://github.com/apache/flink/pull/5187

Merge pull request #1 from apache/master

update from apache


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/laolang113/flink master

Alternatively you can review and apply these changes as the patch at:


[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...

2017-12-19 Thread djh4230
Github user djh4230 commented on the issue:

https://github.com/apache/flink/pull/4926
  

![image](https://user-images.githubusercontent.com/8032384/34188337-cf86b410-e570-11e7-9781-283685d438bb.png)

The Hadoop config files are on the classpath that Flink is launched with.


---


[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297813#comment-16297813
 ] 

ASF GitHub Bot commented on FLINK-7951:
---

Github user djh4230 commented on the issue:

https://github.com/apache/flink/pull/4926
  

![image](https://user-images.githubusercontent.com/8032384/34188337-cf86b410-e570-11e7-9781-283685d438bb.png)

The Hadoop config files are on the classpath that Flink is launched with.


> YarnApplicationMaster does not load HDFSConfiguration
> -
>
> Key: FLINK-7951
> URL: https://issues.apache.org/jira/browse/FLINK-7951
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0
>
>
> When instantiating the {{YarnConfiguration}} we do not load the corresponding 
> {{HDFSConfiguration}}. As a result, we do not read {{hdfs-site.xml}}.





[jira] [Commented] (FLINK-8288) Register the web interface url to yarn for yarn job mode

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297807#comment-16297807
 ] 

ASF GitHub Bot commented on FLINK-8288:
---

GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5186

[FLINK-8288] [runtime] register job master rest endpoint url to yarn


## What is the purpose of the change

This pull request passes the endpoint URL of the job master REST server to 
the resource manager so that it can register the URL with YARN or Mesos.

## Verifying this change

*(Please pick either of the following options)*

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8288

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5186


commit 9a65d995454fe8d49ba6b84094ea7e5ee687836e
Author: shuai.xus 
Date:   2017-12-20T02:10:35Z

[FLINK-8288] [runtime] register job master rest endpoint url to yarn




> Register the web interface url to yarn for yarn job mode
> 
>
> Key: FLINK-8288
> URL: https://issues.apache.org/jira/browse/FLINK-8288
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> In flip-6 job mode, the resource manager is created before the web monitor, 
> so the web interface URL is never set on the resource manager, and the 
> resource manager cannot register the URL with YARN.





[GitHub] flink pull request #5186: [FLINK-8288] [runtime] register job master rest en...

2017-12-19 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5186

[FLINK-8288] [runtime] register job master rest endpoint url to yarn


## What is the purpose of the change

This pull request passes the endpoint URL of the job master REST server to 
the resource manager so that it can register the URL with YARN or Mesos.

## Verifying this change

*(Please pick either of the following options)*

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8288

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5186.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5186


commit 9a65d995454fe8d49ba6b84094ea7e5ee687836e
Author: shuai.xus 
Date:   2017-12-20T02:10:35Z

[FLINK-8288] [runtime] register job master rest endpoint url to yarn




---


[jira] [Commented] (FLINK-5880) Add documentation for object reuse for DataStream API

2017-12-19 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297798#comment-16297798
 ] 

Elias Levy commented on FLINK-5880:
---

Came here to open this issue after reading the latest [blog 
post|https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime].
  Hard to fault Databricks if the documentation about object reuse is not there.

> Add documentation for object reuse for DataStream API
> -
>
> Key: FLINK-5880
> URL: https://issues.apache.org/jira/browse/FLINK-5880
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Aljoscha Krettek
>
> The batch documentation has this section: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#operating-on-data-objects-in-functions





[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297661#comment-16297661
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r157907259
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +100,14 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
        SequenceNumber lastSequenceNum,
        KinesisProxyInterface kinesis) {
    this.fetcherRef = checkNotNull(fetcherRef);
+   MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+   .getMetricGroup()
+   .addGroup("Kinesis");
+   kinesisMetricGroup
--- End diff --

use `addGroup("shard_id", subscribedShard.getShard().getShardId())` instead 
and register the metric on the returned group.


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.





[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297662#comment-16297662
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r157907331
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +100,14 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
        SequenceNumber lastSequenceNum,
        KinesisProxyInterface kinesis) {
    this.fetcherRef = checkNotNull(fetcherRef);
+   MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+   .getMetricGroup()
+   .addGroup("Kinesis");
+   kinesisMetricGroup
+   .getAllVariables()
+   .put("", subscribedShard.getShard().getShardId());
+
+   kinesisMetricGroup.gauge("millisBehindLatest", (Gauge) () -> millisBehindLatest);
--- End diff --

the cast shouldn't be necessary


> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.





[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2017-12-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r157907331
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +100,14 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
        SequenceNumber lastSequenceNum,
        KinesisProxyInterface kinesis) {
    this.fetcherRef = checkNotNull(fetcherRef);
+   MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+   .getMetricGroup()
+   .addGroup("Kinesis");
+   kinesisMetricGroup
+   .getAllVariables()
+   .put("", subscribedShard.getShard().getShardId());
+
+   kinesisMetricGroup.gauge("millisBehindLatest", (Gauge) () -> millisBehindLatest);
--- End diff --

the cast shouldn't be necessary


---


[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2017-12-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5182#discussion_r157907259
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -96,6 +100,14 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef,
        SequenceNumber lastSequenceNum,
        KinesisProxyInterface kinesis) {
    this.fetcherRef = checkNotNull(fetcherRef);
+   MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext()
+   .getMetricGroup()
+   .addGroup("Kinesis");
+   kinesisMetricGroup
--- End diff --

use `addGroup("shard_id", subscribedShard.getShard().getShardId())` instead 
and register the metric on the returned group.


---


[jira] [Commented] (FLINK-8297) RocksDBListState stores whole list in single byte[]

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297476#comment-16297476
 ] 

ASF GitHub Bot commented on FLINK-8297:
---

GitHub user je-ik opened a pull request:

https://github.com/apache/flink/pull/5185

[FLINK-8297] [flink-rocksdb] optionally use RocksDBMapState internally for 
storing lists

## What is the purpose of the change

Enable storing lists that do not fit into memory under a single key.

## Brief change log

## Verifying this change

This change added tests and can be verified as follows:
  passes additional tests for RocksDBStateBackend.enableLargeListsPerKey()

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no, backward 
compatible
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/datadrivencz/flink 
rocksdb-backend-memory-optimization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5185.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5185


commit f1bbaa30901ba8a54b02908fd3eb3615301b4400
Author: Jan Lukavsky 
Date:   2017-12-14T20:42:06Z

[FLINK-8297] [flink-rocksdb] optionally use RocksDBMapState internally for 
storing lists




> RocksDBListState stores whole list in single byte[]
> ---
>
> Key: FLINK-8297
> URL: https://issues.apache.org/jira/browse/FLINK-8297
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Jan Lukavský
>
> RocksDBListState currently keeps the whole list in a single RocksDB 
> key-value pair, which implies that the list must fit into memory. 
> Larger lists are not supported and end in an OOME or another error. 
> RocksDBListState could be modified so that the individual items of the list 
> are stored under separate keys in RocksDB and can then be iterated over. A 
> simple implementation could reuse the existing RocksDBMapState, with the 
> list index as the key, plus a single RocksDBValueState that tracks how many 
> items have already been added to the list. Because this implementation might 
> be less efficient in some cases, it would be good to make it opt-in via a 
> construct like
> {{new RocksDBStateBackend().enableLargeListsPerKey()}}
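
The layout described above (one entry per list item, keyed by its index, plus a counter) can be sketched in plain Java. This is an illustration only: a `HashMap` stands in for `RocksDBMapState` and a `long` field for the `RocksDBValueState` counter, and the class name `IndexedListState` is hypothetical. It is not the code from the pull request.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

/** Illustrative stand-in for a list state that stores each item under its own key. */
final class IndexedListState<T> {

    private final Map<Long, T> entries = new HashMap<>(); // index -> item (stand-in for map state)
    private long size = 0;                                 // items added so far (stand-in for value state)

    /** Appends one item under the next free index; existing entries are not rewritten. */
    void add(T value) {
        entries.put(size, value);
        size++;
    }

    /** Iterates in insertion order, looking up one item at a time instead of loading the whole list. */
    Iterable<T> get() {
        final long snapshot = size;
        return () -> new Iterator<T>() {
            private long next = 0;

            @Override
            public boolean hasNext() {
                return next < snapshot;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return entries.get(next++);
            }
        };
    }

    /** Removes all items and resets the counter. */
    void clear() {
        entries.clear();
        size = 0;
    }
}
```

The trade-off mentioned in the issue is visible here: appending touches only one entry, but reading the list costs one lookup per item, which is why an opt-in switch is proposed.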





[GitHub] flink pull request #5185: [FLINK-8297] [flink-rocksdb] optionally use RocksD...

2017-12-19 Thread je-ik
GitHub user je-ik opened a pull request:

https://github.com/apache/flink/pull/5185

[FLINK-8297] [flink-rocksdb] optionally use RocksDBMapState internally for 
storing lists

## What is the purpose of the change

Enable storing lists that do not fit into memory under a single key.

## Brief change log

## Verifying this change

This change added tests and can be verified as follows:
  passes additional tests for RocksDBStateBackend.enableLargeListsPerKey()

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no, backward 
compatible
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/datadrivencz/flink 
rocksdb-backend-memory-optimization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5185.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5185


commit f1bbaa30901ba8a54b02908fd3eb3615301b4400
Author: Jan Lukavsky 
Date:   2017-12-14T20:42:06Z

[FLINK-8297] [flink-rocksdb] optionally use RocksDBMapState internally for 
storing lists




---


[jira] [Updated] (FLINK-8233) Retrieve ExecutionResult by REST polling

2017-12-19 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8233:

Priority: Blocker  (was: Major)

> Retrieve ExecutionResult by REST polling
> 
>
> Key: FLINK-8233
> URL: https://issues.apache.org/jira/browse/FLINK-8233
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Retrieve the {{ExecutionResult}} from a finished Flink job via the 
> {{RestClusterClient}}.





[jira] [Updated] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-19 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8234:

Priority: Blocker  (was: Major)

> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.
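
A minimal sketch of a cache with a configurable size and a cleanup step for stale entries, as the description asks for. The class `BoundedResultCache`, the time-to-live parameter, and the explicit `nowMillis` arguments are assumptions made for illustration; they are not taken from the Flink code base or from the pull request.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative size-bounded cache with time-based cleanup; not Flink code. */
final class BoundedResultCache<K, V> {

    /** A cached value together with the time at which it was stored. */
    private static final class Timestamped<V> {
        final V value;
        final long insertedAtMillis;

        Timestamped(V value, long insertedAtMillis) {
            this.value = value;
            this.insertedAtMillis = insertedAtMillis;
        }
    }

    private final int maxSize;
    private final long ttlMillis;
    private final LinkedHashMap<K, Timestamped<V>> entries;

    BoundedResultCache(int maxSize, long ttlMillis) {
        this.maxSize = maxSize;
        this.ttlMillis = ttlMillis;
        // Insertion-ordered map that drops its oldest entry once maxSize is exceeded.
        this.entries = new LinkedHashMap<K, Timestamped<V>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timestamped<V>> eldest) {
                return size() > BoundedResultCache.this.maxSize;
            }
        };
    }

    synchronized void put(K key, V value, long nowMillis) {
        entries.remove(key); // re-insert so that an updated key counts as the newest entry
        entries.put(key, new Timestamped<>(value, nowMillis));
    }

    synchronized V get(K key) {
        Timestamped<V> entry = entries.get(key);
        return entry == null ? null : entry.value;
    }

    /** Drops entries that are at least ttlMillis old; intended to be called periodically. */
    synchronized void cleanUp(long nowMillis) {
        entries.values().removeIf(e -> nowMillis - e.insertedAtMillis >= ttlMillis);
    }

    synchronized int size() {
        return entries.size();
    }
}
```

Passing the current time in explicitly keeps the sketch deterministic; a real component would more likely read a clock and run `cleanUp` from a scheduled executor.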





[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297450#comment-16297450
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157878862
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---
@@ -358,12 +360,12 @@ private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) {
    }
 
    @Override
-   public void jobFinished(JobExecutionResult result) {
+   public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
        decrementCheckAndCleanup();
    }
 
    @Override
-   public void jobFailed(Throwable cause) {
+   public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
--- End diff --

Maybe rename to `JobResult` after all to avoid the fully qualified name.


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.





[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157878862
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---
@@ -358,12 +360,12 @@ private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) {
    }
 
    @Override
-   public void jobFinished(JobExecutionResult result) {
+   public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
        decrementCheckAndCleanup();
    }
 
    @Override
-   public void jobFailed(Throwable cause) {
+   public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) {
--- End diff --

Maybe rename to `JobResult` after all to avoid the fully qualified name.


---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297443#comment-16297443
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157877969
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but 
with an optional
+ * {@link SerializedThrowable} when the job failed.
+ *
+ * This is used by the {@link JobMaster} to send the results to the 
{@link Dispatcher}.
+ */
+public class JobExecutionResult {
+
+   private final JobID jobId;
+
+   private final Map accumulatorResults;
+
+   private final long netRuntime;
+
+   private final SerializedThrowable serializedThrowable;
+
+   private JobExecutionResult(
+   final JobID jobId,
+   final Map 
accumulatorResults,
+   final long netRuntime,
+   @Nullable final SerializedThrowable 
serializedThrowable) {
+
+   checkArgument(netRuntime >= 0, "netRuntime must be greater than 
or equals 0");
+
+   this.jobId = requireNonNull(jobId);
+   this.accumulatorResults = requireNonNull(accumulatorResults);
+   this.netRuntime = netRuntime;
+   this.serializedThrowable = serializedThrowable;
+   }
+
+   public boolean isSuccess() {
--- End diff --

Javadocs are missing.


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.





[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157877969
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but 
with an optional
+ * {@link SerializedThrowable} when the job failed.
+ *
+ * This is used by the {@link JobMaster} to send the results to the 
{@link Dispatcher}.
+ */
+public class JobExecutionResult {
+
+   private final JobID jobId;
+
+   private final Map accumulatorResults;
+
+   private final long netRuntime;
+
+   private final SerializedThrowable serializedThrowable;
+
+   private JobExecutionResult(
+   final JobID jobId,
+   final Map 
accumulatorResults,
+   final long netRuntime,
+   @Nullable final SerializedThrowable 
serializedThrowable) {
+
+   checkArgument(netRuntime >= 0, "netRuntime must be greater than 
or equals 0");
+
+   this.jobId = requireNonNull(jobId);
+   this.accumulatorResults = requireNonNull(accumulatorResults);
+   this.netRuntime = netRuntime;
+   this.serializedThrowable = serializedThrowable;
+   }
+
+   public boolean isSuccess() {
--- End diff --

Javadocs are missing.


---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297440#comment-16297440
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157877406
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws 
JobExecutionException, InterruptedE
}
}
else if (result != null) {
-   return result;
+   try {
+   return new SerializedJobExecutionResult(
+   jobId,
+   result.getNetRuntime(),
+   
result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
--- End diff --

Because the exception is serialized in 
`OnCompletionActions#jobFailed(JobExecutionResult);`, I have to deserialize it 
here again. I wonder if this is sane?

CC: @tillrohrmann 


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.





[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157877406
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 ---
@@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws 
JobExecutionException, InterruptedE
}
}
else if (result != null) {
-   return result;
+   try {
+   return new SerializedJobExecutionResult(
+   jobId,
+   result.getNetRuntime(),
+   
result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader());
--- End diff --

Because the exception is serialized in 
`OnCompletionActions#jobFailed(JobExecutionResult);`, I have to deserialize it 
here again. I wonder if this is sane?

CC: @tillrohrmann 


---


[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297436#comment-16297436
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157876793
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 ---
@@ -92,4 +94,42 @@
 * @return Future containing the collection of instance ids and the 
corresponding metric query service path
 */
CompletableFuture>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+   /**
+* Returns the {@link SerializedJobExecutionResult} for a job, or in 
case the job failed, the
+* failure cause.
+*
+* @param jobId ID of the job that we are interested in.
+* @param timeout Timeout for the asynchronous operation.
+*
+* @see #isJobExecutionResultPresent(JobID, Time)
+*
+* @return {@link CompletableFuture} containing the {@link 
JobExecutionResult} or a
+* {@link Throwable} which represents the failure cause. If there is no 
result, the future will
+* be completed exceptionally with
+* {@link 
org.apache.flink.runtime.messages.JobExecutionResultNotFoundException}
+*/
+   default CompletableFuture getJobExecutionResult(
+   JobID jobId,
+   @RpcTimeout Time timeout) {
+   throw new UnsupportedOperationException();
+   }
+
+   /**
+* Tests if the {@link SerializedJobExecutionResult} is present.
--- End diff --

Javadoc needs to be updated.


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.





[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297435#comment-16297435
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157876761
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 ---
@@ -92,4 +94,42 @@
 * @return Future containing the collection of instance ids and the 
corresponding metric query service path
 */
CompletableFuture>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+   /**
+* Returns the {@link SerializedJobExecutionResult} for a job, or in 
case the job failed, the
--- End diff --

Javadoc needs to be updated.


> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.





[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297433#comment-16297433
 ] 

ASF GitHub Bot commented on FLINK-8234:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5184

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

## What is the purpose of the change

Cache `JobExecutionResult` in `Dispatcher`, and add methods to 
`RestfulGateway` to enable retrieval of results through HTTP (not yet 
implemented). This will be needed so that accumulator results can be 
transmitted to the client.

## Brief change log

  - *Introduce new JobExecutionResult used by JobMaster to forward the 
information in the already existing JobExecutionResult.*
  - *Always cache a JobExecutionResult, even when the job fails. In that 
case, the serialized exception is stored as well.*
  - *Introduce new methods to RestfulGateway to allow retrieval of cached 
JobExecutionResults.*
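
The issue description asks for a cache with a configurable size that periodically drops stale entries. A minimal sketch of that idea follows. It is not the code from this pull request: the class `BoundedResultCache`, its methods, and the explicit `nowMillis` parameter are hypothetical names chosen for illustration, and a real `Dispatcher` would key the cache by `JobID` and trigger the cleanup from a scheduled task.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Optional;

/** Hypothetical sketch of a size-bounded cache with time-based cleanup; not Flink code. */
class BoundedResultCache<K, V> {

    /** A cached value together with the time at which it was inserted. */
    private static final class Timestamped<T> {
        final T value;
        final long insertedAtMillis;

        Timestamped(T value, long insertedAtMillis) {
            this.value = value;
            this.insertedAtMillis = insertedAtMillis;
        }
    }

    private final int maxSize;
    private final long ttlMillis;
    // LinkedHashMap iterates in insertion order, so the first key is the oldest entry.
    private final LinkedHashMap<K, Timestamped<V>> entries = new LinkedHashMap<>();

    BoundedResultCache(int maxSize, long ttlMillis) {
        if (maxSize <= 0 || ttlMillis < 0) {
            throw new IllegalArgumentException("maxSize must be > 0 and ttlMillis >= 0");
        }
        this.maxSize = maxSize;
        this.ttlMillis = ttlMillis;
    }

    /** Stores a result and evicts the oldest entry if the size limit is exceeded. */
    synchronized void put(K key, V value, long nowMillis) {
        entries.remove(key); // re-inserting an existing key moves it to the end
        entries.put(key, new Timestamped<>(value, nowMillis));
        if (entries.size() > maxSize) {
            Iterator<K> oldest = entries.keySet().iterator();
            oldest.next();
            oldest.remove();
        }
    }

    synchronized Optional<V> get(K key) {
        Timestamped<V> entry = entries.get(key);
        return entry == null ? Optional.empty() : Optional.of(entry.value);
    }

    /** Removes entries older than the configured time-to-live; returns how many were removed. */
    synchronized int removeStale(long nowMillis) {
        int before = entries.size();
        entries.values().removeIf(e -> nowMillis - e.insertedAtMillis > ttlMillis);
        return before - entries.size();
    }

    synchronized int size() {
        return entries.size();
    }
}
```

Passing the current time explicitly keeps the sketch deterministic in tests; a production variant would more likely read a clock internally.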


## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests to verify that the Dispatcher caches the job results 
when the job finishes successfully or fails.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8234

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5184


commit d05c76e621106810c32bc17aa0576923ba6be401
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the
  information in the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case
  of job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults




> Cache JobExecutionResult from finished JobManagerRunners
> 
>
> Key: FLINK-8234
> URL: https://issues.apache.org/jira/browse/FLINK-8234
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the 
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should 
> have a configurable size and should periodically clean up stale entries in 
> order to avoid memory leaks.





[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157876761
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 ---
@@ -92,4 +94,42 @@
 * @return Future containing the collection of instance ids and the 
corresponding metric query service path
 */
CompletableFuture>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+   /**
+* Returns the {@link SerializedJobExecutionResult} for a job, or in 
case the job failed, the
--- End diff --

Javadoc needs to be updated.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5184#discussion_r157876793
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 ---
@@ -92,4 +94,42 @@
 * @return Future containing the collection of instance ids and the 
corresponding metric query service path
 */
CompletableFuture>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+   /**
+* Returns the {@link SerializedJobExecutionResult} for a job, or in 
case the job failed, the
+* failure cause.
+*
+* @param jobId ID of the job that we are interested in.
+* @param timeout Timeout for the asynchronous operation.
+*
+* @see #isJobExecutionResultPresent(JobID, Time)
+*
+* @return {@link CompletableFuture} containing the {@link 
JobExecutionResult} or a
+* {@link Throwable} which represents the failure cause. If there is no 
result, the future will
+* be completed exceptionally with
+* {@link 
org.apache.flink.runtime.messages.JobExecutionResultNotFoundException}
+*/
+   default CompletableFuture getJobExecutionResult(
+   JobID jobId,
+   @RpcTimeout Time timeout) {
+   throw new UnsupportedOperationException();
+   }
+
+   /**
+* Tests if the {@link SerializedJobExecutionResult} is present.
--- End diff --

Javadoc needs to be updated.


---


[GitHub] flink pull request #5184: [FLINK-8234][flip6] Cache JobExecutionResult in Di...

2017-12-19 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5184

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

## What is the purpose of the change

Cache `JobExecutionResult` in `Dispatcher`, and add methods to 
`RestfulGateway` to enable retrieval of results through HTTP (not yet 
implemented). This will be needed so that accumulator results can be 
transmitted to the client.

## Brief change log

  - *Introduce new JobExecutionResult used by JobMaster to forward the 
information in the already existing JobExecutionResult.*
  - *Always cache a JobExecutionResult, even when the job fails. In that 
case, the serialized exception is stored as well.*
  - *Introduce new methods to RestfulGateway to allow retrieval of cached 
JobExecutionResults.*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests to verify that the Dispatcher caches the job results 
when the job finishes successfully or fails.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

CC: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8234

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5184


commit d05c76e621106810c32bc17aa0576923ba6be401
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the
  information in the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case
  of job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults




---


[jira] [Created] (FLINK-8297) RocksDBListState stores whole list in single byte[]

2017-12-19 Thread JIRA
Jan Lukavský created FLINK-8297:
---

 Summary: RocksDBListState stores whole list in single byte[]
 Key: FLINK-8297
 URL: https://issues.apache.org/jira/browse/FLINK-8297
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 1.3.2, 1.4.0
Reporter: Jan Lukavský


RocksDBListState currently keeps the whole list in a single RocksDB key-value 
pair, which implies that the list must fit into memory. Larger lists 
are not supported and end in an OOME or another error. RocksDBListState 
could be modified so that the individual items of the list are stored under 
separate keys in RocksDB and can then be iterated over. A simple implementation 
could reuse the existing RocksDBMapState, with the list index as the key, and a 
single RocksDBValueState keeping track of how many items have already been 
added to the list. Because this implementation might be less efficient in some 
cases, it would be good to make it opt-in through a construct like

{{new RocksDBStateBackend().enableLargeListsPerKey()}}
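
The proposed layout (one entry per list item, keyed by its index, plus a counter) can be sketched as follows. This is an illustration under stated assumptions, not Flink code: `IndexedListSketch` is a hypothetical class, the in-memory `TreeMap` stands in for RocksDBMapState, and the `count` field stands in for the RocksDBValueState that tracks the number of items.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

/** Hypothetical sketch: a list stored as one key-value entry per item plus a counter. */
class IndexedListSketch implements Iterable<byte[]> {

    // Stands in for a map state keyed by list index (assumption: any sorted key-value store).
    private final Map<Long, byte[]> itemsByIndex = new TreeMap<>();

    // Stands in for a value state that tracks how many items have been added.
    private long count = 0L;

    /** Appends an item by writing it under the next free index. */
    void add(byte[] serializedItem) {
        itemsByIndex.put(count, serializedItem);
        count++;
    }

    long size() {
        return count;
    }

    /** Iterates item by item, so the whole list never has to be materialized at once. */
    @Override
    public Iterator<byte[]> iterator() {
        final long end = count; // snapshot of the size when iteration starts
        return new Iterator<byte[]>() {
            private long nextIndex = 0L;

            @Override
            public boolean hasNext() {
                return nextIndex < end;
            }

            @Override
            public byte[] next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return itemsByIndex.get(nextIndex++);
            }
        };
    }
}
```

The trade-off mentioned above is visible here: each `add` and each iteration step touches a separate key, which costs more per item than reading one byte[] but bounds the memory needed at any time.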





[jira] [Created] (FLINK-8296) Rework FlinkKafkaConsumerBaseTest to not use Java reflection for dependency injection

2017-12-19 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8296:
--

 Summary: Rework FlinkKafkaConsumerBaseTest to not use Java 
reflection for dependency injection
 Key: FLINK-8296
 URL: https://issues.apache.org/jira/browse/FLINK-8296
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector, Tests
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.5.0, 1.4.1


The current {{FlinkKafkaConsumerBaseTest}} relies heavily on Java 
reflection for dependency injection. Using reflection to compose unit tests 
should really be a last resort; it indicates that the tests are highly 
implementation-specific and that we should make the design more testable.





[jira] [Comment Edited] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAddress

2017-12-19 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297187#comment-16297187
 ] 

Eron Wright  edited comment on FLINK-8289 at 12/19/17 6:21 PM:
---

I'll use the terms 'advertised' versus 'bind' address to discuss this issue.   
Do you agree that the goal here is to return the advertised address?   The 
Flink docs are unclear on which configuration setting is applicable.

Two complications:  
1. *Yarn Proxy / Mesos Admin Router.*   In both environments, web traffic is 
expected to be proxied, so the advertised address should be the proxy address.
2. *SSL*.  To enable SSL on the web endpoints, two things are needed:
  a. Advertise a name-based (not IP-based) address.
  b.  Construct the advertised address with 'https' scheme.

See the proposed SSL spec for more information on point (2).
[FLIP - Service Authorization 
(SSL)|https://docs.google.com/document/d/13IRPb2GdL842rIzMgEn0ibOQHNku6W8aMf1p7gCPJjg/edit?usp=sharing]



was (Author: eronwright):
I'll use the terms 'advertised' versus 'bind' address to discuss this issue.   
Do you agree that the goal here is to return the advertised address?   The 
Flink docs are unclear on which configuration setting is applicable.

Two complications:  
1. *Yarn Proxy / Mesos Admin Router.*   In both environments, web traffic is 
expected to be proxied, so the advertised address should be the proxy address.
2. *SSL*.  To enable SSL on the web endpoints, two things are needed:
  a. Advertise a name-based (not IP-based) address.
  b.  Construct the advertised address with 'https' scheme.


> The RestServerEndpoint should return the address with real ip when 
> getRestAddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently, RestServerEndpoint.getRestAddress returns the value of the config 
> option rest.address, which defaults to 127.0.0.1:9067, but this address 
> cannot be accessed from another machine. Because the IPs of the Dispatcher 
> and JobMaster are usually assigned dynamically, users will configure it as 
> 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067. This address is 
> registered with YARN or Mesos, but it cannot be accessed from another 
> machine either. The method therefore needs to return the real ip:port so 
> that users can access the web monitor from anywhere.





[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAddress

2017-12-19 Thread Eron Wright (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297187#comment-16297187
 ] 

Eron Wright  commented on FLINK-8289:
-

I'll use the terms 'advertised' versus 'bind' address to discuss this issue.   
Do you agree that the goal here is to return the advertised address?   The 
Flink docs are unclear on which configuration setting is applicable.

Two complications:  
1. *Yarn Proxy / Mesos Admin Router.*   In both environments, web traffic is 
expected to be proxied, so the advertised address should be the proxy address.
2. *SSL*.  To enable SSL on the web endpoints, two things are needed:
  a. Advertise a name-based (not IP-based) address.
  b.  Construct the advertised address with 'https' scheme.
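
To make the distinction concrete, here is a minimal sketch of how an advertised address could be derived from the bind address under the rules listed above. Everything in it is an assumption for illustration: `AdvertisedAddressSketch`, its parameters, and the idea of passing the proxy address and host name in explicitly are hypothetical and do not correspond to an existing Flink configuration option or API. IPv6 literals would additionally need brackets, which the sketch does not handle.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of deriving an advertised address from a bind address; not Flink code. */
class AdvertisedAddressSketch {

    // Bind hosts that other machines cannot use to reach this endpoint.
    private static final List<String> NOT_REMOTELY_REACHABLE =
        Arrays.asList("0.0.0.0", "::", "127.0.0.1", "::1", "localhost");

    /**
     * @param bindHost     host the server socket is bound to
     * @param port         port the server listens on
     * @param sslEnabled   whether the endpoint serves HTTPS
     * @param proxyAddress address of a Yarn proxy / Mesos admin router, or null if none
     * @param hostName     resolvable name of this machine, used when the bind host is unusable
     */
    static URI advertisedAddress(
            String bindHost, int port, boolean sslEnabled, String proxyAddress, String hostName) {
        // Complication 1: behind a proxy, clients must be sent to the proxy.
        if (proxyAddress != null) {
            return URI.create(proxyAddress);
        }
        // Complication 2a: with SSL, advertise a name so certificates can be validated against it.
        String host = (sslEnabled || NOT_REMOTELY_REACHABLE.contains(bindHost)) ? hostName : bindHost;
        // Complication 2b: the scheme depends on whether SSL is enabled.
        String scheme = sslEnabled ? "https" : "http";
        return URI.create(scheme + "://" + host + ":" + port);
    }
}
```

For example, a server bound to 0.0.0.0 with SSL enabled on a machine named jm-1.example.com would advertise a name-based https address rather than 0.0.0.0.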


> The RestServerEndpoint should return the address with real ip when 
> getRestAddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently, RestServerEndpoint.getRestAddress returns the value of the config 
> option rest.address, which defaults to 127.0.0.1:9067, but this address 
> cannot be accessed from another machine. Because the IPs of the Dispatcher 
> and JobMaster are usually assigned dynamically, users will configure it as 
> 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067. This address is 
> registered with YARN or Mesos, but it cannot be accessed from another 
> machine either. The method therefore needs to return the real ip:port so 
> that users can access the web monitor from anywhere.





[jira] [Commented] (FLINK-8295) Netty shading does not work properly

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297113#comment-16297113
 ] 

ASF GitHub Bot commented on FLINK-8295:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5183

[FLINK-8295][cassandra][build] properly shade netty for the datastax driver

## What is the purpose of the change

`com.datastax.driver.core.NettyUtil` expects netty to be present either at 
its original package (`io.netty`) or relocated to `com.datastax.shaded.netty`. 
By relocating it to this package we make sure the driver follows its designated 
path and is able to connect at all.
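
Why the relocation target matters can be illustrated with a small probe that looks for a class under a fixed list of candidate names. This is only a sketch of the general technique, not the driver's actual `NettyUtil` code: `ShadedClassProbe` and `firstLoadable` are hypothetical, and the candidate class names in the usage note are assumptions derived from the two package prefixes named above.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of probing for a class under several candidate names. */
class ShadedClassProbe {

    /** Returns the first candidate class name that the given class loader can load. */
    static Optional<String> firstLoadable(List<String> candidates, ClassLoader loader) {
        for (String name : candidates) {
            try {
                // 'false' avoids running static initializers; we only test for presence.
                Class.forName(name, false, loader);
                return Optional.of(name);
            } catch (ClassNotFoundException | LinkageError e) {
                // Not available under this name; try the next candidate.
            }
        }
        return Optional.empty();
    }
}
```

A library that only probes names such as `io.netty.channel.Channel` and `com.datastax.shaded.netty.channel.Channel` would find nothing if netty were relocated to a third namespace, which matches the symptom described in this pull request.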

## Brief change log

- relocate netty to  `com.datastax.shaded.netty` instead of our own 
namespace

## Verifying this change

This change added tests and can be verified as follows:

- verified that the built jar contains netty (only) in 
`com.datastax.shaded.netty` and not under 
`org.apache.flink.cassandra.shaded.io.netty`
- run a job that uses cassandra (this should not have worked before without 
adding netty, and should work now; I haven't tested it yet - @twalthr can you 
jump in here?)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-8295

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5183.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5183


commit 80c00a28dbf01ab4f31220c2005d78a76316
Author: Nico Kruber 
Date:   2017-12-19T17:14:19Z

[FLINK-8295][cassandra][build] properly shade netty for the datastax driver

com.datastax.driver.core.NettyUtil expects netty to be present either at its
original package or relocated to com.datastax.shaded.netty. By relocating it
to this package we make sure the driver follows its designated path.




> Netty shading does not work properly
> 
>
> Key: FLINK-8295
> URL: https://issues.apache.org/jira/browse/FLINK-8295
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Core
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Nico Kruber
>
> Multiple users complained that the Cassandra connector is not usable in Flink 
> 1.4.0 due to wrong/insufficient shading of Netty.
> See:
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E





[GitHub] flink pull request #5183: [FLINK-8295][cassandra][build] properly shade nett...

2017-12-19 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5183

[FLINK-8295][cassandra][build] properly shade netty for the datastax driver

## What is the purpose of the change

`com.datastax.driver.core.NettyUtil` expects netty to be present either at 
its original package (`io.netty`) or relocated to `com.datastax.shaded.netty`. 
By relocating it to this package we make sure the driver follows its designated 
path and is able to connect at all.

## Brief change log

- relocate netty to `com.datastax.shaded.netty` instead of our own 
namespace

## Verifying this change

This change added tests and can be verified as follows:

- verified the build jar contains netty (only) in 
`com.datastax.shaded.netty` and not under 
`org.apache.flink.cassandra.shaded.io.netty`
- run a job that uses cassandra (should not have worked without adding 
netty before and should work now - haven't tested it yet - @twalthr can you 
jump in here?)
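
The lookup order described above can be illustrated with a small sketch. This is not the driver's actual code: the class `NettyLocationProbe` and its `locate` method are hypothetical names, and probing for `channel.Channel` is an assumption made for illustration. It tries the original `io.netty` package first, then the `com.datastax.shaded.netty` relocation, and reports which one resolves:

```java
import java.util.Arrays;
import java.util.List;

public class NettyLocationProbe {

    /**
     * Returns the first package prefix under which the given class suffix
     * can be loaded, or null if none of the prefixes resolves.
     */
    public static String locate(List<String> packagePrefixes, String classSuffix) {
        for (String prefix : packagePrefixes) {
            try {
                // initialize=false: only check presence, run no static initializers
                Class.forName(prefix + "." + classSuffix, false,
                        NettyLocationProbe.class.getClassLoader());
                return prefix;
            } catch (ClassNotFoundException | LinkageError e) {
                // not available under this prefix, try the next one
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Build "io.netty" at runtime so that a shading tool which rewrites
        // string constants does not relocate this literal as well.
        String original = String.join(".", "io", "netty");
        List<String> candidates = Arrays.asList(original, "com.datastax.shaded.netty");
        String found = locate(candidates, "channel.Channel");
        System.out.println(found == null
                ? "netty not found under any expected package"
                : "netty found under " + found);
    }
}
```

Running this against the built connector jar would be one way to confirm where netty ends up after shading. If netty were relocated to a third namespace, neither candidate would resolve.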

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-8295

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5183.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5183


commit 80c00a28dbf01ab4f31220c2005d78a76316
Author: Nico Kruber 
Date:   2017-12-19T17:14:19Z

[FLINK-8295][cassandra][build] properly shade netty for the datastax driver

com.datastax.driver.core.NettyUtil expects netty to be present either at its
original package or relocated to com.datastax.shaded.netty. By relocating it
to this package we make sure the driver follows its designated path.




---


[jira] [Commented] (FLINK-7594) Add a SQL client

2017-12-19 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297096#comment-16297096
 ] 

Timo Walther commented on FLINK-7594:
-

I published an initial design document under FLIP-24. Feel free to join the 
discussion on the dev@ mailing list.

> Add a SQL client
> 
>
> Key: FLINK-7594
> URL: https://issues.apache.org/jira/browse/FLINK-7594
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> At the moment a user can only specify queries within a Java/Scala program 
> which is nice for integrating table programs or parts of it with DataSet or 
> DataStream API. With more connectors coming up, it is time to also provide a 
> programming-free SQL client. The SQL client should consist of a CLI interface 
> and maybe also a REST API. The concrete design is still up for discussion.





[jira] [Assigned] (FLINK-8295) Netty shading does not work properly

2017-12-19 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber reassigned FLINK-8295:
--

Assignee: Nico Kruber

> Netty shading does not work properly
> 
>
> Key: FLINK-8295
> URL: https://issues.apache.org/jira/browse/FLINK-8295
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Core
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Nico Kruber
>
> Multiple users complained that the Cassandra connector is not usable in Flink 
> 1.4.0 due to wrong/insufficient shading of Netty.
> See:
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E





[jira] [Commented] (FLINK-7594) Add a SQL client

2017-12-19 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297045#comment-16297045
 ] 

sunjincheng commented on FLINK-7594:


+1, it's a good enhancement to Flink.

> Add a SQL client
> 
>
> Key: FLINK-7594
> URL: https://issues.apache.org/jira/browse/FLINK-7594
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> At the moment a user can only specify queries within a Java/Scala program 
> which is nice for integrating table programs or parts of it with DataSet or 
> DataStream API. With more connectors coming up, it is time to also provide a 
> programming-free SQL client. The SQL client should consist of a CLI interface 
> and maybe also a REST API. The concrete design is still up for discussion.





[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296984#comment-16296984
 ] 

ASF GitHub Bot commented on FLINK-7465:
---

Github user sunjincheng121 closed the pull request at:

https://github.com/apache/flink/pull/4652


> Add build-in BloomFilterCount on TableAPI
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, we use a BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch is as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr], I would appreciate it if you could give me some advice. :-)
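
The add and query steps quoted above can be sketched as follows. This is a minimal illustration, not the closed pull request's implementation: `SimpleBloomFilter` is a hypothetical class, and it derives the k positions by double hashing from two base hashes rather than from k independent hash functions, which is a simplification made here. It covers membership only, not the counting function the issue proposes.

```java
import java.util.BitSet;

public class SimpleBloomFilter {

    private final BitSet bits; // the m-bit array, initially all 0
    private final int m;       // number of bits
    private final int k;       // number of positions set per element

    public SimpleBloomFilter(int m, int k) {
        if (m <= 0 || k <= 0) {
            throw new IllegalArgumentException("m and k must be positive");
        }
        this.m = m;
        this.k = k;
        this.bits = new BitSet(m);
    }

    /** i-th array position for an element, computed as (h1 + i * h2) mod m. */
    private int position(String element, int i) {
        int h1 = element.hashCode();
        // second base hash; forced odd so that it is never zero
        int h2 = Integer.rotateLeft(h1 * 0x9E3779B9, 15) | 1;
        return Math.floorMod(h1 + i * h2, m);
    }

    /** Sets the bits at all k positions of the element to 1. */
    public void add(String element) {
        for (int i = 0; i < k; i++) {
            bits.set(position(element, i));
        }
    }

    /** False means definitely absent; true means present or a false positive. */
    public boolean mightContain(String element) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(element, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // same parameters as the figure: m = 18, k = 3
        SimpleBloomFilter filter = new SimpleBloomFilter(18, 3);
        filter.add("x");
        filter.add("y");
        filter.add("z");
        System.out.println("x: " + filter.mightContain("x"));
        System.out.println("w: " + filter.mightContain("w"));
    }
}
```

An added element is always reported as possibly present, because its own bits are never cleared. With only 18 bits, an element that was never added, such as `w`, may still be reported as present.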





[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...

2017-12-19 Thread sunjincheng121
Github user sunjincheng121 closed the pull request at:

https://github.com/apache/flink/pull/4652


---


[jira] [Created] (FLINK-8295) Netty shading does not work properly

2017-12-19 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8295:
---

 Summary: Netty shading does not work properly
 Key: FLINK-8295
 URL: https://issues.apache.org/jira/browse/FLINK-8295
 Project: Flink
  Issue Type: Bug
  Components: Cassandra Connector, Core
Reporter: Timo Walther


Multiple users complained that the Cassandra connector is not usable in Flink 
1.4.0 due to wrong/insufficient shading of Netty.

See:
http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E

http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E





[jira] [Updated] (FLINK-8295) Netty shading does not work properly

2017-12-19 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8295:

Affects Version/s: 1.4.0

> Netty shading does not work properly
> 
>
> Key: FLINK-8295
> URL: https://issues.apache.org/jira/browse/FLINK-8295
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Core
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>
> Multiple users complained that the Cassandra connector is not usable in Flink 
> 1.4.0 due to wrong/insufficient shading of Netty.
> See:
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E
> http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E





[jira] [Closed] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-12-19 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-7465.
--
Resolution: Workaround

> Add build-in BloomFilterCount on TableAPI
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, we use a BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch is as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr], I would appreciate it if you could give me some advice. :-)





[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI

2017-12-19 Thread sunjincheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296981#comment-16296981
 ] 

sunjincheng commented on FLINK-7465:


I closed this issue because I noticed that the current implementation is highly 
CPU-demanding, and we can do the same thing using distinct count.

> Add build-in BloomFilterCount on TableAPI
> -
>
> Key: FLINK-7465
> URL: https://issues.apache.org/jira/browse/FLINK-7465
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
> Attachments: bloomfilter.png
>
>
> In this JIRA, we use a BloomFilter to implement counting functions.
> BloomFilter Algorithm description:
> An empty Bloom filter is a bit array of m bits, all set to 0. There must also 
> be k different hash functions defined, each of which maps or hashes some set 
> element to one of the m array positions, generating a uniform random 
> distribution. Typically, k is a constant, much smaller than m, which is 
> proportional to the number of elements to be added; the precise choice of k 
> and the constant of proportionality of m are determined by the intended false 
> positive rate of the filter.
> To add an element, feed it to each of the k hash functions to get k array 
> positions. Set the bits at all these positions to 1.
> To query for an element (test whether it is in the set), feed it to each of 
> the k hash functions to get k array positions. If any of the bits at these 
> positions is 0, the element is definitely not in the set – if it were, then 
> all the bits would have been set to 1 when it was inserted. If all are 1, 
> then either the element is in the set, or the bits have by chance been set to 
> 1 during the insertion of other elements, resulting in a false positive.
> An example of a Bloom filter, representing the set {x, y, z}. The colored 
> arrows show the positions in the bit array that each set element is mapped 
> to. The element w is not in the set {x, y, z}, because it hashes to one 
> bit-array position containing 0. For this figure, m = 18 and k = 3. The 
> sketch is as follows:
> !bloomfilter.png!
> Reference:
> 1. https://en.wikipedia.org/wiki/Bloom_filter
> 2. 
> https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java
> Hi [~fhueske] [~twalthr], I would appreciate it if you could give me some advice. :-)





[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296973#comment-16296973
 ] 

ASF GitHub Bot commented on FLINK-8162:
---

GitHub user casidiablo opened a pull request:

https://github.com/apache/flink/pull/5182

[FLINK-8162] [kinesis-connector] Emit Kinesis' millisBehindLatest metric

## What is the purpose of the change

- Emits [Kinesis' 
millisBehindLatest](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)
 metric, which can be used to detect delays in the pipeline

## Brief change log

  - Publish `millisBehindLatest` as a gauge metric under the `Kinesis` 
group using `` as parameter
  - Updated metrics documentation
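
A minimal sketch of the gauge idea, assuming nothing about the actual patch: `Gauge` below is a local stand-in interface (not Flink's class, only modelled on a single `getValue()` method), and `ShardLagReporter`, `update`, the `-1` sentinel, and the shard id string are hypothetical. The consumer loop stores the latest `millisBehindLatest` value, and the gauge reads it whenever a reporter polls:

```java
import java.util.concurrent.atomic.AtomicLong;

public class ShardLagReporter {

    /** Local stand-in for a metrics gauge: returns the current value when polled. */
    public interface Gauge<T> {
        T getValue();
    }

    private final String shardId;
    // -1 signals that no GetRecords response has been seen yet (assumed sentinel)
    private final AtomicLong millisBehindLatest = new AtomicLong(-1L);

    public ShardLagReporter(String shardId) {
        this.shardId = shardId;
    }

    /** Called by the (hypothetical) consumer loop after each GetRecords response. */
    public void update(long latestMillisBehind) {
        millisBehindLatest.set(latestMillisBehind);
    }

    /** The gauge holds no state of its own; it reads the latest stored value. */
    public Gauge<Long> gauge() {
        return millisBehindLatest::get;
    }

    public String getShardId() {
        return shardId;
    }

    public static void main(String[] args) {
        ShardLagReporter reporter = new ShardLagReporter("shardId-000000000000");
        Gauge<Long> gauge = reporter.gauge();
        reporter.update(1500L);
        System.out.println(reporter.getShardId() + " millisBehindLatest=" + gauge.getValue());
    }
}
```

Keeping the value in an `AtomicLong` lets the fetching thread write and the metrics reporter thread read without extra locking.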


## Verifying this change

This change is already covered by existing tests, such as 
`ShardConsumerTest`.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? `metrics.md` file


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/casidiablo/flink kinesis-fork

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5182


commit bc8426ec3be364323d65cedcc1c1c5cb4e442c8b
Author: Cristian 
Date:   2017-12-19T15:14:22Z

Emit Kinesis' millisBehindLatest metric




> Kinesis Connector to report millisBehindLatest metric
> -
>
> Key: FLINK-8162
> URL: https://issues.apache.org/jira/browse/FLINK-8162
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Cristian
>Priority: Minor
>  Labels: kinesis
> Fix For: 1.5.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> When reading from Kinesis streams, one of the most valuable metrics is 
> "MillisBehindLatest" (see 
> https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201).
> Flink should use its metrics mechanism to report this value as a gauge, 
> tagging it with the shard id.





[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...

2017-12-19 Thread casidiablo
GitHub user casidiablo opened a pull request:

https://github.com/apache/flink/pull/5182

[FLINK-8162] [kinesis-connector] Emit Kinesis' millisBehindLatest metric

## What is the purpose of the change

- Emits [Kinesis' 
millisBehindLatest](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)
 metric, which can be used to detect delays in the pipeline

## Brief change log

  - Publish `millisBehindLatest` as a gauge metric under the `Kinesis` 
group using `` as parameter
  - Updated metrics documentation


## Verifying this change

This change is already covered by existing tests, such as 
`ShardConsumerTest`.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? `metrics.md` file


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/casidiablo/flink kinesis-fork

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5182


commit bc8426ec3be364323d65cedcc1c1c5cb4e442c8b
Author: Cristian 
Date:   2017-12-19T15:14:22Z

Emit Kinesis' millisBehindLatest metric




---


[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296834#comment-16296834
 ] 

ASF GitHub Bot commented on FLINK-7951:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4926
  
@djh4230 could you check whether the Hadoop configuration directory is in 
the classpath of the launched Flink components?


> YarnApplicationMaster does not load HDFSConfiguration
> -
>
> Key: FLINK-7951
> URL: https://issues.apache.org/jira/browse/FLINK-7951
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0
>
>
> When instantiating the {{YarnConfiguration}} we do not load the corresponding 
> {{HDFSConfiguration}}. As a result, we do not read {{hdfs-site.xml}}.





[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...

2017-12-19 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4926
  
@djh4230 could you check whether the Hadoop configuration directory is in 
the classpath of the launched Flink components?


---


[jira] [Commented] (FLINK-8291) For security, Job Manager web UI should be accessed with username/password

2017-12-19 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296829#comment-16296829
 ] 

Till Rohrmann commented on FLINK-8291:
--

[~lynchlee], you are right. At some point we should add user authentication and 
user access rights.

> For security, Job Manager web UI should be accessed with username/password 
> ---
>
> Key: FLINK-8291
> URL: https://issues.apache.org/jira/browse/FLINK-8291
> Project: Flink
>  Issue Type: Improvement
>  Components: Security, Webfrontend
>Affects Versions: 1.3.2
>Reporter: Lynch Lee
>
> Nowadays, we submit jobs from the JobManager web UI without any login credentials.
> For security, the Job Manager web UI should be accessed with username/password.
> Should we do this?





[GitHub] flink issue #5158: [hotfix][docs] Update debugging class loading doc to Java...

2017-12-19 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5158
  
Thanks!


---


[jira] [Updated] (FLINK-8284) Custom metrics not being exposed for Prometheus

2017-12-19 Thread Julio Biason (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julio Biason updated FLINK-8284:

Description: 
Following the documentation, we changed our filter that removes events with 
missing fields to a RichFilterFunction, so we can capture metrics about such 
events:

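{code:java}
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class MissingClientFilter extends RichFilterFunction<LineData> {

    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        // Register the counter under the "events" metric group.
        this.counter = getRuntimeContext()
                .getMetricGroup()
                .addGroup("events")
                .counter("missingClient");
    }

    @Override
    public boolean filter(LineData line) {
        String client = line.get("client").toString();
        boolean missing = client.trim().equals("");
        if (missing) {
            // Count only the events dropped because the client is missing.
            this.count();
        }
        return !missing;
    }

    private void count() {
        if (this.counter != null) {
            this.counter.inc();
        }
    }
}
{code}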


We also added Prometheus as our reporter:

{noformat}
metrics.reporters: prom
metrics.reporter.prom.port: 9105
metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter
{noformat}

The problem is that accessing port 9105 displays all Flink metrics, but not ours.

  was:
Following the documentation, we changed our filter that removes events with 
missing fields to a RichFilterFunction, so we can capture metrics about such 
events:

{{public class MissingClientFilter extends RichFilterFunction {

private transient Counter counter;

@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.addGroup("events")
.counter("missingClient");
}

@Override
public boolean filter(LineData line) {
String client = line.get("client").toString();
boolean missing = client.trim().equals("");
if (!missing) {
this.count();
}
return !missing;
}

private void count() {
if (this.counter != null) {
this.counter.inc();
}
}
}}}

We also added Prometheus as our reporter:

{{metrics.reporters: prom
metrics.reporter.prom.port: 9105
metrics.reporter.prom.class: 
org.apache.flink.metrics.prometheus.PrometheusReporter}}

The problem is that accessing port 9105 displays all Flink metrics, but not ours.


> Custom metrics not being exposed for Prometheus
> ---
>
> Key: FLINK-8284
> URL: https://issues.apache.org/jira/browse/FLINK-8284
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation, Metrics
>Affects Versions: 1.4.0
> Environment: Linux/CentOS 7
>Reporter: Julio Biason
>
> Following the documentation, we changed our filter that removes events with 
> missing fields to a RichFilterFunction, so we can capture metrics about such 
> events:
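> {code:java}
> import org.apache.flink.api.common.functions.RichFilterFunction;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.metrics.Counter;
> public class MissingClientFilter extends RichFilterFunction<LineData> {
>     private transient Counter counter;
>     @Override
>     public void open(Configuration config) {
>         // Register the counter under the "events" metric group.
>         this.counter = getRuntimeContext()
>                 .getMetricGroup()
>                 .addGroup("events")
>                 .counter("missingClient");
>     }
>     @Override
>     public boolean filter(LineData line) {
>         String client = line.get("client").toString();
>         boolean missing = client.trim().equals("");
>         if (missing) {
>             // Count only the events dropped because the client is missing.
>             this.count();
>         }
>         return !missing;
>     }
>     private void count() {
>         if (this.counter != null) {
>             this.counter.inc();
>         }
>     }
> }
> {code}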
> We also added Prometheus as our reporter:
> {noformat}
> metrics.reporters: prom
> metrics.reporter.prom.port: 9105
> metrics.reporter.prom.class: 
> org.apache.flink.metrics.prometheus.PrometheusReporter
> {noformat}
> The problem is that accessing port 9105 displays all Flink metrics, but not ours.





[jira] [Created] (FLINK-8294) Missing examples/links in Data Sink docs

2017-12-19 Thread Julio Biason (JIRA)
Julio Biason created FLINK-8294:
---

 Summary: Missing examples/links in Data Sink docs
 Key: FLINK-8294
 URL: https://issues.apache.org/jira/browse/FLINK-8294
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.4.0
Reporter: Julio Biason


In the [Data 
Sink|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#data-sinks]
 documentation, there is no example of how to use said functions -- even if 
they are only intended for debugging (which is exactly what I want to do right 
now).

While {{print}} is quite simple, what I need is to get the resulting 
processing, so I'd probably need some of the {{write}} functions (since 
FLINK-8285 mentions that iterators are out).

I'd suggest either adding examples or linking the listed functions to their 
documentation.





[jira] [Updated] (FLINK-7594) Add a SQL client

2017-12-19 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-7594:

Summary: Add a SQL client  (was: Add a SQL CLI client)

> Add a SQL client
> 
>
> Key: FLINK-7594
> URL: https://issues.apache.org/jira/browse/FLINK-7594
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> At the moment a user can only specify queries within a Java/Scala program 
> which is nice for integrating table programs or parts of it with DataSet or 
> DataStream API. With more connectors coming up, it is time to also provide a 
> programming-free SQL client. The SQL client should consist of a CLI interface 
> and maybe also a REST API. The concrete design is still up for discussion.





[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296619#comment-16296619
 ] 

ASF GitHub Bot commented on FLINK-8283:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5181
  
I'm not yet certain this fix addresses the root cause of the stalling tests 
mentioned in https://issues.apache.org/jira/browse/FLINK-8283. It's something I 
stumbled across while investigating the failing tests.

Will run several local Travis runs over a period of time to see if the 
stalling tests still occur.


> FlinkKafkaConsumerBase failing on Travis with no output in 10min
> 
>
> Key: FLINK-8283
> URL: https://issues.apache.org/jira/browse/FLINK-8283
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
>
> For the past few days, Travis builds with the {{connectors}} profile have been failing 
> more often with no new output being received within 10 minutes. It seems to 
> start with the Travis build for 
> https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9
>  but may have been introduced earlier. The printed offsets look strange 
> though.
> {code}
> 16:33:12,508 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting 
> restore state in the FlinkKafkaConsumer: 
> {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, 
> KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773}
> 16:33:12,520 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 2 will start reading 66 partitions with offsets in restored 
> state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', 

[GitHub] flink issue #5181: [FLINK-8283] [kafka] Fix mock verification on final metho...

2017-12-19 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5181
  
I'm not yet certain this fix addresses the root cause of the stalling tests 
mentioned in https://issues.apache.org/jira/browse/FLINK-8283. It's something I 
stumbled across while investigating the failing tests.

Will run several local Travis runs over a period of time to see if the 
stalling tests still occur.


---



[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296611#comment-16296611
 ] 

ASF GitHub Bot commented on FLINK-8283:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5181

[FLINK-8283] [kafka] Fix mock verification on final method in 
FlinkKafkaConsumerBaseTest

## What is the purpose of the change

Prior to this PR,

`FlinkKafkaConsumerBaseTest::testSnapshotStateWithCommitOnCheckpointsEnabled()`
was incorrectly mock verifying the 
`AbstractFetcher::commitInternalOffsetsToKafka()` method, which is final and 
cannot be mocked. This seems to cause instabilities spanning several tests in 
`FlinkKafkaConsumerBaseTest`. This PR fixes the problem by making the method 
non-final.

Note that ideally, that method should be final to prevent accidental 
overrides, but many methods in `AbstractFetcher` that would ideally be final 
are currently non-final and are mocked in the unit tests 
(e.g., `AbstractFetcher::snapshotState`, `AbstractFetcher::emitRecord`, etc.).
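
To illustrate why a final method cannot be intercepted by a subclass-based mock, here is a minimal, self-contained sketch. It does not use Mockito or any Flink class: `DemoFetcher`, `RecordingFetcher`, `commitFinal` and `commitOpen` are hypothetical names, and the hand-written subclass only stands in for what a subclass-generating mock framework would do.

```java
import java.util.ArrayList;
import java.util.List;

public class FinalMethodDemo {

    /** Hypothetical stand-in for a fetcher; this is NOT Flink's AbstractFetcher. */
    static class DemoFetcher {
        final List<String> log = new ArrayList<>();

        /** Final: a subclass (and hence a subclass-based mock) cannot intercept this call. */
        public final void commitFinal() {
            log.add("real commitFinal");
        }

        /** Non-final: a subclass can override it and record the call instead. */
        public void commitOpen() {
            log.add("real commitOpen");
        }
    }

    /** Hand-written "mock": overrides what it can and records the invocations. */
    static class RecordingFetcher extends DemoFetcher {
        final List<String> recorded = new ArrayList<>();

        @Override
        public void commitOpen() {
            recorded.add("commitOpen");
        }
        // Overriding commitFinal() here would be a compile error.
    }

    public static void main(String[] args) {
        RecordingFetcher mock = new RecordingFetcher();
        mock.commitFinal(); // runs the real method body; nothing is recorded
        mock.commitOpen();  // intercepted and recorded by the subclass
        System.out.println("recorded by mock: " + mock.recorded);
        System.out.println("ran real code:    " + mock.log);
    }
}
```

Only the call to the non-final method is recorded; the call to the final method executes the real body, so a verification against the recorded calls would not see it.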

## Brief change log

- Make `AbstractFetcher::commitInternalOffsetsToKafka` non-final, so that 
it can be properly mocked in unit tests.

## Verifying this change

This change is already covered by existing tests in 
`FlinkKafkaConsumerBaseTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? n/a


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8283

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5181.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5181


commit c8265cea34811c901bba2e9cd56e4870bf17622f
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T10:00:37Z

[FLINK-8283] [kafka] Fix mock verification on final method in 
FlinkKafkaConsumerBaseTest

Prior to this commit,

FlinkKafkaConsumerBaseTest::testSnapshotStateWithCommitOnCheckpointsEnabled()
was incorrectly verifying a final method on the AbstractFetcher class
(specifically, the commitInternalOffsetsToKafka method).
This commit fixes this by making the method non-final.

Note that ideally, that method should be final to prevent accidental 
overrides,
but many methods in the AbstractFetcher that would ideally be final
are currently non-final and are mocked in the unit tests.




> FlinkKafkaConsumerBase failing on Travis with no output in 10min
> 
>
> Key: FLINK-8283
> URL: https://issues.apache.org/jira/browse/FLINK-8283
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Critical
>  Labels: test-stability
>
> For a few days now, Travis builds with the {{connectors}} profile have been failing 
> more often with no new output being received within 10 minutes. It seems to 
> start with the Travis build for 
> https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9
>  but may have been introduced earlier. The printed offsets look strange 
> though.
> {code}
> 16:33:12,508 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Setting 
> restore state in the FlinkKafkaConsumer: 
> {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, 
> KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773}
> 16:33:12,520 INFO  
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
> Consumer subtask 2 will start reading 66 partitions with offsets in restored 
> state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, 
> KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, 
> 

[GitHub] flink pull request #5181: [FLINK-8283] [kafka] Fix mock verification on fina...

2017-12-19 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/5181

[FLINK-8283] [kafka] Fix mock verification on final method in 
FlinkKafkaConsumerBaseTest

## What is the purpose of the change

Prior to this PR,

`FlinkKafkaConsumerBaseTest::testSnapshotStateWithCommitOnCheckpointsEnabled()`
was incorrectly mock verifying the 
`AbstractFetcher::commitInternalOffsetsToKafka()` method, which is final and 
cannot be mocked. This seems to cause instabilities spanning several tests in 
`FlinkKafkaConsumerBaseTest`. This PR fixes the problem by making the method 
non-final.

Note that ideally, that method should be final to prevent accidental 
overrides, but many methods in `AbstractFetcher` that would ideally be final 
are currently non-final and are mocked in the unit tests 
(e.g., `AbstractFetcher::snapshotState`, `AbstractFetcher::emitRecord`, etc.).

## Brief change log

- Make `AbstractFetcher::commitInternalOffsetsToKafka` non-final, so that 
it can be properly mocked in unit tests.

## Verifying this change

This change is already covered by existing tests in 
`FlinkKafkaConsumerBaseTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? n/a


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8283

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5181.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5181


commit c8265cea34811c901bba2e9cd56e4870bf17622f
Author: Tzu-Li (Gordon) Tai 
Date:   2017-12-19T10:00:37Z

[FLINK-8283] [kafka] Fix mock verification on final method in 
FlinkKafkaConsumerBaseTest

Prior to this commit,

FlinkKafkaConsumerBaseTest::testSnapshotStateWithCommitOnCheckpointsEnabled()
was incorrectly verifying a final method on the AbstractFetcher class
(specifically, the commitInternalOffsetsToKafka method).
This commit fixes this by making the method non-final.

Note that ideally, that method should be final to prevent accidental 
overrides,
but many methods in the AbstractFetcher that would ideally be final
are currently non-final and are mocked in the unit tests.




---


[jira] [Resolved] (FLINK-8215) Support implicit type widening for array/map constructors in SQL

2017-12-19 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8215.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed in 1.5: 2142eeda9df262e989951c4b31273cbd9346567f

> Support implicit type widening for array/map constructors in SQL
> 
>
> Key: FLINK-8215
> URL: https://issues.apache.org/jira/browse/FLINK-8215
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
> Fix For: 1.5.0
>
>
> TableAPI goes through `LogicalNode.validate()`, which brings up the 
> collection validation and rejects inconsistent type, this will throw 
> `ValidationException` for something like `array(1.0, 2.0f)`.
> SqlAPI uses `FlinkPlannerImpl.validator(SqlNode)`, which uses calcite SqlNode 
> validation, which supports resolving leastRestrictive type. `ARRAY[CAST(1 AS 
> DOUBLE), CAST(2 AS FLOAT)]` throws codegen exception.
> Root cause is the CodeGeneration for these collection value constructors does 
> not cast or resolve leastRestrictive type correctly. I see 2 options:
> 1. Strengthen validation to not allow resolving leastRestrictive type on SQL.
> 2. Making codegen support leastRestrictive type cast, such as using 
> `generateCast` instead of direct casting like `(ClassType) element`.
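
The difference between a direct cast and a widening conversion to the least restrictive type can be sketched in plain Java. This is only an analogy under stated assumptions: `WideningDemo`, `directCast` and `widenToDouble` are hypothetical names, the sketch is not Flink's code generator or its `generateCast`, and the exception shown here is not necessarily the one the generated code produced.

```java
import java.util.Arrays;
import java.util.List;

public class WideningDemo {

    /** Direct cast, analogous to `(ClassType) element`: fails unless the runtime type matches. */
    static double[] directCast(List<Object> elements) {
        double[] out = new double[elements.size()];
        for (int i = 0; i < out.length; i++) {
            out[i] = (Double) elements.get(i); // ClassCastException for a Float element
        }
        return out;
    }

    /** Widening: convert every numeric element to the common (least restrictive) type. */
    static double[] widenToDouble(List<Object> elements) {
        double[] out = new double[elements.size()];
        for (int i = 0; i < out.length; i++) {
            out[i] = ((Number) elements.get(i)).doubleValue();
        }
        return out;
    }

    public static void main(String[] args) {
        // A DOUBLE and a FLOAT element, mirroring the ARRAY[...] example in the issue.
        List<Object> elements = Arrays.asList(1.0d, 2.0f);
        System.out.println(Arrays.toString(widenToDouble(elements)));
        try {
            directCast(elements);
        } catch (ClassCastException e) {
            System.out.println("direct cast failed: " + e.getMessage());
        }
    }
}
```

Option 2 in the description corresponds to the second method: each element is converted to the resolved element type rather than assumed to already have it.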



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8215) Support implicit type widening for array/map constructors in SQL

2017-12-19 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8215:

Summary: Support implicit type widening for array/map constructors in SQL  
(was: upport implicit type widening for array/map constructors in SQL)

> Support implicit type widening for array/map constructors in SQL
> 
>
> Key: FLINK-8215
> URL: https://issues.apache.org/jira/browse/FLINK-8215
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> TableAPI goes through `LogicalNode.validate()`, which brings up the 
> collection validation and rejects inconsistent type, this will throw 
> `ValidationException` for something like `array(1.0, 2.0f)`.
> SqlAPI uses `FlinkPlannerImpl.validator(SqlNode)`, which uses calcite SqlNode 
> validation, which supports resolving leastRestrictive type. `ARRAY[CAST(1 AS 
> DOUBLE), CAST(2 AS FLOAT)]` throws codegen exception.
> Root cause is the CodeGeneration for these collection value constructors does 
> not cast or resolve leastRestrictive type correctly. I see 2 options:
> 1. Strengthen validation to not allow resolving leastRestrictive type on SQL.
> 2. Making codegen support leastRestrictive type cast, such as using 
> `generateCast` instead of direct casting like `(ClassType) element`.





[jira] [Commented] (FLINK-8215) Support implicit type widening for array/map constructors in SQL

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296601#comment-16296601
 ] 

ASF GitHub Bot commented on FLINK-8215:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5148


> Support implicit type widening for array/map constructors in SQL
> 
>
> Key: FLINK-8215
> URL: https://issues.apache.org/jira/browse/FLINK-8215
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> TableAPI goes through `LogicalNode.validate()`, which brings up the 
> collection validation and rejects inconsistent type, this will throw 
> `ValidationException` for something like `array(1.0, 2.0f)`.
> SqlAPI uses `FlinkPlannerImpl.validator(SqlNode)`, which uses calcite SqlNode 
> validation, which supports resolving leastRestrictive type. `ARRAY[CAST(1 AS 
> DOUBLE), CAST(2 AS FLOAT)]` throws codegen exception.
> Root cause is the CodeGeneration for these collection value constructors does 
> not cast or resolve leastRestrictive type correctly. I see 2 options:
> 1. Strengthen validation to not allow resolving leastRestrictive type on SQL.
> 2. Making codegen support leastRestrictive type cast, such as using 
> `generateCast` instead of direct casting like `(ClassType) element`.





[GitHub] flink pull request #5148: [FLINK-8215][Table] fix codegen issue on array/map...

2017-12-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5148


---


[jira] [Updated] (FLINK-8215) upport implicit type widening for array/map constructors in SQL

2017-12-19 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8215:

Summary: upport implicit type widening for array/map constructors in SQL  
(was: Collections codegen exception when constructing Array or Map via SQL API)

> upport implicit type widening for array/map constructors in SQL
> ---
>
> Key: FLINK-8215
> URL: https://issues.apache.org/jira/browse/FLINK-8215
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> TableAPI goes through `LogicalNode.validate()`, which brings up the 
> collection validation and rejects inconsistent type, this will throw 
> `ValidationException` for something like `array(1.0, 2.0f)`.
> SqlAPI uses `FlinkPlannerImpl.validator(SqlNode)`, which uses calcite SqlNode 
> validation, which supports resolving leastRestrictive type. `ARRAY[CAST(1 AS 
> DOUBLE), CAST(2 AS FLOAT)]` throws codegen exception.
> Root cause is the CodeGeneration for these collection value constructors does 
> not cast or resolve leastRestrictive type correctly. I see 2 options:
> 1. Strengthen validation to not allow resolving leastRestrictive type on SQL.
> 2. Making codegen support leastRestrictive type cast, such as using 
> `generateCast` instead of direct casting like `(ClassType) element`.





[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296581#comment-16296581
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157709904
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

Yes, I totally agree with your point about the current state of the 
spillable/spilled subpartitions and subpartition views.

And I also think that the `PipelinedSubpartition` is the most important 
path and the `SpillableSubpartition` is less performance-sensitive. I think we 
have already reached a consensus on how to handle `SpillableSubpartition`, and 
I will make that change later. :)


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.





[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157709904
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

Yes, I totally agree with your point about the current state of the 
spillable/spilled subpartitions and subpartition views.

And I also think that the `PipelinedSubpartition` is the most important 
path and the `SpillableSubpartition` is less performance-sensitive. I think we 
have already reached a consensus on how to handle `SpillableSubpartition`, and 
I will make that change later. :)


---


[jira] [Commented] (FLINK-8215) Collections codegen exception when constructing Array or Map via SQL API

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296576#comment-16296576
 ] 

ASF GitHub Bot commented on FLINK-8215:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5148
  
Thanks for this fix @walterddr. The code looks good. I will merge this...


> Collections codegen exception when constructing Array or Map via SQL API
> 
>
> Key: FLINK-8215
> URL: https://issues.apache.org/jira/browse/FLINK-8215
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> TableAPI goes through `LogicalNode.validate()`, which brings up the 
> collection validation and rejects inconsistent type, this will throw 
> `ValidationException` for something like `array(1.0, 2.0f)`.
> SqlAPI uses `FlinkPlannerImpl.validator(SqlNode)`, which uses calcite SqlNode 
> validation, which supports resolving leastRestrictive type. `ARRAY[CAST(1 AS 
> DOUBLE), CAST(2 AS FLOAT)]` throws codegen exception.
> Root cause is the CodeGeneration for these collection value constructors does 
> not cast or resolve leastRestrictive type correctly. I see 2 options:
> 1. Strengthen validation to not allow resolving leastRestrictive type on SQL.
> 2. Making codegen support leastRestrictive type cast, such as using 
> `generateCast` instead of direct casting like `(ClassType) element`.





[GitHub] flink issue #5148: [FLINK-8215][Table] fix codegen issue on array/map value ...

2017-12-19 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5148
  
Thanks for this fix @walterddr. The code looks good. I will merge this...


---


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296562#comment-16296562
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157707628
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a 
non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a 
non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

yes, that would be nice


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.





[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157707628
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a 
non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a 
non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

yes, that would be nice


---


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296558#comment-16296558
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157706951
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a 
non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a 
non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

Sorry, my wording above was not correct. I mean that we do not need the 
`decreaseBuffersInBacklog` method in `ResultSubpartition` after changing the 
`parent` to be a `SpillableSubpartition` in `SpilledSubpartitionView`.


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.





[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296559#comment-16296559
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157706995
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

You're absolutely right about not counting events. Therefore, we cannot use 
the queue's size as I suggested.

Yes, `BufferAndAvailability` would need to be extended as well.

This integration/split of the spillable/spilled subpartitions and 
subpartition views, with both of them working on the same structures and 
requiring the same synchronisation pattern, is imho really not nice and highly 
fragile. @pnowojski and I are currently re-designing the synchronisation in 
these parts of the code and are a bit sensitive to it now, so let's drag him 
into this discussion as well. I would consider `PipelinedSubpartition` the hot 
path where we need to optimise most; spillable subpartitions are used in batch 
mode and have higher tolerances, especially when spilling to disk. However, if 
you returned the new backlog counter from 
`SpillableSubpartition#decreaseBuffersInBacklog()` (retrieved under the 
`synchronized (buffers)` section), then you would not need the `volatile` 
either, since you are already under the lock.
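
The suggestion above can be sketched as follows. This is a minimal illustration under assumptions, not the code from the pull request: `DemoSubpartition`, `Entry`, `add` and `pollAndGetBacklog` are hypothetical names, and the real subpartition classes carry far more state. The counter is a plain `int` because it is read and written only while holding the `buffers` monitor, and each operation returns the backlog it observed under that lock.

```java
import java.util.ArrayDeque;

public class BacklogDemo {

    /** Hypothetical queue entry: either a data buffer or an event. */
    static final class Entry {
        final boolean isBuffer;

        Entry(boolean isBuffer) {
            this.isBuffer = isBuffer;
        }
    }

    /** Hypothetical subpartition; all state is guarded by the `buffers` monitor. */
    static final class DemoSubpartition {
        private final ArrayDeque<Entry> buffers = new ArrayDeque<>();
        private int buffersInBacklog; // no volatile: only accessed under the lock

        /** Adds an entry and returns the backlog as seen under the lock. */
        int add(Entry entry) {
            synchronized (buffers) {
                buffers.add(entry);
                if (entry.isBuffer) {
                    buffersInBacklog++; // events are not counted
                }
                return buffersInBacklog;
            }
        }

        /** Polls the next entry (if any) and returns the backlog after the poll. */
        int pollAndGetBacklog() {
            synchronized (buffers) {
                Entry entry = buffers.poll();
                if (entry != null && entry.isBuffer) {
                    buffersInBacklog--;
                }
                return buffersInBacklog;
            }
        }
    }

    public static void main(String[] args) {
        DemoSubpartition subpartition = new DemoSubpartition();
        subpartition.add(new Entry(true));
        subpartition.add(new Entry(false));
        System.out.println("backlog after poll: " + subpartition.pollAndGetBacklog());
    }
}
```

Because the queue's size would also count events, the sketch keeps a separate counter instead of calling `buffers.size()`.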


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is a part of work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and 
> decreased for polling buffer from it.
> The backlog is attached in {{BufferResponse}} by sender as an absolute value 
> after the buffer being transferred.





[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157706995
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

You're absolutely right about not counting events. Therefore, we cannot use 
the queue's size as I suggested.

Yes, `BufferAndAvailability` would need to be extended as well.

This integration/split of the spillable/spilled subpartitions and 
subpartition views, with both of them working on the same structures and 
requiring the same synchronisation pattern, is imho really not nice and highly 
fragile. @pnowojski and I are currently re-designing the synchronisation in 
these parts of the code and are a bit sensitive to it now, so let's drag him 
into this discussion as well. I would consider `PipelinedSubpartition` the hot 
path where we need to optimise most; spillable subpartitions are used in batch 
mode and have higher tolerances, especially when spilling to disk. However, if 
you returned the new backlog counter from 
`SpillableSubpartition#decreaseBuffersInBacklog()` (retrieved under the 
`synchronized (buffers)` section), then you would not need the `volatile` 
either, since you are already under the lock.


---


[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157706951
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a 
non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a 
non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

Sorry, my wording above was not correct. I mean that we do not need the 
`decreaseBuffersInBacklog` method in `ResultSubpartition` after changing the 
`parent` to be a `SpillableSubpartition` in `SpilledSubpartitionView`.


---


[GitHub] flink pull request #5158: [hotfix][docs] Update debugging class loading doc ...

2017-12-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5158


---


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2017-12-19 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296546#comment-16296546
 ] 

Timo Walther commented on FLINK-8240:
-

Thanks for your response [~wheat9]. We need to decide what the syntax for 
{{CREATE EXTERNAL TABLE}} will look like. It could look more like Hive or more 
like the unified interface of this issue. But in any case such a statement would 
compile down to the unified interface. We won't support every combination of 
connector/encoding but with this abstraction we don't need to expose things 
like a {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}, etc. anymore. At the 
API level we have a clear separation that might (or might not) also separate 
components internally in the future.

Btw not all tables need to be built with a factory. For now, we will keep the 
builders inside every table source (like {{CsvTableSource.builder()}}). This is 
also needed because you cannot express everything as a string property. The 
factories will use the builders to create the table sources. 
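
The relationship between factories and builders could be sketched roughly as follows. All names here (`CsvSourceSketch`, `CsvSourceFactorySketch`, the property keys `path` and `field-delimiter`) are hypothetical and only illustrate the idea; they are not the interfaces proposed in this issue.

```java
import java.util.Map;

/** Hypothetical table source with a builder; not Flink's CsvTableSource. */
final class CsvSourceSketch {
    final String path;
    final String fieldDelimiter;

    private CsvSourceSketch(String path, String fieldDelimiter) {
        this.path = path;
        this.fieldDelimiter = fieldDelimiter;
    }

    static Builder builder() {
        return new Builder();
    }

    /** Programmatic entry point; can express things a string property cannot. */
    static final class Builder {
        private String path;
        private String fieldDelimiter = ",";

        Builder path(String path) {
            this.path = path;
            return this;
        }

        Builder fieldDelimiter(String fieldDelimiter) {
            this.fieldDelimiter = fieldDelimiter;
            return this;
        }

        CsvSourceSketch build() {
            if (path == null) {
                throw new IllegalStateException("path is required");
            }
            return new CsvSourceSketch(path, fieldDelimiter);
        }
    }
}

/** Hypothetical factory: translates string properties into builder calls. */
final class CsvSourceFactorySketch {
    static CsvSourceSketch create(Map<String, String> properties) {
        CsvSourceSketch.Builder builder =
            CsvSourceSketch.builder().path(properties.get("path"));
        String delimiter = properties.get("field-delimiter");
        if (delimiter != null) {
            builder.fieldDelimiter(delimiter);
        }
        return builder.build();
    }
}
```

In this sketch the factory adds no construction logic of its own, so a source built from properties and one built by hand go through the same validation in `build()`.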

> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generalize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.





[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296538#comment-16296538
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157703075
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

`package-private`, e.g. `abstract void increaseBuffersInBacklog(Buffer 
buffer);`, already works without changing anything since 
`SpilledSubpartitionView` is in the same package


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased when a buffer is added to this subpartition, and 
> decreased when a buffer is polled from it.
> The backlog is attached to the {{BufferResponse}} by the sender as an absolute 
> value after the buffer has been transferred.





[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157703075
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

`package-private`, e.g. `abstract void increaseBuffersInBacklog(Buffer 
buffer);`, already works without changing anything since 
`SpilledSubpartitionView` is in the same package
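
To illustrate the suggestion, here is a minimal sketch with hypothetical stand-in classes (none of these are the real Flink types): an abstract method declared without an access modifier is package-private, so a view class in the same package can call it without the method becoming part of the public surface.

```java
/** Hypothetical base class; the backlog mutators carry no modifier, i.e. package-private. */
abstract class ResultSubpartitionSketch {
    protected int backlog;

    // Package-private: callable only from classes in the same package.
    abstract void increaseBuffersInBacklog();

    abstract void decreaseBuffersInBacklog();

    public int getBuffersInBacklog() {
        return backlog;
    }
}

/** Hypothetical concrete subpartition in the same package. */
final class PipelinedSketch extends ResultSubpartitionSketch {
    @Override
    void increaseBuffersInBacklog() {
        backlog++;
    }

    @Override
    void decreaseBuffersInBacklog() {
        backlog--;
    }
}

/** Hypothetical view in the same package; it may call the package-private method. */
final class SpilledViewSketch {
    private final ResultSubpartitionSketch parent;

    SpilledViewSketch(ResultSubpartitionSketch parent) {
        this.parent = parent;
    }

    void onBufferRead() {
        parent.decreaseBuffersInBacklog();
    }
}
```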


---


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296475#comment-16296475
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157694294
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception {
try {
subpartition.finish();
 
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(4, subpartition.getTotalNumberOfBytes());
+
assertFalse(subpartition.add(mock(Buffer.class)));
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
--- End diff --

sure


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased when a buffer is added to this subpartition, and 
> decreased when a buffer is polled from it.
> The backlog is attached to the {{BufferResponse}} by the sender as an absolute 
> value after the buffer has been transferred.





[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157694294
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
 ---
@@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception {
try {
subpartition.finish();
 
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
+   assertEquals(4, subpartition.getTotalNumberOfBytes());
+
assertFalse(subpartition.add(mock(Buffer.class)));
+   assertEquals(1, subpartition.getTotalNumberOfBuffers());
+   assertEquals(0, subpartition.getBuffersInBacklog());
--- End diff --

sure


---


[jira] [Assigned] (FLINK-4352) Refactor CLI to use RESTful client for cluster communication

2017-12-19 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-4352:


Assignee: Till Rohrmann

> Refactor CLI to use RESTful client for cluster communication
> 
>
> Key: FLINK-4352
> URL: https://issues.apache.org/jira/browse/FLINK-4352
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, Cluster Management
>Reporter: jingzhang
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> We have to refactor Flink's CLI in order to be able to use the RESTful 
> client to communicate with the new Flip-6 Flink cluster.
> This could be done by implementing a new {{ClusterClient}} or by refactoring 
> the {{ClusterClient}} to not use the static methods of the {{JobClient}} but 
> instead an interface implementation.





[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157693477
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws Exception {
partition.add(buffer);
partition.add(buffer);
 
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
+   assertFalse(buffer.isRecycled());
assertEquals(3, partition.releaseMemory());
 
+   // now the buffer may be freed, depending on the timing of the write operation
+   // -> let's do this check at the end of the test (to save some time)
+   // still same statistics
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
partition.finish();
 
+   // + one EndOfPartitionEvent
+   assertEquals(4, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
--- End diff --

sure


---


[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296468#comment-16296468
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157693477
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
 ---
@@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws Exception {
partition.add(buffer);
partition.add(buffer);
 
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
+   assertFalse(buffer.isRecycled());
assertEquals(3, partition.releaseMemory());
 
+   // now the buffer may be freed, depending on the timing of the write operation
+   // -> let's do this check at the end of the test (to save some time)
+   // still same statistics
+   assertEquals(3, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
partition.finish();
 
+   // + one EndOfPartitionEvent
+   assertEquals(4, partition.getTotalNumberOfBuffers());
+   assertEquals(3, partition.getBuffersInBacklog());
+   assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
--- End diff --

sure


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased when a buffer is added to this subpartition, and 
> decreased when a buffer is polled from it.
> The backlog is attached to the {{BufferResponse}} by the sender as an absolute 
> value after the buffer has been transferred.





[jira] [Created] (FLINK-8293) Rework Flink's type and serialization docs

2017-12-19 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8293:
---

 Summary: Rework Flink's type and serialization docs
 Key: FLINK-8293
 URL: https://issues.apache.org/jira/browse/FLINK-8293
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Type Serialization System
Reporter: Timo Walther
Assignee: Timo Walther


The documentation about Flink's type and serialization system hasn't been 
updated for a while and there are a lot of users (especially beginners) that 
have problems with defining types for inputs, functions, state etc. We should 
rework the documentation a little bit to solve things like:

Type related things:
{code}
Document all data types.
What TypeInfo is available? What are the limitations? Encourage TypeHints? 
returns()? Link to new Types class. How to declare a valid case class in Scala.
Look into log if type is not a POJO type (e.g. when using org.json4s)
ResultTypeQueryable documentation
Case classes and Tuples do not support null!
Subtypes of POJOs are handled during runtime or via cache and registerType()
Explain all methods in ExecutionConfig.
Compatibility guarantees.
Pojos must have a void setter. Why are we so strict?
Update docs in api_concepts about types (Avro is not used for POJOs)!
{code}

Serialization related things:
{code}
Serialization overview. Big picture (what is serialized, how, why, where, 
when?).
When/why should I register a type or a subtype -- what does that do? 
Link to "Streaming/Working with State/Custom Serialization for Managed 
State".
{code}








[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296452#comment-16296452
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157691096
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

Using `ArrayDeque#size()` for `getBuffersInBacklog()` may not be feasible 
because we do not know how many events are in the `ArrayDeque`, and events 
should not be counted towards the backlog length.

For the new API, we may need to modify `ResultSubpartitionView#getNextBuffer` 
to return a `BufferAndBacklog` wrapping structure instead of `Buffer`. Do we 
also need to extend `BufferAndAvailability` to add the backlog to it? This way, 
`PipelinedSubpartition` benefits from dropping the `volatile`, but would 
`SpillableSubpartition` still need the `volatile`? There, `getNextBuffer` and 
`decreaseBacklog` are in different parts for 
`SpillableSubpartitionView/SpilledSubpartitionView`.



> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog which only indicates the 
> number of buffers in this subpartition, not including the number of events. 
> The backlog is increased when a buffer is added to this subpartition, and 
> decreased when a buffer is polled from it.
> The backlog is attached to the {{BufferResponse}} by the sender as an absolute 
> value after the buffer has been transferred.





[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157691096
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ---
@@ -52,6 +54,10 @@
/** Flag indicating whether the subpartition has been released. */
private volatile boolean isReleased;
 
+   /** The number of non-event buffers currently in this subpartition */
+   @GuardedBy("buffers")
+   private volatile int buffersInBacklog;
--- End diff --

Using `ArrayDeque#size()` for `getBuffersInBacklog()` may not be feasible 
because we do not know how many events are in the `ArrayDeque`, and events 
should not be counted towards the backlog length.

For the new API, we may need to modify `ResultSubpartitionView#getNextBuffer` 
to return a `BufferAndBacklog` wrapping structure instead of `Buffer`. Do we 
also need to extend `BufferAndAvailability` to add the backlog to it? This way, 
`PipelinedSubpartition` benefits from dropping the `volatile`, but would 
`SpillableSubpartition` still need the `volatile`? There, `getNextBuffer` and 
`decreaseBacklog` are in different parts for 
`SpillableSubpartitionView/SpilledSubpartitionView`.
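
A rough sketch of both points, under the assumption of a simplified in-memory queue (`SubpartitionViewSketch`, `Element`, and the field names are hypothetical; only `BufferAndBacklog` borrows its name from the discussion): the queue size and the backlog diverge as soon as an event is queued, and a wrapper returned from `getNextBuffer` can carry the backlog measured in the same critical section.

```java
import java.util.ArrayDeque;

/** Hypothetical sketch; the names mirror the discussion, not Flink's actual code. */
final class SubpartitionViewSketch {

    /** Stand-in for a queued element: a data buffer or an event. */
    static final class Element {
        final String payload;
        final boolean isBuffer;

        Element(String payload, boolean isBuffer) {
            this.payload = payload;
            this.isBuffer = isBuffer;
        }
    }

    /** Pairs the polled element with the backlog that remains behind it. */
    static final class BufferAndBacklog {
        final Element buffer;
        final int buffersInBacklog;

        BufferAndBacklog(Element buffer, int buffersInBacklog) {
            this.buffer = buffer;
            this.buffersInBacklog = buffersInBacklog;
        }
    }

    private final ArrayDeque<Element> queue = new ArrayDeque<>();

    // Guarded by the lock on `queue`, so no volatile is required in this sketch.
    private int buffersInBacklog;

    void add(Element element) {
        synchronized (queue) {
            queue.add(element);
            if (element.isBuffer) {
                buffersInBacklog++;
            }
        }
    }

    /** Number of queued elements, events included. */
    int queueSize() {
        synchronized (queue) {
            return queue.size();
        }
    }

    /** Returns the next element together with the remaining backlog, or null if empty. */
    BufferAndBacklog getNextBuffer() {
        synchronized (queue) {
            Element next = queue.poll();
            if (next == null) {
                return null;
            }
            if (next.isBuffer) {
                buffersInBacklog--;
            }
            return new BufferAndBacklog(next, buffersInBacklog);
        }
    }
}
```

With one buffer and one event queued, `queueSize()` reports 2 while the backlog is 1, which is the mismatch described above.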



---


[jira] [Updated] (FLINK-7736) Fix some of the alerts raised by lgtm.com

2017-12-19 Thread Malcolm Taylor (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Malcolm Taylor updated FLINK-7736:
--
Issue Type: Bug  (was: Improvement)

> Fix some of the alerts raised by lgtm.com
> -
>
> Key: FLINK-7736
> URL: https://issues.apache.org/jira/browse/FLINK-7736
> Project: Flink
>  Issue Type: Bug
>Reporter: Malcolm Taylor
>Assignee: Malcolm Taylor
>
> lgtm.com has identified a number of issues giving scope for improvement in 
> the code: [https://lgtm.com/projects/g/apache/flink/alerts/?mode=list]
> This issue is to address some of the simpler ones. Some of these are quite 
> clear bugs such as off-by-one errors. Others are areas where the code might 
> be made clearer, such as use of a variable name which shadows another 
> variable.





[jira] [Resolved] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-19 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-7452.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed in 1.5: e30066dbd1ebf3c5780df89d766554042c8345a7

> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
> Fix For: 1.5.0
>
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}}. Such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested 
> type. Especially in the case of POJOs, they should help with creating a {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.





[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296446#comment-16296446
 ] 

ASF GitHub Bot commented on FLINK-7951:
---

Github user djh4230 commented on the issue:

https://github.com/apache/flink/pull/4926
  

![image](https://user-images.githubusercontent.com/8032384/34147925-5e0fa10c-e4da-11e7-992d-b422f59a7d16.png)

This does not work! It still can't read hdfs-site.xml and can't read the HDFS namespace.


![image](https://user-images.githubusercontent.com/8032384/34148009-a355c98a-e4da-11e7-93e3-ede389db9118.png)




> YarnApplicationMaster does not load HDFSConfiguration
> -
>
> Key: FLINK-7951
> URL: https://issues.apache.org/jira/browse/FLINK-7951
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.4.0
>
>
> When instantiating the {{YarnConfiguration}} we do not load the corresponding 
> {{HDFSConfiguration}}. As a result, we do not read {{hdfs-site.xml}}.





[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...

2017-12-19 Thread djh4230
Github user djh4230 commented on the issue:

https://github.com/apache/flink/pull/4926
  

![image](https://user-images.githubusercontent.com/8032384/34147925-5e0fa10c-e4da-11e7-992d-b422f59a7d16.png)

This does not work! It still can't read hdfs-site.xml and can't read the HDFS namespace.


![image](https://user-images.githubusercontent.com/8032384/34148009-a355c98a-e4da-11e7-93e3-ede389db9118.png)




---


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296444#comment-16296444
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4612


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}}. Such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested 
> type. Especially in the case of POJOs, they should help with creating a {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.





[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-12-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4612


---


[jira] [Commented] (FLINK-8292) Remove unnecessary force cast in DataStreamSource

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296442#comment-16296442
 ] 

ASF GitHub Bot commented on FLINK-8292:
---

GitHub user Matrix42 opened a pull request:

https://github.com/apache/flink/pull/5180

[FLINK-8292] Remove unnecessary force cast in DataStreamSource

## What is the purpose of the change

Remove unnecessary force cast in DataStreamSource

## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

This change is already covered by existing tests, such as 
DataStreamTest.testParallelism()

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): ( no )
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no )

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Matrix42/flink DataStreamSource

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5180


commit d5c55d2a73fdbcc0dfadd05ad87203ca70d64eac
Author: Matrix42 <934336...@qq.com>
Date:   2017-12-19T08:20:39Z

[FLINK-8292] Remove unnecessary force cast in DataStreamSource




> Remove unnecessary force cast in DataStreamSource
> -
>
> Key: FLINK-8292
> URL: https://issues.apache.org/jira/browse/FLINK-8292
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Matrix42
>Priority: Trivial
> Fix For: 1.5.0, 1.4.1
>
>
> In DataStreamSource there is a cast that can be replaced by returning `this`.





[GitHub] flink pull request #5180: [FLINK-8292] Remove unnecessary force cast in Data...

2017-12-19 Thread Matrix42
GitHub user Matrix42 opened a pull request:

https://github.com/apache/flink/pull/5180

[FLINK-8292] Remove unnecessary force cast in DataStreamSource

## What is the purpose of the change

Remove unnecessary force cast in DataStreamSource

## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

This change is already covered by existing tests, such as 
DataStreamTest.testParallelism()

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: ( no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): ( no )
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no )

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Matrix42/flink DataStreamSource

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5180.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5180


commit d5c55d2a73fdbcc0dfadd05ad87203ca70d64eac
Author: Matrix42 <934336...@qq.com>
Date:   2017-12-19T08:20:39Z

[FLINK-8292] Remove unnecessary force cast in DataStreamSource
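
The kind of change this pull request describes might look roughly like the sketch below. `OperatorSketch` and `SourceSketch` are hypothetical stand-ins, and the shape of the original method is an assumption rather than a quote from `DataStreamSource`: instead of casting the result of the parent's fluent setter, the override calls it and returns `this`, relying on Java's covariant return types.

```java
/** Hypothetical parent with a fluent setter; not Flink's actual operator class. */
class OperatorSketch<T> {
    private int parallelism = 1;

    public OperatorSketch<T> setParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    public int getParallelism() {
        return parallelism;
    }
}

/** Hypothetical source that narrows the return type of the fluent setter. */
final class SourceSketch<T> extends OperatorSketch<T> {
    private final boolean isParallel;

    SourceSketch(boolean isParallel) {
        this.isParallel = isParallel;
    }

    @Override
    public SourceSketch<T> setParallelism(int parallelism) {
        if (parallelism != 1 && !isParallel) {
            throw new IllegalArgumentException("not a parallel source");
        }
        // Before (assumed): return (SourceSketch<T>) super.setParallelism(parallelism);
        super.setParallelism(parallelism);
        return this; // covariant return type, no cast needed
    }
}
```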




---


[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instatiate TableSources

2017-12-19 Thread Haohui Mai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296435#comment-16296435
 ] 

Haohui Mai commented on FLINK-8240:
---

It seems that this is a great use case for layered table sources / converters, 
thus I'm not fully sure yet that all tables should be built using 
{{TableFactory}}.

Popping up one level, I have a related question -- assuming that we need to 
implement the {{CREATE EXTERNAL TABLE}} statement, what will the statement look 
like? Here is an example of Hive's {{CREATE EXTERNAL TABLE}} statement:

{code}
CREATE EXTERNAL TABLE weatherext ( wban INT, date STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/hive/data/weatherext';
{code}

It seems that the combination of {{ROW FORMAT}} and {{LOCATION}} is 
effectively the same as what you proposed -- but it does not seem to force all 
table sources to be aware of the composition of connector / converter (i.e., 
{{TableFactory}}), at least at the API level.

Thoughts?

> Create unified interfaces to configure and instatiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> At the moment every table source has different ways for configuration and 
> instantiation. Some table sources are tailored to a specific encoding (e.g., 
> {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one 
> encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement 
> a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining 
> common properties, and instantiation. The {{TableSourceConverters}} provide a 
> similar functionality but use an external catalog. We might generialize this 
> interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is 
> very welcome.
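
The parts listed in the description above could be modelled, very roughly, as a 
single descriptor that flattens into string properties. This is only a sketch 
under stated assumptions: the class {{TableSourceDeclaration}}, its fields, and 
the property key names are hypothetical and are not part of Flink or of any 
design proposed in this issue. Watermark strategy and bucketization are left 
out to keep it short.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical descriptor; none of these names come from Flink.
final class TableSourceDeclaration {
    private final String sourceType;                     // e.g. "kafka", "custom"
    private final Map<String, String> sourceProperties;  // e.g. topic, connection info
    private final String encodingType;                   // e.g. "avro", "json", "csv"
    private final Map<String, String> schema;            // field name -> type name
    private final String rowtimeField;                   // null means processing time

    TableSourceDeclaration(
            String sourceType,
            Map<String, String> sourceProperties,
            String encodingType,
            Map<String, String> schema,
            String rowtimeField) {
        this.sourceType = sourceType;
        this.sourceProperties = new LinkedHashMap<>(sourceProperties);
        this.encodingType = encodingType;
        this.schema = new LinkedHashMap<>(schema);
        this.rowtimeField = rowtimeField;
    }

    // Flattens the declaration into key/value pairs (assumed key layout).
    Map<String, String> toProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("source.type", sourceType);
        sourceProperties.forEach((k, v) -> props.put("source.property." + k, v));
        props.put("encoding.type", encodingType);
        schema.forEach((field, type) -> props.put("encoding.schema." + field, type));
        if (rowtimeField != null) {
            props.put("rowtime.field", rowtimeField);
        }
        return props;
    }
}
```

A flat property map of this kind is one way a discovery mechanism could match a 
declaration against available sources without knowing their concrete classes.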





[jira] [Commented] (FLINK-8291) For security, Job Manager web UI should be accessed with username/password

2017-12-19 Thread Lynch Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296434#comment-16296434
 ] 

Lynch Lee commented on FLINK-8291:
--

[~till.rohrmann] Do you agree with me?

> For security, Job Manager web UI should be accessed with username/password 
> ---
>
> Key: FLINK-8291
> URL: https://issues.apache.org/jira/browse/FLINK-8291
> Project: Flink
>  Issue Type: Improvement
>  Components: Security, Webfrontend
>Affects Versions: 1.3.2
>Reporter: Lynch Lee
>
> Nowadays, we submit jobs from the Job Manager web UI without any login 
> credentials.
> For security, the Job Manager web UI should be accessed with a 
> username/password. Should we add this?





[jira] [Created] (FLINK-8292) Remove unnecessary force cast in DataStreamSource

2017-12-19 Thread Matrix42 (JIRA)
Matrix42 created FLINK-8292:
---

 Summary: Remove unnecessary force cast in DataStreamSource
 Key: FLINK-8292
 URL: https://issues.apache.org/jira/browse/FLINK-8292
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Reporter: Matrix42
Priority: Trivial


In DataStreamSource there is a cast that can be replaced by returning `this`.
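
As a rough illustration of this kind of change, assuming a simplified class 
hierarchy: `BaseOperator` and `SourceOperator` below are hypothetical stand-ins, 
not the actual Flink classes, and the method bodies are not copied from 
DataStreamSource. The sketch shows a fluent setter that casts the parent's 
return value next to an equivalent override that returns `this`.

```java
// Hypothetical stand-ins; these are not the real Flink classes.
class BaseOperator<T> {
    protected int parallelism = 1;

    // The parent setter returns the parent type for fluent chaining.
    public BaseOperator<T> setParallelism(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        this.parallelism = parallelism;
        return this;
    }

    public int getParallelism() {
        return parallelism;
    }
}

class SourceOperator<T> extends BaseOperator<T> {
    // Before: the parent's result is cast down to the subtype.
    public SourceOperator<T> setParallelismWithCast(int parallelism) {
        return (SourceOperator<T>) super.setParallelism(parallelism);
    }

    // After: a covariant override calls the parent and returns this,
    // which already has the right static type, so no cast is needed.
    @Override
    public SourceOperator<T> setParallelism(int parallelism) {
        super.setParallelism(parallelism);
        return this;
    }
}
```

Both variants return the same object; the second simply avoids the downcast.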





[jira] [Updated] (FLINK-8292) Remove unnecessary force cast in DataStreamSource

2017-12-19 Thread Matrix42 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matrix42 updated FLINK-8292:

Fix Version/s: 1.4.1
   1.5.0

> Remove unnecessary force cast in DataStreamSource
> -
>
> Key: FLINK-8292
> URL: https://issues.apache.org/jira/browse/FLINK-8292
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Matrix42
>Priority: Trivial
> Fix For: 1.5.0, 1.4.1
>
>
> In DataStreamSource there is a cast that can be replaced by returning `this`.





[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based

2017-12-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296431#comment-16296431
 ] 

ASF GitHub Bot commented on FLINK-7468:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157686388
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

The current `parent` in `SpilledSubpartitionView` is a `ResultSubpartition`, 
not a `SpillableSubpartition`. After changing its type from 
`ResultSubpartition` to `SpillableSubpartition`, we can make these methods 
package-private as you suggest. I will do that.


> Implement sender backlog logic for credit-based
> ---
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.5.0
>
>
> This is part of the work on credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the 
> backlog). The receivers use this information to decide how to distribute 
> floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which counts only the 
> buffers in this subpartition and excludes events. 
> The backlog is increased when a buffer is added to this subpartition and 
> decreased when a buffer is polled from it.
> The sender attaches the backlog to the {{BufferResponse}} as an absolute value 
> after the buffer has been transferred.
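
The counting rule in the description above can be sketched as follows. This is 
a minimal illustration, not the code from the pull request: `SketchBuffer` and 
`SketchSubpartition` are hypothetical names, and the real {{ResultSubpartition}} 
and {{Buffer}} classes are more involved. The backlog counts only non-event 
buffers, goes up on add, and goes down on poll.

```java
import java.util.ArrayDeque;

// Hypothetical stand-in for a network buffer; not Flink's Buffer class.
final class SketchBuffer {
    private final boolean event;

    SketchBuffer(boolean event) {
        this.event = event;
    }

    // True for data buffers, false for events.
    boolean isBuffer() {
        return !event;
    }
}

// Hypothetical subpartition that tracks its backlog of non-event buffers.
final class SketchSubpartition {
    private final ArrayDeque<SketchBuffer> queue = new ArrayDeque<>();
    private int buffersInBacklog;

    // Adding a non-event buffer increases the backlog by one.
    synchronized void add(SketchBuffer buffer) {
        queue.add(buffer);
        if (buffer.isBuffer()) {
            buffersInBacklog++;
        }
    }

    // Polling a non-event buffer decreases the backlog by one.
    synchronized SketchBuffer poll() {
        SketchBuffer buffer = queue.poll();
        if (buffer != null && buffer.isBuffer()) {
            buffersInBacklog--;
        }
        return buffer;
    }

    // The value a sender would report to the receiver as an absolute number.
    synchronized int getBuffersInBacklog() {
        return buffersInBacklog;
    }
}
```

Reporting an absolute value rather than a delta means a receiver never has to 
reconstruct the count from a history of updates.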





[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

2017-12-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4559#discussion_r157686388
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java
 ---
@@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
 
abstract public boolean isReleased();
 
+   /**
+* Gets the number of non-event buffers in this subpartition.
+*/
+   abstract public int getBuffersInBacklog();
+
+   /**
+* Decreases the number of non-event buffers by one after fetching a non-event
+* buffer from this subpartition.
+*/
+   abstract public void decreaseBuffersInBacklog(Buffer buffer);
+
+   /**
+* Increases the number of non-event buffers by one after adding a non-event
+* buffer into this subpartition.
+*/
+   abstract public void increaseBuffersInBacklog(Buffer buffer);
--- End diff --

The current `parent` in `SpilledSubpartitionView` is a `ResultSubpartition`, 
not a `SpillableSubpartition`. After changing its type from 
`ResultSubpartition` to `SpillableSubpartition`, we can make these methods 
package-private as you suggest. I will do that.


---

