[jira] [Comment Edited] (FLINK-30667) remove the planner @internal dependency in flink-connector-hive
[ https://issues.apache.org/jira/browse/FLINK-30667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17678003#comment-17678003 ]

Chen Qin edited comment on FLINK-30667 at 1/19/23 7:19 AM:
----------------------------------------------------------

Parser should be a PublicEvolving interface, with Flink and Hive each keeping their own internal implementation, so Hive connector maintainers worry less about Flink planner changes.

PlannerQueryOperation should stay internal in both table-planner and hive-connector, so the Hive connector keeps full control and can evolve without worrying about how the Flink planner's PlannerQueryOperation evolves.

PlannerContext is a simple enough utility that it can be PublicEvolving.

> remove the planner @internal dependency in flink-connector-hive
> ---------------------------------------------------------------
>
> Key: FLINK-30667
> URL: https://issues.apache.org/jira/browse/FLINK-30667
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Hive
> Affects Versions: 1.17.0
> Reporter: Chen Qin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.17.0
>
> There are some classes in flink-connector-hive that rely on the planner, but fortunately not too many.
> They mainly rely on ParserImpl, PlannerContext, PlannerQueryOperation, and so on. The dependency is mainly required to create RelNode.
> To resolve this problem, we need more abstraction for the planner, providing public APIs for external dialects.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Comment Edited] (FLINK-30667) remove the planner @internal dependency in flink-connector-hive
[ https://issues.apache.org/jira/browse/FLINK-30667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17678003#comment-17678003 ]

Chen Qin edited comment on FLINK-30667 at 1/19/23 7:16 AM:
----------------------------------------------------------

Parser should be a PublicEvolving interface, with Flink and Hive each keeping their own internal implementation, so Hive connector maintainers worry less about Flink planner changes.

PlannerQueryOperation should stay internal in both table-planner and hive-connector, so the Hive connector keeps full control and can evolve without worrying about how the Flink planner's PlannerQueryOperation evolves.

PlannerContext is a simple enough utility that it can be PublicEvolving.

was (Author: foxss):
ParserImpl and its interface are currently both Internal. HiveParser should not rely on the table-planner ParserImpl, for the sake of future flexibility and easier Hive connector maintenance. I would propose annotating the Parser interface with PublicEvolving and letting HiveParser implement the Parser interface directly, to decouple it from the risk involved in future planner refactors.

PlannerQueryOperation should stay internal in both table-planner and hive-connector; since the QueryOperation interface is already PublicEvolving, I would propose making the foundational FlinkTypeFactory PublicEvolving as well.

PlannerContext could become an interface with separate implementations in the planner and the hive-connector.
[jira] [Updated] (FLINK-30667) remove the planner @internal dependency in flink-connector-hive
[ https://issues.apache.org/jira/browse/FLINK-30667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chen Qin updated FLINK-30667:
-----------------------------
Summary: remove the planner @internal dependency in flink-connector-hive (was: remove the planner dependency in flink-connector-hive)
[jira] [Comment Edited] (FLINK-30667) remove the planner dependency in flink-connector-hive
[ https://issues.apache.org/jira/browse/FLINK-30667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17678003#comment-17678003 ]

Chen Qin edited comment on FLINK-30667 at 1/18/23 3:58 AM:
----------------------------------------------------------

ParserImpl and its interface are currently both Internal. HiveParser should not rely on the table-planner ParserImpl, for the sake of future flexibility and easier Hive connector maintenance. I would propose annotating the Parser interface with PublicEvolving and letting HiveParser implement the Parser interface directly, to decouple it from the risk involved in future planner refactors.

PlannerQueryOperation should stay internal in both table-planner and hive-connector; since the QueryOperation interface is already PublicEvolving, I would propose making the foundational FlinkTypeFactory PublicEvolving as well.

PlannerContext could become an interface with separate implementations in the planner and the hive-connector.

was (Author: foxss):
ParserImpl and its interface are currently both Internal. HiveParser should not rely on the table-planner ParserImpl, for the sake of future flexibility and easier Hive connector maintenance. I would propose annotating the Parser interface with PublicEvolving and letting HiveParser implement the Parser interface directly, to decouple it from the risk involved in future planner refactors.

PlannerQueryOperation should stay internal in both table-planner and hive-connector; since the QueryOperation interface is already PublicEvolving, I would propose making the foundational FlinkTypeFactory PublicEvolving as well.

PlannerContext still needs a bit more thought.
[jira] [Commented] (FLINK-30667) remove the planner dependency in flink-connector-hive
[ https://issues.apache.org/jira/browse/FLINK-30667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17678003#comment-17678003 ]

Chen Qin commented on FLINK-30667:
----------------------------------

ParserImpl and its interface are currently both Internal. HiveParser should not rely on the table-planner ParserImpl, for the sake of future flexibility and easier Hive connector maintenance. I would propose annotating the Parser interface with PublicEvolving and letting HiveParser implement the Parser interface directly, to decouple it from the risk involved in future planner refactors.

PlannerQueryOperation should stay internal in both table-planner and hive-connector; since the QueryOperation interface is already PublicEvolving, I would propose making the foundational FlinkTypeFactory PublicEvolving as well.

PlannerContext still needs a bit more thought.
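The decoupling proposed in the comment above (HiveParser implementing the Parser interface directly, rather than reusing the planner's internal ParserImpl) can be sketched as a minimal stand-alone model. Note the assumptions: the real org.apache.flink.table.delegation.Parser returns List<Operation> and has more methods; the simplified Parser interface, HiveParser body, and ParserSketch driver below are illustrative, not Flink code.

```java
import java.util.List;

// Simplified stand-in for org.apache.flink.table.delegation.Parser.
// The real interface returns List<Operation>; String is used here to keep
// the sketch self-contained.
interface Parser {
    List<String> parse(String statement);
}

// HiveParser implements Parser directly instead of extending the planner's
// @Internal ParserImpl, so future planner refactors cannot break it.
class HiveParser implements Parser {
    @Override
    public List<String> parse(String statement) {
        // A real implementation would delegate to Hive's own parsing logic;
        // this sketch only normalizes the statement to show the call path.
        return List.of(statement.trim());
    }
}

public class ParserSketch {
    public static void main(String[] args) {
        Parser parser = new HiveParser();
        System.out.println(parser.parse("SELECT 1 "));
    }
}
```

The point of the design is that the connector depends only on the interface, so the planner can swap its own implementation freely.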
[jira] [Updated] (FLINK-30659) move flink-sql-parser-hive to flink-connector-hive-parent
[ https://issues.apache.org/jira/browse/FLINK-30659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chen Qin updated FLINK-30659:
-----------------------------
Summary: move flink-sql-parser-hive to flink-connector-hive-parent (was: move flink-sql-parser-hive to flink-connector-hive)

> move flink-sql-parser-hive to flink-connector-hive-parent
> ---------------------------------------------------------
>
> Key: FLINK-30659
> URL: https://issues.apache.org/jira/browse/FLINK-30659
> Project: Flink
> Issue Type: Sub-task
> Components: Connectors / Hive, Table SQL / Planner
> Affects Versions: 1.17.0
> Reporter: Chen Qin
> Priority: Major
> Fix For: 1.17.0
>
> The Hive parser should stay with the Hive connector and be maintained together with it. At runtime, those packages should load/unload together.
[jira] [Created] (FLINK-30667) remove the planner dependency in flink-connector-hive
Chen Qin created FLINK-30667:
-----------------------------
Summary: remove the planner dependency in flink-connector-hive
Key: FLINK-30667
URL: https://issues.apache.org/jira/browse/FLINK-30667
Project: Flink
Issue Type: Sub-task
Components: Connectors / Hive
Affects Versions: 1.17.0
Reporter: Chen Qin
Fix For: 1.17.0

There are some classes in flink-connector-hive that rely on the planner, but fortunately not too many. They mainly rely on ParserImpl, PlannerContext, PlannerQueryOperation, and so on. The dependency is mainly required to create RelNode. To resolve this problem, we need more abstraction for the planner, providing public APIs for external dialects.
[jira] [Created] (FLINK-30664) [Connector/Hive] clean up ambiguous hive/hadoop package dependencies
Chen Qin created FLINK-30664:
-----------------------------
Summary: [Connector/Hive] clean up ambiguous hive/hadoop package dependencies
Key: FLINK-30664
URL: https://issues.apache.org/jira/browse/FLINK-30664
Project: Flink
Issue Type: Sub-task
Components: Connectors / Hive
Affects Versions: 1.17.0
Reporter: Chen Qin
Fix For: 1.17.0

The combination of hive and hive-metastore introduces multiple versions of dependency packages; the goal is to ensure the hive-connector has deterministic dependencies.
[jira] [Created] (FLINK-30660) move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive e2e
Chen Qin created FLINK-30660:
-----------------------------
Summary: move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive e2e
Key: FLINK-30660
URL: https://issues.apache.org/jira/browse/FLINK-30660
Project: Flink
Issue Type: Sub-task
Components: Connectors / Hive, Tests
Affects Versions: 1.17.0
Reporter: Chen Qin
Fix For: 1.17.0

Move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive e2e: [https://github.com/apache/flink/pull/16532/files#]
[jira] [Updated] (FLINK-30659) move flink-sql-parser-hive to flink-connector-hive
[ https://issues.apache.org/jira/browse/FLINK-30659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chen Qin updated FLINK-30659:
-----------------------------
Summary: move flink-sql-parser-hive to flink-connector-hive (was: move Flink-sql-parser-hive to flink-connector-hive)
[jira] [Updated] (FLINK-30658) remove flink-sql-parser-hive dependency in table-planner
[ https://issues.apache.org/jira/browse/FLINK-30658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chen Qin updated FLINK-30658:
-----------------------------
Summary: remove flink-sql-parser-hive dependency in table-planner (was: remove Flink-sql-parser-hive dependency in table-planner)

> remove flink-sql-parser-hive dependency in table-planner
> --------------------------------------------------------
>
> Key: FLINK-30658
> URL: https://issues.apache.org/jira/browse/FLINK-30658
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Affects Versions: 1.17.0
> Reporter: Chen Qin
> Priority: Minor
> Fix For: 1.17.0
>
> In order to move flink-sql-parser-hive out of flink-table, we need to remove the flink-sql-parser-hive package dependency in flink-table-planner.
[jira] [Created] (FLINK-30659) move Flink-sql-parser-hive to flink-connector-hive
Chen Qin created FLINK-30659:
-----------------------------
Summary: move Flink-sql-parser-hive to flink-connector-hive
Key: FLINK-30659
URL: https://issues.apache.org/jira/browse/FLINK-30659
Project: Flink
Issue Type: Sub-task
Components: Connectors / Hive, Table SQL / Planner
Affects Versions: 1.17.0
Reporter: Chen Qin
Fix For: 1.17.0

The Hive parser should stay with the Hive connector and be maintained together with it. At runtime, those packages should load/unload together.
[jira] [Updated] (FLINK-30658) remove Flink-sql-parser-hive dependency in table-planner
[ https://issues.apache.org/jira/browse/FLINK-30658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chen Qin updated FLINK-30658:
-----------------------------
Summary: remove Flink-sql-parser-hive dependency in table-planner (was: remove Flink-sql-parser-hive dependency on table-planner)
[jira] [Created] (FLINK-30658) remove Flink-sql-parser-hive dependency on table-planner
Chen Qin created FLINK-30658:
-----------------------------
Summary: remove Flink-sql-parser-hive dependency on table-planner
Key: FLINK-30658
URL: https://issues.apache.org/jira/browse/FLINK-30658
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Affects Versions: 1.17.0
Reporter: Chen Qin
Fix For: 1.17.0

In order to move Flink-sql-parser-hive out of flink-table, we need to remove the Flink-sql-parser-hive package dependency in flink-table-planner.
[jira] [Commented] (FLINK-27640) Flink not compiling, flink-connector-hive_2.12 is missing jhyde pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde
[ https://issues.apache.org/jira/browse/FLINK-27640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17645799#comment-17645799 ]

Chen Qin commented on FLINK-27640:
----------------------------------

Shall we exclude org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde instead? I don't think this package is actually used. Here is how the Hudi community fixed it: https://github.com/apache/hudi/pull/3034

> Flink not compiling, flink-connector-hive_2.12 is missing jhyde pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-27640
> URL: https://issues.apache.org/jira/browse/FLINK-27640
> Project: Flink
> Issue Type: Bug
> Components: Build System, Connectors / Hive
> Affects Versions: 1.16.0
> Reporter: Piotr Nowojski
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: pull-request-available, stale-assigned
>
> When clean installing the whole project after cleaning the local {{.m2}} directory, I encountered the following error while compiling flink-connector-hive_2.12:
> {noformat}
> [ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could not resolve dependencies for project org.apache.flink:flink-connector-hive_2.12:jar:1.16-SNAPSHOT: Failed to collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read artifact descriptor for org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [conjars (http://conjars.org/repo, default, releases+snapshots), apache.snapshots (http://repository.apache.org/snapshots, default, snapshots)] -> [Help 1]
> {noformat}
> I've solved this by adding
> {noformat}
> <repository>
>   <id>spring-repo-plugins</id>
>   <url>https://repo.spring.io/ui/native/plugins-release/</url>
> </repository>
> {noformat}
> to the ~/.m2/settings.xml file.
[jira] [Created] (FLINK-30362) Flink-connector-hive can't build with maven 3.8
Chen Qin created FLINK-30362:
-----------------------------
Summary: Flink-connector-hive can't build with Maven 3.8
Key: FLINK-30362
URL: https://issues.apache.org/jira/browse/FLINK-30362
Project: Flink
Issue Type: Bug
Components: Connectors / Hive
Affects Versions: 1.15.3, 1.16.0, 1.17.0
Environment: install Maven 3.8.1+, git clone the flink repo, run mvn clean package
Reporter: Chen Qin
Fix For: 1.17.0, 1.16.1, 1.15.4

The Flink Hive connector pulls in hive-exec, which depends on org.pentaho:pentaho-aggdesigner-algorithm hosted on a blocked JBoss mirror. This is a CVE-related issue which blocks the upgrade to Maven 3.8.1+: [https://maven.apache.org/docs/3.8.1/release-notes.html#cve-2021-26291]

{code:java}
[ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could not resolve dependencies for project org.apache.flink:flink-connector-hive_2.12:jar:1.17-SNAPSHOT: Failed to collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read artifact descriptor for org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [repository.jboss.org (http://repository.jboss.org/nexus/content/groups/public/, default, disabled), conjars (http://conjars.org/repo, default, releases+snapshots), apache.snapshots (http://repository.apache.org/snapshots, default, snapshots)] -> [Help 1]
{code}
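The exclusion approach suggested in FLINK-27640 (and used by the Hudi community) would drop the unreachable pentaho artifact from the hive-exec dependency tree. The pom fragment below is a hypothetical sketch, assuming the hive-exec 2.3.9 dependency shown in the error above; the exact coordinates and location in the flink-connector-hive pom may differ.

```xml
<!-- Hypothetical fragment for the hive-exec dependency in the
     flink-connector-hive pom; excluding the pentaho artifact keeps
     Maven 3.8+ from trying to resolve it via blocked HTTP mirrors. -->
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>2.3.9</version>
  <exclusions>
    <exclusion>
      <groupId>org.pentaho</groupId>
      <artifactId>pentaho-aggdesigner-algorithm</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```

Excluding is only safe because, as noted in FLINK-27640, the package does not appear to be used at runtime.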
[jira] [Updated] (FLINK-27726) shade thrift and fb303 in hive connector
[ https://issues.apache.org/jira/browse/FLINK-27726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chen Qin updated FLINK-27726:
-----------------------------
Summary: shade thrift and fb303 in hive connector (was: shad thrift and fb303 in hive connector)

> shade thrift and fb303 in hive connector
> ----------------------------------------
>
> Key: FLINK-27726
> URL: https://issues.apache.org/jira/browse/FLINK-27726
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Hive
> Affects Versions: 1.12.3, 1.13.1, 1.12.4, 1.12.5, 1.13.2, 1.13.3, 1.15.0, 1.11.6, 1.12.7, 1.13.5, 1.14.2, 1.13.6, 1.14.3, 1.14.4
> Reporter: Chen Qin
> Priority: Minor
>
> The Hive connector introduces specific fb303 and thrift versions in order to connect to a specific Hive metastore version. If user code also pulls in its own thrift version, along with an fb303 that differs from the one the Hive connector introduced, the user code will not be able to connect to the Hive metastore.
>
> This fix has been verified in a production environment, as part of supporting thrift-encoded FlinkSQL, for more than 6 months.
[jira] [Created] (FLINK-27726) shad thrift and fb303 in hive connector
Chen Qin created FLINK-27726:
-----------------------------
Summary: shad thrift and fb303 in hive connector
Key: FLINK-27726
URL: https://issues.apache.org/jira/browse/FLINK-27726
Project: Flink
Issue Type: Improvement
Components: Connectors / Hive
Affects Versions: 1.14.4, 1.14.3, 1.13.6, 1.14.2, 1.13.5, 1.12.7, 1.11.6, 1.15.0, 1.13.3, 1.13.2, 1.12.5, 1.12.4, 1.13.1, 1.12.3
Reporter: Chen Qin

The Hive connector introduces specific fb303 and thrift versions in order to connect to a specific Hive metastore version. If user code also pulls in its own thrift version, along with an fb303 that differs from the one the Hive connector introduced, the user code will not be able to connect to the Hive metastore.

This fix has been verified in a production environment, as part of supporting thrift-encoded FlinkSQL, for more than 6 months.
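Shading as described is typically done with the maven-shade-plugin's relocation feature, which rewrites the conflicting packages into a connector-private namespace so user-supplied thrift/fb303 versions cannot collide. This is a hedged sketch: the shadedPattern prefixes below are hypothetical, and the actual plugin configuration in the connector pom may differ.

```xml
<!-- Sketch of maven-shade-plugin relocations; the shaded package
     prefix "org.apache.flink.hive.shaded" is an illustrative choice. -->
<relocations>
  <relocation>
    <pattern>org.apache.thrift</pattern>
    <shadedPattern>org.apache.flink.hive.shaded.org.apache.thrift</shadedPattern>
  </relocation>
  <relocation>
    <pattern>com.facebook.fb303</pattern>
    <shadedPattern>org.apache.flink.hive.shaded.com.facebook.fb303</shadedPattern>
  </relocation>
</relocations>
```

After relocation, the connector's metastore client resolves its own copies of these classes regardless of what the user jar bundles.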
[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1733#comment-1733 ]

Chen Qin commented on FLINK-10052:
----------------------------------

Here is a timeline for the "ResourceManager leader changed to new address null" exception. When Curator signaled the ZooKeeper SUSPENDED state, another code path deregistered the task executor on a different instance, resulting in a restart. Basically, when the suspended message landed on container 1, container 2 reacted with TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093) and threw an exception. While it all points to suspended-message handling, this part doesn't seem to directly touch the changed code path.

Timeline of warnings/exceptions:

on container_e26_1617655625710_9571_01_17:
2021-04-23T13:57:37.290 - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2021-04-23T13:57:37.304 - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

on container_e26_1617655625710_9571_01_01:
2021-04-23T13:57:37.333 - USER_EVENTS.spo_derived_event.SINK-stream_joiner -> USER_EVENTS.spo_derived_event.SINK-late-event-tracker (32/270) (c60dc612ec4d703d1bff646c3442193a) switched from RUNNING to FAILED on container_e26_1617655625710_9571_01_17 @ xenon-pii-dev-001-20191210-data-slave-dev-0a01fa8b.ec2.pin220.com (dataPort=45229).
org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)

on container_e26_1617655625710_9571_01_17:
2021-04-23T13:57:38.465 - Connection to ZooKeeper lost. Can no longer retrieve the leader from ZooKeeper.
2021-04-23T13:57:38.496 - Unable to reconnect to ZooKeeper service, session 0x1050b21fe3006a6 has expired

> Tolerate temporarily suspended ZooKeeper connections
> ----------------------------------------------------
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Coordination
> Affects Versions: 1.4.2, 1.5.2, 1.6.0, 1.8.1
> Reporter: Till Rohrmann
> Assignee: Zili Chen
> Priority: Major
> Labels: pull-request-available, stale-assigned
> Fix For: 1.13.0
>
> Time Spent: 50m
> Remaining Estimate: 0h
>
> This issue results from FLINK-10011, which uncovered a problem with Flink's HA recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator recipe for leader election. The leader latch revokes leadership in case of a suspended ZooKeeper connection. This can be premature in case the system can reconnect to ZooKeeper before its session expires. The effect of the lost leadership is that all jobs will be canceled and directly restarted after regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper connection, it would be better to wait until the ZooKeeper connection is LOST. That way we would allow the system to reconnect and not lose the leadership. This could be achievable by using Curator's {{LeaderSelector}} instead of the {{LeaderLatch}}.
[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17330890#comment-17330890 ]

Chen Qin commented on FLINK-10052:
----------------------------------

Here is another exception we observed in another job after applying this PR:

{code:java}
2021-04-23 11:09:03,388 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_e26_1617655625710_8692_01_000115 because: ResourceManager leader changed to new address null
2021-04-23 11:09:03,391 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - USER_AGGREGATE_STATE.user_signal_v2.SINK-async (200/360) (bf815073df08c3426bf41b63d74510fb) switched from RUNNING to FAILED on container_e26_1617655625710_8692_01_000115 @ .ec2.pin220.com (dataPort=46719).
org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
    at akka.actor.ActorCell.invoke(ActorCell.scala:581)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
    at akka.dispatch.Mailbox.run(Mailbox.scala:229)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
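The behavior the issue asks for (tolerating a SUSPENDED ZooKeeper connection and revoking leadership only on LOST) can be modeled with a small stand-alone sketch. ConnectionState mirrors the values of Curator's org.apache.curator.framework.state.ConnectionState; LeadershipHandler and SuspendTolerance are hypothetical names for illustration, not Flink or Curator classes.

```java
// Minimal stand-alone model of the proposed behavior: treat SUSPENDED as
// tolerable (the session may still recover before expiry) and revoke
// leadership only once the connection is LOST.
enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

class LeadershipHandler {
    private boolean leader = true;

    void stateChanged(ConnectionState newState) {
        switch (newState) {
            case SUSPENDED:
                // Tolerate: do not revoke; a RECONNECTED may still follow.
                break;
            case LOST:
                // Session expired: leadership must be revoked.
                leader = false;
                break;
            default:
                break;
        }
    }

    boolean isLeader() { return leader; }
}

public class SuspendTolerance {
    public static void main(String[] args) {
        LeadershipHandler h = new LeadershipHandler();
        h.stateChanged(ConnectionState.SUSPENDED);
        System.out.println("after SUSPENDED: leader=" + h.isLeader());
        h.stateChanged(ConnectionState.LOST);
        System.out.println("after LOST: leader=" + h.isLeader());
    }
}
```

This matches the issue's contrast between LeaderLatch (which revokes on SUSPENDED) and the proposed wait-until-LOST behavior.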
[jira] [Comment Edited] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17330890#comment-17330890 ]

Chen Qin edited comment on FLINK-10052 at 4/23/21, 4:13 PM:
-----------------------------------------------------------

Here is another exception we observed in another job; it may or may not have been caused by this PR:

{code:java}
2021-04-23 11:09:03,388 INFO org.apache.flink.yarn.YarnResourceManager - Closing TaskExecutor connection container_e26_1617655625710_8692_01_000115 because: ResourceManager leader changed to new address null
2021-04-23 11:09:03,391 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - USER_AGGREGATE_STATE.user_signal_v2.SINK-async (200/360) (bf815073df08c3426bf41b63d74510fb) switched from RUNNING to FAILED on container_e26_1617655625710_8692_01_000115 @ .ec2.pin220.com (dataPort=46719).
org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
    at akka.actor.ActorCell.invoke(ActorCell.scala:581)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
    at akka.dispatch.Mailbox.run(Mailbox.scala:229)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}

was (Author: foxss):
Here is another exception we observed in another job after applying this PR (same stack trace as above).
[jira] [Comment Edited] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17330858#comment-17330858 ] Chen Qin edited comment on FLINK-10052 at 4/23/21, 4:04 PM: Ran load testing on the PR; SUSPENDED messages no longer trigger leadership loss and job restarts. At the same time, we found the following exception when the job restarted, caused by an unrelated user-jar issue.
{code:java}
2021-04-21 18:24:44,639 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Fatal error occurred in TaskExecutor akka.tcp://flink@xxx:33435/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: Unhandled error in ZooKeeperLeaderRetrievalService:Background operation retry gave up
    at org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.unhandledError(ZooKeeperLeaderRetrievalService.java:208)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:713)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:709)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:100)
    at org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:92)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.logError(CuratorFrameworkImpl.java:708)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:874)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:990)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:943)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:66)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:346)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:862)
    ... 10 more
{code}
was (Author: foxss): run load testing on pr, found following exception when job restarts.
{code:java}
2021-04-21 18:24:44,639 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Fatal error occurred in TaskExecutor akka.tcp://flink@xxx:33435/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: Unhandled error in ZooKeeperLeaderRetrievalService:Background operation retry gave up
    at org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.unhandledError(ZooKeeperLeaderRetrievalService.java:208)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:713)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:709)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:100)
    at org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:92)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.logError(CuratorFrameworkImpl.java:708)
    at
[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17330858#comment-17330858 ] Chen Qin commented on FLINK-10052: -- Ran load testing on the PR and found the following exception when the job restarted.
{code:java}
2021-04-21 18:24:44,639 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor - Fatal error occurred in TaskExecutor akka.tcp://flink@xxx:33435/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: Unhandled error in ZooKeeperLeaderRetrievalService:Background operation retry gave up
    at org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.unhandledError(ZooKeeperLeaderRetrievalService.java:208)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:713)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:709)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:100)
    at org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:92)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.logError(CuratorFrameworkImpl.java:708)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:874)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:990)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:943)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:66)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:346)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
    at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:862)
    ... 10 more
{code}
> Tolerate temporarily suspended ZooKeeper connections > > > Key: FLINK-10052 > URL: https://issues.apache.org/jira/browse/FLINK-10052 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 1.4.2, 1.5.2, 1.6.0, 1.8.1 >Reporter: Till Rohrmann >Assignee: Zili Chen >Priority: Major > Labels: pull-request-available, stale-assigned > Fix For: 1.13.0 > > Time Spent: 50m > Remaining Estimate: 0h > > This issue results from FLINK-10011, which uncovered a problem with Flink's HA > recovery and proposed the following solution to harden Flink: > The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator > recipe for leader election. The leader latch revokes leadership in case of a > suspended ZooKeeper connection. This can be premature if the system > can reconnect to ZooKeeper before its session expires. The effect of the lost > leadership is that all jobs will be canceled and directly restarted after > regaining the leadership. > Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper > connection, it would be better to wait until the ZooKeeper connection is > LOST. That way we would allow the system to reconnect and not lose the > leadership. This could be achievable by using Curator's {{LeaderSelector}} > instead of the {{LeaderLatch}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
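The policy the issue proposes — keep leadership on a SUSPENDED connection and only revoke it once the session is actually LOST — can be sketched as follows. This is a minimal, self-contained model, not Flink's real leader-election code: the `ConnectionState` enum here only mimics Curator's, and the class name is made up for illustration.

```java
// Simplified sketch of suspend-tolerant leadership handling. A LeaderLatch-style
// implementation revokes leadership on SUSPENDED; the proposal is to wait for LOST.
public class SuspendTolerantPolicy {

    public enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    private boolean hasLeadership = true;

    public boolean hasLeadership() { return hasLeadership; }

    public void onStateChanged(ConnectionState state) {
        switch (state) {
            case SUSPENDED:
                // Old behavior revoked leadership here, cancelling and restarting
                // every job even when the ZooKeeper session later survived.
                // Proposed behavior: keep leadership and wait for the session outcome.
                break;
            case LOST:
                // Only a truly expired session should cost us the leadership.
                hasLeadership = false;
                break;
            case RECONNECTED:
            case CONNECTED:
                // Connection recovered before the session expired: nothing lost.
                break;
        }
    }

    public static void main(String[] args) {
        SuspendTolerantPolicy policy = new SuspendTolerantPolicy();
        policy.onStateChanged(ConnectionState.SUSPENDED);
        policy.onStateChanged(ConnectionState.RECONNECTED);
        System.out.println(policy.hasLeadership()); // true: suspend tolerated
        policy.onStateChanged(ConnectionState.LOST);
        System.out.println(policy.hasLeadership()); // false: session expired
    }
}
```

The load test quoted above exercises exactly this distinction: SUSPENDED events no longer cascade into leadership loss and full job restarts, while a genuine session expiry still revokes leadership.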
[jira] [Commented] (FLINK-6113) Implement split/select with Side Outputs
[ https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321877#comment-17321877 ] Chen Qin commented on FLINK-6113: - It seems the community has already removed the split and select transformations from the master branch, so this Jira no longer makes sense. Attaching the patch for curious minds. [^split_select.patch] > Implement split/select with Side Outputs > > > Key: FLINK-6113 > URL: https://issues.apache.org/jira/browse/FLINK-6113 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.3.0 >Reporter: Chen Qin >Priority: Minor > Labels: stale-minor > Attachments: split_select.patch > > > With the completion of FLINK-4460 (side outputs), this is one of the follow-up items > towards deprecating string-tag-based split/select in favor of OutputTag-based > side outputs. > In Flink 2.0, we might consider eventually deprecating split/select -- This message was sent by Atlassian Jira (v8.3.4#803005)
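For readers migrating off split/select, the replacement pattern routes each element to a typed `OutputTag` from a single processing function. The sketch below is a toy model of that routing, not the real Flink API (in actual Flink this is `ProcessFunction` with `ctx.output(tag, value)` and `stream.getSideOutput(tag)`); the class names here are simplified stand-ins.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of OutputTag-based side outputs replacing string-based split/select:
// one pass over the input routes each element to a tagged output.
public class SideOutputSketch {

    static final class OutputTag<T> {
        final String id;
        OutputTag(String id) { this.id = id; }
    }

    static final class SideOutputCollector {
        private final Map<String, List<Object>> outputs = new HashMap<>();

        <T> void output(OutputTag<T> tag, T value) {
            outputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
        }

        List<Object> getSideOutput(OutputTag<?> tag) {
            return outputs.getOrDefault(tag.id, List.of());
        }
    }

    public static void main(String[] args) {
        OutputTag<Integer> evens = new OutputTag<>("evens");
        OutputTag<Integer> odds = new OutputTag<>("odds");
        SideOutputCollector out = new SideOutputCollector();

        // Replaces stream.split(selector) followed by stream.select("name").
        for (int i = 0; i < 5; i++) {
            out.output((i % 2 == 0) ? evens : odds, i);
        }

        System.out.println(out.getSideOutput(evens)); // [0, 2, 4]
        System.out.println(out.getSideOutput(odds));  // [1, 3]
    }
}
```

The design win over string tags is that an `OutputTag<T>` carries the element type, so mismatched selections fail at compile time rather than at runtime.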
[jira] [Closed] (FLINK-6113) Implement split/select with Side Outputs
[ https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin closed FLINK-6113. --- Resolution: Fixed > Implement split/select with Side Outputs > > > Key: FLINK-6113 > URL: https://issues.apache.org/jira/browse/FLINK-6113 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.3.0 >Reporter: Chen Qin >Priority: Minor > Labels: stale-minor > Attachments: split_select.patch > > > With the completion of FLINK-4460 (side outputs), this is one of the follow-up items > towards deprecating string-tag-based split/select in favor of OutputTag-based > side outputs. > In Flink 2.0, we might consider eventually deprecating split/select -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-6113) Implement split/select with Side Outputs
[ https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-6113: Attachment: split_select.patch > Implement split/select with Side Outputs > > > Key: FLINK-6113 > URL: https://issues.apache.org/jira/browse/FLINK-6113 > Project: Flink > Issue Type: Improvement > Components: API / DataStream >Affects Versions: 1.3.0 >Reporter: Chen Qin >Priority: Minor > Labels: stale-minor > Attachments: split_select.patch > > > With the completion of FLINK-4460 (side outputs), this is one of the follow-up items > towards deprecating string-tag-based split/select in favor of OutputTag-based > side outputs. > In Flink 2.0, we might consider eventually deprecating split/select -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
[ https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17312867#comment-17312867 ] Chen Qin commented on FLINK-22081: -- [~AHeise] could you assign this Jira to me and help review the PR? > Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin > --- > > Key: FLINK-22081 > URL: https://issues.apache.org/jira/browse/FLINK-22081 > Project: Flink > Issue Type: Bug > Components: FileSystems >Reporter: Chen Qin >Assignee: Prem Santosh >Priority: Minor > Labels: pull-request-available > Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, > 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3 > > Attachments: image (13).png > > > Using Flink 1.11.2, > I added the flink-s3-fs-hadoop jar in the plugins dir but I am seeing > checkpoint paths like > {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}, > which means the entropy injection key is not being resolved. After some > debugging I found that in the > [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] > we check whether the given file system is a {{SafetyNetWrapperFileSystem}} > (and unwrap its delegate), but don't check for > {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} > directly in the getEntropyFs method, which is the type used when S3 file system > dependencies are added as a plugin. > > Repro steps: > Flink 1.11.2 with flink-s3-fs-hadoop as a plugin and entropy injection > key _entropy_ turned on; > observe checkpoint dir with entropy marker not removed. 
> s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/ > compared to the marker being removed when running Flink 1.9.1: > s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/ > Add some logging to getEntropyFs and observe it returns null, because the passed-in > parameter is not a {{SafetyNetWrapperFileSystem}} but a > {{ClassLoaderFixingFileSystem}}. > Apply the patch, build a release, and run the same job; the issue is resolved, as the attachment > shows. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
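The missing unwrap described in the report can be sketched as below. These are simplified stand-ins, not flink-core's real classes: the point is only that `getEntropyFs` must look through *both* the safety-net wrapper and the plugin mechanism's class-loader-fixing wrapper before testing for an entropy-injecting file system; pre-fix code only knew the first wrapper, so plugin-loaded S3 file systems returned null and `_entropy_` stayed literal in checkpoint paths.

```java
// Toy reconstruction of the fix: unwrap both wrapper types before the
// EntropyInjectingFileSystem check. Class names mirror Flink's but are
// local stand-ins, not the real flink-core types.
public class EntropyUnwrapSketch {

    interface FileSystem {}

    interface EntropyInjectingFileSystem extends FileSystem {
        String getEntropyInjectionKey();
    }

    static final class S3FileSystem implements EntropyInjectingFileSystem {
        public String getEntropyInjectionKey() { return "_entropy_"; }
    }

    static final class SafetyNetWrapperFileSystem implements FileSystem {
        final FileSystem delegate;
        SafetyNetWrapperFileSystem(FileSystem delegate) { this.delegate = delegate; }
    }

    static final class ClassLoaderFixingFileSystem implements FileSystem {
        final FileSystem delegate;
        ClassLoaderFixingFileSystem(FileSystem delegate) { this.delegate = delegate; }
    }

    static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
        // Pre-existing check: look through the safety-net wrapper.
        if (fs instanceof SafetyNetWrapperFileSystem) {
            fs = ((SafetyNetWrapperFileSystem) fs).delegate;
        }
        // The missing check: file systems loaded via the plugin mechanism
        // arrive wrapped in a class-loader-fixing wrapper, unwrap it too.
        if (fs instanceof ClassLoaderFixingFileSystem) {
            fs = ((ClassLoaderFixingFileSystem) fs).delegate;
        }
        return (fs instanceof EntropyInjectingFileSystem)
                ? (EntropyInjectingFileSystem) fs
                : null;
    }

    public static void main(String[] args) {
        // Plugin case: without the second unwrap, this lookup failed.
        FileSystem pluginFs = new ClassLoaderFixingFileSystem(new S3FileSystem());
        System.out.println(getEntropyFs(pluginFs) != null); // true with the fix
    }
}
```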
[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
[ https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-22081: - Description: Using flink 1.11.2 I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the checkpoints paths like {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} which means the entropy injection key is not being resolved. After some debugging I found that in the [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] we check if the given fileSystem is of type {{ClassLoaderFixingFileSystem}} and if so we check if the filesysystem is of type {{SafetyNetWrapperFileSystem as well as it's delegate }}but don't check for {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} directly in getEntorpyFs method which would be the type if S3 file system dependencies are added as a plugin. Repro steps: Flink 1.11.2 with flink-s3-fs-hadoop as plugin and turn on entropy injection key _entropy_ observe checkpoint dir with entropy marker not removed. s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/ compare to removed when running Flink 1.9.1 s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/ Add some logging to getEntropyFs, observe it return null because passed in parameter is not {{SafetyNetWrapperFileSystem}} but {{ClassLoaderFixingFileSystem}} Apply patch, build release and run same job, resolved issue as attachment shows was: Using flink 1.11.2 I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the checkpoints paths like {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} which means the entropy injection key is not being resolved. 
After some debugging I found that in the [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] we check if the given fileSystem is of type {{ClassLoaderFixingFileSystem}} and if so we check if the filesysystem is of type {{SafetyNetWrapperFileSystem as well as it's delegate }}but don't check for {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} directly in getEntorpyFs method which would be the type if S3 file system dependencies are added as a plugin. Repro steps: Flink 1.11.2 with flink-s3-fs-hadoop as plugin and turn on entropy injection key _entropy_ observe checkpoint dir with entropy marker not removed. s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/ compare to removed when running Flink 1.9.1 s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/ Add some logging to getEntropyFs, observe it return null because passed in parameter is not {{SafetyNetWrapperFileSystem}} but {{ClassLoaderFixingFileSystem}} Apply patch, build release and run same job, resolved issue as attachment shows > Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin > --- > > Key: FLINK-22081 > URL: https://issues.apache.org/jira/browse/FLINK-22081 > Project: Flink > Issue Type: Bug > Components: FileSystems >Reporter: Chen Qin >Assignee: Prem Santosh >Priority: Minor > Labels: pull-request-available > Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, > 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3 > > Attachments: image (13).png > > > Using flink 1.11.2 > I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the > checkpoints paths like > {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} > which means the entropy injection key is not 
being resolved. After some > debugging I found that in the > [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] > we check if the given fileSystem is of type {{ClassLoaderFixingFileSystem}} > and if so we check if the filesysystem is of type > {{SafetyNetWrapperFileSystem as well as it's delegate }}but don't check for > {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} > directly in getEntorpyFs method which would be the type if S3 file system > dependencies are added as a
[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
[ https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-22081: - Description: Using flink 1.11.2 I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the checkpoints paths like {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} which means the entropy injection key is not being resolved. After some debugging I found that in the [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] we check if the given fileSystem is of type {{ClassLoaderFixingFileSystem}} and if so we check if the filesysystem is of type {{SafetyNetWrapperFileSystem as well as it's delegate }}but don't check for {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} directly in getEntorpyFs method which would be the type if S3 file system dependencies are added as a plugin. Repro steps: Flink 1.11.2 with flink-s3-fs-hadoop as plugin and turn on entropy injection key _entropy_ observe checkpoint dir with entropy marker not removed. s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/ compare to removed when running Flink 1.9.1 s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/ Add some logging to getEntropyFs, observe it return null because passed in parameter is not {{SafetyNetWrapperFileSystem}} but {{ClassLoaderFixingFileSystem}} Apply patch, build release and run same job, resolved issue as attachment shows was: Using flink 1.11.2 I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the checkpoints paths like {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} which means the entropy injection key is not being resolved. 
After some debugging I found that in the [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] we check if the given fileSystem is of type {{ClassLoaderFixingFileSystem}} and if so we check if the filesysystem is of type {{SafetyNetWrapperFileSystem as well as it's delegate }}but don't check for {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} directly in getEntorpyFs method which would be the type if S3 file system dependencies are added as a plugin. Repro steps: Flink 1.11.2 with flink-s3-fs-hadoop as plugin and turn on entropy injection key _entropy_ observe checkpoint dir with entropy marker not removed. s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/ compare to removed when running Flink 1.9.1 s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/ Add some logging to getEntropyFs, observe it return null because passed in parameter is not {{SafetyNetWrapperFileSystem}} but {{ClassLoaderFixingFileSystem}} Apply patch, build release and run same job > Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin > --- > > Key: FLINK-22081 > URL: https://issues.apache.org/jira/browse/FLINK-22081 > Project: Flink > Issue Type: Bug > Components: FileSystems >Reporter: Chen Qin >Assignee: Prem Santosh >Priority: Minor > Labels: pull-request-available > Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, > 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3 > > Attachments: image (13).png > > > Using flink 1.11.2 > I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the > checkpoints paths like > {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} > which means the entropy injection key is not being resolved. 
After some > debugging I found that in the > [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] > we check if the given fileSystem is of type {{ClassLoaderFixingFileSystem}} > and if so we check if the filesysystem is of type > {{SafetyNetWrapperFileSystem as well as it's delegate }}but don't check for > {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} > directly in getEntorpyFs method which would be the type if S3 file system > dependencies are added as a plugin. > > Repro steps:
[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
[ https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-22081: - Attachment: image (13).png > Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin > --- > > Key: FLINK-22081 > URL: https://issues.apache.org/jira/browse/FLINK-22081 > Project: Flink > Issue Type: Bug > Components: FileSystems >Reporter: Chen Qin >Assignee: Prem Santosh >Priority: Minor > Labels: pull-request-available > Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, > 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3 > > Attachments: image (13).png > > > Using flink 1.11.2 > I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the > checkpoints paths like > {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} > which means the entropy injection key is not being resolved. After some > debugging I found that in the > [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] > we check if the given fileSystem is of type {{ClassLoaderFixingFileSystem}} > and if so we check if the filesysystem is of type > {{SafetyNetWrapperFileSystem as well as it's delegate }}but don't check for > {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} > directly in getEntorpyFs method which would be the type if S3 file system > dependencies are added as a plugin. > > Repro steps: > Flink 1.11.2 with flink-s3-fs-hadoop as plugin and turn on entropy injection > key _entropy_ > observe checkpoint dir with entropy marker not removed. 
> s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/ > compare to removed when running Flink 1.9.1 > s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/ > > Add some logging to getEntropyFs, observe it return null because passed in > parameter is not {{SafetyNetWrapperFileSystem}} but > {{ClassLoaderFixingFileSystem}} > > Apply patch, build release and run same job > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
[ https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-22081: - Description: Using flink 1.11.2 I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the checkpoints paths like {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} which means the entropy injection key is not being resolved. After some debugging I found that in the [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] we check if the given fileSystem is of type {{ClassLoaderFixingFileSystem}} and if so we check if the filesysystem is of type {{SafetyNetWrapperFileSystem as well as it's delegate }}but don't check for {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} directly in getEntorpyFs method which would be the type if S3 file system dependencies are added as a plugin. Repro steps: Flink 1.11.2 with flink-s3-fs-hadoop as plugin and turn on entropy injection key _entropy_ observe checkpoint dir with entropy marker not removed. s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/ compare to removed when running Flink 1.9.1 s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/ Add some logging to getEntropyFs, observe it return null because passed in parameter is not {{SafetyNetWrapperFileSystem}} but {{ClassLoaderFixingFileSystem}} Apply patch, build release and run same job was: Using flink 1.11.2 I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the checkpoints paths like {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} which means the entropy injection key is not being resolved. 
After some debugging I found that in the [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] we check if the given fileSystem is of type {{ClassLoaderFixingFileSystem}} and if so we check if the filesysystem is of type {{SafetyNetWrapperFileSystem as well as it's delegate }}but don't check for {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} directly in getEntorpyFs method which would be the type if S3 file system dependencies are added as a plugin. Current behavior when using hadoop-s3 s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/6da165c7b3c8422125abbfdb97ca9c04/chk-5/ > Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin > --- > > Key: FLINK-22081 > URL: https://issues.apache.org/jira/browse/FLINK-22081 > Project: Flink > Issue Type: Bug > Components: FileSystems >Reporter: Chen Qin >Assignee: Prem Santosh >Priority: Minor > Labels: pull-request-available > Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, > 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3 > > Attachments: image (13).png > > > Using flink 1.11.2 > I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the > checkpoints paths like > {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} > which means the entropy injection key is not being resolved. 
After some > debugging I found that in the > [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] > we check if the given fileSystem is of type {{ClassLoaderFixingFileSystem}} > and if so we check if the filesysystem is of type > {{SafetyNetWrapperFileSystem as well as it's delegate }}but don't check for > {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} > directly in getEntorpyFs method which would be the type if S3 file system > dependencies are added as a plugin. > > Repro steps: > Flink 1.11.2 with flink-s3-fs-hadoop as plugin and turn on entropy injection > key _entropy_ > observe checkpoint dir with entropy marker not removed. > s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/ > compare to removed when running Flink 1.9.1 > s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/ > > Add some logging to getEntropyFs, observe it return null because passed in > parameter is not
[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
[ https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-22081: - Description: Using Flink 1.11.2, I added the flink-s3-fs-hadoop jar in the plugins dir but I am seeing checkpoint paths like {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}, which means the entropy injection key is not being resolved. After some debugging I found that in the [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] the getEntropyFs method checks whether the given fileSystem is a {{SafetyNetWrapperFileSystem}} (and, if so, whether its delegate is an {{EntropyInjectingFileSystem}}), but it does not check for {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} directly, which would be the wrapper type if the S3 file system dependencies are added as a plugin. Current behavior when using hadoop-s3: s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/6da165c7b3c8422125abbfdb97ca9c04/chk-5/
was: Using Flink 1.11.2, I added the flink-s3-fs-hadoop jar in the plugins dir but I am seeing checkpoint paths like {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}, which means the entropy injection key is not being resolved. After some debugging I found that in the [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] the getEntropyFs method checks whether the given fileSystem is a {{SafetyNetWrapperFileSystem}} (and, if so, whether its delegate is an {{EntropyInjectingFileSystem}}), but it does not check for {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} directly, which would be the wrapper type if the S3 file system dependencies are added as a plugin.
> Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
> ---
>
> Key: FLINK-22081
> URL: https://issues.apache.org/jira/browse/FLINK-22081
> Project: Flink
> Issue Type: Bug
> Components: FileSystems
> Reporter: Chen Qin
> Assignee: Prem Santosh
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3
>
> Using Flink 1.11.2, I added the flink-s3-fs-hadoop jar in the plugins dir but I am seeing checkpoint paths like
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}},
> which means the entropy injection key is not being resolved.
> After some debugging I found that in the
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
> the getEntropyFs method checks whether the given fileSystem is a {{SafetyNetWrapperFileSystem}}
> (and, if so, whether its delegate is an {{EntropyInjectingFileSystem}}), but it does not check for
> {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}}
> directly, which would be the wrapper type if the S3 file system dependencies are added as a plugin.
>
> Current behavior when using hadoop-s3:
> s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/6da165c7b3c8422125abbfdb97ca9c04/chk-5/
>
>
-- This message was sent by Atlassian Jira (v8.3.4#803005)
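The fix described above can be sketched with minimal stand-in types. This is an illustration only, assuming simplified wrappers named after the real flink-core classes (`SafetyNetWrapperFileSystem` and `PluginFileSystemFactory.ClassLoaderFixingFileSystem`), not Flink's actual implementation: getEntropyFs must peel both wrappers before testing for EntropyInjectingFileSystem.

```java
// Stand-in types for illustration only; the real classes live in flink-core.
public class EntropyUnwrapSketch {
    interface FileSystem {}
    interface EntropyInjectingFileSystem extends FileSystem {
        String getEntropyInjectionKey();
    }
    // stand-in for SafetyNetWrapperFileSystem
    static class SafetyNetWrapper implements FileSystem {
        final FileSystem delegate;
        SafetyNetWrapper(FileSystem delegate) { this.delegate = delegate; }
    }
    // stand-in for PluginFileSystemFactory.ClassLoaderFixingFileSystem
    static class ClassLoaderFixingWrapper implements FileSystem {
        final FileSystem inner;
        ClassLoaderFixingWrapper(FileSystem inner) { this.inner = inner; }
    }

    static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
        if (fs instanceof SafetyNetWrapper) {
            fs = ((SafetyNetWrapper) fs).delegate;      // existing unwrap step
        }
        if (fs instanceof ClassLoaderFixingWrapper) {
            fs = ((ClassLoaderFixingWrapper) fs).inner; // the missing unwrap step
        }
        return fs instanceof EntropyInjectingFileSystem ? (EntropyInjectingFileSystem) fs : null;
    }

    public static void main(String[] args) {
        EntropyInjectingFileSystem s3 = () -> "_entropy_";
        // a plugin-loaded S3 file system arrives double-wrapped
        FileSystem pluginFs = new SafetyNetWrapper(new ClassLoaderFixingWrapper(s3));
        System.out.println(getEntropyFs(pluginFs) != null); // true once both wrappers are peeled
    }
}
```

Without the second `instanceof` check, the plugin-wrapped file system falls through and the method returns null, which is exactly the symptom logged in the repro steps.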
[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
[ https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-22081: - Fix Version/s: 1.10.2 1.10.3 1.11.1 1.11.2 1.11.3 1.12.0 1.12.1 1.12.2 1.12.3 1.13.0 1.11.4 1.10.4 Description: Using Flink 1.11.2, I added the flink-s3-fs-hadoop jar in the plugins dir but I am seeing checkpoint paths like {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}, which means the entropy injection key is not being resolved. After some debugging I found that in the [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] the getEntropyFs method checks whether the given fileSystem is a {{SafetyNetWrapperFileSystem}} (and, if so, whether its delegate is an {{EntropyInjectingFileSystem}}), but it does not check for {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} directly, which would be the wrapper type if the S3 file system dependencies are added as a plugin.
was: Using Flink 1.10, I added the flink-s3-fs-hadoop jar in the plugins dir but I am seeing checkpoint paths like {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}, which means the entropy injection key is not being resolved. After some debugging I found that in the [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} and, if so, we check if the delegate is of type {{EntropyInjectingFileSystem}}, but we don't check for {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}}, which would be the type if the S3 file system dependencies are added as a plugin.
Priority: Minor (was: Major)
> Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
> ---
>
> Key: FLINK-22081
> URL: https://issues.apache.org/jira/browse/FLINK-22081
> Project: Flink
> Issue Type: Bug
> Components: FileSystems
> Reporter: Chen Qin
> Assignee: Prem Santosh
> Priority: Minor
> Labels: pull-request-available
> Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3
>
> Using Flink 1.11.2, I added the flink-s3-fs-hadoop jar in the plugins dir but I am seeing checkpoint paths like
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}},
> which means the entropy injection key is not being resolved.
> After some debugging I found that in the
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
> the getEntropyFs method checks whether the given fileSystem is a {{SafetyNetWrapperFileSystem}}
> (and, if so, whether its delegate is an {{EntropyInjectingFileSystem}}), but it does not check for
> {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}}
> directly, which would be the wrapper type if the S3 file system dependencies are added as a plugin.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
Chen Qin created FLINK-22081: Summary: Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin Key: FLINK-22081 URL: https://issues.apache.org/jira/browse/FLINK-22081 Project: Flink Issue Type: Bug Components: FileSystems Reporter: Chen Qin Assignee: Prem Santosh Fix For: 1.10.1, 1.11.0 Using flink 1.10 I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the checkpoints paths like {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} which means the entropy injection key is not being resolved. After some debugging I found that in the [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} and if so we check if the delegate is of type {{EntropyInjectingFileSystem}} but don't check for {{[ClassLoaderFixingFileSystem |https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} which would be the type if S3 file system dependencies are added as a plugin. -- This message was sent by Atlassian Jira (v8.3.4#803005)
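For context on what "resolving" the entropy key means, here is a toy sketch (a hypothetical helper, not Flink's actual EntropyInjector API): entropy injection substitutes the configured key in the checkpoint path with random characters so that S3 spreads the objects across key-space partitions. When the wrapper bug bites, the key survives verbatim in the written path, as seen in the reported `__ENTROPY__` paths.

```java
import java.util.concurrent.ThreadLocalRandom;

// Toy illustration of entropy resolution; names and behavior are simplified,
// not Flink's real implementation.
public class EntropyPathSketch {
    static String resolveEntropy(String path, String entropyKey, int length) {
        final String alphabet = "0123456789abcdef";
        StringBuilder entropy = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            entropy.append(alphabet.charAt(ThreadLocalRandom.current().nextInt(alphabet.length())));
        }
        // replace the marker with random characters; the buggy path leaves it untouched
        return path.replace(entropyKey, entropy.toString());
    }

    public static void main(String[] args) {
        String dir = "s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/";
        // e.g. s3a://xxx/dev/checkpoints/3f9a/xenon/event-stream-splitter/jobid/chk-5/
        System.out.println(resolveEntropy(dir, "_entropy_", 4));
    }
}
```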
[jira] [Commented] (FLINK-17359) Entropy key is not resolved if flink-s3-fs-hadoop is added as a plugin
[ https://issues.apache.org/jira/browse/FLINK-17359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311956#comment-17311956 ] Chen Qin commented on FLINK-17359: -- [https://github.com/apache/flink/pull/15442] > Entropy key is not resolved if flink-s3-fs-hadoop is added as a plugin > -- > > Key: FLINK-17359 > URL: https://issues.apache.org/jira/browse/FLINK-17359 > Project: Flink > Issue Type: Bug > Components: FileSystems >Reporter: Prem Santosh >Assignee: Prem Santosh >Priority: Major > Labels: pull-request-available > Fix For: 1.10.1, 1.11.0 > > > Using flink 1.10 > I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the > checkpoints paths like > {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}} > which means the entropy injection key is not being resolved. After some > debugging I found that in the > [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97] > we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} > and if so we check if the delegate is of type {{EntropyInjectingFileSystem}} > but don't check for {{[ClassLoaderFixingFileSystem > |https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} > which would be the type if S3 file system dependencies are added as a plugin. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-17359) Entropy key is not resolved if flink-s3-fs-hadoop is added as a plugin
[ https://issues.apache.org/jira/browse/FLINK-17359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307991#comment-17307991 ] Chen Qin commented on FLINK-17359: -- Hi [~premsantosh], we see a regression of this bug with 1.11.2; details at [http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-job-cannot-find-recover-path-after-using-entropy-injection-for-s3-file-systems-tp49527p49656.html]. Do you have cycles to review a small fix PR?
> Entropy key is not resolved if flink-s3-fs-hadoop is added as a plugin
> --
>
> Key: FLINK-17359
> URL: https://issues.apache.org/jira/browse/FLINK-17359
> Project: Flink
> Issue Type: Bug
> Components: FileSystems
> Reporter: Prem Santosh
> Assignee: Prem Santosh
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
> Using Flink 1.10, I added the flink-s3-fs-hadoop jar in the plugins dir but I am seeing checkpoint paths like
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}},
> which means the entropy injection key is not being resolved. After some debugging I found that in the
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
> we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}}
> and if so we check if the delegate is of type {{EntropyInjectingFileSystem}},
> but we don't check for {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}},
> which would be the type if the S3 file system dependencies are added as a plugin.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-18158) Add a utility to create a DDL statement from avro schema
[ https://issues.apache.org/jira/browse/FLINK-18158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17162956#comment-17162956 ] Chen Qin commented on FLINK-18158: -- [~twalthr] what if the user has a nested struct definition in a protobuf/thrift schema? struct { map>> property }
> Add a utility to create a DDL statement from avro schema
>
>
> Key: FLINK-18158
> URL: https://issues.apache.org/jira/browse/FLINK-18158
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / API
> Reporter: Dawid Wysakowicz
> Priority: Major
>
> A user asked if there is a way to create a TableSchema/Table originating from an avro schema.
> https://lists.apache.org/thread.html/r9bd43449314230fad0b627a170db05284c9727371092fc275fc05b74%40%3Cuser.flink.apache.org%3E
-- This message was sent by Atlassian Jira (v8.3.4#803005)
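To make the nested-type question concrete, here is a hypothetical sketch (no such utility exists in Flink; all names are illustrative) of what such a tool would have to do for nested container types: recurse through the schema and render a SQL DDL type string. The generic parameters in the quoted `map>>` were lost in the archive, so the example assumes, purely for illustration, a map of lists of maps.

```java
// Hypothetical DDL-rendering helpers, not an existing Flink API. A real
// utility would walk the avro/protobuf/thrift schema tree instead of taking
// hand-built strings.
public class DdlTypeSketch {
    static String map(String key, String value) { return "MAP<" + key + ", " + value + ">"; }
    static String array(String element) { return "ARRAY<" + element + ">"; }
    static String row(String... fields) { return "ROW<" + String.join(", ", fields) + ">"; }

    public static void main(String[] args) {
        // assumed shape for the garbled nested property: map<string, list<map<string, string>>>
        String property = map("STRING", array(map("STRING", "STRING")));
        System.out.println(row("property " + property));
        // prints: ROW<property MAP<STRING, ARRAY<MAP<STRING, STRING>>>>
    }
}
```

The point of the comment stands either way: any schema-to-DDL utility must handle arbitrary nesting of container types, not just flat records.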
[jira] [Closed] (FLINK-16392) oneside sorted cache in intervaljoin
[ https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin closed FLINK-16392. Resolution: Feedback Received
> oneside sorted cache in intervaljoin
>
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.10.0
> Reporter: Chen Qin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The native interval join relies on the state backend (e.g. RocksDB) to insert and fetch the left
> and right buffers. This design choice minimizes heap memory footprint but bounds the processing
> throughput of a single TaskManager to RocksDB access speed. Here at Pinterest, we have some large
> use cases where developers join a large, slowly evolving data stream (e.g. post updates in the
> last 28 days) with a web-traffic data stream (e.g. post views up to 28 days after a given update).
> This poses some challenges to the current interval join implementation:
> * the partitioned RocksDB needs to keep both updates and views for 28 days; a large buffer
> (especially on the view-stream side) slows RocksDB down, and overall interval join performance
> degrades quickly as state builds up
> * the view stream is web scale; even with large parallelism it can put a lot of pressure on each
> subtask and backpressure the entire job
> In the proposed implementation, we plan to introduce two changes:
> * support ProcessJoinFunction settings to opt in to an earlier cleanup time for the right stream
> (e.g. the view stream doesn't have to stay in the buffer for 28 days waiting for the update
> stream to join, since related post views happen after the update in event-time semantics). This
> optimization reduces state size to improve RocksDB throughput. In the extreme case, the user can
> opt in to in-flight joins and skip writing into the right (view) stream buffer to save IOPS
> budget on each subtask
> * support ProcessJoinFunction settings to expedite keyed lookups of the slowly changing stream.
> Instead of every post view pulling post updates from RocksDB, the user can opt in to having a
> one-side buffer cache in memory. For a given post update, the cache loads recent views from the
> right buffer and uses a sorted map to find buckets; for a given post view, the cache loads recent
> updates from the left buffer into memory. When another view for that post arrives, Flink saves
> the cost of a RocksDB access.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
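The second proposed change (the one-side in-memory sorted cache) can be sketched as follows. Class and method names are hypothetical, not the actual Flink patch: events mirrored from the other side's RocksDB buffer are kept in a TreeMap keyed by event timestamp, so an interval lookup becomes an in-memory subMap() range scan instead of a state-backend iteration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of the proposed one-side sorted cache (illustrative names only).
public class OneSideSortedCacheSketch {
    // timestamp -> payloads mirrored from the other side's buffer
    private final TreeMap<Long, List<String>> cache = new TreeMap<>();

    // mirror an entry loaded from the other side's state-backend buffer
    void load(long timestamp, String event) {
        cache.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(event);
    }

    // when the cached side itself receives an element, the mirror is stale:
    // clear it and let the caller rebuild from the state backend
    void invalidate() {
        cache.clear();
    }

    // the interval join lookup over [ts + lowerBound, ts + upperBound]
    List<String> lookup(long ts, long lowerBound, long upperBound) {
        List<String> joined = new ArrayList<>();
        for (List<String> bucket : cache.subMap(ts + lowerBound, true, ts + upperBound, true).values()) {
            joined.addAll(bucket);
        }
        return joined;
    }

    public static void main(String[] args) {
        OneSideSortedCacheSketch cache = new OneSideSortedCacheSketch();
        cache.load(100L, "update-a");
        cache.load(250L, "update-b");
        // a view at t=120 joining updates in [t - 100, t] sees only update-a
        System.out.println(cache.lookup(120L, -100L, 0L)); // [update-a]
    }
}
```

This matches the invalidation rule from the "open discussion" section: an insertion on the cached side clears the mirror, and checkpoint/restore simply starts with an empty cache because the source of truth stays in the state backend.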
[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin
[ https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-16392: - Description: The native interval join relies on the state backend (e.g. RocksDB) to insert and fetch the left and right buffers. This design choice minimizes heap memory footprint but bounds the processing throughput of a single TaskManager to RocksDB access speed. Here at Pinterest, we have some large use cases where developers join a large, slowly evolving data stream (e.g. post updates in the last 28 days) with a web-traffic data stream (e.g. post views up to 28 days after a given update). This poses some challenges to the current interval join implementation:
* the partitioned RocksDB needs to keep both updates and views for 28 days; a large buffer (especially on the view-stream side) slows RocksDB down, and overall interval join performance degrades quickly as state builds up
* the view stream is web scale; even with large parallelism it can put a lot of pressure on each subtask and backpressure the entire job
In the proposed implementation, we plan to introduce two changes:
* support ProcessJoinFunction settings to opt in to an earlier cleanup time for the right stream (e.g. the view stream doesn't have to stay in the buffer for 28 days waiting for the update stream to join, since related post views happen after the update in event-time semantics). This optimization reduces state size to improve RocksDB throughput. In the extreme case, the user can opt in to in-flight joins and skip writing into the right (view) stream buffer to save IOPS budget on each subtask
* support ProcessJoinFunction settings to expedite keyed lookups of the slowly changing stream. Instead of every post view pulling post updates from RocksDB, the user can opt in to having a one-side buffer cache in memory. For a given post update, the cache loads recent views from the right buffer and uses a sorted map to find buckets; for a given post view, the cache loads recent updates from the left buffer into memory. When another view for that post arrives, Flink saves the cost of a RocksDB access.
was: The interval join is getting lots of use cases on our side. Those use cases share a similar pattern:
* the left stream is pulled from a low-QPS source
* the lookup time range from the right stream to the left stream is very large (days to weeks)
* the right stream is web traffic with high QPS
In the current interval join implementation, we treat both streams equally and pull/scan keyed state from the backend (RocksDB here) on demand. As RocksDB fetches and updates get more expensive, performance takes a hit for large use cases. In the proposed implementation, we plan to introduce two changes:
* allow the user to opt in to more aggressive right-buffer cleanup
** allow cleaning up the right stream earlier than the interval upper bound
* leverage a RAM cache that builds a sorted map on demand from the other side's buffer for each join key; in our use cases, this helps
** expedite right-stream lookups of left buffers without accessing RocksDB every time (disk -> sorted memory cache)
** if a key sees an event from the left side, it clears the cache and reloads it from the right side
** in the worst-case scenario, we see two streams with round-robin processElement1 and processElement2 calls for the same set of keys at the same frequency; performance is expected to be similar to the current implementation, and the memory footprint will be bounded by half the state size
Open discussion:
* how to control cache size? ** by default the cache size is set to 1 key
* how to avoid a dirty cache? ** if a given key sees an insertion from the other side, the cache is cleared for that key and rebuilt
* what happens on checkpoint/restore? ** state still persists in the state backend; the cache is cleared and rebuilt for each newly seen key
* how is performance? ** given the assumption that RAM is an order of magnitude faster than SSD, populating the cache is a small overhead (<5%); compared with the current RocksDB implementation, which does a full loop on every event, it saves on bucket-scan logic. If a key sees more than one access in the same direction on the cache, we expect a significant perf gain.
> oneside sorted cache in intervaljoin
>
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.10.0
> Reporter: Chen Qin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The native interval join relies on the state backend (e.g. RocksDB) to insert and fetch the left
> and right buffers. This design choice minimizes heap memory footprint but bounds the processing
> throughput of a single TaskManager to RocksDB access speed. Here at Pinterest, we have some large
> use cases where developers join a large, slowly evolving data stream (e.g. post updates in the last 28 days) with
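The first proposed change (earlier cleanup of the right stream) amounts to capping the right side's state retention below the interval's upper bound. A sketch with a hypothetical setting, not an actual ProcessJoinFunction API:

```java
// Hypothetical retention override for the right (view) buffer: in event time
// the views to be joined always arrive after the update, so they need not be
// retained for the interval's full upper bound.
public class EarlyCleanupSketch {
    static long cleanupTimestamp(long eventTs, long intervalUpperBound, Long rightRetentionOverride) {
        long byInterval = eventTs + intervalUpperBound;
        if (rightRetentionOverride == null) {
            return byInterval; // default: keep until the interval can no longer match
        }
        // opt-in: never keep the right side longer than the override allows
        return Math.min(byInterval, eventTs + rightRetentionOverride);
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        // default: a view at t=0 stays buffered for the full 28-day upper bound
        System.out.println(cleanupTimestamp(0, 28 * day, null));
        // opt-in: retain it for only 1 day instead
        System.out.println(cleanupTimestamp(0, 28 * day, day));
    }
}
```

In the extreme case described above (in-flight joins), the override would effectively be zero: the right element is joined on arrival and never written to the buffer at all.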
[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin
[ https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-16392: - Description: The interval join is getting lots of use cases on our side. Those use cases share a similar pattern:
* the left stream is pulled from a low-QPS source
* the lookup time range from the right stream to the left stream is very large (days to weeks)
* the right stream is web traffic with high QPS
In the current interval join implementation, we treat both streams equally and pull/scan keyed state from the backend (RocksDB here) on demand. As RocksDB fetches and updates get more expensive, performance takes a hit for large use cases. In the proposed implementation, we plan to introduce two changes:
* allow the user to opt in to more aggressive right-buffer cleanup
** allow cleaning up the right stream earlier than the interval upper bound
* leverage a RAM cache that builds a sorted map on demand from the other side's buffer for each join key; in our use cases, this helps
** expedite right-stream lookups of left buffers without accessing RocksDB every time (disk -> sorted memory cache)
** if a key sees an event from the left side, it clears the cache and reloads it from the right side
** in the worst-case scenario, we see two streams with round-robin processElement1 and processElement2 calls for the same set of keys at the same frequency; performance is expected to be similar to the current implementation, and the memory footprint will be bounded by half the state size
Open discussion:
* how to control cache size? ** by default the cache size is set to 1 key
* how to avoid a dirty cache? ** if a given key sees an insertion from the other side, the cache is cleared for that key and rebuilt
* what happens on checkpoint/restore? ** state still persists in the state backend; the cache is cleared and rebuilt for each newly seen key
* how is performance? ** given the assumption that RAM is an order of magnitude faster than SSD, populating the cache is a small overhead (<5%); compared with the current RocksDB implementation, which does a full loop on every event, it saves on bucket-scan logic. If a key sees more than one access in the same direction on the cache, we expect a significant perf gain.
was: The interval join is getting lots of use cases on our side. Those use cases share a similar pattern:
* the left stream is pulled periodically from a slowly evolving static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS
In the current interval join implementation, we treat both streams equally and pull/scan keyed state from the backend (RocksDB here) on demand. As RocksDB fetches and updates get more expensive, performance takes a hit for large use cases. In the proposed implementation, we plan to introduce two changes:
* allow the user to opt in to use cases like the above by customizing and inheriting from ProcessJoinFunction
** whether to skip triggering a scan from left events (the static dataset)
** allow cleaning up the right stream earlier than the interval upper bound
* leverage a RAM cache that builds a sorted map on demand from the other side's buffer for each join key; in our use cases, this helps
** expedite right-stream lookups of left buffers without accessing RocksDB every time (disk -> sorted memory cache)
** if a key sees an event from the left side, it clears the cache and reloads it from the right side
** in the worst-case scenario, we see two streams with round-robin processElement1 and processElement2 calls for the same set of keys at the same frequency; performance is expected to be similar to the current implementation, and the memory footprint will be bounded by half the state size
Open discussion:
* how to control cache size? ** by default the cache size is set to 1 key
* how to avoid a dirty cache? ** if a given key sees an insertion from the other side, the cache is cleared for that key and rebuilt
* what happens on checkpoint/restore? ** state still persists in the state backend; the cache is cleared and rebuilt for each newly seen key
* how is performance? ** given the assumption that RAM is an order of magnitude faster than SSD, populating the cache is a small overhead (<5%); compared with the current RocksDB implementation, which does a full loop on every event, it saves on bucket-scan logic. If a key sees more than one access in the same direction on the cache, we expect a significant perf gain.
> oneside sorted cache in intervaljoin
>
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.10.0
> Reporter: Chen Qin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The interval join is getting lots of use cases on our side. Those use cases share a
> similar pattern:
> * the left stream is pulled from a low-QPS source
> * the lookup time range from the right stream to the left stream
[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin
[ https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-16392: - Description: The interval join is getting lots of use cases on our side. Those use cases share a similar pattern:
* the left stream is pulled periodically from a slowly evolving static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS
In the current interval join implementation, we treat both streams equally and pull/scan keyed state from the backend (RocksDB here) on demand. As RocksDB fetches and updates get more expensive, performance takes a hit for large use cases. In the proposed implementation, we plan to introduce two changes:
* allow the user to opt in to use cases like the above by customizing and inheriting from ProcessJoinFunction
** whether to skip triggering a scan from left events (the static dataset)
** allow cleaning up the right stream earlier than the interval upper bound
* leverage a RAM cache that builds a sorted map on demand from the other side's buffer for each join key; in our use cases, this helps
** expedite right-stream lookups of left buffers without accessing RocksDB every time (disk -> sorted memory cache)
** if a key sees an event from the left side, it clears the cache and reloads it from the right side
** in the worst-case scenario, we see two streams with round-robin processElement1 and processElement2 calls for the same set of keys at the same frequency; performance is expected to be similar to the current implementation, and the memory footprint will be bounded by half the state size
Open discussion:
* how to control cache size? ** by default the cache size is set to 1 key
* how to avoid a dirty cache? ** if a given key sees an insertion from the other side, the cache is cleared for that key and rebuilt
* what happens on checkpoint/restore? ** state still persists in the state backend; the cache is cleared and rebuilt for each newly seen key
* how is performance? ** given the assumption that RAM is an order of magnitude faster than SSD, populating the cache is a small overhead (<5%); compared with the current RocksDB implementation, which does a full loop on every event, it saves on bucket-scan logic. If a key sees more than one access in the same direction on the cache, we expect a significant perf gain.
was: The interval join is getting lots of use cases on our side. Those use cases share a similar pattern:
* the left stream is pulled periodically from a slowly evolving static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS
In the current interval join implementation, we treat both streams equally and pull/scan keyed state from the backend (RocksDB here) on demand. As RocksDB fetches and updates get more expensive, performance takes a hit for large use cases. In the proposed implementation, we plan to introduce two changes:
* allow the user to opt in to use cases like the above by inheriting from ProcessJoinFunction
** whether to skip triggering a scan from left events (the static dataset)
** allow cleaning up the right stream earlier than the interval upper bound
* leverage a RAM cache that builds a sorted map on demand from the other side's buffer for each join key; in our use cases, this helps
** expedite right-stream lookups of left buffers without accessing RocksDB every time (disk -> sorted memory cache)
** if a key sees an event from the left side, it clears the cache and reloads it from the right side
** in the worst-case scenario, we see two streams with round-robin processElement1 and processElement2 calls for the same set of keys at the same frequency; performance is expected to be similar to the current implementation, and the memory footprint will be bounded by half the state size
Open discussion:
* how to control cache size? ** by default the cache size is set to 1 key
* how to avoid a dirty cache? ** if a given key sees an insertion from the other side, the cache is cleared for that key and rebuilt
* what happens on checkpoint/restore? ** state still persists in the state backend; the cache is cleared and rebuilt for each newly seen key
* how is performance? ** given the assumption that RAM is an order of magnitude faster than SSD (and far more than spinning disk), populating the cache is a small overhead (1% - 5%); compared with the current RocksDB implementation, which does a full loop on every event, it saves on bucket-scan logic. If a key sees more than one access in the same direction on the cache, we expect a significant perf gain.
> oneside sorted cache in intervaljoin
>
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 1.10.0
> Reporter: Chen Qin
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> The interval join is getting lots of use cases on our side. Those use cases share a
[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin
[ https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-16392: - Description:
IntervalJoin is seeing lots of use cases on our side. Those use cases share the following pattern:
* the left stream is pulled periodically from a slowly evolving static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS
In the current interval join implementation, we treat both streams equally and pull/scan keyed state from the backend (RocksDB here) on demand. As RocksDB fetches and updates get more expensive, performance takes a hit on large use cases. In the proposed implementation, we plan to introduce two changes:
* allow users to opt in to use cases like the above by inheriting from ProcessJoinFunction:
** whether to skip the triggered scan for left events (the static dataset)
** allow cleaning up the right stream earlier than the interval upper bound
* leverage a RAM cache that builds a sortedMap on demand from the other side's buffer for each join key; in our use cases this helps:
** expedite right-stream lookups of left buffers without accessing RocksDB every time (disk -> sorted memory cache)
** if a key sees an event from the left side, the cache is cleared and reloaded from the right side
** in the worst-case scenario, the two streams round-robin processElement1 and processElement2 over the same set of keys at the same frequency; performance is then expected to be similar to the current implementation, and the memory footprint is bounded by 1/2 of the state size
Open discussion:
* how to control cache size? ** By default the cache size is set to 1 key.
* how to avoid a dirty cache? ** If a given key sees an insertion from the other side, the cache is cleared for that key and rebuilt.
* what happens on checkpoint/restore? ** State still persists in the state backend; the cache is cleared and rebuilt for each newly seen key.
* how is performance? ** Assuming RAM is an order of magnitude faster than SSD, and more so than spinning disk, populating the cache is a small overhead (1% - 5%). With the current RocksDB implementation we need a full scan at every event, so the cache saves the bucket-scan logic. If a key sees more than one access from the same side while cached, we expect a significant performance gain.
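The per-key cache behavior proposed above (build a sortedMap from the other side's buffer on demand, invalidate it when the mirrored side sees an insert) can be sketched in plain Java, independent of Flink. All class and method names here are hypothetical, the state backend is faked with in-memory maps, and the interval bounds are hard-coded; this is a sketch of the idea, not the actual implementation.

```java
import java.util.*;

// Plain-Java sketch of the proposed one-side sorted cache for a single join key.
// The real proposal keeps one such cache per key and backs `state` with RocksDB;
// here both sides' buffers are faked with in-memory TreeMaps.
class OneSideSortedCache {
    enum Side { LEFT, RIGHT }

    // stand-in for the keyed state buffers of both sides (timestamp -> element)
    final Map<Side, TreeMap<Long, String>> state = new EnumMap<>(Side.class);
    TreeMap<Long, String> cache = null; // sorted RAM mirror of ONE side's buffer
    Side cachedSide = null;             // which side the cache currently mirrors

    OneSideSortedCache() {
        state.put(Side.LEFT, new TreeMap<>());
        state.put(Side.RIGHT, new TreeMap<>());
    }

    static Side other(Side s) { return s == Side.LEFT ? Side.RIGHT : Side.LEFT; }

    // Handle one element: insert it into its own side's buffer, then join it
    // against the other side's buffer within [ts - 10, ts + 10].
    Collection<String> process(Side side, long ts, String value) {
        if (cachedSide == side) {   // dirty-cache rule: an insert on the mirrored
            cache = null;           // side invalidates the cache for this key
            cachedSide = null;
        }
        state.get(side).put(ts, value);
        if (cachedSide != other(side)) {
            // rebuild the sorted cache on demand from the other side's buffer
            cache = new TreeMap<>(state.get(other(side)));
            cachedSide = other(side);
        }
        // range lookup served from RAM instead of scanning the state backend
        return new ArrayList<>(cache.subMap(ts - 10, true, ts + 10, true).values());
    }
}
```

With alternating sides the cache is rebuilt on every event, matching the worst case discussed above; with repeated accesses from the same side the rebuild is skipped, which is where the expected gain comes from.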
[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin
[ https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-16392: - Description:
IntervalJoin is seeing lots of use cases on our side. Those use cases share the following pattern:
* the left stream is pulled periodically from a slowly evolving static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS
In the current interval join implementation, we treat both streams equally and pull/scan keyed state from the backend (RocksDB here) on demand. As RocksDB fetches and updates get more expensive, performance takes a hit on large use cases. In the proposed implementation, we plan to introduce two changes:
* allow users to opt in to use cases like the above by inheriting from ProcessJoinFunction:
** whether to skip the triggered scan for left events (the static dataset)
** allow cleaning up the right stream earlier than the interval upper bound
* leverage a RAM cache that builds a sortedMap on demand from the other side's buffer for each join key; in our use cases this helps:
** expedite right-stream lookups of left buffers without accessing RocksDB every time (disk -> sorted memory cache)
** if a key sees an event from the left side, the cache is cleared and reloaded from the right side
** in the worst-case scenario, the two streams round-robin processElement1 and processElement2 over the same set of keys at the same frequency; performance is then expected to be similar to the current implementation, and the memory footprint is bounded by 1/2 of the state size
Open discussion:
* how to control cache size? ** By default the cache size is set to 1 key.
* how to avoid a dirty cache? ** If a given key sees an insertion from the other side, the cache is cleared for that key and rebuilt.
* what happens on checkpoint/restore? ** State still persists in the state backend; the cache is cleared and rebuilt for each newly seen key.
* how is performance? ** Assuming RAM is an order of magnitude faster than SSD, populating the cache is a small overhead (<5%). With the current RocksDB implementation we need a full scan at every event, so the cache saves the bucket-scan logic. If a key sees more than one access from the same side while cached, we expect a significant performance gain.
[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin
[ https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-16392: - Description:
IntervalJoin is seeing lots of use cases on our side. Those use cases share the following pattern:
* the left stream is pulled periodically from a slowly evolving static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS
In the current interval join implementation, we treat both streams equally and pull/scan keyed state from the backend (RocksDB here) on demand. As RocksDB fetches and updates get more expensive, performance takes a hit on large use cases. In the proposed implementation, we plan to introduce two changes:
* allow users to opt in by inheriting from ProcessJoinFunction:
** if they want to skip the triggered scan for left events (the static dataset)
* build a sortedMap on demand from the other side's buffer for each join key; in our use cases this helps:
** expedite right-stream lookups of left buffers without accessing RocksDB every time (disk -> sorted memory cache)
** if a key sees an event from the left side, the cache is cleared and reloaded from the right side
** in the worst-case scenario, the two streams round-robin processElement1 and processElement2 over the same set of keys at the same frequency; performance is then expected to be similar to the current implementation, and the memory footprint is bounded by 1/2 of the state size
Open discussion:
* how to control cache size? ** TBD
* how to avoid a dirty cache? ** If a given key sees an insertion from the other side, the cache is cleared for that key and rebuilt. This is a small overhead to populate the cache; with the current RocksDB implementation we need a full scan at every event, so it saves the bucket-scan logic.
* what happens on checkpoint/restore? ** State still persists in the state backend; the cache is cleared and rebuilt for each newly seen key.
[jira] [Created] (FLINK-16392) oneside sorted cache in intervaljoin
Chen Qin created FLINK-16392: Summary: oneside sorted cache in intervaljoin Key: FLINK-16392 URL: https://issues.apache.org/jira/browse/FLINK-16392 Project: Flink Issue Type: Improvement Components: API / DataStream Affects Versions: 1.10.0 Reporter: Chen Qin Fix For: 1.11.0
IntervalJoin is seeing lots of use cases. Those use cases share the following pattern:
* the left stream is pulled periodically from a static dataset
* the lookup time range is very large (days to weeks)
* the right stream is web traffic with high QPS
In the current interval join implementation, we treat both streams equally. As RocksDB fetches and updates get more expensive, performance takes a hit on large use cases. In the proposed implementation, we plan to introduce two changes:
* allow users to opt in via ProcessJoinFunction if they want to skip the scan when the interval join operator receives events from the left (static) stream
* build a sortedMap from the otherBuffer at per-seen-key granularity:
** expedite right-stream lookups of left buffers without accessing RocksDB every time
** if a key sees an event from the left side, the buffer is cleaned up and reloaded from the right side
Open discussion:
* how to control cache size? ** TBD
* how to avoid a dirty cache? ** If a given key sees an insertion from the other side, the cache is cleared for that key and rebuilt. This is a small overhead to populate the cache; with the current RocksDB implementation we need a full scan at every event, so it saves the bucket-scan logic.
* what happens on checkpoint/restore? ** State still persists in the state backend; the cache is cleared and rebuilt for each newly seen key.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (FLINK-8437) SideOutput() API is ambiguous
[ https://issues.apache.org/jira/browse/FLINK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin reassigned FLINK-8437: --- Assignee: Chen Qin > SideOutput() API is ambiguous > - > > Key: FLINK-8437 > URL: https://issues.apache.org/jira/browse/FLINK-8437 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chen Qin >Priority: Minor > > The current API for retrieving Side Outputs is a bit ambiguous. Consider the > program below: > {code:java} > sideOutput = stream > .process(...) > .filter(...) > .getSideOutput(tag) > {code} > This may be the sideOutput of the process function that is passed through the > API for convenience, or the sideOutput of the filter function (which would > always be empty). > Given that only process functions can have sideOutputs we may want to change > the return type so that getSideOutput can only be called after a process > function. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
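The fix direction described in the issue (only a process function can have side outputs, so only the stream returned by process() should expose getSideOutput) can be illustrated with a stripped-down, hypothetical class hierarchy. These are not Flink's real classes; the sketch only shows how the return type removes the ambiguity.

```java
import java.util.*;

// Hypothetical, stripped-down stream types: only the stream returned by
// process() exposes getSideOutput, so the ambiguous chain
// stream.process(...).filter(...).getSideOutput(tag) no longer compiles.
class DataStream<T> {
    ProcessedStream<T> process() { return new ProcessedStream<>(); }
    DataStream<T> filter() { return this; } // filter yields the plain stream type
}

class ProcessedStream<T> extends DataStream<T> {
    // side outputs only exist right after a process function
    <X> List<X> getSideOutput(String tag) { return new ArrayList<>(); }
}
```

Under this shape, `stream.process().getSideOutput(tag)` compiles, while inserting `.filter()` in between downgrades the static type back to `DataStream`, so the call is rejected at compile time instead of silently returning an empty side output.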
[jira] [Commented] (FLINK-7954) sideoutput in async function
[ https://issues.apache.org/jira/browse/FLINK-7954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260265#comment-16260265 ] Chen Qin commented on FLINK-7954: - [~phoenixjiangnan] Thanks for the quick response. I think we can always start with RichAsyncFunction, which users expect when they want more control building their own logic on top. I will leave it to [~aljoscha] for additional comments, since I haven't looked into the details of how asyncIO is implemented. My concern is whether side output should be queued along with the main output when the user calls orderedWait. > sideoutput in async function > > > Key: FLINK-7954 > URL: https://issues.apache.org/jira/browse/FLINK-7954 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.3.2 > Environment: similar to FLINK-7635,adding support of sideoutput to > asynFunction >Reporter: Chen Qin >Assignee: Bowen Li > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
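The ordering concern raised in the comment (should side output be released together with the main output under orderedWait?) can be made concrete with a small, hypothetical emitter: async results may complete out of order, but each record's main and side outputs are only emitted once everything ahead of it in input order is done. This is an illustration of the concern, not Flink's actual async-wait implementation.

```java
import java.util.*;

// Hypothetical ordered emitter: completion order never overtakes input order,
// and a record's side outputs travel together with its main output.
class OrderedEmitter {
    static class Entry {
        String main;
        List<String> side = new ArrayList<>();
        boolean done;
    }

    final Deque<Entry> queue = new ArrayDeque<>(); // in-flight records, input order
    final List<String> emittedMain = new ArrayList<>();
    final List<String> emittedSide = new ArrayList<>();

    // register a record when it enters the async operator
    Entry add() {
        Entry e = new Entry();
        queue.add(e);
        return e;
    }

    // async callback delivers main + side results; emit only from the head
    void complete(Entry e, String main, List<String> side) {
        e.main = main;
        e.side.addAll(side);
        e.done = true;
        while (!queue.isEmpty() && queue.peek().done) {
            Entry h = queue.poll();
            emittedMain.add(h.main);
            emittedSide.addAll(h.side);
        }
    }
}
```

If the second record completes first, nothing is emitted until the first one finishes, at which point both are released in input order.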
[jira] [Created] (FLINK-7954) sideoutput in async function
Chen Qin created FLINK-7954: --- Summary: sideoutput in async function Key: FLINK-7954 URL: https://issues.apache.org/jira/browse/FLINK-7954 Project: Flink Issue Type: Improvement Components: DataStream API Affects Versions: 1.3.2 Environment: similar to FLINK-7635,adding support of sideoutput to asynFunction Reporter: Chen Qin Priority: Minor -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7635) support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction
[ https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16170922#comment-16170922 ] Chen Qin commented on FLINK-7635: - [~phoenixjiangnan] there are good references in ProcessFunction.Context, especially in the unit tests and comments. Also consider how to surface OutputTag error messages to users where late-arriving events may conflict with ones emitted from ProcessWindowFunction. > support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction > -- > > Key: FLINK-7635 > URL: https://issues.apache.org/jira/browse/FLINK-7635 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Scala API >Reporter: Chen Qin >Assignee: Bowen Li >Priority: Minor > Fix For: 1.4.0, 1.3.3 > > > [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only > implemented output to ProcessFunction Context. It would be nice to add > support to ProcessWindow and ProcessAllWindow functions as well. [email > threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html] > [~aljoscha] I thought this is good warm up task for ppl to learn how window > function works in general. Otherwise feel free to assign back to me. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7635) support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction
Chen Qin created FLINK-7635: --- Summary: support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction Key: FLINK-7635 URL: https://issues.apache.org/jira/browse/FLINK-7635 Project: Flink Issue Type: Improvement Components: DataStream API, Scala API Reporter: Chen Qin Priority: Minor Fix For: 1.4.0, 1.3.3 [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only implemented output to the ProcessFunction Context. It would be nice to add support to the ProcessWindow and ProcessAllWindow functions as well. [email threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html] [~aljoscha] I thought this is a good warm-up task for people to learn how window functions work in general. Otherwise feel free to assign it back to me. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
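The extension requested here, shown as a minimal plain-Java sketch with no Flink dependency and hypothetical names: the window function's context carries an output(tag, value) channel next to the main collector, so one function can emit both a main result and tagged side outputs.

```java
import java.util.*;

// Hypothetical stand-in for a window-function context with side-output support:
// collect(...) feeds the main output, output(tag, ...) feeds a per-tag side output.
class WindowContextSketch {
    final List<String> mainOutput = new ArrayList<>();
    final Map<String, List<String>> sideOutputs = new HashMap<>();

    void collect(String value) {
        mainOutput.add(value);
    }

    void output(String tag, String value) {
        sideOutputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
    }
}
```

A window function would then call `ctx.collect(result)` for the main stream and, say, `ctx.output("lateEvents", record)` for records it wants routed to a side output.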
[jira] [Assigned] (FLINK-7106) Make SubmittedJobGraphStore implementation configurable
[ https://issues.apache.org/jira/browse/FLINK-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin reassigned FLINK-7106: --- Assignee: Chen Qin > Make SubmittedJobGraphStore implementation configurable > --- > > Key: FLINK-7106 > URL: https://issues.apache.org/jira/browse/FLINK-7106 > Project: Flink > Issue Type: Improvement > Components: flink-contrib, Local Runtime >Reporter: Chen Qin >Assignee: Chen Qin > > Current SubmittedJobGraphStore is hardcoded to store in zookeeper if user > choose HAMode. The goal of this task is to allow user build their own > implementation and pass configuration from flink.conf and define how/where > those information stores. (e.g rocksdb statebackend) > P.S I think this would be interesting to see how flink in HA mode can > fallback to Standalone when zk suffers temp outages. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7106) Make SubmittedJobGraphStore implementation configurable
[ https://issues.apache.org/jira/browse/FLINK-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-7106: Summary: Make SubmittedJobGraphStore implementation configurable (was: Make SubmittedJobGraphStore configurable) > Make SubmittedJobGraphStore implementation configurable > --- > > Key: FLINK-7106 > URL: https://issues.apache.org/jira/browse/FLINK-7106 > Project: Flink > Issue Type: Improvement > Components: flink-contrib, Local Runtime >Reporter: Chen Qin > > Current SubmittedJobGraphStore is hardcoded to store in zookeeper if user > choose HAMode. The goal of this task is to allow user build their own > implementation and pass configuration from flink.conf and define how/where > those information stores. (e.g rocksdb statebackend) > P.S I think this would be interesting to see how flink in HA mode can > fallback to Standalone when zk suffers temp outages. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7106) Make SubmittedJobGraphStore configurable
Chen Qin created FLINK-7106: --- Summary: Make SubmittedJobGraphStore configurable Key: FLINK-7106 URL: https://issues.apache.org/jira/browse/FLINK-7106 Project: Flink Issue Type: Improvement Components: flink-contrib, Local Runtime Reporter: Chen Qin The current SubmittedJobGraphStore is hardcoded to store in ZooKeeper when the user chooses HA mode. The goal of this task is to let users plug in their own implementation, pass configuration from flink.conf, and define how/where that information is stored (e.g. a RocksDB state backend). P.S. I think it would be interesting to see how Flink in HA mode can fall back to standalone when ZK suffers temporary outages. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
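A sketch of what "pass configuration from flink.conf" might look like for a pluggable job graph store. The `high-availability.job-graph-store.*` keys and the example class are purely hypothetical illustrations for this proposal, not existing Flink options:

```yaml
# Hypothetical keys sketching a pluggable job graph store (not real Flink options):
high-availability: zookeeper
high-availability.job-graph-store.class: com.example.RocksDBJobGraphStore
high-availability.job-graph-store.path: /var/flink/jobgraphs
```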
[jira] [Comment Edited] (FLINK-6085) flink as micro service
[ https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945991#comment-15945991 ] Chen Qin edited comment on FLINK-6085 at 3/29/17 3:04 AM: -- To unlock scenarios like blocking RPC calls or async callbacks, I am currently thinking of a way to connect the web front directly to the pipeline in the normal case, and to use some kind of durable buffer to store requests since the last successful checkpoint for failure scenarios. I think what we can do at this point is assume the client will retry after a connection failure, while Flink as a micro service maintains at-least-once semantics. So the problem is simplified to implementing a web-front source plus a feedback loop from sink to source, and locating the pending connection to respond to. What do you think [~till.rohrmann] [~tudandan] > flink as micro service > -- > > Key: FLINK-6085 > URL: https://issues.apache.org/jira/browse/FLINK-6085 > Project: Flink > Issue Type: Improvement > Components: DataStream API, JobManager >Reporter: Chen Qin >Priority: Minor > Attachments: Untitled document.jpg > > > Track discussion around run flink as a micro service, includes but not > limited to > - RPC (web service endpoint) source > as web service endpoint accept RPC call, ingest to the streaming job(only > one) > - callback mechanism > - task assignment should honor deployment group (web tier hosts should be > isolated from rest of task assignment) > https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6085) flink as micro service
[ https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945991#comment-15945991 ] Chen Qin commented on FLINK-6085: - To unlock scenarios like blocking RPC calls or async callbacks, I am currently thinking of a way to get rid of the distributed queue by connecting the web front directly to the pipeline. Having put more thought into this topic, exactly-once seems really hard to achieve through an RPC source; as a matter of fact, it is the same issue as ingesting from the web front into a distributed queue, since clients can retry arbitrarily within a long time span. I think what we can do at this point is assume the client will retry after a connection failure, while Flink as a micro service maintains at-least-once semantics. So the problem is simplified to implementing a web-front source plus a feedback loop from sink to source, and locating the pending connection to respond to. What do you think [~till.rohrmann] [~tudandan] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
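The "feedback loop from sink to source & locate pending connection to respond" piece described above can be sketched with a shared registry of pending requests: the web-front source registers a future per request id, and the sink-side feedback completes it when the pipeline result arrives. All names here are hypothetical and the sketch ignores distribution; it only illustrates the correlation mechanism.

```java
import java.util.concurrent.*;

// Hypothetical registry linking an RPC request accepted at the web-front source
// to the pipeline result arriving via the sink-side feedback loop.
class PendingRequests {
    private final ConcurrentMap<String, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();

    // called by the web-front source when a request enters the pipeline;
    // the caller blocks on (or times out waiting for) the returned future
    CompletableFuture<String> register(String requestId) {
        return pending.computeIfAbsent(requestId, id -> new CompletableFuture<>());
    }

    // called by the sink feedback loop; returns false for an unknown request id
    // (e.g. the client already timed out), which fits at-least-once semantics
    boolean complete(String requestId, String result) {
        CompletableFuture<String> f = pending.remove(requestId);
        return f != null && f.complete(result);
    }
}
```

Since entries are removed on completion, a duplicate result delivered after a retry simply finds no pending connection and is dropped.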
[jira] [Comment Edited] (FLINK-6085) flink as micro service
[ https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15939209#comment-15939209 ] Chen Qin edited comment on FLINK-6085 at 3/24/17 8:32 PM: -- I would like to see if we can agree on the high level first. The service is primarily RPC interaction with horizontal-scalability and latency requirements. The current way of bridging a service with a streaming pipeline via a distributed queue provides failure resilience and topic reuse, at the cost of extra hardware/software and latency, and without callback support. [~till.rohrmann] Updates: briefly chatted offline with Maxim. It seems a bit hard to work around the distributed queue, considering the pipeline can restart and rewind offsets at any time, and loss of insertion events is not acceptable (queries might be fine, but Flink already addresses that with queryable state). To echo Till's comments: yes, custom code could track those requests. The further question is whether we can have a specific sink implementation that reroutes results to specific RPC hosts (e.g. an HTTP response or callback). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6085) flink as micro service
[ https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-6085: Description: Track discussion around running Flink as a micro service, including but not limited to: - an RPC (web service endpoint) source: the web service endpoint accepts RPC calls and ingests into the (single) streaming job - a callback mechanism - task assignment should honor deployment groups (web-tier hosts should be isolated from the rest of task assignment) https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6085) flink as micro service
[ https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-6085: Attachment: Untitled document.jpg I would like to see if we can agree on the high level first. The service is primarily RPC interaction with horizontal-scalability and latency requirements. The current way of bridging a service with a streaming pipeline via a distributed queue provides failure resilience and topic reuse, at the cost of extra hardware/software and latency, and without callback support. [~till.rohrmann] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6113) Implement split/select with OutputTag
Chen Qin created FLINK-6113: --- Summary: Implement split/select with OutputTag Key: FLINK-6113 URL: https://issues.apache.org/jira/browse/FLINK-6113 Project: Flink Issue Type: Improvement Components: DataStream API Affects Versions: 1.3.0 Reporter: Chen Qin Priority: Minor With the completion of FLINK-4460 (side outputs), this is one of the follow-up items towards replacing string-tag-based split/select with OutputTag-based split/select. In Flink 2.0, we might consider eventually deprecating split/select. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6085) flink as micro service
Chen Qin created FLINK-6085: --- Summary: flink as micro service Key: FLINK-6085 URL: https://issues.apache.org/jira/browse/FLINK-6085 Project: Flink Issue Type: Improvement Components: DataStream API, JobManager Reporter: Chen Qin Priority: Minor Track discussion around running Flink as a micro service, including but not limited to: - RPC (web service endpoint) source: acts as a web service endpoint that accepts RPC calls and ingests them into the streaming job (only one) - callback mechanism - task assignment should honor the deployment group (web-tier hosts should be isolated from the rest of task assignment) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4266) Remote Database Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15856972#comment-15856972 ] Chen Qin commented on FLINK-4266: - When this jira was filed, there were multiple issues around incremental checkpointing support as well as scaling the non-partitioned states of long-running jobs. With the completion of dynamic scaling of non-partitioned states in FLINK-4379, scaling large states dynamically is no longer a blocker. Flink is also working on an incremental checkpointing design which will likely address large-state checkpointing performance: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/States-split-over-to-external-storage-td15344.html#none In light of all these efforts, I would like to limit the scope of this jira to a split-over of the RocksDB statebackend: updated states will be batch-written to the remote db during the checkpointing phase. When jobs run in multiple data centers in parallel, remote split-over would enable source failover without losing state or manual file movements, which introduce latency and errors. > Remote Database Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Chen Qin >Assignee: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > needs to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverage checkpoint id as mono incremental logic > timestamp and perform range query to get evicited state k/v. 
we also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merge them, do batch updates only when checkpoint; > lastly, we are looking for eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage(e.g Cassandra). > For now, we don't have good story regarding to data retirement. We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
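The batching scheme described above (track update logs between checkpoints, merge them, and batch-write to the remote db only at checkpoint time, with the checkpoint id as a monotonically increasing logical timestamp) can be sketched as follows. `RemoteStore` and `BufferedStateBackend` are hypothetical stand-ins, not Flink or Cassandra APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: between checkpoints, state updates are merged into an in-memory
// write log so "hot state" never hits the remote db per update; only at
// checkpoint time is the merged batch flushed, keyed by checkpoint id.
public class SplitOverBatchSketch {

    // Stand-in for the remote store (e.g. a Cassandra-backed table).
    interface RemoteStore {
        void batchWrite(long checkpointId, Map<String, String> batch);
    }

    static final class BufferedStateBackend {
        private final Map<String, String> writeLog = new HashMap<>();
        private final RemoteStore store;

        BufferedStateBackend(RemoteStore store) { this.store = store; }

        // Hot-path update: merged locally, no remote write per update.
        void put(String key, String value) { writeLog.put(key, value); }

        // Checkpoint: flush the merged batch once, tagged with checkpoint id
        // (the monotonically increasing logical timestamp).
        void checkpoint(long checkpointId) {
            store.batchWrite(checkpointId, new HashMap<>(writeLog));
            writeLog.clear();
        }
    }

    public static void main(String[] args) {
        List<Map<String, String>> flushed = new ArrayList<>();
        BufferedStateBackend backend =
            new BufferedStateBackend((id, batch) -> flushed.add(batch));
        backend.put("k", "v1");
        backend.put("k", "v2");      // merged: only the latest value survives
        backend.checkpoint(1L);
        System.out.println(flushed); // [{k=v2}]
    }
}
```

Because intermediate values are merged away, the remote store sees one write per key per checkpoint interval at most, which is the cost-reduction argument made in the description.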
[jira] [Updated] (FLINK-4266) Cassandra SplitOver Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-4266: Summary: Cassandra SplitOver Statebackend (was: Remote Database Statebackend) > Cassandra SplitOver Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Chen Qin >Assignee: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > needs to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverage checkpoint id as mono incremental logic > timestamp and perform range query to get evicited state k/v. we also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merge them, do batch updates only when checkpoint; > lastly, we are looking for eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage(e.g Cassandra). > For now, we don't have good story regarding to data retirement. We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-4266) Remote Database Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin reassigned FLINK-4266: --- Assignee: Chen Qin Affects Version/s: (was: 1.2.0) 1.3.0 > Remote Database Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.0 >Reporter: Chen Qin >Assignee: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > needs to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverage checkpoint id as mono incremental logic > timestamp and perform range query to get evicited state k/v. we also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merge them, do batch updates only when checkpoint; > lastly, we are looking for eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage(e.g Cassandra). > For now, we don't have good story regarding to data retirement. We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin reassigned FLINK-4460: --- Assignee: Chen Qin > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin >Assignee: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-4266) Remote Database Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-4266: Affects Version/s: (was: 1.0.3) > Remote Database Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.2.0 >Reporter: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > needs to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverage checkpoint id as mono incremental logic > timestamp and perform range query to get evicited state k/v. we also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merge them, do batch updates only when checkpoint; > lastly, we are looking for eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage(e.g Cassandra). > For now, we don't have good story regarding to data retirement. We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15717166#comment-15717166 ] Chen Qin commented on FLINK-4460: - Ongoing implementation reflecting feedback; window stream impl still missing: https://github.com/apache/flink/compare/master...chenqin:flip > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin > Labels: latearrivingevents, sideoutput > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4460) Side Outputs in Flink
[ https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-4460: Affects Version/s: 1.1.3 Labels: latearrivingevents sideoutput (was: ) Priority: Major (was: Minor) Fix Version/s: (was: 1.2.0) 1.1.4 Description: https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing Issue Type: New Feature (was: Improvement) Summary: Side Outputs in Flink (was: Expose Late Arriving Events) > Side Outputs in Flink > - > > Key: FLINK-4460 > URL: https://issues.apache.org/jira/browse/FLINK-4460 > Project: Flink > Issue Type: New Feature > Components: Core, DataStream API >Affects Versions: 1.2.0, 1.1.3 >Reporter: Chen Qin > Labels: latearrivingevents, sideoutput > Fix For: 1.1.4 > > > https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4460) Expose Late Arriving Events
Chen Qin created FLINK-4460: --- Summary: Expose Late Arriving Events Key: FLINK-4460 URL: https://issues.apache.org/jira/browse/FLINK-4460 Project: Flink Issue Type: Improvement Components: Core, DataStream API Affects Versions: 1.2.0 Reporter: Chen Qin Priority: Minor Fix For: 1.2.0 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4266) Remote Database Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15410474#comment-15410474 ] Chen Qin commented on FLINK-4266: - https://docs.google.com/document/d/1diHQyOPZVxgmnmYfiTa6glLf-FlFjSHcL8J3YR2xLdk/edit?usp=sharing > Remote Database Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.0.3, 1.2.0 >Reporter: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > needs to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverage checkpoint id as mono incremental logic > timestamp and perform range query to get evicited state k/v. we also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merge them, do batch updates only when checkpoint; > lastly, we are looking for eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage(e.g Cassandra). > For now, we don't have good story regarding to data retirement. We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4266) Remote Storage Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-4266: Description: The current FileSystem statebackend limits the whole state size to disk space. To scale checkpoint state beyond local disk space, such as for long-running tasks that hold window content for long periods of time, pipelines need to split state out to durable remote storage, possibly replicated to different data centers. We drafted a design that leverages the checkpoint id as a monotonically incrementing logical timestamp and performs range queries to get evicted state k/v pairs. We also introduce a checkpoint-time commit and an eviction threshold that reduce "hot state" writes to the remote db on every update between adjacent checkpoints, by tracking update logs, merging them, and doing batch updates only at checkpoint time. Lastly, we are looking for an eviction policy that can identify "hot keys" in k/v state and lazily load "cold keys" from remote storage (e.g. Cassandra). For now, we don't have a good story for data retirement; we might have to keep state forever until a command is run manually to clean per-job state data. Some features may relate to the incremental checkpointing effort; we hope to align with that work as well. Comments welcome; I will put together a draft design doc after gathering some feedback. was: Current FileSystem statebackend limits whole state size to disk space. For long running task that hold window content for long period of time, it needs to split out states to durable remote storage and replicated across data centers. 
We look into implementation from leverage checkpoint timestamp as versioning and do range query to get current state; we also want to reduce "hot states" hitting remote db per every update between adjacent checkpoints by tracking update logs and merge them, do batch updates only when checkpoint; lastly, we are looking for eviction policy that can identify "hot keys" in k/v state and lazy load those "cold keys" from Cassandra. For now, we don't have good story regarding to data retirement. We might have to keep forever until manually run command and clean per job related state data. Some of features might related to incremental checkpointing feature, we hope to align with effort there also. Welcome comments, I will try to put a draft design doc after gathering some feedback. > Remote Storage Statebackend > --- > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.0.3, 1.2.0 >Reporter: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > needs to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverage checkpoint id as mono incremental logic > timestamp and perform range query to get evicited state k/v. we also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merge them, do batch updates only when checkpoint; > lastly, we are looking for eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage(e.g Cassandra). > For now, we don't have good story regarding to data retirement. 
We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4266) Remote Database Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-4266: Summary: Remote Database Statebackend (was: Remote Storage Statebackend) > Remote Database Statebackend > > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.0.3, 1.2.0 >Reporter: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > Dealing with scale out checkpoint states beyond local disk space such as long > running task that hold window content for long period of time. Pipelines > needs to split out states to durable remote storage even replicated to > different data centers. > We draft a design that leverage checkpoint id as mono incremental logic > timestamp and perform range query to get evicited state k/v. we also > introduce checkpoint time commit and eviction threshold that reduce "hot > states" hitting remote db per every update between adjacent checkpoints by > tracking update logs and merge them, do batch updates only when checkpoint; > lastly, we are looking for eviction policy that can identify "hot keys" in > k/v state and lazy load those "cold keys" from remote storage(e.g Cassandra). > For now, we don't have good story regarding to data retirement. We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-4266) Remote Storage Statebackend
[ https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Qin updated FLINK-4266: Summary: Remote Storage Statebackend (was: Cassandra StateBackend) > Remote Storage Statebackend > --- > > Key: FLINK-4266 > URL: https://issues.apache.org/jira/browse/FLINK-4266 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.0.3, 1.2.0 >Reporter: Chen Qin >Priority: Minor > > Current FileSystem statebackend limits whole state size to disk space. > For long running task that hold window content for long period of time, it > needs to split out states to durable remote storage and replicated across > data centers. > We look into implementation from leverage checkpoint timestamp as versioning > and do range query to get current state; we also want to reduce "hot states" > hitting remote db per every update between adjacent checkpoints by tracking > update logs and merge them, do batch updates only when checkpoint; lastly, we > are looking for eviction policy that can identify "hot keys" in k/v state and > lazy load those "cold keys" from Cassandra. > For now, we don't have good story regarding to data retirement. We might have > to keep forever until manually run command and clean per job related state > data. Some of features might related to incremental checkpointing feature, we > hope to align with effort there also. > Welcome comments, I will try to put a draft design doc after gathering some > feedback. -- This message was sent by Atlassian JIRA (v6.3.4#6332)