[GitHub] [flink] Myasuka commented on issue #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side

2019-06-28 Thread GitBox
Myasuka commented on issue #7942: [FLINK-11696][checkpoint] Avoid to send mkdir 
requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#issuecomment-506924134
 
 
   Build failed due to 
[FLINK-12164](https://issues.apache.org/jira/browse/FLINK-12164); re-triggering 
the build.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-13030) add more test case for blink planner

2019-06-28 Thread godfrey he (JIRA)
godfrey he created FLINK-13030:
--

 Summary: add more test case for blink planner
 Key: FLINK-13030
 URL: https://issues.apache.org/jira/browse/FLINK-13030
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he


This issue aims to introduce more test cases from the internal Blink and the 
Flink planner into the blink planner, to make sure the blink planner's 
functionality aligns with both.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-12164) JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout is unstable

2019-06-28 Thread Yun Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reopened FLINK-12164:
--

> JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout is unstable
> 
>
> Key: FLINK-12164
> URL: https://issues.apache.org/jira/browse/FLINK-12164
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> {code}
> 07:28:23.957 [ERROR] Tests run: 24, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 8.968 s <<< FAILURE! - in 
> org.apache.flink.runtime.jobmaster.JobMasterTest
> 07:28:23.957 [ERROR] 
> testJobFailureWhenTaskExecutorHeartbeatTimeout(org.apache.flink.runtime.jobmaster.JobMasterTest)
>   Time elapsed: 0.177 s  <<< ERROR!
> java.util.concurrent.ExecutionException: java.lang.Exception: Unknown 
> TaskManager 69a7c8c18a36069ff90a1eae8ec41066
>   at 
> org.apache.flink.runtime.jobmaster.JobMasterTest.registerSlotsAtJobMaster(JobMasterTest.java:1746)
>   at 
> org.apache.flink.runtime.jobmaster.JobMasterTest.runJobFailureWhenTaskExecutorTerminatesTest(JobMasterTest.java:1670)
>   at 
> org.apache.flink.runtime.jobmaster.JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout(JobMasterTest.java:1630)
> Caused by: java.lang.Exception: Unknown TaskManager 
> 69a7c8c18a36069ff90a1eae8ec41066
> {code}





[jira] [Commented] (FLINK-12164) JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout is unstable

2019-06-28 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16875361#comment-16875361
 ] 

Yun Tang commented on FLINK-12164:
--

This problem happened again; here is an instance: 
[https://api.travis-ci.org/v3/job/551302724/log.txt].






[GitHub] [flink] Myasuka commented on issue #8909: [FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState

2019-06-28 Thread GitBox
Myasuka commented on issue #8909: [FLINK-13004][table-runtime-blink] Correct 
the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
URL: https://github.com/apache/flink/pull/8909#issuecomment-506920344
 
 
   @wuchong Thanks for your review. I rebased onto the latest code to fix a 
build error caused by a missing symbol.




[GitHub] [flink] xuefuz commented on a change in pull request #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service

2019-06-28 Thread GitBox
xuefuz commented on a change in pull request #8889: [FLINK-12934][hive] add 
additional dependencies for flink-connector-hive to connect to remote hive 
metastore service
URL: https://github.com/apache/flink/pull/8889#discussion_r298778900
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -452,29 +493,15 @@ under the License.

						false

-						<include>commons-dbcp:commons-dbcp</include>
-						<include>commons-pool:commons-pool</include>
-						<include>commons-beanutils:commons-beanutils</include>
-						<include>com.fasterxml.jackson.core:*</include>
-						<include>com.jolbox:bonecp</include>
-						<include>org.apache.thrift:libthrift</include>
-						<include>org.datanucleus:*</include>
-						<include>org.antlr:antlr-runtime</include>
+						<include>*:*</include>

						true

-						<excludes>
-							<exclude>*:*</exclude>
-						</excludes>
 
 Review comment:
   This is fine.




[jira] [Updated] (FLINK-13021) unify catalog partition implementations

2019-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-13021:
---
Labels: pull-request-available  (was: )

> unify catalog partition implementations
> ---
>
> Key: FLINK-13021
> URL: https://issues.apache.org/jira/browse/FLINK-13021
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / API
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>






[GitHub] [flink] flinkbot commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations

2019-06-28 Thread GitBox
flinkbot commented on issue #8926: [FLINK-13021][table][hive] unify catalog 
partition implementations
URL: https://github.com/apache/flink/pull/8926#issuecomment-506901123
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The bot tracks review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] bowenli86 opened a new pull request #8926: [FLINK-13021][table][hive] unify catalog partition implementations

2019-06-28 Thread GitBox
bowenli86 opened a new pull request #8926: [FLINK-13021][table][hive] unify 
catalog partition implementations
URL: https://github.com/apache/flink/pull/8926
 
 
   ## What is the purpose of the change
   
   This PR unifies catalog partition implementations.
   
   ## Brief change log
   
   - Unified `GenericCatalogPartition`, `HiveCatalogPartition`, and 
`AbstractCatalogPartition` into `CatalogPartitionImpl`
   - updated unit tests
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`GenericInMemoryCatalogTest`, `HiveCatalogGenericMetadataTest`, 
`HiveCatalogHiveMetadataTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[GitHub] [flink] xuefuz commented on issue #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service

2019-06-28 Thread GitBox
xuefuz commented on issue #8889: [FLINK-12934][hive] add additional 
dependencies for flink-connector-hive to connect to remote hive metastore 
service
URL: https://github.com/apache/flink/pull/8889#issuecomment-506900050
 
 
   Thanks for the update. Looking at the required libraries, I'm wondering 
whether it's reasonable to expect users to find those very specific libraries 
and put them on the classpath. The other question is why only those are missing.
   
   For usability, it seems fine to ask users to put the flink-hive jar on the 
classpath. However, asking for a very specific jar like commons-logging 
seems to add too much usability overhead.




[GitHub] [flink] bowenli86 commented on a change in pull request #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service

2019-06-28 Thread GitBox
bowenli86 commented on a change in pull request #8889: [FLINK-12934][hive] add 
additional dependencies for flink-connector-hive to connect to remote hive 
metastore service
URL: https://github.com/apache/flink/pull/8889#discussion_r298769864
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -452,29 +493,15 @@ under the License.

						false

-						<include>commons-dbcp:commons-dbcp</include>
-						<include>commons-pool:commons-pool</include>
-						<include>commons-beanutils:commons-beanutils</include>
-						<include>com.fasterxml.jackson.core:*</include>
-						<include>com.jolbox:bonecp</include>
-						<include>org.apache.thrift:libthrift</include>
-						<include>org.datanucleus:*</include>
-						<include>org.antlr:antlr-runtime</include>
+						<include>*:*</include>

						true

-						<excludes>
-							<exclude>*:*</exclude>
-						</excludes>
 
 Review comment:
   Because we stopped packaging any hive/hadoop dependencies a long time ago. 
They are all provided, so we don't need to filter anything for them. This is a 
historical leftover, and the cleanup can go into its own PR if necessary.




[GitHub] [flink] xuefuz commented on a change in pull request #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service

2019-06-28 Thread GitBox
xuefuz commented on a change in pull request #8889: [FLINK-12934][hive] add 
additional dependencies for flink-connector-hive to connect to remote hive 
metastore service
URL: https://github.com/apache/flink/pull/8889#discussion_r298768463
 
 

 ##
 File path: flink-connectors/flink-connector-hive/pom.xml
 ##
 @@ -452,29 +493,15 @@ under the License.

						false

-						<include>commons-dbcp:commons-dbcp</include>
-						<include>commons-pool:commons-pool</include>
-						<include>commons-beanutils:commons-beanutils</include>
-						<include>com.fasterxml.jackson.core:*</include>
-						<include>com.jolbox:bonecp</include>
-						<include>org.apache.thrift:libthrift</include>
-						<include>org.datanucleus:*</include>
-						<include>org.antlr:antlr-runtime</include>
+						<include>*:*</include>

						true

-						<excludes>
-							<exclude>*:*</exclude>
-						</excludes>
 
 Review comment:
   How come we don't need those any more?




[GitHub] [flink] bowenli86 commented on issue #8919: [FLINK-13022][table][hive] unify catalog function implementations

2019-06-28 Thread GitBox
bowenli86 commented on issue #8919: [FLINK-13022][table][hive] unify catalog 
function implementations
URL: https://github.com/apache/flink/pull/8919#issuecomment-506892579
 
 
   cc @xuefuz @lirui-apache @zjuwangg 




[GitHub] [flink] bowenli86 removed a comment on issue #8919: [FLINK-13022][table][hive] unify catalog function implementations

2019-06-28 Thread GitBox
bowenli86 removed a comment on issue #8919: [FLINK-13022][table][hive] unify 
catalog function implementations
URL: https://github.com/apache/flink/pull/8919#issuecomment-506892579
 
 
   cc @xuefuz @lirui-apache @zjuwangg 




[GitHub] [flink] bowenli86 commented on issue #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service

2019-06-28 Thread GitBox
bowenli86 commented on issue #8889: [FLINK-12934][hive] add additional 
dependencies for flink-connector-hive to connect to remote hive metastore 
service
URL: https://github.com/apache/flink/pull/8889#issuecomment-506859928
 
 
   @xuefuz after considering it carefully, I decided to make them `provided` 
dependencies for now, as flink-connector-hive doesn't strictly need them for 
either compilation or testing. I added them as dependencies mostly as a 
reminder to users and developers that they need to add them to their classpath 
when running in production.
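
   Declaring a dependency with `provided` scope keeps it out of the packaged 
jar while still documenting the requirement. As a minimal sketch (the 
`commons-logging` coordinates and version below are illustrative):

```xml
<!-- Provided scope: the artifact is needed at runtime but is expected on the
     user's classpath, so it is not bundled into the connector jar. -->
<dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.2</version>
    <scope>provided</scope>
</dependency>
```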




[GitHub] [flink] bowenli86 commented on issue #8904: [FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for Hive table

2019-06-28 Thread GitBox
bowenli86 commented on issue #8904: [FLINK-13005][hive] HiveCatalog should not 
add 'flink.is_generic' key for Hive table
URL: https://github.com/apache/flink/pull/8904#issuecomment-506834286
 
 
   @xuefuz I've updated the PR with new design as we discussed yesterday. Can 
you take another look?




[GitHub] [flink] bowenli86 commented on issue #8800: [FLINK-12627][doc][sql client][hive] Document how to configure and use catalogs in SQL CLI

2019-06-28 Thread GitBox
bowenli86 commented on issue #8800: [FLINK-12627][doc][sql client][hive] 
Document how to configure and use catalogs in SQL CLI
URL: https://github.com/apache/flink/pull/8800#issuecomment-506827758
 
 
   cc @xuefuz @wuchong @lirui-apache @zjuwangg 




[GitHub] [flink] xuefuz commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table

2019-06-28 Thread GitBox
xuefuz commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource 
from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506804032
 
 
   Thanks for the review. I'm not sure it was due to the inclusion of #8890, 
but I rebased onto master to retest.




[jira] [Updated] (FLINK-12989) Generate HiveTableSink from from a Hive table

2019-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12989:
---
Labels: pull-request-available  (was: )

> Generate HiveTableSink from from a Hive table
> -
>
> Key: FLINK-12989
> URL: https://issues.apache.org/jira/browse/FLINK-12989
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>  Labels: pull-request-available
>
> As a followup for FLINK-11480, this adds the conversion from a Hive table to 
> a table sink that's used for data connector writing side.





[GitHub] [flink] asfgit closed pull request #8890: [FLINK-12989][hive] Generate HiveTableSink from from a Hive table

2019-06-28 Thread GitBox
asfgit closed pull request #8890: [FLINK-12989][hive] Generate HiveTableSink 
from from a Hive table
URL: https://github.com/apache/flink/pull/8890
 
 
   




[GitHub] [flink] ifndef-SleePy commented on issue #8870: [FLINK-12974][checkstyle] Bump checkstyle to 8.14

2019-06-28 Thread GitBox
ifndef-SleePy commented on issue #8870: [FLINK-12974][checkstyle] Bump 
checkstyle to 8.14
URL: https://github.com/apache/flink/pull/8870#issuecomment-506799770
 
 
   @hmcl I wish I could, but I have no authorization to merge your PR since 
I'm not a committer on the Flink project. I'd suggest pinging a committer such 
as zentol to review it.




[jira] [Closed] (FLINK-12989) Generate HiveTableSink from from a Hive table

2019-06-28 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li closed FLINK-12989.

Resolution: Fixed

merged in 1.9.0: f1721293b0701d584d42bd68671181e332d2ad04

> Generate HiveTableSink from from a Hive table
> -
>
> Key: FLINK-12989
> URL: https://issues.apache.org/jira/browse/FLINK-12989
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Xuefu Zhang
>Assignee: Xuefu Zhang
>Priority: Major
>
> As a followup for FLINK-11480, this adds the conversion from a Hive table to 
> a table sink that's used for data connector writing side.





[jira] [Assigned] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events

2019-06-28 Thread Ying Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu reassigned FLINK-13027:
---

Assignee: (was: Ying Xu)

> StreamingFileSink bulk-encoded writer supports file rolling upon customized 
> events
> --
>
> Key: FLINK-13027
> URL: https://issues.apache.org/jira/browse/FLINK-13027
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Reporter: Ying Xu
>Priority: Major
>
> When writing in a bulk-encoded format such as Parquet, StreamingFileSink only 
> supports OnCheckpointRollingPolicy, which rolls files at checkpoint time.
> In many scenarios it is beneficial for the sink to roll files upon certain 
> events, for example when the file size reaches a limit. Such a rolling 
> policy can also alleviate some of the side effects of 
> OnCheckpointRollingPolicy, e.g., most of the heavy lifting, including file 
> uploading, happening at checkpoint time.
> Specifically, this Jira calls for a new rolling policy that rolls files:
>  # whenever a customized event happens, e.g., the file size reaches a 
> certain limit.
>  # whenever a checkpoint happens. This is needed to provide exactly-once 
> guarantees when writing bulk-encoded files.
> Users of this rolling policy need to be aware that a customized event and 
> the next checkpoint may occur close together, and thus may yield a tiny 
> file per checkpoint in the worst case.
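
The two rolling triggers described above can be sketched as a single policy. 
The interfaces below are simplified stand-ins for Flink's actual 
`RollingPolicy` and `PartFileInfo` types, so this illustrates the idea rather 
than the real API:

```java
// Simplified stand-ins for Flink's RollingPolicy/PartFileInfo (illustrative,
// not the actual API surface).
interface PartFileInfo {
    long getSize();
}

interface RollingPolicy<IN> {
    boolean shouldRollOnCheckpoint(PartFileInfo partFile);
    boolean shouldRollOnEvent(PartFileInfo partFile, IN element);
}

// A policy that rolls on every checkpoint (needed for exactly-once with
// bulk-encoded writers) and additionally whenever the in-progress part file
// exceeds a size threshold.
class SizeAndCheckpointRollingPolicy<IN> implements RollingPolicy<IN> {
    private final long maxPartSizeBytes;

    SizeAndCheckpointRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo partFile) {
        // Always roll on checkpoint, regardless of size.
        return true;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo partFile, IN element) {
        // Roll when the in-progress part file reaches the size limit.
        return partFile.getSize() >= maxPartSizeBytes;
    }
}
```

As the description warns, when a size-triggered roll lands just before a 
checkpoint, the checkpoint roll can still produce a tiny extra file.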





[jira] [Assigned] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events

2019-06-28 Thread Ying Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ying Xu reassigned FLINK-13027:
---

Assignee: Ying Xu






[GitHub] [flink] bowenli86 commented on issue #8890: [FLINK-12989][hive] Generate HiveTableSink from from a Hive table

2019-06-28 Thread GitBox
bowenli86 commented on issue #8890: [FLINK-12989][hive] Generate HiveTableSink 
from from a Hive table
URL: https://github.com/apache/flink/pull/8890#issuecomment-506793531
 
 
   Merging




[GitHub] [flink] bowenli86 commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table

2019-06-28 Thread GitBox
bowenli86 commented on issue #8921: [FLINK-13023][hive] Generate 
HiveTableSource from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506793351
 
 
   There's a compile failure in flink-table-common. I don't think it's caused 
by this PR, so I restarted the build to see whether it has already been fixed 
by someone else.




[GitHub] [flink] Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)

2019-06-28 Thread GitBox
Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source 
connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-506787729
 
 
   @rmetzger it took a while for travis to run, but it just succeeded




[GitHub] [flink] zhijiangW commented on issue #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-28 Thread GitBox
zhijiangW commented on issue #8880: [FLINK-12986] [network] Fix instability of 
BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#issuecomment-506758658
 
 
   No other concerns on my side atm. LGTM! 




[GitHub] [flink] asfgit closed pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner

2019-06-28 Thread GitBox
asfgit closed pull request #8901: [FLINK-13003][table-planner-blink] Support 
Temporal TableFunction Join in blink planner
URL: https://github.com/apache/flink/pull/8901
 
 
   




[jira] [Closed] (FLINK-13003) Support Temporal TableFunction Join in processing time and event time

2019-06-28 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-13003.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in 1.9.0: 4b7ca247431e727ac0533cf767d43615bbaf07c0

> Support Temporal TableFunction Join in processing time and event time
> -
>
> Key: FLINK-13003
> URL: https://issues.apache.org/jira/browse/FLINK-13003
> Project: Flink
>  Issue Type: Task
>  Components: Table SQL / Planner
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This is a feature port from flink-planner to blink-planner to support 
> temporal TableFunction join (or called versioned joins).





[GitHub] [flink] asfgit closed pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime

2019-06-28 Thread GitBox
asfgit closed pull request #8757: [FLINK-12850][core] Introduce TypeInfo for 
LocalDate/LocalTime/LocalDateTime
URL: https://github.com/apache/flink/pull/8757
 
 
   




[jira] [Closed] (FLINK-12850) Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime

2019-06-28 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-12850.
---
   Resolution: Fixed
Fix Version/s: 1.9.0
 Release Note: Support LocalDate, LocalTime, and LocalDateTime 
TypeInformation.

Resolved in 1.9.0: 463cedddec1bd55545f6e3e39ada04229ef5ceb5

> Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime
> 
>
> Key: FLINK-12850
> URL: https://issues.apache.org/jira/browse/FLINK-12850
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Type Serialization System
>Reporter: Jingsong Lee
>Assignee: Jingsong Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the new type system of the Table API, the default conversion classes for 
> time types are LocalDate, LocalTime, and LocalDateTime.
> In some situations these types need to be converted to TypeInformation, for 
> example in toDataStream, so we need to provide TypeInformation support for 
> them:
> Introduce LocalTimeTypeInfo
> Introduce LocalDateSerializer, LocalTimeSerializer, LocalDateTimeSerializer
> Introduce LocalDateComparator, LocalTimeComparator, LocalDateTimeComparator
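The serializer idea can be sketched outside of Flink: a LocalDate-style value can be encoded as its epoch-day count, which is the kind of compact encoding a LocalDateSerializer could use. This is a hypothetical Python stand-in, not the actual Flink implementation:

```python
from datetime import date, timedelta

EPOCH = date(1970, 1, 1)

def serialize_local_date(d: date) -> int:
    # Encode a date as the number of days since the Unix epoch,
    # analogous to how a date serializer could encode LocalDate.
    return (d - EPOCH).days

def deserialize_local_date(epoch_days: int) -> date:
    # Invert the encoding: epoch-day count back to a date.
    return EPOCH + timedelta(days=epoch_days)

d = date(2019, 6, 28)
assert deserialize_local_date(serialize_local_date(d)) == d
```

A matching comparator then only needs to compare the encoded integers, which is why the JIRA introduces serializers and comparators together.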





[jira] [Closed] (FLINK-12592) StreamTableEnvironment object has no attribute connect

2019-06-28 Thread sunjincheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng closed FLINK-12592.
---
   Resolution: Fixed
Fix Version/s: 1.9.0

This problem has disappeared, so I am closing this JIRA.

> StreamTableEnvironment object has no attribute connect
> --
>
> Key: FLINK-12592
> URL: https://issues.apache.org/jira/browse/FLINK-12592
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Tests
>Affects Versions: 1.9.0
>Reporter: Till Rohrmann
>Assignee: sunjincheng
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.9.0
>
> Attachments: image-2019-05-24-10-55-04-214.png, 
> image-2019-05-24-10-55-14-534.png
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The Python build module failed on Travis with the following problem: 
> {{'StreamTableEnvironment' object has no attribute 'connect'}}.
> https://api.travis-ci.org/v3/job/535684431/log.txt





[GitHub] [flink] asfgit closed pull request #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config

2019-06-28 Thread GitBox
asfgit closed pull request #8910: [hotfix][python] Aligns with Java Table API 
by removing methods exec_env and query_config
URL: https://github.com/apache/flink/pull/8910
 
 
   




[GitHub] [flink] flinkbot edited a comment on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config

2019-06-28 Thread GitBox
flinkbot edited a comment on issue #8910: [hotfix][python] Aligns with Java 
Table API by removing methods exec_env and query_config
URL: https://github.com/apache/flink/pull/8910#issuecomment-506250830
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @sunjincheng121 [committer]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @sunjincheng121 [committer]
   * ❓ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @sunjincheng121 [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @sunjincheng121 [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] sunjincheng121 commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config

2019-06-28 Thread GitBox
sunjincheng121 commented on issue #8910: [hotfix][python] Aligns with Java 
Table API by removing methods exec_env and query_config
URL: https://github.com/apache/flink/pull/8910#issuecomment-506706349
 
 
   Thanks for the update @dianfu 
   +1 to merged.
   @flinkbot approve all




[GitHub] [flink] wenhuitang commented on a change in pull request #8711: [FLINK-12817][docs]Correct the import in Processing function example.

2019-06-28 Thread GitBox
wenhuitang commented on a change in pull request #8711: 
[FLINK-12817][docs]Correct the import in Processing function example.
URL: https://github.com/apache/flink/pull/8711#discussion_r298561043
 
 

 ##
 File path: docs/dev/stream/operators/process_function.md
 ##
 @@ -97,9 +97,7 @@ import 
org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 
 Review comment:
   Thanks a lot, I have addressed.




[GitHub] [flink] StephanEwen commented on issue #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.

2019-06-28 Thread GitBox
StephanEwen commented on issue #8880: [FLINK-12986] [network] Fix instability 
of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#issuecomment-506702770
 
 
   @zhijiangW and @wsry I would merge this now unless you have further comments 
I should address first.




[jira] [Updated] (FLINK-9036) Add default value via suppliers

2019-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9036:
--
Labels: pull-request-available  (was: )

> Add default value via suppliers
> ---
>
> Key: FLINK-9036
> URL: https://issues.apache.org/jira/browse/FLINK-9036
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
>
> Earlier versions had a default value in {{ValueState}}. We dropped that, 
> because the value would have to be duplicated on each access, to be safe 
> against side effects when using mutable types.
> For convenience, we should re-add the feature, but using a supplier/factory 
> function to create the default value on access.
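The motivation above can be sketched in a few lines: if the default is produced by a supplier on each empty access, mutating the returned value cannot corrupt a shared default. This is a hypothetical Python sketch of the idea, not Flink's actual ValueState API:

```python
class ValueStateWithDefault:
    """Toy value-state holder. The supplier is invoked on each empty
    access, so a mutable default is never shared between accesses
    (a sketch of the idea, not Flink's ValueState; None means unset
    in this toy)."""

    def __init__(self, default_supplier):
        self._default_supplier = default_supplier
        self._value = None

    def value(self):
        if self._value is None:
            # A fresh default per access: no aliasing side effects.
            return self._default_supplier()
        return self._value

    def update(self, v):
        self._value = v

state = ValueStateWithDefault(list)   # supplier: fresh empty list
copy = state.value()
copy.append(42)                       # mutating the returned copy...
assert state.value() == []            # ...does not leak into the state
```

With a plain default value instead of a supplier, the `append` above would have silently modified the default seen by every later access, which is exactly the side effect the JIRA describes.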





[GitHub] [flink] StephanEwen closed pull request #5735: [FLINK-9036] [core] Add default values to State Descriptors via suppliers

2019-06-28 Thread GitBox
StephanEwen closed pull request #5735:  [FLINK-9036] [core] Add default values 
to State Descriptors via suppliers
URL: https://github.com/apache/flink/pull/5735
 
 
   




[GitHub] [flink] zhijiangW commented on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes

2019-06-28 Thread GitBox
zhijiangW commented on issue #8924: [hotfix][docs] Update the generated 
configuration docs for previous changes
URL: https://github.com/apache/flink/pull/8924#issuecomment-506698607
 
 
   Thanks for the review @zentol !
   I have addressed the above comments.




[GitHub] [flink] sunjincheng121 edited a comment on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config

2019-06-28 Thread GitBox
sunjincheng121 edited a comment on issue #8910: [hotfix][python] Aligns with 
Java Table API by removing methods exec_env and query_config
URL: https://github.com/apache/flink/pull/8910#issuecomment-506690440
 
 
   Hi, I think we also need to update `shell.py`; the test fails in 
`shell.py`. 




[GitHub] [flink] zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298552634
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   private static final int DEFAULT_PARALLELISM = 2;
+
+   private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+   private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+   private TestRestartStrategy manuallyTriggeredRestartStrategy;
+
+   @Before
+   public void setUp() {
+   manualMainThreadExecutor = new 
ManuallyTriggeredScheduledExecutor();
+   componentMainThreadExecutor = new 
ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, 
Thread.currentThread());
+   manuallyTriggeredRestartStrategy = 
TestRestartStrategy.manuallyTriggered();
+   }
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a properly 
vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final ExecutionGraph eg = createExecutionGraph();
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+
+   final Iterator vertexIterator = 
eg.getAllExecutionVertices().iterator();
+   final ExecutionVertex ev11 = vertexIter

[GitHub] [flink] dianfu commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config

2019-06-28 Thread GitBox
dianfu commented on issue #8910: [hotfix][python] Aligns with Java Table API by 
removing methods exec_env and query_config
URL: https://github.com/apache/flink/pull/8910#issuecomment-506697353
 
 
   @sunjincheng121 Thanks a lot for the review. Updated the PR.




[GitHub] [flink] flinkbot commented on issue #8925: [FLINK-12852][network] Fix the deadlock occurred when requesting exclusive buffers

2019-06-28 Thread GitBox
flinkbot commented on issue #8925: [FLINK-12852][network] Fix the deadlock 
occurred when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925#issuecomment-506695220
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Updated] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel

2019-06-28 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12852:
---
Labels: pull-request-available  (was: )

> Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
> --
>
> Key: FLINK-12852
> URL: https://issues.apache.org/jira/browse/FLINK-12852
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.9.0
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>  Labels: pull-request-available
>
> When running tests with an upstream vertex and downstream vertex, deadlock 
> occurs when submitting the job:
> {code:java}
> "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 
> nid=0x38845 waiting on condition [0x7f2cbe9fe000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00073ed6b6f0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180)
> at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312)
> - locked <0x00073fbc81f0> (a java.lang.Object)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220)
> at 
> org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598)
> at java.lang.Thread.run(Thread.java:834)
> {code}
> This is because the required and max sizes of the local buffer pool are not 
> the same, so over-allocation may occur; when assignExclusiveSegments is 
> called, no memory is available.
>  
> The details of the scenario are as follows: the parallelism of the upstream 
> vertex is 1000 and that of the downstream vertex is 500. There are 200 TMs, 
> each with 10696 buffers in total and 10 slots. For a TM that runs 
> 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first with 
> a local buffer pool of \{required = 500, max = 2 * 500 + 8 = 1008}; they 
> produce data quickly and each occupies about 990 buffers. Then the downstream 
> task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 
> InputChannels. It requires 2981 buffers but only 1786 are left. Since not all 
> downstream tasks can start, the job is eventually blocked, no buffer can 
> be released, and the deadlock occurs.
>  
> I think that although increasing the network memory solves the problem, the 
> deadlock may not be acceptable. Fine-grained resource management 
> (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include 
> network memory in the ResourceProfile.
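The buffer accounting in the scenario can be checked directly with the numbers taken from the description above (1491 channels at 2 exclusive buffers each comes to 2982, close to the 2981 quoted):

```python
# Reproduce the buffer accounting from the reported deadlock scenario.
total_buffers_per_tm = 10696
upstream_tasks = 9           # upstream tasks on this TM
buffers_per_upstream = 990   # roughly what each upstream pool occupies
downstream_channels = 1500 - upstream_tasks   # 1491 remote input channels
exclusive_per_channel = 2

occupied = upstream_tasks * buffers_per_upstream        # 8910
left = total_buffers_per_tm - occupied                  # 1786 buffers free
required = downstream_channels * exclusive_per_channel  # ~2982 needed

assert left == 1786
assert required > left   # not enough buffers -> downstream task blocks
```

Since the blocked downstream task can never release buffers back to the pool, and the upstream pools never shrink on their own, the shortfall is permanent, which is the deadlock.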





[GitHub] [flink] gaoyunhaii opened a new pull request #8925: [FLINK-12852][network] Fix the deadlock occurred when requesting exclusive buffers

2019-06-28 Thread GitBox
gaoyunhaii opened a new pull request #8925: [FLINK-12852][network] Fix the 
deadlock occurred when requesting exclusive buffers
URL: https://github.com/apache/flink/pull/8925
 
 
   ## What is the purpose of the change
   
   This pull request tries to fix the deadlock that occurs when requesting 
exclusive buffers. Since the maximum and required buffer counts of local buffer 
pools currently differ, there may be cases where the local buffer pools of the 
upstream tasks occupy all the buffers while the downstream tasks fail to 
acquire exclusive buffers and cannot make progress. Although this can be 
avoided by increasing the total number of buffers, the deadlock may not be 
acceptable. Therefore, this PR fails over the current execution when the 
deadlock occurs and advises users in the exception message to increase the 
number of buffers.
   
   ## Brief change log
   
   The main changes include
   
   1. Add an option for the timeout of `requestMemorySegment` for each channel. 
The default timeout is 30s. This option is marked as undocumented since it may 
be removed in a future implementation.
   2. Pass the timeout on to `NetworkBufferPool`.
   3. `requestMemorySegments` will throw an `IOException("Insufficient buffer")` 
if not all segments are acquired before the timeout.
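
A minimal sketch of the deadline-based behavior described in points 1-3, using a plain deque as the pool. The class and method shapes here are illustrative assumptions, not Flink's actual `NetworkBufferPool` API:

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Toy pool illustrating a timeout on bulk segment requests. */
class SketchBufferPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();

    SketchBufferPool(int numSegments) {
        for (int i = 0; i < numSegments; i++) {
            available.add(new byte[4096]);
        }
    }

    /** Requests {@code count} segments, failing if they cannot all be acquired in time. */
    List<byte[]> requestMemorySegments(int count, long timeoutMillis) throws IOException {
        List<byte[]> acquired = new ArrayList<>();
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (acquired.size() < count) {
            byte[] segment = available.poll();
            if (segment != null) {
                acquired.add(segment);
            } else if (System.currentTimeMillis() >= deadline) {
                // Roll back the partial acquisition and surface the shortage,
                // instead of blocking forever and deadlocking the job.
                available.addAll(acquired);
                throw new IOException("Insufficient buffer: requested " + count
                    + ", acquired only " + acquired.size() + " before timeout");
            }
            // Busy-waits for brevity; a real pool would block on a condition.
        }
        return acquired;
    }
}
```

With a pool of 3 segments, requesting 2 succeeds, while a subsequent request for 5 ends exceptionally after the timeout, mirroring the behavior the change log describes.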
   
   ## Verifying this change
   
   1. Added a test that validates that `requestMemorySegments` ends exceptionally 
if not all segments are acquired before the timeout.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] asfgit closed pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser

2019-06-28 Thread GitBox
asfgit closed pull request #8850: [FLINK-12954] Supports create(drop) view 
grammar for sql parser
URL: https://github.com/apache/flink/pull/8850
 
 
   




[jira] [Closed] (FLINK-12954) Supports create(drop) view grammar for sql parser

2019-06-28 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu closed FLINK-12954.
---
Resolution: Fixed

Fixed in 1.9.0: 71570bc9a5dc549a34b020981e5cf87c479658e9

> Supports create(drop) view grammar for sql parser
> -
>
> Key: FLINK-12954
> URL: https://issues.apache.org/jira/browse/FLINK-12954
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Danny Chan
>Assignee: Danny Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Add watermark to create table; Also add create view grammar





[GitHub] [flink] sunjincheng121 commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config

2019-06-28 Thread GitBox
sunjincheng121 commented on issue #8910: [hotfix][python] Aligns with Java 
Table API by removing methods exec_env and query_config
URL: https://github.com/apache/flink/pull/8910#issuecomment-506690440
 
 
   Hi, I think we also need to update `shell.py`. What do you think?




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298543321
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##
 @@ -876,6 +889,15 @@ private void jobStatusChanged(
validateRunsInMainThread();
 
if (newJobStatus.isGloballyTerminalState()) {
+   // other terminal job states are handled by the 
executions
+   if (newJobStatus == JobStatus.FINISHED) {
+   runAsync(() -> {
+   for (Map.Entry> entry : 
registeredTaskManagers.entrySet()) {
 
 Review comment:
   
`registeredTaskManagers.keySet().forEach(partitionTracker::stopTrackingAndReleaseAllPartitions);`
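
The suggested refactoring can be sketched with simplified stand-in types (strings instead of `ResourceID`s, a set instead of the `partitionTracker` call); only the iteration shape is the point:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ForEachRefactorSketch {
    public static void main(String[] args) {
        Map<String, Object> registeredTaskManagers = new HashMap<>();
        registeredTaskManagers.put("tm-1", new Object());
        registeredTaskManagers.put("tm-2", new Object());

        // Original shape: iterate over entries although only the key is used.
        Set<String> releasedViaLoop = new TreeSet<>();
        for (Map.Entry<String, Object> entry : registeredTaskManagers.entrySet()) {
            releasedViaLoop.add(entry.getKey());
        }

        // Suggested shape: iterate the key set and pass a method reference,
        // e.g. partitionTracker::stopTrackingAndReleaseAllPartitions in the PR.
        Set<String> releasedViaForEach = new TreeSet<>();
        registeredTaskManagers.keySet().forEach(releasedViaForEach::add);

        System.out.println(releasedViaLoop.equals(releasedViaForEach)); // prints "true"
    }
}
```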




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298518652
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
 ##
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public class PartitionTrackerImpl implements PartitionTracker {
+
+   private final JobID jobId;
+
+   private final Map 
partitionInfoByProducer = new HashMap<>();
+   private final Map> 
producersByExecutor = new HashMap<>();
+
+   private final ShuffleMaster shuffleMaster;
+
+   private PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup;
+
+   public PartitionTrackerImpl(
+   final JobID jobId,
+   final ShuffleMaster shuffleMaster,
+   final PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup) {
+
+   this.jobId = Preconditions.checkNotNull(jobId);
+   this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
+   this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
+   }
+
+   @Override
+   public CompletableFuture 
registerPartition(
+   final PartitionDescriptor partitionDescriptor,
+   final ProducerDescriptor producerDescriptor,
+   final ResultPartitionDeploymentDescriptorFactory 
resultPartitionDeploymentDescriptorFactory) {
+
+   CompletableFuture 
shuffleDescriptorFuture = 
shuffleMaster.registerPartitionWithProducer(partitionDescriptor, 
producerDescriptor);
+
+   return shuffleDescriptorFuture
+   .thenApply(shuffleDescriptor ->
+   
resultPartitionDeploymentDescriptorFactory.createResultPartitionDeploymentDescriptor(partitionDescriptor,
 shuffleDescriptor))
+   .thenApply(deploymentDescriptor -> {
+   
startTrackingPartition(producerDescriptor.getProducerLocation(), 
deploymentDescriptor);
+   return deploymentDescriptor;
+   });
+   }
+
+   @Override
+   public Collection 
getTrackedDeploymentDescriptors(final ExecutionAttemptID producerId) {
+   final InternalPartitionInfo partitionInfo = 
partitionInfoByProducer.get(producerId);
+   if (partitionInfo == null) {
+   return Collections.emptyList();
+   } else {
+   return 
partitionInfo.getResultPartitionDeploymentDescriptors();
+   }
+   }
+
+   private void startTrackingPartition(final ResourceID 
producingTaskExecutorId, final ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor) {
+   Preconditions.checkNotNull(producingTaskExecutorId);
+   Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);
+
+   final ResultPartitionID resultPartitionId = 
resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
+   addPartition(producingTaskExecutorId, 
resultP

[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298500217
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
 ##
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public class PartitionTrackerImpl implements PartitionTracker {
+
+   private final JobID jobId;
+
+   private final Map 
partitionInfoByProducer = new HashMap<>();
+   private final Map> 
producersByExecutor = new HashMap<>();
+
+   private final ShuffleMaster shuffleMaster;
+
+   private PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup;
+
+   public PartitionTrackerImpl(
+   final JobID jobId,
+   final ShuffleMaster shuffleMaster,
+   final PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup) {
+
+   this.jobId = Preconditions.checkNotNull(jobId);
+   this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
+   this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
+   }
+
+   @Override
+   public CompletableFuture 
registerPartition(
+   final PartitionDescriptor partitionDescriptor,
+   final ProducerDescriptor producerDescriptor,
+   final ResultPartitionDeploymentDescriptorFactory 
resultPartitionDeploymentDescriptorFactory) {
+
+   CompletableFuture 
shuffleDescriptorFuture = 
shuffleMaster.registerPartitionWithProducer(partitionDescriptor, 
producerDescriptor);
 
 Review comment:
   nit: any value in this local var?




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298494696
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
 ##
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public class PartitionTrackerImpl implements PartitionTracker {
+
+   private final JobID jobId;
+
+   private final Map 
partitionInfoByProducer = new HashMap<>();
 
 Review comment:
   I would prefer all initialisations in the constructor




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298537362
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -285,7 +281,13 @@ public LogicalSlot getAssignedResource() {
 
public Optional 
getResultPartitionDeploymentDescriptor(
IntermediateResultPartitionID id) {
-   return Optional.ofNullable(producedPartitions.get(id));
+   Collection 
trackedDeploymentDescriptors = 
vertex.getExecutionGraph().getPartitionTracker().getTrackedDeploymentDescriptors(getAttemptId());
+   for (ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor : trackedDeploymentDescriptors) {
 
 Review comment:
   Anyway, I think the method rather belongs to the tracker, with the exec id 
added to the signature.




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298524076
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
 ##
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public class PartitionTrackerImpl implements PartitionTracker {
+
+   private final JobID jobId;
+
+   private final Map 
partitionInfoByProducer = new HashMap<>();
+   private final Map> 
producersByExecutor = new HashMap<>();
+
+   private final ShuffleMaster shuffleMaster;
+
+   private PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup;
+
+   public PartitionTrackerImpl(
+   final JobID jobId,
+   final ShuffleMaster shuffleMaster,
+   final PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup) {
+
+   this.jobId = Preconditions.checkNotNull(jobId);
+   this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
+   this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
+   }
+
+   @Override
+   public CompletableFuture 
registerPartition(
+   final PartitionDescriptor partitionDescriptor,
+   final ProducerDescriptor producerDescriptor,
+   final ResultPartitionDeploymentDescriptorFactory 
resultPartitionDeploymentDescriptorFactory) {
+
+   CompletableFuture 
shuffleDescriptorFuture = 
shuffleMaster.registerPartitionWithProducer(partitionDescriptor, 
producerDescriptor);
+
+   return shuffleDescriptorFuture
+   .thenApply(shuffleDescriptor ->
+   
resultPartitionDeploymentDescriptorFactory.createResultPartitionDeploymentDescriptor(partitionDescriptor,
 shuffleDescriptor))
+   .thenApply(deploymentDescriptor -> {
+   
startTrackingPartition(producerDescriptor.getProducerLocation(), 
deploymentDescriptor);
+   return deploymentDescriptor;
+   });
+   }
+
+   @Override
+   public Collection 
getTrackedDeploymentDescriptors(final ExecutionAttemptID producerId) {
+   final InternalPartitionInfo partitionInfo = 
partitionInfoByProducer.get(producerId);
+   if (partitionInfo == null) {
+   return Collections.emptyList();
+   } else {
+   return 
partitionInfo.getResultPartitionDeploymentDescriptors();
+   }
+   }
+
+   private void startTrackingPartition(final ResourceID 
producingTaskExecutorId, final ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor) {
+   Preconditions.checkNotNull(producingTaskExecutorId);
+   Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);
+
+   final ResultPartitionID resultPartitionId = 
resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
+   addPartition(producingTaskExecutorId, 
resultP

[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298524932
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
 ##
 @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public interface PartitionTracker {
+
+   /**
+* Asynchronously starts tracking the given partition.
+*
+* @param partition descriptor for the partition
+* @param producerDescriptor descriptor for the producer
+* @param resultPartitionDeploymentDescriptorFactory factory for the 
ResultPartitionDeploymentDescriptor
+* @return future containing the assembled result partition deployment 
descriptor
+*/
+   CompletableFuture 
registerPartition(
+   PartitionDescriptor partition,
+   ProducerDescriptor producerDescriptor,
+   ResultPartitionDeploymentDescriptorFactory 
resultPartitionDeploymentDescriptorFactory);
+
+   /**
+* Returns all tracked partitions for the given producer.
+*
+* @param producerId producer to return tracked partitions for
+* @return tracked partitions for the given producer
+*/
+   Collection 
getTrackedDeploymentDescriptors(ExecutionAttemptID producerId);
+
+   /**
+* Stops the tracking of all partitions for the given task executor ID, 
without issuing any release calls.
+*
+* @param producingTaskExecutorId task executor id to determine 
relevant partitions
+*/
+   void stopTrackingAllPartitions(ResourceID producingTaskExecutorId);
+
+   /**
+* Releases all partitions for the given producer, and stop the 
tracking of partitions that were released.
+*
+* @param producerId producer id to determine relevant partitions
+*/
+   void stopTrackingAndReleasePartitions(final ExecutionAttemptID 
producerId);
+
+   /**
+* Releases all partitions for the given task executor ID, and stop the 
tracking of partitions that were released.
+*
+* @param producingTaskExecutorId task executor id to determine 
relevant partitions
+*/
+   void stopTrackingAndReleaseAllPartitions(ResourceID 
producingTaskExecutorId);
+
+   /**
+* Returns whether any partition is being tracked for the given task 
executor ID.
+*
+* @param producingTaskExecutorId task executor id to determine 
relevant partitions
+*/
+   boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId);
+
+   /**
+* Factory for creating a {@link ResultPartitionDeploymentDescriptor} 
from a given
+* {@link PartitionDescriptor}/{@link ShuffleDescriptor} pair.
+*/
+   interface ResultPartitionDeploymentDescriptorFactory {
 
 Review comment:
   @FunctionalInterface
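
As a sketch of the suggestion: annotating the single-method factory interface lets the compiler enforce the single-abstract-method contract and documents that it is meant to be implemented with lambdas. The types below are simplified stand-ins for illustration, not Flink's real classes:

```java
/** Simplified stand-in types for illustration only. */
class PartitionDescriptor {}
class ShuffleDescriptor {}
class ResultPartitionDeploymentDescriptor {}

@FunctionalInterface
interface ResultPartitionDeploymentDescriptorFactory {
    ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(
        PartitionDescriptor partitionDescriptor, ShuffleDescriptor shuffleDescriptor);
}

public class FunctionalInterfaceSketch {
    public static void main(String[] args) {
        // The annotation guarantees the interface stays lambda-compatible:
        // adding a second abstract method would now be a compile error.
        ResultPartitionDeploymentDescriptorFactory factory =
            (partition, shuffle) -> new ResultPartitionDeploymentDescriptor();
        System.out.println(factory.createResultPartitionDeploymentDescriptor(
            new PartitionDescriptor(), new ShuffleDescriptor()) != null); // prints "true"
    }
}
```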




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298527316
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
 ##
 @@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public interface PartitionTracker {
+
+   /**
+* Asynchronously starts tracking the given partition.
+*
+* @param partition descriptor for the partition
+* @param producerDescriptor descriptor for the producer
+* @param resultPartitionDeploymentDescriptorFactory factory for the 
ResultPartitionDeploymentDescriptor
+* @return future containing the assembled result partition deployment 
descriptor
+*/
+   CompletableFuture 
registerPartition(
+   PartitionDescriptor partition,
+   ProducerDescriptor producerDescriptor,
+   ResultPartitionDeploymentDescriptorFactory 
resultPartitionDeploymentDescriptorFactory);
+
+   /**
+* Returns all tracked partitions for the given producer.
+*
+* @param producerId producer to return tracked partitions for
+* @return tracked partitions for the given producer
+*/
+   Collection 
getTrackedDeploymentDescriptors(ExecutionAttemptID producerId);
+
+   /**
+* Stops the tracking of all partitions for the given task executor ID, 
without issuing any release calls.
+*
+* @param producingTaskExecutorId task executor id to determine 
relevant partitions
+*/
+   void stopTrackingAllPartitions(ResourceID producingTaskExecutorId);
+
+   /**
+* Releases all partitions for the given producer, and stop the 
tracking of partitions that were released.
+*
+* @param producerId producer id to determine relevant partitions
+*/
+   void stopTrackingAndReleasePartitions(final ExecutionAttemptID 
producerId);
 
 Review comment:
   Atm it stops tracking all partitions for the exec id and releases the ones 
that need releasing. I think the comment needs adjustment, and the function 
could be called:
   `stopTrackingAndReleasePartitionsForExecution`
   The same applies to the next method (stop all for the task executor and 
release what needs releasing):
   `stopTrackingAndReleasePartitionsOnTaskExecutor` ("release all" is a bit 
confusing)




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298538571
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
 ##
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public class PartitionTrackerImpl implements PartitionTracker {
+
+   private final JobID jobId;
+
+   private final Map 
partitionInfoByProducer = new HashMap<>();
+   private final Map> 
producersByExecutor = new HashMap<>();
+
+   private final ShuffleMaster shuffleMaster;
+
+   private PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup;
+
+   public PartitionTrackerImpl(
+   final JobID jobId,
+   final ShuffleMaster shuffleMaster,
+   final PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup) {
+
+   this.jobId = Preconditions.checkNotNull(jobId);
+   this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
+   this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
+   }
+
+   @Override
+   public CompletableFuture 
registerPartition(
+   final PartitionDescriptor partitionDescriptor,
+   final ProducerDescriptor producerDescriptor,
+   final ResultPartitionDeploymentDescriptorFactory 
resultPartitionDeploymentDescriptorFactory) {
+
+   CompletableFuture 
shuffleDescriptorFuture = 
shuffleMaster.registerPartitionWithProducer(partitionDescriptor, 
producerDescriptor);
+
+   return shuffleDescriptorFuture
+   .thenApply(shuffleDescriptor ->
+   
resultPartitionDeploymentDescriptorFactory.createResultPartitionDeploymentDescriptor(partitionDescriptor,
 shuffleDescriptor))
+   .thenApply(deploymentDescriptor -> {
+   
startTrackingPartition(producerDescriptor.getProducerLocation(), 
deploymentDescriptor);
 
 Review comment:
   We should probably have thought about a batch registration in the shuffle 
master, as we are joining on all of them anyway.
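   The joining alluded to above can be sketched with `CompletableFuture.allOf`. This is a minimal, self-contained illustration of the batch-registration idea; the names `registerOne` and `registerAll` are hypothetical stand-ins for the shuffle master API, not Flink code:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class BatchRegistrationSketch {

    // Hypothetical stand-in for ShuffleMaster#registerPartitionWithProducer;
    // the real descriptor types are assumed, not shown here.
    static CompletableFuture<String> registerOne(int partitionId) {
        return CompletableFuture.supplyAsync(() -> "descriptor-" + partitionId);
    }

    // Batch variant: fire all registrations, then join them in one step.
    static CompletableFuture<List<String>> registerAll(List<Integer> partitionIds) {
        List<CompletableFuture<String>> futures = partitionIds.stream()
            .map(BatchRegistrationSketch::registerOne)
            .collect(Collectors.toList());
        // allOf completes once every individual registration has completed;
        // afterwards each join() is non-blocking.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(registerAll(List.of(1, 2, 3)).join());
    }
}
```

   Whether a real batch API would belong on the shuffle master itself or on the tracker is an open design question; the sketch only shows that the futures compose cheaply.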




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298539553
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 ##
 @@ -94,7 +94,8 @@ public static ExecutionGraph buildGraph(
BlobWriter blobWriter,
Time allocationTimeout,
Logger log,
-   ShuffleMaster shuffleMaster) throws 
JobExecutionException, JobException {
+   PartitionTracker partitionTracker)
+   throws JobExecutionException, JobException {
 
 Review comment:
   keep formatting




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298493297
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java
 ##
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.util.Optional;
+
+/**
+ * Factory for {@link PartitionTracker}.
+ */
+public interface PartitionTrackerFactory {
+
+   /**
+* Creates a new PartitionTracker.
+*
+* @param taskExecutorGatewayLookup lookup function to access task 
executor gateways
+* @return created PartitionTracker
+*/
+   PartitionTracker create(TaskExecutorGatewayLookup 
taskExecutorGatewayLookup);
+
+   /**
+* Lookup function for {@link TaskExecutorGateway}.
+*/
+   interface TaskExecutorGatewayLookup {
 
 Review comment:
   `@FunctionalInterface`




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298534681
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java
 ##
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.util.Optional;
+
+/**
+ * Factory for {@link PartitionTracker}.
+ */
+public interface PartitionTrackerFactory {
+
+   /**
+* Creates a new PartitionTracker.
+*
+* @param taskExecutorGatewayLookup lookup function to access task 
executor gateways
+* @return created PartitionTracker
+*/
+   PartitionTracker create(TaskExecutorGatewayLookup 
taskExecutorGatewayLookup);
+
+   /**
+* Lookup function for {@link TaskExecutorGateway}.
+*/
+   interface TaskExecutorGatewayLookup {
+
+   /**
+* Returns a {@link TaskExecutorGateway} corresponding to the 
given ResourceID.
+*
+* @param taskExecutorId id of the task executor to look up.
+* @return optional task executor gateway
+*/
+   Optional lookup(ResourceID taskExecutorId);
 
 Review comment:
   I would prefer a partition releaser:
   ```
   interface TaskExecutorPartitionReleaser {
   void releasePartitions(ResourceID taskExecutorId, JobID jobId, 
Collection partitionIds);
   }
   ```
   it could also return a boolean indicating whether the TE was found, although 
that is not used anyway. Do we need the full `TaskExecutorGateway` for something?




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298495055
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
 ##
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public class PartitionTrackerImpl implements PartitionTracker {
+
+   private final JobID jobId;
+
+   private final Map 
partitionInfoByProducer = new HashMap<>();
+   private final Map> 
producersByExecutor = new HashMap<>();
+
+   private final ShuffleMaster shuffleMaster;
+
+   private PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup;
 
 Review comment:
   final
   also any value in explicit prefixing over a static import of 
`PartitionTrackerFactory`?




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298493519
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java
 ##
 @@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.util.Optional;
+
+/**
+ * Factory for {@link PartitionTracker}.
+ */
+public interface PartitionTrackerFactory {
 
 Review comment:
   @FunctionalInterface




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298509021
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -285,7 +281,13 @@ public LogicalSlot getAssignedResource() {
 
public Optional 
getResultPartitionDeploymentDescriptor(
IntermediateResultPartitionID id) {
-   return Optional.ofNullable(producedPartitions.get(id));
+   Collection 
trackedDeploymentDescriptors = 
vertex.getExecutionGraph().getPartitionTracker().getTrackedDeploymentDescriptors(getAttemptId());
+   for (ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor : trackedDeploymentDescriptors) {
 
 Review comment:
   I do not really see a problem atm with having it in `InternalPartitionInfo`, 
with a similar add-partition method and Collection -> Map




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298540751
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -1325,22 +1324,8 @@ private void sendCancelRpcCall(int numberRetries) {
 
private void sendReleaseIntermediateResultPartitionsRpcCall() {
LOG.info("Discarding the results produced by task execution 
{}.", attemptId);
-   final LogicalSlot slot = assignedResource;
-
-   if (slot != null) {
-   final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
-
-   Collection partitions = 
vertex.getProducedPartitions().values();
-   Collection partitionIds = new 
ArrayList<>(partitions.size());
-   for (IntermediateResultPartition partition : 
partitions) {
-   partitionIds.add(new 
ResultPartitionID(partition.getPartitionId(), attemptId));
-   }
-
-   if (!partitionIds.isEmpty()) {
-   // TODO For some tests this could be a problem 
when querying too early if all resources were released
-   
taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds);
-   }
-   }
+   final PartitionTracker partitionTracker = 
getVertex().getExecutionGraph().getPartitionTracker();
+   
partitionTracker.stopTrackingAndReleasePartitions(getAttemptId());
 
 Review comment:
   looks like we are still doing it in cancel and suspend




[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM

2019-06-28 Thread GitBox
azagrebin commented on a change in pull request #8778: 
[FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298504283
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
 ##
 @@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public class PartitionTrackerImpl implements PartitionTracker {
+
+   private final JobID jobId;
+
+   private final Map 
partitionInfoByProducer = new HashMap<>();
+   private final Map> 
producersByExecutor = new HashMap<>();
+
+   private final ShuffleMaster shuffleMaster;
+
+   private PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup;
+
+   public PartitionTrackerImpl(
+   final JobID jobId,
+   final ShuffleMaster shuffleMaster,
+   final PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup) {
+
+   this.jobId = Preconditions.checkNotNull(jobId);
+   this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
+   this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
+   }
+
+   @Override
+   public CompletableFuture 
registerPartition(
+   final PartitionDescriptor partitionDescriptor,
+   final ProducerDescriptor producerDescriptor,
+   final ResultPartitionDeploymentDescriptorFactory 
resultPartitionDeploymentDescriptorFactory) {
+
+   CompletableFuture 
shuffleDescriptorFuture = 
shuffleMaster.registerPartitionWithProducer(partitionDescriptor, 
producerDescriptor);
+
+   return shuffleDescriptorFuture
+   .thenApply(shuffleDescriptor ->
+   
resultPartitionDeploymentDescriptorFactory.createResultPartitionDeploymentDescriptor(partitionDescriptor,
 shuffleDescriptor))
+   .thenApply(deploymentDescriptor -> {
+   
startTrackingPartition(producerDescriptor.getProducerLocation(), 
deploymentDescriptor);
 
 Review comment:
   At the moment, there is no requirement that shuffle master returns the 
descriptor into the main thread which is not even given to the shuffle master. 
This was the reason why `Execution.registerProducedPartitions` (non-static) 
modified its state in `getJobMasterMainThreadExecutor`. I would give the 
`JobMasterMainThreadExecutor` to `PartitionTrackerImpl` and do 
`assertRunningInJobMasterMainThread();` for state modifying methods.
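   The pattern suggested here — re-scheduling the shuffle master's asynchronous result onto the job master's main thread before mutating tracker state — can be sketched as follows. The single-thread executor and all names are illustrative assumptions, not Flink's actual `JobMasterMainThreadExecutor`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadTrackerSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in for the JobMaster's main-thread executor.
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        Thread[] owner = new Thread[1];
        mainThread.submit(() -> owner[0] = Thread.currentThread()).get();

        int[] tracked = {0}; // tracker state, mutated only from the main thread

        // The shuffle master may complete its future on an arbitrary thread.
        CompletableFuture<String> fromShuffleMaster =
            CompletableFuture.supplyAsync(() -> "descriptor");

        // thenAcceptAsync(..., mainThread) re-schedules the state mutation onto
        // the main-thread executor, mirroring the suggested
        // assertRunningInJobMasterMainThread() discipline.
        fromShuffleMaster.thenAcceptAsync(descriptor -> {
            if (Thread.currentThread() != owner[0]) {
                throw new IllegalStateException("state mutated outside main thread");
            }
            tracked[0]++;
        }, mainThread).join();

        System.out.println("tracked=" + tracked[0]);
        mainThread.shutdown();
    }
}
```

   The guard throws if the callback ever runs off the owning thread, which is the same effect the proposed assertion would have in the tracker.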




[GitHub] [flink] pnowojski commented on a change in pull request #8811: [FLINK-12777][network] Support CheckpointBarrierHandler in StreamTwoInputSelectableProcessor

2019-06-28 Thread GitBox
pnowojski commented on a change in pull request #8811: [FLINK-12777][network] 
Support CheckpointBarrierHandler in StreamTwoInputSelectableProcessor
URL: https://github.com/apache/flink/pull/8811#discussion_r298540216
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Implementation of {@link BufferStorage} that links two {@link 
BufferStorage} together.
+ * Each of the linked {@link BufferStorage} will store buffers independently, 
but they will be
+ * linked together for {@link #rollOver()} - if one is rolled over, other will 
do that as well.
+ */
+public class LinkedBufferStorage implements BufferStorage {
 
 Review comment:
   I see your point. I think java doc above the class explains this:
   > Each of the linked {@link BufferStorage} will store buffers independently
   
   which is not ideal (self documenting code > comments), but as we don't have 
a better proposal, I will leave it as it is. 
   
   If someone comes up with a better name in the future, it's always easy to 
rename internal classes.
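   A toy sketch of the linking described in that javadoc — two storages buffering independently but rolling over together. The `Storage` class and its fields are hypothetical illustrations, not Flink's real `BufferStorage` interface:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class LinkedStorageSketch {

    static class Storage {
        final Deque<String> current = new ArrayDeque<>();
        final Deque<String> rolledOver = new ArrayDeque<>();
        Storage linked; // optional partner that must roll over together

        void add(String buffer) {
            current.add(buffer);
        }

        // Rolling one storage over cascades to its linked partner, while
        // the stored buffers themselves stay separate per storage.
        void rollOver() {
            rollOverInternal();
            if (linked != null) {
                linked.rollOverInternal();
            }
        }

        private void rollOverInternal() {
            rolledOver.addAll(current);
            current.clear();
        }
    }

    public static void main(String[] args) {
        Storage a = new Storage();
        Storage b = new Storage();
        a.linked = b; // one-directional link keeps the cascade loop-free

        a.add("a1");
        b.add("b1");
        a.rollOver(); // rolls over both a and b

        System.out.println(a.rolledOver + " " + b.rolledOver);
    }
}
```

   Linking only one way avoids infinite mutual roll-over; a bidirectional link would need a re-entrancy guard.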




[GitHub] [flink] GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an 
adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298539105
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
 ##
 @@ -63,6 +66,9 @@
return new RestartAllStrategy.Factory();
 
case PIPELINED_REGION_RESTART_STRATEGY_NAME:
+   return new 
AdaptedRestartPipelinedRegionStrategyNG.Factory();
 
 Review comment:
   fixed




[GitHub] [flink] pnowojski commented on issue #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode

2019-06-28 Thread GitBox
pnowojski commented on issue #8455: 
[FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage 
metric in credit-based network mode
URL: https://github.com/apache/flink/pull/8455#issuecomment-506683902
 
 
   > In addition, one PR should not connect to two jira tickets in general. So 
I think you might need to close one jira and adjust this PR to only one jira.
   
   I wouldn't be so strict about having a couple of Jira tickets in one PR, 
especially if they depend on one another. However, as a rule of thumb one commit 
shouldn't address two Jira tickets, since it probably means that either:
   - commit is fixing two separate things
   - Jira tickets are duplicated
   
   As far as I recall this is in line with our coding style guidelines. Of course 
it's better to have separate things fixed in independent PRs.
   
   Here I think it's the case that "Jira tickets are duplicated". In my opinion 
`Add metrics for floatingBufferUsage and exclusiveBufferUsage for credit based 
mode` is a solution for `InputBufferPoolUsage is incorrect in credit-based 
network control flow`. So I would close the newer ticket as duplicate of the 
older one.




[GitHub] [flink] GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an 
adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298538461
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   private static final int DEFAULT_PARALLELISM = 2;
+
+   private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+   private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+   private TestRestartStrategy manuallyTriggeredRestartStrategy;
+
+   @Before
+   public void setUp() {
+   manualMainThreadExecutor = new 
ManuallyTriggeredScheduledExecutor();
+   componentMainThreadExecutor = new 
ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, 
Thread.currentThread());
+   manuallyTriggeredRestartStrategy = 
TestRestartStrategy.manuallyTriggered();
+   }
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a proper
+* vertex state.
+* 
+* (v11) -+-> (v21)
+*x
+* (v12) -+-> (v22)
+*
+*^
+*|
+*   (blocking)
+* 
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final ExecutionGraph eg = createExecutionGraph();
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+
+   final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+   final ExecutionVertex ev11 = vertexIterator.

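The deferred-recovery pattern the quoted test relies on (queue each region's recovery action on a manually triggered executor, then drain the queue and verify each task is restarted exactly once) can be sketched in plain Java. The ManualExecutor below is a simplified stand-in for Flink's ManuallyTriggeredScheduledExecutor, not the real test utility.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentFailoverSketch {

    // Simplified stand-in for ManuallyTriggeredScheduledExecutor:
    // submitted tasks are queued and only run when triggerAll() is called.
    static class ManualExecutor implements Executor {
        private final Deque<Runnable> tasks = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            tasks.add(task); // defer instead of running immediately
        }

        // Drain the queue, including tasks enqueued while draining.
        void triggerAll() {
            while (!tasks.isEmpty()) {
                tasks.poll().run();
            }
        }
    }

    public static void main(String[] args) {
        ManualExecutor mainThreadExecutor = new ManualExecutor();
        AtomicInteger restarts = new AtomicInteger();

        // Two concurrent "failures" queue their recovery actions;
        // nothing runs yet, so both failovers are pending at once.
        mainThreadExecutor.execute(restarts::incrementAndGet); // recovery for ev11
        mainThreadExecutor.execute(restarts::incrementAndGet); // recovery for ev12

        System.out.println("pending restarts = " + restarts.get());

        // Resume recovery actions; each queued action runs exactly once.
        mainThreadExecutor.triggerAll();
        System.out.println("completed restarts = " + restarts.get());
    }
}
```

Queuing everything on one manually triggered "main thread" is what lets the real test force a deterministic interleaving of the two concurrent failovers before validating the restart count.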
[GitHub] [flink] GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an 
adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298538576
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+   private FailingSlotProviderDecorator slotProvider;
+
+   private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+   @Before
+   public void setUp() {
+   manualMainThreadExecutor = new 
ManuallyTriggeredScheduledExecutor();
+   componentMainThreadExecutor = new 
ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, 
Thread.currentThread());
+   slotProvider = new FailingSlotProviderDecorator(new 
SimpleSlotProvider(TEST_JOB_ID, 14));
+   }
+
+   /**
+* Tests region failover for a job in EAGER mode.
+* This applies to streaming jobs, which have no BLOCKING edges.
+* <pre>
+* (v11) ---> (v21)
+*
+* (v12) ---> (v22)
+*
+*        ^
+* </pre>

[GitHub] [flink] GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an 
adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298538537
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -208,7 +211,7 @@ public void testFailurePropagationToUnderlyingStrategy() 
throws Exception {
final ExecutionVertex ev21 = vertexIterator.next();
 
// trigger downstream regions to schedule
-   testingMainThreadExecutor.execute(() -> {
+   componentMainThreadExecutor.execute(() -> {
 
 Review comment:
   Thanks, fixed it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an 
adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298536587
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+   private FailingSlotProviderDecorator slotProvider;
+
+   private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+   @Before
+   public void setUp() {
+   manualMainThreadExecutor = new 
ManuallyTriggeredScheduledExecutor();
+   componentMainThreadExecutor = new 
ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, 
Thread.currentThread());
+   slotProvider = new FailingSlotProviderDecorator(new 
SimpleSlotProvider(TEST_JOB_ID, 14));
+   }
+
+   /**
+* Tests region failover for a job in EAGER mode.
+* This applies to streaming jobs, which have no BLOCKING edges.
+* <pre>
+* (v11) ---> (v21)
+*
+* (v12) ---> (v22)
+*
+*        ^
+* </pre>

[GitHub] [flink] dawidwys merged pull request #8876: [FLINK-12985][core][streaming] Rename StreamTransformation to org.apache.flink.api.dag.Transformation

2019-06-28 Thread GitBox
dawidwys merged pull request #8876: [FLINK-12985][core][streaming] Rename 
StreamTransformation to org.apache.flink.api.dag.Transformation
URL: https://github.com/apache/flink/pull/8876
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-12985) Move & rename StreamTransformation to org.apache.flink.api.dag.Transformation

2019-06-28 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-12985.

Resolution: Fixed

Implemented in fd511c345eac31f03b801ff19dbf1f8c86aae760

> Move & rename StreamTransformation to org.apache.flink.api.dag.Transformation
> -
>
> Key: FLINK-12985
> URL: https://issues.apache.org/jira/browse/FLINK-12985
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.9.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> It is quite strange to have a streaming package in {{flink-core}}. Moreover 
> right now the {{StreamTransformation#setChainingStrategy}} throws Exception 
> in half of the transformations.
> I suggest to:
> # rename the {{StreamTransformation}} to 
> {{org.apache.flink.api.dag.Transformation}}
> # extract {{#setChainingMethod}} to a separate class 
> {{PhysicalTransformation}} & move the {{ChainingStrategy}} back to the 
> flink-streaming-java module
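The split proposed in the issue could look roughly like the sketch below. Class and method names follow the issue text; the fields and the concrete subclasses are illustrative assumptions, not the actual Flink implementation.

```java
// Illustrative sketch of the proposed split; the real Flink classes carry
// much more state (ids, output types, parallelism, ...).
enum ChainingStrategy { ALWAYS, NEVER, HEAD }

// Would live in flink-core as org.apache.flink.api.dag.Transformation:
// a generic DAG node with no streaming-specific behavior.
abstract class Transformation<T> {
    private final String name;

    Transformation(String name) { this.name = name; }

    String getName() { return name; }
}

// Would live in flink-streaming-java: only transformations backed by a
// physical operator expose chaining, so the method no longer has to throw
// in the non-physical subclasses.
abstract class PhysicalTransformation<T> extends Transformation<T> {
    PhysicalTransformation(String name) { super(name); }

    abstract void setChainingStrategy(ChainingStrategy strategy);
}

public class TransformationSketch {
    // A physical transformation (e.g. a map) accepts a chaining strategy.
    static class MapTransformation<T> extends PhysicalTransformation<T> {
        ChainingStrategy strategy = ChainingStrategy.ALWAYS;

        MapTransformation() { super("map"); }

        @Override
        void setChainingStrategy(ChainingStrategy s) { this.strategy = s; }
    }

    // A purely logical transformation (e.g. a union) simply lacks the
    // method instead of throwing UnsupportedOperationException.
    static class UnionTransformation<T> extends Transformation<T> {
        UnionTransformation() { super("union"); }
    }

    public static void main(String[] args) {
        MapTransformation<String> map = new MapTransformation<>();
        map.setChainingStrategy(ChainingStrategy.NEVER);
        System.out.println(map.getName() + " chains: " + map.strategy);
        System.out.println(new UnionTransformation<String>().getName());
    }
}
```

With this split, code that only walks the DAG depends on Transformation alone, while chaining stays a concern of the streaming runtime.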



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298517179
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+   private FailingSlotProviderDecorator slotProvider;
+
+   private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+   @Before
+   public void setUp() {
+   manualMainThreadExecutor = new 
ManuallyTriggeredScheduledExecutor();
+   componentMainThreadExecutor = new 
ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, 
Thread.currentThread());
+   slotProvider = new FailingSlotProviderDecorator(new 
SimpleSlotProvider(TEST_JOB_ID, 14));
+   }
+
+   /**
+* Tests region failover for a job in EAGER mode.
+* This applies to streaming jobs, which have no BLOCKING edges.
+* <pre>
+* (v11) ---> (v21)
+*
+* (v12) ---> (v22)
+*
+*        ^
+* </pre>

[GitHub] [flink] zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298529362
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -208,7 +211,7 @@ public void testFailurePropagationToUnderlyingStrategy() 
throws Exception {
final ExecutionVertex ev21 = vertexIterator.next();
 
// trigger downstream regions to schedule
-   testingMainThreadExecutor.execute(() -> {
+   componentMainThreadExecutor.execute(() -> {
 
 Review comment:
   can invoke `markFinished` directly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] 
Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298524268
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
 ##
 @@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling 
when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest 
extends TestLogger {
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   private static final int DEFAULT_PARALLELISM = 2;
+
+   private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+   private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+   private TestRestartStrategy manuallyTriggeredRestartStrategy;
+
+   @Before
+   public void setUp() {
+   manualMainThreadExecutor = new 
ManuallyTriggeredScheduledExecutor();
+   componentMainThreadExecutor = new 
ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, 
Thread.currentThread());
+   manuallyTriggeredRestartStrategy = 
TestRestartStrategy.manuallyTriggered();
+   }
+
+   /**
+* Tests that 2 concurrent region failovers can lead to a proper vertex state.
+* <pre>
+* (v11) -+-> (v21)
+*        x
+* (v12) -+-> (v22)
+*
+*        ^
+*        |
+*    (blocking)
+* </pre>
+*/
+   @Test
+   public void testConcurrentRegionFailovers() throws Exception {
+
+   // the logic in this test is as follows:
+   //  - start a job
+   //  - cause {ev11} failure and delay the local recovery action 
via the manual executor
+   //  - cause {ev12} failure and delay the local recovery action 
via the manual executor
+   //  - resume local recovery actions
+   //  - validate that each task is restarted only once
+
+   final ExecutionGraph eg = createExecutionGraph();
+
+   final TestAdaptedRestartPipelinedRegionStrategyNG 
failoverStrategy =
+   (TestAdaptedRestartPipelinedRegionStrategyNG) 
eg.getFailoverStrategy();
+   failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+
+   final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+   final ExecutionVertex ev11 = vertexIter

[GitHub] [flink] flinkbot edited a comment on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes

2019-06-28 Thread GitBox
flinkbot edited a comment on issue #8924: [hotfix][docs] Update the generated 
configuration docs for previous changes
URL: https://github.com/apache/flink/pull/8924#issuecomment-506655381
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❗ 3. Needs [attention] from.
   - Needs attention by @azagrebin, @zentol [PMC]
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add 
an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298517846
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+   private FailingSlotProviderDecorator slotProvider;
+
+   private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+   @Before
+   public void setUp() {
+   manualMainThreadExecutor = new 
ManuallyTriggeredScheduledExecutor();
+   componentMainThreadExecutor = new 
ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, 
Thread.currentThread());
+   slotProvider = new FailingSlotProviderDecorator(new 
SimpleSlotProvider(TEST_JOB_ID, 14));
+   }
+
+   /**
+* Tests for region failover for job in EAGER mode.
+* This applies to streaming job, with no BLOCKING edge.
+* 
+* (v11) ---> (v21)
+*
+* (v12) ---> (v22)
+*
+*^
+   
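The topology in the (truncated) Javadoc above illustrates the core idea of pipelined-region failover: vertices connected only by PIPELINED edges form one failover region, so a failure restarts only that region. A minimal union-find sketch of region grouping follows; this is a hypothetical illustration with invented names, not Flink's actual `FailoverRegion` computation:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch: vertices linked by PIPELINED edges share a failover
 *  region; BLOCKING edges keep their endpoints in separate regions. */
public class PipelinedRegions {

    private final Map<String, String> parent = new HashMap<>();

    /** Union-find lookup with path compression. */
    private String find(String v) {
        parent.putIfAbsent(v, v);
        String p = parent.get(v);
        if (!p.equals(v)) {
            p = find(p);
            parent.put(v, p);
        }
        return p;
    }

    /** A PIPELINED edge merges both endpoints into one region. */
    public void addPipelinedEdge(String from, String to) {
        parent.put(find(from), find(to));
    }

    /** A BLOCKING edge only registers the vertices; no merge happens. */
    public void addBlockingEdge(String from, String to) {
        find(from);
        find(to);
    }

    /** Group all known vertices by their region representative. */
    public Collection<Set<String>> regions() {
        Map<String, Set<String>> byRoot = new HashMap<>();
        for (String v : new ArrayList<>(parent.keySet())) {
            byRoot.computeIfAbsent(find(v), r -> new TreeSet<>()).add(v);
        }
        return byRoot.values();
    }

    public static void main(String[] args) {
        PipelinedRegions g = new PipelinedRegions();
        g.addPipelinedEdge("v11", "v21");
        g.addPipelinedEdge("v12", "v22");
        // Two disjoint pipelined chains -> two failover regions.
        System.out.println(g.regions().size()); // prints 2
    }
}
```

On the `(v11) ---> (v21)`, `(v12) ---> (v22)` topology from the Javadoc, this yields two independent regions, so failing `v11` would restart only `{v11, v21}`.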

[GitHub] [flink] zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add 
an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298503935
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
 ##
 @@ -63,6 +66,9 @@
return new RestartAllStrategy.Factory();
 
case PIPELINED_REGION_RESTART_STRATEGY_NAME:
+   return new 
AdaptedRestartPipelinedRegionStrategyNG.Factory();
 
 Review comment:
   indentation




[GitHub] [flink] zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add 
an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298508743
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
 ##
 @@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG}.
+ */
+public class 
AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest extends 
TestLogger {
+
+   private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+   private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+   @Before
+   public void setUp() {
+   manualMainThreadExecutor = new 
ManuallyTriggeredScheduledExecutor();
+   componentMainThreadExecutor = new 
ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, 
Thread.currentThread());
+   }
+
+   @Test
+   public void abortPendingCheckpointsWhenRestartingTasks() throws 
Exception {
+   final JobGraph jobGraph = createStreamingJobGraph();
+   final ExecutionGraph executionGraph = 
createExecutionGraph(jobGraph);
+
+   final Iterator<ExecutionVertex> vertexIterator = 
executionGraph.getAllExecutionVertices().iterator();
+   final ExecutionVertex onlyExecutionVertex = 
vertexIterator.next();
+
+   setTaskRunning(executionGraph, onlyExecutionVertex);
+
+   final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+   checkState(checkpointCoordinator != null);
+
+   
checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(),  false);
+   final int pendingCheckpointsBeforeFailure = 
checkpointCoordinator.getNumberOfPendingCheckpoints();
+
+   failVertex(onlyExecutionVertex);
+
+   assertThat(pendingCheckpointsBeforeFailure, is(equalTo(1)));
+   assertNoPendingCheckpoints(checkpointCoordinator);
+   }
+
+   private void setTaskRunning(final ExecutionGraph executionGra
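The test above (truncated by the archive) verifies that a checkpoint pending before a failure is aborted when the failover strategy restarts tasks. A minimal sketch of that contract follows; this is a hypothetical illustration with invented names, not Flink's `CheckpointCoordinator`:

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical sketch of the contract verified above: a failover that
 *  restarts tasks must discard every pending (not yet completed) checkpoint. */
public class MiniCheckpointCoordinator {

    private final Set<Long> pendingCheckpoints = new LinkedHashSet<>();
    private long checkpointIdCounter;

    /** Register a new pending checkpoint and return its id. */
    public long triggerCheckpoint() {
        long id = ++checkpointIdCounter;
        pendingCheckpoints.add(id);
        return id;
    }

    public int getNumberOfPendingCheckpoints() {
        return pendingCheckpoints.size();
    }

    /** Invoked by the failover strategy before restarting tasks. */
    public void abortPendingCheckpoints() {
        pendingCheckpoints.clear();
    }

    public static void main(String[] args) {
        MiniCheckpointCoordinator coordinator = new MiniCheckpointCoordinator();
        coordinator.triggerCheckpoint();
        int before = coordinator.getNumberOfPendingCheckpoints();
        coordinator.abortPendingCheckpoints(); // the failover path
        System.out.println(before + " -> " + coordinator.getNumberOfPendingCheckpoints()); // prints 1 -> 0
    }
}
```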

[GitHub] [flink] zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add 
an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298520665
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 ##
 @@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import 
org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import 
org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import 
org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends 
TestLogger {
+
+   private static final JobID TEST_JOB_ID = new JobID();
+
+   private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+   private FailingSlotProviderDecorator slotProvider;
+
+   private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+   @Before
+   public void setUp() {
+   manualMainThreadExecutor = new 
ManuallyTriggeredScheduledExecutor();
+   componentMainThreadExecutor = new 
ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, 
Thread.currentThread());
+   slotProvider = new FailingSlotProviderDecorator(new 
SimpleSlotProvider(TEST_JOB_ID, 14));
+   }
+
+   /**
+* Tests for region failover for job in EAGER mode.
+* This applies to streaming job, with no BLOCKING edge.
+* 
+* (v11) ---> (v21)
+*
+* (v12) ---> (v22)
+*
+*^
+   

[GitHub] [flink] zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler

2019-06-28 Thread GitBox
zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add 
an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298526519
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ##
 @@ -939,123 +933,22 @@ public void scheduleForExecution() throws JobException {
}
}
 
-   private CompletableFuture<Void> scheduleLazy(SlotProvider slotProvider) 
{
-
-   final ArrayList<CompletableFuture<Void>> schedulingFutures = 
new ArrayList<>(numVerticesTotal);
-   // simply take the vertices without inputs.
-   for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-   if (ejv.getJobVertex().isInputVertex()) {
-   final CompletableFuture<Void> 
schedulingJobVertexFuture = ejv.scheduleAll(
-   slotProvider,
-   allowQueuedScheduling,
-   LocationPreferenceConstraint.ALL, // 
since it is an input vertex, the input based location preferences should be 
empty
-   Collections.emptySet());
-
-   
schedulingFutures.add(schedulingJobVertexFuture);
-   }
-   }
-
-   return FutureUtils.waitForAll(schedulingFutures);
+   private CompletableFuture<Void> scheduleLazy() {
+   final List<ExecutionVertex> executionVertices = StreamSupport
+   .stream(getAllExecutionVertices().spliterator(), false)
+   .collect(Collectors.toList());
+   return SchedulingUtils.scheduleLazy(executionVertices, this);
 
 Review comment:
   agreed
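The removed `scheduleLazy` in the diff above follows a common pattern: collect one scheduling future per input vertex, then wait for all of them. Assuming only JDK facilities (Flink's `FutureUtils.waitForAll` additionally collects failures from the individual futures), the pattern can be sketched as:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch of "collect one future per input vertex, then wait for all",
 *  approximating FutureUtils.waitForAll with CompletableFuture.allOf. */
public class WaitForAllSketch {

    /** Completes once every given future has completed. */
    static CompletableFuture<Void> waitForAll(List<CompletableFuture<Void>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Void>> schedulingFutures = new ArrayList<>();
        // One future per input vertex, mirroring the loop in the old scheduleLazy.
        schedulingFutures.add(CompletableFuture.completedFuture(null));
        schedulingFutures.add(CompletableFuture.completedFuture(null));
        System.out.println(waitForAll(schedulingFutures).isDone()); // prints true
    }
}
```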




[GitHub] [flink] ifndef-SleePy commented on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter

2019-06-28 Thread GitBox
ifndef-SleePy commented on issue #8894:  [FLINK-12961][datastream] Providing an 
internal execution method of StreamExecutionEnvironment accepting StreamGraph 
as input parameter
URL: https://github.com/apache/flink/pull/8894#issuecomment-506671123
 
 
   @aljoscha
   I think you have raised a good question!
   
   AFAIK, the Blink runner generates the `StreamGraph` itself, and it also keeps 
the transformations itself. When a `DataStream` is translated to a `Table`, the 
Blink batch runner back-tracks all transformations of the 
`StreamExecutionEnvironment` through the transformations it keeps.
   
   There is a corner case, though: a disconnected subgraph in 
`StreamExecutionEnvironment` might not be touched by the back-tracking. I have 
not reached an agreement with my colleague about this case.
   
   I'm wondering whether there is a better solution for Blink batch runner?




[jira] [Commented] (FLINK-13025) Elasticsearch 7.x support

2019-06-28 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874807#comment-16874807
 ] 

Aljoscha Krettek commented on FLINK-13025:
--

If it is possible, i.e. if the interfaces are backwards compatible I'd like if 
we had a universal connector, yes.

> Elasticsearch 7.x support
> -
>
> Key: FLINK-13025
> URL: https://issues.apache.org/jira/browse/FLINK-13025
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / ElasticSearch
>Affects Versions: 1.8.0
>Reporter: Keegan Standifer
>Priority: Major
>
> Elasticsearch 7.0.0 was released in April of 2019: 
> [https://www.elastic.co/blog/elasticsearch-7-0-0-released]
> The latest elasticsearch connector is 
> [flink-connector-elasticsearch6|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch6]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] [flink] yanghua commented on issue #8871: [FLINK-12976] Bump Kafka client version to 2.3.0 for universal Kafka connector

2019-06-28 Thread GitBox
yanghua commented on issue #8871: [FLINK-12976] Bump Kafka client version to 
2.3.0 for universal Kafka connector
URL: https://github.com/apache/flink/pull/8871#issuecomment-50645
 
 
   I commented out some code for testing because I found that if we close the 
Kafka producer, the transaction with id 
`MockTask-002a002c-2` enters `PrepareAbort`. The 
reason, per the test log, is `Aborting incomplete transaction due to shutdown`.




[GitHub] [flink] twalthr closed pull request #8874: [FLINK-12968][table-common] Add a utility for logical type casts

2019-06-28 Thread GitBox
twalthr closed pull request #8874: [FLINK-12968][table-common] Add a utility 
for logical type casts
URL: https://github.com/apache/flink/pull/8874
 
 
   




[jira] [Updated] (FLINK-13029) Remove expressionBridge from QueryOperations factories

2019-06-28 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-13029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-13029:
-
Summary: Remove expressionBridge from QueryOperations factories  (was: 
Remove expressionBridge from OperationTreeBuilder's factories)

> Remove expressionBridge from QueryOperations factories
> --
>
> Key: FLINK-13029
> URL: https://issues.apache.org/jira/browse/FLINK-13029
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Affects Versions: 1.9.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.9.0
>
>
> Expression bridge is used to create a schema of QueryOperation. This is no 
> longer necessary with ResolvedExpressions in place.





[jira] [Created] (FLINK-13029) Remove expressionBridge from OperationTreeBuilder's factories

2019-06-28 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-13029:


 Summary: Remove expressionBridge from OperationTreeBuilder's 
factories
 Key: FLINK-13029
 URL: https://issues.apache.org/jira/browse/FLINK-13029
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Affects Versions: 1.9.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.9.0


Expression bridge is used to create a schema of QueryOperation. This is no 
longer necessary with ResolvedExpressions in place.





[GitHub] [flink] flinkbot commented on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes

2019-06-28 Thread GitBox
flinkbot commented on issue #8924: [hotfix][docs] Update the generated 
configuration docs for previous changes
URL: https://github.com/apache/flink/pull/8924#issuecomment-506655381
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

## Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] zhijiangW commented on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes

2019-06-28 Thread GitBox
zhijiangW commented on issue #8924: [hotfix][docs] Update the generated 
configuration docs for previous changes
URL: https://github.com/apache/flink/pull/8924#issuecomment-506655017
 
 
   @flinkbot attention @azagrebin @zentol 




[GitHub] [flink] zhijiangW opened a new pull request #8924: [hotfix][docs] Update the generated configuration docs for previous changes

2019-06-28 Thread GitBox
zhijiangW opened a new pull request #8924: [hotfix][docs] Update the generated 
configuration docs for previous changes
URL: https://github.com/apache/flink/pull/8924
 
 
   ## What is the purpose of the change
   
   *Some configuration docs were not updated in previous changes, so 
`generate-config-docs` was run to regenerate the relevant ones.*
   
   ## Brief change log
   
 - *Update the generated blob_server_configuration.html*
 - *Add the new generated netty_shuffle_environment_configuration.html*
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)




[GitHub] [flink] dawidwys commented on a change in pull request #8874: [FLINK-12968][table-common] Add a utility for logical type casts

2019-06-28 Thread GitBox
dawidwys commented on a change in pull request #8874: 
[FLINK-12968][table-common] Add a utility for logical type casts
URL: https://github.com/apache/flink/pull/8874#discussion_r298500052
 
 

 ##
 File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java
 ##
 @@ -206,7 +207,9 @@ private static AdaptedCallContext adaptArguments(
}
 
private static boolean canCast(DataType sourceDataType, DataType 
targetDataType) {
 
 Review comment:
   Just ignore my comment. I thought this is a public method.




[GitHub] [flink] twalthr commented on issue #8874: [FLINK-12968][table-common] Add a utility for logical type casts

2019-06-28 Thread GitBox
twalthr commented on issue #8874: [FLINK-12968][table-common] Add a utility for 
logical type casts
URL: https://github.com/apache/flink/pull/8874#issuecomment-506650471
 
 
   Thanks @dawidwys. Will merge...




[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService

2019-06-28 Thread GitBox
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce 
mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298498045
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##
 @@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class encapsulates the logic of the mailbox-based execution model. At 
the core of this model is
+ * {@link #runMailboxLoop()}, which continuously executes the provided {@link 
MailboxDefaultAction} in a loop. On each
+ * iteration, the method also checks if there are pending actions in the 
mailbox and executes such actions. This model
+ * ensures single-threaded execution between the default action (e.g. record 
processing) and mailbox actions (e.g.
+ * checkpoint trigger, timer firing, ...).
+ *
+ * The {@link MailboxDefaultAction} interacts with this class through the 
{@link MailboxDefaultActionContext} to
+ * communicate control flow changes to the mailbox loop, e.g. that invocations 
of the default action are temporarily
+ * or permanently exhausted.
+ *
+ * The design of {@link #runMailboxLoop()} is centered around the idea of 
keeping the expected hot path
+ * (default action, no mail) as fast as possible, with just a single volatile 
read per iteration in
+ * {@link Mailbox#hasMail}. This means that all checking of mail and other 
control flags (mailboxLoopRunning,
+ * suspendedDefaultAction) are always connected to #hasMail indicating true. 
This means that control flag changes in
+ * the mailbox thread can be done directly, but we must ensure that there is 
at least one action in the mailbox so that
+ * the change is picked up. For control flag changes by all other threads, 
that must happen through mailbox actions,
+ * this is automatically the case.
+ *
+ * This class has an open-prepareClose-close lifecycle that is connected 
with and maps to the lifecycle of the
+ * encapsulated {@link Mailbox} (which is open-quiesce-close).
+ *
+ * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} 
exists to run the current sources
+ * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with 
the mailbox model in a compatibility
+ * mode. Once we drop the old source interface for the new one (FLIP-27) this 
method can eventually go away.
+ */
+public class MailboxProcessor {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(MailboxProcessor.class);
+
+   /** The mailbox data-structure that manages requests for special 
actions, like timers, checkpoints, ... */
+   private final Mailbox mailbox;
+
+   /** Executor-style facade for client code to submit actions to the 
mailbox. */
+   private final TaskMailboxExecutorService taskMailboxExecutor;
+
+   /** Action that is repeatedly executed if no action request is in the 
mailbox. Typically record processing. */
+   private final MailboxDefaultAction mailboxDefaultAction;
+
+   /** Control flag to terminate the mailbox loop. Must only be accessed 
from mailbox thread. */
+   private boolean mailboxLoopRunning;
+
+   /**
+* Remembers a currently active suspension of the default action. 
Serves as flag to indicate a suspended
+* default action (suspended if not-null) and to reuse the object as 
return value in consecutive suspend attempts.
+* Must only be accessed from mailbox thread.
+*/
+   private MailboxDefaultAction.SuspendedDefaultAction 
suspendedDefaultAction;
+
+   /** Special action that is used to terminate the mailbox loop. */
+   private final Runnable mailboxPoisonLetter;
+
+   public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
+   this.mailboxDefaultAction = 
Preconditions.checkNotNull(mailboxDefaultAction);
+   this.mailbox = new MailboxImpl(
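The Javadoc above describes the mailbox loop: drain pending mail first, then run the default action (e.g. record processing), all on a single thread. A minimal single-threaded sketch of that control flow follows; this is a hypothetical illustration with invented names, not the real `MailboxProcessor`, which additionally provides thread-safe mail delivery, quiescing, and suspension of the default action:

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical single-threaded sketch of the mailbox loop described above. */
public class MiniMailboxLoop {

    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final Runnable defaultAction; // e.g. "process one record"
    private boolean running = true;

    MiniMailboxLoop(Runnable defaultAction) {
        this.defaultAction = defaultAction;
    }

    /** Enqueue a letter (checkpoint trigger, timer firing, poison letter, ...). */
    void sendMail(Runnable letter) {
        mailbox.add(letter);
    }

    /** The "poison letter" action that terminates the loop. */
    void stop() {
        running = false;
    }

    /** Drain all pending mail first, then run the default action; repeat. */
    void runMailboxLoop() {
        while (running) {
            Runnable letter;
            while ((letter = mailbox.poll()) != null) {
                letter.run();
            }
            if (running) {
                defaultAction.run();
            }
        }
    }

    public static void main(String[] args) {
        int[] processed = {0};
        MiniMailboxLoop[] self = new MiniMailboxLoop[1];
        self[0] = new MiniMailboxLoop(() -> {
            processed[0]++;
            if (processed[0] == 3) {
                self[0].sendMail(self[0]::stop); // stop via a mailbox letter
            }
        });
        self[0].runMailboxLoop();
        System.out.println(processed[0]); // prints 3
    }
}
```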
