[GitHub] [flink] Myasuka commented on issue #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
Myasuka commented on issue #7942: [FLINK-11696][checkpoint] Avoid to send mkdir requests to DFS from task side
URL: https://github.com/apache/flink/pull/7942#issuecomment-506924134

Build failed due to [FLINK-12164](https://issues.apache.org/jira/browse/FLINK-12164); re-triggering the build.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-13030) add more test case for blink planner
godfrey he created FLINK-13030:
-------------------------------
Summary: add more test case for blink planner
Key: FLINK-13030
URL: https://issues.apache.org/jira/browse/FLINK-13030
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: godfrey he
Assignee: godfrey he

This issue aims to introduce more test cases from the internal Blink and the Flink planner into the Blink planner, to make sure the Blink planner's functionality aligns with both.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-12164) JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout is unstable
[ https://issues.apache.org/jira/browse/FLINK-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Tang reopened FLINK-12164:
------------------------------

> JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout is unstable
>
> Key: FLINK-12164
> URL: https://issues.apache.org/jira/browse/FLINK-12164
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Reporter: Aljoscha Krettek
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.9.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> {code}
> 07:28:23.957 [ERROR] Tests run: 24, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 8.968 s <<< FAILURE! - in org.apache.flink.runtime.jobmaster.JobMasterTest
> 07:28:23.957 [ERROR] testJobFailureWhenTaskExecutorHeartbeatTimeout(org.apache.flink.runtime.jobmaster.JobMasterTest) Time elapsed: 0.177 s <<< ERROR!
> java.util.concurrent.ExecutionException: java.lang.Exception: Unknown TaskManager 69a7c8c18a36069ff90a1eae8ec41066
> 	at org.apache.flink.runtime.jobmaster.JobMasterTest.registerSlotsAtJobMaster(JobMasterTest.java:1746)
> 	at org.apache.flink.runtime.jobmaster.JobMasterTest.runJobFailureWhenTaskExecutorTerminatesTest(JobMasterTest.java:1670)
> 	at org.apache.flink.runtime.jobmaster.JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout(JobMasterTest.java:1630)
> Caused by: java.lang.Exception: Unknown TaskManager 69a7c8c18a36069ff90a1eae8ec41066
> {code}
[jira] [Commented] (FLINK-12164) JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout is unstable
[ https://issues.apache.org/jira/browse/FLINK-12164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16875361#comment-16875361 ]

Yun Tang commented on FLINK-12164:
----------------------------------

This problem happened again; here is the instance: https://api.travis-ci.org/v3/job/551302724/log.txt

> JobMasterTest.testJobFailureWhenTaskExecutorHeartbeatTimeout is unstable
[GitHub] [flink] Myasuka commented on issue #8909: [FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
Myasuka commented on issue #8909: [FLINK-13004][table-runtime-blink] Correct the logic of needToCleanupState in KeyedProcessFunctionWithCleanupState
URL: https://github.com/apache/flink/pull/8909#issuecomment-506920344

@wuchong Thanks for your review. I rebased onto the latest code to fix the build error caused by a missing symbol.
[GitHub] [flink] xuefuz commented on a change in pull request #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service
xuefuz commented on a change in pull request #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service
URL: https://github.com/apache/flink/pull/8889#discussion_r298778900

## File path: flink-connectors/flink-connector-hive/pom.xml

@@ -452,29 +493,15 @@
-    commons-dbcp:commons-dbcp
-    commons-pool:commons-pool
-    commons-beanutils:commons-beanutils
-    com.fasterxml.jackson.core:*
-    com.jolbox:bonecp
-    org.apache.thrift:libthrift
-    org.datanucleus:*
-    org.antlr:antlr-runtime
+    *:*

Review comment: This is fine.
[jira] [Updated] (FLINK-13021) unify catalog partition implementations
[ https://issues.apache.org/jira/browse/FLINK-13021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-13021:
-----------------------------------
Labels: pull-request-available (was: )

> unify catalog partition implementations
> Key: FLINK-13021
> URL: https://issues.apache.org/jira/browse/FLINK-13021
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Hive, Table SQL / API
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
[GitHub] [flink] flinkbot commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations
flinkbot commented on issue #8926: [FLINK-13021][table][hive] unify catalog partition implementations
URL: https://github.com/apache/flink/pull/8926#issuecomment-506901123

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❓ 3. Needs [attention] from.
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] bowenli86 opened a new pull request #8926: [FLINK-13021][table][hive] unify catalog partition implementations
bowenli86 opened a new pull request #8926: [FLINK-13021][table][hive] unify catalog partition implementations
URL: https://github.com/apache/flink/pull/8926

## What is the purpose of the change

This PR unifies catalog partition implementations.

## Brief change log

- Unified `GenericCatalogPartition`, `HiveCatalogPartition`, and `AbstractCatalogPartition` into `CatalogPartitionImpl`
- Updated unit tests

## Verifying this change

This change is already covered by existing tests, such as `GenericInMemoryCatalogTest`, `HiveCatalogGenericMetadataTest`, and `HiveCatalogHiveMetadataTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)
[GitHub] [flink] xuefuz commented on issue #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service
xuefuz commented on issue #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service
URL: https://github.com/apache/flink/pull/8889#issuecomment-506900050

Thanks for the update. Looking at the required libraries, I wonder whether it's reasonable to expect users to find those very specific libraries and put them on the classpath. The other question is why only those are missing. For usability, asking users to put the flink-hive jar on the classpath seems fine to me; asking for a very specific jar like commons-logging, however, seems like too much usability overhead.
[GitHub] [flink] bowenli86 commented on a change in pull request #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service
bowenli86 commented on a change in pull request #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service
URL: https://github.com/apache/flink/pull/8889#discussion_r298769864

## File path: flink-connectors/flink-connector-hive/pom.xml

@@ -452,29 +493,15 @@
-    commons-dbcp:commons-dbcp
-    commons-pool:commons-pool
-    commons-beanutils:commons-beanutils
-    com.fasterxml.jackson.core:*
-    com.jolbox:bonecp
-    org.apache.thrift:libthrift
-    org.datanucleus:*
-    org.antlr:antlr-runtime
+    *:*

Review comment: Because we stopped packaging any hive/hadoop dependencies a long time ago. They are all provided, so we don't need to filter anything for them. This is a historical leftover, and it can be cleaned up in its own PR if necessary.
[GitHub] [flink] xuefuz commented on a change in pull request #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service
xuefuz commented on a change in pull request #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service
URL: https://github.com/apache/flink/pull/8889#discussion_r298768463

## File path: flink-connectors/flink-connector-hive/pom.xml

@@ -452,29 +493,15 @@
-    commons-dbcp:commons-dbcp
-    commons-pool:commons-pool
-    commons-beanutils:commons-beanutils
-    com.fasterxml.jackson.core:*
-    com.jolbox:bonecp
-    org.apache.thrift:libthrift
-    org.datanucleus:*
-    org.antlr:antlr-runtime
+    *:*

Review comment: How come we don't need those any more?
[GitHub] [flink] bowenli86 commented on issue #8919: [FLINK-13022][table][hive] unify catalog function implementations
bowenli86 commented on issue #8919: [FLINK-13022][table][hive] unify catalog function implementations
URL: https://github.com/apache/flink/pull/8919#issuecomment-506892579

cc @xuefuz @lirui-apache @zjuwangg
[GitHub] [flink] bowenli86 removed a comment on issue #8919: [FLINK-13022][table][hive] unify catalog function implementations
bowenli86 removed a comment on issue #8919: [FLINK-13022][table][hive] unify catalog function implementations
URL: https://github.com/apache/flink/pull/8919#issuecomment-506892579

cc @xuefuz @lirui-apache @zjuwangg
[GitHub] [flink] bowenli86 commented on issue #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service
bowenli86 commented on issue #8889: [FLINK-12934][hive] add additional dependencies for flink-connector-hive to connect to remote hive metastore service
URL: https://github.com/apache/flink/pull/8889#issuecomment-506859928

@xuefuz After considering it carefully, I decided to make them `provided` dependencies for now, as flink-connector-hive doesn't need them for either compilation or testing. I added them as dependencies mainly as a reminder to users and developers that they need to add them to their classpath when running in production.
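For context on the `provided` scope discussed above: such a dependency is visible at compile time but is not bundled into the packaged artifact, which is why users must supply the jar themselves in production. A minimal sketch of the declaration; the `commons-logging` coordinates come from this thread, and the version shown is illustrative, not taken from the PR:

```xml
<!-- 'provided' scope: compiled against, but NOT bundled into the jar. -->
<!-- Users must place this jar on the classpath when running in production. -->
<dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.1.3</version> <!-- illustrative version -->
    <scope>provided</scope>
</dependency>
```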
[GitHub] [flink] bowenli86 commented on issue #8904: [FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for Hive table
bowenli86 commented on issue #8904: [FLINK-13005][hive] HiveCatalog should not add 'flink.is_generic' key for Hive table
URL: https://github.com/apache/flink/pull/8904#issuecomment-506834286

@xuefuz I've updated the PR with the new design as we discussed yesterday. Can you take another look?
[GitHub] [flink] bowenli86 commented on issue #8800: [FLINK-12627][doc][sql client][hive] Document how to configure and use catalogs in SQL CLI
bowenli86 commented on issue #8800: [FLINK-12627][doc][sql client][hive] Document how to configure and use catalogs in SQL CLI
URL: https://github.com/apache/flink/pull/8800#issuecomment-506827758

cc @xuefuz @wuchong @lirui-apache @zjuwangg
[GitHub] [flink] xuefuz commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
xuefuz commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506804032

Thanks for the review. Not sure it was due to the inclusion of #8890, but I rebased to master to retest.
[jira] [Updated] (FLINK-12989) Generate HiveTableSink from from a Hive table
[ https://issues.apache.org/jira/browse/FLINK-12989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-12989:
-----------------------------------
Labels: pull-request-available (was: )

> Generate HiveTableSink from from a Hive table
> Key: FLINK-12989
> URL: https://issues.apache.org/jira/browse/FLINK-12989
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Xuefu Zhang
> Assignee: Xuefu Zhang
> Priority: Major
> Labels: pull-request-available
>
> As a followup for FLINK-11480, this adds the conversion from a Hive table to a table sink that's used for data connector writing side.
[GitHub] [flink] asfgit closed pull request #8890: [FLINK-12989][hive] Generate HiveTableSink from from a Hive table
asfgit closed pull request #8890: [FLINK-12989][hive] Generate HiveTableSink from from a Hive table
URL: https://github.com/apache/flink/pull/8890
[GitHub] [flink] ifndef-SleePy commented on issue #8870: [FLINK-12974][checkstyle] Bump checkstyle to 8.14
ifndef-SleePy commented on issue #8870: [FLINK-12974][checkstyle] Bump checkstyle to 8.14
URL: https://github.com/apache/flink/pull/8870#issuecomment-506799770

@hmcl I wish I could, but I don't have authorization to merge your PR since I'm not a committer on the Flink project. I'd suggest pinging committers like zentol to review it.
[jira] [Closed] (FLINK-12989) Generate HiveTableSink from from a Hive table
[ https://issues.apache.org/jira/browse/FLINK-12989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li closed FLINK-12989.
----------------------------
Resolution: Fixed

merged in 1.9.0: f1721293b0701d584d42bd68671181e332d2ad04

> Generate HiveTableSink from from a Hive table
> Key: FLINK-12989
> URL: https://issues.apache.org/jira/browse/FLINK-12989
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Xuefu Zhang
> Assignee: Xuefu Zhang
> Priority: Major
>
> As a followup for FLINK-11480, this adds the conversion from a Hive table to a table sink that's used for data connector writing side.
[jira] [Assigned] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events
[ https://issues.apache.org/jira/browse/FLINK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ying Xu reassigned FLINK-13027:
-------------------------------
Assignee: (was: Ying Xu)

> StreamingFileSink bulk-encoded writer supports file rolling upon customized events
> Key: FLINK-13027
> URL: https://issues.apache.org/jira/browse/FLINK-13027
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream
> Reporter: Ying Xu
> Priority: Major
>
> When writing in a bulk-encoded format such as Parquet, StreamingFileSink only supports OnCheckpointRollingPolicy, which rolls files at checkpoint time.
>
> In many scenarios, it is beneficial for the sink to roll files upon certain events, for example when the file size reaches a limit. Such a rolling policy can also potentially alleviate some side effects of OnCheckpointRollingPolicy, e.g., most of the heavy lifting, including file uploading, happening at checkpoint time.
>
> Specifically, this Jira calls for a new rolling policy that rolls files:
> # whenever a customized event happens, e.g., the file size reaches a certain limit.
> # whenever a checkpoint happens. This is needed to provide exactly-once guarantees when writing bulk-encoded files.
>
> Users of this rolling policy need to be aware that the customized event and the next checkpoint epoch may be close to each other, and thus may yield a tiny file per checkpoint in the worst case.
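The two-part policy requested above (roll on a customized event such as a size limit, and always roll on checkpoint) can be sketched as follows. This is modeled loosely on the shape of Flink's `RollingPolicy` interface, but `PartFileState` here is a simplified stand-in for the part-file information a real policy receives, not Flink's actual API:

```java
// Stand-in for the in-progress part-file info a rolling policy inspects
// (hypothetical; Flink's actual interface is PartFileInfo<BucketID>).
interface PartFileState {
    long getSize();
}

// Rolls on a customized event (size limit reached) AND on every checkpoint,
// the latter being required for exactly-once with bulk-encoded formats.
class SizeOrCheckpointRollingPolicy {
    private final long maxPartSizeBytes;

    SizeOrCheckpointRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    /** Customized event: roll once the part file reaches the size limit. */
    boolean shouldRollOnEvent(PartFileState partFile) {
        return partFile.getSize() >= maxPartSizeBytes;
    }

    /** Always roll at checkpoint time, as OnCheckpointRollingPolicy does. */
    boolean shouldRollOnCheckpoint(PartFileState partFile) {
        return true;
    }
}
```

Note the trade-off called out in the issue: if the size limit fires just before a checkpoint, this policy produces a tiny extra file for that checkpoint interval.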
[jira] [Assigned] (FLINK-13027) StreamingFileSink bulk-encoded writer supports file rolling upon customized events
[ https://issues.apache.org/jira/browse/FLINK-13027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ying Xu reassigned FLINK-13027:
-------------------------------
Assignee: Ying Xu

> StreamingFileSink bulk-encoded writer supports file rolling upon customized events
[GitHub] [flink] bowenli86 commented on issue #8890: [FLINK-12989][hive] Generate HiveTableSink from from a Hive table
bowenli86 commented on issue #8890: [FLINK-12989][hive] Generate HiveTableSink from from a Hive table
URL: https://github.com/apache/flink/pull/8890#issuecomment-506793531

Merging
[GitHub] [flink] bowenli86 commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
bowenli86 commented on issue #8921: [FLINK-13023][hive] Generate HiveTableSource from from a Hive table
URL: https://github.com/apache/flink/pull/8921#issuecomment-506793351

There's a compile failure in flink-table-common. I don't think it's caused by this PR, so I restarted the build to see whether it has already been fixed by someone else.
[GitHub] [flink] Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
Xeli commented on issue #6594: [FLINK-9311] [pubsub] Added PubSub source connector with support for checkpointing (ATLEAST_ONCE)
URL: https://github.com/apache/flink/pull/6594#issuecomment-506787729

@rmetzger It took a while for Travis to run, but it just succeeded.
[GitHub] [flink] zhijiangW commented on issue #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
zhijiangW commented on issue #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
URL: https://github.com/apache/flink/pull/8880#issuecomment-506758658

No other concerns on my side atm. LGTM!
[GitHub] [flink] asfgit closed pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
asfgit closed pull request #8901: [FLINK-13003][table-planner-blink] Support Temporal TableFunction Join in blink planner
URL: https://github.com/apache/flink/pull/8901
[jira] [Closed] (FLINK-13003) Support Temporal TableFunction Join in processing time and event time
[ https://issues.apache.org/jira/browse/FLINK-13003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-13003.
---------------------------
Resolution: Fixed
Fix Version/s: 1.9.0

Fixed in 1.9.0: 4b7ca247431e727ac0533cf767d43615bbaf07c0

> Support Temporal TableFunction Join in processing time and event time
> Key: FLINK-13003
> URL: https://issues.apache.org/jira/browse/FLINK-13003
> Project: Flink
> Issue Type: Task
> Components: Table SQL / Planner
> Reporter: Jark Wu
> Assignee: Jark Wu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> This is a feature port from flink-planner to blink-planner to support temporal TableFunction joins (also called versioned joins).
[GitHub] [flink] asfgit closed pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime
asfgit closed pull request #8757: [FLINK-12850][core] Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime
URL: https://github.com/apache/flink/pull/8757
[jira] [Closed] (FLINK-12850) Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime
[ https://issues.apache.org/jira/browse/FLINK-12850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu closed FLINK-12850.
---------------------------
Resolution: Fixed
Fix Version/s: 1.9.0
Release Note: Support LocalDate, LocalTime, and LocalDateTime TypeInformations.

Resolved in 1.9.0: 463cedddec1bd55545f6e3e39ada04229ef5ceb5

> Introduce TypeInfo for LocalDate/LocalTime/LocalDateTime
> Key: FLINK-12850
> URL: https://issues.apache.org/jira/browse/FLINK-12850
> Project: Flink
> Issue Type: New Feature
> Components: API / Type Serialization System
> Reporter: Jingsong Lee
> Assignee: Jingsong Lee
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> In the Table API's new type system, the default conversion classes for time types are LocalDate, LocalTime, and LocalDateTime. Some situations, such as toDataStream, require converting these to TypeInformation, so we need to provide TypeInformation support for them.
>
> Introduce LocalTimeTypeInfo
> Introduce LocalDateSerializer, LocalTimeSerializer, LocalDateTimeSerializer
> Introduce LocalDateComparator, LocalTimeComparator, LocalDateTimeComparator
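To illustrate what a serializer for these java.time classes has to do, here is a minimal sketch of the idea using `LocalDate`'s epoch-day representation. This is only an illustration of the encoding concept, assuming nothing about Flink's actual `LocalDateSerializer` internals, and `LocalDateCodec` is a hypothetical name:

```java
// Sketch: encode a LocalDate as a single long (days since 1970-01-01).
// A real Flink TypeSerializer additionally integrates with DataOutputView,
// duplication, and compatibility checks; this shows only the value encoding.
class LocalDateCodec {
    static long serialize(java.time.LocalDate date) {
        return date.toEpochDay();
    }

    static java.time.LocalDate deserialize(long epochDay) {
        return java.time.LocalDate.ofEpochDay(epochDay);
    }
}
```

A fixed-width numeric encoding like this is also what makes efficient comparators (such as the LocalDateComparator mentioned above) straightforward: comparing two dates reduces to comparing two longs.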
[jira] [Closed] (FLINK-12592) StreamTableEnvironment object has no attribute connect
[ https://issues.apache.org/jira/browse/FLINK-12592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sunjincheng closed FLINK-12592.
-------------------------------
Resolution: Fixed
Fix Version/s: 1.9.0

This problem has disappeared, so I'm closing this JIRA.

> StreamTableEnvironment object has no attribute connect
> Key: FLINK-12592
> URL: https://issues.apache.org/jira/browse/FLINK-12592
> Project: Flink
> Issue Type: Bug
> Components: API / Python, Tests
> Affects Versions: 1.9.0
> Reporter: Till Rohrmann
> Assignee: sunjincheng
> Priority: Critical
> Labels: pull-request-available, test-stability
> Fix For: 1.9.0
> Attachments: image-2019-05-24-10-55-04-214.png, image-2019-05-24-10-55-14-534.png
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> The Python build module failed on Travis with the following problem: {{'StreamTableEnvironment' object has no attribute 'connect'}}.
> https://api.travis-ci.org/v3/job/535684431/log.txt
[GitHub] [flink] asfgit closed pull request #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
asfgit closed pull request #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
URL: https://github.com/apache/flink/pull/8910
[GitHub] [flink] flinkbot edited a comment on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
flinkbot edited a comment on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config URL: https://github.com/apache/flink/pull/8910#issuecomment-506250830 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ✅ 1. The [description] looks good. - Approved by @sunjincheng121 [committer] * ✅ 2. There is [consensus] that the contribution should go into Flink. - Approved by @sunjincheng121 [committer] * ❓ 3. Needs [attention] from. * ✅ 4. The change fits into the overall [architecture]. - Approved by @sunjincheng121 [committer] * ✅ 5. Overall code [quality] is good. - Approved by @sunjincheng121 [committer] Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] sunjincheng121 commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
sunjincheng121 commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config URL: https://github.com/apache/flink/pull/8910#issuecomment-506706349 Thanks for the update @dianfu. +1 to merge. @flinkbot approve all
[GitHub] [flink] wenhuitang commented on a change in pull request #8711: [FLINK-12817][docs]Correct the import in Processing function example.
wenhuitang commented on a change in pull request #8711: [FLINK-12817][docs]Correct the import in Processing function example. URL: https://github.com/apache/flink/pull/8711#discussion_r298561043 ## File path: docs/dev/stream/operators/process_function.md ## @@ -97,9 +97,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.streaming.api.functions.ProcessFunction.Context; -import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; Review comment: Thanks a lot, I have addressed it.
[GitHub] [flink] StephanEwen commented on issue #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure.
StephanEwen commented on issue #8880: [FLINK-12986] [network] Fix instability of BoundedBlockingSubpartition under memory pressure. URL: https://github.com/apache/flink/pull/8880#issuecomment-506702770 @zhijiangW and @wsry I would merge this now unless you have further comments I should address first.
[jira] [Updated] (FLINK-9036) Add default value via suppliers
[ https://issues.apache.org/jira/browse/FLINK-9036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-9036: -- Labels: pull-request-available (was: ) > Add default value via suppliers > --- > > Key: FLINK-9036 > URL: https://issues.apache.org/jira/browse/FLINK-9036 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > > Earlier versions had a default value in {{ValueState}}. We dropped that, > because the value would have to be duplicated on each access, to be safe > against side effects when using mutable types. > For convenience, we should re-add the feature, but using a supplier/factory > function to create the default value on access.
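The issue above explains why a supplier beats a shared default instance: a fresh value is materialized on each first access, so mutating one key's default cannot leak into another key's. A minimal sketch of the idea (class and method names here are illustrative, not Flink's API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class SuppliedDefaultState<K, V> {

    private final Map<K, V> store = new HashMap<>();
    private final Supplier<V> defaultSupplier;

    public SuppliedDefaultState(Supplier<V> defaultSupplier) {
        this.defaultSupplier = defaultSupplier;
    }

    public V get(K key) {
        // The supplier runs only when the key has no value yet,
        // so every key receives its own fresh default instance.
        return store.computeIfAbsent(key, k -> defaultSupplier.get());
    }

    public static void main(String[] args) {
        SuppliedDefaultState<String, List<Integer>> state =
                new SuppliedDefaultState<>(ArrayList::new);
        state.get("a").add(1);            // mutate key "a"'s default value
        if (!state.get("b").isEmpty()) {  // key "b" still sees a clean list
            throw new AssertionError("defaults were shared!");
        }
    }
}
```

With a single shared default instance, the `add(1)` above would have been visible under key "b" as well, which is exactly the side effect the JIRA describes.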
[GitHub] [flink] StephanEwen closed pull request #5735: [FLINK-9036] [core] Add default values to State Descriptors via suppliers
StephanEwen closed pull request #5735: [FLINK-9036] [core] Add default values to State Descriptors via suppliers URL: https://github.com/apache/flink/pull/5735
[GitHub] [flink] zhijiangW commented on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes
zhijiangW commented on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes URL: https://github.com/apache/flink/pull/8924#issuecomment-506698607 Thanks for the review @zentol ! I have addressed the above comments.
[GitHub] [flink] sunjincheng121 edited a comment on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
sunjincheng121 edited a comment on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config URL: https://github.com/apache/flink/pull/8910#issuecomment-506690440 Hi, I think we also need to update `shell.py`; the test fails in `shell.py`.
[GitHub] [flink] zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8922#discussion_r298552634

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java

## @@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger {
+
+    private static final JobID TEST_JOB_ID = new JobID();
+
+    private static final int DEFAULT_PARALLELISM = 2;
+
+    private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+    private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+    private TestRestartStrategy manuallyTriggeredRestartStrategy;
+
+    @Before
+    public void setUp() {
+        manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+        componentMainThreadExecutor = new ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, Thread.currentThread());
+        manuallyTriggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
+    }
+
+    /**
+     * Tests that 2 concurrent region failovers can lead to a proper vertex state.
+     *
+     * (v11) -+-> (v21)
+     *        x
+     * (v12) -+-> (v22)
+     *
+     *        ^
+     *        |
+     *   (blocking)
+     */
+    @Test
+    public void testConcurrentRegionFailovers() throws Exception {
+
+        // the logic in this test is as follows:
+        //  - start a job
+        //  - cause {ev11} failure and delay the local recovery action via the manual executor
+        //  - cause {ev12} failure and delay the local recovery action via the manual executor
+        //  - resume local recovery actions
+        //  - validate that each task is restarted only once
+
+        final ExecutionGraph eg = createExecutionGraph();
+
+        final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy =
+            (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy();
+        failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+
+        final Iterator<ExecutionVertex> vertexIterator = eg.getAllExecutionVertices().iterator();
+        final ExecutionVertex ev11 = vertexIter
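The test above relies on a manually triggered executor so the test can interleave two failures before letting recovery run. The pattern itself is small; a hypothetical stand-in for Flink's ManuallyTriggeredScheduledExecutor (names illustrative) looks like this:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Tasks are queued instead of executed, so a test controls exactly when
// deferred actions (e.g. local recovery) actually run.
public class ManualExecutor implements Executor {

    private final Queue<Runnable> queued = new ArrayDeque<>();

    @Override
    public void execute(Runnable command) {
        queued.add(command); // defer instead of running immediately
    }

    /** Runs everything queued so far and returns how many tasks ran. */
    public int triggerAll() {
        int count = 0;
        Runnable task;
        while ((task = queued.poll()) != null) {
            task.run();
            count++;
        }
        return count;
    }
}
```

In the test's terms: both failure handlers get queued, nothing runs until `triggerAll()`, and the assertions afterwards can check that each vertex was restarted only once.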
[GitHub] [flink] dianfu commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
dianfu commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config URL: https://github.com/apache/flink/pull/8910#issuecomment-506697353 @sunjincheng121 Thanks a lot for the review. Updated the PR.
[GitHub] [flink] flinkbot commented on issue #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers
flinkbot commented on issue #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers URL: https://github.com/apache/flink/pull/8925#issuecomment-506695220 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[jira] [Updated] (FLINK-12852) Deadlock occurs when requiring exclusive buffer for RemoteInputChannel
[ https://issues.apache.org/jira/browse/FLINK-12852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-12852: --- Labels: pull-request-available (was: ) > Deadlock occurs when requiring exclusive buffer for RemoteInputChannel > -- > > Key: FLINK-12852 > URL: https://issues.apache.org/jira/browse/FLINK-12852 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.9.0 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > Labels: pull-request-available > > When running tests with an upstream vertex and downstream vertex, deadlock > occurs when submitting the job: > {code:java} > "Sink: Unnamed (3/500)" #136 prio=5 os_prio=0 tid=0x7f2cca81b000 > nid=0x38845 waiting on condition [0x7f2cbe9fe000] > java.lang.Thread.State: TIMED_WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00073ed6b6f0> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:233) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:418) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:180) > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:54) > at > org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.assignExclusiveSegments(RemoteInputChannel.java:139) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.assignExclusiveSegments(SingleInputGate.java:312) > - locked <0x00073fbc81f0> (a java.lang.Object) > at > org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.setup(SingleInputGate.java:220) > at > 
org.apache.flink.runtime.taskmanager.Task.setupPartionsAndGates(Task.java:836) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:598) > at java.lang.Thread.run(Thread.java:834) > {code} > This happens because the required and maximum sizes of the local buffer pool are not the same, so there may be over-allocation, and when assignExclusiveSegments is called there is no available memory left. > > The detail of the scenario is as follows: the parallelisms of the upstream vertex and the downstream vertex are 1000 and 500 respectively. There are 200 TMs, and each TM has 10696 buffers in total and 10 slots. For a TM that runs 9 upstream tasks and 1 downstream task, the 9 upstream tasks start first, each with a local buffer pool {required = 500, max = 2 * 500 + 8 = 1008}; they produce data quickly and each occupies about 990 buffers. Then the downstream task starts and tries to assign exclusive buffers for 1500 - 9 = 1491 InputChannels. It requires 2981 buffers, but only 1786 are left. Since not all downstream tasks can start, the job is blocked, no buffer can ever be released, and the deadlock finally occurs. > > I think that although increasing the network memory solves the problem, the deadlock may not be acceptable. Fine-grained resource management (FLINK-12761) can solve this problem, but AFAIK in 1.9 it will not include the network memory in the ResourceProfile.
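The buffer arithmetic in the report above can be checked directly. All numbers below are taken from the description; the figure of roughly two exclusive buffers per remote input channel is an assumption consistent with the reported requirement of 2981 buffers for 1491 channels:

```java
public class BufferBudget {
    public static void main(String[] args) {
        int buffersPerTaskManager = 10696;      // total buffers per TM
        int upstreamTasksOnTm = 9;
        int buffersHeldPerUpstreamPool = 990;   // "each occupies about 990 buffers"
        int remoteChannels = 1500 - 9;          // 1491 channels need exclusive buffers
        int exclusiveBuffersPerChannel = 2;     // assumed per-channel exclusive count

        int occupied = upstreamTasksOnTm * buffersHeldPerUpstreamPool; // 8910
        int remaining = buffersPerTaskManager - occupied;              // 1786
        int required = remoteChannels * exclusiveBuffersPerChannel;    // ~2982

        // The downstream task blocks forever: it needs more buffers than can
        // ever become free, because the upstream pools never shrink below
        // their current allocation while the job makes no progress.
        System.out.println("remaining=" + remaining
                + ", required~=" + required
                + ", deadlock=" + (required > remaining));
    }
}
```

The key point is not the exact figures but the inequality: required exclusive buffers exceed what can ever be released, so no amount of waiting resolves the allocation.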
[GitHub] [flink] gaoyunhaii opened a new pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers
gaoyunhaii opened a new pull request #8925: [FLINK-12852][network] Fix the deadlock occured when requesting exclusive buffers URL: https://github.com/apache/flink/pull/8925 ## What is the purpose of the change This pull request fixes the deadlock problem during requesting exclusive buffers. Since the maximum number of buffers and the required number of buffers are currently not the same for local buffer pools, there may be cases where the local buffer pools of the upstream tasks occupy all the buffers while the downstream tasks fail to acquire exclusive buffers to make progress. Although this problem can be avoided by increasing the total number of buffers, the deadlock itself may not be acceptable. Therefore, this PR fails over the current execution when the deadlock occurs and hints users, in the exception message, to increase the number of buffers. ## Brief change log The main changes include 1. Add an option for the timeout of `requestMemorySegment` for each channel. The default timeout is 30s. This option is marked as undocumented since it may be removed in a future implementation. 2. Transfer the timeout to `NetworkBufferPool`. 3. `requestMemorySegments` will throw `IOException("Insufficient buffer")` if not all segments are acquired before the timeout. ## Verifying this change 1. Added a test that validates that `requestMemorySegments` ends exceptionally if not all segments are acquired before the timeout. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? 
**no** - If yes, how is the feature documented? **not applicable**
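The timeout behavior described in the change log can be sketched as a bounded polling loop. This is an illustrative stand-in, not the PR's actual `NetworkBufferPool` code; the pool is modeled as a `BlockingQueue` and the segments as plain objects:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedSegmentRequest {

    // Poll the shared pool until all requested segments are acquired or the
    // deadline passes; on timeout, return the partial allocation to the pool
    // and fail with the "Insufficient buffer" exception the PR describes.
    static List<Object> requestMemorySegments(
            BlockingQueue<Object> availablePool, int count, long timeoutMs) throws IOException {
        List<Object> acquired = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (acquired.size() < count) {
            Object segment;
            try {
                segment = availablePool.poll(
                        Math.max(deadline - System.nanoTime(), 0L), TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                segment = null;
            }
            if (segment == null) {
                availablePool.addAll(acquired); // give back what was taken so far
                throw new IOException("Insufficient buffer: got " + acquired.size()
                        + " of " + count + "; consider increasing the number of network buffers");
            }
            acquired.add(segment);
        }
        return acquired;
    }
}
```

Turning a silent infinite wait into a bounded wait plus failover is the design choice here: the job restarts with an actionable message instead of hanging indefinitely.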
[GitHub] [flink] asfgit closed pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser
asfgit closed pull request #8850: [FLINK-12954] Supports create(drop) view grammar for sql parser URL: https://github.com/apache/flink/pull/8850
[jira] [Closed] (FLINK-12954) Supports create(drop) view grammar for sql parser
[ https://issues.apache.org/jira/browse/FLINK-12954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-12954. --- Resolution: Fixed Fixed in 1.9.0: 71570bc9a5dc549a34b020981e5cf87c479658e9 > Supports create(drop) view grammar for sql parser > - > > Key: FLINK-12954 > URL: https://issues.apache.org/jira/browse/FLINK-12954 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: Danny Chan >Assignee: Danny Chan >Priority: Major > Labels: pull-request-available > Fix For: 1.9.0 > > Time Spent: 10m > Remaining Estimate: 0h > > Add watermark to create table; also add create view grammar
[GitHub] [flink] sunjincheng121 commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config
sunjincheng121 commented on issue #8910: [hotfix][python] Aligns with Java Table API by removing methods exec_env and query_config URL: https://github.com/apache/flink/pull/8910#issuecomment-506690440 Hi, I think we also need to update `shell.py`. What do you think?
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298543321

File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java

@@ -876,6 +889,15 @@ private void jobStatusChanged(
 	validateRunsInMainThread();
 	if (newJobStatus.isGloballyTerminalState()) {
+		// other terminal job states are handled by the executions
+		if (newJobStatus == JobStatus.FINISHED) {
+			runAsync(() -> {
+				for (Map.Entry> entry : registeredTaskManagers.entrySet()) {

Review comment:
`registeredTaskManagers.keySet().forEach(partitionTracker::stopTrackingAndReleaseAllPartitions);`
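The suggested one-liner replaces an entry-set loop that only ever reads the keys. A minimal, self-contained sketch of that refactoring (the types and names here are illustrative stand-ins, not Flink's actual `registeredTaskManagers` or `PartitionTracker` types):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeySetForEachDemo {

    // Stand-in for partitionTracker::stopTrackingAndReleaseAllPartitions;
    // records which task executor ids were "released".
    static List<String> releaseAll(Map<String, Object> registeredTaskManagers) {
        List<String> released = new ArrayList<>();
        // Method reference over the key set, as the review suggests,
        // instead of iterating entrySet() when only the keys are needed.
        registeredTaskManagers.keySet().forEach(released::add);
        return released;
    }

    public static void main(String[] args) {
        Map<String, Object> tms = new LinkedHashMap<>();
        tms.put("tm-1", new Object());
        tms.put("tm-2", new Object());
        System.out.println(releaseAll(tms)); // [tm-1, tm-2]
    }
}
```

Besides brevity, the method-reference form avoids the unused `Map.Entry` values entirely.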
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM URL: https://github.com/apache/flink/pull/8778#discussion_r298518652 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.shuffle.PartitionDescriptor; +import org.apache.flink.runtime.shuffle.ProducerDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Utility for tracking partitions and issuing release calls to task executors and shuffle masters. + */ +public class PartitionTrackerImpl implements PartitionTracker { + + private final JobID jobId; + + private final Map partitionInfoByProducer = new HashMap<>(); + private final Map> producersByExecutor = new HashMap<>(); + + private final ShuffleMaster shuffleMaster; + + private PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup; + + public PartitionTrackerImpl( + final JobID jobId, + final ShuffleMaster shuffleMaster, + final PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup) { + + this.jobId = Preconditions.checkNotNull(jobId); + this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster); + this.taskExecutorGatewayLookup = taskExecutorGatewayLookup; + } + + @Override + public CompletableFuture registerPartition( + final PartitionDescriptor partitionDescriptor, + final ProducerDescriptor producerDescriptor, + final ResultPartitionDeploymentDescriptorFactory resultPartitionDeploymentDescriptorFactory) { + + CompletableFuture 
shuffleDescriptorFuture = shuffleMaster.registerPartitionWithProducer(partitionDescriptor, producerDescriptor); + + return shuffleDescriptorFuture + .thenApply(shuffleDescriptor -> + resultPartitionDeploymentDescriptorFactory.createResultPartitionDeploymentDescriptor(partitionDescriptor, shuffleDescriptor)) + .thenApply(deploymentDescriptor -> { + startTrackingPartition(producerDescriptor.getProducerLocation(), deploymentDescriptor); + return deploymentDescriptor; + }); + } + + @Override + public Collection getTrackedDeploymentDescriptors(final ExecutionAttemptID producerId) { + final InternalPartitionInfo partitionInfo = partitionInfoByProducer.get(producerId); + if (partitionInfo == null) { + return Collections.emptyList(); + } else { + return partitionInfo.getResultPartitionDeploymentDescriptors(); + } + } + + private void startTrackingPartition(final ResourceID producingTaskExecutorId, final ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) { + Preconditions.checkNotNull(producingTaskExecutorId); + Preconditions.checkNotNull(resultPartitionDeploymentDescriptor); + + final ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID(); + addPartition(producingTaskExecutorId, resultP
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298500217

File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java

+ CompletableFuture shuffleDescriptorFuture = shuffleMaster.registerPartitionWithProducer(partitionDescriptor, producerDescriptor);

Review comment: nit: any value in this local var?
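The nit points at a future that is assigned to a local variable and then used exactly once. A hedged sketch of the before/after shapes, using simplified stand-in methods rather than the real `ShuffleMaster` API:

```java
import java.util.concurrent.CompletableFuture;

public class InlineLocalDemo {

    // Stand-in for shuffleMaster.registerPartitionWithProducer(...).
    static CompletableFuture<String> registerPartitionWithProducer(String partition) {
        return CompletableFuture.completedFuture("shuffle-" + partition);
    }

    // Before: a local future that is consumed exactly once.
    static CompletableFuture<String> withLocal(String partition) {
        CompletableFuture<String> shuffleDescriptorFuture = registerPartitionWithProducer(partition);
        return shuffleDescriptorFuture.thenApply(d -> d + "/deployed");
    }

    // After: chaining directly, as the "any value in this local var?" nit suggests.
    static CompletableFuture<String> inlined(String partition) {
        return registerPartitionWithProducer(partition)
                .thenApply(d -> d + "/deployed");
    }

    public static void main(String[] args) {
        System.out.println(inlined("p1").join()); // shuffle-p1/deployed
    }
}
```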
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298494696

File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java

+ private final Map partitionInfoByProducer = new HashMap<>();

Review comment: I would prefer all initialisations in the constructor
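The preference is to move field initializers into the constructor so all state is created in one place. A minimal sketch of the two styles (field names echo the tracker, but the types are simplified stand-ins):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ConstructorInitDemo {

    // Before (field initializer):
    // private final Map<String, String> partitionInfoByProducer = new HashMap<>();

    private final String jobId;
    private final Map<String, String> partitionInfoByProducer;

    // After: every field, including the maps, assigned in the constructor,
    // as the review prefers.
    public ConstructorInitDemo(String jobId) {
        this.jobId = Objects.requireNonNull(jobId);
        this.partitionInfoByProducer = new HashMap<>();
    }

    public String getJobId() {
        return jobId;
    }

    public Map<String, String> getPartitionInfoByProducer() {
        return partitionInfoByProducer;
    }

    public static void main(String[] args) {
        ConstructorInitDemo tracker = new ConstructorInitDemo("job-1");
        System.out.println(tracker.getPartitionInfoByProducer().isEmpty()); // true
    }
}
```

This is purely stylistic: both forms run the same initialization; grouping it in the constructor just makes the object's full construction sequence readable top to bottom.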
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298537362

File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java

@@ -285,7 +281,13 @@ public LogicalSlot getAssignedResource() {
 	public Optional getResultPartitionDeploymentDescriptor(
 			IntermediateResultPartitionID id) {
-		return Optional.ofNullable(producedPartitions.get(id));
+		Collection trackedDeploymentDescriptors = vertex.getExecutionGraph().getPartitionTracker().getTrackedDeploymentDescriptors(getAttemptId());
+		for (ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor : trackedDeploymentDescriptors) {

Review comment:
anyways I think the method rather belongs to the tracker with the exec id added to the signature
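The loop being reviewed searches tracked deployment descriptors for one matching partition id. As a hedged illustration, that lookup can be written as a stream over the tracked collection; the `Descriptor` type below is a simplified stand-in for `ResultPartitionDeploymentDescriptor`, and, as the reviewer notes, in Flink such a method would arguably live on the tracker keyed by the execution attempt id:

```java
import java.util.List;
import java.util.Optional;

public class OptionalLookupDemo {

    // Simplified stand-in for ResultPartitionDeploymentDescriptor.
    static class Descriptor {
        final String partitionId;
        Descriptor(String partitionId) { this.partitionId = partitionId; }
    }

    // The explicit for-loop expressed as a stream lookup returning Optional,
    // matching the Optional-returning signature in the diff above.
    static Optional<Descriptor> findById(List<Descriptor> tracked, String id) {
        return tracked.stream()
                .filter(d -> d.partitionId.equals(id))
                .findAny();
    }

    public static void main(String[] args) {
        List<Descriptor> tracked = List.of(new Descriptor("p1"), new Descriptor("p2"));
        System.out.println(findById(tracked, "p2").isPresent()); // true
        System.out.println(findById(tracked, "p9").isPresent()); // false
    }
}
```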
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298524076

File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM URL: https://github.com/apache/flink/pull/8778#discussion_r298524932 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.shuffle.PartitionDescriptor; +import org.apache.flink.runtime.shuffle.ProducerDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * Utility for tracking partitions and issuing release calls to task executors and shuffle masters. + */ +public interface PartitionTracker { + + /** +* Asynchronously starts tracking the given partition. 
+* +* @param partition descriptor for the partition +* @param producerDescriptor descriptor for the producer +* @param resultPartitionDeploymentDescriptorFactory factory for the ResultPartitionDeploymentDescriptor +* @return future containing the assembled result partition deployment descriptor +*/ + CompletableFuture registerPartition( + PartitionDescriptor partition, + ProducerDescriptor producerDescriptor, + ResultPartitionDeploymentDescriptorFactory resultPartitionDeploymentDescriptorFactory); + + /** +* Returns all tracked partitions for the given producer. +* +* @param producerId producer to return tracked partitions for +* @return tracked partitions for the given producer +*/ + Collection getTrackedDeploymentDescriptors(ExecutionAttemptID producerId); + + /** +* Stops the tracking of all partitions for the given task executor ID, without issuing any release calls. +* +* @param producingTaskExecutorId task executor id to determine relevant partitions +*/ + void stopTrackingAllPartitions(ResourceID producingTaskExecutorId); + + /** +* Releases all partitions for the given producer, and stop the tracking of partitions that were released. +* +* @param producerId producer id to determine relevant partitions +*/ + void stopTrackingAndReleasePartitions(final ExecutionAttemptID producerId); + + /** +* Releases all partitions for the given task executor ID, and stop the tracking of partitions that were released. +* +* @param producingTaskExecutorId task executor id to determine relevant partitions +*/ + void stopTrackingAndReleaseAllPartitions(ResourceID producingTaskExecutorId); + + /** +* Returns whether any partition is being tracked for the given task executor ID. 
+* +* @param producingTaskExecutorId task executor id to determine relevant partitions +*/ + boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId); + + /** +* Factory for creating a {@link ResultPartitionDeploymentDescriptor} from a given +* {@link PartitionDescriptor}/{@link ShuffleDescriptor} pair. +*/ + interface ResultPartitionDeploymentDescriptorFactory {

Review comment: @FunctionalInterface
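The review asks to annotate the single-abstract-method factory interface with `@FunctionalInterface`. A minimal sketch of what that looks like; the interface name matches the reviewed code, but the `String`-based signature is a simplified assumption, not Flink's real `PartitionDescriptor`/`ShuffleDescriptor` types:

```java
public class FunctionalInterfaceDemo {

    // The annotation makes the compiler reject any accidental second abstract
    // method and documents that the type is intended for lambdas and method
    // references.
    @FunctionalInterface
    interface ResultPartitionDeploymentDescriptorFactory {
        String create(String partitionDescriptor, String shuffleDescriptor);
    }

    public static void main(String[] args) {
        // Usable directly as a lambda:
        ResultPartitionDeploymentDescriptorFactory factory =
                (partition, shuffle) -> partition + "+" + shuffle;
        System.out.println(factory.create("pd", "sd")); // pd+sd
    }
}
```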
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298527316

File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java

+ void stopTrackingAndReleasePartitions(final ExecutionAttemptID producerId);

Review comment:
atm it is "stop all for exec id and release what needs release". I think the comment needs adjustment and the function could be called `stopTrackingAndReleasePartitionsForExecution`; same for the next method, "stop all for task executor and release what needs release": `stopTrackingAndReleasePartitionsOnTaskExecutor` ("release all" is a bit confusing)
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298538571

File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java

+ .thenApply(deploymentDescriptor -> {
+ 	startTrackingPartition(producerDescriptor.getProducerLocation(), deploymentDescriptor);

Review comment:
We should probably have thought about a batch registration in the shuffle master, as we are joining on all of them anyway.
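The batch-registration idea mentioned here amounts to firing all per-partition registrations and completing once every one of them has finished. A hedged sketch with `CompletableFuture.allOf`; `registerOne` is a hypothetical stand-in for a single `registerPartitionWithProducer` call, not Flink's actual API:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class BatchRegistrationDemo {

    // Stand-in for one registerPartitionWithProducer(...) call.
    static CompletableFuture<String> registerOne(String partition) {
        return CompletableFuture.supplyAsync(() -> "descriptor-" + partition);
    }

    // Batch variant: register all partitions, complete once every individual
    // registration has completed, mirroring the "joining on all of them
    // anyway" observation in the review.
    static CompletableFuture<List<String>> registerAll(List<String> partitions) {
        List<CompletableFuture<String>> futures = partitions.stream()
                .map(BatchRegistrationDemo::registerOne)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // safe: allOf already completed them
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(registerAll(List.of("p1", "p2")).join());
        // [descriptor-p1, descriptor-p2]
    }
}
```

The `join()` calls inside `thenApply` never block, because `allOf` guarantees all component futures are already complete at that point.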
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
URL: https://github.com/apache/flink/pull/8778#discussion_r298539553

File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java

@@ -94,7 +94,8 @@ public static ExecutionGraph buildGraph(
 	BlobWriter blobWriter,
 	Time allocationTimeout,
 	Logger log,
-	ShuffleMaster shuffleMaster) throws JobExecutionException, JobException {
+	PartitionTracker partitionTracker)
+		throws JobExecutionException, JobException {

Review comment: keep formatting
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM URL: https://github.com/apache/flink/pull/8778#discussion_r298493297 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; + +import java.util.Optional; + +/** + * Factory for {@link PartitionTracker}. + */ +public interface PartitionTrackerFactory { + + /** +* Creates a new PartitionTracker. +* +* @param taskExecutorGatewayLookup lookup function to access task executor gateways +* @return created PartitionTracker +*/ + PartitionTracker create(TaskExecutorGatewayLookup taskExecutorGatewayLookup); + + /** +* Lookup function for {@link TaskExecutorGateway}. +*/ + interface TaskExecutorGatewayLookup { Review comment: `@FunctionalInterface` This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
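The `@FunctionalInterface` suggestion above can be sketched as follows. This is a hypothetical simplification of the interfaces under review: `String` stands in for `ResourceID`, and the gateway stub has none of the real RPC methods.

```java
import java.util.Optional;

public class FunctionalLookupSketch {

    // Stand-in for Flink's TaskExecutorGateway; the real interface has many RPC methods.
    interface TaskExecutorGateway {}

    // The annotation documents that the interface is meant to be implemented as a
    // lambda and makes the compiler reject any second abstract method added later.
    @FunctionalInterface
    interface TaskExecutorGatewayLookup {
        Optional<TaskExecutorGateway> lookup(String taskExecutorId);
    }

    // Example call site: the lookup can be supplied as a lambda.
    static boolean hasGateway(TaskExecutorGatewayLookup lookup, String taskExecutorId) {
        return lookup.lookup(taskExecutorId).isPresent();
    }
}
```

The annotation is purely declarative; lambdas already work on any single-abstract-method interface, so the gain is intent documentation plus a compile-time guard.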
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM URL: https://github.com/apache/flink/pull/8778#discussion_r298534681 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; + +import java.util.Optional; + +/** + * Factory for {@link PartitionTracker}. + */ +public interface PartitionTrackerFactory { + + /** +* Creates a new PartitionTracker. +* +* @param taskExecutorGatewayLookup lookup function to access task executor gateways +* @return created PartitionTracker +*/ + PartitionTracker create(TaskExecutorGatewayLookup taskExecutorGatewayLookup); + + /** +* Lookup function for {@link TaskExecutorGateway}. +*/ + interface TaskExecutorGatewayLookup { + + /** +* Returns a {@link TaskExecutorGateway} corresponding to the given ResourceID. +* +* @param taskExecutorId id of the task executor to look up. 
+* @return optional task executor gateway +*/ + Optional lookup(ResourceID taskExecutorId); Review comment: I would prefer a partition releaser:

```java
interface TaskExecutorPartitionReleaser {
    void releasePartitions(ResourceID taskExecutorId, JobID jobId, Collection<ResultPartitionID> partitionIds);
}
```

It could also return a boolean indicating whether the TE was found, although that is not used anyway. Do we need the full `TaskExecutorGateway` for anything else?
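The releaser proposed in the comment above could be derived from the existing gateway lookup; the sketch below is a hypothetical simplification in which `String` stands in for `ResourceID`, `JobID`, and `ResultPartitionID`.

```java
import java.util.Collection;
import java.util.Optional;

public class PartitionReleaserSketch {

    // Simplified stand-in for the gateway: only the release operation is modeled.
    interface TaskExecutorGateway {
        void releasePartitions(String jobId, Collection<String> partitionIds);
    }

    @FunctionalInterface
    interface TaskExecutorGatewayLookup {
        Optional<TaskExecutorGateway> lookup(String taskExecutorId);
    }

    // The narrower interface from the review: callers see only the release
    // operation instead of the full gateway surface.
    @FunctionalInterface
    interface TaskExecutorPartitionReleaser {
        void releasePartitions(String taskExecutorId, String jobId, Collection<String> partitionIds);
    }

    // Adapter from lookup to releaser; unknown task executors are silently skipped,
    // matching the "TE not found" case the comment says is not used anyway.
    static TaskExecutorPartitionReleaser fromLookup(TaskExecutorGatewayLookup lookup) {
        return (taskExecutorId, jobId, partitionIds) ->
            lookup.lookup(taskExecutorId)
                  .ifPresent(gateway -> gateway.releasePartitions(jobId, partitionIds));
    }
}
```

The design point is interface narrowing: the tracker then depends only on the one operation it invokes, which also simplifies test doubles.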
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM URL: https://github.com/apache/flink/pull/8778#discussion_r298495055 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.shuffle.PartitionDescriptor; +import org.apache.flink.runtime.shuffle.ProducerDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Utility for tracking partitions and issuing release calls to task executors and shuffle masters. + */ +public class PartitionTrackerImpl implements PartitionTracker { + + private final JobID jobId; + + private final Map partitionInfoByProducer = new HashMap<>(); + private final Map> producersByExecutor = new HashMap<>(); + + private final ShuffleMaster shuffleMaster; + + private PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup; Review comment: final also any value in explicit prefixing over a static import of `PartitionTrackerFactory`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM URL: https://github.com/apache/flink/pull/8778#discussion_r298493519 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; + +import java.util.Optional; + +/** + * Factory for {@link PartitionTracker}. + */ +public interface PartitionTrackerFactory { Review comment: @FunctionalInterface This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM URL: https://github.com/apache/flink/pull/8778#discussion_r298509021 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -285,7 +281,13 @@ public LogicalSlot getAssignedResource() { public Optional getResultPartitionDeploymentDescriptor( IntermediateResultPartitionID id) { - return Optional.ofNullable(producedPartitions.get(id)); + Collection trackedDeploymentDescriptors = vertex.getExecutionGraph().getPartitionTracker().getTrackedDeploymentDescriptors(getAttemptId()); + for (ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor : trackedDeploymentDescriptors) { Review comment: I do not really see a problem atm to have it in `InternalPartitionInfo` with a similar add partition method and Collection -> Map This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM URL: https://github.com/apache/flink/pull/8778#discussion_r298540751 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -1325,22 +1324,8 @@ private void sendCancelRpcCall(int numberRetries) { private void sendReleaseIntermediateResultPartitionsRpcCall() { LOG.info("Discarding the results produced by task execution {}.", attemptId); - final LogicalSlot slot = assignedResource; - - if (slot != null) { - final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); - - Collection partitions = vertex.getProducedPartitions().values(); - Collection partitionIds = new ArrayList<>(partitions.size()); - for (IntermediateResultPartition partition : partitions) { - partitionIds.add(new ResultPartitionID(partition.getPartitionId(), attemptId)); - } - - if (!partitionIds.isEmpty()) { - // TODO For some tests this could be a problem when querying too early if all resources were released - taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds); - } - } + final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker(); + partitionTracker.stopTrackingAndReleasePartitions(getAttemptId()); Review comment: looks like we are still doing it in cancel and suspend
[GitHub] [flink] azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM
azagrebin commented on a change in pull request #8778: [FLINK-12615][coordination] Track partitions on JM URL: https://github.com/apache/flink/pull/8778#discussion_r298504283 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java ## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.shuffle.PartitionDescriptor; +import org.apache.flink.runtime.shuffle.ProducerDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +/** + * Utility for tracking partitions and issuing release calls to task executors and shuffle masters. + */ +public class PartitionTrackerImpl implements PartitionTracker { + + private final JobID jobId; + + private final Map partitionInfoByProducer = new HashMap<>(); + private final Map> producersByExecutor = new HashMap<>(); + + private final ShuffleMaster shuffleMaster; + + private PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup; + + public PartitionTrackerImpl( + final JobID jobId, + final ShuffleMaster shuffleMaster, + final PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup) { + + this.jobId = Preconditions.checkNotNull(jobId); + this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster); + this.taskExecutorGatewayLookup = taskExecutorGatewayLookup; + } + + @Override + public CompletableFuture registerPartition( + final PartitionDescriptor partitionDescriptor, + final ProducerDescriptor producerDescriptor, + final ResultPartitionDeploymentDescriptorFactory resultPartitionDeploymentDescriptorFactory) { + + CompletableFuture 
shuffleDescriptorFuture = shuffleMaster.registerPartitionWithProducer(partitionDescriptor, producerDescriptor); + + return shuffleDescriptorFuture + .thenApply(shuffleDescriptor -> + resultPartitionDeploymentDescriptorFactory.createResultPartitionDeploymentDescriptor(partitionDescriptor, shuffleDescriptor)) + .thenApply(deploymentDescriptor -> { + startTrackingPartition(producerDescriptor.getProducerLocation(), deploymentDescriptor); Review comment: At the moment, there is no requirement that shuffle master returns the descriptor into the main thread which is not even given to the shuffle master. This was the reason why `Execution.registerProducedPartitions` (non-static) modified its state in `getJobMasterMainThreadExecutor`. I would give the `JobMasterMainThreadExecutor` to `PartitionTrackerImpl` and do `assertRunningInJobMasterMainThread();` for state modifying methods. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
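The main-thread confinement suggested above can be sketched as follows. `Tracker` is a hypothetical fragment standing in for `PartitionTrackerImpl`, and the plain thread-identity check stands in for the assertion a `ComponentMainThreadExecutor` would provide; in the real code, callbacks chained on the shuffle master future would additionally hop back to the main thread (e.g. via an async continuation on the main-thread executor) before mutating state.

```java
public class MainThreadCheckSketch {

    // Hypothetical tracker whose state-mutating methods assert they run on the
    // job master's main thread, as the review suggests.
    static class Tracker {
        private final Thread mainThread;
        private int trackedPartitions;

        Tracker(Thread mainThread) {
            this.mainThread = mainThread;
        }

        void startTrackingPartition() {
            assertRunningInMainThread();
            trackedPartitions++;
        }

        int getTrackedPartitions() {
            return trackedPartitions;
        }

        // Stand-in for the main-thread assertion of a ComponentMainThreadExecutor.
        private void assertRunningInMainThread() {
            if (Thread.currentThread() != mainThread) {
                throw new IllegalStateException(
                    "state-modifying method called from a non-main thread");
            }
        }
    }
}
```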
[GitHub] [flink] pnowojski commented on a change in pull request #8811: [FLINK-12777][network] Support CheckpointBarrierHandler in StreamTwoInputSelectableProcessor
pnowojski commented on a change in pull request #8811: [FLINK-12777][network] Support CheckpointBarrierHandler in StreamTwoInputSelectableProcessor URL: https://github.com/apache/flink/pull/8811#discussion_r298540216 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/LinkedBufferStorage.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.util.ExceptionUtils; + +import java.io.IOException; +import java.util.Optional; + +/** + * Implementation of {@link BufferStorage} that links two {@link BufferStorage} together. + * Each of the linked {@link BufferStorage} will store buffers independently, but they will be + * linked together for {@link #rollOver()} - if one is rolled over, other will do that as well. + */ +public class LinkedBufferStorage implements BufferStorage { Review comment: I see your point. 
I think the javadoc above the class explains this: > Each of the linked {@link BufferStorage} will store buffers independently This is not ideal (self-documenting code > comments), but as we don't have a better proposal, I will leave it as it is. If someone comes up with a better name in the future, it's always easy to rename internal classes.
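The rollover-linking behavior described in the javadoc above can be sketched as follows; `SimpleBufferStorage` and the `String` buffer payloads are hypothetical stand-ins for the real `BufferStorage` implementations and `BufferOrEvent`.

```java
import java.util.ArrayList;
import java.util.List;

public class LinkedBufferStorageSketch {

    // Minimal stand-in for the BufferStorage contract discussed above.
    interface BufferStorage {
        void add(String bufferOrEvent);
        void rollOver();
        int size();
    }

    static class SimpleBufferStorage implements BufferStorage {
        private final List<String> current = new ArrayList<>();
        private final List<List<String>> rolledOver = new ArrayList<>();

        @Override public void add(String bufferOrEvent) {
            current.add(bufferOrEvent);
        }

        @Override public void rollOver() {
            rolledOver.add(new ArrayList<>(current));
            current.clear();
        }

        @Override public int size() {
            return current.size();
        }
    }

    // The linking from the javadoc: buffers are stored independently per
    // storage, but rolling over one storage rolls over its linked partner too.
    static class LinkedBufferStorage implements BufferStorage {
        private final BufferStorage main;
        private final BufferStorage linked;

        LinkedBufferStorage(BufferStorage main, BufferStorage linked) {
            this.main = main;
            this.linked = linked;
        }

        @Override public void add(String bufferOrEvent) {
            main.add(bufferOrEvent);
        }

        @Override public void rollOver() {
            main.rollOver();
            linked.rollOver();
        }

        @Override public int size() {
            return main.size();
        }
    }
}
```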
[GitHub] [flink] GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8922#discussion_r298539105 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java ## @@ -63,6 +66,9 @@ return new RestartAllStrategy.Factory(); case PIPELINED_REGION_RESTART_STRATEGY_NAME: + return new AdaptedRestartPipelinedRegionStrategyNG.Factory(); Review comment: fixed
[GitHub] [flink] pnowojski commented on issue #8455: [FLINK-12284, FLINK-12637][Network, Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode
pnowojski commented on issue #8455: [FLINK-12284,FLINK-12637][Network,Metrics]Fix the incorrect inputBufferUsage metric in credit-based network mode URL: https://github.com/apache/flink/pull/8455#issuecomment-506683902 > In addition, one PR should not connect to two jira tickets in general. So I think you might need to close one jira and adjust this PR to only one jira. I wouldn't be so strict about having a couple of Jira tickets in one PR, especially if they depend on one another. However, as a rule of thumb, one commit shouldn't address two Jira tickets, since that probably means either: the commit is fixing two separate things, or the Jira tickets are duplicated. As far as I recall this is in line with our coding style guideline. Of course it's better to have separate things fixed in independent PRs. Here I think it's the case that the Jira tickets are duplicated. In my opinion `Add metrics for floatingBufferUsage and exclusiveBufferUsage for credit based mode` is a solution for `InputBufferPoolUsage is incorrect in credit-based network control flow`. So I would close the newer ticket as a duplicate of the older one.
[GitHub] [flink] GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8922#discussion_r298538461 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java ## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. + * There can be local+local and local+global concurrent failovers. 
+ */ +public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { + + private static final JobID TEST_JOB_ID = new JobID(); + + private static final int DEFAULT_PARALLELISM = 2; + + private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor; + + private ComponentMainThreadExecutor componentMainThreadExecutor; + + private TestRestartStrategy manuallyTriggeredRestartStrategy; + + @Before + public void setUp() { + manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor(); + componentMainThreadExecutor = new ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, Thread.currentThread()); + manuallyTriggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); + } + + /** +* Tests that 2 concurrent region failovers can lead to a properly vertex state. +* +* (v11) -+-> (v21) +*x +* (v12) -+-> (v22) +* +*^ +*| +* (blocking) +* +*/ + @Test + public void testConcurrentRegionFailovers() throws Exception { + + // the logic in this test is as follows: + // - start a job + // - cause {ev11} failure and delay the local recovery action via the manual executor + // - cause {ev12} failure and delay the local recovery action via the manual executor + // - resume local recovery actions + // - validate that each task is restarted only once + + final ExecutionGraph eg = createExecutionGraph(); + + final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = + (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); + failoverStrategy.setBlockerFuture(new CompletableFuture<>()); + + final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); + final ExecutionVertex ev11 = vertexIterator.
[GitHub] [flink] GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8922#discussion_r298538576 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import 
org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + private static final JobID TEST_JOB_ID = new JobID(); + + private ComponentMainThreadExecutor componentMainThreadExecutor; + + private FailingSlotProviderDecorator slotProvider; + + private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor; + + @Before + public void setUp() { + manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor(); + componentMainThreadExecutor = new ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, Thread.currentThread()); + slotProvider = new FailingSlotProviderDecorator(new SimpleSlotProvider(TEST_JOB_ID, 14)); + } + + /** +* Tests for region failover for job in EAGER mode. +* This applies to streaming job, with no BLOCKING edge. +* +* (v11) ---> (v21) +* +* (v12) ---> (v22) +* +*^ +*
[GitHub] [flink] GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8922#discussion_r298538537 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -208,7 +211,7 @@ public void testFailurePropagationToUnderlyingStrategy() throws Exception { final ExecutionVertex ev21 = vertexIterator.next(); // trigger downstream regions to schedule - testingMainThreadExecutor.execute(() -> { + componentMainThreadExecutor.execute(() -> { Review comment: Thanks, fixed it.
[GitHub] [flink] GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
GJL commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298536587

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
## @@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger {
+
+	private static final JobID TEST_JOB_ID = new JobID();
+
+	private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+	private FailingSlotProviderDecorator slotProvider;
+
+	private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+	@Before
+	public void setUp() {
+		manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+		componentMainThreadExecutor = new ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, Thread.currentThread());
+		slotProvider = new FailingSlotProviderDecorator(new SimpleSlotProvider(TEST_JOB_ID, 14));
+	}
+
+	/**
+	 * Tests for region failover for job in EAGER mode.
+	 * This applies to streaming job, with no BLOCKING edge.
+	 *
+	 * (v11) ---> (v21)
+	 *
+	 * (v12) ---> (v22)
+	 *
+	 *        ^
+	 *
[GitHub] [flink] dawidwys merged pull request #8876: [FLINK-12985][core][streaming] Rename StreamTransformation to org.apache.flink.api.dag.Transformation
dawidwys merged pull request #8876: [FLINK-12985][core][streaming] Rename StreamTransformation to org.apache.flink.api.dag.Transformation
URL: https://github.com/apache/flink/pull/8876
[jira] [Closed] (FLINK-12985) Move & rename StreamTransformation to org.apache.flink.api.dag.Transformation
[ https://issues.apache.org/jira/browse/FLINK-12985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz closed FLINK-12985.
Resolution: Fixed

Implemented in fd511c345eac31f03b801ff19dbf1f8c86aae760

> Move & rename StreamTransformation to org.apache.flink.api.dag.Transformation
>
> Key: FLINK-12985
> URL: https://issues.apache.org/jira/browse/FLINK-12985
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.9.0
> Reporter: Dawid Wysakowicz
> Assignee: Dawid Wysakowicz
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.9.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> It is quite strange to have a streaming package in {{flink-core}}. Moreover, right now {{StreamTransformation#setChainingStrategy}} throws an exception in half of the transformations.
> I suggest to:
> # rename {{StreamTransformation}} to {{org.apache.flink.api.dag.Transformation}}
> # extract {{#setChainingStrategy}} to a separate class {{PhysicalTransformation}} and move {{ChainingStrategy}} back to the flink-streaming-java module

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
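The class split proposed in FLINK-12985 can be sketched roughly as follows. This is a minimal illustration, not Flink's actual code; the concrete subclass `MapLikeTransformation` and the class name `TransformationSketch` are invented for the example.

```java
// Sketch of the FLINK-12985 split: a generic Transformation base class
// (suitable for a generic DAG package such as org.apache.flink.api.dag)
// and a PhysicalTransformation subclass that owns the chaining API, so
// the base class no longer has to throw for transformations that do not
// support chaining.
public class TransformationSketch {

    enum ChainingStrategy { ALWAYS, NEVER, HEAD }

    // Generic DAG node: no streaming-specific API here.
    abstract static class Transformation<T> {
        private final String name;

        Transformation(String name) { this.name = name; }

        String getName() { return name; }
    }

    // Only physically executed transformations expose setChainingStrategy.
    abstract static class PhysicalTransformation<T> extends Transformation<T> {
        private ChainingStrategy chainingStrategy = ChainingStrategy.ALWAYS;

        PhysicalTransformation(String name) { super(name); }

        void setChainingStrategy(ChainingStrategy strategy) { this.chainingStrategy = strategy; }

        ChainingStrategy getChainingStrategy() { return chainingStrategy; }
    }

    // Invented concrete subclass, for illustration only.
    static class MapLikeTransformation<T> extends PhysicalTransformation<T> {
        MapLikeTransformation(String name) { super(name); }
    }

    public static void main(String[] args) {
        MapLikeTransformation<String> map = new MapLikeTransformation<>("map-1");
        map.setChainingStrategy(ChainingStrategy.NEVER);
        System.out.println(map.getName() + " -> " + map.getChainingStrategy());
    }
}
```

The point of the split is that a purely logical transformation can extend `Transformation` directly and never see a chaining method it would have to reject at runtime.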
[GitHub] [flink] zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298517179

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
## @@ -0,0 +1,513 @@
[quoted context: the same license header, imports, and test-class setup of AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java quoted in the review comment at discussion_r298536587 above]
[GitHub] [flink] zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298529362

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
## @@ -208,7 +211,7 @@ public void testFailurePropagationToUnderlyingStrategy() throws Exception {
 	final ExecutionVertex ev21 = vertexIterator.next();

 	// trigger downstream regions to schedule
-	testingMainThreadExecutor.execute(() -> {
+	componentMainThreadExecutor.execute(() -> {

Review comment:
can invoke `markFinished` directly.
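The tests in this thread drive the scheduler's main thread through a manually triggered executor so that recovery actions run only when the test triggers them. The following is an assumption-level sketch of that pattern, not Flink's actual `ManuallyTriggeredScheduledExecutor`; the class name `ManualExecutorSketch` is invented.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of a manually triggered executor: execute() only enqueues the
// action, and nothing runs until the test calls triggerAll(). This makes
// otherwise-asynchronous failover steps deterministic in a unit test.
public class ManualExecutorSketch {

    private final Queue<Runnable> queued = new ArrayDeque<>();

    public void execute(Runnable action) {
        queued.add(action);
    }

    // Runs all currently queued actions (and any they enqueue) in order,
    // returning how many actions were executed.
    public int triggerAll() {
        int triggered = 0;
        while (!queued.isEmpty()) {
            queued.poll().run();
            triggered++;
        }
        return triggered;
    }

    public static void main(String[] args) {
        ManualExecutorSketch executor = new ManualExecutorSketch();
        StringBuilder log = new StringBuilder();
        executor.execute(() -> log.append("restart-region;"));
        executor.execute(() -> log.append("reschedule;"));
        // Nothing has happened yet: the test decides when recovery proceeds.
        System.out.println("before trigger: '" + log + "'");
        int n = executor.triggerAll();
        System.out.println("after trigger (" + n + " actions): '" + log + "'");
    }
}
```

A test can submit a failure-handling action, assert on the intermediate state while it is still queued, and only then trigger it, which is exactly how the quoted tests delay local recovery.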
[GitHub] [flink] zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
zhuzhurk commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298524268

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java
## @@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen.
+ * There can be local+local and local+global concurrent failovers.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger {
+
+	private static final JobID TEST_JOB_ID = new JobID();
+
+	private static final int DEFAULT_PARALLELISM = 2;
+
+	private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+	private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+	private TestRestartStrategy manuallyTriggeredRestartStrategy;
+
+	@Before
+	public void setUp() {
+		manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+		componentMainThreadExecutor = new ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, Thread.currentThread());
+		manuallyTriggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
+	}
+
+	/**
+	 * Tests that 2 concurrent region failovers can lead to a properly vertex state.
+	 *
+	 * (v11) -+-> (v21)
+	 *        x
+	 * (v12) -+-> (v22)
+	 *
+	 *        ^
+	 *        |
+	 *    (blocking)
+	 */
+	@Test
+	public void testConcurrentRegionFailovers() throws Exception {
+
+		// the logic in this test is as follows:
+		// - start a job
+		// - cause {ev11} failure and delay the local recovery action via the manual executor
+		// - cause {ev12} failure and delay the local recovery action via the manual executor
+		// - resume local recovery actions
+		// - validate that each task is restarted only once
+
+		final ExecutionGraph eg = createExecutionGraph();
+
+		final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy =
+			(TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy();
+		failoverStrategy.setBlockerFuture(new CompletableFuture<>());
+
+		final Iterator vertexIterator = eg.getAllExecutionVertices().iterator();
+		final ExecutionVertex ev11 = vertexIter
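The quoted test validates that two overlapping failovers restart each task only once. A common way to get that behavior is to version each execution attempt and discard restart requests that carry a stale version. The following is a hedged sketch of that general pattern only; `AttemptGuardSketch` and its methods are invented and are not the strategy's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: each concurrent failover captures the attempt number it observed;
// a restart is applied only if that number is still current, so two
// overlapping failovers restart the vertex once rather than twice.
public class AttemptGuardSketch {

    private int currentAttempt = 0;
    private final List<String> restartLog = new ArrayList<>();

    // Called when a failure is observed, before recovery is scheduled.
    public int snapshotAttempt() {
        return currentAttempt;
    }

    // Returns true if the restart was applied, false if it was stale.
    public boolean restartIfCurrent(int expectedAttempt) {
        if (expectedAttempt != currentAttempt) {
            return false; // a concurrent failover already restarted this vertex
        }
        currentAttempt++;
        restartLog.add("restart -> attempt " + currentAttempt);
        return true;
    }

    public List<String> getRestartLog() {
        return restartLog;
    }

    public static void main(String[] args) {
        AttemptGuardSketch vertex = new AttemptGuardSketch();
        // Two failovers observe the same attempt before either acts.
        int seenByFailoverA = vertex.snapshotAttempt();
        int seenByFailoverB = vertex.snapshotAttempt();
        System.out.println("A applied: " + vertex.restartIfCurrent(seenByFailoverA));
        System.out.println("B applied: " + vertex.restartIfCurrent(seenByFailoverB));
        System.out.println("restarts: " + vertex.getRestartLog().size());
    }
}
```

Delaying both recovery actions on a manually triggered executor, as the test above does, is precisely what forces the second restart request to arrive with a stale snapshot.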
[GitHub] [flink] flinkbot edited a comment on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes
flinkbot edited a comment on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes
URL: https://github.com/apache/flink/pull/8924#issuecomment-506655381

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

## Review Progress

* ❓ 1. The [description] looks good.
* ❓ 2. There is [consensus] that the contribution should go into Flink.
* ❗ 3. Needs [attention] from.
  - Needs attention by @azagrebin, @zentol [PMC]
* ❓ 4. The change fits into the overall [architecture].
* ❓ 5. Overall code [quality] is good.

Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until `architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298517846

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
## @@ -0,0 +1,513 @@
[quoted context: the same license header, imports, and test-class setup of AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java quoted in the review comment at discussion_r298536587 above]
[GitHub] [flink] zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298503935

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
## @@ -63,6 +66,9 @@
 	return new RestartAllStrategy.Factory();

 case PIPELINED_REGION_RESTART_STRATEGY_NAME:
+	return new AdaptedRestartPipelinedRegionStrategyNG.Factory();

Review comment:
indentation
[GitHub] [flink] zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
URL: https://github.com/apache/flink/pull/8922#discussion_r298508743

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java
## @@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG;
+import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG}.
+ */
+public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest extends TestLogger {
+
+	private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor;
+
+	private ComponentMainThreadExecutor componentMainThreadExecutor;
+
+	@Before
+	public void setUp() {
+		manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
+		componentMainThreadExecutor = new ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, Thread.currentThread());
+	}
+
+	@Test
+	public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
+		final JobGraph jobGraph = createStreamingJobGraph();
+		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph);
+
+		final Iterator vertexIterator = executionGraph.getAllExecutionVertices().iterator();
+		final ExecutionVertex onlyExecutionVertex = vertexIterator.next();
+
+		setTaskRunning(executionGraph, onlyExecutionVertex);
+
+		final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+		checkState(checkpointCoordinator != null);
+
+		checkpointCoordinator.triggerCheckpoint(System.currentTimeMillis(), false);
+		final int pendingCheckpointsBeforeFailure = checkpointCoordinator.getNumberOfPendingCheckpoints();
+
+		failVertex(onlyExecutionVertex);
+
+		assertThat(pendingCheckpointsBeforeFailure, is(equalTo(1)));
+		assertNoPendingCheckpoints(checkpointCoordinator);
+	}
+
+	private void setTaskRunning(final ExecutionGra
[GitHub] [flink] zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8922#discussion_r298520665 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java ## @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; +import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import 
org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. + */ +public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { + + private static final JobID TEST_JOB_ID = new JobID(); + + private ComponentMainThreadExecutor componentMainThreadExecutor; + + private FailingSlotProviderDecorator slotProvider; + + private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor; + + @Before + public void setUp() { + manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor(); + componentMainThreadExecutor = new ScheduledExecutorToComponentMainThreadExecutorAdapter(manualMainThreadExecutor, Thread.currentThread()); + slotProvider = new FailingSlotProviderDecorator(new SimpleSlotProvider(TEST_JOB_ID, 14)); + } + + /** +* Tests for region failover for job in EAGER mode. +* This applies to streaming job, with no BLOCKING edge. +* +* (v11) ---> (v21) +* +* (v12) ---> (v22) +* +*^ +
[GitHub] [flink] zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler
zentol commented on a change in pull request #8922: [FLINK-12876][runtime] Add an adapter of region failover NG for legacy scheduler URL: https://github.com/apache/flink/pull/8922#discussion_r298526519 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ## @@ -939,123 +933,22 @@ public void scheduleForExecution() throws JobException { } } - private CompletableFuture scheduleLazy(SlotProvider slotProvider) { - - final ArrayList> schedulingFutures = new ArrayList<>(numVerticesTotal); - // simply take the vertices without inputs. - for (ExecutionJobVertex ejv : verticesInCreationOrder) { - if (ejv.getJobVertex().isInputVertex()) { - final CompletableFuture schedulingJobVertexFuture = ejv.scheduleAll( - slotProvider, - allowQueuedScheduling, - LocationPreferenceConstraint.ALL, // since it is an input vertex, the input based location preferences should be empty - Collections.emptySet()); - - schedulingFutures.add(schedulingJobVertexFuture); - } - } - - return FutureUtils.waitForAll(schedulingFutures); + private CompletableFuture scheduleLazy() { + final List executionVertices = StreamSupport + .stream(getAllExecutionVertices().spliterator(), false) + .collect(Collectors.toList()); + return SchedulingUtils.scheduleLazy(executionVertices, this); Review comment: agreed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
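The refactored `scheduleLazy` above materializes the `Iterable` returned by `getAllExecutionVertices()` into a `List` before delegating to `SchedulingUtils.scheduleLazy`. A minimal sketch of that `StreamSupport` conversion pattern (illustrative names; not Flink code):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

class IterableToListSketch {

    // Collect any Iterable into a List, mirroring how scheduleLazy()
    // materializes getAllExecutionVertices() before delegating.
    static <T> List<T> toList(Iterable<T> iterable) {
        return StreamSupport.stream(iterable.spliterator(), false)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Iterable<String> vertices = Arrays.asList("v1", "v2", "v3");
        System.out.println(toList(vertices)); // prints [v1, v2, v3]
    }
}
```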
[GitHub] [flink] ifndef-SleePy commented on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter
ifndef-SleePy commented on issue #8894: [FLINK-12961][datastream] Providing an internal execution method of StreamExecutionEnvironment accepting StreamGraph as input parameter URL: https://github.com/apache/flink/pull/8894#issuecomment-506671123 @aljoscha I think you have raised a good question! AFAIK, the Blink runner generates `StreamGraph` itself, and it also keeps the transformations itself. When a `DataStream` is translated to a `Table`, the Blink batch runner back-tracks all transformations of `StreamExecutionEnvironment` through the transformations it keeps. There is a corner case, though: a disconnected sub-graph in `StreamExecutionEnvironment` might not be touched by the back-tracking. I have not reached an agreement with my colleague about this case. I'm wondering whether there is a better solution for the Blink batch runner?
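The corner case described above — back-tracking from known sinks misses a disconnected sub-graph — can be sketched as a toy walk over input edges. All names here are hypothetical; this is not Blink's actual implementation:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class BacktrackSketch {

    // Walk input edges backwards from the given sinks; anything not
    // reachable that way (a disconnected sub-graph) is never visited.
    static Set<String> backtrack(Map<String, List<String>> inputs, List<String> sinks) {
        Set<String> visited = new LinkedHashSet<>();
        Deque<String> stack = new ArrayDeque<>(sinks);
        while (!stack.isEmpty()) {
            String node = stack.pop();
            if (visited.add(node)) {
                stack.addAll(inputs.getOrDefault(node, Collections.emptyList()));
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        Map<String, List<String>> inputs = new HashMap<>();
        inputs.put("sink", Arrays.asList("map"));
        inputs.put("map", Arrays.asList("source"));
        inputs.put("orphanSink", Arrays.asList("orphanSource")); // disconnected sub-graph
        // prints [sink, map, source]; orphanSink/orphanSource are never visited
        System.out.println(backtrack(inputs, Arrays.asList("sink")));
    }
}
```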
[jira] [Commented] (FLINK-13025) Elasticsearch 7.x support
[ https://issues.apache.org/jira/browse/FLINK-13025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16874807#comment-16874807 ] Aljoscha Krettek commented on FLINK-13025: -- If it is possible, i.e. if the interfaces are backwards compatible I'd like if we had a universal connector, yes. > Elasticsearch 7.x support > - > > Key: FLINK-13025 > URL: https://issues.apache.org/jira/browse/FLINK-13025 > Project: Flink > Issue Type: New Feature > Components: Connectors / ElasticSearch >Affects Versions: 1.8.0 >Reporter: Keegan Standifer >Priority: Major > > Elasticsearch 7.0.0 was released in April of 2019: > [https://www.elastic.co/blog/elasticsearch-7-0-0-released] > The latest elasticsearch connector is > [flink-connector-elasticsearch6|https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-elasticsearch6] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] [flink] yanghua commented on issue #8871: [FLINK-12976] Bump Kafka client version to 2.3.0 for universal Kafka connector
yanghua commented on issue #8871: [FLINK-12976] Bump Kafka client version to 2.3.0 for universal Kafka connector URL: https://github.com/apache/flink/pull/8871#issuecomment-50645 I commented out some code for testing because I found that if we close the Kafka producer, the transaction with id `MockTask-002a002c-2` would enter `PrepareAbort`. The reason, per the test log, is `Aborting incomplete transaction due to shutdown`.
[GitHub] [flink] twalthr closed pull request #8874: [FLINK-12968][table-common] Add a utility for logical type casts
twalthr closed pull request #8874: [FLINK-12968][table-common] Add a utility for logical type casts URL: https://github.com/apache/flink/pull/8874
[jira] [Updated] (FLINK-13029) Remove expressionBridge from QueryOperations factories
[ https://issues.apache.org/jira/browse/FLINK-13029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz updated FLINK-13029: - Summary: Remove expressionBridge from QueryOperations factories (was: Remove expressionBridge from OperationTreeBuilder's factories) > Remove expressionBridge from QueryOperations factories > -- > > Key: FLINK-13029 > URL: https://issues.apache.org/jira/browse/FLINK-13029 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Affects Versions: 1.9.0 >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Fix For: 1.9.0 > > > Expression bridge is used to create a schema of QueryOperation. This is no > longer necessary with ResolvedExpressions in place.
[jira] [Created] (FLINK-13029) Remove expressionBridge from OperationTreeBuilder's factories
Dawid Wysakowicz created FLINK-13029: Summary: Remove expressionBridge from OperationTreeBuilder's factories Key: FLINK-13029 URL: https://issues.apache.org/jira/browse/FLINK-13029 Project: Flink Issue Type: Sub-task Components: Table SQL / API Affects Versions: 1.9.0 Reporter: Dawid Wysakowicz Assignee: Dawid Wysakowicz Fix For: 1.9.0 Expression bridge is used to create a schema of QueryOperation. This is no longer necessary with ResolvedExpressions in place.
[GitHub] [flink] flinkbot commented on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes
flinkbot commented on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes URL: https://github.com/apache/flink/pull/8924#issuecomment-506655381 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] zhijiangW commented on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes
zhijiangW commented on issue #8924: [hotfix][docs] Update the generated configuration docs for previous changes URL: https://github.com/apache/flink/pull/8924#issuecomment-506655017 @flinkbot attention @azagrebin @zentol
[GitHub] [flink] zhijiangW opened a new pull request #8924: [hotfix][docs] Update the generated configuration docs for previous changes
zhijiangW opened a new pull request #8924: [hotfix][docs] Update the generated configuration docs for previous changes URL: https://github.com/apache/flink/pull/8924 ## What is the purpose of the change *Some configuration docs were not updated for previous changes, so `generate-config-docs` was run to regenerate the relevant ones.* ## Brief change log - *Update the generated blob_server_configuration.html* - *Add the new generated netty_shuffle_environment_configuration.html* ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
[GitHub] [flink] dawidwys commented on a change in pull request #8874: [FLINK-12968][table-common] Add a utility for logical type casts
dawidwys commented on a change in pull request #8874: [FLINK-12968][table-common] Add a utility for logical type casts URL: https://github.com/apache/flink/pull/8874#discussion_r298500052 ## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/TypeInferenceUtil.java ## @@ -206,7 +207,9 @@ private static AdaptedCallContext adaptArguments( } private static boolean canCast(DataType sourceDataType, DataType targetDataType) { Review comment: Just ignore my comment. I thought this was a public method.
[GitHub] [flink] twalthr commented on issue #8874: [FLINK-12968][table-common] Add a utility for logical type casts
twalthr commented on issue #8874: [FLINK-12968][table-common] Add a utility for logical type casts URL: https://github.com/apache/flink/pull/8874#issuecomment-506650471 Thanks @dawidwys. Will merge...
[GitHub] [flink] 1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService
1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce mailbox-based ExecutorService URL: https://github.com/apache/flink/pull/8692#discussion_r298498045 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java ## @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks.mailbox; + +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Optional; + +/** + * This class encapsulates the logic of the mailbox-based execution model. At the core of this model is + * {@link #runMailboxLoop()}, which continuously executes the provided {@link MailboxDefaultAction} in a loop. On each + * iteration, the method also checks if there are pending actions in the mailbox and executes such actions. This model + * ensures single-threaded execution between the default action (e.g. record processing) and mailbox actions (e.g. + * checkpoint trigger, timer firing, ...). 
+ * + * The {@link MailboxDefaultAction} interacts with this class through the {@link MailboxDefaultActionContext} to + * communicate control flow changes to the mailbox loop, e.g. that invocations of the default action are temporarily + * or permanently exhausted. + * + * The design of {@link #runMailboxLoop()} is centered around the idea of keeping the expected hot path + * (default action, no mail) as fast as possible, with just a single volatile read per iteration in + * {@link Mailbox#hasMail}. This means that all checking of mail and other control flags (mailboxLoopRunning, + * suspendedDefaultAction) is always connected to #hasMail indicating true. This means that control flag changes in + * the mailbox thread can be done directly, but we must ensure that there is at least one action in the mailbox so that + * the change is picked up. For control flag changes by all other threads, which must happen through mailbox actions, + * this is automatically the case. + * + * This class has an open-prepareClose-close lifecycle that is connected with and maps to the lifecycle of the + * encapsulated {@link Mailbox} (which is open-quiesce-close). + * + * The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} exists to run the current sources + * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask}) with the mailbox model in a compatibility + * mode. Once we drop the old source interface for the new one (FLIP-27) this method can eventually go away.
Typically record processing. */ + private final MailboxDefaultAction mailboxDefaultAction; + + /** Control flag to terminate the mailbox loop. Must only be accessed from mailbox thread. */ + private boolean mailboxLoopRunning; + + /** +* Remembers a currently active suspension of the default action. Serves as flag to indicate a suspended +* default action (suspended if not-null) and to reuse the object as return value in consecutive suspend attempts. +* Must only be accessed from mailbox thread. +*/ + private MailboxDefaultAction.SuspendedDefaultAction suspendedDefaultAction; + + /** Special action that is used to terminate the mailbox loop. */ + private final Runnable mailboxPoisonLetter; + + public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) { + this.mailboxDefaultAction = Preconditions.checkNotNull(mailboxDefaultAction); + this.mailbox = new MailboxImpl(
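The mailbox model described in the Javadoc above can be reduced to a single-threaded toy loop: drain pending mail first, then run the default action, with a "poison letter" terminating the loop. This is a minimal sketch with hypothetical names; the real `MailboxProcessor` additionally handles cross-thread mail via a volatile `hasMail` check and an open-quiesce-close lifecycle:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Single-threaded sketch of the mailbox loop idea (not Flink's actual API).
class MiniMailboxLoop {

    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private boolean running = true;
    int processed = 0;

    void send(Runnable letter) {
        mailbox.offer(letter);
    }

    // Stand-in for record processing; enqueues a poison letter after 3 records.
    private void defaultAction() {
        processed++;
        if (processed == 3) {
            send(() -> running = false); // poison letter terminates the loop
        }
    }

    void runMailboxLoop() {
        while (running) {
            Runnable letter;
            while ((letter = mailbox.poll()) != null) {
                letter.run(); // drain pending mail before the default action
            }
            if (running) {
                defaultAction(); // hot path: default action when there is no mail
            }
        }
    }

    public static void main(String[] args) {
        MiniMailboxLoop loop = new MiniMailboxLoop();
        loop.runMailboxLoop();
        System.out.println(loop.processed); // prints 3
    }
}
```

The hot-path idea from the Javadoc maps to the inner `poll()` loop: in the common case the mailbox is empty and each iteration costs one queue check plus one default action.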