[GitHub] flink issue #4926: [FLINK-7951] Load YarnConfiguration with default Hadoop c...
Github user djh4230 commented on the issue: https://github.com/apache/flink/pull/4926 Does that mean it's not an issue? But the issue still happens in Flink 1.4. How can I fix it? ---
[jira] [Commented] (FLINK-7951) YarnApplicationMaster does not load HDFSConfiguration
[ https://issues.apache.org/jira/browse/FLINK-7951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296279#comment-16296279 ] ASF GitHub Bot commented on FLINK-7951: --- Github user djh4230 commented on the issue: https://github.com/apache/flink/pull/4926 Does that mean it's not an issue? But the issue still happens in Flink 1.4. How can I fix it? > YarnApplicationMaster does not load HDFSConfiguration > - > > Key: FLINK-7951 > URL: https://issues.apache.org/jira/browse/FLINK-7951 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Fix For: 1.4.0 > > > When instantiating the {{YarnConfiguration}} we do not load the corresponding > {{HDFSConfiguration}}. As a result, we do not read {{hdfs-site.xml}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
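The bug above boils down to resource-loading order: a Hadoop-style configuration only knows the key/value pairs of the resources that were added to it, so constructing a YARN configuration without also loading the HDFS resources silently drops every hdfs-site.xml setting. A minimal self-contained sketch of that model (the `MiniConf` class and its methods are hypothetical stand-ins, not the real `org.apache.hadoop.conf.Configuration` API):

```java
import java.util.HashMap;
import java.util.Map;

// Toy configuration mimicking Hadoop's resource-merging model.
public class MiniConf {
    private final Map<String, String> props = new HashMap<>();

    // merge one "resource" (e.g. the key/value pairs of yarn-site.xml or hdfs-site.xml)
    MiniConf addResource(Map<String, String> resource) {
        props.putAll(resource);
        return this;
    }

    String get(String key) {
        return props.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> yarnSite = Map.of("yarn.resourcemanager.address", "rm:8032");
        Map<String, String> hdfsSite = Map.of("dfs.replication", "2");

        // What the bug report describes: only the YARN resources are loaded,
        // so every hdfs-site.xml key silently falls back to its default.
        MiniConf yarnOnly = new MiniConf().addResource(yarnSite);
        System.out.println(yarnOnly.get("dfs.replication")); // null

        // The fix: also load the HDFS resources before reading keys.
        MiniConf full = new MiniConf().addResource(yarnSite).addResource(hdfsSite);
        System.out.println(full.get("dfs.replication")); // 2
    }
}
```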
[jira] [Commented] (FLINK-8260) Document API of Kafka 0.11 Producer
[ https://issues.apache.org/jira/browse/FLINK-8260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296200#comment-16296200 ] ASF GitHub Bot commented on FLINK-8260: --- GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/5179 [FLINK-8260/8287] [kafka] Bunch of improvements to Kafka producer Javadocs / document ## What is the purpose of the change This PR collects several improvements to the `FlinkKafkaProducer` Javadocs and user documentation, with a focus on: - Demonstrating proper producer construction for writing Flink timestamps to Kafka, instead of deprecated usage. - Clarifying the producer's partitioning behaviour for all variations of constructors that could be used. It also includes some miscellaneous trivial fixes that were found along the way. ## Brief change log The commit history should serve as a clear list of what has been changed. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? 
n/a You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8260/8287 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5179.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5179 commit 47bb55a80c308506e7388715eb3b54a0c7733bcf Author: Tzu-Li (Gordon) Tai Date: 2017-12-18T23:21:31Z [FLINK-8260] [kafka] Fix usage of deprecated instantiation methods in FlinkKafkaProducer docs commit 16fd116598b05f0b4ed4e4535fe9419767d69915 Author: Tzu-Li (Gordon) Tai Date: 2017-12-19T04:21:20Z [FLINK-8287] [kafka] Improve Kafka producer Javadocs / doc to clarify partitioning behaviour commit 51a64315d59c9dcab39f5ec52b7fa3d6599ca3fd Author: Tzu-Li (Gordon) Tai Date: 2017-12-19T04:29:38Z [FLINK-8260] [kafka] Reorder deprecated / regular constructors of FlinkKafkaProducer010 This commit moves the deprecated factory methods of FlinkKafkaProducer010 behind the regular constructors, for better navigation and readability of the code. commit 7219f159a7db502edf2746d1eafa835c85680931 Author: Tzu-Li (Gordon) Tai Date: 2017-12-19T04:34:17Z [hotfix] [kafka] Properly deprecate FlinkKafkaProducer010Configuration FlinkKafkaProducer010Configuration is the return type of the deprecated writeToKafkaWithTimestamp factory methods. Therefore, the class should be deprecated as well. commit c84033e763c2e59ea4a21d186c0c5f29c4f2a02d Author: Tzu-Li (Gordon) Tai Date: 2017-12-19T04:36:31Z [hotfix] [kafka] Fix stale Javadoc link in FlinkKafkaProducer09 The previous link referenced a non-existent constructor signature. 
commit e11f21a888a407f23d18c4c4450584443a7da4c8 Author: Tzu-Li (Gordon) Tai Date: 2017-12-19T04:38:32Z [hotfix] [kafka] Add serialVersionUID to FlinkKafkaProducer010 > Document API of Kafka 0.11 Producer > --- > > Key: FLINK-8260 > URL: https://issues.apache.org/jira/browse/FLINK-8260 > Project: Flink > Issue Type: Improvement > Components: Documentation >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Fix For: 1.5.0, 1.4.1 > > > The API of the Flink Kafka Producer changed for Kafka 0.11, for example there > is no {{writeToKafkaWithTimestamps}} method anymore. > This needs to be added to the [Kafka connector > documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-producer], > i.e., a new tab with a code snippet needs to be added for Kafka 0.11. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5178: [hotfix] Fix typo in TestableKinesisDataFetcher
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5178 +1, thanks for the fix. Merging this ... ---
[jira] [Reopened] (FLINK-8248) RocksDB state backend Checkpointing is not working with KeyedCEP in 1.4
[ https://issues.apache.org/jira/browse/FLINK-8248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jia liu reopened FLINK-8248: It happened again. I've read the source code and found what may be a bug in *org.apache.flink.cep.nfa.SharedBufferSerializer.serialize()*: the first loop does not put the *SharedBufferEntry* targets of the *edges* into *entryIDs*, but the second loop queries *entryIDs* for them at line 942.
{code:java}
@Override
public void serialize(SharedBuffer<K, V> record, DataOutputView target) throws IOException {
    Map<K, SharedBufferPage<K, V>> pages = record.pages;
    Map<SharedBufferEntry<K, V>, Integer> entryIDs = new HashMap<>();
    int totalEdges = 0;
    int entryCounter = 0;

    // number of pages
    target.writeInt(pages.size());

    for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry : pages.entrySet()) {
        SharedBufferPage<K, V> page = pageEntry.getValue();

        // key for the current page
        keySerializer.serialize(page.getKey(), target);

        // number of page entries
        target.writeInt(page.entries.size());

        for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry : page.entries.entrySet()) {
            SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();

            // assign id to the sharedBufferEntry for the future
            // serialization of the previous relation
            entryIDs.put(sharedBuffer, entryCounter++);

            ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();

            valueSerializer.serialize(valueTimeWrapper.getValue(), target);
            target.writeLong(valueTimeWrapper.getTimestamp());
            target.writeInt(valueTimeWrapper.getCounter());

            int edges = sharedBuffer.edges.size();
            totalEdges += edges;

            target.writeInt(sharedBuffer.referenceCounter);
        }
    }

    // write the edges between the shared buffer entries
    target.writeInt(totalEdges);

    for (Map.Entry<K, SharedBufferPage<K, V>> pageEntry : pages.entrySet()) {
        SharedBufferPage<K, V> page = pageEntry.getValue();

        for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> sharedBufferEntry : page.entries.entrySet()) {
            SharedBufferEntry<K, V> sharedBuffer = sharedBufferEntry.getValue();

            Integer id = entryIDs.get(sharedBuffer);
            Preconditions.checkState(id != null, "Could not find id for entry: " + sharedBuffer +
                "pages: " + pages + "entryIDs: " + entryIDs);

            for (SharedBufferEdge<K, V> edge : sharedBuffer.edges) {
                // in order to serialize the previous relation we simply serialize the ids
                // of the source and target SharedBufferEntry
                if (edge.target != null) {
                    Integer targetId = entryIDs.get(edge.getTarget());
                    Preconditions.checkState(targetId != null, "Could not find id for entry: " +
                        edge.getTarget() + "pages: " + pages + "entryIDs: " + entryIDs);

                    target.writeInt(id);
                    target.writeInt(targetId);
                    versionSerializer.serialize(edge.version, target);
                } else {
                    target.writeInt(id);
                    target.writeInt(-1);
                    versionSerializer.serialize(edge.version, target);
                }
            }
        }
    }
}
{code}
data example : {noformat}
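Stripped of the CEP specifics, the suspected failure mode is a two-pass serializer: pass 1 assigns ids only to the entries it iterates over, while pass 2 looks up ids for edge targets that pass 1 may never have visited. A self-contained sketch of that pattern (all names hypothetical, not the real Flink classes):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPassIds {
    // pass 1: assign an id to every entry we iterate over
    static Map<String, Integer> assignIds(List<String> visitedEntries) {
        Map<String, Integer> entryIDs = new HashMap<>();
        int counter = 0;
        for (String entry : visitedEntries) {
            entryIDs.put(entry, counter++);
        }
        return entryIDs;
    }

    public static void main(String[] args) {
        // pass 1 only sees the entries stored in the pages ("a", "b") ...
        Map<String, Integer> entryIDs = assignIds(Arrays.asList("a", "b"));

        // ... but pass 2 resolves edge *targets*; if an edge points at an entry
        // that pass 1 never visited, the lookup yields null and a
        // checkState(id != null, ...) style precondition would throw.
        String edgeTarget = "c";
        Integer targetId = entryIDs.get(edgeTarget);
        System.out.println(targetId == null ? "missing id -> checkState fails" : "id=" + targetId);
    }
}
```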
[GitHub] flink issue #5171: [FLINK-8271][Kinesis connector] upgrade from deprecated c...
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5171 No, those classes have been deprecated in AWS SDK 1.11.171. ---
[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis
[ https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296107#comment-16296107 ] ASF GitHub Bot commented on FLINK-8271: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5171 No, those classes have been deprecated in AWS SDK 1.11.171. > upgrade from deprecated classes to AmazonKinesis > > > Key: FLINK-8271 > URL: https://issues.apache.org/jira/browse/FLINK-8271 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.5.0, 1.4.1 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-8270) TaskManagers do not use correct local path for shipped Keytab files in Yarn deployment modes
[ https://issues.apache.org/jira/browse/FLINK-8270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296073#comment-16296073 ] dongxiao.yang edited comment on FLINK-8270 at 12/19/17 2:56 AM: Yes, the PR should fix the problem correctly. So can I expect this bug to be fixed in the next release version 1.4.1? was (Author: dongxiao.yang): Yes, the PR should fix the problem correctly. So can I expect this bug to be fixed in the next release version 1.4.1? > TaskManagers do not use correct local path for shipped Keytab files in Yarn > deployment modes > > > Key: FLINK-8270 > URL: https://issues.apache.org/jira/browse/FLINK-8270 > Project: Flink > Issue Type: Bug > Components: Security >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > Reported on ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-4-0-keytab-is-unreadable-td17292.html > This is a "recurrence" of FLINK-5580. The TMs in Yarn deployment modes are > again not using the correct local paths for shipped Keytab files. > The cause was an accidental change: > https://github.com/apache/flink/commit/7f1c23317453859ce3b136b6e13f698d3fee34a1#diff-a81afdf5ce0872836ac6fadb603d483e. > Things to consider: > 1) The above accidental breaking change was actually targeting a minor > refactor of the "integration test scenario" code block in > {{YarnTaskManagerRunner}}. It would be best if we could remove that test case > code block from the main code. > 2) Unit test coverage is apparently not enough. As this incident shows, any > slight change can easily cause this issue to resurface. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8270) TaskManagers do not use correct local path for shipped Keytab files in Yarn deployment modes
[ https://issues.apache.org/jira/browse/FLINK-8270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16296073#comment-16296073 ] dongxiao.yang commented on FLINK-8270: -- Yes, the PR should fix the problem correctly. So can I expect this bug to be fixed in the next release version 1.4.1? > TaskManagers do not use correct local path for shipped Keytab files in Yarn > deployment modes > > > Key: FLINK-8270 > URL: https://issues.apache.org/jira/browse/FLINK-8270 > Project: Flink > Issue Type: Bug > Components: Security >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.5.0, 1.4.1 > > > Reported on ML: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-4-0-keytab-is-unreadable-td17292.html > This is a "recurrence" of FLINK-5580. The TMs in Yarn deployment modes are > again not using the correct local paths for shipped Keytab files. > The cause was an accidental change: > https://github.com/apache/flink/commit/7f1c23317453859ce3b136b6e13f698d3fee34a1#diff-a81afdf5ce0872836ac6fadb603d483e. > Things to consider: > 1) The above accidental breaking change was actually targeting a minor > refactor of the "integration test scenario" code block in > {{YarnTaskManagerRunner}}. It would be best if we could remove that test case > code block from the main code. > 2) Unit test coverage is apparently not enough. As this incident shows, any > slight change can easily cause this issue to resurface. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8290) Modify clientId to groupId in flink-connector-kafka-0.8
xymaqingxiang created FLINK-8290: Summary: Modify clientId to groupId in flink-connector-kafka-0.8 Key: FLINK-8290 URL: https://issues.apache.org/jira/browse/FLINK-8290 Project: Flink Issue Type: Improvement Reporter: xymaqingxiang Currently the client id used when consuming all topics is the constant ("flink-kafka-consumer-legacy-" + broker.id()), which makes it hard for us to find a consumer in Kafka's logs, so I recommend changing it to the group id. We can modify the SimpleConsumerThread.java file as shown below:
{code:java}
private final String clientIdFormGroup;
...
this.clientIdFormGroup = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());
...
final String clientId = clientIdFormGroup;
{code}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
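The proposal above amounts to a `Properties.getProperty` lookup with the old constant as the fallback default. A self-contained sketch (the class and method names are hypothetical; `brokerId` stands in for `broker.id()`):

```java
import java.util.Properties;

public class ClientIdFromGroup {
    // use the configured group.id as the client id, falling back to the
    // old constant only when no group.id is configured
    static String clientIdFor(Properties config, int brokerId) {
        return config.getProperty("group.id", "flink-kafka-consumer-legacy-" + brokerId);
    }

    public static void main(String[] args) {
        Properties withGroup = new Properties();
        withGroup.setProperty("group.id", "my-consumers");
        System.out.println(clientIdFor(withGroup, 3));        // my-consumers
        System.out.println(clientIdFor(new Properties(), 3)); // flink-kafka-consumer-legacy-3
    }
}
```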
[jira] [Updated] (FLINK-8288) Register the web interface url to yarn for yarn job mode
[ https://issues.apache.org/jira/browse/FLINK-8288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-8288: Affects Version/s: 1.5.0 > Register the web interface url to yarn for yarn job mode > > > Key: FLINK-8288 > URL: https://issues.apache.org/jira/browse/FLINK-8288 > Project: Flink > Issue Type: Bug > Components: Cluster Management >Affects Versions: 1.5.0 >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > For flip-6 job mode, the resource manager is created before the web monitor, > so the web interface URL is not set on the resource manager, and the resource > manager cannot register the URL with YARN. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu updated FLINK-8289: Affects Version/s: 1.5.0 > The RestServerEndpoint should return the address with real ip when > getRestAddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > Currently RestServerEndpoint.getRestAddress returns an address equal to the > value of the config rest.address; the default is 127.0.0.1:9067, but this > address cannot be accessed from another machine. The IPs for the Dispatcher > and JobMaster are usually dynamic, so users will configure rest.address to > 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067; this address is > registered with YARN or Mesos, but it also cannot be accessed from another > machine. So getRestAddress needs to return the real ip:port so that users can > access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAddress
shuai.xu created FLINK-8289: --- Summary: The RestServerEndpoint should return the address with real ip when getRestAddress Key: FLINK-8289 URL: https://issues.apache.org/jira/browse/FLINK-8289 Project: Flink Issue Type: Bug Reporter: shuai.xu Currently RestServerEndpoint.getRestAddress returns an address equal to the value of the config rest.address; the default is 127.0.0.1:9067, but this address cannot be accessed from another machine. The IPs for the Dispatcher and JobMaster are usually dynamic, so users will configure rest.address to 0.0.0.0, and getRestAddress will then return 0.0.0.0:9067; this address is registered with YARN or Mesos, but it also cannot be accessed from another machine. So getRestAddress needs to return the real ip:port so that users can access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
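The behaviour being asked for (advertise a resolvable address instead of the wildcard bind address) can be sketched in plain Java. `advertisedAddress` is a hypothetical helper for illustration, not Flink's actual RestServerEndpoint API:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class RestAddress {
    // if the configured host is a wildcard bind address, substitute the
    // machine's resolvable address before advertising it
    static String advertisedAddress(String configuredHost, int port) {
        String host = configuredHost;
        if (host == null || host.isEmpty() || "0.0.0.0".equals(host)) {
            try {
                host = InetAddress.getLocalHost().getHostAddress();
            } catch (UnknownHostException e) {
                host = "127.0.0.1"; // last-resort fallback for this sketch
            }
        }
        return host + ":" + port;
    }

    public static void main(String[] args) {
        // prints a concrete local address, e.g. 192.168.1.5:9067, never 0.0.0.0:9067
        System.out.println(advertisedAddress("0.0.0.0", 9067));
    }
}
```

Note that `InetAddress.getLocalHost()` may itself resolve to a loopback address depending on /etc/hosts, which is why production code typically enumerates network interfaces instead.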
[jira] [Closed] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable
[ https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] brucewoo closed FLINK-8281. --- Resolution: Not A Problem Fix Version/s: 1.4.0 > org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to > org.apache.flink.core.fs.WrappingProxyCloseable > - > > Key: FLINK-8281 > URL: https://issues.apache.org/jira/browse/FLINK-8281 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.0 > Environment: Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 > 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux >Reporter: brucewoo >Priority: Critical > Fix For: 1.4.0 > > > {noformat} > org.apache.flink.streaming.runtime.tasks.AsynchronousException: > java.lang.Exception: Could not materialize checkpoint 1 for operator window: > (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS > api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, > start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, > proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, > api_call_count, total_bytes, total_numbers) -> to: Row (1/1). 
> at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > ~[na:1.8.0_151] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > ~[na:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > ~[na:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ~[na:1.8.0_151] > at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151] > Caused by: java.lang.Exception: Could not materialize checkpoint 1 for > operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: > (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS > total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS > w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS > proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not open output stream for state backend > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > ~[na:1.8.0_151] > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > ~[na:1.8.0_151] > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. 
> at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Caused by: java.util.concurrent.ExecutionException: > java.io.IOException: Could not open output stream for state backend > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89) > ... 7 common frames omitted > Caused by: java.io.IOException: Could not open output stream for state > backend > at >
[jira] [Created] (FLINK-8288) Register the web interface url to yarn for yarn job mode
shuai.xu created FLINK-8288: --- Summary: Register the web interface url to yarn for yarn job mode Key: FLINK-8288 URL: https://issues.apache.org/jira/browse/FLINK-8288 Project: Flink Issue Type: Bug Components: Cluster Management Reporter: shuai.xu Assignee: shuai.xu For flip-6 job mode, the resource manager is created before the web monitor, so the web interface URL is not set on the resource manager, and the resource manager cannot register the URL with YARN. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable
[ https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] brucewoo updated FLINK-8281: http://www.apache.org/dyn/closer.lua/flink/flink-1.4.0/flink-1.4.0-bin-hadoop27-scala_2.11.tgz -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5178: [hotfix] Fix typo in TestableKinesisDataFetcher
GitHub user casidiablo opened a pull request: https://github.com/apache/flink/pull/5178 [hotfix] Fix typo in TestableKinesisDataFetcher You can merge this pull request into a Git repository by running: $ git pull https://github.com/casidiablo/flink hotfix/typo-gst Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5178.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5178 commit 0fa920614acae70efcebf3816d988cd02affcade Author: Cristian Date: 2017-12-18T23:56:03Z [hotfix] Fix typo in TestableKinesisDataFetcher ---
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295892#comment-16295892 ] Cristian commented on FLINK-8162: - I will try to test and push a PR soon. > Kinesis Connector to report millisBehindLatest metric > - > > Key: FLINK-8162 > URL: https://issues.apache.org/jira/browse/FLINK-8162 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Cristian >Priority: Minor > Labels: kinesis > Fix For: 1.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > When reading from Kinesis streams, one of the most valuable metrics is > "MillisBehindLatest" (see > https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201). > Flink should use its metrics mechanism to report this value as a gauge, > tagging it with the shard id. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
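FLINK-8162 asks to report millisBehindLatest as a gauge tagged with the shard id. The pattern can be sketched in a self-contained way; note that the `Gauge` interface below is a minimal stand-in for Flink's `org.apache.flink.metrics.Gauge`, and the class and method names are illustrative, not part of the connector:

```java
import java.util.HashMap;
import java.util.Map;

class MillisBehindLatestSketch {
    // Minimal stand-in for Flink's org.apache.flink.metrics.Gauge<T>.
    interface Gauge<T> { T getValue(); }

    // Latest millisBehindLatest value observed per shard.
    private final Map<String, Long> lagPerShard = new HashMap<>();
    private final Map<String, Gauge<Long>> gauges = new HashMap<>();

    // Register one gauge per shard, so the metric is effectively "tagged" by shard id.
    Gauge<Long> registerShardGauge(String shardId) {
        lagPerShard.put(shardId, 0L);
        // The gauge closes over the mutable map; each read returns the latest value.
        Gauge<Long> g = () -> lagPerShard.get(shardId);
        gauges.put(shardId, g);
        return g;
    }

    // Called after each GetRecords response with the lag the SDK reported.
    void onGetRecordsResult(String shardId, long millisBehindLatest) {
        lagPerShard.put(shardId, millisBehindLatest);
    }

    Gauge<Long> gaugeFor(String shardId) { return gauges.get(shardId); }
}
```

With the real API, registration would go through `RuntimeContext.getMetricGroup()` (for example with a per-shard metric group), but the flow is the same: the gauge closes over mutable state that the shard consumer updates after each fetch.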
[jira] [Created] (FLINK-8287) Flink Kafka Producer docs should clearly state what partitioner is used by default
Tzu-Li (Gordon) Tai created FLINK-8287: -- Summary: Flink Kafka Producer docs should clearly state what partitioner is used by default Key: FLINK-8287 URL: https://issues.apache.org/jira/browse/FLINK-8287 Project: Flink Issue Type: Improvement Components: Documentation, Kafka Connector Reporter: Tzu-Li (Gordon) Tai Assignee: Tzu-Li (Gordon) Tai Fix For: 1.5.0, 1.4.1 See original discussion in ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaProducerXX-td16951.html It is worth mentioning what partitioning scheme is used by the {{FlinkKafkaProducer}} by default when writing to Kafka, as it seems user are often surprised by the default {{FlinkFixedPartitioner}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
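For background on FLINK-8287: the default `FlinkFixedPartitioner` pins each parallel sink subtask to a single Kafka partition rather than spreading records by key, which is the behaviour that surprises users. A simplified sketch of that scheme (illustrative, not the actual Flink class):

```java
class FixedPartitionerSketch {
    // Each sink subtask always writes to the same partition:
    // partitions[subtaskIndex % partitions.length].
    // With fewer subtasks than partitions, some partitions never receive data.
    static int partitionFor(int subtaskIndex, int[] partitions) {
        return partitions[subtaskIndex % partitions.length];
    }
}
```

This is why a sink with parallelism 1 writes everything to one partition unless a custom partitioner (or none, to fall back to Kafka's own partitioning) is configured.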
[jira] [Commented] (FLINK-8162) Kinesis Connector to report millisBehindLatest metric
[ https://issues.apache.org/jira/browse/FLINK-8162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295843#comment-16295843 ] Furruska commented on FLINK-8162: - Can you share your fork with me? I'm interested in this but haven't been able to make it work. > Kinesis Connector to report millisBehindLatest metric > - > > Key: FLINK-8162 > URL: https://issues.apache.org/jira/browse/FLINK-8162 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Cristian >Priority: Minor > Labels: kinesis > Fix For: 1.5.0 > > Original Estimate: 24h > Remaining Estimate: 24h > > When reading from Kinesis streams, one of the most valuable metrics is > "MillisBehindLatest" (see > https://github.com/aws/aws-sdk-java/blob/25f0821f69bf94ec456f602f2b83ea2b0ca15643/aws-java-sdk-kinesis/src/main/java/com/amazonaws/services/kinesis/model/GetRecordsResult.java#L187-L201). > Flink should use its metrics mechanism to report this value as a gauge, > tagging it with the shard id. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis
[ https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295816#comment-16295816 ] ASF GitHub Bot commented on FLINK-8271: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5171 Without the SDK upgrade, the API actually isn't deprecated yet, right? It is only deprecated in a newer SDK version. If so, I would prefer to not merge this until we actually try to upgrade the AWS Java SDK version. Then, this change would be more contextually relevant. > upgrade from deprecated classes to AmazonKinesis > > > Key: FLINK-8271 > URL: https://issues.apache.org/jira/browse/FLINK-8271 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.4.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.5.0, 1.4.1 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5171: [FLINK-8271][Kinesis connector] upgrade from deprecated c...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5171 Without the SDK upgrade, the API actually isn't deprecated yet, right? It is only deprecated in a newer SDK version. If so, I would prefer to not merge this until we actually try to upgrade the AWS Java SDK version. Then, this change would be more contextually relevant. ---
[jira] [Comment Edited] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable
[ https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295806#comment-16295806 ] Stefan Richter edited comment on FLINK-8281 at 12/18/17 10:58 PM: -- Hi, I had a look at your stacktraces and something seems very strange about the stack trace but also the problem itself. In particular, this line doesn't make any sense to me if you check the reported line number: {code} at org.apache.flink.core.fs.SafetyNetCloseableRegistry.doRegister(SafetyNetCloseableRegistry.java:1) {code} If I follow the actual codepath, I also cannot see how this cast can ever fail. Are you sure your jar contains the proper dependencies for your Flink version? was (Author: srichter): Hi, I had a look at your stacktraces and something seems very strange about the stack trace but also the problem itself. In particular, this line doesn't make any sense to me if you check the reported line number: {code} at org.apache.flink.core.fs.SafetyNetCloseableRegistry.doRegister(SafetyNetCloseableRegistry.java:1) {code} If I follow the actual codepath, I also cannot see how this cast can ever fail. Are you sure your jar contains the proper dependencies for your Flink version? 
> org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to > org.apache.flink.core.fs.WrappingProxyCloseable > - > > Key: FLINK-8281 > URL: https://issues.apache.org/jira/browse/FLINK-8281 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.0 > Environment: Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 > 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux >Reporter: brucewoo >Priority: Critical > > {noformat} > org.apache.flink.streaming.runtime.tasks.AsynchronousException: > java.lang.Exception: Could not materialize checkpoint 1 for operator window: > (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS > api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, > start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, > proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, > api_call_count, total_bytes, total_numbers) -> to: Row (1/1). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > ~[na:1.8.0_151] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > ~[na:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > ~[na:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ~[na:1.8.0_151] > at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151] > Caused by: java.lang.Exception: Could not materialize checkpoint 1 for > operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: > (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS > total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS > w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS > proc_end_time, api_call_count, total_bytes, 
total_numbers) -> to: Row (1/1). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not open output stream for state backend > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > ~[na:1.8.0_151] > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > ~[na:1.8.0_151] > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] >
[jira] [Commented] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable
[ https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295806#comment-16295806 ] Stefan Richter commented on FLINK-8281: --- Hi, I had a look at your stacktraces and something seems very strange about the stack trace but also the problem itself. In particular, this line doesn't make any sense to me if you check the reported line number: {code} at org.apache.flink.core.fs.SafetyNetCloseableRegistry.doRegister(SafetyNetCloseableRegistry.java:1) {code} If I follow the actual codepath, I also cannot see how this cast can ever fail. Are you sure your jar contains the proper dependencies for your Flink version? > org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to > org.apache.flink.core.fs.WrappingProxyCloseable > - > > Key: FLINK-8281 > URL: https://issues.apache.org/jira/browse/FLINK-8281 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.0 > Environment: Linux 4.4.0-104-generic #127-Ubuntu SMP Mon Dec 11 > 12:16:42 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux >Reporter: brucewoo >Priority: Critical > > {noformat} > org.apache.flink.streaming.runtime.tasks.AsynchronousException: > java.lang.Exception: Could not materialize checkpoint 1 for operator window: > (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS > api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, > start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, > proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, > api_call_count, total_bytes, total_numbers) -> to: Row (1/1). 
> at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > ~[na:1.8.0_151] > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > ~[na:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > ~[na:1.8.0_151] > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ~[na:1.8.0_151] > at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151] > Caused by: java.lang.Exception: Could not materialize checkpoint 1 for > operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: > (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS > total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS > w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS > proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1). > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not open output stream for state backend > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > ~[na:1.8.0_151] > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > ~[na:1.8.0_151] > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Suppressed: java.lang.Exception: Could not properly cancel managed > keyed state future. 
> at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939) > ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na] > ... 5 common frames omitted > Caused by: java.util.concurrent.ExecutionException: > java.io.IOException: Could not open output stream for state backend > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at >
[jira] [Commented] (FLINK-3720) Add warm starts for models
[ https://issues.apache.org/jira/browse/FLINK-3720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295735#comment-16295735 ] ASF GitHub Bot commented on FLINK-3720: --- Github user rawkintrevo commented on the issue: https://github.com/apache/flink/pull/1865 closing this > Add warm starts for models > -- > > Key: FLINK-3720 > URL: https://issues.apache.org/jira/browse/FLINK-3720 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Trevor Grant >Assignee: Trevor Grant > > Add 'warm-start' to Iterative Solver. > - Make weight vector settable (this will allow for model saving/loading) > - Make iterator existing weight vector if available > - Keep track of what iteration we're on for additional partial fits in SGD > (and anywhere else it makes sense). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-3720) Add warm starts for models
[ https://issues.apache.org/jira/browse/FLINK-3720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295736#comment-16295736 ] ASF GitHub Bot commented on FLINK-3720: --- Github user rawkintrevo closed the pull request at: https://github.com/apache/flink/pull/1865 > Add warm starts for models > -- > > Key: FLINK-3720 > URL: https://issues.apache.org/jira/browse/FLINK-3720 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Trevor Grant >Assignee: Trevor Grant > > Add 'warm-start' to Iterative Solver. > - Make weight vector settable (this will allow for model saving/loading) > - Make iterator existing weight vector if available > - Keep track of what iteration we're on for additional partial fits in SGD > (and anywhere else it makes sense). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #1875: [FLINK-3742][ml][wip] Add Multilayer Perceptron
Github user rawkintrevo closed the pull request at: https://github.com/apache/flink/pull/1875 ---
[GitHub] flink pull request #1865: [FLINK-3720][ml][wip] Add Warm Starts for Iterativ...
Github user rawkintrevo closed the pull request at: https://github.com/apache/flink/pull/1865 ---
[jira] [Commented] (FLINK-3742) Add Multi Layer Perceptron Predictor
[ https://issues.apache.org/jira/browse/FLINK-3742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295734#comment-16295734 ] ASF GitHub Bot commented on FLINK-3742: --- Github user rawkintrevo closed the pull request at: https://github.com/apache/flink/pull/1875 > Add Multi Layer Perceptron Predictor > > > Key: FLINK-3742 > URL: https://issues.apache.org/jira/browse/FLINK-3742 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Trevor Grant >Assignee: Trevor Grant >Priority: Minor > > https://en.wikipedia.org/wiki/Multilayer_perceptron > Multilayer perceptron is a simple sort of artificial neural network. It > creates a directed graph in which the edges are parameter weights and nodes > are non-linear activation functions. It is solved via a method known as back > propagation. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-3742) Add Multi Layer Perceptron Predictor
[ https://issues.apache.org/jira/browse/FLINK-3742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295733#comment-16295733 ] ASF GitHub Bot commented on FLINK-3742: --- Github user rawkintrevo commented on the issue: https://github.com/apache/flink/pull/1875 closing this > Add Multi Layer Perceptron Predictor > > > Key: FLINK-3742 > URL: https://issues.apache.org/jira/browse/FLINK-3742 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Trevor Grant >Assignee: Trevor Grant >Priority: Minor > > https://en.wikipedia.org/wiki/Multilayer_perceptron > Multilayer perceptron is a simple sort of artificial neural network. It > creates a directed graph in which the edges are parameter weights and nodes > are non-linear activation functions. It is solved via a method known as back > propagation. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #1865: [FLINK-3720][ml][wip] Add Warm Starts for Iterative Solve...
Github user rawkintrevo commented on the issue: https://github.com/apache/flink/pull/1865 closing this ---
[GitHub] flink issue #1875: [FLINK-3742][ml][wip] Add Multilayer Perceptron
Github user rawkintrevo commented on the issue: https://github.com/apache/flink/pull/1875 closing this ---
[jira] [Commented] (FLINK-8116) Stale comments referring to Checkpointed interface
[ https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295697#comment-16295697 ] ASF GitHub Bot commented on FLINK-8116: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5121 Thanks @ankitiitb1069 @ggevay for the work and review. The changes LGTM, minus my comment. I'll address my comments while merging this ... > Stale comments referring to Checkpointed interface > -- > > Key: FLINK-8116 > URL: https://issues.apache.org/jira/browse/FLINK-8116 > Project: Flink > Issue Type: Bug > Components: DataStream API, Documentation >Reporter: Gabor Gevay >Priority: Trivial > Labels: starter > Fix For: 1.5.0 > > > Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by > the {{CheckpointedFunction}} interface. > However, in {{SourceFunction}} there are two comments still referring to the > old {{Checkpointed}} interface. (The code examples there also need to be > modified.) > Note that the problem also occurs in {{StreamExecutionEnvironment}}, and > possibly other places as well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5121: [FLINK-8116] Stale comments referring to Checkpointed int...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5121 Thanks @ankitiitb1069 @ggevay for the work and review. The changes LGTM, minus my comments; I'll address them while merging this ... ---
[jira] [Commented] (FLINK-8116) Stale comments referring to Checkpointed interface
[ https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295695#comment-16295695 ] ASF GitHub Bot commented on FLINK-8116: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5121#discussion_r157606375 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java --- @@ -61,9 +61,9 @@ * isRunning = false; * } * - * public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; } + * public void snapshotState(FunctionSnapshotContext context) { } * - * public void restoreState(Long state) { this.count = state; } + * public void initializeState(FunctionInitializationContext context) { } --- End diff -- These methods should handle checkpointing of the count state. > Stale comments referring to Checkpointed interface > -- > > Key: FLINK-8116 > URL: https://issues.apache.org/jira/browse/FLINK-8116 > Project: Flink > Issue Type: Bug > Components: DataStream API, Documentation >Reporter: Gabor Gevay >Priority: Trivial > Labels: starter > Fix For: 1.5.0 > > > Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by > the {{CheckpointedFunction}} interface. > However, in {{SourceFunction}} there are two comments still referring to the > old {{Checkpointed}} interface. (The code examples there also need to be > modified.) > Note that the problem also occurs in {{StreamExecutionEnvironment}}, and > possibly other places as well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8116) Stale comments referring to Checkpointed interface
[ https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295694#comment-16295694 ] ASF GitHub Bot commented on FLINK-8116: --- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5121#discussion_r157606329 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java --- @@ -61,9 +61,9 @@ * isRunning = false; * } * - * public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; } + * public void snapshotState(FunctionSnapshotContext context) { } --- End diff -- These methods should handle checkpointing of the `count` state. > Stale comments referring to Checkpointed interface > -- > > Key: FLINK-8116 > URL: https://issues.apache.org/jira/browse/FLINK-8116 > Project: Flink > Issue Type: Bug > Components: DataStream API, Documentation >Reporter: Gabor Gevay >Priority: Trivial > Labels: starter > Fix For: 1.5.0 > > > Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by > the {{CheckpointedFunction}} interface. > However, in {{SourceFunction}} there are two comments still referring to the > old {{Checkpointed}} interface. (The code examples there also need to be > modified.) > Note that the problem also occurs in {{StreamExecutionEnvironment}}, and > possibly other places as well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5121: [FLINK-8116] Stale comments referring to Checkpoin...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5121#discussion_r157606329 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java --- @@ -61,9 +61,9 @@ * isRunning = false; * } * - * public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; } + * public void snapshotState(FunctionSnapshotContext context) { } --- End diff -- These methods should handle checkpointing of the `count` state. ---
[GitHub] flink pull request #5121: [FLINK-8116] Stale comments referring to Checkpoin...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/5121#discussion_r157606375 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java --- @@ -61,9 +61,9 @@ * isRunning = false; * } * - * public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; } + * public void snapshotState(FunctionSnapshotContext context) { } * - * public void restoreState(Long state) { this.count = state; } + * public void initializeState(FunctionInitializationContext context) { } --- End diff -- These methods should handle checkpointing of the count state. ---
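The review comments above ask the SourceFunction Javadoc example to actually checkpoint the count in snapshotState/initializeState. A self-contained sketch of the CheckpointedFunction pattern they point at; to keep it runnable, a trivial stand-in replaces Flink's `ListState` and context types, and all names here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

class CountingSourceSketch {
    // Trivial stand-in for Flink's ListState<Long>.
    static class LongListState {
        private final List<Long> values = new ArrayList<>();
        void clear() { values.clear(); }
        void add(long v) { values.add(v); }
        List<Long> get() { return values; }
    }

    private long count = 0L;
    private final LongListState checkpointedCount = new LongListState();

    // Mirrors CheckpointedFunction#snapshotState: persist the current count.
    void snapshotState() {
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    // Mirrors CheckpointedFunction#initializeState: restore the count on recovery.
    void initializeState() {
        for (long c : checkpointedCount.get()) {
            count = c;
        }
    }

    void emitRecord() { count++; } // the source's run() loop would do this
    long getCount() { return count; }
}
```

In the real API, `snapshotState(FunctionSnapshotContext)` and `initializeState(FunctionInitializationContext)` receive the state handles from the runtime rather than holding them in a field, but the save-then-restore shape is the same.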
[jira] [Comment Edited] (FLINK-7860) Support YARN proxy user in Flink (impersonation)
[ https://issues.apache.org/jira/browse/FLINK-7860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295656#comment-16295656 ] Shuyi Chen edited comment on FLINK-7860 at 12/18/17 9:11 PM: - Hi [~eronwright], can you take a look at this as well? was (Author: suez1224): [~eronwright] can you take a look at this as well? > Support YARN proxy user in Flink (impersonation) > > > Key: FLINK-7860 > URL: https://issues.apache.org/jira/browse/FLINK-7860 > Project: Flink > Issue Type: New Feature > Components: YARN >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8286) Investigate Flink-Yarn-Kerberos integration for flip-6
Shuyi Chen created FLINK-8286: - Summary: Investigate Flink-Yarn-Kerberos integration for flip-6 Key: FLINK-8286 URL: https://issues.apache.org/jira/browse/FLINK-8286 Project: Flink Issue Type: Task Components: Security Reporter: Shuyi Chen Assignee: Shuyi Chen Priority: Blocker Fix For: 1.5.0 We've found some issues with the Flink-Yarn-Kerberos integration in the current deployment model, we will need to investigate and test it for flip-6 when it's ready. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-7860) Support YARN proxy user in Flink (impersonation)
[ https://issues.apache.org/jira/browse/FLINK-7860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16294694#comment-16294694 ] Shuyi Chen edited comment on FLINK-7860 at 12/18/17 9:01 PM: - I am proposing adding the following new options: security.kerberos.login.proxyuser.principal: the proxy user's principal security.kerberos.login.proxyuser.keytab: the proxy user's keytab path In the client code, it will use security.kerberos.login.principal and security.kerberos.login.keytab to log in and impersonate the proxy user. Before the appMaster and container launch, in the launch context, set security.kerberos.login.principal to the value of security.kerberos.login.proxyuser.principal, set security.kerberos.login.keytab to the value of security.kerberos.login.proxyuser.keytab. So in the appMaster and container, it will always use the proxy user's credential. was (Author: suez1224): I am proposing adding the following new options: security.kerberos.login.proxyuser.principal: the proxy user's principal security.kerberos.login.proxyuser.keytab: the proxy user's keytab path In the client code, it will use security.kerberos.login.principal and security.kerberos.login.keytab to log in and impersonate the proxy user. Before the appMaster and container launch, set security.kerberos.login.principal to the value of security.kerberos.login.proxyuser.principal, set security.kerberos.login.keytab to the value of security.kerberos.login.proxyuser.keytab. So in the appMaster and container, it will always use the proxy user's credential. > Support YARN proxy user in Flink (impersonation) > > > Key: FLINK-7860 > URL: https://issues.apache.org/jira/browse/FLINK-7860 > Project: Flink > Issue Type: New Feature > Components: YARN >Reporter: Shuyi Chen >Assignee: Shuyi Chen > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
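A sketch of how the proposed options could sit alongside the existing ones in flink-conf.yaml; the proxyuser keys come from the proposal above and are not part of any released Flink version, and the principals and paths are placeholders:

```yaml
# Credentials the client logs in with (existing options)
security.kerberos.login.principal: flink-client@EXAMPLE.COM
security.kerberos.login.keytab: /etc/security/keytabs/flink-client.keytab

# Proposed: the user to impersonate. Before the appMaster and containers
# launch, these values would replace the login principal/keytab in the
# launch context, so the cluster side runs as the proxy user.
security.kerberos.login.proxyuser.principal: analytics-user@EXAMPLE.COM
security.kerberos.login.proxyuser.keytab: /etc/security/keytabs/analytics-user.keytab
```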
[jira] [Comment Edited] (FLINK-8285) Iterator Data Sink doesn't mention required module
[ https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295518#comment-16295518 ] Tzu-Li (Gordon) Tai edited comment on FLINK-8285 at 12/18/17 7:26 PM: -- Ah, sorry, I somehow was thinking of the {{DataSet}} API :/ ... Yes, the {{DataStream}} API does not currently officially support a {{collect}} counterpart. It could make sense, though, to move that from {{flink-streaming-contribs}} to be supported in the {{DataStream}} API out-of-the-box. There is already some discussion going on about dismantling the {{flink-streaming-contribs}} module, so we can keep this in mind. was (Author: tzulitai): Ah, sorry, I somehow was thinking of the {{DataSet}} API :/ ... Yes, the {{DataStream}} API does not currently officially support a {{collect}} counterpart. It could make sense, though, to move that from {{flink-streaming-contribs}} to be supported in the {{DataStream}} API out-of-the-box. There is already some discussion going on about dismantling the {{flink-streaming-contribs}} module. > Iterator Data Sink doesn't mention required module > -- > > Key: FLINK-8285 > URL: https://issues.apache.org/jira/browse/FLINK-8285 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.4.0 > Environment: Linux CentOS/7 >Reporter: Julio Biason > > In the docs about > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink], > it's mentioned that one could use an iterator for retrieving the result of > the stream. 
> But there is no mention of any external packages (as it happens with some > examples in the metrics) and trying to use causes an error: > [error] object contrib is not a member of package org.apache.flink > [error] import org.apache.flink.contrib.streaming.DataStreamUtils > The line in question (as copied'n'pasted directly from the examples): > import org.apache.flink.contrib.streaming.DataStreamUtils > (PS: Source is in Scala, not Java) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module
[ https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295518#comment-16295518 ] Tzu-Li (Gordon) Tai commented on FLINK-8285: Ah, sorry, I somehow was thinking of the {{DataSet}} API :/ Yes, the {{DataStream}} API does not currently officially support a {{collect}} counterpart. It could make sense, though, to move that from {{flink-streaming-contribs}} to be supported in the {{DataStream}} API out-of-the-box. There is already some discussion going on about dismantling the {{flink-streaming-contribs}} module. > Iterator Data Sink doesn't mention required module > -- > > Key: FLINK-8285 > URL: https://issues.apache.org/jira/browse/FLINK-8285 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.4.0 > Environment: Linux CentOS/7 >Reporter: Julio Biason > > In the docs about > [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink], > it's mentioned that one could use an iterator for retrieving the result of > the stream. > But there is no mention of any external packages (as it happens with some > examples in the metrics) and trying to use causes an error: > [error] object contrib is not a member of package org.apache.flink > [error] import org.apache.flink.contrib.streaming.DataStreamUtils > The line in question (as copied'n'pasted directly from the examples): > import org.apache.flink.contrib.streaming.DataStreamUtils > (PS: Source is in Scala, not Java) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (FLINK-8285) Iterator Data Sink doesn't mention required module
[ https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295518#comment-16295518 ] Tzu-Li (Gordon) Tai edited comment on FLINK-8285 at 12/18/17 7:24 PM:
Ah, sorry, I somehow was thinking of the {{DataSet}} API :/ ... Yes, the {{DataStream}} API does not currently officially support a {{collect}} counterpart. It could make sense, though, to move that from {{flink-streaming-contrib}} to be supported in the {{DataStream}} API out of the box. There is already some discussion about dismantling the {{flink-streaming-contrib}} module.
[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working
[ https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295515#comment-16295515 ] ASF GitHub Bot commented on FLINK-8275: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5172
This PR probably fixes the problem, but it would be good to address the deeper problem that the code is confusing. At the very least we could add some commentary to the code. The specific problems, in my view, are:
1. A filename is transmitted from client -> AM -> TM in the env variable `_KEYTAB_PATH`, but the value doesn't appear to be used. In effect it is a flag asserting that a keytab named `krb5.keytab` is available. Alternatives:
   a. Use `krb5.keytab` as the value.
   b. Eliminate the env check and simply look for the file; if present, use it.
2. The "integration test code" has an unclear purpose. Why does it mutate the Hadoop configuration? Is the code active in any production scenario?
Note that `YarnTaskExecutorRunner` implements this in a slightly different way and should be re-tested for 1.5.0 (since I don't think it is in use yet).
> Flink YARN deployment with Kerberos enabled not working
> --
>
> Key: FLINK-8275
> URL: https://issues.apache.org/jira/browse/FLINK-8275
> Project: Flink
> Issue Type: Bug
> Components: Security
> Affects Versions: 1.4.0
> Reporter: Shuyi Chen
> Assignee: Shuyi Chen
> Priority: Blocker
> Fix For: 1.5.0
>
> The local keytab path in YarnTaskManagerRunner is incorrectly set to the ApplicationMaster's local keytab path. This causes jobs to fail because the TaskManager can't read the keytab.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5172: [FLINK-8275] [Security] fix keytab local path in YarnTask...
Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5172
This PR probably fixes the problem, but it would be good to address the deeper problem that the code is confusing. At the very least we could add some commentary to the code. The specific problems, in my view, are:
1. A filename is transmitted from client -> AM -> TM in the env variable `_KEYTAB_PATH`, but the value doesn't appear to be used. In effect it is a flag asserting that a keytab named `krb5.keytab` is available. Alternatives:
   a. Use `krb5.keytab` as the value.
   b. Eliminate the env check and simply look for the file; if present, use it.
2. The "integration test code" has an unclear purpose. Why does it mutate the Hadoop configuration? Is the code active in any production scenario?
Note that `YarnTaskExecutorRunner` implements this in a slightly different way and should be re-tested for 1.5.0 (since I don't think it is in use yet). ---
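Alternative (b) above would amount to something like the following sketch. This is plain illustrative Java, not the actual Flink code; the class and method names are made up for the example, and only the well-known file name `krb5.keytab` is taken from the discussion.

```java
import java.io.File;

/**
 * Sketch of EronWright's alternative (b): instead of trusting the
 * _KEYTAB_PATH env variable as a flag, simply probe the container's
 * working directory for the well-known keytab file and use it if present.
 */
class KeytabProbe {
    static final String KEYTAB_FILE_NAME = "krb5.keytab";

    /** Returns the keytab file if it exists in the given directory, else null. */
    public static File locateKeytab(String workingDir) {
        File keytab = new File(workingDir, KEYTAB_FILE_NAME);
        return keytab.isFile() ? keytab : null;
    }
}
```

With this, the env-variable check disappears entirely: the presence of the file itself is the signal.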
[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module
[ https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295507#comment-16295507 ] Julio Biason commented on FLINK-8285:
[~tzulitai] Yeah, it's not relevant, because there is no {{DataStream.collect()}}:
{code}
[error] tests.scala:28: value collect is not a member of org.apache.flink.streaming.api.scala.DataStream[org.azion.com.models.metrics.metrictuple.Metric]
[error] val output = pipeline.collect()
{code}
[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module
[ https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295496#comment-16295496 ] Julio Biason commented on FLINK-8285:
[~tzulitai] Actually, it is relevant: It could completely replace the section, because DataStream.collect() is not mentioned anywhere else in the page.
[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working
[ https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295493#comment-16295493 ] ASF GitHub Bot commented on FLINK-8275: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5172
@suez1224 keep in mind that contribution PRs should initially have one commit, with the commit message appropriately set (the title of the PR would be a good commit message in your case).
[GitHub] flink issue #5172: [FLINK-8275] [Security] fix keytab local path in YarnTask...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/5172 @suez1224 keep in mind that contribution PRs should initially have one commit, with the commit message appropriately set (the title of the PR would be a good commit message in your case). ---
[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module
[ https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295484#comment-16295484 ] Tzu-Li (Gordon) Tai commented on FLINK-8285:
[~JBiason] just as a side note irrelevant to this JIRA, {{DataStream.collect()}} should also work for what you are trying to do.
[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module
[ https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295479#comment-16295479 ] Julio Biason commented on FLINK-8285:
Well, I was expecting to use DataStreamUtils.collect to retrieve the processing result in a test, starting with a local environment inside the test and using a Seq as input; since I split the pipeline creation into its own function, I could throw any DataStream at it and check only the final result, as an E2E test. But yeah, if it's using an unstable package, it shouldn't even be mentioned in the docs.
[jira] [Commented] (FLINK-8285) Iterator Data Sink doesn't mention required module
[ https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295463#comment-16295463 ] Tzu-Li (Gordon) Tai commented on FLINK-8285:
I'm not knowledgeable of the full context here, but IMO the docs should not be demonstrating utilities that are available only via the {{flink-streaming-contrib}} module. Code there is not really maintained and is considered unstable.
[jira] [Updated] (FLINK-8285) Iterator Data Sink doesn't mention required module
[ https://issues.apache.org/jira/browse/FLINK-8285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Julio Biason updated FLINK-8285:
Description:
In the docs about [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink], it's mentioned that one could use an iterator for retrieving the result of the stream.
But there is no mention of any external packages (as happens with some examples in the metrics) and trying to use it causes an error:
[error] object contrib is not a member of package org.apache.flink
[error] import org.apache.flink.contrib.streaming.DataStreamUtils
The line in question (as copied'n'pasted directly from the examples):
import org.apache.flink.contrib.streaming.DataStreamUtils
(PS: Source is in Scala, not Java)
was:
In the docs about [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink], it's mentioned that one could use an iterator for retrieving the result of the stream.
But there is no mention of any external packages (as happens with some examples in the metrics) and trying to use it causes an error:
[error] object contrib is not a member of package org.apache.flink
[error] import org.apache.flink.contrib.streaming.DataStreamUtils
The line in question (as copied'n'pasted directly from the examples):
import org.apache.flink.contrib.streaming.DataStreamUtils
[jira] [Created] (FLINK-8285) Iterator Data Sink doesn't mention required module
Julio Biason created FLINK-8285:
---
Summary: Iterator Data Sink doesn't mention required module
Key: FLINK-8285
URL: https://issues.apache.org/jira/browse/FLINK-8285
Project: Flink
Issue Type: Bug
Components: Documentation
Affects Versions: 1.4.0
Environment: Linux CentOS/7
Reporter: Julio Biason
In the docs about [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/datastream_api.html#iterator-data-sink], it's mentioned that one could use an iterator for retrieving the result of the stream.
But there is no mention of any external packages (as happens with some examples in the metrics) and trying to use it causes an error:
[error] object contrib is not a member of package org.apache.flink
[error] import org.apache.flink.contrib.streaming.DataStreamUtils
The line in question (as copied'n'pasted directly from the examples):
import org.apache.flink.contrib.streaming.DataStreamUtils
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
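The fix the report asks for is essentially one missing dependency declaration in the docs. A sketch of what that could look like in a Maven build is below; the artifact id and Scala version suffix are assumptions based on Flink 1.4's usual module naming for the flink-contrib modules, not taken from the documentation itself.

```xml
<!-- Assumed coordinates for the module that provides DataStreamUtils in Flink 1.4 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-contrib_2.11</artifactId>
  <version>1.4.0</version>
</dependency>
```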
[jira] [Commented] (FLINK-8240) Create unified interfaces to configure and instantiate TableSources
[ https://issues.apache.org/jira/browse/FLINK-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295353#comment-16295353 ] Timo Walther commented on FLINK-8240:
Hi everyone, I don't think we need a design document for this, but it would be great to hear some opinions. I introduced descriptors that allow describing connectors, encodings, and time attributes. My current API design looks like this:
{code}
tableEnv
  .from(
    FileSystem()
      .path("/path/to/csv"))
  .withEncoding(
    CSV()
      .field("myfield", Types.STRING)
      .field("myfield2", Types.INT)
      .quoteCharacter(';')
      .fieldDelimiter("#")
      .lineDelimiter("\r\n")
      .commentPrefix("%%")
      .ignoreFirstLine()
      .ignoreParseErrors())
  .withRowtime(
    Rowtime()
      .onField("rowtime")
      .withTimestampFromDataStream()
      .withWatermarkFromDataStream())
  .withProctime(
    Proctime()
      .onField("myproctime"))
  .toTableSource()
{code}
These descriptors are converted into pure key-value properties, such as:
{code}
"connector.filesystem.path" -> "/myfile"
"encoding.csv.fields.0.name" -> "field1",
"encoding.csv.fields.0.type" -> "STRING",
"encoding.csv.fields.1.name" -> "field2",
"encoding.csv.fields.1.type" -> "TIMESTAMP",
"encoding.csv.fields.2.name" -> "field3",
"encoding.csv.fields.2.type" -> "ANY(java.lang.Class)",
"encoding.csv.fields.3.name" -> "field4",
"encoding.csv.fields.3.type" -> "ROW(test INT, row VARCHAR)",
"encoding.csv.line-delimiter" -> "^"
{code}
The properties are fully expressed as strings, which also allows saving them in configuration files. That might be interesting for FLINK-7594. The question is how we want to translate the properties into actual table sources. Or, more precisely: how do we want to supply converters? Should they be part of the {{TableSource}} interface? Or should table sources be annotated with some factory class? Right now we have similar functionality for external catalogs, but it is too specific and does not consider encodings or time attributes.
Furthermore, it would be better to use Java {{ServiceLoader}}s instead of classpath scanning; this is also what Flink's file systems use. So my idea would be to have a class {{TableFactory}} that declares a connector, e.g. "kafka_0.10", and supported encodings "csv", "avro" (similar to FLINK-7643). All built-in table sources need to provide such a factory. What do you think? [~fhueske] [~jark] [~wheat9] [~ykt836]
> Create unified interfaces to configure and instantiate TableSources
> --
>
> Key: FLINK-8240
> URL: https://issues.apache.org/jira/browse/FLINK-8240
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
>
> At the moment every table source has different ways for configuration and instantiation. Some table sources are tailored to a specific encoding (e.g., {{KafkaAvroTableSource}}, {{KafkaJsonTableSource}}) or only support one encoding for reading (e.g., {{CsvTableSource}}). Each of them might implement a builder or support table source converters for external catalogs.
> The table sources should have a unified interface for discovery, defining common properties, and instantiation. The {{TableSourceConverters}} provide similar functionality but use an external catalog. We might generalize this interface.
> In general a table source declaration depends on the following parts:
> {code}
> - Source
>   - Type (e.g. Kafka, Custom)
>   - Properties (e.g. topic, connection info)
> - Encoding
>   - Type (e.g. Avro, JSON, CSV)
>   - Schema (e.g. Avro class, JSON field names/types)
> - Rowtime descriptor/Proctime
>   - Watermark strategy and Watermark properties
>   - Time attribute info
> - Bucketization
> {code}
> This issue needs a design document before implementation. Any discussion is very welcome.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
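To make the descriptor-to-properties translation above concrete, here is a minimal stand-alone sketch in plain Java. It is not Flink API; the class name and methods are invented for the example, and it only mirrors the flat string-key scheme shown in Timo's comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Illustrative sketch (not Flink code) of a descriptor that renders
 * itself into flat "encoding.csv.*" string properties, as proposed above.
 */
class CsvEncoding {
    private final Map<String, String> props = new LinkedHashMap<>();
    private int fieldIndex = 0;

    /** Registers a field under an incrementing index, like fields.0.name / fields.0.type. */
    public CsvEncoding field(String name, String type) {
        props.put("encoding.csv.fields." + fieldIndex + ".name", name);
        props.put("encoding.csv.fields." + fieldIndex + ".type", type);
        fieldIndex++;
        return this;
    }

    public CsvEncoding lineDelimiter(String delimiter) {
        props.put("encoding.csv.line-delimiter", delimiter);
        return this;
    }

    /** The pure key-value form that could also be stored in a configuration file. */
    public Map<String, String> toProperties() {
        return props;
    }
}
```

The point of the exercise: once every descriptor can flatten itself this way, a factory (discovered via {{ServiceLoader}}) only needs to understand string properties, never the descriptor objects themselves.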
[GitHub] flink issue #5176: [FLINK-8279][blob] fall back to TaskManager temp director...
Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5176 @NicoK was this PR a response to an actual issue, and did it resolve it? Do I understand correctly that this will cause the JM (blob server) to use `taskmanager.tmp.dirs`? In addition to the TM blob cache, of course. For the TM on YARN, is there really any effect? Looking at MAPREDUCE-6472, it seems the temp folder is already within the container. Can you explain the actual effect? Thanks. Note that the Mesos implementation doesn't actually configure `taskmanager.tmp.dirs` at this time (though there's some dead code in the TM). A proper treatment of tmp folders on Mesos would involve the use of volumes. ---
[jira] [Commented] (FLINK-8279) Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp directories
[ https://issues.apache.org/jira/browse/FLINK-8279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295317#comment-16295317 ] ASF GitHub Bot commented on FLINK-8279: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/5176
@NicoK was this PR a response to an actual issue, and did it resolve it? Do I understand correctly that this will cause the JM (blob server) to use `taskmanager.tmp.dirs`? In addition to the TM blob cache, of course. For the TM on YARN, is there really any effect? Looking at MAPREDUCE-6472, it seems the temp folder is already within the container. Can you explain the actual effect? Thanks. Note that the Mesos implementation doesn't actually configure `taskmanager.tmp.dirs` at this time (though there's some dead code in the TM). A proper treatment of tmp folders on Mesos would involve the use of volumes.
> Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp directories
> --
>
> Key: FLINK-8279
> URL: https://issues.apache.org/jira/browse/FLINK-8279
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> Currently, the BLOB server and cache processes (temporarily) stash incoming files into their local file system in the directory given by the {{blob.storage.directory}} configuration property. If this property is not set or empty, it falls back to {{java.io.tmpdir}}.
> Instead, in a Mesos/YARN environment, we could use the temporary directories assigned to the Flink job, which are not only the proper place to use but may also offer some more space.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
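The fallback order the issue proposes can be sketched in a few lines. This is illustrative plain Java, not Flink's actual API; the assumption that container-provided temp directories arrive as a single comma-separated value (as YARN's LOCAL_DIRS does) is mine, as are the class and method names.

```java
/**
 * Sketch of FLINK-8279's proposed resolution order for the BLOB storage dir:
 *   1. the configured blob.storage.directory, if non-empty;
 *   2. a container-provided temp dir (e.g. from YARN's LOCAL_DIRS), if given;
 *   3. java.io.tmpdir as the last resort (today's only fallback).
 */
class BlobStorageDirectory {
    public static String resolve(String configuredDir, String containerTmpDirs) {
        if (configuredDir != null && !configuredDir.isEmpty()) {
            return configuredDir;
        }
        if (containerTmpDirs != null && !containerTmpDirs.isEmpty()) {
            // YARN may hand over several comma-separated dirs; take the first one
            return containerTmpDirs.split(",")[0];
        }
        return System.getProperty("java.io.tmpdir");
    }
}
```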
[jira] [Commented] (FLINK-6065) Make TransportClient for ES5 pluggable
[ https://issues.apache.org/jira/browse/FLINK-6065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295316#comment-16295316 ] ASF GitHub Bot commented on FLINK-6065: --- Github user sschaef closed the pull request at: https://github.com/apache/flink/pull/3934 > Make TransportClient for ES5 pluggable > -- > > Key: FLINK-6065 > URL: https://issues.apache.org/jira/browse/FLINK-6065 > Project: Flink > Issue Type: Improvement > Components: ElasticSearch Connector, Streaming Connectors >Reporter: Robert Metzger > > This JIRA is based on a user request: > http://stackoverflow.com/questions/42807454/flink-xpack-elasticsearch-5-elasticsearchsecurityexception-missing-autentication?noredirect=1#comment72728053_42807454 > Currently, in the {{Elasticsearch5ApiCallBridge}} the > {{PreBuiltTransportClient}} is hardcoded. It would be nice to make this > client pluggable to allow using other clients such as the > {{PreBuiltXPackTransportClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #3934: [FLINK-6065] Add initClient method to Elasticsearc...
Github user sschaef closed the pull request at: https://github.com/apache/flink/pull/3934 ---
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295303#comment-16295303 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157544965
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
@@ -161,6 +172,29 @@ public boolean isReleased() {
 		return isReleased;
 	}
 
+	@Override
+	public int getBuffersInBacklog() {
+		return buffersInBacklog;
+	}
+
+	@Override
+	public void decreaseBuffersInBacklog(Buffer buffer) {
+		assert Thread.holdsLock(buffers);
+
+		if (buffer != null && buffer.isBuffer()) {
+			buffersInBacklog--;
+		}
+	}
+
+	@Override
+	public void increaseBuffersInBacklog(Buffer buffer) {
+		assert Thread.holdsLock(buffers);
+
+		if (buffer != null && buffer.isBuffer()) {
+			buffersInBacklog++;
+		}
+	}
--- End diff --
please check the access level (the latter two could be private)
> Implement sender backlog logic for credit-based
> --
>
> Key: FLINK-7468
> URL: https://issues.apache.org/jira/browse/FLINK-7468
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.5.0
>
> This is a part of the work for credit-based network flow control.
> Receivers should know how many buffers are available on the sender side (the backlog). The receivers use this information to decide how to distribute floating buffers.
> The {{ResultSubpartition}} maintains the backlog, which only indicates the number of buffers in this subpartition, not including the number of events. The backlog is increased when adding a buffer to this subpartition, and decreased when polling a buffer from it.
> The backlog is attached to {{BufferResponse}} by the sender as an absolute value after the buffer is transferred.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295299#comment-16295299 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157545208
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
@@ -237,6 +243,29 @@ public boolean isReleased() {
 		return isReleased;
 	}
 
+	@Override
+	public int getBuffersInBacklog() {
+		return buffersInBacklog;
+	}
+
+	@Override
+	public void decreaseBuffersInBacklog(Buffer buffer) {
+		if (buffer != null && buffer.isBuffer()) {
+			synchronized (buffers) {
+				buffersInBacklog--;
+			}
+		}
+	}
+
+	@Override
+	public void increaseBuffersInBacklog(Buffer buffer) {
+		assert Thread.holdsLock(buffers);
+
+		if (buffer != null && buffer.isBuffer()) {
+			buffersInBacklog++;
+		}
+	}
--- End diff --
please check the access level (the latter two could be private)
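To make the bookkeeping under review concrete, here is a simplified stand-alone model. It is not Flink code: the class names are invented, and it only demonstrates the two invariants the reviewers discuss, namely that the counter is touched only under the lock on the buffer queue, and that events (non-data buffers) never count towards the backlog. The poll method also returns the backlog measured under the same lock, echoing the `BufferAndAvailability` idea from the review.

```java
import java.util.ArrayDeque;

/** Simplified model of a subpartition's backlog bookkeeping (not Flink code). */
class BacklogModel {
    /** Minimal stand-in for Flink's Buffer: isBuffer()==false means "event". */
    static final class Buffer {
        private final boolean isBuffer;
        Buffer(boolean isBuffer) { this.isBuffer = isBuffer; }
        boolean isBuffer() { return isBuffer; }
    }

    private final ArrayDeque<Buffer> buffers = new ArrayDeque<>();
    private int buffersInBacklog; // guarded by the lock on `buffers`

    public void add(Buffer buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            if (buffer.isBuffer()) {      // events do not increase the backlog
                buffersInBacklog++;
            }
        }
    }

    /** Polls one element and returns the backlog measured under the same lock. */
    public int pollAndGetBacklog() {
        synchronized (buffers) {
            Buffer polled = buffers.poll();
            if (polled != null && polled.isBuffer()) {
                buffersInBacklog--;
            }
            return buffersInBacklog;
        }
    }
}
```

Because the backlog is read under the same lock that mutates it, no `volatile` field is needed, which is exactly the trade-off raised in the last review comment below.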
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295305#comment-16295305 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157541024
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
@@ -62,7 +70,14 @@ public void testAddAfterRelease() throws Exception {
 		try {
 			subpartition.release();
+			assertEquals(0, subpartition.getTotalNumberOfBuffers());
+			assertEquals(0, subpartition.getBuffersInBacklog());
+			assertEquals(0, subpartition.getTotalNumberOfBytes());
+
+			assertFalse(subpartition.add(mock(Buffer.class)));
+			assertEquals(0, subpartition.getTotalNumberOfBuffers());
+			assertEquals(0, subpartition.getBuffersInBacklog());
--- End diff --
same here - please test with a real `Buffer` instance
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295300#comment-16295300 ] ASF GitHub Bot commented on FLINK-7468: --- Github user NicoK commented on a diff in the pull request: https://github.com/apache/flink/pull/4559#discussion_r157538818
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws Exception {
 		partition.add(buffer);
 		partition.add(buffer);
+
+		assertEquals(3, partition.getTotalNumberOfBuffers());
+		assertEquals(3, partition.getBuffersInBacklog());
+		assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
+		assertFalse(buffer.isRecycled());
 		assertEquals(3, partition.releaseMemory());
+		// now the buffer may be freed, depending on the timing of the write operation
+		// -> let's do this check at the end of the test (to save some time)
+		// still same statistics
+		assertEquals(3, partition.getTotalNumberOfBuffers());
+		assertEquals(3, partition.getBuffersInBacklog());
+		assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
+
 		partition.finish();
+		// + one EndOfPartitionEvent
+		assertEquals(4, partition.getTotalNumberOfBuffers());
+		assertEquals(3, partition.getBuffersInBacklog());
+		assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
--- End diff --
good, can you also add the backlog correctness checks to the `reader.getNextBuffer()` lines below to ensure they are correct after taking buffers out?
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295307#comment-16295307 ]

ASF GitHub Bot commented on FLINK-7468:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157548033

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -52,6 +54,10 @@
     	/** Flag indicating whether the subpartition has been released. */
     	private volatile boolean isReleased;
     
    +	/** The number of non-event buffers currently in this subpartition */
    +	@GuardedBy("buffers")
    +	private volatile int buffersInBacklog;
    --- End diff --
    
    I shortly thought about relying on `buffers.size()` here to reduce complexity and code, but `ArrayDeque#size()` (for `getBuffersInBacklog()`) may show some race conditions then without synchronisation.
    
    However, if we picked up the idea again of returning the backlog size with the buffer itself (which is retrieved under the lock), i.e. similar to `BufferAndAvailability` being returned by the `SequenceNumberingViewReader`, this would work and we would not need the `volatile` here. Since you split the implementations into `PipelinedSubpartition` and `SpillableSubpartition` anyway, this would be a viable approach again. What do you think? What would you prefer?
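The alternative floated in this comment — handing the backlog count back together with the buffer, both obtained under the same lock, instead of keeping a `volatile` counter for lock-free reads — can be sketched with a minimal stand-in. The class and method names below are illustrative only, not Flink's actual `BufferAndAvailability`/`SequenceNumberingViewReader` API:

```java
import java.util.ArrayDeque;

// Minimal stand-in for the "return the backlog with the buffer" idea:
// the counter is only written and read under the 'buffers' lock, so it
// needs no volatile modifier for the consumer-side read.
class BacklogQueue {

    // Illustrative pair type, loosely modeled on a BufferAndAvailability-style
    // result object; not Flink's actual class.
    static final class BufferAndBacklog {
        final String buffer;
        final int backlog; // backlog remaining after this buffer was taken

        BufferAndBacklog(String buffer, int backlog) {
            this.buffer = buffer;
            this.backlog = backlog;
        }
    }

    private final ArrayDeque<String> buffers = new ArrayDeque<>();
    private int buffersInBacklog; // guarded by 'buffers', no volatile needed

    void add(String buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            buffersInBacklog++;
        }
    }

    // Polls the next buffer and atomically snapshots the remaining backlog.
    BufferAndBacklog pollNext() {
        synchronized (buffers) {
            String buffer = buffers.poll();
            if (buffer == null) {
                return null;
            }
            buffersInBacklog--;
            return new BufferAndBacklog(buffer, buffersInBacklog);
        }
    }
}
```

Because the backlog snapshot is taken atomically with the poll, the sender can attach an absolute backlog value that is consistent with the buffer it accompanies, matching the contract described in the issue text.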
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295301#comment-16295301 ]

ASF GitHub Bot commented on FLINK-7468:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157539147

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
    @@ -239,6 +261,10 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
     		// Spill now
     		assertEquals(2, partition.releaseMemory());
    +		// still same statistics:
    +		assertEquals(4, partition.getTotalNumberOfBuffers());
    +		assertEquals(2, partition.getBuffersInBacklog());
    +		assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
    --- End diff --
    
    same here - please add the checks to the `reader.getNextBuffer()` lines below
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295306#comment-16295306 ]

ASF GitHub Bot commented on FLINK-7468:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157540910

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
    @@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception {
     		try {
     			subpartition.finish();
    +			assertEquals(1, subpartition.getTotalNumberOfBuffers());
    +			assertEquals(0, subpartition.getBuffersInBacklog());
    +			assertEquals(4, subpartition.getTotalNumberOfBytes());
    +
     			assertFalse(subpartition.add(mock(Buffer.class)));
    +			assertEquals(1, subpartition.getTotalNumberOfBuffers());
    +			assertEquals(0, subpartition.getBuffersInBacklog());
    --- End diff --
    
    Actually, this never increases the backlog, even if the subpartition is not finished, since `buffer.isBuffer()` for a `mock(Buffer.class)` returns `false`. Can you test with a real `Buffer` instead?
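The remark about `mock(Buffer.class)` hinges on the backlog only counting entries whose `isBuffer()` returns `true`; a default Mockito mock answers `false`, so such an `add()` can never move the counter and the assertion passes vacuously. A stripped-down illustration of that counting rule, using stand-in classes rather than Flink code:

```java
// Illustrative stand-in for Flink's Buffer: events report isBuffer() == false,
// which mirrors why a default Mockito mock (also returning false) never
// touches the backlog counter in the test above.
class FakeBuffer {
    private final boolean isBuffer;

    FakeBuffer(boolean isBuffer) {
        this.isBuffer = isBuffer;
    }

    boolean isBuffer() {
        return isBuffer;
    }
}

class BacklogCounter {
    private int buffersInBacklog;

    // Only real (non-event) buffers are counted, per the issue description.
    void increaseOnAdd(FakeBuffer buffer) {
        if (buffer != null && buffer.isBuffer()) {
            buffersInBacklog++;
        }
    }

    int get() {
        return buffersInBacklog;
    }
}
```

A test built on a mock whose `isBuffer()` is `false` therefore exercises neither the increase nor the decrease path, which is why the reviewer asks for a real `Buffer` instance.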
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295304#comment-16295304 ]

ASF GitHub Bot commented on FLINK-7468:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157544794

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---
    @@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
     
     	abstract public boolean isReleased();
     
    +	/**
    +	 * Gets the number of non-event buffers in this subpartition.
    +	 */
    +	abstract public int getBuffersInBacklog();
    +
    +	/**
    +	 * Decreases the number of non-event buffers by one after fetching a non-event
    +	 * buffer from this subpartition.
    +	 */
    +	abstract public void decreaseBuffersInBacklog(Buffer buffer);
    +
    +	/**
    +	 * Increases the number of non-event buffers by one after adding a non-event
    +	 * buffer into this subpartition.
    +	 */
    +	abstract public void increaseBuffersInBacklog(Buffer buffer);
    --- End diff --
    
    I'm not quite sure the latter two methods should be in `ResultSubpartition` now since they are quite internal. `increaseBuffersInBacklog()` is only called by `PipelinedSubpartition` and `SpillableSubpartition`. `decreaseBuffersInBacklog()` is (additionally) only called by spilled/spillable subpartition views and therefore could be package-private in `SpillableSubpartition` only.
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295308#comment-16295308 ]

ASF GitHub Bot commented on FLINK-7468:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157548895

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
    @@ -77,6 +78,10 @@
     	/** Flag indicating whether the subpartition has been released. */
     	private volatile boolean isReleased;
     
    +	/** The number of non-event buffers currently in this subpartition */
    +	@GuardedBy("buffers")
    +	private volatile int buffersInBacklog;
    --- End diff --
    
    If the interface of `getNextBuffer()` was changed as suggested above, we could remove the `volatile` here as well.
[jira] [Commented] (FLINK-7468) Implement sender backlog logic for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295302#comment-16295302 ]

ASF GitHub Bot commented on FLINK-7468:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157538061

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---
    @@ -103,16 +104,35 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
     		// Add data to the queue...
     		subpartition.add(createBuffer());
    +
    +		assertEquals(1, subpartition.getTotalNumberOfBuffers());
    +		assertEquals(1, subpartition.getBuffersInBacklog());
    +		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
    +
     		// ...should have resulted in a notification
     		verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
     
     		// ...and one available result
     		assertNotNull(view.getNextBuffer());
     		assertNull(view.getNextBuffer());
    +		assertEquals(0, subpartition.getBuffersInBacklog());
     
     		// Add data to the queue...
     		subpartition.add(createBuffer());
    +
    +		assertEquals(2, subpartition.getTotalNumberOfBuffers());
    +		assertEquals(1, subpartition.getBuffersInBacklog());
    +		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
     
     		verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
    +
    +		// Add event to the queue...
    +		Buffer event = createBuffer();
    +		event.tagAsEvent();
    +		subpartition.add(event);
    +
    +		assertEquals(3, subpartition.getTotalNumberOfBuffers());
    +		assertEquals(1, subpartition.getBuffersInBacklog());
    --- End diff --
    
    good catch - the event-adding path was not tested yet
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157544965

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -161,6 +172,29 @@ public boolean isReleased() {
     		return isReleased;
     	}
     
    +	@Override
    +	public int getBuffersInBacklog() {
    +		return buffersInBacklog;
    +	}
    +
    +	@Override
    +	public void decreaseBuffersInBacklog(Buffer buffer) {
    +		assert Thread.holdsLock(buffers);
    +
    +		if (buffer != null && buffer.isBuffer()) {
    +			buffersInBacklog--;
    +		}
    +	}
    +
    +	@Override
    +	public void increaseBuffersInBacklog(Buffer buffer) {
    +		assert Thread.holdsLock(buffers);
    +
    +		if (buffer != null && buffer.isBuffer()) {
    +			buffersInBacklog++;
    +		}
    +	}
    --- End diff --
    
    please check the access-level (the latter two could be private)

---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157541024

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
    @@ -62,7 +70,14 @@ public void testAddAfterRelease() throws Exception {
     		try {
     			subpartition.release();
    +			assertEquals(0, subpartition.getTotalNumberOfBuffers());
    +			assertEquals(0, subpartition.getBuffersInBacklog());
    +			assertEquals(0, subpartition.getTotalNumberOfBytes());
    +
     			assertFalse(subpartition.add(mock(Buffer.class)));
    +			assertEquals(0, subpartition.getTotalNumberOfBuffers());
    +			assertEquals(0, subpartition.getBuffersInBacklog());
    --- End diff --
    
    same here - please test with a real `Buffer` instance

---
[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157545208

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
    @@ -237,6 +243,29 @@ public boolean isReleased() {
     		return isReleased;
     	}
     
    +	@Override
    +	public int getBuffersInBacklog() {
    +		return buffersInBacklog;
    +	}
    +
    +	@Override
    +	public void decreaseBuffersInBacklog(Buffer buffer) {
    +		if (buffer != null && buffer.isBuffer()) {
    +			synchronized (buffers) {
    +				buffersInBacklog--;
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void increaseBuffersInBacklog(Buffer buffer) {
    +		assert Thread.holdsLock(buffers);
    +
    +		if (buffer != null && buffer.isBuffer()) {
    +			buffersInBacklog++;
    +		}
    +	}
    --- End diff --
    
    please check the access-level (the latter two could be private)

---
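Note the inconsistency in the diff above: `decreaseBuffersInBacklog()` takes the lock itself while `increaseBuffersInBacklog()` merely asserts it is already held. One way to resolve both that and the access-level comment is to make both updates private helpers that assert the lock and are reached only through synchronized public methods. A minimal self-contained sketch of that discipline (simplified: it counts every queued object and skips the event/buffer distinction):

```java
import java.util.ArrayDeque;

// Sketch of a consistent locking discipline: both counter updates are
// private, assert that the caller holds the 'buffers' lock, and are only
// reachable through synchronized public methods. Names are illustrative.
class GuardedBacklog {
    private final ArrayDeque<Object> buffers = new ArrayDeque<>();
    private int buffersInBacklog; // guarded by 'buffers'

    void add(Object buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            increaseBuffersInBacklog();
        }
    }

    Object poll() {
        synchronized (buffers) {
            Object buffer = buffers.poll();
            if (buffer != null) {
                decreaseBuffersInBacklog();
            }
            return buffer;
        }
    }

    int getBuffersInBacklog() {
        synchronized (buffers) {
            return buffersInBacklog;
        }
    }

    private void increaseBuffersInBacklog() {
        // no-op unless assertions are enabled (-ea), as in the PR's own code
        assert Thread.holdsLock(buffers);
        buffersInBacklog++;
    }

    private void decreaseBuffersInBacklog() {
        assert Thread.holdsLock(buffers);
        buffersInBacklog--;
    }
}
```

With every read and write of the counter funneled through the same monitor, the `volatile` modifier discussed earlier in the review becomes unnecessary.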
[jira] [Commented] (FLINK-8279) Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp directories
[ https://issues.apache.org/jira/browse/FLINK-8279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295290#comment-16295290 ]

ASF GitHub Bot commented on FLINK-8279:
---------------------------------------

Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5176#discussion_r157548338

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
    @@ -127,21 +132,31 @@ private static BlobStoreService createFileSystemBlobStore(Configuration configur
     	}
     
     	/**
    -	 * Creates a local storage directory for a blob service under the given parent directory.
    +	 * Creates a local storage directory for a blob service under the configuration parameter given
    +	 * by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is null or empty, we will
    +	 * fall back to the TaskManager temp directories (given by
    +	 * {@link ConfigConstants#TASK_MANAGER_TMP_DIR_KEY}; which in turn falls back to
    +	 * {@link ConfigConstants#DEFAULT_TASK_MANAGER_TMP_PATH} currently set to
    +	 * java.io.tmpdir) and choose one among them at random.
     	 *
    -	 * @param basePath
    -	 * 		base path, i.e. parent directory, of the storage directory to use (if null or
    -	 * 		empty, the path in java.io.tmpdir will be used)
    +	 * @param config
    +	 * 		Flink configuration
     	 *
     	 * @return a new local storage directory
     	 *
     	 * @throws IOException
     	 * 		thrown if the local file storage cannot be created or is not usable
     	 */
    -	static File initLocalStorageDirectory(String basePath) throws IOException {
    +	static File initLocalStorageDirectory(Configuration config) throws IOException {
    +
    +		String basePath = config.getString(BlobServerOptions.STORAGE_DIRECTORY);
    +
     		File baseDir;
     		if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
    -			baseDir = new File(System.getProperty("java.io.tmpdir"));
    +			final String[] tmpDirPaths = config.getString(
    +				ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
    +				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
    --- End diff --
    
    Consider encapsulating this parsing logic into `TaskManagerServicesConfiguration` or similar.

> Use Mesos/YARN temp directories as fallback for BlobServer/Cache temp
> directories
> ---------------------------------------------------------------------
>
>                 Key: FLINK-8279
>                 URL: https://issues.apache.org/jira/browse/FLINK-8279
>             Project: Flink
>          Issue Type: Improvement
>          Components: Distributed Coordination, Network
>    Affects Versions: 1.4.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> Currently, the BLOB server and cache processes (temporarily) stash incoming
> files into their local file system in the directory given by the
> {{blob.storage.directory}} configuration property. If this property is not
> set or empty, it will fall back to {{java.io.tmpdir}}.
> Instead, in a Mesos/YARN environment, we could use the temporary directories
> they assigned to the Flink job which are not only the proper folder to use,
> but may also offer some more space.
[jira] [Updated] (FLINK-8281) org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream cannot be cast to org.apache.flink.core.fs.WrappingProxyCloseable
[ https://issues.apache.org/jira/browse/FLINK-8281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Yao updated FLINK-8281:
----------------------------
    Description:
{noformat}
org.apache.flink.streaming.runtime.tasks.AsynchronousException: java.lang.Exception: Could not materialize checkpoint 1 for operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:945) ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_151]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_151]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_151]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_151]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
Caused by: java.lang.Exception: Could not materialize checkpoint 1 for operator window: (TumblingGroupWindow('w$, 'RowTime, 6.millis)), select: (COUNT(*) AS api_call_count, SUM(bytes) AS total_bytes, SUM(numbers) AS total_numbers, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> select: (CAST(w$end) AS proc_end_time, api_call_count, total_bytes, total_numbers) -> to: Row (1/1).
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:946) ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
	... 5 common frames omitted
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[na:1.8.0_151]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[na:1.8.0_151]
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894) ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
	... 5 common frames omitted
	Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:92) ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976) ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939) ~[flink-connector-cdp_2.11-1.4.0-jar-with-dependencies.jar:na]
		... 5 common frames omitted
	Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
		at java.util.concurrent.FutureTask.report(FutureTask.java:122)
		at java.util.concurrent.FutureTask.get(FutureTask.java:192)
		at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
		at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
		... 7 common frames omitted
Caused by: java.io.IOException: Could not open output stream for state backend
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:371)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:228)
	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:212)
	at java.io.DataOutputStream.write(DataOutputStream.java:107)
	at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:236)
	at
[jira] [Assigned] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai reassigned FLINK-8283: -- Assignee: Tzu-Li (Gordon) Tai > FlinkKafkaConsumerBase failing on Travis with no output in 10min > > > Key: FLINK-8283 > URL: https://issues.apache.org/jira/browse/FLINK-8283 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Assignee: Tzu-Li (Gordon) Tai >Priority: Critical > Labels: test-stability > > Since a few days, Travis builds with the {{connectors}} profile keep failing > more often with no new output being received within 10 minutes. It seems to > start with the Travis build for > https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 > but may have been introduced earlier. The printed offsets look strange > though. > {code} > 16:33:12,508 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Setting > restore state in the FlinkKafkaConsumer: > {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, > KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773} > 16:33:12,520 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - > Consumer subtask 2 will start reading 66 partitions with offsets in restored > state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, > KafkaTopicPartition{topic='test-topic', 
partition=71}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, > 
KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=896}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=641}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=386}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=131}=-915623761775, >
[jira] [Created] (FLINK-8284) Custom metrics not being exposed for Prometheus
Julio Biason created FLINK-8284:
---
Summary: Custom metrics not being exposed for Prometheus
Key: FLINK-8284
URL: https://issues.apache.org/jira/browse/FLINK-8284
Project: Flink
Issue Type: Bug
Components: Documentation, Metrics
Affects Versions: 1.4.0
Environment: Linux/CentOS 7
Reporter: Julio Biason

Following the documentation, we changed our filter that removes events with missing fields to a {{RichFilterFunction}}, so we can capture metrics about such events:

{code}
public class MissingClientFilter extends RichFilterFunction<LineData> {

    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        this.counter = getRuntimeContext()
            .getMetricGroup()
            .addGroup("events")
            .counter("missingClient");
    }

    @Override
    public boolean filter(LineData line) {
        String client = line.get("client").toString();
        boolean missing = client.trim().equals("");
        if (!missing) {
            this.count();
        }
        return !missing;
    }

    private void count() {
        if (this.counter != null) {
            this.counter.inc();
        }
    }
}
{code}

We also added Prometheus as our reporter:

{code}
metrics.reporters: prom
metrics.reporter.prom.port: 9105
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
{code}

The problem is that accessing port 9105 displays all Flink metrics, but not ours.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
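As an editorial aside on the report above: the quoted filter increments its {{missingClient}} counter on the branch where the client *is* present ({{if (!missing)}}), which contradicts the counter's name. A dependency-free sketch of what the counting logic presumably intended — here {{LineData}} is modelled as a plain {{Map}} and Flink's {{Counter}} as a {{long}}; both are hypothetical stand-ins, not Flink's API:

```java
import java.util.Map;

// Sketch of the FLINK-8284 filter with the counting branch flipped so that
// the counter actually tracks *missing* clients, as its name suggests.
// LineData is modelled as a Map and Flink's Counter as a plain long;
// both are hypothetical stand-ins, not Flink classes.
public class MissingClientSketch {
    public static long missingClientCount = 0;

    // Returns true if the record should be kept (client field present).
    public static boolean filter(Map<String, String> line) {
        String client = line.getOrDefault("client", "");
        boolean missing = client.trim().isEmpty();
        if (missing) {
            missingClientCount++; // count only the events we drop
        }
        return !missing;
    }

    public static void main(String[] args) {
        filter(Map.of("client", " "));    // dropped and counted
        filter(Map.of("client", "acme")); // kept, not counted
        System.out.println("missingClient = " + missingClientCount); // prints "missingClient = 1"
    }
}
```

This sketch only illustrates the branch inversion; whether a registered counter is actually exported on port 9105 is a separate question for the Prometheus reporter configuration.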
[jira] [Updated] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
[ https://issues.apache.org/jira/browse/FLINK-8283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-8283: --- Labels: test-stability (was: ) > FlinkKafkaConsumerBase failing on Travis with no output in 10min > > > Key: FLINK-8283 > URL: https://issues.apache.org/jira/browse/FLINK-8283 > Project: Flink > Issue Type: Bug > Components: Kafka Connector, Tests >Affects Versions: 1.5.0 >Reporter: Nico Kruber >Priority: Critical > Labels: test-stability > > Since a few days, Travis builds with the {{connectors}} profile keep failing > more often with no new output being received within 10 minutes. It seems to > start with the Travis build for > https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 > but may have been introduced earlier. The printed offsets look strange > though. > {code} > 16:33:12,508 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Setting > restore state in the FlinkKafkaConsumer: > {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, > KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773} > 16:33:12,520 INFO > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - > Consumer subtask 2 will start reading 66 partitions with offsets in restored > state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, > 
KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=191}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, > KafkaTopicPartition{topic='test-topic', 
partition=146}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=896}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=641}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=386}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=131}=-915623761775, > KafkaTopicPartition{topic='test-topic', partition=761}=-915623761775, >
[jira] [Created] (FLINK-8283) FlinkKafkaConsumerBase failing on Travis with no output in 10min
Nico Kruber created FLINK-8283: -- Summary: FlinkKafkaConsumerBase failing on Travis with no output in 10min Key: FLINK-8283 URL: https://issues.apache.org/jira/browse/FLINK-8283 Project: Flink Issue Type: Bug Components: Kafka Connector, Tests Affects Versions: 1.5.0 Reporter: Nico Kruber Priority: Critical Since a few days, Travis builds with the {{connectors}} profile keep failing more often with no new output being received within 10 minutes. It seems to start with the Travis build for https://github.com/apache/flink/commit/840cbfbf0845b60dbf02dd2f37f696f1db21b1e9 but may have been introduced earlier. The printed offsets look strange though. {code} 16:33:12,508 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Setting restore state in the FlinkKafkaConsumer: {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773, KafkaTopicPartition{topic='test-topic', partition=1}=-915623761773} 16:33:12,520 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 2 will start reading 66 partitions with offsets in restored state: {KafkaTopicPartition{topic='test-topic', partition=851}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=716}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=461}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=206}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=971}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=836}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=581}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=326}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=71}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=956}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=701}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=446}=-915623761775, KafkaTopicPartition{topic='test-topic', 
partition=191}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=56}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=821}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=566}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=311}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=881}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=626}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=371}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=236}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=746}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=491}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=356}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=101}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=866}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=611}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=476}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=221}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=986}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=731}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=596}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=341}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=86}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=656}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=401}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=146}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=911}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=776}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=521}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=266}=-915623761775, 
KafkaTopicPartition{topic='test-topic', partition=11}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=896}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=641}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=386}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=131}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=761}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=506}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=251}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=116}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=176}=-915623761775, KafkaTopicPartition{topic='test-topic', partition=941}=-915623761775, KafkaTopicPartition{topic='test-topic',
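One observation on the "strange" offsets in the log above: committed Kafka offsets are never negative, so huge negative values such as -915623761773 and -915623761775 cannot be real partition positions and look like Flink-internal sentinel placeholders carried in the restored state. That reading is an interpretation of the log, not confirmed against Flink's source; the sketch below merely encodes the sanity check, with the constants copied from the log:

```java
// Sanity check on the offsets printed in the FLINK-8283 log: real Kafka
// offsets are non-negative, so the huge negative values in the restored
// state cannot be actual positions. The constants are copied from the log
// above; interpreting them as sentinels is an assumption, not Flink source.
public class OffsetSentinelSketch {
    public static final long SEEN_ON_FRESH_STATE = -915623761773L;    // partitions 0 and 1
    public static final long SEEN_ON_RESTORED_STATE = -915623761775L; // the 66 restored partitions

    public static boolean isRealKafkaOffset(long offset) {
        return offset >= 0;
    }

    public static void main(String[] args) {
        System.out.println(isRealKafkaOffset(SEEN_ON_FRESH_STATE));    // prints "false"
        System.out.println(isRealKafkaOffset(SEEN_ON_RESTORED_STATE)); // prints "false"
    }
}
```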
[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails
[ https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295207#comment-16295207 ] Timo Walther commented on FLINK-8282: - I don't know if this is needed. Actually, the {{transform()}} method is rather internal. [~aljoscha] what is your opinion here? > Transformation with TwoInputStreamOperator fails > > > Key: FLINK-8282 > URL: https://issues.apache.org/jira/browse/FLINK-8282 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Timo Walther > > The following program fails because of multiple reasons (see exceptions > below). The transformation with a {{TwoInputStreamOperator}} does not extend > {{AbstractStreamOperator}}. I think this is the main cause why it fails. > Either we fix the exceptions or we check for {{AbstractStreamOperator}} first. > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > DataStream ds1 = env.addSource(new > SourceFunction() { > @Override > public void run(SourceContext ctx) throws > Exception { > ctx.emitWatermark(new Watermark(100L)); > ctx.collect(12); > while (true) Thread.yield(); > } > @Override > public void cancel() { > } > }); > DataStream ds2 = env.addSource(new > SourceFunction() { > @Override > public void run(SourceContext ctx) throws > Exception { > ctx.emitWatermark(new Watermark(200L)); > ctx.collect(12); > while (true) Thread.yield(); > } > @Override > public void cancel() { > } > }); > ds1.connect(ds2.broadcast()).transform("test", Types.INT, new > TwoInputStreamOperator() { > @Override > public void processElement1(StreamRecord > element) throws Exception { > System.out.println(); > } > @Override > public void processElement2(StreamRecord > element) throws Exception { > System.out.println(); > } > @Override > public void processWatermark1(Watermark mark) throws > Exception { > 
System.out.println(); > } > @Override > public void processWatermark2(Watermark mark) throws > Exception { > System.out.println(); > } > @Override > public void processLatencyMarker1(LatencyMarker > latencyMarker) throws Exception { > } > @Override > public void processLatencyMarker2(LatencyMarker > latencyMarker) throws Exception { > } > @Override > public void setup(StreamTask containingTask, > StreamConfig config, Output output) { > } > @Override > public void open() throws Exception { > } > @Override > public void close() throws Exception { > } > @Override > public void dispose() throws Exception { > } > @Override > public OperatorSnapshotResult snapshotState(long > checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws > Exception { > return null; > } > @Override > public void initializeState(OperatorSubtaskState > stateHandles) throws Exception { > } > @Override > public void notifyOfCompletedCheckpoint(long > checkpointId) throws Exception { > } > @Override > public void setKeyContextElement1(StreamRecord > record) throws Exception { > } > @Override > public void
[jira] [Commented] (FLINK-8282) Transformation with TwoInputStreamOperator fails
[ https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295193#comment-16295193 ] Xingcan Cui commented on FLINK-8282: Hi [~twalthr], thanks for this ticket. I just wonder whether we could create an anonymous {{TwoInputStreamOperator}} directly since the Javadoc declares that to create a custom operator, we should use {{AbstractStreamOperator}} as a base class. > Transformation with TwoInputStreamOperator fails > > > Key: FLINK-8282 > URL: https://issues.apache.org/jira/browse/FLINK-8282 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Timo Walther > > The following program fails because of multiple reasons (see exceptions > below). The transformation with a {{TwoInputStreamOperator}} does not extend > {{AbstractStreamOperator}}. I think this is the main cause why it fails. > Either we fix the exceptions or we check for {{AbstractStreamOperator}} first. > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > DataStream ds1 = env.addSource(new > SourceFunction() { > @Override > public void run(SourceContext ctx) throws > Exception { > ctx.emitWatermark(new Watermark(100L)); > ctx.collect(12); > while (true) Thread.yield(); > } > @Override > public void cancel() { > } > }); > DataStream ds2 = env.addSource(new > SourceFunction() { > @Override > public void run(SourceContext ctx) throws > Exception { > ctx.emitWatermark(new Watermark(200L)); > ctx.collect(12); > while (true) Thread.yield(); > } > @Override > public void cancel() { > } > }); > ds1.connect(ds2.broadcast()).transform("test", Types.INT, new > TwoInputStreamOperator() { > @Override > public void processElement1(StreamRecord > element) throws Exception { > System.out.println(); > } > @Override > public void processElement2(StreamRecord > element) throws Exception { > 
System.out.println(); > } > @Override > public void processWatermark1(Watermark mark) throws > Exception { > System.out.println(); > } > @Override > public void processWatermark2(Watermark mark) throws > Exception { > System.out.println(); > } > @Override > public void processLatencyMarker1(LatencyMarker > latencyMarker) throws Exception { > } > @Override > public void processLatencyMarker2(LatencyMarker > latencyMarker) throws Exception { > } > @Override > public void setup(StreamTask containingTask, > StreamConfig config, Output output) { > } > @Override > public void open() throws Exception { > } > @Override > public void close() throws Exception { > } > @Override > public void dispose() throws Exception { > } > @Override > public OperatorSnapshotResult snapshotState(long > checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws > Exception { > return null; > } > @Override > public void initializeState(OperatorSubtaskState > stateHandles) throws Exception { > } > @Override > public void notifyOfCompletedCheckpoint(long > checkpointId) throws Exception { > } > @Override > public void setKeyContextElement1(StreamRecord > record) throws
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295132#comment-16295132 ] ASF GitHub Bot commented on FLINK-8234: --- Github user GJL closed the pull request at: https://github.com/apache/flink/pull/5168 > Cache JobExecutionResult from finished JobManagerRunners > > > Key: FLINK-8234 > URL: https://issues.apache.org/jira/browse/FLINK-8234 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > In order to serve the {{JobExecutionResults}} we have to cache them in the > {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should > have a configurable size and should periodically clean up stale entries in > order to avoid memory leaks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5168: [FLINK-8234][flip6] WIP
Github user GJL closed the pull request at: https://github.com/apache/flink/pull/5168 ---
[jira] [Created] (FLINK-8282) Transformation with TwoInputStreamOperator fails
Timo Walther created FLINK-8282:
---
Summary: Transformation with TwoInputStreamOperator fails
Key: FLINK-8282
URL: https://issues.apache.org/jira/browse/FLINK-8282
Project: Flink
Issue Type: Bug
Components: DataStream API
Reporter: Timo Walther

The following program fails for multiple reasons (see exceptions below). The transformation uses a {{TwoInputStreamOperator}} that does not extend {{AbstractStreamOperator}}. I think this is the main cause why it fails. Either we fix the exceptions or we check for {{AbstractStreamOperator}} first.

{code}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<Integer> ds1 = env.addSource(new SourceFunction<Integer>() {

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        ctx.emitWatermark(new Watermark(100L));
        ctx.collect(12);
        while (true) Thread.yield();
    }

    @Override
    public void cancel() {
    }
});

DataStream<Integer> ds2 = env.addSource(new SourceFunction<Integer>() {

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        ctx.emitWatermark(new Watermark(200L));
        ctx.collect(12);
        while (true) Thread.yield();
    }

    @Override
    public void cancel() {
    }
});

ds1.connect(ds2.broadcast()).transform("test", Types.INT, new TwoInputStreamOperator<Integer, Integer, Integer>() {

    @Override
    public void processElement1(StreamRecord<Integer> element) throws Exception {
        System.out.println();
    }

    @Override
    public void processElement2(StreamRecord<Integer> element) throws Exception {
        System.out.println();
    }

    @Override
    public void processWatermark1(Watermark mark) throws Exception {
        System.out.println();
    }

    @Override
    public void processWatermark2(Watermark mark) throws Exception {
        System.out.println();
    }

    @Override
    public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
    }

    @Override
    public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
    }

    @Override
    public void setup(StreamTask containingTask, StreamConfig config, Output<StreamRecord<Integer>> output) {
    }

    @Override
    public void open() throws Exception {
    }

    @Override
    public void close() throws Exception {
    }

    @Override
    public void dispose() throws Exception {
    }

    @Override
    public OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
        return null;
    }

    @Override
    public void initializeState(OperatorSubtaskState stateHandles) throws Exception {
    }

    @Override
    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    }

    @Override
    public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
    }

    @Override
    public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
    }

    @Override
    public ChainingStrategy getChainingStrategy() {
        return null;
    }

    @Override
    public void setChainingStrategy(ChainingStrategy strategy) {
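The failure mode described in this ticket — the runtime relies on plumbing that {{AbstractStreamOperator}} normally supplies, so a bare {{TwoInputStreamOperator}} breaks — can be modelled without Flink. {{Harness}}, {{AbstractOperator}}, and the early rejection check below are hypothetical illustrations of the proposed "check for AbstractStreamOperator first" option, not Flink code:

```java
// Dependency-free model of FLINK-8282: the framework (Harness) relies on
// plumbing that only the abstract base class provides, so a raw interface
// implementation should be rejected up front rather than fail later.
// All names here are hypothetical, not Flink's actual classes.
public class OperatorBaseSketch {

    public interface TwoInputOperator {
        String describe();
    }

    // The base class every operator is expected to extend.
    public static abstract class AbstractOperator implements TwoInputOperator {
        @Override
        public String describe() {
            return "configured";
        }
    }

    public static class Harness {
        public static String run(TwoInputOperator op) {
            // The proposed early check, instead of failing deep inside the runtime:
            if (!(op instanceof AbstractOperator)) {
                return "rejected: operator does not extend the abstract base";
            }
            return op.describe();
        }
    }

    public static void main(String[] args) {
        // A bare implementation, like the anonymous operator in the report:
        TwoInputOperator raw = () -> "raw";
        System.out.println(Harness.run(raw));
        System.out.println(Harness.run(new AbstractOperator() {})); // prints "configured"
    }
}
```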
[jira] [Updated] (FLINK-8282) Transformation with TwoInputStreamOperator fails
[ https://issues.apache.org/jira/browse/FLINK-8282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-8282: Affects Version/s: 1.4.0 > Transformation with TwoInputStreamOperator fails > > > Key: FLINK-8282 > URL: https://issues.apache.org/jira/browse/FLINK-8282 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Timo Walther > > The following program fails because of multiple reasons (see exceptions > below). The transformation with a {{TwoInputStreamOperator}} does not extend > {{AbstractStreamOperator}}. I think this is the main cause why it fails. > Either we fix the exceptions or we check for {{AbstractStreamOperator}} first. > {code} > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > DataStream ds1 = env.addSource(new > SourceFunction() { > @Override > public void run(SourceContext ctx) throws > Exception { > ctx.emitWatermark(new Watermark(100L)); > ctx.collect(12); > while (true) Thread.yield(); > } > @Override > public void cancel() { > } > }); > DataStream ds2 = env.addSource(new > SourceFunction() { > @Override > public void run(SourceContext ctx) throws > Exception { > ctx.emitWatermark(new Watermark(200L)); > ctx.collect(12); > while (true) Thread.yield(); > } > @Override > public void cancel() { > } > }); > ds1.connect(ds2.broadcast()).transform("test", Types.INT, new > TwoInputStreamOperator() { > @Override > public void processElement1(StreamRecord > element) throws Exception { > System.out.println(); > } > @Override > public void processElement2(StreamRecord > element) throws Exception { > System.out.println(); > } > @Override > public void processWatermark1(Watermark mark) throws > Exception { > System.out.println(); > } > @Override > public void processWatermark2(Watermark mark) throws > Exception { > System.out.println(); > } > @Override > 
public void processLatencyMarker1(LatencyMarker > latencyMarker) throws Exception { > } > @Override > public void processLatencyMarker2(LatencyMarker > latencyMarker) throws Exception { > } > @Override > public void setup(StreamTask containingTask, > StreamConfig config, Output output) { > } > @Override > public void open() throws Exception { > } > @Override > public void close() throws Exception { > } > @Override > public void dispose() throws Exception { > } > @Override > public OperatorSnapshotResult snapshotState(long > checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws > Exception { > return null; > } > @Override > public void initializeState(OperatorSubtaskState > stateHandles) throws Exception { > } > @Override > public void notifyOfCompletedCheckpoint(long > checkpointId) throws Exception { > } > @Override > public void setKeyContextElement1(StreamRecord > record) throws Exception { > } > @Override > public void setKeyContextElement2(StreamRecord > record) throws Exception { > } > @Override >
[jira] [Commented] (FLINK-7495) AbstractUdfStreamOperator#initializeState() should be called in AsyncWaitOperator#initializeState()
[ https://issues.apache.org/jira/browse/FLINK-7495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295128#comment-16295128 ] ASF GitHub Bot commented on FLINK-7495: --- Github user PedroMrChaves commented on the issue: https://github.com/apache/flink/pull/4621 This change needs to also be added to previous versions. I'm using version 1.3.2 and realised that the initializeState(..) function was not being called. > AbstractUdfStreamOperator#initializeState() should be called in > AsyncWaitOperator#initializeState() > --- > > Key: FLINK-7495 > URL: https://issues.apache.org/jira/browse/FLINK-7495 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Ted Yu >Assignee: Fang Yong >Priority: Minor > Fix For: 1.4.0 > > > {code} > recoveredStreamElements = context > .getOperatorStateStore() > .getListState(new ListStateDescriptor<>(STATE_NAME, > inStreamElementSerializer)); > {code} > Call to AbstractUdfStreamOperator#initializeState() should be added in the > beginning -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4621: [FLINK-7495] Call to AbstractUdfStreamOperator#initialize...
Github user PedroMrChaves commented on the issue: https://github.com/apache/flink/pull/4621 This change needs to also be added to previous versions. I'm using version 1.3.2 and realised that the initializeState(..) function was not being called. ---
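The bug class behind this ticket — an overriding operator that never delegates to its base class's {{initializeState()}}, so base-class (UDF) initialization is silently skipped — can be shown without any Flink dependency. All names below are hypothetical stand-ins, not Flink's API:

```java
// Minimal model of the FLINK-7495 bug: an override that forgets
// super.initializeState() silently skips base-class initialization.
// BaseOperator / BuggyAsyncOperator / FixedAsyncOperator are hypothetical.
public class InitDelegationSketch {

    public static class BaseOperator {
        public boolean udfInitialized = false;

        public void initializeState() {
            udfInitialized = true; // stands in for the base class's UDF setup
        }
    }

    public static class BuggyAsyncOperator extends BaseOperator {
        @Override
        public void initializeState() {
            // BUG: no call to super.initializeState()
        }
    }

    public static class FixedAsyncOperator extends BaseOperator {
        @Override
        public void initializeState() {
            super.initializeState(); // the fix: delegate to the base class first
        }
    }

    public static void main(String[] args) {
        BaseOperator buggy = new BuggyAsyncOperator();
        buggy.initializeState();
        System.out.println("buggy initialized: " + buggy.udfInitialized); // prints "buggy initialized: false"

        BaseOperator fixed = new FixedAsyncOperator();
        fixed.initializeState();
        System.out.println("fixed initialized: " + fixed.udfInitialized); // prints "fixed initialized: true"
    }
}
```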
[jira] [Commented] (FLINK-8278) Scala examples in Metric documentation do not compile
[ https://issues.apache.org/jira/browse/FLINK-8278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295027#comment-16295027 ]

ASF GitHub Bot commented on FLINK-8278:
---

GitHub user xccui opened a pull request:

    https://github.com/apache/flink/pull/5177

    [FLINK-8278] [doc] Fix the private member init problem for Scala examples in docs

    ## What is the purpose of the change

    This PR fixes the improper initialization problem for Scala private members in docs.

    ## Brief change log

    - Assigns the private members the proper default value "_".

    ## Verifying this change

    This change is a trivial rework / code cleanup without any test coverage.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (no)
    - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xccui/flink FLINK-8278

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5177.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #5177

commit f51787b81a8acc7aacea1ce7cafa94ff0d3ce70f
Author: Xingcan Cui
Date:   2017-12-18T13:55:30Z

    [FLINK-8278] [doc] Scala examples in Metric documentation do not compile

> Scala examples in Metric documentation do not compile
> -----------------------------------------------------
>
>                 Key: FLINK-8278
>                 URL: https://issues.apache.org/jira/browse/FLINK-8278
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation
>    Affects Versions: 1.4.0, 1.3.2, 1.5.0
>            Reporter: Fabian Hueske
>            Assignee: Xingcan Cui
>
> The Scala examples in the [Metrics documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html]
> do not compile.
> The line
> {code}
> @transient private var counter: Counter
> {code}
> needs to be extended to
> {code}
> @transient private var counter: Counter = _
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
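The compile error and the `= _` fix described in the issue above can be reproduced without any Flink dependency; the `Counter` trait below is a stand-in for Flink's metric interface, and the class and member names are illustrative:

```scala
// Stand-in for org.apache.flink.metrics.Counter (illustrative only).
trait Counter {
  def inc(): Unit
}

class MetricHolder {
  // A concrete class may not declare an uninitialized var -- this line
  // would fail to compile, which is exactly the documentation bug:
  //   @transient private var counter: Counter
  //
  // Appending "= _" initializes the var to its default value (null for
  // reference types), so the declaration compiles; the field is then
  // assigned later, e.g. in a RichFunction's open() method.
  @transient private var counter: Counter = _

  def isInitialized: Boolean = counter != null

  def setCounter(c: Counter): Unit = { counter = c }
}
```

Before `setCounter` is called, `counter` holds its default value `null`, mirroring how Flink assigns the real metric in `open()` at runtime.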
[jira] [Commented] (FLINK-8234) Cache JobExecutionResult from finished JobManagerRunners
[ https://issues.apache.org/jira/browse/FLINK-8234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16295011#comment-16295011 ]

ASF GitHub Bot commented on FLINK-8234:
---

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5168#discussion_r157489280

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---
    @@ -357,6 +362,28 @@ public void start() throws Exception {
     		return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
     	}
     
    +	@Override
    +	public CompletableFuture> getJobExecutionResult(
    +			final JobID jobId,
    +			final Time timeout) {
    +		final Either jobExecutionResult =
    +			jobExecutionResultCache.get(jobId);
    +		if (jobExecutionResult == null) {
    +			return FutureUtils.completedExceptionally(new JobExecutionResultNotFoundException(jobId));
    +		} else {
    +			return CompletableFuture.completedFuture(jobExecutionResult);
    +		}
    +	}
    +
    +	@Override
    +	public CompletableFuture isJobExecutionResultPresent(final JobID jobId, final Time timeout) {
    +		final boolean jobExecutionResultPresent = jobExecutionResultCache.contains(jobId);
    +		if (!jobManagerRunners.containsKey(jobId) && !jobExecutionResultPresent) {
    +			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
    +		}
    +		return CompletableFuture.completedFuture(jobExecutionResultPresent);
    --- End diff --

    But this would never return a future containing `false`.

> Cache JobExecutionResult from finished JobManagerRunners
> --------------------------------------------------------
>
>                 Key: FLINK-8234
>                 URL: https://issues.apache.org/jira/browse/FLINK-8234
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>              Labels: flip-6
>             Fix For: 1.5.0
>
> In order to serve the {{JobExecutionResults}} we have to cache them in the
> {{Dispatcher}} after the {{JobManagerRunner}} has finished. The cache should
> have a configurable size and should periodically clean up stale entries in
> order to avoid memory leaks.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
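The cache requirements spelled out in the issue description (configurable size, cleanup of stale entries to avoid memory leaks) can be sketched as below. All names and the structure are hypothetical, not the Dispatcher's actual implementation, and eviction here runs inline on each access where a real implementation might schedule it periodically:

```scala
import scala.collection.mutable

// Illustrative sketch of a result cache with a configurable maximum size and
// time-based expiry, so results of finished jobs that are never fetched
// cannot accumulate forever. The clock is injectable for testing.
final class JobResultCache[K, V](maxSize: Int,
                                 ttlMillis: Long,
                                 now: () => Long = () => System.currentTimeMillis()) {

  private case class Entry(value: V, insertedAt: Long)

  // LinkedHashMap iterates in insertion order, so the head is the oldest entry.
  private val entries = mutable.LinkedHashMap.empty[K, Entry]

  def put(key: K, value: V): Unit = {
    evictStale()
    entries.put(key, Entry(value, now()))
    while (entries.size > maxSize) {
      entries.remove(entries.head._1) // enforce the size bound, oldest first
    }
  }

  def get(key: K): Option[V] = {
    evictStale()
    entries.get(key).map(_.value)
  }

  def contains(key: K): Boolean = get(key).isDefined

  // Remove entries older than the TTL; collect keys first, then mutate.
  private def evictStale(): Unit = {
    val cutoff = now() - ttlMillis
    val stale = entries.collect { case (k, e) if e.insertedAt < cutoff => k }.toList
    stale.foreach(entries.remove)
  }
}
```

With this shape, a `contains`-style probe distinguishes "result expired or size-evicted" from "result still cached", which is the kind of lookup the `isJobExecutionResultPresent` method in the diff above performs against its own cache.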