[jira] [Commented] (FLINK-8217) Properly annotate APIs of flink-connector-kinesis
[ https://issues.apache.org/jira/browse/FLINK-8217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284616#comment-16284616 ] ASF GitHub Bot commented on FLINK-8217: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/5138 @greghogan this is a subtask of FLINK-8192. @tzulitai What do you think of having a FLIP to FLINK-8192? > Properly annotate APIs of flink-connector-kinesis > - > > Key: FLINK-8217 > URL: https://issues.apache.org/jira/browse/FLINK-8217 > Project: Flink > Issue Type: Sub-task > Components: Kinesis Connector >Affects Versions: 1.5.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8227) Optimize the performance of SharedBufferSerializer
[ https://issues.apache.org/jira/browse/FLINK-8227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284570#comment-16284570 ] ASF GitHub Bot commented on FLINK-8227: --- GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/5142 [FLINK-8227] Optimize the performance of SharedBufferSerializer ## What is the purpose of the change *This pull request optimizes the performance of SharedBufferSerializer* ## Verifying this change This change is a performance improvement without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink optimize_sharedbuffer Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5142.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5142 commit b586abec579ef7f251333032c9385d7e71f3799b Author: Dian Fu Date: 2017-12-09T03:51:04Z [FLINK-8227] Optimize the performance of SharedBufferSerializer > Optimize the performance of SharedBufferSerializer > -- > > Key: FLINK-8227 > URL: https://issues.apache.org/jira/browse/FLINK-8227 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > > Currently {{SharedBufferSerializer.serialize()}} will create a HashMap and > put all the {{SharedBufferEntry}} into it. Usually this is not a problem. But > we observe that in some cases the calculation of hashCode may become the > bottleneck. The performance will decrease as the number of > {{SharedBufferEdge}} increases. For looping pattern {{A*}}, if the number of > {{SharedBufferEntry}} is {{N}}, the number of {{SharedBufferEdge}} is about > {{N * N}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8227) Optimize the performance of SharedBufferSerializer
Dian Fu created FLINK-8227: -- Summary: Optimize the performance of SharedBufferSerializer Key: FLINK-8227 URL: https://issues.apache.org/jira/browse/FLINK-8227 Project: Flink Issue Type: Bug Components: CEP Reporter: Dian Fu Assignee: Dian Fu Currently {{SharedBufferSerializer.serialize()}} will create a HashMap and put all the {{SharedBufferEntry}} into it. Usually this is not a problem. But we observe that in some cases the calculation of hashCode may become the bottleneck. The performance will decrease as the number of {{SharedBufferEdge}} increases. For looping pattern {{A*}}, if the number of {{SharedBufferEntry}} is {{N}}, the number of {{SharedBufferEdge}} is about {{N * N}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
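The cost pattern described in the issue, and the identity-based alternative, can be sketched as follows. This is an illustrative sketch only, not Flink's actual classes, and whether the eventual fix uses an `IdentityHashMap` is an assumption; the point is that assigning ids via reference identity avoids calling a deep `hashCode()` that traverses every edge of every entry.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class EntryIdAssignment {
    // Stand-in for SharedBufferEntry: hashCode() walks all outgoing edges,
    // so a HashMap.put() per entry costs O(edges), i.e. O(N^2) total for A*.
    static final class Entry {
        final List<Entry> edges = new ArrayList<>();
        @Override public int hashCode() {
            int h = 17;
            for (Entry e : edges) h = 31 * h + System.identityHashCode(e);
            return h;
        }
    }

    // Identity semantics: one cheap reference hash per entry, O(N) overall,
    // and the entries' own hashCode()/equals() are never invoked.
    static Map<Entry, Integer> assignIds(List<Entry> entries) {
        Map<Entry, Integer> ids = new IdentityHashMap<>();
        for (Entry e : entries) ids.put(e, ids.size());
        return ids;
    }

    public static void main(String[] args) {
        List<Entry> entries = new ArrayList<>();
        for (int i = 0; i < 5; i++) entries.add(new Entry());
        System.out.println(assignIds(entries).size()); // 5
    }
}
```

Identity-based ids are safe here because serialization only needs a stable per-object id, not value equality.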
[jira] [Commented] (FLINK-8226) Dangling reference generated after NFA clean up timed out SharedBufferEntry
[ https://issues.apache.org/jira/browse/FLINK-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284560#comment-16284560 ] Dian Fu commented on FLINK-8226: It will cause exceptions such as the following when serializing the NFA. {code} Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(2017-12-05 09:49:05,2017-12-05 09:49:05,normal, 1512438545000, 0), 1) {code} > Dangling reference generated after NFA clean up timed out SharedBufferEntry > --- > > Key: FLINK-8226 > URL: https://issues.apache.org/jira/browse/FLINK-8226 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8226) Dangling reference generated after NFA clean up timed out SharedBufferEntry
[ https://issues.apache.org/jira/browse/FLINK-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284558#comment-16284558 ] ASF GitHub Bot commented on FLINK-8226: --- GitHub user dianfu opened a pull request: https://github.com/apache/flink/pull/5141 [FLINK-8226] [cep] Dangling reference generated after NFA clean up timed out SharedBufferEntry *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## What is the purpose of the change *This pull request fixes the issue that a dangling reference is generated after the NFA cleans up a timed out SharedBufferEntry.
An exception will be thrown when serializing the NFA.* ## Verifying this change This change added tests and can be verified as follows: - *Added tests NFATest#testTimeoutWindowPruning2* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dianfu/flink dangling_ref Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5141.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5141 commit 982bfafaabcfbfd78f4fcbdd9438eab9c8be65bb Author: Dian Fu Date: 2017-12-09T02:55:14Z [FLINK-8226] [cep] Dangling reference generated after NFA clean up timed out SharedBufferEntry > Dangling reference generated after NFA clean up timed out SharedBufferEntry > --- > > Key: FLINK-8226 > URL: https://issues.apache.org/jira/browse/FLINK-8226 > Project: Flink > Issue Type: Bug > Components: CEP >Reporter: Dian Fu >Assignee: Dian Fu > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8226) Dangling reference generated after NFA clean up timed out SharedBufferEntry
Dian Fu created FLINK-8226: -- Summary: Dangling reference generated after NFA clean up timed out SharedBufferEntry Key: FLINK-8226 URL: https://issues.apache.org/jira/browse/FLINK-8226 Project: Flink Issue Type: Bug Components: CEP Reporter: Dian Fu Assignee: Dian Fu -- This message was sent by Atlassian JIRA (v6.4.14#64029)
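The failure mode behind FLINK-8226 can be sketched in a self-contained way (strings stand in for SharedBufferEntry objects; this is not Flink's actual code): pruning drops an entry from the id registry while another entry's edge still references it, so the serializer can no longer resolve the target's id.

```java
import java.util.HashMap;
import java.util.Map;

public class DanglingRefDemo {
    // Resolves the id for an edge's target entry, as a serializer would.
    static int idFor(Map<String, Integer> ids, String entry) {
        Integer id = ids.get(entry);
        if (id == null) {
            // Mirrors the IllegalStateException seen when serializing the NFA.
            throw new IllegalStateException("Could not find id for entry: " + entry);
        }
        return id;
    }

    public static void main(String[] args) {
        Map<String, Integer> ids = new HashMap<>();
        ids.put("entryA", 0);
        ids.put("entryB", 1);

        String edgeTarget = "entryA"; // an edge of entryB still points at entryA

        // Pruning removes entryA but leaves the edge untouched -> dangling reference.
        ids.remove("entryA");

        try {
            idFor(ids, edgeTarget);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The fix, accordingly, has to ensure that removing a timed-out entry also removes (or rewrites) every edge that references it.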
[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables
[ https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284543#comment-16284543 ] ASF GitHub Bot commented on FLINK-7797: --- GitHub user xccui opened a pull request: https://github.com/apache/flink/pull/5140 [FLINK-7797] [table] Add support for windowed outer joins for streaming tables ## What is the purpose of the change This PR adds support for windowed outer joins for streaming tables. ## Brief change log - Adjusts the plan translation logic to accept stream window outer joins. - Attaches an "ever emitted" flag to each row. When a row is removed from the cache (or detected as not cached), a null padding join result will be emitted if necessary. - Adds a custom `JoinAwareCollector` to track whether there's a successfully joined result for both sides in each join loop. - Adds table/SQL translation tests, and also join integration tests. Since the runtime logic is built on the existing window inner join, no new harness tests are added. - Updates the SQL/Table API docs. ## Verifying this change This PR can be verified by the cases added in `JoinTest` and `JoinITCase`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (**yes**) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (**yes**) - If yes, how is the feature documented?
(**removes the restriction notes**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/xccui/flink FLINK-7797 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5140.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5140 commit 34d3fde8049ec407849b61901acd8258a6a1f919 Author: Xingcan Cui Date: 2017-12-07T17:28:40Z [FLINK-7797] [table] Add support for windowed outer joins for streaming tables > Add support for windowed outer joins for streaming tables > - > > Key: FLINK-7797 > URL: https://issues.apache.org/jira/browse/FLINK-7797 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui > > Currently, only windowed inner joins for streaming tables are supported. > This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER > joins. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
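The "ever emitted" idea from the change log above can be sketched like this (the types and names are illustrative, not the PR's actual code): a row remembers whether it ever produced a join result, and when it leaves the window cache without having done so, a null-padded result is emitted to satisfy LEFT/RIGHT/FULL OUTER semantics.

```java
import java.util.ArrayList;
import java.util.List;

public class OuterJoinPadding {
    static final class CachedRow {
        final String value;
        boolean everEmitted; // set to true the first time the row joins
        CachedRow(String value) { this.value = value; }
    }

    // Called when a row falls out of the join window.
    static void onExpire(CachedRow row, List<String> out) {
        if (!row.everEmitted) {
            out.add(row.value + ",null"); // null padding for the missing side
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        CachedRow joined = new CachedRow("a");
        joined.everEmitted = true;        // this row found a match earlier
        CachedRow unjoined = new CachedRow("b");
        onExpire(joined, out);            // emits nothing
        onExpire(unjoined, out);          // emits the padded result
        System.out.println(out);          // [b,null]
    }
}
```

The custom `JoinAwareCollector` mentioned in the change log presumably plays the role of setting this flag when a match is collected.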
[jira] [Commented] (FLINK-8116) Stale comments referring to Checkpointed interface
[ https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284230#comment-16284230 ] ASF GitHub Bot commented on FLINK-8116: --- Github user ankitiitb1069 commented on a diff in the pull request: https://github.com/apache/flink/pull/5121#discussion_r155877893 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java --- @@ -34,17 +34,17 @@ * The run method can run for as long as necessary. The source must, however, react to an * invocation of {@link #cancel()} by breaking out of its main loop. * - * Checkpointed Sources + * CheckpointedFunction Sources * - * Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} + * Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction} * interface must ensure that state checkpointing, updating of internal state and emission of * elements are not done concurrently. This is achieved by using the provided checkpointing lock * object to protect update of state and emission of elements in a synchronized block. * * This is the basic pattern one should follow when implementing a (checkpointed) source: * * {@code - * public class ExampleSource implements SourceFunction, Checkpointed { + * public class ExampleSource implements SourceFunction, CheckpointedFunction { --- End diff -- Please check, I have made the changes, but could not make up what to write inside of these functions > Stale comments referring to Checkpointed interface > -- > > Key: FLINK-8116 > URL: https://issues.apache.org/jira/browse/FLINK-8116 > Project: Flink > Issue Type: Bug > Components: DataStream API, Documentation >Reporter: Gabor Gevay >Priority: Trivial > Labels: starter > Fix For: 1.5.0 > > > Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by > the {{CheckpointedFunction}} interface.
> However, in {{SourceFunction}} there are two comments still referring to the > old {{Checkpointed}} interface. (The code examples there also need to be > modified.) > Note that the problem also occurs in {{StreamExecutionEnvironment}}, and > possibly other places as well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
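For reference, the checkpoint-lock pattern the javadoc example is meant to convey can be sketched in a self-contained way. Flink's `SourceFunction`/`CheckpointedFunction` types are replaced here by minimal stand-in interfaces, so the names below are illustrative only; the point is that state updates and element emission happen atomically under the checkpoint lock, so a snapshot never observes a half-updated pair.

```java
public class ExampleSourcePattern {
    // Stand-in for Flink's SourceContext: provides the lock and the collector.
    interface Ctx { Object getCheckpointLock(); void collect(long element); }

    static final class ExampleSource {
        private long count;                       // state to snapshot
        private volatile boolean isRunning = true;

        void run(Ctx ctx) {
            while (isRunning && count < 3) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(count);           // emission and state update are
                    count++;                      //   atomic w.r.t. checkpoints
                }
            }
        }

        long snapshotState() { return count; }    // called under the same lock
        void cancel() { isRunning = false; }      // breaks out of the main loop
    }

    public static void main(String[] args) {
        ExampleSource src = new ExampleSource();
        final Object lock = new Object();
        final StringBuilder emitted = new StringBuilder();
        src.run(new Ctx() {
            public Object getCheckpointLock() { return lock; }
            public void collect(long e) { emitted.append(e); }
        });
        System.out.println(emitted + " / " + src.snapshotState()); // 012 / 3
    }
}
```

In real Flink code the lock comes from `SourceContext.getCheckpointLock()` and the state would live in managed operator state rather than a plain field.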
[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283741#comment-16283741 ] ASF GitHub Bot commented on FLINK-8220: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5134 I also think being more on the cautious side is actually a good thing. Given how scared and confused many people and companies are with respect to open source and accidental infringement of IP, being cautious is good, in my opinion. There is an interesting comment from Ted Dunning on a related question, basically saying that whether or not things could be a problem, think if some cautious/picky users might perceive them as a problem: https://lists.apache.org/thread.html/fc3992c13cc30f889c820d1cfd6be61b63a5be4efa7c9101262474c9@%3Clegal-discuss.apache.org%3E > Implement set of network throughput benchmarks in Flink > --- > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in the flink project and they will > be executed in the {{flink-benchmarks}} project. > Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283734#comment-16283734 ] ASF GitHub Bot commented on FLINK-8220: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5134 @greg I think there is one more requirement by the ASF (not necessarily the Apache License itself), which is making sure that downstream consumers of Apache projects are always explicitly aware when they are consuming something (even when they build it themselves) that has friction with the ASL 2.0. My preferred solution would still be a separate repository. Executing benchmarks during tests is pretty tricky anyways, because their result depends so much on where the benchmark is executed, and what happens concurrently on the machine, is it a VM or container, etc. > Implement set of network throughput benchmarks in Flink > --- > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in the flink project and they will > be executed in the {{flink-benchmarks}} project. > Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8029) Create common WebMonitorEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283732#comment-16283732 ] ASF GitHub Bot commented on FLINK-8029: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4987#discussion_r155804733 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -757,6 +775,64 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { return CompletableFuture.completedFuture(executionGraph.getState()); } + //-- + // RestfulGateway RPC methods + //-- + + @Override + public CompletableFuture requestRestAddress(Time timeout) { + return restAddressFuture; + } + + @Override + public CompletableFuture requestJob(JobID jobId, Time timeout) { + if (Objects.equals(jobGraph.getJobID(), jobId)) { --- End diff -- When I see `Objects.equals`, I am assuming that it's possible that both arguments can be null. However, `jobGraph.getJobID()` is always non-null. > Create common WebMonitorEndpoint > > > Key: FLINK-8029 > URL: https://issues.apache.org/jira/browse/FLINK-8029 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to reuse the existing REST handlers, we should create a common > {{WebMonitorEndpoint}} which is shared by the {{Dispatcher}} and the > {{JobMaster}} component. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
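To illustrate the review note above (the values are made up): `Objects.equals(a, b)` advertises that either argument may be null, whereas calling `equals` directly on a reference that is known non-null states that invariant explicitly and still handles a null argument on the right-hand side.

```java
import java.util.Objects;

public class EqualsStyle {
    public static void main(String[] args) {
        String jobId = "job-1"; // known non-null, like jobGraph.getJobID()

        // Direct equals on a non-null receiver: no NPE even for a null argument.
        System.out.println(jobId.equals(null));             // false

        // Objects.equals signals "either side might be null" to the reader.
        System.out.println(Objects.equals(jobId, "job-1")); // true
    }
}
```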
[jira] [Commented] (FLINK-8029) Create common WebMonitorEndpoint
[ https://issues.apache.org/jira/browse/FLINK-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283718#comment-16283718 ] ASF GitHub Bot commented on FLINK-8029: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4987#discussion_r155802027 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java --- @@ -27,8 +27,6 @@ /** * Interface for a metric registry. - - LOG.debug("Started MetricQueryService under {}.", metricQueryServicePath); --- End diff -- beautiful > Create common WebMonitorEndpoint > > > Key: FLINK-8029 > URL: https://issues.apache.org/jira/browse/FLINK-8029 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > In order to reuse the existing REST handlers, we should create a common > {{WebMonitorEndpoint}} which is shared by the {{Dispatcher}} and the > {{JobMaster}} component. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7984) Bump snappy-java to 1.1.4
[ https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16283716#comment-16283716 ] ASF GitHub Bot commented on FLINK-7984: --- Github user yew1eb commented on the issue: https://github.com/apache/flink/pull/5072 @greghogan, I saw the following in the `flink-core` POM: ``` <dependency><groupId>org.xerial.snappy</groupId><artifactId>snappy-java</artifactId></dependency> ``` I think this PR just upgrades the snappy-java version from 1.1.1.3 to 1.1.4 for the `flink-core` module. I cannot find any bad effects it will cause. :) > Bump snappy-java to 1.1.4 > - > > Key: FLINK-7984 > URL: https://issues.apache.org/jira/browse/FLINK-7984 > Project: Flink > Issue Type: Improvement > Components: Build System >Affects Versions: 1.4.0 >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 > Fix For: 1.5.0 > > > Upgrade the snappy-java version to 1.1.4 (the latest, May 2017). The older > version has some issues like a memory leak > (https://github.com/xerial/snappy-java/issues/91). > Snappy Java [Release > Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8030) Start JobMasterRestEndpoint in JobClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-8030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283699#comment-16283699 ] ASF GitHub Bot commented on FLINK-8030: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4988#discussion_r155797999 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java --- @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler; +import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler; +import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler; +import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler; +import org.apache.flink.runtime.rest.handler.job.JobConfigHandler; +import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler; +import org.apache.flink.runtime.rest.handler.job.JobIdsHandler; +import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; +import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler; +import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler; +import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache; +import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler; +import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler; +import 
org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler; +import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; +import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler; +import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders; +import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; +import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders; +import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders; +import org.apache.flink.runtime.rest.messages.JobConfigHeaders; +import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; +import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders; +import org.apache.flink.runtime.rest.messages.JobPlanHeaders; +import
[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283657#comment-16283657 ] ASF GitHub Bot commented on FLINK-8220: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5134 @pnowojski see FLINK-2848 and FLINK-2973. Also the BSD + Patents conversation along with Flink's dependence on Amazon's Kinesis library (likewise [Category X](https://www.apache.org/legal/resolved.html#category-x)). I now think we have been overly cautious in this regard, not necessarily a bad thing. As @StephanEwen noted, the GPL-dependent code must be an [optional component](https://www.apache.org/legal/resolved.html#optional). We [do this](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kinesis.html) with Kinesis by not releasing convenience binaries but requiring the user to compile from source with an explicit flag. I think we are safe to include a `flink-benchmarks` module which is not depended on or included in the Flink distribution. The benchmarks are Apache licensed, only the dependence on `jmh` prohibits distribution, so can be included in the (source) release. These are performance, not integration, tests so would be run manually rather than during the build. I agree with @StephanEwen's caution but we would not be releasing GPL code, it is not obvious that `jmh` is GPL licensed, and Java itself is GPL. > Implement set of network throughput benchmarks in Flink > --- > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in flink project and they will > be executed in {{flink-benchmarks}} project. > Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8192) Properly annotate APIs of all Flink connectors with @Public / @PublicEvolving / @Internal
[ https://issues.apache.org/jira/browse/FLINK-8192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283523#comment-16283523 ] ASF GitHub Bot commented on FLINK-8192: --- Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5138 Changes to the public API require a [FLIP](https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals). > Properly annotate APIs of all Flink connectors with @Public / @PublicEvolving > / @Internal > - > > Key: FLINK-8192 > URL: https://issues.apache.org/jira/browse/FLINK-8192 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > Fix For: 1.5.0 > > > Currently, the APIs of the Flink connectors have absolutely no annotations on > whether their usage is {{Public}} / {{PublicEvolving}} / or {{Internal}}. > We have, for example, instances in the past where a user was mistakenly using > an abstract internal base class in the Elasticsearch connector. > This JIRA tracks the coverage of API usage annotation for all Flink shipped > connectors. Ideally, a separate subtask should be created for each individual > connector. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
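The annotation policy discussed above (`@Public` / `@PublicEvolving` / `@Internal` from Flink's `flink-annotations` module) can be illustrated with simplified, self-contained stand-ins. The annotation definitions below are sketches, not Flink's actual ones, and the connector class names are hypothetical:

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class AnnotationSketch {

    // Simplified stand-ins for org.apache.flink.annotation.PublicEvolving / Internal.
    @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME)
    @interface PublicEvolving {}

    @Documented @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME)
    @interface Internal {}

    // User-facing connector API: stable-ish, but still allowed to evolve.
    @PublicEvolving
    static class FlinkKinesisConsumer {}

    // Internal helper that users should not depend on directly.
    @Internal
    static class KinesisProxy {}

    public static void main(String[] args) {
        // Tooling (or reviewers) can check the declared stability level via reflection.
        System.out.println(FlinkKinesisConsumer.class.isAnnotationPresent(PublicEvolving.class));
        System.out.println(KinesisProxy.class.isAnnotationPresent(Internal.class));
    }
}
```

The point of the JIRA is exactly this kind of explicit contract: without the annotation, nothing distinguishes `KinesisProxy`-style internals from supported API.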
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283481#comment-16283481 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155770364 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collections; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for the {@link SlotSharingManager}. + */ +public class SlotSharingManagerTest extends TestLogger { + + private static final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId(); + + private static final DummySlotOwner slotOwner = new DummySlotOwner(); + + private static final TestingAllocatedSlotActions allocatedSlotActions = new TestingAllocatedSlotActions(); --- End diff -- This instance is mutable... 
should not be `static` > Add support for scheduling with slot sharing > > > Key: FLINK-7956 > URL: https://issues.apache.org/jira/browse/FLINK-7956 > Project: Flink > Issue Type: Sub-task > Components: Scheduler >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to reach feature equivalence with the old code base, we should add > support for scheduling with slot sharing to the {{SlotPool}}. This will also > allow us to run all the IT cases based on the {{AbstractTestBase}} on the > Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
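The hazard behind this review comment (a mutable fixture shared through a `static` field leaks state between tests) can be sketched with self-contained stand-ins; the method and field names below are hypothetical, not Flink test code:

```java
import java.util.ArrayList;
import java.util.List;

public class FixtureSketch {

    // Anti-pattern: one mutable object shared by every "test" in the class.
    private static final List<String> sharedActions = new ArrayList<>();

    static int testWithSharedFixture(String action) {
        sharedActions.add(action);
        return sharedActions.size(); // depends on how many tests ran before this one
    }

    // Fix: create a fresh fixture per test, so each test is isolated.
    static int testWithFreshFixture(String action) {
        List<String> actions = new ArrayList<>();
        actions.add(action);
        return actions.size(); // always 1, regardless of test order
    }

    public static void main(String[] args) {
        System.out.println(testWithSharedFixture("a"));
        System.out.println(testWithSharedFixture("b")); // order-dependent result
        System.out.println(testWithFreshFixture("a"));
        System.out.println(testWithFreshFixture("b")); // isolated result
    }
}
```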
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283477#comment-16283477 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155770104 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collections; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Test cases for the {@link SlotSharingManager}. + */ +public class SlotSharingManagerTest extends TestLogger { + + private static final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId(); --- End diff -- Should be `SLOT_SHARING_GROUP_ID` since it is a constant. > Add support for scheduling with slot sharing > > > Key: FLINK-7956 > URL: https://issues.apache.org/jira/browse/FLINK-7956 > Project: Flink > Issue Type: Sub-task > Components: Scheduler >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to reach feature equivalence with the old code base, we should add > support for scheduling with slot sharing to the {{SlotPool}}. 
This will also > allow us to run all the IT cases based on the {{AbstractTestBase}} on the > Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
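The naming convention flagged above can be shown in a tiny self-contained sketch (a `String` stands in for `SlotSharingGroupId` here, which is otherwise not available outside Flink):

```java
public class NamingSketch {

    // Before (flagged in review): a class-level constant named like an instance field:
    //   private static final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();

    // After: static final fields are constants, so Java convention uses UPPER_SNAKE_CASE.
    private static final String SLOT_SHARING_GROUP_ID = "group-1";

    public static void main(String[] args) {
        System.out.println(SLOT_SHARING_GROUP_ID);
    }
}
```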
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283470#comment-16283470 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155768880 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot} and the latter class represents the leaf nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which has a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to the
+ * co-located multi task slot.
+ */
+public class SlotSharingManager {
+
+	private final SlotSharingGroupId slotSharingGroupId;
+
+	// needed to release allocated slots after a complete multi task slot hierarchy has been released
+	private final AllocatedSlotActions allocatedSlotActions;
+
+	// owner of the slots to which to return them when they are released from the outside
+	private final SlotOwner slotOwner;
+
+	private final Map<SlotRequestId, TaskSlot> allTaskSlots;
+
+	// Root nodes which have not been completed because the
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155768880 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. + * The hierarchy starts with a root {@link MultiTaskSlot} which is a future + * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot + * on the TaskExecutor in which all slots of this hierarchy run. 
A {@link MultiTaskSlot} + * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} if and only if + * the task slot does not yet contain another child with the same {@link AbstractID} identifying + * the actual task or the co-location constraint. + * + * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains a set + * of {@link SingleTaskSlot} on the second layer. Each {@link SingleTaskSlot} represents a different + * task. + * + * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root node. The co-location + * constraint is uniquely identified by a {@link AbstractID} such that we cannot add a second co-located + * {@link MultiTaskSlot} to the same root node. Now all co-located tasks will be added to co-located + * multi task slot. + */ +public class SlotSharingManager { + + private final SlotSharingGroupId slotSharingGroupId; + + // needed to release allocated slots after a complete multi task slot hierarchy has been released + private final AllocatedSlotActions allocatedSlotActions; + + // owner of the slots to which to return them when they are released from the outside + private final SlotOwner slotOwner; + + private final MapallTaskSlots; + + // Root nodes which have not been completed because the allocated slot is still pending + private final Map unresolvedRootSlots; + + // Root nodes which have been completed (the underlying allocated slot has been assigned) + private final
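The uniqueness rule in the quoted Javadoc — a parent `MultiTaskSlot` accepts a new child only if no existing child is registered under the same `AbstractID` — can be sketched in plain Java. This is a hypothetical stand-in model, not Flink's actual classes; `UUID` stands in for `AbstractID`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Simplified stand-in for the MultiTaskSlot/SingleTaskSlot hierarchy.
public class SlotTreeSketch {

    // Inner node: children are keyed by the task's (or co-location
    // constraint's) identifier, so there is at most one child per id.
    static class MultiSlot {
        private final Map<UUID, Object> children = new HashMap<>();

        // Mirrors the "if and only if no child with the same AbstractID
        // exists" rule: putIfAbsent leaves an existing mapping untouched
        // and returns it, so the second add for the same id fails.
        boolean addChild(UUID taskId, Object child) {
            return children.putIfAbsent(taskId, child) == null;
        }

        boolean contains(UUID taskId) {
            return children.containsKey(taskId);
        }
    }

    public static void main(String[] args) {
        MultiSlot root = new MultiSlot();
        UUID mapTask = UUID.randomUUID();

        System.out.println(root.addChild(mapTask, "slot-for-map"));  // first add succeeds
        System.out.println(root.addChild(mapTask, "another-slot"));  // rejected: same id
    }
}
```

Co-location then falls out of the same rule: the co-location constraint's id keys a nested `MultiSlot`, so a second co-located group with the same id cannot be added to the root.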
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283413#comment-16283413 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155758219 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. 
--- End diff -- nit: *leaf nodes* > Add support for scheduling with slot sharing > > > Key: FLINK-7956 > URL: https://issues.apache.org/jira/browse/FLINK-7956 > Project: Flink > Issue Type: Sub-task > Components: Scheduler >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > > In order to reach feature equivalence with the old code base, we should add > support for scheduling with slot sharing to the {{SlotPool}}. This will also > allow us to run all the IT cases based on the {{AbstractTestBase}} on the > Flip-6 {{MiniCluster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155758219 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotContext; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.Locality; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.AbstractCollection; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +/** + * Manager which is responsible for slot sharing. Slot sharing allows to run different + * tasks in the same slot and to realize co-location constraints. + * + * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that + * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying + * the request for the TaskSlot and a {@link AbstractID} identifying the task or the + * co-location constraint running in this slot. + * + * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and + * {@link SingleTaskSlot}. The former class represents inner nodes which can contain + * a number of other {@link TaskSlot} and the latter class represents the leave nodes. --- End diff -- nit: *leaf nodes* ---
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283397#comment-16283397 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155754738 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable locationPreferences, + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; + + try { + if (task.getCoLocationConstraint() != null) { + multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot( + task.getCoLocationConstraint(), + multiTaskSlotManager, + resourceProfile, + 
locationPreferences, + allowQueuedScheduling); + } else { + multiTaskSlotFuture = allocateMultiTaskSlot( + task.getJobVertexId(), multiTaskSlotManager, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + } + } catch (NoResourceAvailableException noResourceException) { + return FutureUtils.completedExceptionally(noResourceException); + } + + // sanity check + Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId())); + + final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot( + slotRequestId, + task.getJobVertexId(), + multiTaskSlotFuture.getLocality()); + + return leave.getLogicalSlotFuture(); + } else { + // request an allocated slot to assign a single logical slot to + CompletableFuture slotAndLocalityFuture = requestAllocatedSlot( + slotRequestId, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + + return
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155754738 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable locationPreferences, + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; + + try { + if (task.getCoLocationConstraint() != null) { + multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot( + task.getCoLocationConstraint(), + multiTaskSlotManager, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + } else { + multiTaskSlotFuture = allocateMultiTaskSlot( + task.getJobVertexId(), multiTaskSlotManager, + resourceProfile, + locationPreferences, + 
allowQueuedScheduling); + } + } catch (NoResourceAvailableException noResourceException) { + return FutureUtils.completedExceptionally(noResourceException); + } + + // sanity check + Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId())); + + final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot( + slotRequestId, + task.getJobVertexId(), + multiTaskSlotFuture.getLocality()); + + return leave.getLogicalSlotFuture(); + } else { + // request an allocated slot to assign a single logical slot to + CompletableFuture slotAndLocalityFuture = requestAllocatedSlot( + slotRequestId, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + + return slotAndLocalityFuture.thenApply( + (SlotAndLocality slotAndLocality) -> { + final AllocatedSlot allocatedSlot = slotAndLocality.getSlot(); + +
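The `slotSharingManagers.computeIfAbsent(...)` call in the quoted diff lazily creates one manager per slot sharing group and reuses it afterwards. The same lazy-initialization pattern, reduced to plain JDK types (hypothetical `GroupManager` stand-in, not the real `SlotSharingManager`):

```java
import java.util.HashMap;
import java.util.Map;

public class LazyManagers {

    // Hypothetical stand-in for SlotSharingManager.
    static class GroupManager {
        final String groupId;
        GroupManager(String groupId) { this.groupId = groupId; }
    }

    private final Map<String, GroupManager> managers = new HashMap<>();

    // The first request for a group constructs its manager; every later
    // request for the same group gets the cached instance back, so all
    // tasks of one sharing group go through one manager.
    GroupManager managerFor(String groupId) {
        return managers.computeIfAbsent(groupId, GroupManager::new);
    }

    public static void main(String[] args) {
        LazyManagers pool = new LazyManagers();
        GroupManager a = pool.managerFor("group-1");
        GroupManager b = pool.managerFor("group-1");
        System.out.println(a == b); // same instance is reused
    }
}
```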
[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283365#comment-16283365 ] ASF GitHub Bot commented on FLINK-8220: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5134 @greghogan That may or may not be possible, not 100% sure. Re-distributing it does certainly not work. To my current understanding, the Apache guidelines say we would need to make sure that downstream consumers of Flink (also the source) don't accidentally pull in a GPL license. For weak copyleft, where consuming it only via linking, it is okay, but for strong copyleft, where accidentally linking it results in copyleft, I think the hurdles for that are pretty high. There are some gray zone areas (like MPL) where the understanding is that "optional" dependency is okay because it requires an explicit action by a downstream user (re-adding the dependency) and the potential damage is not too high. I have seen a lot of cases first hand in the past months how many companies that want to use open source freak out as soon as they see an instance of GPL. We would make Flink's adoption easier if we did not have that. My first suggestion would be to consolidate this in a separate `flink-benchmarks` repository. That would also keep the core Flink repository "clear" of GPL. I could check with Apache legal for a final answer. > Implement set of network throughput benchmarks in Flink > --- > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in flink project and they will > be executed in {{flink-benchmarks}} project. > Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5134: [FLINK-8220] Implement set of network benchmarks
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5134 @greghogan That may or may not be possible, not 100% sure. Re-distributing it does certainly not work. To my current understanding, the Apache guidelines say we would need to make sure that downstream consumers of Flink (also the source) don't accidentally pull in a GPL license. For weak copyleft, where consuming it only via linking, it is okay, but for strong copyleft, where accidentally linking it results in copyleft, I think the hurdles for that are pretty high. There are some gray zone areas (like MPL) where the understanding is that "optional" dependency is okay because it requires an explicit action by a downstream user (re-adding the dependency) and the potential damage is not too high. I have seen a lot of cases first hand in the past months how many companies that want to use open source freak out as soon as they see an instance of GPL. We would make Flink's adoption easier if we did not have that. My first suggestion would be to consolidate this in a separate `flink-benchmarks` repository. That would also keep the core Flink repository "clear" of GPL. I could check with Apache legal for a final answer. ---
[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155751694 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable locationPreferences, + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; + + try { + if (task.getCoLocationConstraint() != null) { + multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot( + task.getCoLocationConstraint(), + multiTaskSlotManager, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + } else { + multiTaskSlotFuture = allocateMultiTaskSlot( + task.getJobVertexId(), multiTaskSlotManager, + resourceProfile, + locationPreferences, + 
allowQueuedScheduling); + } + } catch (NoResourceAvailableException noResourceException) { + return FutureUtils.completedExceptionally(noResourceException); + } + + // sanity check + Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId())); + + final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot( + slotRequestId, + task.getJobVertexId(), + multiTaskSlotFuture.getLocality()); + + return leave.getLogicalSlotFuture(); + } else { + // request an allocated slot to assign a single logical slot to + CompletableFuture slotAndLocalityFuture = requestAllocatedSlot( + slotRequestId, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + + return slotAndLocalityFuture.thenApply( + (SlotAndLocality slotAndLocality) -> { + final AllocatedSlot allocatedSlot = slotAndLocality.getSlot(); + +
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing
[ https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283364#comment-16283364 ] ASF GitHub Bot commented on FLINK-7956: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5091#discussion_r155751694 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -266,104 +279,367 @@ public void disconnectResourceManager() { // @Override - public CompletableFuture allocateSlot( - SlotRequestID requestId, - ScheduledUnit task, - ResourceProfile resources, - Iterable locationPreferences, + public CompletableFuture allocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit scheduledUnit, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling, Time timeout) { - return internalAllocateSlot(requestId, task, resources, locationPreferences); + return internalAllocateSlot( + slotRequestId, + scheduledUnit, + resourceProfile, + locationPreferences, + allowQueuedScheduling); } - @Override - public void returnAllocatedSlot(Slot slot) { - internalReturnAllocatedSlot(slot); + private CompletableFuture internalAllocateSlot( + SlotRequestId slotRequestId, + ScheduledUnit task, + ResourceProfile resourceProfile, + Collection locationPreferences, + boolean allowQueuedScheduling) { + + final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); + + if (slotSharingGroupId != null) { + // allocate slot with slot sharing + final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( + slotSharingGroupId, + id -> new SlotSharingManager( + id, + this, + providerAndOwner)); + + final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotFuture; + + try { + if (task.getCoLocationConstraint() != null) { + multiTaskSlotFuture = allocateCoLocatedMultiTaskSlot( + task.getCoLocationConstraint(), + multiTaskSlotManager, + resourceProfile, + 
locationPreferences, + allowQueuedScheduling); + } else { + multiTaskSlotFuture = allocateMultiTaskSlot( + task.getJobVertexId(), multiTaskSlotManager, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + } + } catch (NoResourceAvailableException noResourceException) { + return FutureUtils.completedExceptionally(noResourceException); + } + + // sanity check + Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId())); + + final SlotSharingManager.SingleTaskSlot leave = multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot( + slotRequestId, + task.getJobVertexId(), + multiTaskSlotFuture.getLocality()); + + return leave.getLogicalSlotFuture(); + } else { + // request an allocated slot to assign a single logical slot to + CompletableFuture slotAndLocalityFuture = requestAllocatedSlot( + slotRequestId, + resourceProfile, + locationPreferences, + allowQueuedScheduling); + + return
[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table
[ https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283318#comment-16283318 ] ASF GitHub Bot commented on FLINK-8139: --- Github user Aegeaner commented on a diff in the pull request: https://github.com/apache/flink/pull/5065#discussion_r155746723 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: TableConfig) { */ protected def checkValidTableName(name: String): Unit + /** +* Checks if the chosen table type is valid. +* @param table The table to check +*/ + protected def checkValidTableType(table: Table): Unit = { --- End diff -- I will do this > Check for proper equals() and hashCode() when registering a table > - > > Key: FLINK-8139 > URL: https://issues.apache.org/jira/browse/FLINK-8139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Aegeaner > > In the current Table API & SQL implementation we compare {{Row}}s at > different positions. E.g., for joining we test rows for equality or put them > into state. A heap state backend requires proper hashCode() and equals() in > order to work correct. Thus, every type in the Table API needs to have these > methods implemented. > We need to check if all fields of a row have implement methods that differ > from {{Object.equals()}} and {{Object.hashCode()}} via reflections. Both > coming from TableSource and DataStream/DataSet. > Additionally, for array types, the {{Row}} class should use > {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep > variants. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table
[ https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283319#comment-16283319 ] ASF GitHub Bot commented on FLINK-8139: --- Github user Aegeaner commented on a diff in the pull request: https://github.com/apache/flink/pull/5065#discussion_r155746766 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: TableConfig) { */ protected def checkValidTableName(name: String): Unit + /** +* Checks if the chosen table type is valid. +* @param table The table to check +*/ + protected def checkValidTableType(table: Table): Unit = { +val types = table.getSchema.getTypes +checkTypeArray(types) + } + + private def checkTypeArray(types: Array[TypeInformation[_]]) = { +for (typeInfo <- types) { + if(!typeInfo.asInstanceOf[TypeInformation[_]].isBasicType && +!typeInfo.isInstanceOf[PrimitiveArrayTypeInfo[_]]) { --- End diff -- I will do this. > Check for proper equals() and hashCode() when registering a table > - > > Key: FLINK-8139 > URL: https://issues.apache.org/jira/browse/FLINK-8139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Aegeaner > > In the current Table API & SQL implementation we compare {{Row}}s at > different positions. E.g., for joining we test rows for equality or put them > into state. A heap state backend requires proper hashCode() and equals() in > order to work correct. Thus, every type in the Table API needs to have these > methods implemented. > We need to check if all fields of a row have implement methods that differ > from {{Object.equals()}} and {{Object.hashCode()}} via reflections. Both > coming from TableSource and DataStream/DataSet. > Additionally, for array types, the {{Row}} class should use > {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep > variants. 
[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...
Github user Aegeaner commented on a diff in the pull request: https://github.com/apache/flink/pull/5065#discussion_r155746723 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: TableConfig) { */ protected def checkValidTableName(name: String): Unit + /** +* Checks if the chosen table type is valid. +* @param table The table to check +*/ + protected def checkValidTableType(table: Table): Unit = { --- End diff -- I will do this ---
[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...
Github user Aegeaner commented on a diff in the pull request: https://github.com/apache/flink/pull/5065#discussion_r155746766 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: TableConfig) { */ protected def checkValidTableName(name: String): Unit + /** +* Checks if the chosen table type is valid. +* @param table The table to check +*/ + protected def checkValidTableType(table: Table): Unit = { +val types = table.getSchema.getTypes +checkTypeArray(types) + } + + private def checkTypeArray(types: Array[TypeInformation[_]]) = { +for (typeInfo <- types) { + if(!typeInfo.asInstanceOf[TypeInformation[_]].isBasicType && +!typeInfo.isInstanceOf[PrimitiveArrayTypeInfo[_]]) { --- End diff -- I will do this. ---
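The check discussed in FLINK-8139 — detecting via reflection whether a field type still relies on `Object.equals()`/`Object.hashCode()` — can be sketched with plain JDK reflection. This is a simplified illustration of the idea, not the patch's actual (Scala) code:

```java
import java.util.Arrays;

public class EqualityCheckSketch {

    // A class provides its own equality iff equals() and hashCode() are
    // declared somewhere below Object in its hierarchy.
    static boolean overridesEqualsAndHashCode(Class<?> clazz) {
        try {
            return clazz.getMethod("equals", Object.class).getDeclaringClass() != Object.class
                && clazz.getMethod("hashCode").getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            throw new AssertionError(e); // both methods always exist on Object
        }
    }

    public static void main(String[] args) {
        System.out.println(overridesEqualsAndHashCode(String.class)); // true
        System.out.println(overridesEqualsAndHashCode(Object.class)); // false

        // The ticket's second point: for nested arrays only the deep
        // variants compare element-wise; plain equals() is identity.
        int[][] x = {{1, 2}};
        int[][] y = {{1, 2}};
        System.out.println(x.equals(y));              // false
        System.out.println(Arrays.deepEquals(x, y));  // true
    }
}
```

A heap state backend keyed on such values would silently misbehave for types failing this check, which is why the ticket wants it enforced at table registration time.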
[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink
[ https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283309#comment-16283309 ] ASF GitHub Bot commented on FLINK-8220: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5134 It seems like `commons-lang` doesn't have any issues with using `jmh` in tests: https://commons.apache.org/proper/commons-lang/dependencies.html However @greghogan do you remember my previous attempt with JMH in Flink? https://github.com/apache/flink/pull/4323 Was it a false alarm? > Implement set of network throughput benchmarks in Flink > --- > > Key: FLINK-8220 > URL: https://issues.apache.org/jira/browse/FLINK-8220 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > > Benchmarks should be defined and implemented in flink project and they will > be executed in {{flink-benchmarks}} project. > Configurable parameters: number of record writers and number of channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
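The commons-lang precedent discussed above amounts to pulling JMH in with test scope only, so it never leaks into published artifacts. A sketch of what that could look like in a module's pom.xml (the version number is illustrative, not taken from the thread):

```xml
<!-- Hypothetical test-scoped JMH dependencies, mirroring the commons-lang setup. -->
<dependency>
  <groupId>org.openjdk.jmh</groupId>
  <artifactId>jmh-core</artifactId>
  <version>1.19</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.openjdk.jmh</groupId>
  <artifactId>jmh-generator-annprocess</artifactId>
  <version>1.19</version>
  <scope>test</scope>
</dependency>
```

The annotation processor (`jmh-generator-annprocess`) is what generates the benchmark harness classes at compile time, which is why both artifacts are needed even for test-only use.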
[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators
[ https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283304#comment-16283304 ] ASF GitHub Bot commented on FLINK-4812: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5125#discussion_r155742906 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java --- @@ -80,7 +91,10 @@ public void init() throws Exception { this.headOperator); // make sure that stream tasks report their I/O statistics - inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup()); + inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(), input1WatermarkGauge, input2WatermarkGauge); + + headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge); --- End diff -- Sure, I think that makes sense. > Report Watermark metrics in all operators > - > > Key: FLINK-4812 > URL: https://issues.apache.org/jira/browse/FLINK-4812 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: Robert Metzger >Assignee: Chesnay Schepler >Priority: Critical > Fix For: 1.5.0 > > > As reported by a user, Flink does currently not export the current low > watermark for sources > (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html). > This JIRA is for adding such a metric for the sources as well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
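The watermark gauges being wired up in the diff above are, at their core, mutable holders for the last watermark observed on an input. A minimal self-contained sketch of that idea (the `Gauge` interface here is a stand-in assumed for illustration, not Flink's actual `org.apache.flink.metrics.Gauge`):

```java
// Stand-in for the metrics Gauge interface -- assumed shape, for illustration only.
interface Gauge<T> {
    T getValue();
}

// Tracks the last watermark seen on one input; read lazily by the metrics reporter.
class WatermarkGauge implements Gauge<Long> {
    private volatile long currentWatermark = Long.MIN_VALUE;

    public void setCurrentWatermark(long watermark) {
        this.currentWatermark = watermark;
    }

    @Override
    public Long getValue() {
        return currentWatermark;
    }
}

public class WatermarkGaugeDemo {
    public static void main(String[] args) {
        WatermarkGauge input1 = new WatermarkGauge();
        input1.setCurrentWatermark(42L);
        System.out.println(input1.getValue()); // prints 42
    }
}
```

A two-input task would keep one such gauge per input (`input1WatermarkGauge`, `input2WatermarkGauge`) and register each under a distinct metric name, which is exactly what the diff does.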
[jira] [Commented] (FLINK-8224) Should shutdownApplication when job terminated in job mode
[ https://issues.apache.org/jira/browse/FLINK-8224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283302#comment-16283302 ] ASF GitHub Bot commented on FLINK-8224: --- GitHub user shuai-xu opened a pull request: https://github.com/apache/flink/pull/5139

[FLINK-8224] [runtime] shutdown application when job terminated in job mode

## What is the purpose of the change

The current job cluster entrypoint doesn't call the resource manager to shut down the application, so the resource manager has no chance to report the final application status to the outer resource management system such as YARN/Mesos. This may cause YARN to still consider the application as running even though the job has finished.

## Verifying this change

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8224

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5139.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5139

commit b047b2a50791f4eeeb4c3a984d060ffdbf57ea26
Author: shuai.xu
Date: 2017-12-08T10:02:42Z

[FLINK-8224] [runtime] shutdown application when job terminated in job mode

> Should shutdownApplication when job terminated in job mode
> -
>
> Key: FLINK-8224
> URL: https://issues.apache.org/jira/browse/FLINK-8224
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management
> Affects Versions: 1.5.0
> Reporter: shuai.xu
> Assignee: shuai.xu
> Labels: flip-6
>
> For job mode, one job is an application. When the job finishes, it should tell the resource manager to shut down the application; otherwise the resource manager cannot set the application status. For example, if the YARN resource manager doesn't report the application as finished to the YARN master, YARN will consider the application as still running.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
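The fix described above boils down to mapping the job's terminal state to a final application status and pushing it to the resource manager before the process exits. A hedged, self-contained sketch of that control flow (all class and method names below are illustrative stand-ins, not Flink's actual runtime API):

```java
// Illustrative stand-ins for the job-mode shutdown path, not real Flink classes.
enum JobResult { FINISHED, FAILED, CANCELED }
enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED }

interface ResourceManagerGateway {
    // Reports the final status to the outer framework (YARN/Mesos) and tears down.
    void shutDownApplication(ApplicationStatus finalStatus, String diagnostics);
}

class JobClusterEntrypoint {
    private final ResourceManagerGateway resourceManager;

    JobClusterEntrypoint(ResourceManagerGateway resourceManager) {
        this.resourceManager = resourceManager;
    }

    // Called once the single job of this cluster reaches a terminal state.
    void onJobTerminated(JobResult result) {
        ApplicationStatus status;
        switch (result) {
            case FINISHED: status = ApplicationStatus.SUCCEEDED; break;
            case CANCELED: status = ApplicationStatus.CANCELED; break;
            default:       status = ApplicationStatus.FAILED;   break;
        }
        // Without this call, YARN keeps reporting the application as RUNNING.
        resourceManager.shutDownApplication(status, "Job reached terminal state " + result);
    }
}
```

With a wiring like this, the outer resource management system observes a terminal application status instead of seeing the application as still running after the single job terminates.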
[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table
[ https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283293#comment-16283293 ] ASF GitHub Bot commented on FLINK-8139: --- Github user Aegeaner commented on a diff in the pull request: https://github.com/apache/flink/pull/5065#discussion_r155741290 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: TableConfig) { */ protected def checkValidTableName(name: String): Unit + /** +* Checks if the chosen table type is valid. +* @param table The table to check +*/ + protected def checkValidTableType(table: Table): Unit = { --- End diff -- I will do this. > Check for proper equals() and hashCode() when registering a table > - > > Key: FLINK-8139 > URL: https://issues.apache.org/jira/browse/FLINK-8139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Aegeaner > > In the current Table API & SQL implementation we compare {{Row}}s at > different positions. E.g., for joining we test rows for equality or put them > into state. A heap state backend requires proper hashCode() and equals() in > order to work correct. Thus, every type in the Table API needs to have these > methods implemented. > We need to check if all fields of a row have implement methods that differ > from {{Object.equals()}} and {{Object.hashCode()}} via reflections. Both > coming from TableSource and DataStream/DataSet. > Additionally, for array types, the {{Row}} class should use > {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep > variants. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table
[ https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283292#comment-16283292 ] ASF GitHub Bot commented on FLINK-8139: --- Github user Aegeaner commented on a diff in the pull request: https://github.com/apache/flink/pull/5065#discussion_r155741244 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala --- @@ -411,6 +411,7 @@ abstract class TableEnvironment(val config: TableConfig) { } checkValidTableName(name) +checkValidTableType(table) --- End diff -- `org.apache.flink.table.api.TableEnvironment#validateType` is not only used to validate output types, but also used for all validating type information case, e.g. `BatchTableEnvironment#translate` for explain statement. > Check for proper equals() and hashCode() when registering a table > - > > Key: FLINK-8139 > URL: https://issues.apache.org/jira/browse/FLINK-8139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Aegeaner > > In the current Table API & SQL implementation we compare {{Row}}s at > different positions. E.g., for joining we test rows for equality or put them > into state. A heap state backend requires proper hashCode() and equals() in > order to work correct. Thus, every type in the Table API needs to have these > methods implemented. > We need to check if all fields of a row have implement methods that differ > from {{Object.equals()}} and {{Object.hashCode()}} via reflections. Both > coming from TableSource and DataStream/DataSet. > Additionally, for array types, the {{Row}} class should use > {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep > variants. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table
[ https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283289#comment-16283289 ] ASF GitHub Bot commented on FLINK-8139: --- Github user Aegeaner commented on a diff in the pull request: https://github.com/apache/flink/pull/5065#discussion_r155740555 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala --- @@ -98,7 +98,9 @@ abstract class BatchTableEnvironment( tableSource match { case batchTableSource: BatchTableSource[_] => -registerTableInternal(name, new BatchTableSourceTable(batchTableSource)) +val table = new BatchTableSourceTable(batchTableSource) +checkValidTableSourceType(tableSource) +registerTableInternal(name, table) --- End diff -- `registerTableInternal` takes parameter with `AbstractTable` type, while when registering `TableSource`, we should pass `TableSource` type. > Check for proper equals() and hashCode() when registering a table > - > > Key: FLINK-8139 > URL: https://issues.apache.org/jira/browse/FLINK-8139 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Aegeaner > > In the current Table API & SQL implementation we compare {{Row}}s at > different positions. E.g., for joining we test rows for equality or put them > into state. A heap state backend requires proper hashCode() and equals() in > order to work correct. Thus, every type in the Table API needs to have these > methods implemented. > We need to check if all fields of a row have implement methods that differ > from {{Object.equals()}} and {{Object.hashCode()}} via reflections. Both > coming from TableSource and DataStream/DataSet. > Additionally, for array types, the {{Row}} class should use > {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep > variants. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
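The check requested in FLINK-8139 — that every field type ships its own equals() and hashCode() rather than inheriting Object's identity semantics — can be done reflectively by asking which class declares those methods. A minimal sketch of that idea (the class and method names are illustrative, not the Table API's actual validation hooks):

```java
import java.lang.reflect.Method;

// Sketch of the reflective check discussed in FLINK-8139: a type is only safe
// for heap-state comparisons if both equals() and hashCode() are overridden,
// i.e. neither resolves to the method declared on java.lang.Object.
public final class EqualityCheck {

    public static boolean overridesEqualsAndHashCode(Class<?> clazz) {
        try {
            Method equals = clazz.getMethod("equals", Object.class);
            Method hashCode = clazz.getMethod("hashCode");
            return equals.getDeclaringClass() != Object.class
                && hashCode.getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            // equals/hashCode exist on every class, so this cannot happen.
            throw new AssertionError(e);
        }
    }
}
```

For example, `overridesEqualsAndHashCode(String.class)` is true, while a bare POJO that inherits Object's identity-based equals/hashCode fails the check.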
[jira] [Created] (FLINK-8225) Use JsonRowDeserializationSchema without Kafka connector dependency
Sendoh created FLINK-8225: - Summary: Use JsonRowDeserializationSchema without Kafka connector dependency Key: FLINK-8225 URL: https://issues.apache.org/jira/browse/FLINK-8225 Project: Flink Issue Type: Wish Components: Table API & SQL, Type Serialization System Reporter: Sendoh Priority: Minor

Currently, using JsonRowDeserializationSchema requires adding the Kafka connector dependency, even though the schema itself can be used without Kafka. AC: move JsonRowDeserializationSchema to a dedicated module.

Ref: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/the-location-of-JsonRowDeserializationSchema-java-td17063.html

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8224) Should shutdownApplication when job terminated in job mode
shuai.xu created FLINK-8224: --- Summary: Should shutdownApplication when job terminated in job mode Key: FLINK-8224 URL: https://issues.apache.org/jira/browse/FLINK-8224 Project: Flink Issue Type: Bug Components: Cluster Management Affects Versions: 1.5.0 Reporter: shuai.xu Assignee: shuai.xu

For job mode, one job is an application. When the job finishes, it should tell the resource manager to shut down the application; otherwise the resource manager cannot set the application status. For example, if the YARN resource manager doesn't report the application as finished to the YARN master, YARN will consider the application as still running.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)