[GitHub] lamber-ken commented on issue #7296: [hotfix][web] fix the desc about restart strategy in web
lamber-ken commented on issue #7296: [hotfix][web] fix the desc about restart strategy in web URL: https://github.com/apache/flink/pull/7296#issuecomment-447544430 hi,@GJL,cc,thanks This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11116) Overwrite outdated in-progress files in StreamingFileSink.
[ https://issues.apache.org/jira/browse/FLINK-11116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11116: --- Labels: pull-request-available (was: ) > Overwrite outdated in-progress files in StreamingFileSink. > -- > > Key: FLINK-11116 > URL: https://issues.apache.org/jira/browse/FLINK-11116 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2 > > > In order to guarantee exactly-once semantics, the streaming file sink > implements a two-phase commit protocol when writing files to the filesystem. > Initially, data is written to in-progress files. These files are then put into > "pending" state when they are completed (based on the rolling policy), and > they are finally committed when the checkpoint that put them in the "pending" > state is acknowledged as complete. > The above shows that in the case that we have: > 1) checkpoints A, B, C coming, > 2) checkpoint A being acknowledged, and > 3) a failure, > then we may have files that do not belong to any checkpoint (because B and C > were not considered successful). These files are currently not cleaned up. > In order to reduce the number of such files created, we removed the random > suffix from in-progress temporary files, so that the next in-progress file > that is opened for this part overwrites them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] kl0u opened a new pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
kl0u opened a new pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313 ## What is the purpose of the change As described in the related JIRA, this PR targets the in-progress files left after a failure that belong to no checkpoint. In this case, we lazily overwrite them as soon as new tasks arrive at that part-counter. ## Brief change log Just removes the random suffix from the `LocalRecoverableWriter` and the `HadoopRecoverableWriter` so that the temporary files are overwritten. ## Verifying this change Added 2 tests in the `AbstractRecoverableWriterTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) R @aljoscha This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
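For illustration only — this is not the actual `LocalRecoverableWriter`/`HadoopRecoverableWriter` code, and all names in the sketch are hypothetical — the following minimal Java example shows why a deterministic in-progress file name (derived only from the part counter, with no random suffix) lets a restarted task simply overwrite a stale temporary file instead of leaving it orphaned:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DeterministicInProgressFileDemo {

    // Hypothetical naming scheme: the name depends only on the part counter, no random suffix.
    static Path inProgressFile(Path bucketDir, long partCounter) {
        return bucketDir.resolve(".part-" + partCounter + ".inprogress");
    }

    public static void main(String[] args) throws IOException {
        Path bucketDir = Files.createTempDirectory("bucket");
        long partCounter = 42;

        // First attempt: a task writes some data and then "fails" before the file is committed.
        Files.write(inProgressFile(bucketDir, partCounter), "stale data".getBytes());

        // Restarted attempt for the same part counter: writing to the same deterministic name
        // truncates and overwrites the stale in-progress file instead of leaving an orphan behind.
        Files.write(inProgressFile(bucketDir, partCounter), "fresh data".getBytes());

        System.out.println(new String(Files.readAllBytes(inProgressFile(bucketDir, partCounter))));
        // prints: fresh data
    }
}
```

With a random suffix in the name, the second write would create a new file and the stale one would never be reclaimed.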
[jira] [Created] (FLINK-11169) [hotfix][runtime] fix the problem of not being reloaded for jobmanager's log after clicking the refresh button in web ui
Jiayong Mo created FLINK-11169: -- Summary: [hotfix][runtime] fix the problem of not being reloaded for jobmanager's log after clicking the refresh button in web ui Key: FLINK-11169 URL: https://issues.apache.org/jira/browse/FLINK-11169 Project: Flink Issue Type: Bug Components: REST Affects Versions: 1.7.0, 1.6.2 Reporter: Jiayong Mo In the Flink web UI, we can access the jobmanager's log from the tab under its corresponding page, but we cannot reload the log (the content is not up-to-date) because the log file has been cached for 300s on the local client's disk. This problem occurs because of an incorrect check of whether the file should be cached. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11169) fix the problem of not being reloaded for jobmanager's log after clicking the refresh button in web ui
[ https://issues.apache.org/jira/browse/FLINK-11169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiayong Mo updated FLINK-11169: --- Summary: fix the problem of not being reloaded for jobmanager's log after clicking the refresh button in web ui (was: [hotfix][runtime] fix the problem of not being reloaded for jobmanager's log after clicking the refresh button in web ui) > fix the problem of not being reloaded for jobmanager's log after clicking the > refresh button in web ui > -- > > Key: FLINK-11169 > URL: https://issues.apache.org/jira/browse/FLINK-11169 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.6.2, 1.7.0 >Reporter: Jiayong Mo >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > In the Flink web UI, we can access the jobmanager's log from the tab > under its corresponding page, but we cannot reload the log (the content is > not up-to-date) because the log file has been cached for 300s on the local > client's disk. This problem occurs because of an incorrect check of > whether the file should be cached. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11169) [hotfix][runtime] fix the problem of not being reloaded for jobmanager's log after clicking the refresh button in web ui
[ https://issues.apache.org/jira/browse/FLINK-11169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11169: --- Labels: pull-request-available (was: ) > [hotfix][runtime] fix the problem of not being reloaded for jobmanager's log > after clicking the refresh button in web ui > - > > Key: FLINK-11169 > URL: https://issues.apache.org/jira/browse/FLINK-11169 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.6.2, 1.7.0 >Reporter: Jiayong Mo >Priority: Major > Labels: pull-request-available > > In the Flink web UI, we can access the jobmanager's log from the tab > under its corresponding page, but we cannot reload the log (the content is > not up-to-date) because the log file has been cached for 300s on the local > client's disk. This problem occurs because of an incorrect check of > whether the file should be cached. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] AlphaGarden opened a new pull request #7312: [FLINK-11169][runtime] fix the problem of not being reloaded for jobmanager's…
AlphaGarden opened a new pull request #7312: [FLINK-11169][runtime] fix the problem of not being reloaded for jobmanager's… URL: https://github.com/apache/flink/pull/7312 ## What is the purpose of the change *In the Flink web UI, we can access the jobmanager's log from the tab under its corresponding page, but we cannot reload the log (the content is not up-to-date) because the log file has been cached for 300s on the local client's disk. This problem occurs because of an incorrect check of whether the file should be cached.* ## Brief change log - *Change the way of checking whether the current file is a log file or an out file.* ## Verifying this change - *git clone https://github.com/AlphaGarden/flink.git* - *cd flink* - *git checkout hotfix-web-reload-jobmanager-log* - *mvn clean package -DskipTests* - *run the ./start-cluster.sh script; the jobmanager log should then be reloaded from the backend every time we click the refresh button in the Flink Web UI* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
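As a rough sketch of the kind of check the PR description refers to — this is not the actual Flink handler code, and the class, method, and constant names below are made up — the decision boils down to never letting the browser cache mutable `.log`/`.out` files while still caching genuinely static assets:

```java
import java.io.File;

public class StaticFileCachePolicy {

    // Hypothetical cache duration; the issue mentions a 300 s browser cache.
    static final int CACHE_SECONDS = 300;

    /**
     * Log and stdout files change while the cluster runs, so they must not be cached;
     * only truly static assets (html, js, css, ...) should be.
     */
    static boolean shouldCache(File file) {
        String name = file.getName();
        boolean isMutable = name.endsWith(".log") || name.endsWith(".out");
        return !isMutable;
    }

    static String cacheControlHeader(File file) {
        return shouldCache(file) ? "max-age=" + CACHE_SECONDS : "no-cache";
    }

    public static void main(String[] args) {
        System.out.println(cacheControlHeader(new File("flink-jobmanager.log"))); // no-cache
        System.out.println(cacheControlHeader(new File("index.html")));           // max-age=300
    }
}
```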
[jira] [Assigned] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] boshu Zheng reassigned FLINK-9717: -- Assignee: (was: boshu Zheng) > Flush state of one side of the join if other side is bounded > > > Key: FLINK-9717 > URL: https://issues.apache.org/jira/browse/FLINK-9717 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Priority: Major > > Whenever one side of a join receives {{MAX_WATERMARK}}, joins > (both normal and versioned joins) could flush the state from the other side. > This is a highly useful optimisation that would speed up versioned joins and would > allow normal joins of large unbounded streams with bounded tables (for > example some static data). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722013#comment-16722013 ] boshu Zheng commented on FLINK-9717: Just a little thought to share: when considering joins with bounded data, I would prefer the side-input solution, in which, if I understand correctly, there is no `MAX_WATERMARK`. > Flush state of one side of the join if other side is bounded > > > Key: FLINK-9717 > URL: https://issues.apache.org/jira/browse/FLINK-9717 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: boshu Zheng >Priority: Major > > Whenever one side of a join receives {{MAX_WATERMARK}}, joins > (both normal and versioned joins) could flush the state from the other side. > This is a highly useful optimisation that would speed up versioned joins and would > allow normal joins of large unbounded streams with bounded tables (for > example some static data). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722001#comment-16722001 ] boshu Zheng commented on FLINK-9717: Hi [~pnowojski], thanks for your comment. I haven't dug into this issue yet, feel free to take it over :) > Flush state of one side of the join if other side is bounded > > > Key: FLINK-9717 > URL: https://issues.apache.org/jira/browse/FLINK-9717 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: boshu Zheng >Priority: Major > > Whenever one side of a join receives {{MAX_WATERMARK}}, joins > (both normal and versioned joins) could flush the state from the other side. > This is a highly useful optimisation that would speed up versioned joins and would > allow normal joins of large unbounded streams with bounded tables (for > example some static data). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11166) Slot Placement Constraint
[ https://issues.apache.org/jira/browse/FLINK-11166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tony Xintong Song reassigned FLINK-11166: - Assignee: Tony Xintong Song > Slot Placement Constraint > - > > Key: FLINK-11166 > URL: https://issues.apache.org/jira/browse/FLINK-11166 > Project: Flink > Issue Type: New Feature > Components: ResourceManager >Reporter: Tony Xintong Song >Assignee: Tony Xintong Song >Priority: Major > > In many cases, users may want Flink to schedule their job tasks following > certain locality preferences. E.g., colocating upstream/downstream tasks to > reduce data transmission costs, dispersing tasks of a certain pattern (e.g., > I/O intensive) to avoid resource competition, running tasks in exclusive > TaskExecutor-s for task-level resource consumption measurements, etc. > Currently, there are two ways in Flink to specify such locality preferences: > specifying preferred locations in the slot request, or setting a slot sharing > group for the task. In both ways the preferences are specified when > requesting slots from the SlotPool and can affect how tasks are placed among > the slots allocated to the JobMaster. > However, there is no guarantee that such preferences can always be satisfied, > especially when slots are customized with different resource profiles for > different kinds of tasks. E.g., in cases where two tasks A and B are > preferred to be scheduled onto the same TaskExecutor, it is possible that none > of the slots customized for A offered to the JobMaster are collocated with > slots customized for B. > To better support locality preferences with various slot resource > specifications, it is necessary to allow JobMaster-s to request slots > subject to certain placement constraints from the ResourceManager. > In addition, most underlying frameworks Flink runs on (Yarn, Kubernetes, > Mesos) already have individual support for container-level placement > constraints. It is a great opportunity for Flink to leverage such underlying > support and enable scheduling tasks with rich locality preferences. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10590) Optional quantifier is discarded from first element of group pattern
[ https://issues.apache.org/jira/browse/FLINK-10590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721978#comment-16721978 ] bupt_ljy commented on FLINK-10590: -- It seems that we first need to support group patterns; then we will be able to address this problem. > Optional quantifier is discarded from first element of group pattern > > > Key: FLINK-10590 > URL: https://issues.apache.org/jira/browse/FLINK-10590 > Project: Flink > Issue Type: Bug > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Priority: Major > > In a pattern {{(A? B)+}}, the optional property of {{A}} state is effectively > discarded in the compiled state machine. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode
[ https://issues.apache.org/jira/browse/FLINK-11088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721857#comment-16721857 ] Rong Rong commented on FLINK-11088: --- I dug further into the details of the document on the Hadoop side, and it seems there are 3 recommended ways of distributing credentials to secure long-running services on YARN. See here: https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services. I am not sure whether this applies to other cluster resource management systems, but I think it is worthwhile to take a look. For one: the current way of letting all JMs and TMs renew the keytab with the KDC seems to be a problem. If we can have the AM or JM renew with the keytab credential and distribute delegation tokens to all TMs, it will take a lot of load off the KDC server. I will start drafting a simple discussion doc if the community thinks this is worth digging into more deeply. Any thoughts [~till.rohrmann] [~aljoscha] ? > Improve Kerberos Authentication using Keytab in YARN proxy user mode > > > Key: FLINK-11088 > URL: https://issues.apache.org/jira/browse/FLINK-11088 > Project: Flink > Issue Type: Improvement > Components: Security, YARN >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently flink-yarn assumes keytab is shipped as application master > environment local resource on client side and will be distributed to all the > TMs. This does not work for YARN proxy user mode [1] since proxy user or > super user might not have access to actual users' keytab, but can request > delegation tokens on users' behalf. > Based on the type of security options for long-living YARN service[2], we > propose to have the keytab file path discovery configurable depending on the > launch mode of the YARN client. > Reference: > [1] > https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html > [2] > https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode
[ https://issues.apache.org/jira/browse/FLINK-11088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721857#comment-16721857 ] Rong Rong edited comment on FLINK-11088 at 12/14/18 11:03 PM: -- I dug further into the details of the document on the Hadoop side, and it seems there are 3 recommended ways of distributing credentials to secure long-running services on YARN. See here: https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services. I am not sure whether this applies to other cluster resource management systems, but I think it is worthwhile to take a look. For one: the current way of letting all JMs and TMs renew the keytab with the KDC seems to be a problem. If we can have the AM or JM renew with the keytab credential and distribute delegation tokens to all TMs, it will take a lot of load off the KDC server. I will start drafting a simple discussion doc if the community thinks this is worth digging into more deeply. Any thoughts [~suez1224], [~till.rohrmann], [~aljoscha] ? was (Author: walterddr): I dug further into the details of the document on the Hadoop side, and it seems there are 3 recommended ways of distributing credentials to secure long-running services on YARN. See here: https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services. I am not sure whether this applies to other cluster resource management systems, but I think it is worthwhile to take a look. For one: the current way of letting all JMs and TMs renew the keytab with the KDC seems to be a problem. If we can have the AM or JM renew with the keytab credential and distribute delegation tokens to all TMs, it will take a lot of load off the KDC server. I will start drafting a simple discussion doc if the community thinks this is worth digging into more deeply. Any thoughts [~till.rohrmann] [~aljoscha] ? > Improve Kerberos Authentication using Keytab in YARN proxy user mode > > > Key: FLINK-11088 > URL: https://issues.apache.org/jira/browse/FLINK-11088 > Project: Flink > Issue Type: Improvement > Components: Security, YARN >Reporter: Rong Rong >Assignee: Rong Rong >Priority: Major > > Currently flink-yarn assumes keytab is shipped as application master > environment local resource on client side and will be distributed to all the > TMs. This does not work for YARN proxy user mode [1] since proxy user or > super user might not have access to actual users' keytab, but can request > delegation tokens on users' behalf. > Based on the type of security options for long-living YARN service[2], we > propose to have the keytab file path discovery configurable depending on the > launch mode of the YARN client. > Reference: > [1] > https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html > [2] > https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] TisonKun commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9
TisonKun commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9 URL: https://github.com/apache/flink/pull/7302#discussion_r241909198 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java ## @@ -89,46 +77,72 @@ public void testCheckpointRecovery() throws Exception { expectedCheckpointIds.add(1L); expectedCheckpointIds.add(2L); - final RetrievableStateHandle failingRetrievableStateHandle = mock(RetrievableStateHandle.class); - when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception")); - - final RetrievableStateHandle retrievableStateHandle1 = mock(RetrievableStateHandle.class); - when(retrievableStateHandle1.retrieveState()).then( Review comment: Thanks for your review @zentol. I would separate the commits but wonder if it requires separate pull requests? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9
zentol commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9 URL: https://github.com/apache/flink/pull/7302#discussion_r241907414 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ## @@ -128,7 +145,9 @@ public ZooKeeperCompletedCheckpointStore( // All operations will have the path as root this.client = client.usingNamespace(client.getNamespace() + checkpointsPath); - this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage); + this.checkpointsInZooKeeper = (checkpointsInZooKeeper != null) ? Review comment: it's a bit icky but non-intrusive enough to be acceptable imo, This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9
zentol commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9 URL: https://github.com/apache/flink/pull/7302#discussion_r241906430 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ## @@ -327,7 +327,6 @@ public int exists(String pathInZooKeeper) throws Exception { * @return True if the state handle could be released * @throws Exception If the ZooKeeper operation or discarding the state handle fails */ - @Nullable Review comment: unrelated, move into a separate hotfix commit This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9
zentol commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9 URL: https://github.com/apache/flink/pull/7302#discussion_r241907196 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java ## @@ -89,46 +77,72 @@ public void testCheckpointRecovery() throws Exception { expectedCheckpointIds.add(1L); expectedCheckpointIds.add(2L); - final RetrievableStateHandle failingRetrievableStateHandle = mock(RetrievableStateHandle.class); - when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception")); - - final RetrievableStateHandle retrievableStateHandle1 = mock(RetrievableStateHandle.class); - when(retrievableStateHandle1.retrieveState()).then( Review comment: are all these refactorings necessary for the test to run on jdk 9? If not, please move them into a separate commit. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] TisonKun commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9
TisonKun commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9 URL: https://github.com/apache/flink/pull/7302#discussion_r241907057 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ## @@ -128,7 +145,9 @@ public ZooKeeperCompletedCheckpointStore( // All operations will have the path as root this.client = client.usingNamespace(client.getNamespace() + checkpointsPath); - this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage); + this.checkpointsInZooKeeper = (checkpointsInZooKeeper != null) ? Review comment: I am not super happy with this line. This is why I gave up on this approach the first time. I'd like to learn whether this style is OK in Flink, or whether we tend to build such a testing constructor in another way. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9
TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9 URL: https://github.com/apache/flink/pull/7302#issuecomment-447494914 I've updated the pull request, and the final usage of mockito is to mock CuratorFramework. I'd like to hear from @tillrohrmann whether we can test with Curator without mockito. I am rethinking the ZooKeeper-based HaService, and learning how to test it would be quite helpful. Thanks for your review @GJL ! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9
TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9 URL: https://github.com/apache/flink/pull/7302#issuecomment-447485963 @GJL I can do it with a testing constructor. If it is the style that Flink would be happy with, I will send a commit for it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11134) Invalid REST API request should not log the full exception in Flink logs
[ https://issues.apache.org/jira/browse/FLINK-11134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721778#comment-16721778 ] Matt Dailey commented on FLINK-11134: - This also affects Flink 1.5.5. I got a similar stack trace, also while leaving the Flink Web UI open after deleting the job: {code} org.apache.flink.runtime.rest.NotFoundException: Job faef2e5bc048947fc1bf6d468175935d not found at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:90) at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) at java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:884) at java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2196) at org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:86) at org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77) at org.apache.flink.runtime.rest.AbstractHandler.respondAsLeader(AbstractHandler.java:168) at org.apache.flink.runtime.rest.handler.RedirectHandler.lambda$null$0(RedirectHandler.java:139) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (faef2e5bc048947fc1bf6d468175935d) at org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(Dispatcher.java:693) at org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(Dispatcher.java:459) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40) at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165) at akka.actor.Actor$class.aroundReceive(Actor.scala:502) at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) at akka.actor.ActorCell.invoke(ActorCell.scala:495) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) at akka.dispatch.Mailbox.run(Mailbox.scala:224) at 
akka.dispatch.Mailbox.exec(Mailbox.scala:234) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) {code} > Invalid REST API request should not log the full exception in Flink logs > > > Key: FLINK-11134 > URL: https://issues.apache.org/jira/browse/FLINK-11134 > Project: Flink > Issue Type: Bug >Affects Versions: 1.7.0 >Reporter: Mark Cho >Priority: Minor > > {code:java} > 2018-12-11 17:52:19,207 ERROR > org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Exception > occurred in REST handler. > org.apache.flink.runtime.rest.NotFoundException: Job > 15d06690e88d309aa1bdbb6ce7c6dcd1 not found > at > org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:90) > at >
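A common way to address this kind of log noise, sketched below purely as an illustration (the exception type and logging policy are simplified stand-ins, not the actual Flink handler code), is to log expected client errors such as a 404 for an already-removed job at DEBUG without the stack trace, and reserve ERROR with the full exception for unexpected failures:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RestErrorLogging {

    private static final Logger LOG = LoggerFactory.getLogger(RestErrorLogging.class);

    /** Hypothetical marker for "expected" REST errors such as job-not-found (HTTP 404). */
    static class NotFoundException extends RuntimeException {
        NotFoundException(String message) {
            super(message);
        }
    }

    static void logHandlerException(Throwable error) {
        if (error instanceof NotFoundException) {
            // Expected when e.g. the web UI still polls a job that was already removed:
            // log concisely, without the full stack trace.
            LOG.debug("Ignoring REST request for unknown resource: {}", error.getMessage());
        } else {
            LOG.error("Exception occurred in REST handler.", error);
        }
    }

    public static void main(String[] args) {
        logHandlerException(new NotFoundException("Job faef2e5bc048947fc1bf6d468175935d not found"));
        logHandlerException(new IllegalStateException("unexpected"));
    }
}
```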
[GitHub] StefanRRichter commented on issue #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
StefanRRichter commented on issue #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#issuecomment-447403243 @azagrebin Thanks for the review. I addressed most comments as suggested. Will merge once travis is green. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] StefanRRichter commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
StefanRRichter commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#discussion_r241839041 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; + +/** + * Responsible for serialization of currentKey, currentGroup and namespace. + * Will reuse the previous serialized currentKeyed if possible. + * @param type of the key. + */ +@NotThreadSafe +@Internal +class RocksDBSerializedCompositeKeyBuilder { + + /** The serializer for the key. */ + @Nonnull + private final TypeSerializer keySerializer; + + /** The output to write the key into. */ + @Nonnull + private final DataOutputSerializer keyOutView; + + /** The number of Key-group-prefix bytes for the key. */ + @Nonnegative + private final int keyGroupPrefixBytes; + + /** This flag indicates whether the key type has a variable byte size in serialization. */ + private final boolean keySerializerTypeVariableSized; + + /** Mark for the position after the serialized key. */ + @Nonnegative + private int afterKeyMark; + + public RocksDBSerializedCompositeKeyBuilder( Review comment: We don't strictly need it, but I think this can be useful for tools like Bravo. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
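To make it easier to picture what such a builder produces, here is a plain-Java sketch of a composite key consisting of a key-group prefix, the serialized key, and the serialized namespace. It deliberately uses no Flink classes and simplifies the real format (which also handles variable-length keys and reuses the already-serialized key bytes), so treat it as a conceptual illustration only:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CompositeKeySketch {

    /**
     * Simplified layout: [key-group prefix bytes][serialized key][serialized namespace].
     * Real RocksDB state backends also cache the serialized key part and handle
     * variable-length types; this only shows the ordering idea.
     */
    static byte[] buildCompositeKey(int keyGroup, int keyGroupPrefixBytes, String key, String namespace)
            throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);

        // Write the key group as a big-endian prefix of the configured width
        // (often 1 or 2 bytes, depending on the maximum parallelism).
        for (int i = keyGroupPrefixBytes - 1; i >= 0; i--) {
            out.writeByte((keyGroup >>> (i * 8)) & 0xFF);
        }
        out.writeUTF(key);        // stands in for the key serializer
        out.writeUTF(namespace);  // stands in for the namespace serializer
        out.flush();
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] composite = buildCompositeKey(7, 2, "user-42", "window-1");
        System.out.println("composite key length: " + composite.length);
    }
}
```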
[GitHub] tweise commented on issue #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
tweise commented on issue #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#issuecomment-447403037 @mxm I think the constructor proliferation suffers from its own issues: besides repetition you need to decide how to prioritize parameters that naturally have no priority. And again anything with a long parameter list is not only hard to read but also toxic for backward compatible code evolution. Ideally we would just have one parameter to execute, which could be of type `ExecutionParameters` and hold the job name as well as the savepoint info. Anything else that might be needed in the future can be added without breaking the interface contract. But that isn't easy to accomplish due to how the code has been cast. The difficulty comes from the protected executeRemotely method that we cannot change. How about passing the savepoint parameter or before mentioned new parameters holder through a thread local instead of the instance variable? The difference how we pass it internally is cosmetic and not important to the user. The question of savepoint vs. generalized execution parameters seems more interesting. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
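To make the suggestion concrete, a parameter object along those lines might look like the sketch below. It is purely hypothetical and not taken from the PR, but it shows how a single evolvable `ExecutionParameters` argument could carry the job name and savepoint restore information so that future options can be added without breaking the `execute(...)` signature:

```java
/**
 * Hypothetical value object bundling everything needed to execute a pipeline,
 * so new options can be added without breaking the execute(...) signature.
 */
public final class ExecutionParameters {

    private final String jobName;
    private final String savepointPath;          // null if not restoring from a savepoint
    private final boolean allowNonRestoredState;

    private ExecutionParameters(String jobName, String savepointPath, boolean allowNonRestoredState) {
        this.jobName = jobName;
        this.savepointPath = savepointPath;
        this.allowNonRestoredState = allowNonRestoredState;
    }

    public static ExecutionParameters named(String jobName) {
        return new ExecutionParameters(jobName, null, false);
    }

    public ExecutionParameters withSavepointRestore(String savepointPath, boolean allowNonRestoredState) {
        return new ExecutionParameters(jobName, savepointPath, allowNonRestoredState);
    }

    public String getJobName() {
        return jobName;
    }

    public String getSavepointPath() {
        return savepointPath;
    }

    public boolean isAllowNonRestoredState() {
        return allowNonRestoredState;
    }
}
```

A single `execute(ExecutionParameters)` overload could then replace a growing family of `execute(jobName, savepointSettings, ...)` variants.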
[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r241825068 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -283,6 +275,14 @@ public JobExecutionResult execute(String jobName) throws ProgramInvocationExcept return executeRemotely(streamGraph, jarFiles); } + /** +* Execute the job with savepoint restore. +*/ + public JobExecutionResult execute(String jobName, SavepointRestoreSettings savepointRestoreSettings) throws ProgramInvocationException { + this.savepointRestoreSettings = savepointRestoreSettings; + return execute(jobName); Review comment: I understand that it is one-time, but I would still prefer to not have this side effect here. Could you either 1) make the SavepointSettings a `final` field and initialize it in the constructor or 2) provide the settings here as a parameter which is not stored in a field? Sorry for the hassle. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r241822916 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -65,6 +67,8 @@ /** The classpaths that need to be attached to each job. */ private final List globalClasspaths; + private SavepointRestoreSettings savepointRestoreSettings; Review comment: Could you document this field? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241816667 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala ## @@ -38,7 +41,25 @@ class EnumerableToLogicalTableScan( val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan] val table = oldRel.getTable val newRel = LogicalTableScan.create(oldRel.getCluster, table) -call.transformTo(newRel) + +val streamTable = table.unwrap(classOf[UpsertStreamTable[_]]) +val isUpsertTable = streamTable match { + case _: UpsertStreamTable[_] => +true + case _ => +false +} + +if (isUpsertTable) { Review comment: you could merge this if check with match above to simplify code a bit. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241811123 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala ## @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], inputType: TypeInforma } } + /** +* Converts the [[DataStream]] with upsert messages into a [[Table]] with keys. +* +* The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag, the second +* field holds the record. A true [[Boolean]] flag indicates an update message, a false flag +* indicates a delete message. +* +* The field name and key of the new [[Table]] can be specified like this: +* +* {{{ +* val env = StreamExecutionEnvironment.getExecutionEnvironment +* val tEnv = TableEnvironment.getTableEnvironment(env) +* +* val stream: DataStream[(Boolean, (String, Int))] = ... +* val table = stream.toKeyedTable(tEnv, 'name.key, 'amount) +* }}} +* +* If field names are not explicitly specified, names are automatically extracted from the type +* of the [[DataStream]]. +* If keys are not explicitly specified, an empty key will be used and the table will be a +* single row table. +* +* @param tableEnv The [[StreamTableEnvironment]] in which the new [[Table]] is created. +* @param fields The field names of the new [[Table]] (optional). +* @return The resulting [[Table]]. +*/ + def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): Table = { Review comment: ``` Methods in DataStreamConversions: toTableFromAppendStream toTableFromUpsertStream toTableFromRetractStream ``` I think this makes sense. @twalthr do you think as well? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241820886 ## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala ## @@ -65,21 +64,6 @@ class ExplainTest extends AbstractTestBase { assertEquals(expect, result) } - @Test Review comment: Why have you removed this test? Was it moved/superseded by something? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241817741 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala ## @@ -22,13 +22,16 @@ import org.apache.calcite.adapter.enumerable.EnumerableTableScan import org.apache.calcite.plan.RelOptRule.{any, operand} import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} import org.apache.calcite.rel.logical.LogicalTableScan +import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction +import org.apache.flink.table.plan.schema.UpsertStreamTable /** * Rule that converts an EnumerableTableScan into a LogicalTableScan. * We need this rule because Calcite creates an EnumerableTableScan * when parsing a SQL query. We convert it into a LogicalTableScan * so we can merge the optimization process with any plan that might be created - * by the Table API. + * by the Table API. The rule also checks whether the source is an upsert source and adds Review comment: This rule is only applied to `EnumerableTableScan` nodes. This comment suggests that `EnumerableTableScan` appears only in SQL, so something is wrong here. If we want to keep it as it is now, the comment needs updating. (Please also check my comment below in `LogicalToEnumerableTableScan`.) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241820550 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalToEnumerableTableScan.scala ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.plan.RelOptRule.{any, operand} +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} +import org.apache.calcite.rel.logical.LogicalTableScan + +/** + * Rule that converts a LogicalTableScan into an EnumerableTableScan. We need this rule to Review comment: What's the problem? That we cannot force Calcite to fire a rule only once? I would expect to have a single-pass rule that's fired only once (around the expand-plan optimisation phase), that goes through the plan, checks if the source is an upsert source and, if so, inserts `LogicalUpsertToRetraction`. Alternatively, we can think about doing this one step earlier: in the Table API somebody is creating a `LogicalTableScan`, right? We could inject logic there that checks whether we need `LogicalUpsertToRetraction` or not. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241807984 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ## @@ -573,6 +573,17 @@ abstract class StreamTableEnvironment( registerTableInternal(name, dataStreamTable) } + def getTypeFromUpsertStream[T](dataStream: DataStream[T]): TypeInformation[T] = { Review comment: private? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241809269 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala ## @@ -224,14 +226,14 @@ case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) { override def toString: String = s"proctime($child)" } -case class Key(child: Expression) extends UnaryExpression { - override private[flink] def resultType: TypeInformation[_] = child.resultType - - override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { -throw new UnsupportedOperationException( - s"Key Expression can only be used during table initialization.") - } -} +//case class Key(child: Expression) extends UnaryExpression { Review comment: Some left overs to remove? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion URL: https://github.com/apache/flink/pull/6787#discussion_r241815618 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/RemoveDataStreamUpsertToRetractionRule.scala ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.rules.datastream + +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} +import org.apache.calcite.plan.RelOptRule.{none, operand} +import org.apache.calcite.plan.hep.HepRelVertex +import org.apache.flink.table.plan.nodes.datastream.{AccMode, DataStreamUpsertToRetraction} + +/** + * Rule to remove [[DataStreamUpsertToRetraction]] under [[AccMode]]. In this case, it is a no-op Review comment: `under [[AccMode.AccRetract]].`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zzchun commented on a change in pull request #7291: [hotfix] [docs] Display Connectors and Formats tables, and fix typo in documentation
zzchun commented on a change in pull request #7291: [hotfix] [docs] Display Connectors and Formats tables, and fix typo in documentation URL: https://github.com/apache/flink/pull/7291#discussion_r241819796 ## File path: docs/dev/table/connect.md ## @@ -58,11 +56,7 @@ The following table list all available connectors and formats. Their mutual comp | JSON | `flink-json` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) | | Apache Avro | `flink-avro` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) | -{% else %} - -This table is only available for stable releases. - -{% endif %} Review comment: @twalthr I have restored the changes; could you help me review them? Thank you! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] GJL commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9
GJL commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9 URL: https://github.com/apache/flink/pull/7302#issuecomment-447380751 Thank you for your contribution to Apache Flink @TisonKun I think an additional constructor annotated with `@VisibleForTesting` is more maintainable. The current solution has the following downsides: - It is not obvious why `Comparator.comparing` does not work. The next person might try to refactor it back to the old solution and break java 9 compatibility. - The test uses powermock which might break again in later java versions. Let me loop in @tillrohrmann, the original author of the test. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
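The pattern GJL refers to typically looks like the sketch below (a generic illustration, not the actual `ZooKeeperCompletedCheckpointStore` code): the public constructor builds the real collaborator, while a package-private constructor annotated with `@VisibleForTesting` lets a test inject a stub without resorting to powermock or mockito.

```java
import org.apache.flink.annotation.VisibleForTesting;

/** Hypothetical component used only to illustrate the testing-constructor pattern. */
public class CheckpointStore {

    private final StateHandleStore handleStore;

    /** Production constructor: builds the real collaborator. */
    public CheckpointStore(String zooKeeperPath) {
        this(new StateHandleStore(zooKeeperPath));
    }

    /** Testing constructor: lets tests pass a stub/fake instead of mocking internals. */
    @VisibleForTesting
    CheckpointStore(StateHandleStore handleStore) {
        this.handleStore = handleStore;
    }

    public int size() {
        return handleStore.count();
    }

    /** Minimal collaborator, also hypothetical. */
    static class StateHandleStore {
        private final String path;

        StateHandleStore(String path) {
            this.path = path;
        }

        int count() {
            return path.isEmpty() ? 0 : 1;
        }
    }
}
```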
[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r241815671 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.function.ThrowingRunnable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.apache.flink.runtime.concurrent.Executors.directExecutorService; + +/** + * Data transfer utils for RocksDbKeyedStateBackend. + */ +class RocksDbStateDataTransfer { + + static void transferAllStateDataToDirectory( + IncrementalKeyedStateHandle restoreStateHandle, + Path dest, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws Exception { + + final Map sstFiles = + restoreStateHandle.getSharedState(); + final Map miscFiles = + restoreStateHandle.getPrivateState(); + + downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry); + downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry); + } + + /** +* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their +* {@link StateHandleID}. 
+*/ + private static void downloadDataForAllStateHandles( + Map stateHandleMap, + Path restoreInstancePath, + int restoringThreadNum, + CloseableRegistry closeableRegistry) throws InterruptedException, IOException { + + final ExecutorService executorService = createExecutorService(restoringThreadNum); + + try { + List runnables = createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + List> futures = new ArrayList<>(runnables.size()); + for (Runnable runnable : runnables) { + futures.add(CompletableFuture.runAsync(runnable, executorService)); + } + + FutureUtils.waitForAll(futures).get(); + } catch (ExecutionException e) { + final Throwable throwable = ExceptionUtils.stripExecutionException(e); + if (throwable instanceof IOException) { + throw (IOException) throwable; Review comment: I do not think we need to wrap non-IOException with IOException. I would leave it just as `throw ExceptionUtils.stripExecutionException(e)` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
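On the exception-handling point raised in the last review comment, a JDK-only sketch of unwrapping rather than wrapping (this illustrates the idea, it is not the Flink code itself):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public final class UnwrapExample {

    // Rethrow the original cause of an ExecutionException instead of wrapping everything
    // into a new IOException: an IOException cause stays an IOException, and other
    // exception types keep their type as well.
    static void waitFor(CompletableFuture<Void> future) throws Exception {
        try {
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e; // Errors or a missing cause: keep the wrapper
        }
    }

    private UnwrapExample() {}
}
```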
[GitHub] azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…
azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7255#discussion_r241812506 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ## @@ -726,6 +728,56 @@ void sendPartitionInfos() { } } + /** +* Check whether the InputDependencyConstraint is satisfied for this vertex. +* +* @return whether the input constraint is satisfied +*/ + public boolean checkInputDependencyConstraints() { Review comment: Ok, I agree to leave it as it is. As we expect it to be always true in `scheduleOrUpdateConsumers`, I would also agree not to have the shortcut. If we want to have the shortcut, I would leave then a comment explaining why we have it, like you mentioned above. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10928) Job unable to stabilise after restart
[ https://issues.apache.org/jira/browse/FLINK-10928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Harper updated FLINK-10928: -- Attachment: Screen Shot 2018-12-10 at 14.13.52.png > Job unable to stabilise after restart > -- > > Key: FLINK-10928 > URL: https://issues.apache.org/jira/browse/FLINK-10928 > Project: Flink > Issue Type: Bug > Environment: AWS EMR 5.17.0 > FLINK 1.5.2 > BEAM 2.7.0 >Reporter: Daniel Harper >Priority: Major > Attachments: Screen Shot 2018-11-16 at 15.49.03.png, Screen Shot > 2018-11-16 at 15.49.15.png, Screen Shot 2018-12-10 at 14.13.52.png, > ants-CopyofThe'death'spiralincident-191118-1231-1332.pdf > > > We've seen a few instances of this occurring in production now (it's > difficult to reproduce) > I've attached a timeline of events as a PDF here > [^ants-CopyofThe'death'spiralincident-191118-1231-1332.pdf] but essentially > it boils down to > 1. Job restarts due to exception > 2. Job restores from a checkpoint but we get the exception > {code} > Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: > Timeout waiting for connection from pool > {code} > 3. Job restarts > 4. Job restores from a checkpoint but we get the same exception > repeat a few times within 2-3 minutes > 5. YARN kills containers with out of memory > {code} > 2018-11-14 00:16:04,430 INFO org.apache.flink.yarn.YarnResourceManager > - Closing TaskExecutor connection > container_1541433014652_0001_01_000716 because: Container > [pid=7725,containerID=container_1541433014652_0001_01_ > 000716] is running beyond physical memory limits. Current usage: 6.4 GB of > 6.4 GB physical memory used; 8.4 GB of 31.9 GB virtual memory used. Killing > container. > Dump of the process-tree for container_1541433014652_0001_01_000716 : > |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) > SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE > |- 7725 7723 7725 7725 (bash) 0 0 115863552 696 /bin/bash -c > /usr/lib/jvm/java-openjdk/bin/java -Xms4995m -Xmx4995m > -XX:MaxDirectMemorySize=1533m > -Xloggc:/var/log/hadoop-yarn/flink_gc_container_1541433014652_0001_%p.log > -XX:GCLogF > ileSize=200M -XX:NumberOfGCLogFiles=10 -XX:+PrintGCDetails > -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause > -XX:+PrintGCDateStamps -XX:+UseG1GC > -Dlog.file=/var/log/hadoop-yarn/containers/application_1541433014652_00 > 01/container_1541433014652_0001_01_000716/taskmanager.log > -Dlog4j.configuration=file:./log4j.properties > org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> > /var/log/hadoop-yarn/containers/application_1541433014652_0001/container > _1541433014652_0001_01_000716/taskmanager.out 2> > /var/log/hadoop-yarn/containers/application_1541433014652_0001/container_1541433014652_0001_01_000716/taskmanager.err > |- 7738 7725 7725 7725 (java) 6959576 976377 8904458240 1671684 > /usr/lib/jvm/java-openjdk/bin/java -Xms4995m -Xmx4995m > -XX:MaxDirectMemorySize=1533m > -Xloggc:/var/log/hadoop-yarn/flink_gc_container_1541433014652_0001_%p.log > -XX:GCL > ogFileSize=200M -XX:NumberOfGCLogFiles=10 -XX:+PrintGCDetails > -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause > -XX:+PrintGCDateStamps -XX:+UseG1GC > -Dlog.file=/var/log/hadoop-yarn/containers/application_1541433014652 > _0001/container_1541433014652_0001_01_000716/taskmanager.log > -Dlog4j.configuration=file:./log4j.properties > org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . > > Container killed on request. 
Exit code is 143 > Container exited with a non-zero exit code 143 > {code} > 6. YARN allocates new containers but the job is never able to get back into a > stable state, with constant restarts until eventually the job is cancelled > We've seen something similar to FLINK-10848 happening too, with some task > managers allocated but sitting in an 'idle' state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10928) Job unable to stabilise after restart
[ https://issues.apache.org/jira/browse/FLINK-10928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721569#comment-16721569 ] Daniel Harper commented on FLINK-10928: --- Hi [~dawidwys] Going to be tricky to provide a heap dump due to sensitive data unfortunately. We've resolved the connection timeouts issue by increasing the connection pool size from 15 to 30, which after running this for 7 days or so has resulted in 0 'timeout waiting for connection from pool' errors when the job restarts and restores from a checkpoint One of the causes of the job restarting in the first place is FLINK-10844, which causes the checkpoint to fail (note this is intermittent we see this once or twice a day) which causes the job to restart. We are looking at enabling the setting {{failOnCheckpointingErrors}} to false to mitigate this in the meantime, although we understand the risk in enabling this setting. This 'death spiral'/instability has happened 3 or so times in the past 6 weeks, and we see the job restarting once or twice a day in the times between these massive failures. The only thing I can think of is a memory leak building over time and eventually triggering YARN to kill the containers. I did a heap dump on one of the taskmanagers this morning and it looks to me like there are multiple copies of 'user' classes i.e. BEAM code and our code, most of which have 0 instances, which looks like a classloader leak to me? This snapshot was taken after the job had restarted about 10 times. !Screen Shot 2018-12-10 at 14.13.52.png! > Job unable to stabilise after restart > -- > > Key: FLINK-10928 > URL: https://issues.apache.org/jira/browse/FLINK-10928 > Project: Flink > Issue Type: Bug > Environment: AWS EMR 5.17.0 > FLINK 1.5.2 > BEAM 2.7.0 >Reporter: Daniel Harper >Priority: Major > Attachments: Screen Shot 2018-11-16 at 15.49.03.png, Screen Shot > 2018-11-16 at 15.49.15.png, > ants-CopyofThe'death'spiralincident-191118-1231-1332.pdf > > > We've seen a few instances of this occurring in production now (it's > difficult to reproduce) > I've attached a timeline of events as a PDF here > [^ants-CopyofThe'death'spiralincident-191118-1231-1332.pdf] but essentially > it boils down to > 1. Job restarts due to exception > 2. Job restores from a checkpoint but we get the exception > {code} > Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: > Timeout waiting for connection from pool > {code} > 3. Job restarts > 4. Job restores from a checkpoint but we get the same exception > repeat a few times within 2-3 minutes > 5. YARN kills containers with out of memory > {code} > 2018-11-14 00:16:04,430 INFO org.apache.flink.yarn.YarnResourceManager > - Closing TaskExecutor connection > container_1541433014652_0001_01_000716 because: Container > [pid=7725,containerID=container_1541433014652_0001_01_ > 000716] is running beyond physical memory limits. Current usage: 6.4 GB of > 6.4 GB physical memory used; 8.4 GB of 31.9 GB virtual memory used. Killing > container. 
> Dump of the process-tree for container_1541433014652_0001_01_000716 : > |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) > SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE > |- 7725 7723 7725 7725 (bash) 0 0 115863552 696 /bin/bash -c > /usr/lib/jvm/java-openjdk/bin/java -Xms4995m -Xmx4995m > -XX:MaxDirectMemorySize=1533m > -Xloggc:/var/log/hadoop-yarn/flink_gc_container_1541433014652_0001_%p.log > -XX:GCLogF > ileSize=200M -XX:NumberOfGCLogFiles=10 -XX:+PrintGCDetails > -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause > -XX:+PrintGCDateStamps -XX:+UseG1GC > -Dlog.file=/var/log/hadoop-yarn/containers/application_1541433014652_00 > 01/container_1541433014652_0001_01_000716/taskmanager.log > -Dlog4j.configuration=file:./log4j.properties > org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> > /var/log/hadoop-yarn/containers/application_1541433014652_0001/container > _1541433014652_0001_01_000716/taskmanager.out 2> > /var/log/hadoop-yarn/containers/application_1541433014652_0001/container_1541433014652_0001_01_000716/taskmanager.err > |- 7738 7725 7725 7725 (java) 6959576 976377 8904458240 1671684 > /usr/lib/jvm/java-openjdk/bin/java -Xms4995m -Xmx4995m > -XX:MaxDirectMemorySize=1533m > -Xloggc:/var/log/hadoop-yarn/flink_gc_container_1541433014652_0001_%p.log > -XX:GCL > ogFileSize=200M -XX:NumberOfGCLogFiles=10 -XX:+PrintGCDetails > -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause > -XX:+PrintGCDateStamps -XX:+UseG1GC > -Dlog.file=/var/log/hadoop-yarn/containers/application_1541433014652 > _0001/container_1541433014652_0001_01_000716/taskmanager.log >
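For context, the {{failOnCheckpointingErrors}} mitigation mentioned in the comment above is a flag on the checkpoint configuration. A minimal sketch of how a job would set it (assuming a Flink 1.5.x DataStream program; the checkpoint interval is a placeholder):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Inside the job's main():
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // checkpoint every 60 seconds

// Do not fail (and therefore restart) the whole job when a single checkpoint fails;
// the job keeps running and a new checkpoint is attempted at the next trigger.
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
{code}

As noted above, this only masks the failing checkpoint (e.g. FLINK-10844); it does not address the suspected classloader leak.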
[jira] [Closed] (FLINK-9555) Support table api in scala shell
[ https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9555. --- Resolution: Fixed Reverted for 1.7.1 in: d00dd3323d53cba323bde5ebfe390ffdb4d777d4 > Support table api in scala shell > > > Key: FLINK-9555 > URL: https://issues.apache.org/jira/browse/FLINK-9555 > Project: Flink > Issue Type: New Feature > Components: Scala Shell >Affects Versions: 1.5.0 >Reporter: Jeff Zhang >Assignee: shuiqiangchen >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > It would be nice to have table api available in scala shell so that user can > experience table api in interactive way. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11168) LargePlanTest times out on Travis
[ https://issues.apache.org/jira/browse/FLINK-11168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11168: - Affects Version/s: (was: 1.7.0) 1.7.1 > LargePlanTest times out on Travis > - > > Key: FLINK-11168 > URL: https://issues.apache.org/jira/browse/FLINK-11168 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.1, 1.8.0 >Reporter: Chesnay Schepler >Priority: Major > > The newly added {{LargePlanTest}} has timed out on Travis: > https://travis-ci.org/apache/flink/jobs/467949929 > {code} > 13:58:43.593 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time > elapsed: 15.201 s <<< FAILURE! - in > org.apache.flink.test.planning.LargePlanTest > 13:58:43.599 [ERROR] > testPlanningOfLargePlan(org.apache.flink.test.planning.LargePlanTest) Time > elapsed: 15.028 s <<< ERROR! > org.junit.runners.model.TestTimedOutException: test timed out after 15000 > milliseconds > at > org.apache.flink.test.planning.LargePlanTest.runProgram(LargePlanTest.java:51) > at > org.apache.flink.test.planning.LargePlanTest.testPlanningOfLargePlan(LargePlanTest.java:39) > {code} > FYI [~mxm] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9555) Support table api in scala shell
[ https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-9555: - Will be reverted for 1.7.1 since it introduced bugs. > Support table api in scala shell > > > Key: FLINK-9555 > URL: https://issues.apache.org/jira/browse/FLINK-9555 > Project: Flink > Issue Type: New Feature > Components: Scala Shell >Affects Versions: 1.5.0 >Reporter: Jeff Zhang >Assignee: shuiqiangchen >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > It would be nice to have table api available in scala shell so that user can > experience table api in interactive way. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9555) Support table api in scala shell
[ https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-9555: Fix Version/s: (was: 1.7.1) > Support table api in scala shell > > > Key: FLINK-9555 > URL: https://issues.apache.org/jira/browse/FLINK-9555 > Project: Flink > Issue Type: New Feature > Components: Scala Shell >Affects Versions: 1.5.0 >Reporter: Jeff Zhang >Assignee: shuiqiangchen >Priority: Major > Labels: pull-request-available > Fix For: 1.8.0 > > > It would be nice to have table api available in scala shell so that user can > experience table api in interactive way. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721561#comment-16721561 ] Piotr Nowojski commented on FLINK-9717: --- [~kisimple], this ticket is not as easy as you might think (nor as simple as I thought when I created it) - this optimisation is relatively trivial to implement if you do not consider restoring from checkpoints/savepoints. Currently the problem is that Flink doesn't keep & restore the last previous watermark after restoring from a checkpoint, and this is hard to work around. In other words, we can now easily "flush" one side of the join when we receive {{MAX_WATERMARK}}, but what should happen after restoring from a checkpoint? There is no easy way to store the information that {{MAX_WATERMARK}} was previously reached. As far as I have thought about this, it cannot be stored in the state of the Join operator, and even if it could be done this way, it's probably not the proper/elegant solution. Probably the correct solution is to store {{MAX_WATERMARK}} in the state around the watermark emitter/source operator, and the last previously emitted watermark should be re-emitted when the job is restored. > Flush state of one side of the join if other side is bounded > > > Key: FLINK-9717 > URL: https://issues.apache.org/jira/browse/FLINK-9717 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: boshu Zheng >Priority: Major > > Whenever one side of a join receives {{MAX_WATERMARK}}, the other side in joins > (both normal and versioned joins) could flush the state from the other side. > This is a highly useful optimisation that would speed up versioned joins and would > allow normal joins of large unbounded streams with bounded tables (for > example some static data). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
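A rough sketch of the direction described in that comment, i.e. remembering the last emitted watermark in checkpointed operator state so that a previously reached {{MAX_WATERMARK}} can be re-emitted after restore (illustrative only; this is not existing Flink functionality and the class name is made up):

{code:java}
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public abstract class WatermarkRememberingSource<T> extends RichSourceFunction<T>
        implements CheckpointedFunction {

    private transient ListState<Long> watermarkState;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        watermarkState = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("last-watermark", Long.class));
        for (Long restored : watermarkState.get()) {
            lastEmittedWatermark = Math.max(lastEmittedWatermark, restored);
        }
        // After restore, run() would first re-emit lastEmittedWatermark (possibly
        // MAX_WATERMARK) before producing any further records.
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        watermarkState.clear();
        watermarkState.add(lastEmittedWatermark);
    }

    protected void emitWatermark(SourceContext<T> ctx, long timestamp) {
        lastEmittedWatermark = Math.max(lastEmittedWatermark, timestamp);
        ctx.emitWatermark(new Watermark(lastEmittedWatermark));
    }
}
{code}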
[jira] [Created] (FLINK-11168) LargePlanTest times out on Travis
Chesnay Schepler created FLINK-11168: Summary: LargePlanTest times out on Travis Key: FLINK-11168 URL: https://issues.apache.org/jira/browse/FLINK-11168 Project: Flink Issue Type: Bug Components: Tests Affects Versions: 1.7.0, 1.8.0 Reporter: Chesnay Schepler The newly added {{LargePlanTest}} has timed out on Travis: https://travis-ci.org/apache/flink/jobs/467949929 {code} 13:58:43.593 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 15.201 s <<< FAILURE! - in org.apache.flink.test.planning.LargePlanTest 13:58:43.599 [ERROR] testPlanningOfLargePlan(org.apache.flink.test.planning.LargePlanTest) Time elapsed: 15.028 s <<< ERROR! org.junit.runners.model.TestTimedOutException: test timed out after 15000 milliseconds at org.apache.flink.test.planning.LargePlanTest.runProgram(LargePlanTest.java:51) at org.apache.flink.test.planning.LargePlanTest.testPlanningOfLargePlan(LargePlanTest.java:39) {code} FYI [~mxm] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr commented on issue #7289: [FLINK-11001][table] Fix window rowtime attribute can't be renamed bug in Java
twalthr commented on issue #7289: [FLINK-11001][table] Fix window rowtime attribute can't be renamed bug in Java URL: https://github.com/apache/flink/pull/7289#issuecomment-447366595 @hequn8128 have you checked if the docs need to be updated somewhere? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr commented on a change in pull request #7291: [hotfix] [docs] Display Connectors and Formats tables, and fix typo in documentation
twalthr commented on a change in pull request #7291: [hotfix] [docs] Display Connectors and Formats tables, and fix typo in documentation URL: https://github.com/apache/flink/pull/7291#discussion_r241799867 ## File path: docs/dev/table/connect.md ## @@ -58,11 +56,7 @@ The following table list all available connectors and formats. Their mutual comp | JSON | `flink-json` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) | | Apache Avro | `flink-avro` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) | -{% else %} - -This table is only available for stable releases. - -{% endif %} Review comment: You are using docs of an unstable release. Why should this be confusing? Please undo these changes. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] asfgit closed pull request #7311: [hotfix][docs] Correct the field name in Connect to External Systems doc
asfgit closed pull request #7311: [hotfix][docs] Correct the field name in Connect to External Systems doc URL: https://github.com/apache/flink/pull/7311 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md index 1cc97a39377..6d7db117c97 100644 --- a/docs/dev/table/connect.md +++ b/docs/dev/table/connect.md @@ -154,7 +154,7 @@ tableEnvironment new Schema() .field("rowtime", Types.SQL_TIMESTAMP) .rowtime(new Rowtime() - .timestampsFromField("ts") + .timestampsFromField("timestamp") .watermarksPeriodicBounded(6) ) .field("user", Types.LONG) @@ -1166,7 +1166,7 @@ ClusterBuilder builder = ... // configure Cassandra cluster connection CassandraAppendTableSink sink = new CassandraAppendTableSink( builder, // the query must match the schema of the table - INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)); + "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)"); tableEnv.registerTableSink( "cassandraOutputTable", This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…
azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7255#discussion_r241794342 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java ## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.InputDependencyConstraint; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; +import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.isInExecutionState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the inputs constraint for {@link ExecutionVertex}. 
+ */ +public class ExecutionVertexInputConstraintTest extends TestLogger { + + @Test + public void testInputConsumable() throws Exception { + JobVertex v1 = new JobVertex("vertex1"); + JobVertex v2 = new JobVertex("vertex2"); + JobVertex v3 = new JobVertex("vertex3"); + v1.setParallelism(2); + v2.setParallelism(2); + v3.setParallelism(2); + v1.setInvokableClass(AbstractInvokable.class); + v2.setInvokableClass(AbstractInvokable.class); + v3.setInvokableClass(AbstractInvokable.class); + v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); + v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + List ordered = Arrays.asList(v1, v2, v3); + ExecutionGraph eg = createExecutionGraph(ordered); + + ExecutionVertex ev11 = eg.getJobVertex(v1.getID()).getTaskVertices()[0]; + ExecutionVertex ev12 = eg.getJobVertex(v1.getID()).getTaskVertices()[1]; + ExecutionVertex ev21 = eg.getJobVertex(v2.getID()).getTaskVertices()[0]; + ExecutionVertex ev22 = eg.getJobVertex(v2.getID()).getTaskVertices()[1]; + ExecutionVertex ev31 = eg.getJobVertex(v3.getID()).getTaskVertices()[0]; + ExecutionVertex ev32 = eg.getJobVertex(v3.getID()).getTaskVertices()[1]; + + eg.scheduleForExecution(); Review comment: Yes, I meant `@Before void init()` method could recreate new graph before each test run. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
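The `@Before` pattern suggested in the last comment, in its generic JUnit form (the fixture and names here are placeholders, not the actual test under review):

```java
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class ExampleFixtureTest {

    // Stand-in for the ExecutionGraph fixture; rebuilt before every test method
    // so no test observes state left behind by another.
    private List<String> vertices;

    @Before
    public void init() {
        vertices = new ArrayList<>(Arrays.asList("v1", "v2", "v3"));
    }

    @Test
    public void testAddsVertex() {
        vertices.add("v4");
        assertEquals(4, vertices.size());
    }
}
```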
[GitHub] asfgit closed pull request #7307: [hotfix][docs] Imporve the correctness in Detecting Patterns doc
asfgit closed pull request #7307: [hotfix][docs] Imporve the correctness in Detecting Patterns doc URL: https://github.com/apache/flink/pull/7307 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/streaming/match_recognize.md b/docs/dev/table/streaming/match_recognize.md index 01799cc47b4..e38fb5c1129 100644 --- a/docs/dev/table/streaming/match_recognize.md +++ b/docs/dev/table/streaming/match_recognize.md @@ -107,7 +107,7 @@ The table has a following schema: {% highlight text %} Ticker - |-- symbol: Long # symbol of the stock + |-- symbol: String # symbol of the stock |-- price: Long # price of the stock |-- tax: Long# tax liability of the stock |-- rowtime: TimeIndicatorTypeInfo(rowtime) # point in time when the change to those values happened @@ -293,6 +293,7 @@ The same query where `B*` is modified to `B*?`, which means that `B*` should be symbol lastPrice === XYZ 13 + XYZ 16 {% endhighlight %} The pattern variable `B` matches only to the row with price `12` instead of swallowing the rows with prices `12`, `13`, and `14`. @@ -842,7 +843,7 @@ The last result matched against the rows #5, #6. This combination will produce a runtime exception because one would always try to start a new match where the last one started. This would produce an infinite loop and, thus, is prohibited. -One has to keep in mind that in case of the `SKIP TO FIRST/LAST variable`strategy it might be possible that there are no rows mapped to that +One has to keep in mind that in case of the `SKIP TO FIRST/LAST variable` strategy it might be possible that there are no rows mapped to that variable (e.g. for pattern `A*`). In such cases, a runtime exception will be thrown as the standard requires a valid row to continue the matching. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r241787811 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,65 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Aggregations + +Aggregations can be used in `DEFINE` and `MEASURES` clauses. Both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) and custom [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions are supported. + +Aggregate functions are applied to each subset of rows mapped to a match. In order to understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. + +The task of the following example is to find the longest period of time for which the average price of a ticker did not go below certain threshold. It shows how expressible `MATCH_RECOGNIZE` can become with aggregations. +This task can be performed with the following query: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES +FIRST(A.rowtime) AS start_tstamp, +LAST(A.rowtime) AS end_tstamp, +AVG(A.price) AS avgPrice +ONE ROW PER MATCH +AFTER MATCH SKIP TO FIRST B +PATTERN (A+ B) +DEFINE +A AS AVG(A.price) < 15 +) MR; Review comment: @dawidwys what is `MR` actually doing? Do we have to call `SELECT MR.start_tstamp`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11116) Overwrite outdated in-progress files in StreamingFileSink.
[ https://issues.apache.org/jira/browse/FLINK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-6: --- Description: In order to guarantee exactly-once semantics, the streaming file sink is implementing a two-phase commit protocol when writing files to the filesystem. Initially data is written to in-progress files. These files are then put into "pending" state when they are completed (based on the rolling policy), and they are finally committed when the checkpoint that put them in the "pending" state is acknowledged as complete. The above shows that in the case that we have: 1) checkpoints A, B, C coming 2) checkpoint A being acknowledged and 3) failure Then we may have files that do not belong to any checkpoint (because B and C were not considered successful). These files are currently not cleaned up. In order to reduce the amount of such files created, we removed the random suffix from in-progress temporary files, so that the next in-progress file that is opened for this part, overwrites them. was: In order to guarantee exactly-once semantics, the streaming file sink is implementing a two-phase commit protocol when writing files to the filesystem. Initially data is written to in-progress files. These files are then put into "pending" state when they are completed (based on the rolling policy), and they are finally committed when the checkpoint that put them in the "pending" state is acknowledged as complete. The above shows that in the case that we have: 1) checkpoints A, B, C coming 2) checkpoint A being acknowledged and 3) failure Then we may have files that do not belong to any checkpoint (because B and C were not considered successful). These files are currently not cleaned up. This issue aims at cleaning up these files. > Overwrite outdated in-progress files in StreamingFileSink. > -- > > Key: FLINK-6 > URL: https://issues.apache.org/jira/browse/FLINK-6 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.7.2 > > > In order to guarantee exactly-once semantics, the streaming file sink is > implementing a two-phase commit protocol when writing files to the filesystem. > Initially data is written to in-progress files. These files are then put into > "pending" state when they are completed (based on the rolling policy), and > they are finally committed when the checkpoint that put them in the "pending" > state is acknowledged as complete. > The above shows that in the case that we have: > 1) checkpoints A, B, C coming > 2) checkpoint A being acknowledged and > 3) failure > Then we may have files that do not belong to any checkpoint (because B and C > were not considered successful). These files are currently not cleaned up. > In order to reduce the amount of such files created, we removed the random > suffix from in-progress temporary files, so that the next in-progress file > that is opened for this part, overwrites them. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
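For readers unfamiliar with the sink, a minimal usage sketch (paths, the source, and the job name are placeholders); the in-progress/pending/finished life cycle described above is driven by the checkpointing enabled here:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class StreamingFileSinkExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Without checkpoints, pending part files are never committed.
        env.enableCheckpointing(60_000);

        DataStream<String> events = env.socketTextStream("localhost", 9999);

        StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("hdfs:///tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
            .build();

        events.addSink(sink);
        env.execute("streaming file sink example");
    }
}
{code}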
[jira] [Updated] (FLINK-11116) Overwrite outdated in-progress files in StreamingFileSink.
[ https://issues.apache.org/jira/browse/FLINK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-6: --- Summary: Overwrite outdated in-progress files in StreamingFileSink. (was: Clean-up temporary files that upon recovery, they belong to no checkpoint.) > Overwrite outdated in-progress files in StreamingFileSink. > -- > > Key: FLINK-6 > URL: https://issues.apache.org/jira/browse/FLINK-6 > Project: Flink > Issue Type: Improvement > Components: filesystem-connector >Affects Versions: 1.7.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.7.2 > > > In order to guarantee exactly-once semantics, the streaming file sink is > implementing a two-phase commit protocol when writing files to the filesystem. > Initially data is written to in-progress files. These files are then put into > "pending" state when they are completed (based on the rolling policy), and > they are finally committed when the checkpoint that put them in the "pending" > state is acknowledged as complete. > The above shows that in the case that we have: > 1) checkpoints A, B, C coming > 2) checkpoint A being acknowledged and > 3) failure > Then we may have files that do not belong to any checkpoint (because B and C > were not considered successful). These files are currently not cleaned up. > This issue aims at cleaning up these files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11167) Optimize RocksDBList#put for no empty input
[ https://issues.apache.org/jira/browse/FLINK-11167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Congxian Qiu updated FLINK-11167: - Component/s: State Backends, Checkpointing > Optimize RocksDBList#put for no empty input > --- > > Key: FLINK-11167 > URL: https://issues.apache.org/jira/browse/FLINK-11167 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Congxian Qiu >Assignee: Congxian Qiu >Priority: Major > > In `RocksDBListState.putInternal` we first remove the current state and > then add the new list if needed (i.e. if the list is not empty), as shown below. > I think that if the list is not empty, we could skip the remove operation. > > {code:java} > public void updateInternal(List<V> values) { >   Preconditions.checkNotNull(values, "List of values to add cannot be null."); >   clear(); >   if (!values.isEmpty()) { >     try { >       writeCurrentKeyWithGroupAndNamespace(); >       byte[] key = dataOutputView.getCopyOfBuffer(); >       byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView); >       backend.db.put(columnFamily, writeOptions, key, premerge); >     } catch (IOException | RocksDBException e) { >       throw new FlinkRuntimeException("Error while updating data to RocksDB", e); >     } >   } > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
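A sketch of what the proposed optimization could look like (illustrative only, not a committed patch): since RocksDB's put() overwrites any existing value under the same key, the explicit clear() is only needed when the new list is empty.

{code:java}
public void updateInternal(List<V> values) {
    Preconditions.checkNotNull(values, "List of values to add cannot be null.");

    if (values.isEmpty()) {
        clear(); // nothing to write, just drop the old entry
        return;
    }

    try {
        writeCurrentKeyWithGroupAndNamespace();
        byte[] key = dataOutputView.getCopyOfBuffer();
        byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
        // put() replaces the previous value under the same key, so no prior clear() is needed
        backend.db.put(columnFamily, writeOptions, key, premerge);
    } catch (IOException | RocksDBException e) {
        throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
    }
}
{code}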
[GitHub] KarmaGYZ opened a new pull request #7311: [hotfix][docs] Correct the field name in Connect to External Systems doc
KarmaGYZ opened a new pull request #7311: [hotfix][docs] Correct the field name in Connect to External Systems doc URL: https://github.com/apache/flink/pull/7311 ## What is the purpose of the change This PR corrects a field name in the Connect to External Systems doc and adds missing quotation marks to a SQL command. ## Brief change log - Correct the field name in the Connect to External Systems doc - Add the missing quotation marks to a SQL command ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10595) Support patterns that can produce empty matches
[ https://issues.apache.org/jira/browse/FLINK-10595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721431#comment-16721431 ] bupt_ljy commented on FLINK-10595: -- [~dawidwys] I'm very curious about this empty match, could you tell me the background or the use cases about it? > Support patterns that can produce empty matches > --- > > Key: FLINK-10595 > URL: https://issues.apache.org/jira/browse/FLINK-10595 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Priority: Major > > CEP library does not emit empty matches. In order to be SQL standard > compliant we explicitly forbid patterns that can produce empty > matches(patterns that have only optional states) e.g A* -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10566) Flink Planning is exponential in the number of stages
[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels updated FLINK-10566: --- Affects Version/s: 1.8.0 > Flink Planning is exponential in the number of stages > - > > Key: FLINK-10566 > URL: https://issues.apache.org/jira/browse/FLINK-10566 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 1.5.4, 1.6.1, 1.7.0, 1.8.0 >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.1, 1.8.0 > > Attachments: chart.png > > Time Spent: 10m > Remaining Estimate: 0h > > This makes it nearly impossible to run graphs with 100 or more stages. (The > execution itself is still sub-second, but the job submission takes > increasingly long.) > I can reproduce this with the following pipeline, which resembles my > real-world workloads (with depth up to 10 and width up, and past, 50). On > Flink it seems getting width beyond width 10 is problematic (times out after > hours). Note the log scale on the chart for time. > > {code:java} > public static void runPipeline(int depth, int width) throws Exception { > final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > DataSet input = env.fromElements("a", "b", "c"); > DataSet stats = null; > for (int i = 0; i < depth; i++) { > stats = analyze(input, stats, width / (i + 1) + 1); > } > stats.writeAsText("out.txt"); > env.execute("depth " + depth + " width " + width); > } > public static DataSet analyze(DataSet input, > DataSet stats, int branches) { > System.out.println("analyze " + branches); > for (int i = 0; i < branches; i++) { > final int ii = i; > if (stats != null) { > input = input.map(new RichMapFunction() { > @Override > public void open(Configuration parameters) throws Exception { > Collection broadcastSet = > getRuntimeContext().getBroadcastVariable("stats"); > } > @Override > public String map(String value) throws Exception { > return value; > } > }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats"); > } > DataSet branch = input >.map(s -> new Tuple2(0, s + > ii)) >.groupBy(0) >.minBy(1) >.map(kv -> kv.f1); > if (stats == null) { > stats = branch; > } else { > stats = stats.union(branch); > } > } > return stats.map(s -> "(" + s + ").stats"); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10566) Flink Planning is exponential in the number of stages
[ https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-10566. Resolution: Fixed Fix Version/s: 1.8.0 1.7.1 1.6.3 1.5.6 Backported the fix to the 1.5, 1.6 and the 1.7 release branches. > Flink Planning is exponential in the number of stages > - > > Key: FLINK-10566 > URL: https://issues.apache.org/jira/browse/FLINK-10566 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 1.5.4, 1.6.1, 1.7.0 >Reporter: Robert Bradshaw >Assignee: Maximilian Michels >Priority: Major > Labels: pull-request-available > Fix For: 1.5.6, 1.6.3, 1.7.1, 1.8.0 > > Attachments: chart.png > > Time Spent: 10m > Remaining Estimate: 0h > > This makes it nearly impossible to run graphs with 100 or more stages. (The > execution itself is still sub-second, but the job submission takes > increasingly long.) > I can reproduce this with the following pipeline, which resembles my > real-world workloads (with depth up to 10 and width up, and past, 50). On > Flink it seems getting width beyond width 10 is problematic (times out after > hours). Note the log scale on the chart for time. > > {code:java} > public static void runPipeline(int depth, int width) throws Exception { > final ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > DataSet input = env.fromElements("a", "b", "c"); > DataSet stats = null; > for (int i = 0; i < depth; i++) { > stats = analyze(input, stats, width / (i + 1) + 1); > } > stats.writeAsText("out.txt"); > env.execute("depth " + depth + " width " + width); > } > public static DataSet analyze(DataSet input, > DataSet stats, int branches) { > System.out.println("analyze " + branches); > for (int i = 0; i < branches; i++) { > final int ii = i; > if (stats != null) { > input = input.map(new RichMapFunction() { > @Override > public void open(Configuration parameters) throws Exception { > Collection broadcastSet = > getRuntimeContext().getBroadcastVariable("stats"); > } > @Override > public String map(String value) throws Exception { > return value; > } > }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats"); > } > DataSet branch = input >.map(s -> new Tuple2(0, s + > ii)) >.groupBy(0) >.minBy(1) >.map(kv -> kv.f1); > if (stats == null) { > stats = branch; > } else { > stats = stats.union(branch); > } > } > return stats.map(s -> "(" + s + ").stats"); > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] mxm closed pull request #7276: [FLINK-10566] Fix exponential planning time of large programs
mxm closed pull request #7276: [FLINK-10566] Fix exponential planning time of large programs URL: https://github.com/apache/flink/pull/7276 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java index efbc4fac039..32eed69e2ce 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map.Entry; import java.util.Set; @@ -361,10 +362,14 @@ public int getMaximumParallelism() { private static final class MaxDopVisitor implements Visitor> { + private final Set visitedOperators = new HashSet<>(); private int maxDop = -1; - + @Override public boolean preVisit(Operator visitable) { + if (!visitedOperators.add(visitable)) { + return false; + } this.maxDop = Math.max(this.maxDop, visitable.getParallelism()); return true; } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 22a2a93f066..beb1b65c4a5 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -73,6 +73,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -963,12 +964,16 @@ public Plan createProgramPlan(String jobName, boolean clearSinks) { if (!config.isAutoTypeRegistrationDisabled()) { plan.accept(new Visitor>() { - private final HashSet> deduplicator = new HashSet<>(); + private final Set> registeredTypes = new HashSet<>(); + private final Set> visitedOperators = new HashSet<>(); @Override public boolean preVisit(org.apache.flink.api.common.operators.Operator visitable) { + if (!visitedOperators.add(visitable)) { + return false; + } OperatorInformation opInfo = visitable.getOperatorInfo(); - Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, deduplicator); + Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, registeredTypes); return true; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java new file mode 100644 index 000..6c30af88ad4 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.planning; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import
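The essence of the merged fix, in a generic form (names are not taken from the Flink classes): a DAG traversal that records already-visited nodes, so operators reachable through many paths are processed once instead of once per path.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DeduplicatingVisitor {

    interface Node {
        List<Node> getInputs();
    }

    private final Set<Node> visited = new HashSet<>();

    void visit(Node node) {
        if (!visited.add(node)) {
            return; // already reached through another path; do not re-walk the sub-graph
        }
        // per-node work (e.g. computing max parallelism, registering types) goes here
        for (Node input : node.getInputs()) {
            visit(input);
        }
    }
}
```

Without the `visited` set, a node shared by many paths is walked once per path, which is what made planning time grow exponentially with the number of stages.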
[GitHub] mxm commented on issue #7276: [FLINK-10566] Fix exponential planning time of large programs
mxm commented on issue #7276: [FLINK-10566] Fix exponential planning time of large programs URL: https://github.com/apache/flink/pull/7276#issuecomment-447323623 All green this time. Merging. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-10594) Support SUBSETS
[ https://issues.apache.org/jira/browse/FLINK-10594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10594: --- Labels: pull-request-available (was: ) > Support SUBSETS > --- > > Key: FLINK-10594 > URL: https://issues.apache.org/jira/browse/FLINK-10594 > Project: Flink > Issue Type: Sub-task > Components: CEP, Table API SQL >Reporter: Dawid Wysakowicz >Assignee: bupt_ljy >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] buptljy opened a new pull request #7310: [FLINK-10594][table] Add Subsets for CEP
buptljy opened a new pull request #7310: [FLINK-10594][table] Add Subsets for CEP URL: https://github.com/apache/flink/pull/7310 ## What is the purpose of the change - Add subsets for CEP ## Brief change log - Add subsets as the parameter of the MatchCodeGenerator - Merge subset's specified list results ## Verifying this change - Unit testing(EventTime/ProcessingTime/Multi Subsets) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`:no - The serializers: no - The runtime per-record code paths (performance sensitive):no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not documented This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r241741665 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ## @@ -168,27 +170,23 @@ object AggregateUtil { generateRetraction: Boolean, consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = { -val (aggFields, aggregates, isDistinctAggs, accTypes, accSpecs) = - transformToAggregateFunctions( +val aggregateMetadata = extractAggregateMetadata( namedAggregates.map(_.getKey), inputRowType, consumeRetraction, tableConfig, isStateBackedDataViews = true) -val aggMapping = aggregates.indices.map(_ + groupings.length).toArray - -val outputArity = groupings.length + aggregates.length - -val aggregationStateType: RowTypeInfo = new RowTypeInfo(accTypes: _*) +val aggMapping = getAdjustedMapping(aggregateMetadata.getAggregateCallsCount, groupings.length) Review comment: Actually I thought that this function can be part of `AggregateMetadata`? Because you always apply it on the calls count. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7177#discussion_r241742837 ## File path: docs/dev/table/streaming/match_recognize.md ## @@ -211,6 +211,65 @@ If a condition is not defined for a pattern variable, a default condition will b For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section. +### Aggregations + +Aggregations can be used in `DEFINE` and `MEASURES` clauses. Both [built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) and custom [user defined]({{ site.baseurl }}/dev/table/udfs.html) functions are supported. + +Aggregate functions are applied to each subset of rows mapped to a match. In order to understand how those subsets are evaluated have a look at the [event stream navigation](#pattern-navigation) section. + +The task of the following example is to find the longest period of time for which the average price of a ticker did not go below certain threshold. It shows how expressible `MATCH_RECOGNIZE` can become with aggregations. +This task can be performed with the following query: + +{% highlight sql %} +SELECT * +FROM Ticker +MATCH_RECOGNIZE ( +PARTITION BY symbol +ORDER BY rowtime +MEASURES +FIRST(A.rowtime) AS start_tstamp, +LAST(A.rowtime) AS end_tstamp, +AVG(A.price) AS avgPrice +ONE ROW PER MATCH +AFTER MATCH SKIP TO FIRST B +PATTERN (A+ B) +DEFINE +A AS AVG(A.price) < 15 +) MR; +{% endhighlight %} + +Given this query and following input values: + +{% highlight text %} +symbol rowtime pricetax +== === === +'ACME' '01-Apr-11 10:00:00' 12 1 +'ACME' '01-Apr-11 10:00:01' 17 2 +'ACME' '01-Apr-11 10:00:02' 13 1 +'ACME' '01-Apr-11 10:00:03' 16 3 +'ACME' '01-Apr-11 10:00:04' 25 2 +'ACME' '01-Apr-11 10:00:05' 2 1 +'ACME' '01-Apr-11 10:00:06' 4 1 +'ACME' '01-Apr-11 10:00:07' 10 2 +'ACME' '01-Apr-11 10:00:08' 15 2 +'ACME' '01-Apr-11 10:00:09' 25 2 +'ACME' '01-Apr-11 10:00:10' 30 1 +{% endhighlight %} + +The query will accumulate events as part of the pattern variable `A` as long as the average price of them does not exceed `15`. For example, such a limit exceeding happens at `01-Apr-11 10:00:04`. +The following period exceeds the average price of `15` again at `01-Apr-11 10:00:10`. Thus the results for said query will be: + +{% highlight text %} + symbol start_tstamp end_tstamp avgPrice += == == +ACME 01-APR-11 10:00:00 01-APR-11 10:00:03 14.5 +ACME 01-APR-11 10:00:04 01-APR-11 10:00:09 13.5 +{% endhighlight %} + +Note Aggregations can be applied to expressions, but only if they reference a single pattern variable. Thus `SUM(A.price * A.tax)` is a valid one, but `AVG(A.price * B.tax)` is not. + +Attention `DISTINCT` aggregations are not supported. Review comment: Can you open a follow up issue for this if it does not exist yet? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11161) Unable to import java packages in scala-shell
[ https://issues.apache.org/jira/browse/FLINK-11161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721350#comment-16721350 ] Timo Walther commented on FLINK-11161: -- CC [~sunjincheng121] > Unable to import java packages in scala-shell > - > > Key: FLINK-11161 > URL: https://issues.apache.org/jira/browse/FLINK-11161 > Project: Flink > Issue Type: Improvement > Components: Scala Shell >Affects Versions: 1.8.0 >Reporter: Jeff Zhang >Assignee: shuiqiangchen >Priority: Minor > Labels: pull-request-available > Attachments: 09_33_31__12_14_2018.jpg > > Time Spent: 0.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry
[ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721348#comment-16721348 ] Tzu-Li (Gordon) Tai commented on FLINK-11046: - I propose the following fix: The {{RequestIndexer}} instance provided to the {{ActionRequestFailureHandler}} shouldn't be the same instance as the one used for indexing incoming records (i.e. the one used in {{invoke}} method of the sink). Instead, it should be a separate instance, which buffers any requests that the user attempts to re-index in the failure handler. In {{invoke}}, before processing the next element, we always check if there are buffered requests from the failure handler that needs to be added to the actual request indexer. [~luoguohao] you mentioned that you would want to try fixing this. Do you want to take a try on this? > ElasticSearch6Connector cause thread blocked when index failed with retry > - > > Key: FLINK-11046 > URL: https://issues.apache.org/jira/browse/FLINK-11046 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector >Affects Versions: 1.6.2 >Reporter: luoguohao >Priority: Major > > When i'm using es6 sink to index into es, bulk process with some exception > catched, and i trying to reindex the document with the call > `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` > method, but things goes incorrect. The call thread stuck there, and with the > thread dump, i saw the `bulkprocessor` object was locked by other thread. > {code:java} > public interface ActionRequestFailureHandler extends Serializable { > void onFailure(ActionRequest action, Throwable failure, int restStatusCode, > RequestIndexer indexer) throws Throwable; > } > {code} > After i read the code implemented in the `indexer.add(action)`, i find that > `synchronized` is needed on each add operation. > {code:java} > private synchronized void internalAdd(DocWriteRequest request, @Nullable > Object payload) { > ensureOpen(); > bulkRequest.add(request, payload); > executeIfNeeded(); > } > {code} > And, at i also noticed that `bulkprocessor` object would also locked in the > bulk process thread. 
> the bulk process operation is in the following code: > {code:java} > public void execute(BulkRequest bulkRequest, long executionId) { > Runnable toRelease = () -> {}; > boolean bulkRequestSetupSuccessful = false; > try { > listener.beforeBulk(executionId, bulkRequest); > semaphore.acquire(); > toRelease = semaphore::release; > CountDownLatch latch = new CountDownLatch(1); > retry.withBackoff(consumer, bulkRequest, new > ActionListener() { > @Override > public void onResponse(BulkResponse response) { > try { > listener.afterBulk(executionId, bulkRequest, response); > } finally { > semaphore.release(); > latch.countDown(); > } > } > @Override > public void onFailure(Exception e) { > try { > listener.afterBulk(executionId, bulkRequest, e); > } finally { > semaphore.release(); > latch.countDown(); > } > } > }, Settings.EMPTY); > bulkRequestSetupSuccessful = true; >if (concurrentRequests == 0) { >latch.await(); > } > } catch (InterruptedException e) { > Thread.currentThread().interrupt(); > logger.info(() -> new ParameterizedMessage("Bulk request {} has been > cancelled.", executionId), e); > listener.afterBulk(executionId, bulkRequest, e); > } catch (Exception e) { > logger.warn(() -> new ParameterizedMessage("Failed to execute bulk > request {}.", executionId), e); > listener.afterBulk(executionId, bulkRequest, e); > } finally { > if (bulkRequestSetupSuccessful == false) { // if we fail on > client.bulk() release the semaphore > toRelease.run(); > } > } > } > {code} > As the read line i marked above, i think, that's the reason why the retry > operation thread was block, because the the bulk process thread never release > the lock on `bulkprocessor`. and, i also trying to figure out why the field > `concurrentRequests` was set to zero. And i saw the the initialize for > bulkprocessor in class `ElasticsearchSinkBase`: > {code:java} > protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) { > ... > BulkProcessor.Builder bulkProcessorBuilder = > callBridge.createBulkProcessorBuilder(client, listener); > // This makes flush() blocking >
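A minimal sketch of the buffering-indexer idea proposed above; the class and method names are illustrative and not the connector's actual implementation:
{code:java}
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.ActionRequest;

import java.util.ArrayList;
import java.util.List;

/**
 * Sketch only: an indexer handed to the failure handler that never touches the
 * BulkProcessor. Requests re-added from onFailure() are merely buffered here and
 * are drained to the real indexer from the sink's invoke() before the next element.
 */
class BufferingFailureRequestIndexer {

	private final List<ActionRequest> bufferedRequests = new ArrayList<>();

	/** Called by the user's ActionRequestFailureHandler to re-queue a failed request. */
	void add(ActionRequest... requests) {
		for (ActionRequest request : requests) {
			bufferedRequests.add(request);
		}
	}

	/** Called from invoke(): hand every buffered request to the indexer backed by the BulkProcessor. */
	void drainTo(RequestIndexer actualIndexer) {
		for (ActionRequest request : bufferedRequests) {
			actualIndexer.add(request);
		}
		bufferedRequests.clear();
	}
}
{code}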
[jira] [Commented] (FLINK-11067) Port TableEnvironments to Java
[ https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721330#comment-16721330 ] Timo Walther commented on FLINK-11067: -- Sorry, [~dian.fu] and [~sunjincheng121] I forgot that {{BatchTableEnvironment}} and {{StreamTableEnvironment}} are also in different packages. Yes, I think [~dawidwys] suggestion is a better solution with a common base class in {{flink-table-api-base}}. In summary: // in flink-tabe-api-base interface BaseTableEnvironment { // methods independent of batch/streaming or Java/Scala // contains main logic for within the table ecosystem // (reading from table sources and writing to table sinks) } // in flink-table-api-java package interface TableEnvironment extends BaseTableEnvironment{ // only methods for creating a table environment in Java! static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment env); static StreamTableEnvironment getTableEnvironment(StreamExecutionEnvironment env); // and methods specific for Java i.e. UDF registration } interface BatchTableEnvironment extends TableEnvironment { // methods specific for batch and Java (toDataSet, fromDataSet) } interface StreamTableEnvironment extends TableEnvironment { // methods specific for streaming and Java (toRetractStream, ...) } // in flink-table-api-scala package interface TableEnvironment extends BaseTableEnvironment { // only methods for creating a table environment in Scala! static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment env); static StreamTableEnvironment getTableEnvironment(StreamExecutionEnvironment env); // and methods specific for Scala i.e. UDF registration } interface BatchTableEnvironment extends TableEnvironment { // methods specific for batch and Scala (toDataSet, fromDataSet) } interface StreamTableEnvironment extends TableEnvironment { // methods specific for streaming and Scala (toRetractStream, ...) } What do you think? > Port TableEnvironments to Java > -- > > Key: FLINK-11067 > URL: https://issues.apache.org/jira/browse/FLINK-11067 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Dawid Wysakowicz >Priority: Major > > This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, > {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided > and discussed. Some refactoring and clean up might be necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] wangyang0918 commented on issue #7144: [FLINK-10932] [ResourceManager] Initial flink-kubernetes module with empty implementation
wangyang0918 commented on issue #7144: [FLINK-10932] [ResourceManager] Initial flink-kubernetes module with empty implementation URL: https://github.com/apache/flink/pull/7144#issuecomment-447307319 @isunjin It’s great to start the Kubernetes integration with Flink. I am glad to get involved and can help by giving some suggestions based on our use case. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-11067) Port TableEnvironments to Java
[ https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721330#comment-16721330 ] Timo Walther edited comment on FLINK-11067 at 12/14/18 12:07 PM: - Sorry, [~dian.fu] and [~sunjincheng121] I forgot that {{BatchTableEnvironment}} and {{StreamTableEnvironment}} are also in different packages. Yes, I think [~dawidwys] suggestion is a better solution with a common base class in {{flink-table-api-base}}. In summary: {code} // in flink-tabe-api-base interface BaseTableEnvironment { // methods independent of batch/streaming or Java/Scala // contains main logic for within the table ecosystem // (reading from table sources and writing to table sinks) } // in flink-table-api-java package interface TableEnvironment extends BaseTableEnvironment{ // only methods for creating a table environment in Java! static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment env); static StreamTableEnvironment getTableEnvironment(StreamExecutionEnvironment env); // and methods specific for Java i.e. UDF registration } interface BatchTableEnvironment extends TableEnvironment { // methods specific for batch and Java (toDataSet, fromDataSet) } interface StreamTableEnvironment extends TableEnvironment { // methods specific for streaming and Java (toRetractStream, ...) } // in flink-table-api-scala package interface TableEnvironment extends BaseTableEnvironment { // only methods for creating a table environment in Scala! static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment env); static StreamTableEnvironment getTableEnvironment(StreamExecutionEnvironment env); // and methods specific for Scala i.e. UDF registration } interface BatchTableEnvironment extends TableEnvironment { // methods specific for batch and Scala (toDataSet, fromDataSet) } interface StreamTableEnvironment extends TableEnvironment { // methods specific for streaming and Scala (toRetractStream, ...) } {code} What do you think? was (Author: twalthr): Sorry, [~dian.fu] and [~sunjincheng121] I forgot that {{BatchTableEnvironment}} and {{StreamTableEnvironment}} are also in different packages. Yes, I think [~dawidwys] suggestion is a better solution with a common base class in {{flink-table-api-base}}. In summary: // in flink-tabe-api-base interface BaseTableEnvironment { // methods independent of batch/streaming or Java/Scala // contains main logic for within the table ecosystem // (reading from table sources and writing to table sinks) } // in flink-table-api-java package interface TableEnvironment extends BaseTableEnvironment{ // only methods for creating a table environment in Java! static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment env); static StreamTableEnvironment getTableEnvironment(StreamExecutionEnvironment env); // and methods specific for Java i.e. UDF registration } interface BatchTableEnvironment extends TableEnvironment { // methods specific for batch and Java (toDataSet, fromDataSet) } interface StreamTableEnvironment extends TableEnvironment { // methods specific for streaming and Java (toRetractStream, ...) } // in flink-table-api-scala package interface TableEnvironment extends BaseTableEnvironment { // only methods for creating a table environment in Scala! static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment env); static StreamTableEnvironment getTableEnvironment(StreamExecutionEnvironment env); // and methods specific for Scala i.e. 
UDF registration } interface BatchTableEnvironment extends TableEnvironment { // methods specific for batch and Scala (toDataSet, fromDataSet) } interface StreamTableEnvironment extends TableEnvironment { // methods specific for streaming and Scala (toRetractStream, ...) } What do you think? > Port TableEnvironments to Java > -- > > Key: FLINK-11067 > URL: https://issues.apache.org/jira/browse/FLINK-11067 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Dawid Wysakowicz >Priority: Major > > This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, > {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided > and discussed. Some refactoring and clean up might be necessary. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol closed pull request #7309: Release 1.7
zentol closed pull request #7309: Release 1.7 URL: https://github.com/apache/flink/pull/7309 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] klion26 edited a comment on issue #7309: Release 1.7
klion26 edited a comment on issue #7309: Release 1.7 URL: https://github.com/apache/flink/pull/7309#issuecomment-447302388 Hi @westkiller, this PR would merge 232 commits from release-1.7 into master, so maybe we should close it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry
[ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721314#comment-16721314 ] Tzu-Li (Gordon) Tai commented on FLINK-11046: - Ah , I think I understand the deadlock now. It's because the we're calling the user provided {{ActionRequestFailureHandler.onFailure(...)}} inside the {{afterBulk}} callback. The lock is only released when the {{afterBulk}} method returns. So, the deadlock is: 1. The {{BulkProcessor}} flushes, and one of the document indexing failed, which invokes the user's {{ActionRequestFailureHandler.onFailure(...)}}. At this point, the lock on {{BulkProcessor}} isn't released yet, because the {{onFailure}} call is part of the bulk processor's flush callback. 2. Within {{ActionRequestFailureHandler.onFailure(...)}}, in your case you added some new documents to be indexed. Upon adding, the {{BulkProcessor}} would try to flush again, but the lock wasn't released yet and therefore deadlock. So, the re-indexing thread (i.e. the async callback) should have been blocked on: [https://github.com/elastic/elasticsearch/blob/v6.3.1/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java#L60] While the main task thread should have been blocked on: [https://github.com/elastic/elasticsearch/blob/v6.3.1/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java#L86] Could you confirm this and see if the analysis makes sense to you? [~luoguohao] > ElasticSearch6Connector cause thread blocked when index failed with retry > - > > Key: FLINK-11046 > URL: https://issues.apache.org/jira/browse/FLINK-11046 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector >Affects Versions: 1.6.2 >Reporter: luoguohao >Priority: Major > > When i'm using es6 sink to index into es, bulk process with some exception > catched, and i trying to reindex the document with the call > `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` > method, but things goes incorrect. The call thread stuck there, and with the > thread dump, i saw the `bulkprocessor` object was locked by other thread. > {code:java} > public interface ActionRequestFailureHandler extends Serializable { > void onFailure(ActionRequest action, Throwable failure, int restStatusCode, > RequestIndexer indexer) throws Throwable; > } > {code} > After i read the code implemented in the `indexer.add(action)`, i find that > `synchronized` is needed on each add operation. > {code:java} > private synchronized void internalAdd(DocWriteRequest request, @Nullable > Object payload) { > ensureOpen(); > bulkRequest.add(request, payload); > executeIfNeeded(); > } > {code} > And, at i also noticed that `bulkprocessor` object would also locked in the > bulk process thread. 
> the bulk process operation is in the following code: > {code:java} > public void execute(BulkRequest bulkRequest, long executionId) { > Runnable toRelease = () -> {}; > boolean bulkRequestSetupSuccessful = false; > try { > listener.beforeBulk(executionId, bulkRequest); > semaphore.acquire(); > toRelease = semaphore::release; > CountDownLatch latch = new CountDownLatch(1); > retry.withBackoff(consumer, bulkRequest, new > ActionListener() { > @Override > public void onResponse(BulkResponse response) { > try { > listener.afterBulk(executionId, bulkRequest, response); > } finally { > semaphore.release(); > latch.countDown(); > } > } > @Override > public void onFailure(Exception e) { > try { > listener.afterBulk(executionId, bulkRequest, e); > } finally { > semaphore.release(); > latch.countDown(); > } > } > }, Settings.EMPTY); > bulkRequestSetupSuccessful = true; >if (concurrentRequests == 0) { >latch.await(); > } > } catch (InterruptedException e) { > Thread.currentThread().interrupt(); > logger.info(() -> new ParameterizedMessage("Bulk request {} has been > cancelled.", executionId), e); > listener.afterBulk(executionId, bulkRequest, e); > } catch (Exception e) { > logger.warn(() -> new ParameterizedMessage("Failed to execute bulk > request {}.", executionId), e); > listener.afterBulk(executionId, bulkRequest, e); > } finally { > if (bulkRequestSetupSuccessful == false) { // if we fail on > client.bulk() release the semaphore > toRelease.run(); > } > } > } > {code}
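For reference, a stripped-down sketch of the reported usage pattern that triggers this deadlock, built only from the ActionRequestFailureHandler interface quoted above (the handler class itself is invented for illustration):
{code:java}
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.ActionRequest;

/**
 * Sketch of the reported pattern: re-adding the failed request from within onFailure().
 * onFailure() is invoked from the BulkProcessor's afterBulk callback while the processor's
 * lock is still held, so the nested indexer.add(...) tries to take that same lock
 * (and may trigger another flush), which is the deadlock analysed above.
 */
class ReindexOnFailureHandler implements ActionRequestFailureHandler {

	private static final long serialVersionUID = 1L;

	@Override
	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
		// naive retry: hand the failed request straight back to the indexer
		indexer.add(action);
	}
}
{code}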
[jira] [Closed] (FLINK-11040) Incorrect generic type of DataStream in broadcast_state.md
[ https://issues.apache.org/jira/browse/FLINK-11040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz closed FLINK-11040. Resolution: Fixed Fix Version/s: 1.8.0 Fixed via: bc4194ab756e86c5883b272e97f6825d7d5e9ba2 > Incorrect generic type of DataStream in broadcast_state.md > -- > > Key: FLINK-11040 > URL: https://issues.apache.org/jira/browse/FLINK-11040 > Project: Flink > Issue Type: Bug >Affects Versions: 1.7.0 >Reporter: Yangze Guo >Assignee: Yangze Guo >Priority: Minor > Labels: pull-request-available > Fix For: 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > In this > [document|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/broadcast_state.html#provided-apis], > the generic type of output DataStream is Match. According to my > understanding of the code, I think it should be String, as the last > operator's return type is that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] klion26 commented on issue #7309: Release 1.7
klion26 commented on issue #7309: Release 1.7 URL: https://github.com/apache/flink/pull/7309#issuecomment-447302388 @westkiller Why do you want to create this pr? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] dawidwys closed pull request #7205: [FLINK-11040][docs] fixed the Incorrect generic type in broadcast_state.md
dawidwys closed pull request #7205: [FLINK-11040][docs] fixed the Incorrect generic type in broadcast_state.md URL: https://github.com/apache/flink/pull/7205 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/stream/state/broadcast_state.md b/docs/dev/stream/state/broadcast_state.md index f336a855a3a..628a6308b57 100644 --- a/docs/dev/stream/state/broadcast_state.md +++ b/docs/dev/stream/state/broadcast_state.md @@ -94,7 +94,7 @@ The exact type of the function depends on the type of the non-broadcasted stream {% highlight java %} -DataStream<Match> output = colorPartitionedStream +DataStream<String> output = colorPartitionedStream .connect(ruleBroadcastStream) .process( @@ -107,7 +107,7 @@ DataStream output = colorPartitionedStream new KeyedBroadcastProcessFunction() { // my matching logic } - ) + ); {% endhighlight %} ### BroadcastProcessFunction and KeyedBroadcastProcessFunction This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry
[ https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721296#comment-16721296 ] Tzu-Li (Gordon) Tai commented on FLINK-11046: - This seems a bit odd. While concurrent requests is indeed set to 0 and therefore only a single bulk request will be allowed to be executed and new index accumulations are blocked during the process, the lock should have been released after the bulk request finishes and un-block the new index addition. After all, the {{BulkProcessor}} is supposed to be thread-safe: [http://javadoc.kyubu.de/elasticsearch/HEAD/org/elasticsearch/action/bulk/BulkProcessor.html.] > ElasticSearch6Connector cause thread blocked when index failed with retry > - > > Key: FLINK-11046 > URL: https://issues.apache.org/jira/browse/FLINK-11046 > Project: Flink > Issue Type: Bug > Components: ElasticSearch Connector >Affects Versions: 1.6.2 >Reporter: luoguohao >Priority: Major > > When i'm using es6 sink to index into es, bulk process with some exception > catched, and i trying to reindex the document with the call > `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` > method, but things goes incorrect. The call thread stuck there, and with the > thread dump, i saw the `bulkprocessor` object was locked by other thread. > {code:java} > public interface ActionRequestFailureHandler extends Serializable { > void onFailure(ActionRequest action, Throwable failure, int restStatusCode, > RequestIndexer indexer) throws Throwable; > } > {code} > After i read the code implemented in the `indexer.add(action)`, i find that > `synchronized` is needed on each add operation. > {code:java} > private synchronized void internalAdd(DocWriteRequest request, @Nullable > Object payload) { > ensureOpen(); > bulkRequest.add(request, payload); > executeIfNeeded(); > } > {code} > And, at i also noticed that `bulkprocessor` object would also locked in the > bulk process thread. 
> the bulk process operation is in the following code: > {code:java} > public void execute(BulkRequest bulkRequest, long executionId) { > Runnable toRelease = () -> {}; > boolean bulkRequestSetupSuccessful = false; > try { > listener.beforeBulk(executionId, bulkRequest); > semaphore.acquire(); > toRelease = semaphore::release; > CountDownLatch latch = new CountDownLatch(1); > retry.withBackoff(consumer, bulkRequest, new > ActionListener() { > @Override > public void onResponse(BulkResponse response) { > try { > listener.afterBulk(executionId, bulkRequest, response); > } finally { > semaphore.release(); > latch.countDown(); > } > } > @Override > public void onFailure(Exception e) { > try { > listener.afterBulk(executionId, bulkRequest, e); > } finally { > semaphore.release(); > latch.countDown(); > } > } > }, Settings.EMPTY); > bulkRequestSetupSuccessful = true; >if (concurrentRequests == 0) { >latch.await(); > } > } catch (InterruptedException e) { > Thread.currentThread().interrupt(); > logger.info(() -> new ParameterizedMessage("Bulk request {} has been > cancelled.", executionId), e); > listener.afterBulk(executionId, bulkRequest, e); > } catch (Exception e) { > logger.warn(() -> new ParameterizedMessage("Failed to execute bulk > request {}.", executionId), e); > listener.afterBulk(executionId, bulkRequest, e); > } finally { > if (bulkRequestSetupSuccessful == false) { // if we fail on > client.bulk() release the semaphore > toRelease.run(); > } > } > } > {code} > As the read line i marked above, i think, that's the reason why the retry > operation thread was block, because the the bulk process thread never release > the lock on `bulkprocessor`. and, i also trying to figure out why the field > `concurrentRequests` was set to zero. And i saw the the initialize for > bulkprocessor in class `ElasticsearchSinkBase`: > {code:java} > protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) { > ... > BulkProcessor.Builder bulkProcessorBuilder = > callBridge.createBulkProcessorBuilder(client, listener); > // This makes flush() blocking > bulkProcessorBuilder.setConcurrentRequests(0); > > ... > return bulkProcessorBuilder.build(); > } > {code} > this field value was set to zero explicitly. So, all things seems to make > sense, but i still
[GitHub] zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…
zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead… URL: https://github.com/apache/flink/pull/7255#discussion_r241636524 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ## @@ -755,23 +775,10 @@ else if (numConsumers == 0) { // TODO The current approach may send many update messages even though the consuming // task has already been deployed with all necessary information. We have to check // whether this is a problem and fix it, if it is. - CompletableFuture.supplyAsync( - () -> { - try { - final ExecutionGraph executionGraph = consumerVertex.getExecutionGraph(); - consumerVertex.scheduleForExecution( - executionGraph.getSlotProvider(), - executionGraph.isQueuedSchedulingAllowed(), - LocationPreferenceConstraint.ANY, // there must be at least one known location - Collections.emptySet()); - } catch (Throwable t) { - consumerVertex.fail(new IllegalStateException("Could not schedule consumer " + - "vertex " + consumerVertex, t)); - } - - return null; - }, - executor); + if (consumerVertex.checkInputDependencyConstraints()) { Review comment: From my understanding, the TODO comment is related to the "`consumerState == CREATED`" section in `scheduleOrUpdateConsumers`, which invokes `cachePartitionInfo` first and then schedules the vertex. The cachePartitionInfo action is needed to avoid deployment race, at the cost of redundant partition infos to update to task, which is the concern as described in the TODO comment. So far the redundant partition it's not a big problem. But I think we can optimize it later. One possible solution in my mind is to remove known partition infos from the cache when creating InputChannelDeploymentDescriptor. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zzchun commented on a change in pull request #7291: [hotfix] [docs] Display Connectors and Formats tables, and fix typo in documentation
zzchun commented on a change in pull request #7291: [hotfix] [docs] Display Connectors and Formats tables, and fix typo in documentation URL: https://github.com/apache/flink/pull/7291#discussion_r241627373 ## File path: docs/dev/table/connect.md ## @@ -58,11 +56,7 @@ The following table list all available connectors and formats. Their mutual comp | JSON | `flink-json` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) | | Apache Avro | `flink-avro` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) | -{% else %} - -This table is only available for stable releases. - -{% endif %} Review comment: @klion26 Thank you for your review. As shown in the `Dependencies` module of [Connect to External Systems](https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html) page, `Connectors` and `Formats` tables are described, but we cannot see theses tables, which is confusing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] westkiller opened a new pull request #7309: Release 1.7
westkiller opened a new pull request #7309: Release 1.7 URL: https://github.com/apache/flink/pull/7309 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-9142) Lower the minimum number of buffers for incoming channels to 1
[ https://issues.apache.org/jira/browse/FLINK-9142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] boshu Zheng reassigned FLINK-9142: -- Assignee: boshu Zheng > Lower the minimum number of buffers for incoming channels to 1 > -- > > Key: FLINK-9142 > URL: https://issues.apache.org/jira/browse/FLINK-9142 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Nico Kruber >Assignee: boshu Zheng >Priority: Major > Fix For: 1.6.3, 1.7.2, 1.8.0 > > > Even if we make the floating buffers optional, we still require > {{taskmanager.network.memory.buffers-per-channel}} number of (exclusive) > buffers per incoming channel with credit-based flow control while without, > the minimum was 1 and only the maximum number of buffers was influenced by > this parameter. > {{taskmanager.network.memory.buffers-per-channel}} is set to {{2}} by default > with the argumentation that this way we will have one buffer available for > netty to process while a worker thread is processing/deserializing the other > buffer. While this seems reasonable, it does increase our minimum > requirements. Instead, we could probably live with {{1}} exclusive buffer and > up to {{gate.getNumberOfInputChannels() * (networkBuffersPerChannel - 1) + > extraNetworkBuffersPerGate}} floating buffers. That way we will have the same > memory footprint as before with only slightly changed behaviour. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
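A quick worked example of the proposed accounting. The channel count and the floating-buffers-per-gate value of 8 are assumptions for illustration; only buffers-per-channel = 2 is stated above:
{code:java}
public class BufferFootprintExample {
	public static void main(String[] args) {
		int inputChannels = 100;        // example gate size (assumption)
		int buffersPerChannel = 2;      // taskmanager.network.memory.buffers-per-channel default
		int extraBuffersPerGate = 8;    // floating buffers per gate (assumed value)

		// current minimum with credit-based flow control: 2 exclusive buffers per channel (+ floating)
		int currentMinExclusive = inputChannels * buffersPerChannel;                                   // 200

		// proposal: 1 exclusive buffer per channel, the rest become optional floating buffers
		int proposedMinExclusive = inputChannels;                                                      // 100
		int proposedMaxFloating = inputChannels * (buffersPerChannel - 1) + extraBuffersPerGate;       // 108

		// the maximum footprint stays the same, only the required minimum drops
		System.out.println((currentMinExclusive + extraBuffersPerGate) + " == "
			+ (proposedMinExclusive + proposedMaxFloating)); // 208 == 208
	}
}
{code}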
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#discussion_r241457640 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java ## @@ -80,8 +82,12 @@ static int readKeyGroup(int keyGroupPrefixBytes, DataInputView inputView) throws } } + public static boolean isSerializerTypeVariableSized(@Nonnull TypeSerializer serializer) { Review comment: This looks like something for `TypeSerializerUtils`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#discussion_r241465279 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; + +/** + * Responsible for serialization of currentKey, currentGroup and namespace. + * Will reuse the previous serialized currentKeyed if possible. + * @param type of the key. + */ +@NotThreadSafe +@Internal +class RocksDBSerializedCompositeKeyBuilder { + + /** The serializer for the key. */ + @Nonnull + private final TypeSerializer keySerializer; + + /** The output to write the key into. */ + @Nonnull + private final DataOutputSerializer keyOutView; + + /** The number of Key-group-prefix bytes for the key. */ + @Nonnegative + private final int keyGroupPrefixBytes; + + /** This flag indicates whether the key type has a variable byte size in serialization. */ + private final boolean keySerializerTypeVariableSized; + + /** Mark for the position after the serialized key. */ + @Nonnegative + private int afterKeyMark; + + public RocksDBSerializedCompositeKeyBuilder( Review comment: if we want to keep the class package private, do we need public for constructor/methods? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#discussion_r241456301 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; + +/** + * Responsible for serialization of currentKey, currentGroup and namespace. + * Will reuse the previous serialized currentKeyed if possible. Review comment: I guess: currentKeyed -> currentKey This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#discussion_r241709684 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ## @@ -129,30 +126,78 @@ public void setCurrentNamespace(N namespace) { final TypeSerializer safeNamespaceSerializer, final TypeSerializer safeValueSerializer) throws Exception { - Preconditions.checkNotNull(serializedKeyAndNamespace); Review comment: also here @nonnull for function args? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#discussion_r241709422 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ## @@ -100,26 +100,23 @@ protected AbstractRocksDBState( this.dataOutputView = new DataOutputSerializer(128); this.dataInputView = new DataInputDeserializer(); - this.ambiguousKeyPossible = - RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer); + this.sharedKeyNamespaceSerializer = backend.getSharedRocksKeyBuilder(); } // @Override public void clear() { try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = dataOutputView.getCopyOfBuffer(); - backend.db.delete(columnFamily, writeOptions, key); - } catch (IOException | RocksDBException e) { + backend.db.delete(columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace()); + } catch (RocksDBException e) { throw new FlinkRuntimeException("Error while removing entry from RocksDB", e); } } @Override public void setCurrentNamespace(N namespace) { - this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace"); + this.currentNamespace = namespace; Review comment: Allowing `null` namespace or just missing @Nonnull for function arg? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#discussion_r241707867 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +/** + * Test for @{@link RocksDBSerializedCompositeKeyBuilder}. + */ +public class RocksDBSerializedCompositeKeyBuilderTest { + + private final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128); + + private static final int[] TEST_PARALLELISMS = new int[]{64, 4096}; + private static final Collection TEST_INTS = Arrays.asList(42, 4711); + private static final Collection TEST_STRINGS = Arrays.asList("test123", "abc"); + + @Before + public void before() { + dataOutputSerializer.clear(); + } + + @Test + public void testSetKey() throws IOException { + for (int parallelism : TEST_PARALLELISMS) { + testSetKeyInternal(IntSerializer.INSTANCE, TEST_INTS, parallelism); + testSetKeyInternal(StringSerializer.INSTANCE, TEST_STRINGS, parallelism); + } + } + + @Test + public void testSetKeyNamespace() throws IOException { + for (int parallelism : TEST_PARALLELISMS) { + testSetKeyNamespaceInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, parallelism); Review comment: Maybe small improvement. We could have tuple2's of Serializer and test collection. Also embedded loops to iterate over possible combinations of tuple2's for key/namespace/user key cases. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
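A rough sketch of that suggestion; the class, helper names and wiring below are invented for illustration and are not part of the PR:
```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/**
 * Illustration of the review suggestion: pair each serializer with its test values once
 * and use nested loops, so the key/namespace (and, analogously, user-key) combinations
 * need not be spelled out by hand.
 */
public class SerializedCompositeKeyCombinationSketch {

	/** A serializer together with the values it should be exercised with. */
	private static final class TestCase<T> {
		final TypeSerializer<T> serializer;
		final Collection<T> values;

		TestCase(TypeSerializer<T> serializer, Collection<T> values) {
			this.serializer = serializer;
			this.values = values;
		}
	}

	private static final int[] TEST_PARALLELISMS = new int[]{64, 4096};

	private static final List<TestCase<?>> TEST_CASES = Arrays.asList(
		new TestCase<>(IntSerializer.INSTANCE, Arrays.asList(42, 4711)),
		new TestCase<>(StringSerializer.INSTANCE, Arrays.asList("test123", "abc")));

	@Test
	public void testSetKeyNamespaceAllCombinations() throws IOException {
		for (int parallelism : TEST_PARALLELISMS) {
			for (TestCase<?> keyCase : TEST_CASES) {
				for (TestCase<?> namespaceCase : TEST_CASES) {
					testSetKeyNamespaceInternal(keyCase, namespaceCase, parallelism);
				}
			}
		}
	}

	// Placeholder standing in for the existing helper in RocksDBSerializedCompositeKeyBuilderTest;
	// the real assertions against RocksDBSerializedCompositeKeyBuilder are omitted in this sketch.
	private <K, N> void testSetKeyNamespaceInternal(
			TestCase<K> keyCase, TestCase<N> namespaceCase, int parallelism) throws IOException {
		// build the composite key for every (key, namespace) value pair and verify it round-trips
	}
}
```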
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#discussion_r241699307 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; + +/** + * Responsible for serialization of currentKey, currentGroup and namespace. + * Will reuse the previous serialized currentKeyed if possible. + * @param type of the key. + */ +@NotThreadSafe +@Internal +class RocksDBSerializedCompositeKeyBuilder { + + /** The serializer for the key. */ + @Nonnull + private final TypeSerializer keySerializer; + + /** The output to write the key into. */ + @Nonnull + private final DataOutputSerializer keyOutView; + + /** The number of Key-group-prefix bytes for the key. */ + @Nonnegative + private final int keyGroupPrefixBytes; + + /** This flag indicates whether the key type has a variable byte size in serialization. */ + private final boolean keySerializerTypeVariableSized; + + /** Mark for the position after the serialized key. */ + @Nonnegative + private int afterKeyMark; + + public RocksDBSerializedCompositeKeyBuilder( + @Nonnull TypeSerializer keySerializer, + @Nonnegative int keyGroupPrefixBytes, + @Nonnegative int initialSize) { + this( + keySerializer, + new DataOutputSerializer(initialSize), + keyGroupPrefixBytes, + RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer), + 0); + } + + @VisibleForTesting + RocksDBSerializedCompositeKeyBuilder( + @Nonnull TypeSerializer keySerializer, + @Nonnull DataOutputSerializer keyOutView, + @Nonnegative int keyGroupPrefixBytes, + boolean keySerializerTypeVariableSized, + @Nonnegative int afterKeyMark) { + this.keySerializer = keySerializer; + this.keyOutView = keyOutView; + this.keyGroupPrefixBytes = keyGroupPrefixBytes; + this.keySerializerTypeVariableSized = keySerializerTypeVariableSized; + this.afterKeyMark = afterKeyMark; + } + + /** +* Sets the key and key-group as prefix. This will serialize them into the buffer and the will be used to create +* composite keys with provided namespaces. +* +* @param keythe key. 
+* @param keyGroupId the key-group id for the key. +*/ + public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId) { + try { + serializeKeyGroupAndKey(key, keyGroupId); + } catch (IOException shouldNeverHappen) { + throw new FlinkRuntimeException(shouldNeverHappen); + } + } + + /** +* Returns a serialized composite key, from the key and key-group provided in a previous call to +* {@link #setKeyAndKeyGroup(Object, int)} and the given namespace. +* +* @param namespace the namespace to concatenate for the serialized composite key bytes. +* @param namespaceSerializer the serializer to obtain the serialized form of the namespace. +* @param the type of the namespace. +* @return the bytes for the serialized composite key of
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#discussion_r241453092 ## File path: flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java ## @@ -350,6 +352,11 @@ public void write(DataInputView source, int numBytes) throws IOException { this.position += numBytes; } + public void setPosition(int position) { + Preconditions.checkArgument(position >= 0 && position <= this.position, "Position out of bounds."); Review comment: should it not be: `Preconditions.checkArgument(position >= 0 && position < buffer.length ...)`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#discussion_r241708059 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java ## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +/** + * Test for @{@link RocksDBSerializedCompositeKeyBuilder}. 
+ */ +public class RocksDBSerializedCompositeKeyBuilderTest { + + private final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128); + + private static final int[] TEST_PARALLELISMS = new int[]{64, 4096}; + private static final Collection TEST_INTS = Arrays.asList(42, 4711); + private static final Collection TEST_STRINGS = Arrays.asList("test123", "abc"); + + @Before + public void before() { + dataOutputSerializer.clear(); + } + + @Test + public void testSetKey() throws IOException { + for (int parallelism : TEST_PARALLELISMS) { + testSetKeyInternal(IntSerializer.INSTANCE, TEST_INTS, parallelism); + testSetKeyInternal(StringSerializer.INSTANCE, TEST_STRINGS, parallelism); + } + } + + @Test + public void testSetKeyNamespace() throws IOException { + for (int parallelism : TEST_PARALLELISMS) { + testSetKeyNamespaceInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, parallelism); + testSetKeyNamespaceInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, parallelism); + testSetKeyNamespaceInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, parallelism); + testSetKeyNamespaceInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, parallelism); + } + } + + @Test + public void testSetKeyNamespaceUserKey() throws IOException { + for (int parallelism : TEST_PARALLELISMS) { + testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, TEST_INTS, parallelism); + testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, TEST_INTS, parallelism); + testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, TEST_INTS, parallelism); + testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, TEST_INTS, parallelism); + testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_INTS, TEST_STRINGS, parallelism); + testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, TEST_STRINGS, parallelism); + testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, TEST_STRINGS, parallelism); +
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state URL: https://github.com/apache/flink/pull/7288#discussion_r241464119 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.IOException; + +/** + * Responsible for serialization of currentKey, currentGroup and namespace. + * Will reuse the previous serialized currentKeyed if possible. + * @param type of the key. + */ +@NotThreadSafe +@Internal +class RocksDBSerializedCompositeKeyBuilder { + + /** The serializer for the key. */ + @Nonnull + private final TypeSerializer keySerializer; + + /** The output to write the key into. */ + @Nonnull + private final DataOutputSerializer keyOutView; + + /** The number of Key-group-prefix bytes for the key. */ + @Nonnegative + private final int keyGroupPrefixBytes; + + /** This flag indicates whether the key type has a variable byte size in serialization. */ + private final boolean keySerializerTypeVariableSized; + + /** Mark for the position after the serialized key. */ + @Nonnegative + private int afterKeyMark; + + public RocksDBSerializedCompositeKeyBuilder( + @Nonnull TypeSerializer keySerializer, + @Nonnegative int keyGroupPrefixBytes, + @Nonnegative int initialSize) { + this( + keySerializer, + new DataOutputSerializer(initialSize), + keyGroupPrefixBytes, + RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer), + 0); + } + + @VisibleForTesting + RocksDBSerializedCompositeKeyBuilder( + @Nonnull TypeSerializer keySerializer, + @Nonnull DataOutputSerializer keyOutView, + @Nonnegative int keyGroupPrefixBytes, + boolean keySerializerTypeVariableSized, + @Nonnegative int afterKeyMark) { + this.keySerializer = keySerializer; + this.keyOutView = keyOutView; + this.keyGroupPrefixBytes = keyGroupPrefixBytes; + this.keySerializerTypeVariableSized = keySerializerTypeVariableSized; + this.afterKeyMark = afterKeyMark; + } + + /** +* Sets the key and key-group as prefix. 
This will serialize them into the buffer and the will be used to create Review comment: `they will be used` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241716805

## File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java

## @@ -343,10 +321,32 @@ public void migrateSerializedValue(
 			prevPosition = in.getPosition();
 		}
 		try {
-			return result.isEmpty() ? null : getPreMergedValue(result, elementSerializer, out);
+			return result.isEmpty() ? null : serializeValueList(result, elementSerializer, DELIMITER);
 		} catch (IOException e) {
 			throw new FlinkRuntimeException("Failed to serialize transformed list", e);
 		}
 	}
+
+	byte[] serializeValueList(

Review comment:
   Maybe add a static method to AbstractRocksDBState that takes the serialization output as an argument.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
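To make the suggestion concrete, here is a hedged sketch of such a static helper. The class name, its placement, and the delimiter-based pre-merge format are assumptions for illustration; the actual patch may look different:

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.IOException;
import java.util.List;

final class ListSerializationHelper {

	private ListSerializationHelper() {}

	// Pre-merges a list of values into a single byte array, separating the
	// serialized elements with the given delimiter byte, so the caller can
	// issue a single RocksDB put() for the whole list.
	static <V> byte[] serializeValueList(
			DataOutputSerializer out,
			List<V> values,
			TypeSerializer<V> elementSerializer,
			byte delimiter) {
		try {
			out.clear();
			boolean first = true;
			for (V value : values) {
				if (!first) {
					out.write(delimiter);
				}
				first = false;
				elementSerializer.serialize(value, out);
			}
			return out.getCopyOfBuffer();
		} catch (IOException e) {
			throw new FlinkRuntimeException("Failed to serialize value list", e);
		}
	}
}
```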
[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241699417

## File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java

## @@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previous serialized currentKeyed if possible.
+ *
+ * @param <K> type of the key.
+ */
+@NotThreadSafe
+@Internal
+class RocksDBSerializedCompositeKeyBuilder<K> {
+
+	/** The serializer for the key. */
+	@Nonnull
+	private final TypeSerializer<K> keySerializer;
+
+	/** The output to write the key into. */
+	@Nonnull
+	private final DataOutputSerializer keyOutView;
+
+	/** The number of Key-group-prefix bytes for the key. */
+	@Nonnegative
+	private final int keyGroupPrefixBytes;
+
+	/** This flag indicates whether the key type has a variable byte size in serialization. */
+	private final boolean keySerializerTypeVariableSized;
+
+	/** Mark for the position after the serialized key. */
+	@Nonnegative
+	private int afterKeyMark;
+
+	public RocksDBSerializedCompositeKeyBuilder(
+		@Nonnull TypeSerializer<K> keySerializer,
+		@Nonnegative int keyGroupPrefixBytes,
+		@Nonnegative int initialSize) {
+		this(
+			keySerializer,
+			new DataOutputSerializer(initialSize),
+			keyGroupPrefixBytes,
+			RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+			0);
+	}
+
+	@VisibleForTesting
+	RocksDBSerializedCompositeKeyBuilder(
+		@Nonnull TypeSerializer<K> keySerializer,
+		@Nonnull DataOutputSerializer keyOutView,
+		@Nonnegative int keyGroupPrefixBytes,
+		boolean keySerializerTypeVariableSized,
+		@Nonnegative int afterKeyMark) {
+		this.keySerializer = keySerializer;
+		this.keyOutView = keyOutView;
+		this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+		this.keySerializerTypeVariableSized = keySerializerTypeVariableSized;
+		this.afterKeyMark = afterKeyMark;
+	}
+
+	/**
+	 * Sets the key and key-group as prefix. This will serialize them into the buffer and the will be used to create
+	 * composite keys with provided namespaces.
+	 *
+	 * @param key        the key.
+	 * @param keyGroupId the key-group id for the key.
+	 */
+	public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int keyGroupId) {
+		try {
+			serializeKeyGroupAndKey(key, keyGroupId);
+		} catch (IOException shouldNeverHappen) {
+			throw new FlinkRuntimeException(shouldNeverHappen);
+		}
+	}
+
+	/**
+	 * Returns a serialized composite key, from the key and key-group provided in a previous call to
+	 * {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
+	 *
+	 * @param namespace the namespace to concatenate for the serialized composite key bytes.
+	 * @param namespaceSerializer the serializer to obtain the serialized form of the namespace.
+	 * @param <N> the type of the namespace.
+	 * @return the bytes for the serialized composite key of
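As a hypothetical usage sketch of the builder discussed above: the name of the build method is truncated in the message, so `buildCompositeKeyNamespace` below is an assumption, and the sketch pretends to live in the same package since the class is package-private. The point is the call sequence: serialize the key-group and key once, then build composite keys for several namespaces without re-serializing the key.

```java
import org.apache.flink.api.common.typeutils.base.StringSerializer;

public class CompositeKeyBuilderUsageSketch {
	public static void main(String[] args) {
		// Constructor arguments follow the diff above:
		// key serializer, key-group prefix bytes, initial buffer size.
		RocksDBSerializedCompositeKeyBuilder<String> keyBuilder =
			new RocksDBSerializedCompositeKeyBuilder<>(StringSerializer.INSTANCE, 2, 32);

		// Serialize the key-group prefix and the key once ...
		keyBuilder.setKeyAndKeyGroup("user-7", 42);

		// ... then reuse that prefix for different namespaces.
		// "buildCompositeKeyNamespace" is an assumed name (truncated above).
		byte[] k1 = keyBuilder.buildCompositeKeyNamespace("window-1", StringSerializer.INSTANCE);
		byte[] k2 = keyBuilder.buildCompositeKeyNamespace("window-2", StringSerializer.INSTANCE);

		System.out.println(k1.length + " / " + k2.length);
	}
}
```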
[jira] [Created] (FLINK-11167) Optimize RocksDBList#put for non-empty input
Congxian Qiu created FLINK-11167:
Summary: Optimize RocksDBList#put for non-empty input
Key: FLINK-11167
URL: https://issues.apache.org/jira/browse/FLINK-11167
Project: Flink
Issue Type: Improvement
Reporter: Congxian Qiu

In `RocksDBListState.putInternal` we first remove the current state and then add the new list if needed (i.e. when the list is not empty), as shown below. If the list is not empty, we could skip the remove operation, because the subsequent put() overwrites the existing value for the key anyway.

{code:java}
public void updateInternal(List<V> values) {
	Preconditions.checkNotNull(values, "List of values to add cannot be null.");
	clear();
	if (!values.isEmpty()) {
		try {
			writeCurrentKeyWithGroupAndNamespace();
			byte[] key = dataOutputView.getCopyOfBuffer();
			byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
			backend.db.put(columnFamily, writeOptions, key, premerge);
		} catch (IOException | RocksDBException e) {
			throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
		}
	}
}
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
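A minimal sketch of the proposed optimization, derived from the method quoted above: the explicit clear() is only issued for empty input, since put() already overwrites the previous value for the key. The surrounding fields and helpers (backend, columnFamily, dataOutputView, etc.) are assumed to be the same ones shown above.

{code:java}
public void updateInternal(List<V> values) {
	Preconditions.checkNotNull(values, "List of values to add cannot be null.");

	if (values.isEmpty()) {
		// Nothing to write: only here is the explicit remove actually needed.
		clear();
		return;
	}

	try {
		writeCurrentKeyWithGroupAndNamespace();
		byte[] key = dataOutputView.getCopyOfBuffer();
		byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
		// put() overwrites any existing value for this key, so the prior
		// unconditional clear() is redundant on this branch.
		backend.db.put(columnFamily, writeOptions, key, premerge);
	} catch (IOException | RocksDBException e) {
		throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
	}
}
{code}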
[jira] [Updated] (FLINK-11165) Refine the deploying log for easier finding of task locations
[ https://issues.apache.org/jira/browse/FLINK-11165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-11165:
---
Labels: pull-request-available (was: )

> Refine the deploying log for easier finding of task locations
> -
>
> Key: FLINK-11165
> URL: https://issues.apache.org/jira/browse/FLINK-11165
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Minor
> Labels: pull-request-available
>
> Currently there is no straightforward way to find which TM a task is located in, especially when the task has failed.
> We can find which machine the task is located on by checking the JM log, for example:
> "Deploying Flat Map (31/40) (attempt #0) to z05c19399"
> But there can be multiple TMs on that machine and we would need to check them one by one. So I'd suggest we *add the full task manager location representation to the deploying log* to allow quickly locating tasks. The task manager location contains the resourceId. The resourceId is the containerId when the job runs on YARN/Mesos, or a unique AbstractID in standalone mode, which can be easily identified in the Flink web UI.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zhuzhurk opened a new pull request #7308: [FLINK-11165] Refine the task deploying log for easier finding of tas…
zhuzhurk opened a new pull request #7308: [FLINK-11165] Refine the task deploying log for easier finding of tas…
URL: https://github.com/apache/flink/pull/7308

…k locations

## What is the purpose of the change

Currently there is no straightforward way to find which TM a task is located in, especially when the task has failed.
We can find which machine the task is located on by checking the JM log, for example:
"Deploying Flat Map (31/40) (attempt #0) to z05c19399"
But there can be multiple TMs on that machine and we would need to check them one by one. So I'd suggest we add the full task manager location representation to the deploying log to allow quickly locating tasks. The task manager location contains the resourceId. The resourceId is the containerId when the job runs on YARN/Mesos, or a unique AbstractID in standalone mode, which can be easily identified in the Flink web UI.

## Brief change log

Changed the task deploying log to print the full task manager location, including the resourceID; a sketch of the intended log output is shown after this message.

## Verifying this change

- *This change is a trivial rework / code cleanup without any test coverage.*
- *Manually verified by executing IT cases and checking the deploying log.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not documented)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
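Roughly, the intended difference in log output, as a self-contained sketch; the task name, attempt number, host, and container id below are made-up example values, not taken from the PR:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeployLogSketch {

	private static final Logger LOG = LoggerFactory.getLogger(DeployLogSketch.class);

	public static void main(String[] args) {
		String taskName = "Flat Map (31/40)";
		int attempt = 0;
		String hostname = "z05c19399";
		String resourceId = "container_e01_1545000000000_0001_01_000002"; // hypothetical id

		// Old style: hostname only, ambiguous when several TMs run on one machine.
		LOG.info("Deploying {} (attempt #{}) to {}", taskName, attempt, hostname);

		// Proposed style: full task manager location, including the resource id,
		// so the exact TM (YARN/Mesos container, or standalone TM) can be identified.
		LOG.info("Deploying {} (attempt #{}) to {} (resourceId={})", taskName, attempt, hostname, resourceId);
	}
}
```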
[GitHub] dikei commented on a change in pull request #7078: [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
dikei commented on a change in pull request #7078: [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
URL: https://github.com/apache/flink/pull/7078#discussion_r241702906

## File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java

## @@ -438,6 +438,8 @@ private void containersAllocated(List<Container> containers) {
 			numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
 			LOG.info("Received new container: {} - Remaining pending container requests: {}",
 				container.getId(), numPendingContainerRequests);
+			resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(

Review comment:
   Is it safe to create a new container request object? Maybe we should call `getMatchingRequests` and remove one of the returned results.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
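A hedged sketch of the reviewer's alternative, using `getMatchingRequests` to find and remove an existing pending request instead of constructing a new `ContainerRequest`; the helper class and method names below are illustrative only, not part of the PR:

```java
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

final class PendingRequestRemover {

	private PendingRequestRemover() {}

	// Removes one pending ContainerRequest that the given allocated container
	// satisfies, by looking up matching requests on the AM-RM client rather
	// than constructing a new ContainerRequest object.
	static void removeOneMatchingRequest(
			AMRMClientAsync<AMRMClient.ContainerRequest> client, Container container) {

		List<? extends Collection<AMRMClient.ContainerRequest>> matching =
			client.getMatchingRequests(
				container.getPriority(), ResourceRequest.ANY, container.getResource());

		for (Collection<AMRMClient.ContainerRequest> requests : matching) {
			if (!requests.isEmpty()) {
				// Remove exactly one of the requests this allocation satisfies.
				client.removeContainerRequest(requests.iterator().next());
				return;
			}
		}
	}
}
```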
[jira] [Assigned] (FLINK-11167) Optimize RocksDBList#put for non-empty input
[ https://issues.apache.org/jira/browse/FLINK-11167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Congxian Qiu reassigned FLINK-11167:
Assignee: Congxian Qiu

> Optimize RocksDBList#put for non-empty input
> ---
>
> Key: FLINK-11167
> URL: https://issues.apache.org/jira/browse/FLINK-11167
> Project: Flink
> Issue Type: Improvement
> Reporter: Congxian Qiu
> Assignee: Congxian Qiu
> Priority: Major
>
> In `RocksDBListState.putInternal` we first remove the current state and then add the new list if needed (i.e. when the list is not empty), as shown below.
> If the list is not empty, we could skip the remove operation.
>
> {code:java}
> public void updateInternal(List<V> values) {
> 	Preconditions.checkNotNull(values, "List of values to add cannot be null.");
> 	clear();
> 	if (!values.isEmpty()) {
> 		try {
> 			writeCurrentKeyWithGroupAndNamespace();
> 			byte[] key = dataOutputView.getCopyOfBuffer();
> 			byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
> 			backend.db.put(columnFamily, writeOptions, key, premerge);
> 		} catch (IOException | RocksDBException e) {
> 			throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
> 		}
> 	}
> }
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11165) Refine the deploying log for easier finding of task locations
[ https://issues.apache.org/jira/browse/FLINK-11165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhu Zhu updated FLINK-11165:
Description:
Currently there is no straightforward way to find which TM a task is located in, especially when the task has failed.
We can find which machine the task is located on by checking the JM log, for example:
"Deploying Flat Map (31/40) (attempt #0) to z05c19399"
But there can be multiple TMs on that machine and we would need to check them one by one. So I'd suggest we *add the full task manager location representation to the deploying log* to allow quickly locating tasks. The task manager location contains the resourceId. The resourceId is the containerId when the job runs on YARN/Mesos, or a unique AbstractID in standalone mode, which can be easily identified in the Flink web UI.

was:
Currently there is no straightforward way to find which TM a task is located in, especially when the task has failed.
We can find which machine the task is located on by checking the JM log, for example:
"Deploying Flat Map (31/40) (attempt #0) to z05c19399"
But there can be multiple TMs on that machine and we would need to check them one by one. *So I'd suggest we add the TM resourceId to the deploying log to allow quickly locating tasks.* The TM resourceId is the containerId when the job runs on YARN/Mesos, and a unique AbstractID in standalone mode, which can be easily matched in the Flink web UI.

> Refine the deploying log for easier finding of task locations
> -
>
> Key: FLINK-11165
> URL: https://issues.apache.org/jira/browse/FLINK-11165
> Project: Flink
> Issue Type: Improvement
> Components: Distributed Coordination
> Affects Versions: 1.7.0
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Minor
>
> Currently there is no straightforward way to find which TM a task is located in, especially when the task has failed.
> We can find which machine the task is located on by checking the JM log, for example:
> "Deploying Flat Map (31/40) (attempt #0) to z05c19399"
> But there can be multiple TMs on that machine and we would need to check them one by one. So I'd suggest we *add the full task manager location representation to the deploying log* to allow quickly locating tasks. The task manager location contains the resourceId. The resourceId is the containerId when the job runs on YARN/Mesos, or a unique AbstractID in standalone mode, which can be easily identified in the Flink web UI.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)