[jira] [Commented] (FLINK-8217) Properly annotate APIs of flink-connector-kinesis

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16284616#comment-16284616
 ] 

ASF GitHub Bot commented on FLINK-8217:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5138
  
@greghogan this is a subtask of FLINK-8192. 

@tzulitai What do you think of having a FLIP for FLINK-8192?


> Properly annotate APIs of flink-connector-kinesis
> -
>
> Key: FLINK-8217
> URL: https://issues.apache.org/jira/browse/FLINK-8217
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kinesis Connector
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-8227) Optimize the performance of SharedBufferSerializer

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16284570#comment-16284570
 ] 

ASF GitHub Bot commented on FLINK-8227:
---

GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/5142

[FLINK-8227] Optimize the performance of SharedBufferSerializer

## What is the purpose of the change

This pull request optimizes the performance of SharedBufferSerializer.

## Verifying this change

This change is a performance improvement without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink optimize_sharedbuffer

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5142.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5142


commit b586abec579ef7f251333032c9385d7e71f3799b
Author: Dian Fu 
Date:   2017-12-09T03:51:04Z

[FLINK-8227] Optimize the performance of SharedBufferSerializer




> Optimize the performance of SharedBufferSerializer
> --
>
> Key: FLINK-8227
> URL: https://issues.apache.org/jira/browse/FLINK-8227
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>
> Currently {{SharedBufferSerializer.serialize()}} will create a HashMap and 
> put all the {{SharedBufferEntry}} instances into it. Usually this is not a 
> problem, but we observe that in some cases the calculation of hashCode may 
> become the bottleneck. The performance decreases as the number of 
> {{SharedBufferEdge}} increases. For the looping pattern {{A*}}, if the number 
> of {{SharedBufferEntry}} is {{N}}, the number of {{SharedBufferEdge}} is 
> about {{N * N}}.
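One possible direction for the optimization described above (a sketch only, not necessarily the actual patch; `Entry` and `IdAssignment` are illustrative names, not Flink classes) is to key the id map by object identity, so the per-entry cost no longer depends on an expensive value-based `hashCode`:

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's SharedBufferEntry; the real class's
// hashCode can be expensive because it involves the entry's edges.
class Entry {
    final String value;
    Entry(String value) { this.value = value; }
}

class IdAssignment {
    // Assign serialization ids by object identity instead of by value.
    // IdentityHashMap hashes with System.identityHashCode, so the cost per
    // lookup is constant no matter how many edges an entry has.
    static Map<Entry, Integer> assignIds(Iterable<Entry> entries) {
        Map<Entry, Integer> ids = new IdentityHashMap<>();
        int next = 0;
        for (Entry e : entries) {
            if (!ids.containsKey(e)) {
                ids.put(e, next++);
            }
        }
        return ids;
    }
}
```

Note that with identity semantics, two distinct entry objects get distinct ids even if they are value-equal, which matches the serializer's need to reference each live object exactly once.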







[jira] [Created] (FLINK-8227) Optimize the performance of SharedBufferSerializer

2017-12-08 Thread Dian Fu (JIRA)
Dian Fu created FLINK-8227:
--

 Summary: Optimize the performance of SharedBufferSerializer
 Key: FLINK-8227
 URL: https://issues.apache.org/jira/browse/FLINK-8227
 Project: Flink
  Issue Type: Bug
  Components: CEP
Reporter: Dian Fu
Assignee: Dian Fu


Currently {{SharedBufferSerializer.serialize()}} will create a HashMap and put 
all the {{SharedBufferEntry}} instances into it. Usually this is not a problem, 
but we observe that in some cases the calculation of hashCode may become the 
bottleneck. The performance decreases as the number of {{SharedBufferEdge}} 
increases. For the looping pattern {{A*}}, if the number of {{SharedBufferEntry}} 
is {{N}}, the number of {{SharedBufferEdge}} is about {{N * N}}.





[jira] [Commented] (FLINK-8226) Dangling reference generated after NFA clean up timed out SharedBufferEntry

2017-12-08 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16284560#comment-16284560
 ] 

Dian Fu commented on FLINK-8226:


It will cause exceptions such as the following when serializing the NFA.
{code}
Caused by: java.lang.IllegalStateException: Could not find id for entry: 
SharedBufferEntry(ValueTimeWrapper(2017-12-05 09:49:05,2017-12-05 
09:49:05,normal, 1512438545000, 0), 1)
{code}
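A toy model of this failure mode (illustrative names only, not Flink's actual serializer code): serialization first registers every live entry in an id map, then resolves each edge's target through it, so an edge that still points at a pruned entry cannot be resolved.

```java
import java.util.IdentityHashMap;
import java.util.Map;

// Minimal stand-in for a shared-buffer entry.
class SharedEntry {
    final String name;
    SharedEntry(String name) { this.name = name; }
    @Override public String toString() { return "SharedBufferEntry(" + name + ")"; }
}

class EdgeResolver {
    // Resolving an edge target that was pruned from the id map fails with
    // the same message shape as the exception quoted above.
    static int resolve(Map<SharedEntry, Integer> ids, SharedEntry target) {
        Integer id = ids.get(target);
        if (id == null) {
            throw new IllegalStateException("Could not find id for entry: " + target);
        }
        return id;
    }
}
```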

> Dangling reference generated after NFA clean up timed out SharedBufferEntry
> ---
>
> Key: FLINK-8226
> URL: https://issues.apache.org/jira/browse/FLINK-8226
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>






[jira] [Commented] (FLINK-8226) Dangling reference generated after NFA clean up timed out SharedBufferEntry

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16284558#comment-16284558
 ] 

ASF GitHub Bot commented on FLINK-8226:
---

GitHub user dianfu opened a pull request:

https://github.com/apache/flink/pull/5141

[FLINK-8226] [cep] Dangling reference generated after NFA clean up timed 
out SharedBufferEntry


## What is the purpose of the change

This pull request fixes a dangling reference that is left behind after the NFA 
cleans up a timed-out SharedBufferEntry. Without the fix, an exception is 
thrown when serializing the NFA.


## Verifying this change

This change added tests and can be verified as follows:

  - Added test `NFATest#testTimeoutWindowPruning2`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dianfu/flink dangling_ref

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5141.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5141


commit 982bfafaabcfbfd78f4fcbdd9438eab9c8be65bb
Author: Dian Fu 
Date:   2017-12-09T02:55:14Z

[FLINK-8226] [cep] Dangling reference generated after NFA clean up timed 
out SharedBufferEntry




> Dangling reference generated after NFA clean up timed out SharedBufferEntry
> ---
>
> Key: FLINK-8226
> URL: https://issues.apache.org/jira/browse/FLINK-8226
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Reporter: Dian Fu
>Assignee: Dian Fu
>








[jira] [Created] (FLINK-8226) Dangling reference generated after NFA clean up timed out SharedBufferEntry

2017-12-08 Thread Dian Fu (JIRA)
Dian Fu created FLINK-8226:
--

 Summary: Dangling reference generated after NFA clean up timed out 
SharedBufferEntry
 Key: FLINK-8226
 URL: https://issues.apache.org/jira/browse/FLINK-8226
 Project: Flink
  Issue Type: Bug
  Components: CEP
Reporter: Dian Fu
Assignee: Dian Fu








[jira] [Commented] (FLINK-7797) Add support for windowed outer joins for streaming tables

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16284543#comment-16284543
 ] 

ASF GitHub Bot commented on FLINK-7797:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5140

[FLINK-7797] [table] Add support for windowed outer joins for streaming 
tables

## What is the purpose of the change

This PR adds support for windowed outer joins for streaming tables.

## Brief change log

  - Adjusts the plan translation logic to accept stream window outer join.
  - Attaches an "ever emitted" flag to each row. When a row is removed from 
the cache (or is detected as not cached), a null-padded join result is emitted 
if necessary.
  - Adds a custom `JoinAwareCollector` to track whether there's a 
successfully joined result for both sides in each join loop.
  - Adds table/SQL translation tests, and also join integration tests. 
Since the runtime logic is built on the existing window inner join, no new 
harness tests are added.
 - Updates the SQL/Table API docs.
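The flag-and-padding idea in the change log above can be sketched in plain Java (illustrative names only, not Flink's actual operator code): each cached left row remembers whether it ever joined, and rows that expire without joining produce a null-padded result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// A cached row for a windowed LEFT outer join.
class CachedRow {
    final String key;
    boolean everEmitted;  // set once the row joins at least once
    CachedRow(String key) { this.key = key; }
}

class OuterJoinPadding {
    // Called when rows fall out of the time window: emit null padding for
    // the right side of every row that never produced a joined result.
    static List<String> expire(List<CachedRow> cache) {
        List<String> padded = new ArrayList<>();
        for (CachedRow r : cache) {
            if (!r.everEmitted) {
                padded.add(r.key + ",null");
            }
        }
        return padded;
    }
}
```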

## Verifying this change

This PR can be verified by the cases added in `JoinTest` and `JoinITCase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (**yes**)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (**yes**)
  - If yes, how is the feature documented? (**removes the restriction 
notes**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-7797

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5140.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5140


commit 34d3fde8049ec407849b61901acd8258a6a1f919
Author: Xingcan Cui 
Date:   2017-12-07T17:28:40Z

[FLINK-7797] [table] Add support for windowed outer joins for streaming 
tables




> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.







[jira] [Commented] (FLINK-8116) Stale comments referring to Checkpointed interface

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16284230#comment-16284230
 ] 

ASF GitHub Bot commented on FLINK-8116:
---

Github user ankitiitb1069 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5121#discussion_r155877893
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
 ---
@@ -34,17 +34,17 @@
  * The run method can run for as long as necessary. The source must, 
however, react to an
  * invocation of {@link #cancel()} by breaking out of its main loop.
  *
- * Checkpointed Sources
+ * CheckpointedFunction Sources
  *
- * Sources that also implement the {@link 
org.apache.flink.streaming.api.checkpoint.Checkpointed}
+ * Sources that also implement the {@link 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
  * interface must ensure that state checkpointing, updating of internal 
state and emission of
  * elements are not done concurrently. This is achieved by using the 
provided checkpointing lock
  * object to protect update of state and emission of elements in a 
synchronized block.
  *
  * This is the basic pattern one should follow when implementing a 
(checkpointed) source:
  *
  * {@code
- *  public class ExampleSource implements SourceFunction, 
Checkpointed {
+ *  public class ExampleSource implements SourceFunction, 
CheckpointedFunction {
--- End diff --

Please check; I have made the changes, but I could not work out what to write 
inside these functions.
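The locking pattern that the javadoc describes can be sketched without Flink on the classpath (names here are illustrative, not Flink's API): state updates and element emission happen under the same lock that a snapshot takes, so a checkpoint never observes a half-updated state.

```java
import java.util.ArrayList;
import java.util.List;

class LockedSource {
    private final Object checkpointLock = new Object();
    private long count = 0;
    final List<Long> emitted = new ArrayList<>();

    // One iteration of the source's main loop.
    void runOnce() {
        synchronized (checkpointLock) {
            count++;             // update internal state ...
            emitted.add(count);  // ... and emit, atomically w.r.t. snapshots
        }
    }

    // What a checkpoint would capture.
    long snapshotState() {
        synchronized (checkpointLock) {
            return count;
        }
    }
}
```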


> Stale comments referring to Checkpointed interface
> --
>
> Key: FLINK-8116
> URL: https://issues.apache.org/jira/browse/FLINK-8116
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Documentation
>Reporter: Gabor Gevay
>Priority: Trivial
>  Labels: starter
> Fix For: 1.5.0
>
>
> Between Flink 1.1 and 1.2, the {{Checkpointed}} interface was superseded by 
> the {{CheckpointedFunction}} interface.
> However, in {{SourceFunction}} there are two comments still referring to the 
> old {{Checkpointed}} interface. (The code examples there also need to be 
> modified.)
> Note that the problem also occurs in {{StreamExecutionEnvironment}}, and 
> possibly other places as well.







[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283741#comment-16283741
 ] 

ASF GitHub Bot commented on FLINK-8220:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5134
  
I also think being more on the cautious side is actually a good thing. 
Given how scared and confused many people and companies are with respect to 
open source and accidental infringement of IP, being cautious is good, in my 
opinion.

There is an interesting comment from Ted Dunning on a related question, 
basically saying that if things could be a problem or not, think if some 
cautious/picky users might perceive them as a problem:

https://lists.apache.org/thread.html/fc3992c13cc30f889c820d1cfd6be61b63a5be4efa7c9101262474c9@%3Clegal-discuss.apache.org%3E


> Implement set of network throughput benchmarks in Flink
> ---
>
> Key: FLINK-8220
> URL: https://issues.apache.org/jira/browse/FLINK-8220
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Benchmarks should be defined and implemented in flink project and they will 
> be executed in {{flink-benchmarks}} project.
> Configurable parameters: number of record writers and number of channels.







[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283734#comment-16283734
 ] 

ASF GitHub Bot commented on FLINK-8220:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5134
  
@greg I think there is one more requirement by the ASF (not necessarily the 
Apache License itself), which is making sure that downstream consumers of 
Apache projects are always explicitly aware when they are consuming something 
(even when they build it themselves) that has friction with the ASL 2.0.

My preferred solution would still be a separate repository. Executing 
benchmarks during tests is pretty tricky anyway, because their results depend 
so much on where the benchmark is executed, what happens concurrently on 
the machine, whether it is a VM or a container, etc.


> Implement set of network throughput benchmarks in Flink
> ---
>
> Key: FLINK-8220
> URL: https://issues.apache.org/jira/browse/FLINK-8220
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Benchmarks should be defined and implemented in flink project and they will 
> be executed in {{flink-benchmarks}} project.
> Configurable parameters: number of record writers and number of channels.







[GitHub] flink pull request #4987: [FLINK-8029] Create WebMonitorEndpoint

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4987#discussion_r155804733
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -757,6 +775,64 @@ public void heartbeatFromResourceManager(final 
ResourceID resourceID) {
return 
CompletableFuture.completedFuture(executionGraph.getState());
}
 
+   
//--
+   // RestfulGateway RPC methods
+   
//--
+
+   @Override
+   public CompletableFuture requestRestAddress(Time timeout) {
+   return restAddressFuture;
+   }
+
+   @Override
+   public CompletableFuture requestJob(JobID jobId, 
Time timeout) {
+   if (Objects.equals(jobGraph.getJobID(), jobId)) {
--- End diff --

When I see `Objects.equals`, I am assuming that it's possible that both 
arguments can be null. However, `jobGraph.getJobID()` is always non-null.
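The reviewer's point can be shown in a few lines (`IdCompare` is an illustrative name): `Objects.equals` tolerates null on either side, so using it signals "this value may be null" to readers, whereas plain `a.equals(b)` states that `a` is known to be non-null.

```java
import java.util.Objects;

class IdCompare {
    // Null-safe comparison: true if both are null, false if only one is,
    // otherwise delegates to a.equals(b).
    static boolean nullableEq(Object a, Object b) { return Objects.equals(a, b); }
}
```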


---




[jira] [Commented] (FLINK-8029) Create common WebMonitorEndpoint

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283718#comment-16283718
 ] 

ASF GitHub Bot commented on FLINK-8029:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4987#discussion_r155802027
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 ---
@@ -27,8 +27,6 @@
 
 /**
  * Interface for a metric registry.
-
-   LOG.debug("Started MetricQueryService under 
{}.", metricQueryServicePath);
--- End diff --

beautiful


> Create common WebMonitorEndpoint
> 
>
> Key: FLINK-8029
> URL: https://issues.apache.org/jira/browse/FLINK-8029
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to reuse the existing the REST handlers, we should create a common 
> {{WebMonitorEndpoint}} which is shared by the {{Dispatcher}} and the 
> {{JobMaster}} component.







[GitHub] flink issue #5072: [FLINK-7984][build] Bump snappy-java to 1.1.4

2017-12-08 Thread yew1eb
Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/5072
  
@greghogan,  I saw the following in the `flink-core` POM:
```
<dependency>
    <groupId>org.xerial.snappy</groupId>
    <artifactId>snappy-java</artifactId>
</dependency>
```
I think this PR just upgrades the snappy-java version from 1.1.1.3 to 1.1.4 
for the `flink-core` module. I cannot see any bad effects it would cause. :)


---
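The upgrade discussed above amounts to a one-line version bump in the POM. A hedged sketch of how such a version could be pinned centrally via `dependencyManagement` (the property name is illustrative, not Flink's actual build convention):

```
<!-- Illustrative only: pins snappy-java to 1.1.4 for all modules.
     The property name is an assumption, not taken from Flink's POMs. -->
<properties>
  <snappy.version>1.1.4</snappy.version>
</properties>

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.xerial.snappy</groupId>
      <artifactId>snappy-java</artifactId>
      <version>${snappy.version}</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```

Child modules then declare the dependency without a version and inherit 1.1.4.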


[jira] [Commented] (FLINK-7984) Bump snappy-java to 1.1.4

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283716#comment-16283716
 ] 

ASF GitHub Bot commented on FLINK-7984:
---

Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/5072
  
@greghogan,  I saw the following in the `flink-core` POM:
```
<dependency>
    <groupId>org.xerial.snappy</groupId>
    <artifactId>snappy-java</artifactId>
</dependency>
```
I think this PR just upgrades the snappy-java version from 1.1.1.3 to 1.1.4 
for the `flink-core` module. I cannot see any bad effects it would cause. :)


> Bump snappy-java to 1.1.4
> -
>
> Key: FLINK-7984
> URL: https://issues.apache.org/jira/browse/FLINK-7984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Upgrade the snappy-java version to 1.1.4 (the latest, May 2017). The older 
> version has known issues such as a memory leak 
> (https://github.com/xerial/snappy-java/issues/91).
> Snappy Java [Release 
> Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md]





[jira] [Commented] (FLINK-8030) Start JobMasterRestEndpoint in JobClusterEntrypoint

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283699#comment-16283699
 ] 

ASF GitHub Bot commented on FLINK-8030:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4988#discussion_r155797999
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 ---
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
+import 
org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
+import 
org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
+import 
org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import 
org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
+import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
+import 

[GitHub] flink pull request #4988: [FLINK-8030] Instantiate JobMasterRestEndpoint in ...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4988#discussion_r155797999
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 ---
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
+import 
org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
+import 
org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
+import 
org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointingStatisticsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.JobManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.job.metrics.JobMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
+import 
org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
+import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import 
org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
+import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
+import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
+import 

[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283657#comment-16283657
 ] 

ASF GitHub Bot commented on FLINK-8220:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5134
  
@pnowojski see FLINK-2848 and FLINK-2973. Also the BSD + Patents 
conversation along with Flink's dependence on Amazon's Kinesis library 
(likewise [Category X](https://www.apache.org/legal/resolved.html#category-x)). 
I now think we have been overly cautious in this regard, not necessarily a bad 
thing.

As @StephanEwen noted, the GPL-dependent code must be an [optional 
component](https://www.apache.org/legal/resolved.html#optional). We [do 
this](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kinesis.html)
 with Kinesis by not releasing convenience binaries but requiring the user to 
compile from source with an explicit flag. I think we are safe to include a 
`flink-benchmarks` module which is not depended on or included in the Flink 
distribution. The benchmarks themselves are Apache licensed; only the 
dependence on `jmh` prohibits distribution, so they can be included in the 
(source) release. These are performance tests, not integration tests, so they 
would be run manually rather than during the build.

I agree with @StephanEwen's caution, but we would not be releasing GPL code; 
it is not obvious that `jmh` is GPL licensed, and Java itself is GPL.


> Implement set of network throughput benchmarks in Flink
> ---
>
> Key: FLINK-8220
> URL: https://issues.apache.org/jira/browse/FLINK-8220
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Benchmarks should be defined and implemented in flink project and they will 
> be executed in {{flink-benchmarks}} project.
> Configurable parameters: number of record writers and number of channels.





[GitHub] flink issue #5134: [FLINK-8220] Implement set of network benchmarks

2017-12-08 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5134
  
@pnowojski see FLINK-2848 and FLINK-2973. Also the BSD + Patents 
conversation along with Flink's dependence on Amazon's Kinesis library 
(likewise [Category X](https://www.apache.org/legal/resolved.html#category-x)). 
I now think we have been overly cautious in this regard, not necessarily a bad 
thing.

As @StephanEwen noted, the GPL-dependent code must be an [optional 
component](https://www.apache.org/legal/resolved.html#optional). We [do 
this](https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/kinesis.html)
 with Kinesis by not releasing convenience binaries but requiring the user to 
compile from source with an explicit flag. I think we are safe to include a 
`flink-benchmarks` module which is not depended on or included in the Flink 
distribution. The benchmarks themselves are Apache licensed; only the 
dependence on `jmh` prohibits distribution, so they can be included in the 
(source) release. These are performance tests, not integration tests, so they 
would be run manually rather than during the build.

I agree with @StephanEwen's caution, but we would not be releasing GPL code; 
it is not obvious that `jmh` is GPL licensed, and Java itself is GPL.


---


[jira] [Commented] (FLINK-8192) Properly annotate APIs of all Flink connectors with @Public / @PublicEvolving / @Internal

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283523#comment-16283523
 ] 

ASF GitHub Bot commented on FLINK-8192:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5138
  
Changes to the public API require a 
[FLIP](https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals).


> Properly annotate APIs of all Flink connectors with @Public / @PublicEvolving 
> / @Internal
> -
>
> Key: FLINK-8192
> URL: https://issues.apache.org/jira/browse/FLINK-8192
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.5.0
>
>
> Currently, the APIs of the Flink connectors have absolutely no annotations on 
> whether their usage is {{Public}} / {{PublicEvolving}} / or {{Internal}}.
> We have, for example, instances in the past where a user was mistakenly using 
> an abstract internal base class in the Elasticsearch connector.
> This JIRA tracks the coverage of API usage annotation for all Flink shipped 
> connectors. Ideally, a separate subtask should be created for each individual 
> connector.





[GitHub] flink issue #5138: [FLINK-8192] [Kinesis connector] Properly annotate APIs o...

2017-12-08 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5138
  
Changes to the public API require a 
[FLIP](https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals).


---
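The annotation work discussed in this thread amounts to marking each connector class with one of Flink's audience annotations. A minimal self-contained sketch of the idea, using simplified local stand-ins for `@PublicEvolving` and `@Internal` (the real annotations live in `org.apache.flink.annotation`; the connector class names below are hypothetical):

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

// Simplified local stand-ins for Flink's audience annotations;
// the real ones in org.apache.flink.annotation carry more metadata.
@Retention(RetentionPolicy.RUNTIME)
@interface PublicEvolving {}

@Retention(RetentionPolicy.RUNTIME)
@interface Internal {}

// Hypothetical connector classes showing how the markers would be applied.
@PublicEvolving
class KinesisConsumerSketch {}   // user-facing entry point, API may still evolve

@Internal
class KinesisProxySketch {}      // implementation detail, no stability guarantee

public class AnnotationDemo {
    public static void main(String[] args) {
        // With RUNTIME retention the markers are visible to tooling via reflection,
        // which is how API-compatibility checkers can enforce them.
        System.out.println(
            KinesisConsumerSketch.class.isAnnotationPresent(PublicEvolving.class)); // true
        System.out.println(
            KinesisProxySketch.class.isAnnotationPresent(Internal.class));          // true
    }
}
```

A user depending on an `@Internal` class would then get a clear signal (and, with the right tooling, a build-time warning) that the class offers no stability guarantees.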


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283481#comment-16283481
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155770364
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for the {@link SlotSharingManager}.
+ */
+public class SlotSharingManagerTest extends TestLogger {
+
+   private static final SlotSharingGroupId slotSharingGroupId = new 
SlotSharingGroupId();
+
+   private static final DummySlotOwner slotOwner = new DummySlotOwner();
+
+   private static final TestingAllocatedSlotActions allocatedSlotActions = 
new TestingAllocatedSlotActions();
--- End diff --

This instance is mutable... should not be `static`


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.





[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155770364
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for the {@link SlotSharingManager}.
+ */
+public class SlotSharingManagerTest extends TestLogger {
+
+   private static final SlotSharingGroupId slotSharingGroupId = new 
SlotSharingGroupId();
+
+   private static final DummySlotOwner slotOwner = new DummySlotOwner();
+
+   private static final TestingAllocatedSlotActions allocatedSlotActions = 
new TestingAllocatedSlotActions();
--- End diff --

This instance is mutable... should not be `static`


---
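GJL's objection to the mutable `static final` test fixture can be illustrated with a small self-contained sketch (class and method names are hypothetical, not Flink's actual test code): when the fixture is shared, state registered by one test leaks into the next, so tests pass or fail depending on execution order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a mutable test fixture such as
// TestingAllocatedSlotActions: callers can register released slot ids on it.
class TestingActionsSketch {
    private final List<String> releasedSlots = new ArrayList<>();

    void releaseSlot(String id) { releasedSlots.add(id); }

    int releaseCount() { return releasedSlots.size(); }
}

public class SharedStateDemo {
    // Shared across "tests": state from the first run leaks into the second.
    private static final TestingActionsSketch SHARED = new TestingActionsSketch();

    static int testA() { SHARED.releaseSlot("a"); return SHARED.releaseCount(); }

    static int testB() { SHARED.releaseSlot("b"); return SHARED.releaseCount(); }

    public static void main(String[] args) {
        System.out.println(testA()); // 1
        System.out.println(testB()); // 2, not 1: testB observes testA's state
    }
}
```

Making the fixture an instance field (recreated per test) restores isolation, which is why a mutable object should not be held in a `static` field even if the reference itself is `final`.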


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283477#comment-16283477
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155770104
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for the {@link SlotSharingManager}.
+ */
+public class SlotSharingManagerTest extends TestLogger {
+
+   private static final SlotSharingGroupId slotSharingGroupId = new 
SlotSharingGroupId();
--- End diff --

Should be `SLOT_SHARING_GROUP_ID` since it is a constant.


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.





[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155770104
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManagerTest.java
 ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.DummySlotOwner;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for the {@link SlotSharingManager}.
+ */
+public class SlotSharingManagerTest extends TestLogger {
+
+   private static final SlotSharingGroupId slotSharingGroupId = new 
SlotSharingGroupId();
--- End diff --

Should be `SLOT_SHARING_GROUP_ID` since it is a constant.


---


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283470#comment-16283470
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155768880
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows running 
different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot is still pending
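As a side note for readers of this archive: the TaskSlot hierarchy described in the Javadoc above (a root MultiTaskSlot that accepts at most one child per task or co-location id) can be sketched roughly as follows. All names here are simplified placeholders for illustration, not the actual Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the slot-sharing rule: a shared (multi task) slot may
// hold at most one child per task/group id, so two subtasks of the same
// vertex can never share the same slot.
class SketchMultiTaskSlot {

    // groupId (stand-in for the AbstractID of a task or co-location
    // constraint) -> name of the task occupying that position
    private final Map<String, String> children = new HashMap<>();

    boolean contains(String groupId) {
        return children.containsKey(groupId);
    }

    /**
     * Adds a leaf for the given group id; rejects duplicates, mirroring the
     * "if and only if no child with the same AbstractID exists" rule above.
     */
    boolean allocateSingleTaskSlot(String groupId, String taskName) {
        if (children.containsKey(groupId)) {
            return false; // a task of this group already occupies the shared slot
        }
        children.put(groupId, taskName);
        return true;
    }
}
```

A second subtask of the same vertex is rejected and would have to be placed into a different root slot of the hierarchy.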

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155768880
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot 
is still pending
+   private final Map unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot 
has been assigned)
+   private final 

[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283413#comment-16283413
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155758219
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
--- End diff --

nit: *leaf nodes*


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283397#comment-16283397
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155754738
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = 
allocateMultiTaskSlot(
+   task.getJobVertexId(), 
multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException 
noResourceException) {
+   return 
FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+   slotRequestId,
+   task.getJobVertexId(),
+   multiTaskSlotFuture.getLocality());
+
+   return leave.getLogicalSlotFuture();
+   } else {
+   // request an allocated slot to assign a single logical 
slot to
+   CompletableFuture 
slotAndLocalityFuture = requestAllocatedSlot(
+   slotRequestId,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+
+   return 
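The `computeIfAbsent` call in the quoted snippet lazily creates one SlotSharingManager per slot sharing group and reuses it on later requests. A minimal, self-contained sketch of that idiom follows; the class and method names are placeholders, not Flink APIs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the per-group lazy-initialization idiom used above: the first
// request for a group id creates its manager, subsequent requests reuse it.
class PerGroupManagers {

    static final class Manager {
        final String groupId;

        Manager(String groupId) {
            this.groupId = groupId;
        }
    }

    private final Map<String, Manager> managers = new ConcurrentHashMap<>();

    Manager managerFor(String groupId) {
        // constructor reference acts as the mapping function (id -> new Manager(id))
        return managers.computeIfAbsent(groupId, Manager::new);
    }
}
```

The design choice here is that callers never check for existence themselves; the map guarantees a single manager instance per group even under concurrent access.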


[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283365#comment-16283365
 ] 

ASF GitHub Bot commented on FLINK-8220:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5134
  
@greghogan That may or may not be possible, not 100% sure. Re-distributing 
it certainly does not work.

To my current understanding, the Apache guidelines say we would need to 
make sure that downstream consumers of Flink (also the source) don't 
accidentally pull in a GPL license. For weak copyleft, where consuming it only 
via linking, it is okay, but for strong copyleft, where accidentally linking it 
results in copyleft, I think the hurdles for that are pretty high. There are 
some gray zone areas (like MPL) where the understanding is that "optional" 
dependency is okay because it requires an explicit action by a downstream user 
(re-adding the dependency) and the potential damage is not too high.

In the past months I have seen first hand how companies that want to use 
open source freak out as soon as they see an instance of GPL. 
We would make Flink's adoption easier if we did not have that.

My first suggestion would be to consolidate this in a separate 
`flink-benchmarks` repository. That would also keep the core Flink repository 
"clear" of GPL. I could check with Apache legal for a final answer.


> Implement set of network throughput benchmarks in Flink
> ---
>
> Key: FLINK-8220
> URL: https://issues.apache.org/jira/browse/FLINK-8220
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Benchmarks should be defined and implemented in the flink project and they will 
> be executed in the {{flink-benchmarks}} project.
> Configurable parameters: number of record writers and number of channels.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-08 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155751694
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = 
allocateMultiTaskSlot(
+   task.getJobVertexId(), 
multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException 
noResourceException) {
+   return 
FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
+   slotRequestId,
+   task.getJobVertexId(),
+   multiTaskSlotFuture.getLocality());
+
+   return leave.getLogicalSlotFuture();
+   } else {
+   // request an allocated slot to assign a single logical 
slot to
+   CompletableFuture 
slotAndLocalityFuture = requestAllocatedSlot(
+   slotRequestId,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+
+   return slotAndLocalityFuture.thenApply(
+   (SlotAndLocality slotAndLocality) -> {
+   final AllocatedSlot allocatedSlot = 
slotAndLocality.getSlot();
+
+ 


[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283318#comment-16283318
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

Github user Aegeaner commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r155746723
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
--- End diff --

I will do this


> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check via reflection whether all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}. This applies 
> both to types coming from a TableSource and from a DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
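A rough sketch of the reflection check proposed in the issue description, i.e. testing whether a type declares its own equals() and hashCode() rather than inheriting Object's. This is an illustration of the idea only, not the actual Flink implementation.

```java
import java.lang.reflect.Method;

// Checks via reflection whether a class (or a superclass other than Object)
// overrides both equals(Object) and hashCode(), as required for types kept
// in a heap state backend.
public class EqualsHashCodeCheck {

    static boolean overridesEqualsAndHashCode(Class<?> clazz) {
        try {
            Method equals = clazz.getMethod("equals", Object.class);
            Method hashCode = clazz.getMethod("hashCode");
            // getMethod resolves the most specific public override, so the
            // declaring class is Object only if the method was never overridden
            return equals.getDeclaringClass() != Object.class
                && hashCode.getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            // equals/hashCode always exist on Object, so this cannot happen
            throw new AssertionError(e);
        }
    }
}
```

For array-typed fields the check alone is not enough: as the issue notes, the comparison itself must use `Arrays.deepEquals()` / `Arrays.deepHashCode()` instead of the element-identity variants.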


[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283319#comment-16283319
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

Github user Aegeaner commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r155746766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
+val types = table.getSchema.getTypes
+checkTypeArray(types)
+  }
+
+  private def checkTypeArray(types: Array[TypeInformation[_]]) = {
+for (typeInfo <- types) {
+  if(!typeInfo.asInstanceOf[TypeInformation[_]].isBasicType &&
+!typeInfo.isInstanceOf[PrimitiveArrayTypeInfo[_]]) {
--- End diff --

I will do this.


> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check via reflection whether all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}, both for types 
> coming from TableSource and DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...

2017-12-08 Thread Aegeaner
Github user Aegeaner commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r155746723
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
--- End diff --

I will do this


---


[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...

2017-12-08 Thread Aegeaner
Github user Aegeaner commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r155746766
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
+val types = table.getSchema.getTypes
+checkTypeArray(types)
+  }
+
+  private def checkTypeArray(types: Array[TypeInformation[_]]) = {
+for (typeInfo <- types) {
+  if(!typeInfo.asInstanceOf[TypeInformation[_]].isBasicType &&
+!typeInfo.isInstanceOf[PrimitiveArrayTypeInfo[_]]) {
--- End diff --

I will do this.


---


[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283309#comment-16283309
 ] 

ASF GitHub Bot commented on FLINK-8220:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5134
  
It seems like `commons-lang` doesn't have any issues with using `jmh` in 
tests: https://commons.apache.org/proper/commons-lang/dependencies.html

However @greghogan do you remember my previous attempt with JMH in Flink? 
https://github.com/apache/flink/pull/4323 Was it a false alarm?


> Implement set of network throughput benchmarks in Flink
> ---
>
> Key: FLINK-8220
> URL: https://issues.apache.org/jira/browse/FLINK-8220
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Benchmarks should be defined and implemented in the Flink project, and they 
> will be executed in the {{flink-benchmarks}} project.
> Configurable parameters: number of record writers and number of channels.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5134: [FLINK-8220] Implement set of network benchmarks

2017-12-08 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5134
  
It seems like `commons-lang` doesn't have any issues with using `jmh` in 
tests: https://commons.apache.org/proper/commons-lang/dependencies.html

However @greghogan do you remember my previous attempt with JMH in Flink? 
https://github.com/apache/flink/pull/4323 Was it a false alarm?


---


[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283304#comment-16283304
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r155742906
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -80,7 +91,10 @@ public void init() throws Exception {
this.headOperator);
 
// make sure that stream tasks report their I/O statistics
-   
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+   
inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(),
 input1WatermarkGauge, input2WatermarkGauge);
+
+   
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
--- End diff --

Sure, I think that makes sense. 


> Report Watermark metrics in all operators
> -
>
> Key: FLINK-4812
> URL: https://issues.apache.org/jira/browse/FLINK-4812
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> As reported by a user, Flink currently does not export the current low 
> watermark for sources 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).
> This JIRA is for adding such a metric for the sources as well.
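The gauge wiring discussed in the PR can be sketched roughly as follows. The {{Gauge}} interface below mirrors {{org.apache.flink.metrics.Gauge}}, and {{WatermarkGauge}} is a simplified stand-in for the internal class the PR registers per input, not the actual Flink code:

```java
// Simplified stand-in for org.apache.flink.metrics.Gauge and the
// per-input WatermarkGauge discussed in the PR; not the actual Flink classes.
public class WatermarkGaugeDemo {

    interface Gauge<T> {
        T getValue();
    }

    // Tracks the last watermark seen on an input; the metric system polls
    // getValue() each time it reports the gauge.
    static final class WatermarkGauge implements Gauge<Long> {
        private volatile long currentWatermark = Long.MIN_VALUE;

        void setCurrentWatermark(long watermark) {
            this.currentWatermark = watermark;
        }

        @Override
        public Long getValue() {
            return currentWatermark;
        }
    }

    public static void main(String[] args) {
        // An operator would update the gauge on every watermark it processes;
        // a TwoInputStreamTask would hold one such gauge per input.
        WatermarkGauge input1Watermark = new WatermarkGauge();
        input1Watermark.setCurrentWatermark(42L);
        System.out.println(input1Watermark.getValue()); // 42
    }
}
```

The volatile field keeps the value visible to the metrics reporter thread without locking, which is why a gauge (polled) rather than a counter (pushed) fits this metric.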



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5125: [WIP][FLINK-4812][metrics] Expose currentLowWaterm...

2017-12-08 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5125#discussion_r155742906
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 ---
@@ -80,7 +91,10 @@ public void init() throws Exception {
this.headOperator);
 
// make sure that stream tasks report their I/O statistics
-   
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
+   
inputProcessor.setupMetrics(getEnvironment().getMetricGroup().getIOMetricGroup(),
 input1WatermarkGauge, input2WatermarkGauge);
+
+   
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
--- End diff --

Sure, I think that makes sense. 


---


[jira] [Commented] (FLINK-8224) Should shudownApplication when job terminated in job mode

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283302#comment-16283302
 ] 

ASF GitHub Bot commented on FLINK-8224:
---

GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5139

[FLINK-8224] [runtime] shutdown application when job terminated in job mode

## What is the purpose of the change

The current job cluster entrypoint doesn't call the resource manager to 
shut down the application, so the resource manager has no chance to report the 
application status to the outer resource management system such as YARN/Mesos. 
This may make YARN still consider the application as running even after the job 
is finished.

## Verifying this change

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8224

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5139.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5139


commit b047b2a50791f4eeeb4c3a984d060ffdbf57ea26
Author: shuai.xus 
Date:   2017-12-08T10:02:42Z

[FLINK-8224] [runtime] shutdown application when job terminated in job mode




> Should shudownApplication when job terminated in job mode
> -
>
> Key: FLINK-8224
> URL: https://issues.apache.org/jira/browse/FLINK-8224
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> For job mode, one job is an application. When the job finishes, it should tell 
> the resource manager to shut down the application, otherwise the resource 
> manager cannot set the application status. For example, if the YARN resource 
> manager doesn't report the application as finished to the YARN master, YARN 
> will consider the application as still running.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5139: [FLINK-8224] [runtime] shutdown application when j...

2017-12-08 Thread shuai-xu
GitHub user shuai-xu opened a pull request:

https://github.com/apache/flink/pull/5139

[FLINK-8224] [runtime] shutdown application when job terminated in job mode

## What is the purpose of the change

The current job cluster entrypoint doesn't call the resource manager to 
shut down the application, so the resource manager has no chance to report the 
application status to the outer resource management system such as YARN/Mesos. 
This may make YARN still consider the application as running even after the job 
is finished.

## Verifying this change

This change is tested manually.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shuai-xu/flink jira-8224

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5139.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5139


commit b047b2a50791f4eeeb4c3a984d060ffdbf57ea26
Author: shuai.xus 
Date:   2017-12-08T10:02:42Z

[FLINK-8224] [runtime] shutdown application when job terminated in job mode




---


[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283293#comment-16283293
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

Github user Aegeaner commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r155741290
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
--- End diff --

I will do this.


> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check via reflection whether all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}, both for types 
> coming from TableSource and DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.
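The reflective check the ticket describes can be sketched like this; `hasProperEqualsHashCode` is a hypothetical helper for illustration, not Flink's actual validation code:

```java
import java.lang.reflect.Method;

// Hypothetical sketch of the reflective check described in the ticket:
// a field type passes only if its equals()/hashCode() are declared
// somewhere other than java.lang.Object.
public class EqualsHashCodeCheck {

    static boolean hasProperEqualsHashCode(Class<?> clazz) {
        try {
            Method equals = clazz.getMethod("equals", Object.class);
            Method hashCode = clazz.getMethod("hashCode");
            // getMethod() resolves to the most specific override, so the
            // declaring class is Object only if the type never overrode it.
            return equals.getDeclaringClass() != Object.class
                && hashCode.getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            throw new AssertionError(e); // every class inherits both methods
        }
    }

    public static void main(String[] args) {
        System.out.println(hasProperEqualsHashCode(String.class)); // true
        System.out.println(hasProperEqualsHashCode(Object.class)); // false
    }
}
```

Such a check would run once per registered field type, so the reflection cost stays off the per-record path.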



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283292#comment-16283292
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

Github user Aegeaner commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r155741244
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -411,6 +411,7 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 }
 
 checkValidTableName(name)
+checkValidTableType(table)
--- End diff --

`org.apache.flink.table.api.TableEnvironment#validateType` is not only used 
to validate output types, but also in every other case where type information 
is validated, e.g. `BatchTableEnvironment#translate` for an explain statement. 


> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check via reflection whether all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}, both for types 
> coming from TableSource and DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...

2017-12-08 Thread Aegeaner
Github user Aegeaner commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r155741290
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -721,6 +722,42 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 */
   protected def checkValidTableName(name: String): Unit
 
+  /**
+* Checks if the chosen table type is valid.
+* @param table The table to check
+*/
+  protected def checkValidTableType(table: Table): Unit = {
--- End diff --

I will do this.


---


[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...

2017-12-08 Thread Aegeaner
Github user Aegeaner commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r155741244
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -411,6 +411,7 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 }
 
 checkValidTableName(name)
+checkValidTableType(table)
--- End diff --

`org.apache.flink.table.api.TableEnvironment#validateType` is not only used 
to validate output types, but also in every other case where type information 
is validated, e.g. `BatchTableEnvironment#translate` for an explain statement. 


---


[jira] [Commented] (FLINK-8139) Check for proper equals() and hashCode() when registering a table

2017-12-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283289#comment-16283289
 ] 

ASF GitHub Bot commented on FLINK-8139:
---

Github user Aegeaner commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r155740555
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -98,7 +98,9 @@ abstract class BatchTableEnvironment(
 
 tableSource match {
   case batchTableSource: BatchTableSource[_] =>
-registerTableInternal(name, new 
BatchTableSourceTable(batchTableSource))
+val table = new BatchTableSourceTable(batchTableSource)
+checkValidTableSourceType(tableSource)
+registerTableInternal(name, table)
--- End diff --

`registerTableInternal` takes a parameter of type `AbstractTable`, while 
when registering a `TableSource` we should pass the `TableSource` type.


> Check for proper equals() and hashCode() when registering a table
> -
>
> Key: FLINK-8139
> URL: https://issues.apache.org/jira/browse/FLINK-8139
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Aegeaner
>
> In the current Table API & SQL implementation we compare {{Row}}s at 
> different positions. E.g., for joining we test rows for equality or put them 
> into state. A heap state backend requires proper hashCode() and equals() in 
> order to work correctly. Thus, every type in the Table API needs to have these 
> methods implemented.
> We need to check via reflection whether all fields of a row implement methods 
> that differ from {{Object.equals()}} and {{Object.hashCode()}}, both for types 
> coming from TableSource and DataStream/DataSet.
> Additionally, for array types, the {{Row}} class should use 
> {{Arrays.deepEquals()}} and {{Arrays.deepHashCode()}} instead of the non-deep 
> variants.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5065: [FLINK-8139][table] Check for proper equals() and ...

2017-12-08 Thread Aegeaner
Github user Aegeaner commented on a diff in the pull request:

https://github.com/apache/flink/pull/5065#discussion_r155740555
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
 ---
@@ -98,7 +98,9 @@ abstract class BatchTableEnvironment(
 
 tableSource match {
   case batchTableSource: BatchTableSource[_] =>
-registerTableInternal(name, new 
BatchTableSourceTable(batchTableSource))
+val table = new BatchTableSourceTable(batchTableSource)
+checkValidTableSourceType(tableSource)
+registerTableInternal(name, table)
--- End diff --

`registerTableInternal` takes a parameter of type `AbstractTable`, while 
when registering a `TableSource` we should pass the `TableSource` type.


---


[jira] [Created] (FLINK-8225) Use JsonRowDeserializationSchema without Kafka connector dependency

2017-12-08 Thread Sendoh (JIRA)
Sendoh created FLINK-8225:
-

 Summary: Use JsonRowDeserializationSchema without Kafka connector 
dependency 
 Key: FLINK-8225
 URL: https://issues.apache.org/jira/browse/FLINK-8225
 Project: Flink
  Issue Type: Wish
  Components: Table API & SQL, Type Serialization System
Reporter: Sendoh
Priority: Minor


Currently, to use JsonRowDeserializationSchema a user needs to add the Kafka 
connector dependency, even though JsonRowDeserializationSchema can be used 
without the Kafka connector.

AC:
move JsonRowDeserializationSchema to a dedicated module

Ref:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/the-location-of-JsonRowDeserializationSchema-java-td17063.html




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8224) Should shudownApplication when job terminated in job mode

2017-12-08 Thread shuai.xu (JIRA)
shuai.xu created FLINK-8224:
---

 Summary: Should shudownApplication when job terminated in job mode
 Key: FLINK-8224
 URL: https://issues.apache.org/jira/browse/FLINK-8224
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Affects Versions: 1.5.0
Reporter: shuai.xu
Assignee: shuai.xu


For job mode, one job is an application. When the job finishes, it should tell 
the resource manager to shut down the application, otherwise the resource 
manager cannot set the application status. For example, if the YARN resource 
manager doesn't report the application as finished to the YARN master, YARN 
will consider the application as still running.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)