[jira] [Updated] (FLINK-8257) Unify the value checks for setParallelism()
[ https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui updated FLINK-8257: --- Description: The {{setParallelism()}} method exists in many components at different levels. Some of the methods require the input value to be at least {{1}} (e.g., {{StreamTransformation.setParallelism()}}), while some of them also allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is {{-1}} by default (e.g., {{DataSink.setParallelism()}}). We need to unify the value checks for these methods. (was: The {{setParallelism()}} method exist in many components from different levels. Some of the methods require the input value to be greater than {{1}} (e.g., {{StreamTransformation.setParallelism()}}), while some of also allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is {{-1}} by default (e.g., {{DataSink.setParallelism()}}). We need to unify the value checks for these methods.) > Unify the value checks for setParallelism() > --- > > Key: FLINK-8257 > URL: https://issues.apache.org/jira/browse/FLINK-8257 > Project: Flink > Issue Type: Improvement > Components: Configuration >Reporter: Xingcan Cui >Assignee: Xingcan Cui > > The {{setParallelism()}} method exists in many components at different > levels. Some of the methods require the input value to be at least {{1}} > (e.g., {{StreamTransformation.setParallelism()}}), while some of them also > allow the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is > {{-1}} by default (e.g., {{DataSink.setParallelism()}}). We need to unify the > value checks for these methods. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
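A unified check along the lines the ticket asks for could be factored into a single helper. The sketch below is illustrative only (class and flag names are made up; `PARALLELISM_DEFAULT` mirrors `ExecutionConfig.PARALLELISM_DEFAULT`), not Flink's actual implementation:

```java
// Illustrative sketch of a unified parallelism check, not Flink's actual code.
class ParallelismCheck {

    // Mirrors ExecutionConfig.PARALLELISM_DEFAULT (-1).
    static final int PARALLELISM_DEFAULT = -1;

    // Accepts values >= 1. Callers that also accept the default marker
    // (e.g. DataSink.setParallelism()) pass allowDefault = true.
    static int checkParallelism(int parallelism, boolean allowDefault) {
        if (parallelism >= 1 || (allowDefault && parallelism == PARALLELISM_DEFAULT)) {
            return parallelism;
        }
        throw new IllegalArgumentException(
            "Parallelism must be at least 1" +
            (allowDefault ? " or ExecutionConfig.PARALLELISM_DEFAULT (-1)." : "."));
    }
}
```

With a helper like this, each `setParallelism()` differs only in whether it passes `allowDefault`, rather than duplicating (slightly inconsistent) checks.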
[jira] [Assigned] (FLINK-8257) Unify the value checks for setParallelism()
[ https://issues.apache.org/jira/browse/FLINK-8257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui reassigned FLINK-8257: -- Assignee: Xingcan Cui > Unify the value checks for setParallelism() > --- > > Key: FLINK-8257 > URL: https://issues.apache.org/jira/browse/FLINK-8257 > Project: Flink > Issue Type: Improvement > Components: Configuration >Reporter: Xingcan Cui >Assignee: Xingcan Cui > > The {{setParallelism()}} method exists in many components at different > levels. Some of the methods require the input value to be at least {{1}} > (e.g., {{StreamTransformation.setParallelism()}}), while some of them also allow > the value to be {{ExecutionConfig.PARALLELISM_DEFAULT}}, which is {{-1}} by > default (e.g., {{DataSink.setParallelism()}}). We need to unify the value > checks for these methods. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real IP when getRestAddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297920#comment-16297920 ] shuai.xu commented on FLINK-8289: - Hi [~eronwright], yes, what we need is the advertised address. But for the Yarn Proxy / Mesos Admin Router, the advertised address is not the proxy address but the real ip:port; this address is registered with the proxy so that the router can redirect requests for the proxy address to the real address of the rest server. > The RestServerEndpoint should return the address with real IP when > getRestAddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > Currently, RestServerEndpoint.getRestAddress returns an address equal to the > value of the config option rest.address (by default 127.0.0.1:9067), but this > address cannot be accessed from another machine. Since the IPs of the > Dispatcher and JobMaster are usually assigned dynamically, users often configure it > to 0.0.0.0, and getRestAddress then returns 0.0.0.0:9067. This address > is registered with YARN or Mesos, but it cannot be accessed > from another machine either. So getRestAddress needs to return the real ip:port so that users can > access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
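The fallback being discussed could look roughly like the sketch below: if the configured bind address is a wildcard or loopback address, advertise the locally resolved address instead. This is a minimal sketch under my own assumptions (class and method names are illustrative, not Flink's actual API):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative sketch only: when the configured rest.address is a bind-all
// or loopback address, fall back to the machine's resolved address so the
// advertised URL is reachable from other hosts.
class RestAddressResolver {

    static String resolveAdvertisedAddress(String configuredAddress) {
        try {
            InetAddress parsed = InetAddress.getByName(configuredAddress);
            if (parsed.isAnyLocalAddress() || parsed.isLoopbackAddress()) {
                // e.g. 0.0.0.0 or 127.0.0.1: not usable from another machine.
                return InetAddress.getLocalHost().getHostAddress();
            }
        } catch (UnknownHostException e) {
            // Keep the configured value if resolution fails.
        }
        return configuredAddress;
    }
}
```

A routable configured address passes through unchanged; only wildcard/loopback values trigger the fallback.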
[jira] [Commented] (FLINK-8295) Netty shading does not work properly
[ https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297820#comment-16297820 ] ASF GitHub Bot commented on FLINK-8295: --- Github user mcfongtw commented on the issue: https://github.com/apache/flink/pull/5183 +1 for this change, @NicoK . I did not see this error as I was working on FLINK-6805 - my bad :(. Also, you might want to remove those comments to avoid future confusion. > Netty shading does not work properly > > > Key: FLINK-8295 > URL: https://issues.apache.org/jira/browse/FLINK-8295 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector, Core >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Nico Kruber > > Multiple users complained that the Cassandra connector is not usable in Flink > 1.4.0 due to wrong/insufficient shading of Netty. > See: > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5187: Merge pull request #1 from apache/master
GitHub user laolang113 opened a pull request: https://github.com/apache/flink/pull/5187 Merge pull request #1 from apache/master update from apache *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). 
- Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/laolang113/flink master Alternatively you can review and apply these changes as the patch at:
[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...
Github user djh4230 commented on the issue: https://github.com/apache/flink/pull/4926 ![image](https://user-images.githubusercontent.com/8032384/34188337-cf86b410-e570-11e7-9781-283685d438bb.png) The Hadoop config files are on the classpath of the launched Flink process ---
[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration
[ https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297813#comment-16297813 ] ASF GitHub Bot commented on FLINK-7951: --- Github user djh4230 commented on the issue: https://github.com/apache/flink/pull/4926 ![image](https://user-images.githubusercontent.com/8032384/34188337-cf86b410-e570-11e7-9781-283685d438bb.png) The Hadoop config files are on the classpath of the launched Flink process > YarnApplicationMaster does not load HDFSConfiguration > - > > Key: FLINK-7951 > URL: https://issues.apache.org/jira/browse/FLINK-7951 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0 > > > When instantiating the {{YarnConfiguration}} we do not load the corresponding > {{HDFSConfiguration}}. As a result, we do not read the {{hdfs-site.xml}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8288) Register the web interface url to yarn for yarn job mode
[ https://issues.apache.org/jira/browse/FLINK-8288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297807#comment-16297807 ] ASF GitHub Bot commented on FLINK-8288: --- GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5186 [FLINK-8288] [runtime] register job master rest endpoint url to yarn ## What is the purpose of the change This pull request passes the endpoint URL of the job master rest server to the resource manager so that it can register the URL with YARN or Mesos. ## Verifying this change *(Please pick either of the following options)* This change is tested manually. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/shuai-xu/flink jira-8288 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5186.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5186 commit 9a65d995454fe8d49ba6b84094ea7e5ee687836e Author: shuai.xu Date: 2017-12-20T02:10:35Z [FLINK-8288] [runtime] register job master rest endpoint url to yarn > Register the web interface url to yarn for yarn job mode > > > Key: FLINK-8288 > URL: https://issues.apache.org/jira/browse/FLINK-8288 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > For flip-6 job mode, the resource manager is created before the web monitor, > so the web interface url is not set on the resource manager, and the resource > manager cannot register the url with yarn. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5880) Add documentation for object reuse for DataStream API
[ https://issues.apache.org/jira/browse/FLINK-5880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297798#comment-16297798 ] Elias Levy commented on FLINK-5880: --- Came across here to open this issue after reading the latest [blog post|https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime]. Hard to fault Databricks if the documentation about object reuse is not there. > Add documentation for object reuse for DataStream API > - > > Key: FLINK-5880 > URL: https://issues.apache.org/jira/browse/FLINK-5880 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Aljoscha Krettek > > The batch documentation has this section: > https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#operating-on-data-objects-in-functions -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297661#comment-16297661 ] ASF GitHub Bot commented on FLINK-8162: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r157907259 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -96,6 +100,14 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, SequenceNumber lastSequenceNum, KinesisProxyInterface kinesis) { this.fetcherRef = checkNotNull(fetcherRef); + MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext() + .getMetricGroup() + .addGroup("Kinesis"); + kinesisMetricGroup --- End diff -- use `addGroup("shard_id", subscribedShard.getShard().getShardId())` instead and register the metric on the returned group. > Kinesis Connector to report millisBehindLatest metric > - > > Key: FLINK-8162 > URL: https://issues.apache.org/jira/browse/FLINK-8162 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Cristian >Priority: Minor > Labels: kinesis > Fix For: 1.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > When reading from Kinesis streams, one of the most valuable metrics is > "MillisBehindLatest" (see > https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201). > Flink should use its metrics mechanism to report this value as a gauge, > tagging it with the shard id. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297662#comment-16297662 ] ASF GitHub Bot commented on FLINK-8162: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5182#discussion_r157907331 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -96,6 +100,14 @@ protected ShardConsumer(KinesisDataFetcher fetcherRef, SequenceNumber lastSequenceNum, KinesisProxyInterface kinesis) { this.fetcherRef = checkNotNull(fetcherRef); + MetricGroup kinesisMetricGroup = fetcherRef.getRuntimeContext() + .getMetricGroup() + .addGroup("Kinesis"); + kinesisMetricGroup + .getAllVariables() + .put("", subscribedShard.getShard().getShardId()); + + kinesisMetricGroup.gauge("millisBehindLatest", (Gauge) () -> millisBehindLatest); --- End diff -- the cast shouldn't be necessary > Kinesis Connector to report millisBehindLatest metric > - > > Key: FLINK-8162 > URL: https://issues.apache.org/jira/browse/FLINK-8162 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Cristian >Priority: Minor > Labels: kinesis > Fix For: 1.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > When reading from Kinesis streams, one of the most valuable metrics is > "MillisBehindLatest" (see > https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201). > Flink should use its metrics mechanism to report this value as a gauge, > tagging it with the shard id. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
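Taken together, the two review suggestions amount to registering the gauge on a per-shard subgroup instead of mutating the variable map, and dropping the cast on the lambda. The sketch below shows that pattern against minimal stand-in types (so it is self-contained here); the real code would use Flink's `MetricGroup.addGroup(String, String)` and `gauge()`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Minimal stand-ins for Flink's MetricGroup/Gauge, just detailed enough
// to show the registration pattern suggested in the review.
class MetricGroup {
    final Map<String, String> variables = new HashMap<>();
    final Map<String, Supplier<Long>> gauges = new HashMap<>();

    // Key/value group: contributes a variable (e.g. "shard_id" -> <id>) to the scope.
    MetricGroup addGroup(String key, String value) {
        MetricGroup child = new MetricGroup();
        child.variables.putAll(variables);
        child.variables.put(key, value);
        return child;
    }

    MetricGroup addGroup(String name) {
        MetricGroup child = new MetricGroup();
        child.variables.putAll(variables);
        return child;
    }

    void gauge(String name, Supplier<Long> gauge) {
        gauges.put(name, gauge);
    }
}

class ShardConsumerMetrics {
    volatile long millisBehindLatest = 1234L;

    MetricGroup register(MetricGroup runtimeGroup, String shardId) {
        // Reviewed pattern: addGroup("shard_id", ...) instead of mutating
        // getAllVariables(), and no (Gauge) cast on the lambda.
        MetricGroup shardGroup = runtimeGroup
            .addGroup("Kinesis")
            .addGroup("shard_id", shardId);
        shardGroup.gauge("millisBehindLatest", () -> millisBehindLatest);
        return shardGroup;
    }
}
```

This keeps the shard id as a proper scope variable, so reporters can tag the `millisBehindLatest` gauge per shard.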
[jira] [Commented] (FLINK-8297) RocksDBListState stores whole list in single byte[]
[ https://issues.apache.org/jira/browse/FLINK-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297476#comment-16297476 ] ASF GitHub Bot commented on FLINK-8297: --- GitHub user je-ik opened a pull request: https://github.com/apache/flink/pull/5185 [FLINK-8297] [flink-rocksdb] optionally use RocksDBMapState internally for storing lists ## What is the purpose of the change Enable storing lists that do not fit into memory for a single key. ## Brief change log ## Verifying this change This change added tests and can be verified as follows: passes additional tests for RocksDBStateBackend.enableLargeListsPerKey() ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: no - The runtime per-record code paths (performance sensitive): no, backward compatible - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? 
JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/datadrivencz/flink rocksdb-backend-memory-optimization Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5185.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5185 commit f1bbaa30901ba8a54b02908fd3eb3615301b4400 Author: Jan Lukavsky Date: 2017-12-14T20:42:06Z [FLINK-8297] [flink-rocksdb] optionally use RocksDBMapState internally for storing lists > RocksDBListState stores whole list in single byte[] > --- > > Key: FLINK-8297 > URL: https://issues.apache.org/jira/browse/FLINK-8297 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.4.0, 1.3.2 >Reporter: Jan Lukavský > > RocksDBListState currently keeps the whole list of data in a single RocksDB > key-value pair, which implies that the list must actually fit into memory. > Larger lists are not supported and end up with an OOME or other error. > RocksDBListState could be modified so that individual items in the list are > stored under separate keys in RocksDB and can then be iterated over. A simple > implementation could reuse the existing RocksDBMapState, with the key as an index into the > list and a single RocksDBValueState keeping track of how many items have > already been added to the list. Because this implementation might be less > efficient in some cases, it would be good to make it opt-in by a construct > like > {{new RocksDBStateBackend().enableLargeListsPerKey()}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
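The layout the ticket proposes can be sketched with plain maps standing in for `RocksDBMapState` and `RocksDBValueState`. The class and method names below are illustrative, not the PR's actual code; in the real implementation each entry would live in RocksDB rather than on the heap:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the proposed list layout: one map entry per list index plus a
// length counter, so no single RocksDB value has to hold the whole list.
class IndexedListState<V> {
    private final Map<Long, V> entries = new HashMap<>(); // stands in for RocksDBMapState
    private long size;                                    // stands in for the RocksDBValueState counter

    void add(V value) {
        entries.put(size, value);
        size++;
    }

    // Iterates indices 0..size-1; against RocksDB this becomes a key scan
    // instead of materializing the whole list in memory.
    Iterable<V> get() {
        List<V> out = new ArrayList<>();
        for (long i = 0; i < size; i++) {
            out.add(entries.get(i));
        }
        return out;
    }
}
```

The trade-off the ticket mentions (per-element lookups are slower than one bulk read for small lists) is why the behavior would be opt-in.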
[jira] [Updated] (FLINK-8233) Retrieve ExecutionResult by REST polling
[ https://issues.apache.org/jira/browse/FLINK-8233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8233: Priority: Blocker (was: Major) > Retrieve ExecutionResult by REST polling > > > Key: FLINK-8233 > URL: https://issues.apache.org/jira/browse/FLINK-8233 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Retrieve the {{ExecutionResult}} from a finished Flink job via the > {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8234: Priority: Blocker (was: Major) > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
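The cache described in this ticket (configurable size, periodic cleanup of stale entries to avoid memory leaks) could be sketched with a `LinkedHashMap`; all names and the lazy, on-read expiry below are my own illustration, not the actual Dispatcher code:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of a size-bounded result cache with time-based
// staleness, along the lines the ticket describes.
class JobResultCache<K, V> {
    private final long ttlMillis;
    private final Map<K, Entry<V>> entries;

    private static final class Entry<V> {
        final V value;
        final long insertedAt;
        Entry(V value, long insertedAt) {
            this.value = value;
            this.insertedAt = insertedAt;
        }
    }

    JobResultCache(int maxSize, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // Access-ordered LinkedHashMap: evicts the least recently used
        // entry once the configured size is exceeded.
        this.entries = new LinkedHashMap<K, Entry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Entry<V>> eldest) {
                return size() > maxSize;
            }
        };
    }

    void put(K key, V value) {
        entries.put(key, new Entry<>(value, System.currentTimeMillis()));
    }

    V get(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (System.currentTimeMillis() - e.insertedAt > ttlMillis) {
            entries.remove(key); // stale: drop to avoid leaking memory
            return null;
        }
        return e.value;
    }
}
```

A production version would likely run the cleanup on a timer rather than only on reads, but the bound-plus-TTL combination is the core of what the ticket asks for.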
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297450#comment-16297450 ] ASF GitHub Bot commented on FLINK-8234: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157878862 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -358,12 +360,12 @@ private DetachedFinalizer(JobID jobID, int numJobManagersToWaitFor) { } @Override - public void jobFinished(JobExecutionResult result) { + public void jobFinished(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { decrementCheckAndCleanup(); } @Override - public void jobFailed(Throwable cause) { + public void jobFailed(org.apache.flink.runtime.jobmaster.JobExecutionResult result) { --- End diff -- Maybe rename to `JobResult` after all to avoid fqn. > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297443#comment-16297443 ] ASF GitHub Bot commented on FLINK-8234: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157877969 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobExecutionResult.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.SerializedValue; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional + * {@link SerializedThrowable} when the job failed. + * + * This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}. 
+ */ +public class JobExecutionResult { + + private final JobID jobId; + + private final Map<String, SerializedValue<Object>> accumulatorResults; + + private final long netRuntime; + + private final SerializedThrowable serializedThrowable; + + private JobExecutionResult( + final JobID jobId, + final Map<String, SerializedValue<Object>> accumulatorResults, + final long netRuntime, + @Nullable final SerializedThrowable serializedThrowable) { + + checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0"); + + this.jobId = requireNonNull(jobId); + this.accumulatorResults = requireNonNull(accumulatorResults); + this.netRuntime = netRuntime; + this.serializedThrowable = serializedThrowable; + } + + public boolean isSuccess() { --- End diff -- Javadocs are missing. > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
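The quoted issue asks for a result cache in the Dispatcher with a configurable size that evicts stale entries. A minimal, hypothetical sketch of the size bound (not the actual Dispatcher implementation) can be built on an access-ordered `LinkedHashMap`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch only: a size-bounded LRU cache illustrating the
// "configurable size" requirement from FLINK-8234. A real cache would
// additionally sweep out stale entries on a timer to avoid memory leaks.
public class BoundedResultCache<K, V> {

    private final Map<K, V> entries;

    public BoundedResultCache(final int maxSize) {
        // access-order LinkedHashMap evicts the least recently used entry
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    public void put(K key, V value) {
        entries.put(key, value);
    }

    public V get(K key) {
        return entries.get(key);
    }

    public int size() {
        return entries.size();
    }
}
```

Time-based cleanup of stale entries could be layered on top by storing an insertion timestamp per entry and sweeping the map periodically.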
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297440#comment-16297440 ] ASF GitHub Bot commented on FLINK-8234: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157877406 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java --- @@ -458,7 +465,14 @@ public JobExecutionResult getResult() throws JobExecutionException, InterruptedE } } else if (result != null) { - return result; + try { + return new SerializedJobExecutionResult( + jobId, + result.getNetRuntime(), + result.getAccumulatorResults()).toJobExecutionResult(ClassLoader.getSystemClassLoader()); --- End diff -- Because the exception is serialized in `OnCompletionActions#jobFailed(JobExecutionResult);`, I have to deserialize it here again. I wonder if this is sane? CC: @tillrohrmann > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
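The serialize-then-deserialize concern raised in the review above can be illustrated with a plain Java round trip. This is a simplified stand-in, not Flink's `SerializedValue`/`SerializedThrowable`: once an exception has been serialized for transport, the receiving side must deserialize it again (in Flink's case with an explicit user-code class loader) before it can be inspected or rethrown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

// Simplified stand-in for the serialized-throwable round trip discussed above.
public class ThrowableRoundTrip {

    public static byte[] serialize(Throwable t) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(t); // Throwable is Serializable
        }
        return bos.toByteArray();
    }

    public static Throwable deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        // ObjectInputStream resolves classes with a default class loader;
        // Flink instead passes an explicit user-code class loader.
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Throwable) ois.readObject();
        }
    }

    // Convenience wrapper so callers do not have to handle checked exceptions.
    public static Throwable roundTrip(Throwable t) {
        try {
            return deserialize(serialize(t));
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```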
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297436#comment-16297436 ] ASF GitHub Bot commented on FLINK-8234: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157876793 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java --- @@ -92,4 +94,42 @@ * @return Future containing the collection of instance ids and the corresponding metric query service path */ CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); + + /** +* Returns the {@link SerializedJobExecutionResult} for a job, or in case the job failed, the +* failure cause. +* +* @param jobId ID of the job that we are interested in. +* @param timeout Timeout for the asynchronous operation. +* +* @see #isJobExecutionResultPresent(JobID, Time) +* +* @return {@link CompletableFuture} containing the {@link JobExecutionResult} or a +* {@link Throwable} which represents the failure cause. If there is no result, the future will +* be completed exceptionally with +* {@link org.apache.flink.runtime.messages.JobExecutionResultNotFoundException} +*/ + default CompletableFuture<JobExecutionResult> getJobExecutionResult( + JobID jobId, + @RpcTimeout Time timeout) { + throw new UnsupportedOperationException(); + } + + /** +* Tests if the {@link SerializedJobExecutionResult} is present. --- End diff -- Javadoc needs to be updated. > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished.
The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297435#comment-16297435 ] ASF GitHub Bot commented on FLINK-8234: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5184#discussion_r157876761 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java --- @@ -92,4 +94,42 @@ * @return Future containing the collection of instance ids and the corresponding metric query service path */ CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout); + + /** +* Returns the {@link SerializedJobExecutionResult} for a job, or in case the job failed, the --- End diff -- Javadoc needs to be updated. > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297433#comment-16297433 ] ASF GitHub Bot commented on FLINK-8234: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5184 [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher ## What is the purpose of the change Cache `JobExecutionResult` in `Dispatcher`, and add methods to `RestfulGateway` to enable retrieval of results through HTTP (not yet implemented). This will be needed so that accumulator results can be transmitted to the client. ## Brief change log - *Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult.* - *Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally.* - *Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults.* ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests to verify that the Dispatcher caches the job results when the job finishes successfully or by failure.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) CC: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8234 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5184.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5184 commit d05c76e621106810c32bc17aa0576923ba6be401 Author: gyao  Date: 2017-12-19T17:58:53Z [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher - Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult. - Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally. - Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8297) RocksDBListState stores whole list in single byte[]
Jan Lukavský created FLINK-8297: --- Summary: RocksDBListState stores whole list in single byte[] Key: FLINK-8297 URL: https://issues.apache.org/jira/browse/FLINK-8297 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 1.3.2, 1.4.0 Reporter: Jan Lukavský RocksDBListState currently keeps the whole list of data in a single RocksDB key-value pair, which implies that the list must actually fit into memory. Larger lists are not supported and end up with an OOME or other error. RocksDBListState could be modified so that individual items in the list are stored under separate keys in RocksDB and can then be iterated over. A simple implementation could reuse the existing RocksDBMapState, with the key as an index into the list, and a single RocksDBValueState keeping track of how many items have already been added to the list. Because this implementation might be less efficient in some cases, it would be good to make it opt-in by a construct like {{new RocksDBStateBackend().enableLargeListsPerKey()}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
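The proposed layout can be sketched in plain Java. This is an illustrative in-memory stand-in, not the RocksDB-backed implementation: a `HashMap` plays the role of RocksDBMapState, a plain `int` field plays the role of the RocksDBValueState counter, and each element sits under its own index key so the full list never has to be materialized in one byte[].

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in for the proposed RocksDBListState layout:
// one (index -> element) entry per list item plus a length counter.
public class IndexedListState<V> {

    private final Map<Integer, V> elements = new HashMap<>(); // stands in for RocksDBMapState
    private int count = 0;                                    // stands in for RocksDBValueState

    public void add(V value) {
        elements.put(count, value);
        count++;
    }

    // With RocksDB this would iterate entries lazily instead of
    // collecting them into a single in-memory list.
    public List<V> get() {
        List<V> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            result.add(elements.get(i));
        }
        return result;
    }

    public int size() {
        return count;
    }
}
```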
[jira] [Created] (FLINK-8296) Rework FlinkKafkaConsumerBaseTest to not use Java reflection for dependency injection
Tzu-Li (Gordon) Tai created FLINK-8296: -- Summary: Rework FlinkKafkaConsumerBaseTest to not use Java reflection for dependency injection Key: FLINK-8296 URL: https://issues.apache.org/jira/browse/FLINK-8296 Project: Flink Issue Type: Improvement Components: Kafka Connector, Tests Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Fix For: 1.5.0, 1.4.1 The current {{FlinkKafkaConsumerBaseTest}} relies heavily on Java reflection for dependency injection. Using reflection to compose unit tests really should be a last resort; it indicates that the tests are highly implementation-specific and that we should make the design more testable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
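The testability point can be illustrated with a hypothetical example (the classes below are illustrative, not Flink's): a collaborator substituted via reflection versus one passed through the constructor. The reflective variant compiles even after the field is renamed and only fails at runtime, which is what makes such tests brittle.

```java
import java.lang.reflect.Field;

// Hypothetical illustration, not Flink code: two ways a test can
// substitute a collaborator.
public class InjectionStyles {

    interface Clock {
        long millis();
    }

    // Implementation-coupled: the dependency is created internally,
    // so tests must break in with reflection.
    static class ReflectiveConsumer {
        private Clock clock = System::currentTimeMillis;

        long now() {
            return clock.millis();
        }
    }

    // Testable design: the dependency is an explicit constructor argument.
    static class InjectableConsumer {
        private final Clock clock;

        InjectableConsumer(Clock clock) {
            this.clock = clock;
        }

        long now() {
            return clock.millis();
        }
    }

    // What a reflection-based test has to do; brittle against renames.
    static long nowWithInjectedClock(Clock fake) {
        try {
            ReflectiveConsumer consumer = new ReflectiveConsumer();
            Field field = ReflectiveConsumer.class.getDeclaredField("clock");
            field.setAccessible(true);
            field.set(consumer, fake);
            return consumer.now();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```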
[jira] [Comment Edited] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297187#comment-16297187 ] Eron Wright edited comment on FLINK-8289 at 12/19/17 6:21 PM: --- I'll use the terms 'advertised' versus 'bind' address to discuss this issue. Do you agree that the goal here is to return the advertised address? The Flink docs are unclear on which configuration setting is applicable. Two complications: 1. *Yarn Proxy / Mesos Admin Router.* In both environments, web traffic is expected to be proxied, so the advertised address should be the proxy address. 2. *SSL*. To enable SSL on the web endpoints, two things are needed: a. Advertise a name-based (not IP-based) address. b. Construct the advertised address with 'https' scheme. See the proposed SSL spec for more information on point (2). [FLIP - Service Authorization (SSL)|https://docs.google.com/document/d/13IRPb2GdL842rIzMgEn0ibOQHNku6W8aMf1p7gCPJjg/edit?usp=sharing] was (Author: eronwright): I'll use the terms 'advertised' versus 'bind' address to discuss this issue. Do you agree that the goal here is to return the advertised address? The Flink docs are unclear on which configuration setting is applicable. Two complications: 1. *Yarn Proxy / Mesos Admin Router.* In both environments, web traffic is expected to be proxied, so the advertised address should be the proxy address. 2. *SSL*. To enable SSL on the web endpoints, two things are needed: a. Advertise a name-based (not IP-based) address. b. Construct the advertised address with 'https' scheme. 
> The RestServerEndpoint should return the address with the real IP when > getRestAddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > Now when RestServerEndpoint.getRestAddress is called, it will return an address equal > to the value of the config option rest.address, by default 127.0.0.1:9067, but > this address cannot be accessed from another machine. And the IPs for the > Dispatcher and JobMaster are usually assigned dynamically, so users will configure it > to 0.0.0.0, and getRestAddress will return 0.0.0.0:9067; this address > will be registered with YARN or Mesos, but it cannot be accessed > from another machine either. So it needs to return the real ip:port for users to > access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
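The 'advertised' versus 'bind' distinction discussed above can be made concrete with a small sketch. The method name and fallback policy here are illustrative, not Flink's actual resolution logic: a server may bind to 0.0.0.0 to listen on all interfaces, but that value must never be advertised to YARN or Mesos, since remote clients cannot connect to it.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Illustrative sketch: derive an advertisable address from a bind address.
public class AdvertisedAddress {

    public static String resolve(String bindAddress, int port) {
        if ("0.0.0.0".equals(bindAddress)) {
            try {
                // Fall back to a concrete local address for advertising.
                return InetAddress.getLocalHost().getHostAddress() + ":" + port;
            } catch (UnknownHostException e) {
                throw new IllegalStateException("cannot determine a routable address", e);
            }
        }
        // A concrete bind address can be advertised as-is.
        return bindAddress + ":" + port;
    }
}
```

In the proxied deployments mentioned in the comment (YARN proxy, Mesos admin router), the advertised address would instead be the proxy's address, and with SSL it would have to be a name-based https URL.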
[jira] [Commented] (FLINK-8295) Netty shading does not work properly
[ https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297113#comment-16297113 ] ASF GitHub Bot commented on FLINK-8295: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5183 [FLINK-8295][cassandra][build] properly shade netty for the datastax driver ## What is the purpose of the change `com.datastax.driver.core.NettyUtil` expects netty to be present either at its original package (`io.netty`) or relocated to `com.datastax.shaded.netty`. By relocating it to this package we make sure the driver follows its designated path and is able to connect at all. ## Brief change log - relocate netty to `com.datastax.shaded.netty` instead of our own namespace ## Verifying this change This change added tests and can be verified as follows: - verified the build jar contains netty (only) in `com.datastax.shaded.netty` and not under `org.apache.flink.cassandra.shaded.io.netty` - run a job that uses cassandra (should not have worked without adding netty before and should work now - haven't tested it yet - @twalthr can you jump in here?) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? 
**not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-8295 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5183.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5183 commit 80c00a28dbf01ab4f31220c2005d78a76316 Author: Nico Kruber  Date: 2017-12-19T17:14:19Z [FLINK-8295][cassandra][build] properly shade netty for the datastax driver com.datastax.driver.core.NettyUtil expects netty to be present either at its original package or relocated to com.datastax.shaded.netty. By relocating it to this package we make sure the driver follows its designated path. > Netty shading does not work properly > > > Key: FLINK-8295 > URL: https://issues.apache.org/jira/browse/FLINK-8295 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector, Core >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Nico Kruber > > Multiple users complained that the Cassandra connector is not usable in Flink > 1.4.0 due to wrong/insufficient shading of Netty. > See: > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
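The relocation described in the pull request corresponds to a maven-shade-plugin configuration along these lines (a sketch of the idea, not the exact diff from the PR):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <!-- relocate to the package com.datastax.driver.core.NettyUtil probes for -->
        <pattern>io.netty</pattern>
        <shadedPattern>com.datastax.shaded.netty</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```

The key point is that the datastax driver hard-codes the relocated package name it is willing to look for, so shading Netty into an arbitrary Flink-owned namespace breaks the driver's detection logic.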
[jira] [Commented] (FLINK-7594) Add a SQL client
[ https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297096#comment-16297096 ] Timo Walther commented on FLINK-7594: - I published an initial design document under FLIP-24. Feel free to join the discussion on the dev@ mailing list. > Add a SQL client > > > Key: FLINK-7594 > URL: https://issues.apache.org/jira/browse/FLINK-7594 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > At the moment a user can only specify queries within a Java/Scala program > which is nice for integrating table programs or parts of it with DataSet or > DataStream API. With more connectors coming up, it is time to also provide a > programming-free SQL client. The SQL client should consist of a CLI interface > and maybe also a REST API. The concrete design is still up for discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-8295) Netty shading does not work properly
[ https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber reassigned FLINK-8295: -- Assignee: Nico Kruber > Netty shading does not work properly > > > Key: FLINK-8295 > URL: https://issues.apache.org/jira/browse/FLINK-8295 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector, Core >Affects Versions: 1.4.0 >Reporter: Timo Walther >Assignee: Nico Kruber > > Multiple users complained that the Cassandra connector is not usable in Flink > 1.4.0 due to wrong/insufficient shading of Netty. > See: > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7594) Add a SQL client
[ https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297045#comment-16297045 ] sunjincheng commented on FLINK-7594: +1, It's a good enhancement to flink. > Add a SQL client > > > Key: FLINK-7594 > URL: https://issues.apache.org/jira/browse/FLINK-7594 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > At the moment a user can only specify queries within a Java/Scala program > which is nice for integrating table programs or parts of it with DataSet or > DataStream API. With more connectors coming up, it is time to also provide a > programming-free SQL client. The SQL client should consist of a CLI interface > and maybe also a REST API. The concrete design is still up for discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296984#comment-16296984 ] ASF GitHub Bot commented on FLINK-7465: --- Github user sunjincheng121 closed the pull request at: https://github.com/apache/flink/pull/4652 > Add build-in BloomFilterCount on TableAPI > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA. use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. 
The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch is as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr], I'd appreciate it if you could give me some advice. :-) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
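The algorithm described in the issue can be sketched as a small self-contained Java class, using the m = 18, k = 3 parameters from the figure. This is only an illustration: the hash-combining scheme below is an assumption, not Flink's or Hive's implementation.

```java
import java.util.BitSet;

// Minimal Bloom filter sketch: m bits, k hash positions per element,
// derived from two base hashes (a common double-hashing trick).
public class BloomFilterSketch {
    private final BitSet bits;
    private final int m;
    private final int k;

    public BloomFilterSketch(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // i-th array position for an element (hash mixing is a simplification).
    private int position(Object element, int i) {
        int h1 = element.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
        return Math.floorMod(h1 + i * h2, m);
    }

    // Adding sets the bits at all k positions to 1.
    public void add(Object element) {
        for (int i = 0; i < k; i++) {
            bits.set(position(element, i));
        }
    }

    // Query: any 0 bit means definitely absent; all 1s means possibly present.
    public boolean mightContain(Object element) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(element, i))) {
                return false; // definitely not in the set
            }
        }
        return true; // in the set, or a false positive
    }

    public static void main(String[] args) {
        BloomFilterSketch f = new BloomFilterSketch(18, 3); // m = 18, k = 3 as in the figure
        f.add("x"); f.add("y"); f.add("z");
        System.out.println(f.mightContain("x")); // prints "true": no false negatives
    }
}
```

Note that added elements always report present (no false negatives), while a query for an element like w may return a false positive with probability governed by m and k.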
[GitHub] flink pull request #4652: [FLINK-7465][table]Add cardinality count for table...
Github user sunjincheng121 closed the pull request at: https://github.com/apache/flink/pull/4652 ---
[jira] [Created] (FLINK-8295) Netty shading does not work properly
Timo Walther created FLINK-8295: --- Summary: Netty shading does not work properly Key: FLINK-8295 URL: https://issues.apache.org/jira/browse/FLINK-8295 Project: Flink Issue Type: Bug Components: Cassandra Connector, Core Reporter: Timo Walther Multiple users complained that the Cassandra connector is not usable in Flink 1.4.0 due to wrong/insufficient shading of Netty. See: http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8295) Netty shading does not work properly
[ https://issues.apache.org/jira/browse/FLINK-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-8295: Affects Version/s: 1.4.0 > Netty shading does not work properly > > > Key: FLINK-8295 > URL: https://issues.apache.org/jira/browse/FLINK-8295 > Project: Flink > Issue Type: Bug > Components: Cassandra Connector, Core >Affects Versions: 1.4.0 >Reporter: Timo Walther > > Multiple users complained that the Cassandra connector is not usable in Flink > 1.4.0 due to wrong/insufficient shading of Netty. > See: > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3Cb1f584b918c8aaf98b744c168407b0f5%40dbruhn.de%3E > http://mail-archives.apache.org/mod_mbox/flink-user/201712.mbox/%3CCACk7FTgMPR03bPBoKzmeVKCqS%2BumTR1u1X%2BKdPtHRgbnUZiO3A%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7465) Add build-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sunjincheng closed FLINK-7465. -- Resolution: Workaround > Add build-in BloomFilterCount on TableAPI > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA. use BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch as follows: > !bloomfilter.png! 
> Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr], I'd appreciate it if you could give me some advice. :-) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7465) Add build-in BloomFilterCount on TableAPI
[ https://issues.apache.org/jira/browse/FLINK-7465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296981#comment-16296981 ] sunjincheng commented on FLINK-7465: I am closing this issue because I noticed that the current implementation is highly CPU-demanding, and we can achieve the same result using a distinct count. > Add build-in BloomFilterCount on TableAPI > - > > Key: FLINK-7465 > URL: https://issues.apache.org/jira/browse/FLINK-7465 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > Attachments: bloomfilter.png > > > In this JIRA, we use a BloomFilter to implement counting functions. > BloomFilter Algorithm description: > An empty Bloom filter is a bit array of m bits, all set to 0. There must also > be k different hash functions defined, each of which maps or hashes some set > element to one of the m array positions, generating a uniform random > distribution. Typically, k is a constant, much smaller than m, which is > proportional to the number of elements to be added; the precise choice of k > and the constant of proportionality of m are determined by the intended false > positive rate of the filter. > To add an element, feed it to each of the k hash functions to get k array > positions. Set the bits at all these positions to 1. > To query for an element (test whether it is in the set), feed it to each of > the k hash functions to get k array positions. If any of the bits at these > positions is 0, the element is definitely not in the set – if it were, then > all the bits would have been set to 1 when it was inserted. If all are 1, > then either the element is in the set, or the bits have by chance been set to > 1 during the insertion of other elements, resulting in a false positive. > An example of a Bloom filter, representing the set {x, y, z}. The colored > arrows show the positions in the bit array that each set element is mapped > to. 
The element w is not in the set {x, y, z}, because it hashes to one > bit-array position containing 0. For this figure, m = 18 and k = 3. The > sketch is as follows: > !bloomfilter.png! > Reference: > 1. https://en.wikipedia.org/wiki/Bloom_filter > 2. > https://github.com/apache/hive/blob/master/storage-api/src/java/org/apache/hive/common/util/BloomFilter.java > Hi [~fhueske] [~twalthr], I'd appreciate it if you could give me some advice. :-) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296973#comment-16296973 ] ASF GitHub Bot commented on FLINK-8162: --- GitHub user casidiablo opened a pull request: https://github.com/apache/flink/pull/5182 [FLINK-8162] [kinesis-connector] Emit Kinesis' millisBehindLatest metric ## What is the purpose of the change - Emits [Kinesis' millisBehindLatest](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) metric, which can be used to detect delays in the pipeline ## Brief change log - Publish `millisBehindLatest` as a gauge metric under the `Kinesis` group using `` as parameter - Updated metrics documentation ## Verifying this change This change is already covered by existing tests, such as `ShardConsumerTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? 
`metrics.md` file You can merge this pull request into a Git repository by running: $ git pull https://github.com/casidiablo/flink kinesis-fork Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5182.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5182 commit bc8426ec3be364323d65cedcc1c1c5cb4e442c8b Author: CristianDate: 2017-12-19T15:14:22Z Emit Kinesis' millisBehindLatest metric > Kinesis Connector to report millisBehindLatest metric > - > > Key: FLINK-8162 > URL: https://issues.apache.org/jira/browse/FLINK-8162 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Cristian >Priority: Minor > Labels: kinesis > Fix For: 1.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > When reading from Kinesis streams, one of the most valuable metrics is > "MillisBehindLatest" (see > https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201). > Flink should use its metrics mechanism to report this value as a gauge, > tagging it with the shard id. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5182: [FLINK-8162] [kinesis-connector] Emit Kinesis' mil...
GitHub user casidiablo opened a pull request: https://github.com/apache/flink/pull/5182 [FLINK-8162] [kinesis-connector] Emit Kinesis' millisBehindLatest metric ## What is the purpose of the change - Emits [Kinesis' millisBehindLatest](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html) metric, which can be used to detect delays in the pipeline ## Brief change log - Publish `millisBehindLatest` as a gauge metric under the `Kinesis` group using `` as parameter - Updated metrics documentation ## Verifying this change This change is already covered by existing tests, such as `ShardConsumerTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? `metrics.md` file You can merge this pull request into a Git repository by running: $ git pull https://github.com/casidiablo/flink kinesis-fork Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5182.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5182 commit bc8426ec3be364323d65cedcc1c1c5cb4e442c8b Author: CristianDate: 2017-12-19T15:14:22Z Emit Kinesis' millisBehindLatest metric ---
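The gauge registration the PR describes can be sketched in plain Java. `Gauge` and `MetricGroup` below are simplified local stand-ins for the Flink metrics interfaces (e.g. `org.apache.flink.metrics.Gauge`), so the snippet runs without Flink on the classpath; the field names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

public class GaugeSketch {
    // Local stand-in for org.apache.flink.metrics.Gauge (simplified).
    interface Gauge<T> { T getValue(); }

    // Local stand-in for a Flink MetricGroup: just keeps gauges by name.
    static class MetricGroup {
        final Map<String, Gauge<?>> gauges = new HashMap<>();
        <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
            gauges.put(name, gauge);
            return gauge;
        }
    }

    // Updated by the consumer as each GetRecords response arrives.
    static volatile long lastMillisBehindLatest = -1L;

    public static void main(String[] args) {
        MetricGroup group = new MetricGroup();
        // Register the gauge once; a reporter (e.g. Prometheus) polls getValue().
        group.gauge("millisBehindLatest", (Gauge<Long>) () -> lastMillisBehindLatest);

        lastMillisBehindLatest = 1200L; // simulate a response reported 1.2 s behind
        Gauge<?> g = group.gauges.get("millisBehindLatest");
        System.out.println(g.getValue()); // prints 1200
    }
}
```

The key design point is that a gauge is polled lazily by the reporter, so the consumer only has to keep the latest value in a field rather than push metric updates on the hot per-record path.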
[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration
[ https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296834#comment-16296834 ] ASF GitHub Bot commented on FLINK-7951: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4926 @djh4230 could you check whether the Hadoop configuration directory is in the classpath of the launched Flink components? > YarnApplicationMaster does not load HDFSConfiguration > - > > Key: FLINK-7951 > URL: https://issues.apache.org/jira/browse/FLINK-7951 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0 > > > When instantiating the {{YarnConfiguration}} we do not load the corresponding > {{HDFSConfiguration}}. This causes that we do not read the {{hdfs-site.xml}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4926 @djh4230 could you check whether the Hadoop configuration directory is in the classpath of the launched Flink components? ---
[jira] [Commented] (FLINK-8291) For security, Job Manager web UI should be accessed with username/password
[ https://issues.apache.org/jira/browse/FLINK-8291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296829#comment-16296829 ] Till Rohrmann commented on FLINK-8291: -- [~lynchlee], you are right. We should add user authentication and user access rights at some point. > For security, Job Manager web UI should be accessed with username/password > --- > > Key: FLINK-8291 > URL: https://issues.apache.org/jira/browse/FLINK-8291 > Project: Flink > Issue Type: Improvement > Components: Security, Webfrontend >Affects Versions: 1.3.2 >Reporter: Lynch Lee > > Nowadays, we submit jobs from the JobManager web UI without any login credentials. > For security, the Job Manager web UI should be accessed with a username/password. > Should we? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5158: [hotfix][docs] Update debugging class loading doc to Java...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5158 Thanks! ---
[jira] [Updated] (FLINK-8284) Custom metrics not being exposed for Prometheus
[ https://issues.apache.org/jira/browse/FLINK-8284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Julio Biason updated FLINK-8284: Description: Following the documentation, we changed our filter that removes events with missing fields to a RichFilterFunction, so we can capture metrics about such events:

{code:java}
public class MissingClientFilter extends RichFilterFunction<LineData> {

    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        this.counter = getRuntimeContext()
            .getMetricGroup()
            .addGroup("events")
            .counter("missingClient");
    }

    @Override
    public boolean filter(LineData line) {
        String client = line.get("client").toString();
        boolean missing = client.trim().equals("");
        if (!missing) {
            this.count();
        }
        return !missing;
    }

    private void count() {
        if (this.counter != null) {
            this.counter.inc();
        }
    }
}
{code}

We also added Prometheus as our reporter:

{noformat}
metrics.reporters: prom
metrics.reporter.prom.port: 9105
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
{noformat}

The problem is that accessing port 9105 displays all Flink metrics, but not ours. 
was: Following the documentation, we changed our filter that removes events with missing fields to a RichFilterFunction, so we can capture metrics about such events: {{public class MissingClientFilter extends RichFilterFunction { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext() .getMetricGroup() .addGroup("events") .counter("missingClient"); } @Override public boolean filter(LineData line) { String client = line.get("client").toString(); boolean missing = client.trim().equals(""); if (!missing) { this.count(); } return !missing; } private void count() { if (this.counter != null) { this.counter.inc(); } } }}} We also added Prometheus as our reporter: {{metrics.reporters: prom metrics.reporter.prom.port: 9105 metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter}} The problem is accessing port 9105 display all Flink metrics, but not ours. > Custom metrics not being exposed for Prometheus > --- > > Key: FLINK-8284 > URL: https://issues.apache.org/jira/browse/FLINK-8284 > Project: Flink > Issue Type: Bug > Components: Documentation, Metrics >Affects Versions: 1.4.0 > Environment: Linux/CentOS 7 >Reporter: Julio Biason > > Following the documentation, we changed our filter that removes events with > missing fields to a RichFilterFunction, so we can capture metrics about such > events: > {code:scala} > public class MissingClientFilter extends RichFilterFunction { > private transient Counter counter; > @Override > public void open(Configuration config) { > this.counter = getRuntimeContext() > .getMetricGroup() > .addGroup("events") > .counter("missingClient"); > } > @Override > public boolean filter(LineData line) { > String client = line.get("client").toString(); > boolean missing = client.trim().equals(""); > if (!missing) { > this.count(); > } > return !missing; > } > private void count() { > if (this.counter != null) { > this.counter.inc(); > } > } > } > {code} > We 
also added Prometheus as our reporter: > {noformat} > metrics.reporters: prom > metrics.reporter.prom.port: 9105 > metrics.reporter.prom.class: > org.apache.flink.metrics.prometheus.PrometheusReporter > {noformat} > The problem is accessing port 9105 display all Flink metrics, but not ours. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8294) Missing examples/links in Data Sink docs
Julio Biason created FLINK-8294: --- Summary: Missing examples/links in Data Sink docs Key: FLINK-8294 URL: https://issues.apache.org/jira/browse/FLINK-8294 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.4.0 Reporter: Julio Biason In the [Data Sink|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#data-sinks] documentation, there is no example of how to use said functions -- even if they are only intended for debugging (which is exactly what I want to do right now). While {{print}} is quite simple, what I need is to get the results of the processing, so I'd probably need some of the {{write}} functions (since FLINK-8285 mentions that iterators are out). I'd suggest either adding the examples or linking the listed functions to their documentation. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7594) Add a SQL client
[ https://issues.apache.org/jira/browse/FLINK-7594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-7594: Summary: Add a SQL client (was: Add a SQL CLI client) > Add a SQL client > > > Key: FLINK-7594 > URL: https://issues.apache.org/jira/browse/FLINK-7594 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > At the moment a user can only specify queries within a Java/Scala program > which is nice for integrating table programs or parts of it with DataSet or > DataStream API. With more connectors coming up, it is time to also provide a > programming-free SQL client. The SQL client should consist of a CLI interface > and maybe also a REST API. The concrete design is still up for discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296619#comment-16296619 ] ASF GitHub Bot commented on FLINK-8283: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5181 I'm not yet certain this fix is the root cause of the stalling tests mentioned in https://issues.apache.org/jira/browse/FLINK-8283. It's something I stumbled across while investigating the failing tests. Will run several local Travis runs over a period of time to see if the stalling tests still occur. > FlinkKafkaConsumerBase failing on Travis with no output in 10min > > > Key: FLINK-8283 > URL: https://issues.apache.org/jira/browse/FLINK-8283 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > > Since a few days, Travis builds with the {{connectors}} profile keep failing > more often with no new output being received within 10 minutes. It seems to > start with the Travis build for > https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 > but may have been introduced earlier. The printed offsets look strange > though. 
> {code} > 16:33:12,508 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Setting > restore state in the FlinkKafkaConsumer: > {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, > KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773} > 16:33:12,520 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - > Consumer subtask 2 will start reading 66 partitions with offsets in restored > state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, > 
KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, > KafkaTopicPartition{topic='test-topic',
[GitHub] flink issue #5181: [FLINK-8283] [kafka] Fix mock verification on final metho...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5181 I'm not yet certain this fix is the root cause of the stalling tests mentioned in https://issues.apache.org/jira/browse/FLINK-8283. It's something I stumbled across while investigating the failing tests. Will run several local Travis runs over a period of time to see if the stalling tests still occur. ---
[jira] [Commented] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296611#comment-16296611 ] ASF GitHub Bot commented on FLINK-8283: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5181 [FLINK-8283] [kafka] Fix mock verification on final method in FlinkKafkaConsumerBaseTest ## What is the purpose of the change Prior to this PR, `FlinkKafkaConsumerBaseTest::testSnapshotStateWithCommitOnCheckpointsEnabled()` was incorrectly mock-verifying the `AbstractFetcher::commitInternalOffsetsToKafka()` method, which is final and cannot be mocked. This PR fixes that by making the method non-final. The incorrect mocking seems to cause instabilities spanning several tests in `FlinkKafkaConsumerBaseTest`. Note that ideally, that method should be final to prevent accidental overrides, but we actually have a lot of methods in `AbstractFetcher` that would best be final, yet are not and are mocked in the unit tests (e.g., `AbstractFetcher::snapshotState`, `AbstractFetcher::emitRecord`, etc.). ## Brief change log - Make `AbstractFetcher::commitInternalOffsetsToKafka` non-final, so that it can be properly mocked in unit tests. ## Verifying this change This change is already covered by existing tests in `FlinkKafkaConsumerBaseTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? 
n/a You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8283 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5181.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5181 commit c8265cea34811c901bba2e9cd56e4870bf17622f Author: Tzu-Li (Gordon) TaiDate: 2017-12-19T10:00:37Z [FLINK-8283] [kafka] Fix mock verification on final method in FlinkKafkaConsumerBaseTest Prior to this commit, FlinkKafkaConsumerBaseTest::testSnapshotStateWithCommitOnCheckpointsEnabled() was incorrectly veryfing a final method on the AbstractFetcher class (specifically, the commitInternalOffsetsToKafka method). This commit fixes this by making the method non-final. Note that ideally, that method should be final to prevent accidental overrides, but we actually have a lot of methods in the AbstractFetcher that should actually be best as final, but are not and mocked in the unit tests. > FlinkKafkaConsumerBase failing on Travis with no output in 10min > > > Key: FLINK-8283 > URL: https://issues.apache.org/jira/browse/FLINK-8283 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > > Since a few days, Travis builds with the {{connectors}} profile keep failing > more often with no new output being received within 10 minutes. It seems to > start with the Travis build for > https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 > but may have been introduced earlier. The printed offsets look strange > though. 
> {code} > 16:33:12,508 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Setting > restore state in the FlinkKafkaConsumer: > {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, > KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773} > 16:33:12,520 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - > Consumer subtask 2 will start reading 66 partitions with offsets in restored > state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, >
[GitHub] flink pull request #5181: [FLINK-8283] [kafka] Fix mock verification on fina...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5181 [FLINK-8283] [kafka] Fix mock verification on final method in FlinkKafkaConsumerBaseTest ## What is the purpose of the change Prior to this PR, `FlinkKafkaConsumerBaseTest::testSnapshotStateWithCommitOnCheckpointsEnabled()` was incorrectly mock-verifying the `AbstractFetcher::commitInternalOffsetsToKafka()` method, which is final and cannot be mocked. This seems to cause instabilities spanning several tests in the `FlinkKafkaConsumerBaseTest`. This PR fixes the problem by making the method non-final. Note that ideally, that method should be final to prevent accidental overrides, but many methods in the `AbstractFetcher` that would best be final are not, and are instead mocked in the unit tests (e.g., `AbstractFetcher::snapshotState`, `AbstractFetcher::emitRecord`, etc.). ## Brief change log - Make `AbstractFetcher::commitInternalOffsetsToKafka` non-final, so that it can be properly mocked in unit tests. ## Verifying this change This change is already covered by existing tests in `FlinkKafkaConsumerBaseTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? 
n/a You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8283 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5181.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5181 commit c8265cea34811c901bba2e9cd56e4870bf17622f Author: Tzu-Li (Gordon) Tai Date: 2017-12-19T10:00:37Z [FLINK-8283] [kafka] Fix mock verification on final method in FlinkKafkaConsumerBaseTest Prior to this commit, FlinkKafkaConsumerBaseTest::testSnapshotStateWithCommitOnCheckpointsEnabled() was incorrectly verifying a final method on the AbstractFetcher class (specifically, the commitInternalOffsetsToKafka method). This commit fixes this by making the method non-final. Note that ideally, that method should be final to prevent accidental overrides, but many methods in the AbstractFetcher that would best be final are not, and are instead mocked in the unit tests. ---
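The root cause here is a general JVM limitation rather than anything Kafka-specific: mocks intercept calls by overriding methods in a generated subclass, and `final` methods cannot be overridden. A minimal sketch of that mechanism, using hypothetical class and method names (not Flink's actual `AbstractFetcher` API):

```java
// Hand-rolled test double illustrating why a final method cannot be mocked.
// All names here are illustrative, not Flink's real API.
class Fetcher {
    // If this method were declared final, the subclass below would not
    // compile, and a mocking framework's generated subclass could not
    // intercept calls to it either.
    void commitOffsets(String offsets) {
        // a real implementation would talk to Kafka here
    }
}

class RecordingFetcher extends Fetcher {
    String lastCommitted;

    @Override
    void commitOffsets(String offsets) {
        lastCommitted = offsets; // record the call instead of hitting Kafka
    }
}

public class MockFinalSketch {
    public static void main(String[] args) {
        RecordingFetcher fetcher = new RecordingFetcher();
        fetcher.commitOffsets("partition-0:42");
        System.out.println(fetcher.lastCommitted); // prints partition-0:42
    }
}
```

If `commitOffsets` were `final`, the `@Override` above would be a compile error, which mirrors why the verification on the final `commitInternalOffsetsToKafka` could not work until the method was made non-final.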
[jira] [Resolved] (FLINK-8215) Support implicit type widening for array/map constructors in SQL
[ https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-8215. - Resolution: Fixed Fix Version/s: 1.5.0 Fixed in 1.5: 2142eeda9df262e989951c4b31273cbd9346567f > Support implicit type widening for array/map constructors in SQL > > > Key: FLINK-8215 > URL: https://issues.apache.org/jira/browse/FLINK-8215 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > Fix For: 1.5.0 > > > The Table API goes through `LogicalNode.validate()`, which brings up the > collection validation and rejects inconsistent types; this will throw a > `ValidationException` for something like `array(1.0, 2.0f)`. > The SQL API uses `FlinkPlannerImpl.validator(SqlNode)`, which uses Calcite's SqlNode > validation and supports resolving the leastRestrictive type. `ARRAY[CAST(1 AS > DOUBLE), CAST(2 AS FLOAT)]` throws a codegen exception. > The root cause is that the code generation for these collection value constructors does > not cast to or resolve the leastRestrictive type correctly. I see 2 options: > 1. Strengthen validation to not allow resolving the leastRestrictive type in SQL. > 2. Make codegen support the leastRestrictive type cast, such as by using > `generateCast` instead of direct casting like `(ClassType) element`. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
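For illustration, option 2 boils down to resolving one least-restrictive component type for the whole constructor and emitting a widening cast per element, instead of a direct `(ClassType) element` cast that fails when, say, a `Float` is cast to `Double`. The sketch below is a simplified stand-in under assumed names (`NumType`, `leastRestrictive`, `widen`); it is not Flink's or Calcite's actual codegen:

```java
// Simplified sketch of least-restrictive type resolution for an array
// constructor. The three-type lattice INT < FLOAT < DOUBLE is an assumption
// for illustration only.
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class WideningSketch {

    // Enum declaration order doubles as the widening order.
    enum NumType { INT, FLOAT, DOUBLE }

    // The widest element type wins, as in ARRAY[CAST(1 AS DOUBLE), CAST(2 AS FLOAT)].
    static NumType leastRestrictive(List<NumType> types) {
        return types.stream().max(Comparator.naturalOrder()).orElse(NumType.INT);
    }

    // Stand-in for a generated widening cast: any numeric value is widened
    // to the resolved target type instead of being cast directly.
    static double widen(Number value) {
        return value.doubleValue();
    }

    public static void main(String[] args) {
        NumType target = leastRestrictive(Arrays.asList(NumType.DOUBLE, NumType.FLOAT));
        System.out.println(target); // prints DOUBLE

        // Both elements end up as the resolved component type.
        double[] array = { widen(1.0d), widen(2.0f) };
        System.out.println(Arrays.toString(array)); // prints [1.0, 2.0]
    }
}
```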
[jira] [Updated] (FLINK-8215) Support implicit type widening for array/map constructors in SQL
[ https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-8215: Summary: Support implicit type widening for array/map constructors in SQL (was: upport implicit type widening for array/map constructors in SQL) > Support implicit type widening for array/map constructors in SQL > > > Key: FLINK-8215 > URL: https://issues.apache.org/jira/browse/FLINK-8215 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > The Table API goes through `LogicalNode.validate()`, which brings up the > collection validation and rejects inconsistent types; this will throw a > `ValidationException` for something like `array(1.0, 2.0f)`. > The SQL API uses `FlinkPlannerImpl.validator(SqlNode)`, which uses Calcite's SqlNode > validation and supports resolving the leastRestrictive type. `ARRAY[CAST(1 AS > DOUBLE), CAST(2 AS FLOAT)]` throws a codegen exception. > The root cause is that the code generation for these collection value constructors does > not cast to or resolve the leastRestrictive type correctly. I see 2 options: > 1. Strengthen validation to not allow resolving the leastRestrictive type in SQL. > 2. Make codegen support the leastRestrictive type cast, such as by using > `generateCast` instead of direct casting like `(ClassType) element`. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8215) Support implicit type widening for array/map constructors in SQL
[ https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296601#comment-16296601 ] ASF GitHub Bot commented on FLINK-8215: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5148 > Support implicit type widening for array/map constructors in SQL > > > Key: FLINK-8215 > URL: https://issues.apache.org/jira/browse/FLINK-8215 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > The Table API goes through `LogicalNode.validate()`, which brings up the > collection validation and rejects inconsistent types; this will throw a > `ValidationException` for something like `array(1.0, 2.0f)`. > The SQL API uses `FlinkPlannerImpl.validator(SqlNode)`, which uses Calcite's SqlNode > validation and supports resolving the leastRestrictive type. `ARRAY[CAST(1 AS > DOUBLE), CAST(2 AS FLOAT)]` throws a codegen exception. > The root cause is that the code generation for these collection value constructors does > not cast to or resolve the leastRestrictive type correctly. I see 2 options: > 1. Strengthen validation to not allow resolving the leastRestrictive type in SQL. > 2. Make codegen support the leastRestrictive type cast, such as by using > `generateCast` instead of direct casting like `(ClassType) element`. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5148: [FLINK-8215][Table] fix codegen issue on array/map...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5148 ---
[jira] [Updated] (FLINK-8215) upport implicit type widening for array/map constructors in SQL
[ https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-8215: Summary: upport implicit type widening for array/map constructors in SQL (was: Collections codegen exception when constructing Array or Map via SQL API) > upport implicit type widening for array/map constructors in SQL > --- > > Key: FLINK-8215 > URL: https://issues.apache.org/jira/browse/FLINK-8215 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > The Table API goes through `LogicalNode.validate()`, which brings up the > collection validation and rejects inconsistent types; this will throw a > `ValidationException` for something like `array(1.0, 2.0f)`. > The SQL API uses `FlinkPlannerImpl.validator(SqlNode)`, which uses Calcite's SqlNode > validation and supports resolving the leastRestrictive type. `ARRAY[CAST(1 AS > DOUBLE), CAST(2 AS FLOAT)]` throws a codegen exception. > The root cause is that the code generation for these collection value constructors does > not cast to or resolve the leastRestrictive type correctly. I see 2 options: > 1. Strengthen validation to not allow resolving the leastRestrictive type in SQL. > 2. Make codegen support the leastRestrictive type cast, such as by using > `generateCast` instead of direct casting like `(ClassType) element`. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296581#comment-16296581 ] ASF GitHub Bot commented on FLINK-7468: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157709904 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -52,6 +54,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- Yes, I totally agree with your point on the current status of the spillable/spilled subpartitions and subpartition views. And I also think that the `PipelinedSubpartition` is the most important path and the `SpillableSubpartition` should not be very sensitive. I think we have already reached a consensus on the way to handle `SpillableSubpartition`, and I will do that later. :) > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157709904 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -52,6 +54,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- Yes, I totally agree with your point on the current status of the spillable/spilled subpartitions and subpartition views. And I also think that the `PipelinedSubpartition` is the most important path and the `SpillableSubpartition` should not be very sensitive. I think we have already reached a consensus on the way to handle `SpillableSubpartition`, and I will do that later. :) ---
[jira] [Commented] (FLINK-8215) Collections codegen exception when constructing Array or Map via SQL API
[ https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296576#comment-16296576 ] ASF GitHub Bot commented on FLINK-8215: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5148 Thanks for this fix, @walterddr. The code looks good. I will merge this... > Collections codegen exception when constructing Array or Map via SQL API > > > Key: FLINK-8215 > URL: https://issues.apache.org/jira/browse/FLINK-8215 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Rong Rong >Assignee: Rong Rong > > The Table API goes through `LogicalNode.validate()`, which brings up the > collection validation and rejects inconsistent types; this will throw a > `ValidationException` for something like `array(1.0, 2.0f)`. > The SQL API uses `FlinkPlannerImpl.validator(SqlNode)`, which uses Calcite's SqlNode > validation and supports resolving the leastRestrictive type. `ARRAY[CAST(1 AS > DOUBLE), CAST(2 AS FLOAT)]` throws a codegen exception. > The root cause is that the code generation for these collection value constructors does > not cast to or resolve the leastRestrictive type correctly. I see 2 options: > 1. Strengthen validation to not allow resolving the leastRestrictive type in SQL. > 2. Make codegen support the leastRestrictive type cast, such as by using > `generateCast` instead of direct casting like `(ClassType) element`. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5148: [FLINK-8215][Table] fix codegen issue on array/map value ...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/5148 Thanks for this fix, @walterddr. The code looks good. I will merge this... ---
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296562#comment-16296562 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157707628 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- yes, that would be nice > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157707628 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- yes, that would be nice ---
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296558#comment-16296558 ] ASF GitHub Bot commented on FLINK-7468: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157706951 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- Sorry, my expression was not correct above. I mean we do not need the `decreaseBuffersInBacklog` method in `ResultSubPartition` after changing the `parent` to `SpillableSubpartition` in `SpilledSubpartitionView`. > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296559#comment-16296559 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157706995 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -52,6 +54,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- You're absolutely right about not counting events. Therefore, we cannot use the queue's size as I suggested. Yes, `BufferAndAvailability` would need to be extended as well. This integration/split of the spillable/spilled subpartitions and subpartition views, with both of them working on the same structures and requiring the same synchronisation pattern, is imho really not nice and highly fragile. @pnowojski and I are currently re-designing the synchronisation in these parts of the code and are a bit sensitive to it now, so let's drag him into this discussion as well: I would consider `PipelinedSubpartition` the hot path where we need to optimise most - spillable subpartitions are used in batch mode and have higher tolerances, especially when spilling to disk. If you returned the new backlog counter in `SpillableSubpartition#decreaseBuffersInBacklog()` however (retrieved under the `synchronized (buffers)` section), then you would not need the `volatile` either since you are already under the lock. 
> Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157706995 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -52,6 +54,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- You're absolutely right about not counting events. Therefore, we cannot use the queue's size as I suggested. Yes, `BufferAndAvailability` would need to be extended as well. This integration/split of the spillable/spilled subpartitions and subpartition views, with both of them working on the same structures and requiring the same synchronisation pattern, is imho really not nice and highly fragile. @pnowojski and I are currently re-designing the synchronisation in these parts of the code and are a bit sensitive to it now, so let's drag him into this discussion as well: I would consider `PipelinedSubpartition` the hot path where we need to optimise most - spillable subpartitions are used in batch mode and have higher tolerances, especially when spilling to disk. If you returned the new backlog counter in `SpillableSubpartition#decreaseBuffersInBacklog()` however (retrieved under the `synchronized (buffers)` section), then you would not need the `volatile` either since you are already under the lock. ---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157706951 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- Sorry, my expression was not correct above. I mean we do not need the `decreaseBuffersInBacklog` method in `ResultSubPartition` after changing the `parent` to `SpillableSubpartition` in `SpilledSubpartitionView`. ---
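Putting the review points above together, a minimal sketch of the backlog bookkeeping under discussion might look as follows. The class shape is an assumption for illustration, not Flink's actual `PipelinedSubpartition`: the counter tracks only non-event buffers, is updated under the lock that guards the queue, and the decrement returns the new value from under the lock so the caller needs no separate read:

```java
// Illustrative backlog counter for a subpartition-like queue. Names are
// hypothetical; only the counting discipline mirrors the discussion above.
import java.util.ArrayDeque;

public class BacklogSketch {

    static final class Buffer {
        final boolean isEvent;
        Buffer(boolean isEvent) { this.isEvent = isEvent; }
    }

    private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
    private int buffersInBacklog; // guarded by "buffers", so no volatile needed

    public void add(Buffer buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            if (!buffer.isEvent) {
                buffersInBacklog++; // events do not count toward the backlog
            }
        }
    }

    /** Polls one buffer and returns the backlog after the poll, retrieved
     *  under the same lock, as suggested in the review. */
    public int pollAndGetBacklog() {
        synchronized (buffers) {
            Buffer buffer = buffers.poll();
            if (buffer != null && !buffer.isEvent) {
                buffersInBacklog--;
            }
            return buffersInBacklog;
        }
    }

    public int getBuffersInBacklog() {
        synchronized (buffers) {
            return buffersInBacklog;
        }
    }

    public static void main(String[] args) {
        BacklogSketch sub = new BacklogSketch();
        sub.add(new Buffer(false)); // data buffer
        sub.add(new Buffer(true));  // event, not counted
        sub.add(new Buffer(false)); // data buffer
        System.out.println(sub.getBuffersInBacklog()); // prints 2
        System.out.println(sub.pollAndGetBacklog());   // prints 1
    }
}
```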
[GitHub] flink pull request #5158: [hotfix][docs] Update debugging class loading doc ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5158 ---
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296546#comment-16296546 ] Timo Walther commented on FLINK-8240: - Thanks for your response [~wheat9]. We need to decide what the syntax for {{CREATE EXTERNAL TABLE}} will look like. It could look more like Hive or more like the unified interface of this issue. But in any case such a statement would compile down to the unified interface. We won't support every combination of connector/encoding but with this abstraction we don't need to expose things like a {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}, etc. anymore. From an API level we have a clear separation that might (or might not) also separate components internally in the future. Btw, not all table sources need to be built with a factory. For now, we will keep the builders inside every table source (like {{CsvTableSource.builder()}}). This is also needed because you cannot express everything as a string property. The factories will use the builders to create the table sources. > Create unified interfaces to configure and instantiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > At the moment every table source has different ways for configuration and > instantiation. Some table sources are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide a > similar functionality but use an external catalog. We might generalize this > interface. 
> In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
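As a rough illustration of the unified, property-based declaration described above, a source could be discovered and instantiated from a flat string-property map. All keys, interface names, and the factory shape below are hypothetical assumptions; the issue only sketches the concept:

```java
// Hypothetical discovery-by-properties sketch: a factory declares which
// property maps it can handle and builds a source description from them.
// Keys like "source.type" are invented for illustration.
import java.util.HashMap;
import java.util.Map;

public class TableSourceDescriptorSketch {

    interface TableSourceFactory {
        boolean matches(Map<String, String> properties);
        // Returns a plain description here instead of a real TableSource.
        String create(Map<String, String> properties);
    }

    // A hypothetical CSV factory keyed on "source.type".
    static final TableSourceFactory CSV_FACTORY = new TableSourceFactory() {
        public boolean matches(Map<String, String> p) {
            return "csv".equals(p.get("source.type"));
        }
        public String create(Map<String, String> p) {
            return "CsvTableSource(path=" + p.get("source.path") + ")";
        }
    };

    // Discovery: pick the first factory whose matches() accepts the properties.
    static String discoverAndCreate(Map<String, String> properties) {
        if (CSV_FACTORY.matches(properties)) {
            return CSV_FACTORY.create(properties);
        }
        throw new IllegalArgumentException("no factory for " + properties);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("source.type", "csv");
        props.put("source.path", "/tmp/data.csv");
        System.out.println(discoverAndCreate(props));
    }
}
```

This also illustrates the comment's caveat: anything that cannot be expressed as a string property would still go through the source's own builder, with the factory delegating to it.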
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296538#comment-16296538 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157703075 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- `package-private`, e.g. `abstract void increaseBuffersInBacklog(Buffer buffer);`, already works without changing anything since `SpilledSubpartitionView` is in the same package > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. 
> The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157703075 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- `package-private`, e.g. `abstract void increaseBuffersInBacklog(Buffer buffer);`, already works without changing anything since `SpilledSubpartitionView` is in the same package ---
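The `package-private` suggestion works because an abstract method with no access modifier is visible only within its own package, which is enough for same-package subclasses and views while hiding the method from everyone else. A tiny sketch with illustrative names:

```java
// Visibility sketch: both classes live in the same package (same file here),
// so the package-private abstract method can be implemented and called.
// Names are illustrative, not Flink's actual classes.
abstract class ResultSubpartitionSketch {
    // No modifier: package-private, invisible outside this package.
    abstract int getBuffersInBacklog();
}

class PipelinedSubpartitionSketch extends ResultSubpartitionSketch {
    @Override
    int getBuffersInBacklog() {
        return 0; // placeholder implementation
    }
}

public class VisibilitySketch {
    public static void main(String[] args) {
        ResultSubpartitionSketch sub = new PipelinedSubpartitionSketch();
        System.out.println(sub.getBuffersInBacklog()); // prints 0
    }
}
```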
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296475#comment-16296475 ] ASF GitHub Bot commented on FLINK-7468: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157694294 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java --- @@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception { try { subpartition.finish(); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(4, subpartition.getTotalNumberOfBytes()); + assertFalse(subpartition.add(mock(Buffer.class))); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); --- End diff -- sure > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157694294 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java --- @@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception { try { subpartition.finish(); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); + assertEquals(4, subpartition.getTotalNumberOfBytes()); + assertFalse(subpartition.add(mock(Buffer.class))); + assertEquals(1, subpartition.getTotalNumberOfBuffers()); + assertEquals(0, subpartition.getBuffersInBacklog()); --- End diff -- sure ---
[jira] [Assigned] (FLINK-4352) Refactor CLI to use RESTful client for cluster communication
[ https://issues.apache.org/jira/browse/FLINK-4352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-4352: Assignee: Till Rohrmann > Refactor CLI to use RESTful client for cluster communication > > > Key: FLINK-4352 > URL: https://issues.apache.org/jira/browse/FLINK-4352 > Project: Flink > Issue Type: Improvement > Components: Client, Cluster Management >Reporter: jingzhang >Assignee: Till Rohrmann > Labels: flip-6 > > We have to refactor the Flink's CLI in order to be able to use the RESTful > client to communicate with the new Flip-6 Flink cluster. > This could be done by implementing a new {{ClusterClient}} or by refactoring > the {{ClusterClient}} to not use the static methods of the {{JobClient}} but > instead an interface implementation. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
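The refactoring described in the ticket — replacing the static {{JobClient}} methods with an interface implementation so a RESTful client can be plugged in — follows a standard dependency-injection shape. All names below are hypothetical illustrations of that idea, not the Flip-6 API itself:

```java
import java.util.concurrent.CompletableFuture;

// Instead of calling static JobClient methods, the cluster client depends
// on an interface; a REST-based implementation can then be swapped in.
interface JobSubmission {
    CompletableFuture<String> submit(String jobName);
}

class RestJobSubmission implements JobSubmission {
    @Override
    public CompletableFuture<String> submit(String jobName) {
        // A real implementation would issue an HTTP request to the cluster's
        // REST endpoint; here we just echo an id for illustration.
        return CompletableFuture.completedFuture("job-" + jobName);
    }
}

class ClusterClientSketch {
    private final JobSubmission submission;

    ClusterClientSketch(JobSubmission submission) {
        this.submission = submission;
    }

    String run(String jobName) {
        return submission.submit(jobName).join();
    }
}

public class CliRefactorDemo {
    public static void main(String[] args) {
        ClusterClientSketch client = new ClusterClientSketch(new RestJobSubmission());
        System.out.println(client.run("wordcount")); // prints job-wordcount
    }
}
```

The interface also makes the CLI testable: a mock `JobSubmission` can be injected without any cluster.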
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157693477 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws Exception { partition.add(buffer); partition.add(buffer); + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + + assertFalse(buffer.isRecycled()); assertEquals(3, partition.releaseMemory()); + // now the buffer may be freed, depending on the timing of the write operation + // -> let's do this check at the end of the test (to save some time) + // still same statistics + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + partition.finish(); + // + one EndOfPartitionEvent + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); --- End diff -- sure ---
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296468#comment-16296468 ] ASF GitHub Bot commented on FLINK-7468: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157693477 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java --- @@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws Exception { partition.add(buffer); partition.add(buffer); + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + + assertFalse(buffer.isRecycled()); assertEquals(3, partition.releaseMemory()); + // now the buffer may be freed, depending on the timing of the write operation + // -> let's do this check at the end of the test (to save some time) + // still same statistics + assertEquals(3, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3, partition.getTotalNumberOfBytes()); + partition.finish(); + // + one EndOfPartitionEvent + assertEquals(4, partition.getTotalNumberOfBuffers()); + assertEquals(3, partition.getBuffersInBacklog()); + assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes()); --- End diff -- sure > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. 
> The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8293) Rework Flink's type and serialization docs
Timo Walther created FLINK-8293: --- Summary: Rework Flink's type and serialization docs Key: FLINK-8293 URL: https://issues.apache.org/jira/browse/FLINK-8293 Project: Flink Issue Type: Improvement Components: Documentation, Type Serialization System Reporter: Timo Walther Assignee: Timo Walther The documentation about Flink's type and serialization system hasn't been updated for a while and there are a lot of users (especially beginners) who have problems with defining types for inputs, functions, state, etc. We should rework the documentation a bit to address things like: Type-related things: {code} "Document all data types. What TypeInfo is available? What are the limitations? Encourage TypeHints? returns()? Link to the new Types class. How to declare a valid case class in Scala. Look into the log if a type is not a POJO type (e.g. when using org.json4s) ResultTypeQueryable documentation Case classes and Tuples do not support null! Subtypes of POJOs are handled during runtime or via cache and registerType() Explain all methods in ExecutionConfig. Compatibility guarantees. POJOs must have a void setter. Why are we so strict? Update docs in api_concepts about types (Avro is not used for POJOs)!" {code} Serialization-related things: {code} "Serialization overview. Big picture (what is serialized, how, why, where, when?). When/why should I register a type or a subtype -- what does that do? Link to ""Streaming/Working with State/Custom Serialization for Managed State""." {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
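One of the checklist items above ("Encourage TypeHints? returns()?") concerns capturing generic types despite erasure. The mechanism behind a type hint can be sketched with the classic type-token pattern, where an anonymous subclass pins the type argument — the names below are illustrative, not Flink's actual implementation:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Minimal sketch of the type-token idea behind Flink's TypeHint: an
// anonymous subclass records the generic type argument, which a plain
// Class object cannot express because of type erasure.
abstract class TypeTokenSketch<T> {
    private final Type type;

    protected TypeTokenSketch() {
        // The anonymous subclass's generic superclass carries the argument.
        ParameterizedType superType =
            (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superType.getActualTypeArguments()[0];
    }

    public Type getType() { return type; }
}

public class TypeHintDemo {
    static Type capture() {
        // The trailing {} is essential: it creates the anonymous subclass.
        return new TypeTokenSketch<List<String>>() {}.getType();
    }

    public static void main(String[] args) {
        System.out.println(capture()); // java.util.List<java.lang.String>
    }
}
```

Without the anonymous subclass, `List<String>` would collapse to raw `List` at runtime, which is why `returns(new TypeHint<...>() {})`-style APIs exist.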
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296452#comment-16296452 ] ASF GitHub Bot commented on FLINK-7468: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157691096 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -52,6 +54,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- Using `ArrayDeque#size()` for `getBuffersInBacklog()` may not be feasible, because we do not know how many events are in the `ArrayDeque`, and events should not be counted in the backlog length. For the new API, we may need to modify `ResultSubpartitionView#getNextBuffer` to return a `BufferAndBacklog` wrapper structure instead of `Buffer`; do we also need to extend `BufferAndAvailability` to carry the backlog as well? This way `PipelinedSubpartition` can benefit by reducing `volatile` accesses, but for `SpillableSubpartition` the `volatile` may still be needed, because `getNextBuffer` and `decreaseBacklog` happen in different places for `SpillableSubpartitionView`/`SpilledSubpartitionView`. > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. 
> The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. > The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157691096 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java --- @@ -52,6 +54,10 @@ /** Flag indicating whether the subpartition has been released. */ private volatile boolean isReleased; + /** The number of non-event buffers currently in this subpartition */ + @GuardedBy("buffers") + private volatile int buffersInBacklog; --- End diff -- Using `ArrayDeque#size()` for `getBuffersInBacklog()` may not be feasible, because we do not know how many events are in the `ArrayDeque`, and events should not be counted in the backlog length. For the new API, we may need to modify `ResultSubpartitionView#getNextBuffer` to return a `BufferAndBacklog` wrapper structure instead of `Buffer`; do we also need to extend `BufferAndAvailability` to carry the backlog as well? This way `PipelinedSubpartition` can benefit by reducing `volatile` accesses, but for `SpillableSubpartition` the `volatile` may still be needed, because `getNextBuffer` and `decreaseBacklog` happen in different places for `SpillableSubpartitionView`/`SpilledSubpartitionView`. ---
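The counting problem discussed in this thread — the deque holds both data buffers and events, so `deque.size()` overcounts the backlog — can be illustrated with a self-contained sketch. A separate counter is updated under the deque's lock and declared `volatile` so other threads can read it without locking; the names are stand-ins, not Flink's actual classes:

```java
import java.util.ArrayDeque;

class BacklogSketch {
    // Stand-in for a Buffer that may be either data or an event.
    static final class Entry {
        final boolean isEvent;
        Entry(boolean isEvent) { this.isEvent = isEvent; }
    }

    private final ArrayDeque<Entry> buffers = new ArrayDeque<>();
    // volatile so a reader thread sees a recent value without taking the lock
    private volatile int buffersInBacklog;

    void add(Entry e) {
        synchronized (buffers) {
            buffers.add(e);
            if (!e.isEvent) {
                buffersInBacklog++;  // only non-event buffers count
            }
        }
    }

    Entry poll() {
        synchronized (buffers) {
            Entry e = buffers.poll();
            if (e != null && !e.isEvent) {
                buffersInBacklog--;
            }
            return e;
        }
    }

    int getBuffersInBacklog() { return buffersInBacklog; }

    int dequeSize() {
        synchronized (buffers) { return buffers.size(); }
    }
}
```

After adding one data buffer and one event, `dequeSize()` is 2 while `getBuffersInBacklog()` is 1 — which is why `size()` alone cannot serve as the backlog.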
[jira] [Updated] (FLINK-7736) Fix some of the alerts raised by lgtm.com
[ https://issues.apache.org/jira/browse/FLINK-7736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Malcolm Taylor updated FLINK-7736: -- Issue Type: Bug (was: Improvement) > Fix some of the alerts raised by lgtm.com > - > > Key: FLINK-7736 > URL: https://issues.apache.org/jira/browse/FLINK-7736 > Project: Flink > Issue Type: Bug >Reporter: Malcolm Taylor >Assignee: Malcolm Taylor > > lgtm.com has identified a number of issues giving scope for improvement in > the code: [https://lgtm.com/projects/g/apache/flink/alerts/?mode=list] > This issue is to address some of the simpler ones. Some of these are quite > clear bugs such as off-by-one errors. Others are areas where the code might > be made clearer, such as use of a variable name which shadows another > variable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-7452. - Resolution: Fixed Fix Version/s: 1.5.0 Fixed in 1.5: e30066dbd1ebf3c5780df89d766554042c8345a7 > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee: Timo Walther > Fix For: 1.5.0 > > > Sometimes it is very difficult to provide `TypeInformation` manually, in case > some extraction fails or is not available. {{TypeHint}}s should be the > preferred way but these methods can ensure correct types. > I propose to add all built-in Flink types to the {{Types}}. Such as: > {code} > Types.POJO(MyPojo.class) > Types.POJO(Map) > Types.GENERIC(Object.class) > Types.TUPLE(TypeInformation, ...) > Types.MAP(TypeInformation, TypeInformation) > {code} > The methods should validate that the returned type is exactly the requested > type. Especially in the case of POJOs, they should help in creating {{PojoTypeInfo}}. > Once this is in place, we can deprecate the {{TypeInfoParser}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
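The static-factory style proposed in the ticket — each helper returns a type descriptor and validates its arguments eagerly — can be sketched as follows. `TypeDescSketch` and its helpers are hypothetical illustrations of the fail-fast idea, not Flink's actual `Types` API:

```java
// Illustrative sketch: static factories that build type descriptors and
// validate the requested type at construction time rather than later.
final class TypeDescSketch {
    final String name;

    private TypeDescSketch(String name) { this.name = name; }

    static TypeDescSketch pojo(Class<?> clazz) {
        // A real implementation would verify POJO rules (e.g. a public
        // no-arg constructor, accessible fields) and fail fast here.
        if (clazz.isPrimitive()) {
            throw new IllegalArgumentException("not a POJO: " + clazz);
        }
        return new TypeDescSketch("POJO<" + clazz.getSimpleName() + ">");
    }

    static TypeDescSketch generic(Class<?> clazz) {
        return new TypeDescSketch("GENERIC<" + clazz.getSimpleName() + ">");
    }

    static TypeDescSketch map(TypeDescSketch key, TypeDescSketch value) {
        return new TypeDescSketch("MAP<" + key.name + ", " + value.name + ">");
    }
}
```

The benefit over string-based parsing (the deprecated `TypeInfoParser` route) is that mistakes surface as compile errors or immediate exceptions instead of malformed type strings.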
[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration
[ https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296446#comment-16296446 ] ASF GitHub Bot commented on FLINK-7951: --- Github user djh4230 commented on the issue: https://github.com/apache/flink/pull/4926 ![image](https://user-images.githubusercontent.com/8032384/34147925-5e0fa10c-e4da-11e7-992d-b422f59a7d16.png) It does not work! It still can't read hdfs-site.xml or the HDFS namespace. ![image](https://user-images.githubusercontent.com/8032384/34148009-a355c98a-e4da-11e7-93e3-ede389db9118.png) > YarnApplicationMaster does not load HDFSConfiguration > - > > Key: FLINK-7951 > URL: https://issues.apache.org/jira/browse/FLINK-7951 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0 > > > When instantiating the {{YarnConfiguration}} we do not load the corresponding > {{HDFSConfiguration}}. As a result, the {{hdfs-site.xml}} is not read. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...
Github user djh4230 commented on the issue: https://github.com/apache/flink/pull/4926 ![image](https://user-images.githubusercontent.com/8032384/34147925-5e0fa10c-e4da-11e7-992d-b422f59a7d16.png) It does not work! It still can't read hdfs-site.xml or the HDFS namespace. ![image](https://user-images.githubusercontent.com/8032384/34148009-a355c98a-e4da-11e7-93e3-ede389db9118.png) ---
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296444#comment-16296444 ] ASF GitHub Bot commented on FLINK-7452: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4612 > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee: Timo Walther > > Sometimes it is very difficult to provide `TypeInformation` manually, in case > some extraction fails or is not available. {{TypeHint}}s should be the > preferred way but this methods can ensure correct types. > I propose to add all built-in Flink types to the {{Types}}. Such as: > {code} > Types.POJO(MyPojo.class) > Types.POJO(Map) > Types.GENERIC(Object.class) > Types.TUPLE(TypeInformation, ...) > Types.MAP(TypeInformation, TypeInformation) > {code} > The methods should validate that the returned type is exactly the requested > type. And especially in case of POJO should help creating {{PojoTypeInfo}}. > Once this is in place, we can deprecate the {{TypeInfoParser}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4612 ---
[jira] [Commented] (FLINK-8292) Remove unnecessary force cast in DataStreamSource
[ https://issues.apache.org/jira/browse/FLINK-8292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296442#comment-16296442 ] ASF GitHub Bot commented on FLINK-8292: --- GitHub user Matrix42 opened a pull request: https://github.com/apache/flink/pull/5180 [FLINK-8292] Remove unnecessary force cast in DataStreamSource ## What is the purpose of the change Remove unnecessary force cast in DataStreamSource ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change This change is already covered by existing tests, such as DataStreamTest.testParallelism() ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): ( no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): ( no ) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no ) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/Matrix42/flink DataStreamSource Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5180.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5180 commit d5c55d2a73fdbcc0dfadd05ad87203ca70d64eac Author: Matrix42 <934336...@qq.com> Date: 2017-12-19T08:20:39Z [FLINK-8292] Remove unnecessary force cast in DataStreamSource > Remove unnecessary force cast in DataStreamSource > - > > Key: FLINK-8292 > URL: https://issues.apache.org/jira/browse/FLINK-8292 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Matrix42 >Priority: Trivial > Fix For: 1.5.0, 1.4.1 > > > In DataStreamSource there is a cast that can be replaced by returning `this` -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5180: [FLINK-8292] Remove unnecessary force cast in Data...
GitHub user Matrix42 opened a pull request: https://github.com/apache/flink/pull/5180 [FLINK-8292] Remove unnecessary force cast in DataStreamSource ## What is the purpose of the change Remove unnecessary force cast in DataStreamSource ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change This change is already covered by existing tests, such as DataStreamTest.testParallelism() ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): ( no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: ( no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): ( no ) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no ) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/Matrix42/flink DataStreamSource Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5180.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5180 commit d5c55d2a73fdbcc0dfadd05ad87203ca70d64eac Author: Matrix42 <934336...@qq.com> Date: 2017-12-19T08:20:39Z [FLINK-8292] Remove unnecessary force cast in DataStreamSource ---
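The cast this PR removes is an instance of the general "return `this` instead of casting" pattern in fluent APIs: with a generic self-type, a setter's declared return type already matches the subtype, so no force cast is needed at the call site. A self-contained sketch of that pattern, using hypothetical class names rather than Flink's actual `DataStreamSource` hierarchy:

```java
// Generic self-type: S is the concrete subclass, so fluent setters can
// return the subtype without callers casting.
class FluentBase<S extends FluentBase<S>> {
    private int parallelism = 1;

    @SuppressWarnings("unchecked")
    public S setParallelism(int parallelism) {
        this.parallelism = parallelism;
        // the single unchecked cast is confined to the base class
        return (S) this;
    }

    public int getParallelism() { return parallelism; }
}

class SourceSketch extends FluentBase<SourceSketch> {
    public SourceSketch name(String n) { return this; } // no cast needed
}

public class ReturnThisDemo {
    public static void main(String[] args) {
        // setParallelism returns SourceSketch, so chaining works directly
        SourceSketch s = new SourceSketch().setParallelism(4).name("src");
        System.out.println(s.getParallelism()); // prints 4
    }
}
```

Where the method already lives in the subclass itself (as in this PR), even the confined cast disappears: the method simply returns `this`.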
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296435#comment-16296435 ] Haohui Mai commented on FLINK-8240: --- It seems that this is a great use case for layered table sources / converters, so I'm not fully sure that all tables should be built using {{TableFactory}} yet. Popping up one level, I have a relevant question -- assuming that we need to implement the {{CREATE EXTERNAL TABLE}} statement, how would the statement look? Here is an example of Hive's {{CREATE EXTERNAL TABLE}} statement: {code} CREATE EXTERNAL TABLE weatherext ( wban INT, date STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’ LOCATION ‘ /hive/data/weatherext’; {code} It seems that combinations of {{ROW FORMAT}} and {{LOCATION}} are effectively the same as what you proposed -- but it does not seem to force all table sources to be aware of the composition of connector / converter (i.e., {{TableFactory}}), at least at the API level. Thoughts? > Create unified interfaces to configure and instantiate TableSources > -- > > Key: FLINK-8240 > URL: https://issues.apache.org/jira/browse/FLINK-8240 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther > > At the moment every table source has different ways for configuration and > instantiation. Some table sources are tailored to a specific encoding (e.g., > {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one > encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement > a builder or support table source converters for external catalogs. > The table sources should have a unified interface for discovery, defining > common properties, and instantiation. The {{TableSourceConverters}} provide > similar functionality but use an external catalog. We might generalize this > interface. 
> In general a table source declaration depends on the following parts: > {code} > - Source > - Type (e.g. Kafka, Custom) > - Properties (e.g. topic, connection info) > - Encoding > - Type (e.g. Avro, JSON, CSV) > - Schema (e.g. Avro class, JSON field names/types) > - Rowtime descriptor/Proctime > - Watermark strategy and Watermark properties > - Time attribute info > - Bucketization > {code} > This issue needs a design document before implementation. Any discussion is > very welcome. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
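The property-based discovery discussed in this thread — translating a declarative statement such as {{CREATE EXTERNAL TABLE}} into connector/encoding properties and looking up a matching factory — can be sketched as follows. All names are hypothetical illustrations; this is not the interface that was ultimately adopted:

```java
import java.util.HashMap;
import java.util.Map;

// A factory builds a table (here just a string for illustration) from a
// flat map of declarative properties, e.g. connector.type, connector.path.
interface TableFactorySketch {
    String create(Map<String, String> properties);
}

class FactoryRegistry {
    private final Map<String, TableFactorySketch> factories = new HashMap<>();

    void register(String connectorType, TableFactorySketch factory) {
        factories.put(connectorType, factory);
    }

    String createTable(Map<String, String> properties) {
        String type = properties.get("connector.type");
        TableFactorySketch factory = factories.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("no factory for: " + type);
        }
        return factory.create(properties);
    }
}
```

A SQL-layer parser would only need to flatten `ROW FORMAT` / `LOCATION`-style clauses into such a property map; individual table sources never see the connector/encoding composition directly, which matches the concern raised in the comment.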
[jira] [Commented] (FLINK-8291) For security, Job Manager web UI should be accessed with username/password
[ https://issues.apache.org/jira/browse/FLINK-8291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296434#comment-16296434 ] Lynch Lee commented on FLINK-8291: -- [~till.rohrmann] Do you agree? > For security, Job Manager web UI should be accessed with username/password > --- > > Key: FLINK-8291 > URL: https://issues.apache.org/jira/browse/FLINK-8291 > Project: Flink > Issue Type: Improvement > Components: Security, Webfrontend >Affects Versions: 1.3.2 >Reporter: Lynch Lee > > Nowadays, we submit jobs from the JobManager web UI without any login credentials. > For security, the Job Manager web UI should be accessed with a username/password. > Should we? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8292) Remove unnecessary force cast in DataStreamSource
Matrix42 created FLINK-8292: --- Summary: Remove unnecessary force cast in DataStreamSource Key: FLINK-8292 URL: https://issues.apache.org/jira/browse/FLINK-8292 Project: Flink Issue Type: Improvement Components: DataStream API Reporter: Matrix42 Priority: Trivial In DataStreamSource there is a cast that can be replaced by returning `this` -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8292) Remove unnecessary force cast in DataStreamSource
[ https://issues.apache.org/jira/browse/FLINK-8292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matrix42 updated FLINK-8292: Fix Version/s: 1.4.1 1.5.0 > Remove unnecessary force cast in DataStreamSource > - > > Key: FLINK-8292 > URL: https://issues.apache.org/jira/browse/FLINK-8292 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Matrix42 >Priority: Trivial > Fix For: 1.5.0, 1.4.1 > > > In DataStreamSource there is a cast that can be replaced by returning `this` -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296431#comment-16296431 ] ASF GitHub Bot commented on FLINK-7468: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157686388 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- The current `parent` in `SpilledSubpartitionView` is `ResultSubpartition`, not `SpillableSubpartition`. After replacing `ResultSubpartition` with `SpillableSubpartition`, we can make these methods package-private as you suggest. I will do that. > Implement sender backlog logic for credit-based > --- > > Key: FLINK-7468 > URL: https://issues.apache.org/jira/browse/FLINK-7468 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.5.0 > > > This is a part of work for credit-based network flow control. > Receivers should know how many buffers are available on the sender side (the > backlog). The receivers use this information to decide how to distribute > floating buffers. > The {{ResultSubpartition}} maintains the backlog which only indicates the > number of buffers in this subpartition, not including the number of events. 
> The backlog is increased for adding buffer to this subpartition, and > decreased for polling buffer from it. > The backlog is attached in {{BufferResponse}} by sender as an absolute value > after the buffer being transferred. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157686388 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java --- @@ -99,6 +82,23 @@ protected Throwable getFailureCause() { abstract public boolean isReleased(); + /** +* Gets the number of non-event buffers in this subpartition. +*/ + abstract public int getBuffersInBacklog(); + + /** +* Decreases the number of non-event buffers by one after fetching a non-event +* buffer from this subpartition. +*/ + abstract public void decreaseBuffersInBacklog(Buffer buffer); + + /** +* Increases the number of non-event buffers by one after adding a non-event +* buffer into this subpartition. +*/ + abstract public void increaseBuffersInBacklog(Buffer buffer); --- End diff -- The current `parent` in `SpilledSubpartitionView` is `ResultSubpartition`, not `SpillableSubpartition`. After replacing `ResultSubpartition` with `SpillableSubpartition`, we can make these methods package-private as you suggest. I will do that. ---