[jira] [Comment Edited] (FLINK-30667) remove the planner @internal dependency in flink-connector-hive

2023-01-18 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17678003#comment-17678003
 ] 

Chen Qin edited comment on FLINK-30667 at 1/19/23 7:19 AM:
---

Parser should be a PublicEvolving interface, while both Flink and Hive keep 
their own internal implementations, so hive connector maintainers worry less 
about Flink planner changes.

PlannerQueryOperation should stay internal in both table-planner and 
hive-connector, so the hive connector keeps full control and can evolve without 
worrying about how the Flink planner's PlannerQueryOperation evolves.

PlannerContext is a simple enough utility that it can be PublicEvolving.
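
For illustration, a minimal sketch of the proposed decoupling (assuming the 
Parser interface keeps a parse(String) method returning operations; the 
HiveParser body is elided and illustrative):

{code:java}
import java.util.List;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.operations.Operation;

// The proposal: promote the Parser interface to PublicEvolving ...
@PublicEvolving
public interface Parser {
    /** Parses a SQL statement into Table API operations. */
    List<Operation> parse(String statement);
    // ... other methods elided in this sketch
}

// ... and let HiveParser implement it directly, instead of reusing the
// planner's internal ParserImpl.
class HiveParser implements Parser {
    @Override
    public List<Operation> parse(String statement) {
        // Delegate to Hive's own parsing/semantic analysis (not shown here).
        throw new UnsupportedOperationException("sketch only");
    }
}
{code}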


was (Author: foxss):
Parser should be a PublicEvolving interface, while both Flink and Hive keep 
their own internal implementations, so hive connector maintainers worry less 
about Flink planner changes.

PlannerQueryOperation should stay internal in both table-planner and 
hive-connector, so the hive connector keeps full control and can evolve without 
worrying about how the Flink planner's PlannerQueryOperation evolves.

PlannerContext is a simple enough utility that it can be PublicEvolving.

>  remove the planner @internal dependency in flink-connector-hive
> 
>
> Key: FLINK-30667
> URL: https://issues.apache.org/jira/browse/FLINK-30667
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> There are some classes in flink-connector-hive that rely on the planner, but 
> fortunately not too many.
> It mainly relies on ParserImpl, PlannerContext, PlannerQueryOperation, and so 
> on. The dependency is mainly required to create RelNode.
> To resolve this problem, we need more abstraction for the planner and a 
> public API for external dialects.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30667) remove the planner @internal dependency in flink-connector-hive

2023-01-18 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17678003#comment-17678003
 ] 

Chen Qin edited comment on FLINK-30667 at 1/19/23 7:16 AM:
---

Parser should be a PublicEvolving interface, while both Flink and Hive keep 
their own internal implementations, so hive connector maintainers worry less 
about Flink planner changes.

PlannerQueryOperation should stay internal in both table-planner and 
hive-connector, so the hive connector keeps full control and can evolve without 
worrying about how the Flink planner's PlannerQueryOperation evolves.

PlannerContext is a simple enough utility that it can be PublicEvolving.


was (Author: foxss):
ParserImpl and its interface are currently both Internal. HiveParser should not 
rely on the table-planner ParserImpl, for the sake of future flexibility and 
hive connector maintenance. I would propose annotating the Parser interface 
with PublicEvolving and letting HiveParser implement the Parser interface 
directly, to decouple the risk involved in future planner refactoring.

PlannerQueryOperation should stay internal in both table-planner and 
hive-connector; since the QueryOperation interface is already PublicEvolving, I 
would propose making the foundational FlinkTypeFactory PublicEvolving as well.

PlannerContext could be an interface with separate implementations in the 
planner and hive-connector.

>  remove the planner @internal dependency in flink-connector-hive
> 
>
> Key: FLINK-30667
> URL: https://issues.apache.org/jira/browse/FLINK-30667
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.17.0
>
>
> There are some classes in flink-connector-hive that rely on the planner, but 
> fortunately not too many.
> It mainly relies on ParserImpl, PlannerContext, PlannerQueryOperation, and so 
> on. The dependency is mainly required to create RelNode.
> To resolve this problem, we need more abstraction for the planner and a 
> public API for external dialects.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30667) remove the planner @internal dependency in flink-connector-hive

2023-01-17 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-30667:
-
Summary:  remove the planner @internal dependency in flink-connector-hive  
(was:  remove the planner dependency in flink-connector-hive)

>  remove the planner @internal dependency in flink-connector-hive
> 
>
> Key: FLINK-30667
> URL: https://issues.apache.org/jira/browse/FLINK-30667
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Priority: Major
> Fix For: 1.17.0
>
>
> There are some classes in flink-connector-hive that rely on the planner, but 
> fortunately not too many.
> It mainly relies on ParserImpl, PlannerContext, PlannerQueryOperation, and so 
> on. The dependency is mainly required to create RelNode.
> To resolve this problem, we need more abstraction for the planner and a 
> public API for external dialects.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-30667) remove the planner dependency in flink-connector-hive

2023-01-17 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17678003#comment-17678003
 ] 

Chen Qin edited comment on FLINK-30667 at 1/18/23 3:58 AM:
---

ParserImpl and its interface are currently both Internal. HiveParser should not 
rely on the table-planner ParserImpl, for the sake of future flexibility and 
hive connector maintenance. I would propose annotating the Parser interface 
with PublicEvolving and letting HiveParser implement the Parser interface 
directly, to decouple the risk involved in future planner refactoring.

PlannerQueryOperation should stay internal in both table-planner and 
hive-connector; since the QueryOperation interface is already PublicEvolving, I 
would propose making the foundational FlinkTypeFactory PublicEvolving as well.

PlannerContext could be an interface with separate implementations in the 
planner and hive-connector.


was (Author: foxss):
ParserImpl and its interface are currently both Internal. HiveParser should not 
rely on the table-planner ParserImpl, for the sake of future flexibility and 
hive connector maintenance. I would propose annotating the Parser interface 
with PublicEvolving and letting HiveParser implement the Parser interface 
directly, to decouple the risk involved in future planner refactoring.

PlannerQueryOperation should stay internal in both table-planner and 
hive-connector; since the QueryOperation interface is already PublicEvolving, I 
would propose making the foundational FlinkTypeFactory PublicEvolving as well.

PlannerContext still needs a bit of thought.

>  remove the planner dependency in flink-connector-hive
> --
>
> Key: FLINK-30667
> URL: https://issues.apache.org/jira/browse/FLINK-30667
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Priority: Major
> Fix For: 1.17.0
>
>
> There are some classes in flink-connector-hive that rely on the planner, but 
> fortunately not too many.
> It mainly relies on ParserImpl, PlannerContext, PlannerQueryOperation, and so 
> on. The dependency is mainly required to create RelNode.
> To resolve this problem, we need more abstraction for the planner and a 
> public API for external dialects.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-30667) remove the planner dependency in flink-connector-hive

2023-01-17 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17678003#comment-17678003
 ] 

Chen Qin commented on FLINK-30667:
--

ParserImpl and its interface are currently both Internal. HiveParser should not 
rely on the table-planner ParserImpl, for the sake of future flexibility and 
hive connector maintenance. I would propose annotating the Parser interface 
with PublicEvolving and letting HiveParser implement the Parser interface 
directly, to decouple the risk involved in future planner refactoring.

PlannerQueryOperation should stay internal in both table-planner and 
hive-connector; since the QueryOperation interface is already PublicEvolving, I 
would propose making the foundational FlinkTypeFactory PublicEvolving as well.

PlannerContext still needs a bit of thought.

>  remove the planner dependency in flink-connector-hive
> --
>
> Key: FLINK-30667
> URL: https://issues.apache.org/jira/browse/FLINK-30667
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Priority: Major
> Fix For: 1.17.0
>
>
> There are some classes in flink-connector-hive that rely on the planner, but 
> fortunately not too many.
> It mainly relies on ParserImpl, PlannerContext, PlannerQueryOperation, and so 
> on. The dependency is mainly required to create RelNode.
> To resolve this problem, we need more abstraction for the planner and a 
> public API for external dialects.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30659) move flink-sql-parser-hive to flink-connector-hive-parent

2023-01-12 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-30659:
-
Summary: move flink-sql-parser-hive to flink-connector-hive-parent  (was: 
move flink-sql-parser-hive to flink-connector-hive)

> move flink-sql-parser-hive to flink-connector-hive-parent
> -
>
> Key: FLINK-30659
> URL: https://issues.apache.org/jira/browse/FLINK-30659
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Priority: Major
> Fix For: 1.17.0
>
>
> Hive Parser should stay with the hive connector and be maintained together 
> with it. At runtime, those packages should load/unload together.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30667) remove the planner dependency in flink-connector-hive

2023-01-12 Thread Chen Qin (Jira)
Chen Qin created FLINK-30667:


 Summary:  remove the planner dependency in flink-connector-hive
 Key: FLINK-30667
 URL: https://issues.apache.org/jira/browse/FLINK-30667
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Affects Versions: 1.17.0
Reporter: Chen Qin
 Fix For: 1.17.0


There are some classes in flink-connector-hive that rely on the planner, but 
fortunately not too many.

It mainly relies on ParserImpl, PlannerContext, PlannerQueryOperation, and so 
on. The dependency is mainly required to create RelNode.

To resolve this problem, we need more abstraction for the planner and a public 
API for external dialects.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30664) [Connector/Hive] clean up ambiguous hive/hadoop package dependencies

2023-01-12 Thread Chen Qin (Jira)
Chen Qin created FLINK-30664:


 Summary: [Connector/Hive] clean up ambiguous hive/hadoop package 
dependencies
 Key: FLINK-30664
 URL: https://issues.apache.org/jira/browse/FLINK-30664
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Affects Versions: 1.17.0
Reporter: Chen Qin
 Fix For: 1.17.0


The hive and hive-metastore combination introduces multiple versions of 
dependency packages; the goal is to ensure the hive-connector has a 
deterministic dependency set.
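
One way to get there is to pin the ambiguous transitive versions in 
dependencyManagement; a minimal sketch (the artifact and version below are 
illustrative placeholders, not the actual offending list for this ticket):

{noformat}
<dependencyManagement>
  <dependencies>
    <!-- Pin transitive versions that hive-exec and hive-metastore pull in
         differently, so resolution no longer depends on declaration order. -->
    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
      <version>0.9.3</version> <!-- illustrative version -->
    </dependency>
  </dependencies>
</dependencyManagement>
{noformat}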



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30660) move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive e2e

2023-01-12 Thread Chen Qin (Jira)
Chen Qin created FLINK-30660:


 Summary: move SQLClientHiveITCase and TestHiveCatalogFactory to 
flink-connector-hive e2e
 Key: FLINK-30660
 URL: https://issues.apache.org/jira/browse/FLINK-30660
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive, Tests
Affects Versions: 1.17.0
Reporter: Chen Qin
 Fix For: 1.17.0


move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive e2e

[https://github.com/apache/flink/pull/16532/files#]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30659) move flink-sql-parser-hive to flink-connector-hive

2023-01-12 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-30659:
-
Summary: move flink-sql-parser-hive to flink-connector-hive  (was: move 
Flink-sql-parser-hive to flink-connector-hive)

> move flink-sql-parser-hive to flink-connector-hive
> --
>
> Key: FLINK-30659
> URL: https://issues.apache.org/jira/browse/FLINK-30659
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / Hive, Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Priority: Major
> Fix For: 1.17.0
>
>
> Hive Parser should stay with the hive connector and be maintained together 
> with it. At runtime, those packages should load/unload together.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30658) remove flink-sql-parser-hive dependency in table-planner

2023-01-12 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-30658:
-
Summary: remove flink-sql-parser-hive dependency in table-planner  (was: 
remove Flink-sql-parser-hive dependency in table-planner)

> remove flink-sql-parser-hive dependency in table-planner
> 
>
> Key: FLINK-30658
> URL: https://issues.apache.org/jira/browse/FLINK-30658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Priority: Minor
> Fix For: 1.17.0
>
>
> In order to move flink-sql-parser-hive out of flink-table, we need to remove 
> the flink-sql-parser-hive package dependency in flink-table-planner.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30659) move Flink-sql-parser-hive to flink-connector-hive

2023-01-12 Thread Chen Qin (Jira)
Chen Qin created FLINK-30659:


 Summary: move Flink-sql-parser-hive to flink-connector-hive
 Key: FLINK-30659
 URL: https://issues.apache.org/jira/browse/FLINK-30659
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive, Table SQL / Planner
Affects Versions: 1.17.0
Reporter: Chen Qin
 Fix For: 1.17.0


Hive Parser should stay with the hive connector and be maintained together 
with it. At runtime, those packages should load/unload together.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-30658) remove Flink-sql-parser-hive dependency in table-planner

2023-01-12 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-30658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-30658:
-
Summary: remove Flink-sql-parser-hive dependency in table-planner  (was: 
remove Flink-sql-parser-hive dependency on table-planner)

> remove Flink-sql-parser-hive dependency in table-planner
> 
>
> Key: FLINK-30658
> URL: https://issues.apache.org/jira/browse/FLINK-30658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Planner
>Affects Versions: 1.17.0
>Reporter: Chen Qin
>Priority: Minor
> Fix For: 1.17.0
>
>
> In order to move flink-sql-parser-hive out of flink-table, we need to remove 
> the flink-sql-parser-hive package dependency in flink-table-planner.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30658) remove Flink-sql-parser-hive dependency on table-planner

2023-01-12 Thread Chen Qin (Jira)
Chen Qin created FLINK-30658:


 Summary: remove Flink-sql-parser-hive dependency on table-planner
 Key: FLINK-30658
 URL: https://issues.apache.org/jira/browse/FLINK-30658
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.17.0
Reporter: Chen Qin
 Fix For: 1.17.0


In order to move flink-sql-parser-hive out of flink-table, we need to remove 
the flink-sql-parser-hive package dependency in flink-table-planner.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-27640) Flink not compiling, flink-connector-hive_2.12 is missing jhyde pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde

2022-12-11 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-27640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17645799#comment-17645799
 ] 

Chen Qin commented on FLINK-27640:
--

Shall we exclude org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde 
instead? I don't think this package is actually used.

Here is how the Hudi community fixed it: https://github.com/apache/hudi/pull/3034
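
For illustration, the kind of exclusion that approach amounts to (a sketch 
against the hive-exec dependency named in the error below; the coordinates are 
taken from that error message):

{noformat}
<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-exec</artifactId>
  <version>2.3.9</version>
  <exclusions>
    <!-- Not needed at runtime; only its unreachable POM breaks the build. -->
    <exclusion>
      <groupId>org.pentaho</groupId>
      <artifactId>pentaho-aggdesigner-algorithm</artifactId>
    </exclusion>
  </exclusions>
</dependency>
{noformat}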

> Flink not compiling, flink-connector-hive_2.12 is missing jhyde 
> pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde 
> --
>
> Key: FLINK-27640
> URL: https://issues.apache.org/jira/browse/FLINK-27640
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Connectors / Hive
>Affects Versions: 1.16.0
>Reporter: Piotr Nowojski
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: pull-request-available, stale-assigned
>
> When clean installing whole project after cleaning local {{.m2}} directory I 
> encountered the following error when compiling flink-connector-hive_2.12:
> {noformat}
> [ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could 
> not resolve dependencies for project 
> org.apache.flink:flink-connector-hive_2.12:jar:1.16-SNAPSHOT: Failed to 
> collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> 
> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read 
> artifact descriptor for 
> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer 
> artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to 
> maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for 
> repositories: [conjars (http://conjars.org/repo, default, 
> releases+snapshots), apache.snapshots 
> (http://repository.apache.org/snapshots, default, snapshots)] -> [Help 1]
> {noformat}
> I've solved this by adding 
> {noformat}
> <repository>
>   <id>spring-repo-plugins</id>
>   <url>https://repo.spring.io/ui/native/plugins-release/</url>
> </repository>
> {noformat}
> to ~/.m2/settings.xml file. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30362) Flink-connector-hive can't build with maven 3.8

2022-12-11 Thread Chen Qin (Jira)
Chen Qin created FLINK-30362:


 Summary: Flink-connector-hive can't build with maven 3.8
 Key: FLINK-30362
 URL: https://issues.apache.org/jira/browse/FLINK-30362
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.15.3, 1.16.0, 1.17.0
 Environment: install maven 3.8.1+

git clone flink repo

run mvn clean package
Reporter: Chen Qin
 Fix For: 1.17.0, 1.16.1, 1.15.4


Flink connector hive pulls in hive-exec, which depends on 
org.pentaho:pentaho-aggdesigner-algorithm hosted on a blocked JBoss mirror.

This is a CVE-related issue that blocks the upgrade to Maven 3.8.1+:

[https://maven.apache.org/docs/3.8.1/release-notes.html#cve-2021-26291]

 
{code:java}
[ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could not 
resolve dependencies for project 
org.apache.flink:flink-connector-hive_2.12:jar:1.17-SNAPSHOT: Failed to collect 
dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> 
org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read 
artifact descriptor for 
org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer 
artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to 
maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: 
[repository.jboss.org 
(http://repository.jboss.org/nexus/content/groups/public/, default, disabled), 
conjars (http://conjars.org/repo, default, releases+snapshots), 
apache.snapshots (http://repository.apache.org/snapshots, default, snapshots)] 
-> [Help 1]{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-27726) shade thrift and fb303 in hive connector

2022-05-20 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-27726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-27726:
-
Summary: shade thrift and fb303 in hive connector  (was: shad thrift and 
fb303 in hive connector)

> shade thrift and fb303 in hive connector
> 
>
> Key: FLINK-27726
> URL: https://issues.apache.org/jira/browse/FLINK-27726
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hive
>Affects Versions: 1.12.3, 1.13.1, 1.12.4, 1.12.5, 1.13.2, 1.13.3, 1.15.0, 
> 1.11.6, 1.12.7, 1.13.5, 1.14.2, 1.13.6, 1.14.3, 1.14.4
>Reporter: Chen Qin
>Priority: Minor
>
> The Hive connector introduces specific fb303 and thrift versions in order to 
> connect to a specific hive metastore version. If user code also pulls in a 
> thrift version along with fb303 that is not the same as the one the hive 
> connector introduced, the user code will not be able to connect to the hive 
> metastore.
>  
> This fix has been verified in a production environment for more than 6 months 
> as part of supporting thrift-encoded FlinkSQL.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Created] (FLINK-27726) shad thrift and fb303 in hive connector

2022-05-20 Thread Chen Qin (Jira)
Chen Qin created FLINK-27726:


 Summary: shad thrift and fb303 in hive connector
 Key: FLINK-27726
 URL: https://issues.apache.org/jira/browse/FLINK-27726
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 1.14.4, 1.14.3, 1.13.6, 1.14.2, 1.13.5, 1.12.7, 1.11.6, 
1.15.0, 1.13.3, 1.13.2, 1.12.5, 1.12.4, 1.13.1, 1.12.3
Reporter: Chen Qin


The Hive connector introduces specific fb303 and thrift versions in order to 
connect to a specific hive metastore version. If user code also pulls in a 
thrift version along with fb303 that is not the same as the one the hive 
connector introduced, the user code will not be able to connect to the hive 
metastore.

This fix has been verified in a production environment for more than 6 months 
as part of supporting thrift-encoded FlinkSQL.
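
A sketch of the shading this ticket proposes, using maven-shade-plugin 
relocations (the shadedPattern prefixes here are illustrative, not the exact 
ones used in the fix):

{noformat}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <!-- Move thrift and fb303 into a connector-private namespace so a
           different thrift/fb303 version in user code cannot clash. -->
      <relocation>
        <pattern>org.apache.thrift</pattern>
        <shadedPattern>org.apache.flink.hive.shaded.org.apache.thrift</shadedPattern>
      </relocation>
      <relocation>
        <pattern>com.facebook.fb303</pattern>
        <shadedPattern>org.apache.flink.hive.shaded.com.facebook.fb303</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
{noformat}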



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2021-04-23 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1733#comment-1733
 ] 

Chen Qin commented on FLINK-10052:
--

To set a timeline on the "leader changed to new address null" exception: at 
this time, when Curator signaled the zk SUSPENDED state, another code path 
deregistered the task executor on the other instance, resulting in a restart.

Basically, when the suspended message lands on container 1, container 2 reacts 
with TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093) and 
throws.

While it all points to suspended-message handling, this part doesn't seem to 
directly touch the changed code path.

 

Here is the timeline of warnings/exceptions:

on container_e26_1617655625710_9571_01_17
 2021-04-23T13:57:37.290 - Connection to ZooKeeper suspended. Can no longer 
retrieve the leader from ZooKeeper.
 2021-04-23T13:57:37.304 - Connection to ZooKeeper suspended. Can no longer 
retrieve the leader from ZooKeeper.

on container_e26_1617655625710_9571_01_01

2021-04-23T13:57:37.333 - USER_EVENTS.spo_derived_event.SINK-stream_joiner -> 
USER_EVENTS.spo_derived_event.SINK-late-event-tracker (32/270) 
(c60dc612ec4d703d1bff646c3442193a) switched from RUNNING to FAILED on 
container_e26_1617655625710_9571_01_17 @ 
xenon-pii-dev-001-20191210-data-slave-dev-0a01fa8b.ec2.pin220.com 
(dataPort=45229). org.apache.flink.util.FlinkException: ResourceManager leader 
changed to new address null
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
 at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)

on container_e26_1617655625710_9571_01_17 

2021-04-23T13:57:38.465 - Connection to ZooKeeper lost. Can no longer retrieve 
the leader from ZooKeeper.
 2021-04-23T13:57:38.496 - Unable to reconnect to ZooKeeper service, session 
0x1050b21fe3006a6 has expired

> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0, 1.8.1
>Reporter: Till Rohrmann
>Assignee: Zili Chen
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.13.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature in case that the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achievable by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.
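
For context, a minimal sketch of the tolerant handling the description argues 
for, written against Curator's ConnectionStateListener (the revocation callback 
is hypothetical, and Flink actually ships a shaded Curator):

{code:java}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;

/** Keeps leadership on SUSPENDED and only revokes once the session is LOST. */
public class SuspendTolerantListener implements ConnectionStateListener {

    private final Runnable revokeLeadership; // hypothetical revocation hook

    public SuspendTolerantListener(Runnable revokeLeadership) {
        this.revokeLeadership = revokeLeadership;
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        switch (newState) {
            case SUSPENDED:
                // The connection may come back before the ZK session expires:
                // do nothing yet, so jobs are not cancelled prematurely.
                break;
            case LOST:
                // Session expired; leadership is definitively gone.
                revokeLeadership.run();
                break;
            default:
                // CONNECTED / RECONNECTED / READ_ONLY: nothing to revoke.
                break;
        }
    }
}
{code}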



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2021-04-23 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330890#comment-17330890
 ] 

Chen Qin commented on FLINK-10052:
--

Here is another exception we observed in another job after applying this PR:

{code:java}
2021-04-23 11:09:03,388 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_e26_1617655625710_8692_01_000115 because: ResourceManager leader 
changed to new address null
2021-04-23 11:09:03,391 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- 
USER_AGGREGATE_STATE.user_signal_v2.SINK-async (200/360) 
(bf815073df08c3426bf41b63d74510fb) switched from RUNNING to FAILED on 
container_e26_1617655625710_8692_01_000115 @ .ec2.pin220.com 
(dataPort=46719).
org.apache.flink.util.FlinkException: ResourceManager leader changed to new 
address null
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
at akka.actor.ActorCell.invoke(ActorCell.scala:581)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}


> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0, 1.8.1
>Reporter: Till Rohrmann
>Assignee: Zili Chen
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.13.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature in case that the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achievable by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2021-04-23 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330890#comment-17330890
 ] 

Chen Qin edited comment on FLINK-10052 at 4/23/21, 4:13 PM:


Here is another exception we observed in another job; it may or may not be 
caused by this PR:

{code:java}
2021-04-23 11:09:03,388 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_e26_1617655625710_8692_01_000115 because: ResourceManager leader 
changed to new address null
2021-04-23 11:09:03,391 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- 
USER_AGGREGATE_STATE.user_signal_v2.SINK-async (200/360) 
(bf815073df08c3426bf41b63d74510fb) switched from RUNNING to FAILED on 
container_e26_1617655625710_8692_01_000115 @ .ec2.pin220.com 
(dataPort=46719).
org.apache.flink.util.FlinkException: ResourceManager leader changed to new 
address null
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
at akka.actor.ActorCell.invoke(ActorCell.scala:581)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}



was (Author: foxss):
Here is another exception we observed in another job after applying this PR:

{code:java}
2021-04-23 11:09:03,388 INFO  org.apache.flink.yarn.YarnResourceManager 
- Closing TaskExecutor connection 
container_e26_1617655625710_8692_01_000115 because: ResourceManager leader 
changed to new address null
2021-04-23 11:09:03,391 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- 
USER_AGGREGATE_STATE.user_signal_v2.SINK-async (200/360) 
(bf815073df08c3426bf41b63d74510fb) switched from RUNNING to FAILED on 
container_e26_1617655625710_8692_01_000115 @ .ec2.pin220.com 
(dataPort=46719).
org.apache.flink.util.FlinkException: ResourceManager leader changed to new 
address null
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at 

[jira] [Comment Edited] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2021-04-23 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330858#comment-17330858
 ] 

Chen Qin edited comment on FLINK-10052 at 4/23/21, 4:04 PM:


Ran load testing on the PR; it seems the suspended message no longer triggers 
leadership loss and a job restart. At the same time, we found the following 
exception when the job restarts, caused by a separate user-jar issue:

{code:java}
2021-04-21 18:24:44,639 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Fatal error 
occurred in TaskExecutor akka.tcp://flink@xxx:33435/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: Unhandled error in 
ZooKeeperLeaderRetrievalService:Background operation retry gave up
at 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.unhandledError(ZooKeeperLeaderRetrievalService.java:208)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:713)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:709)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:100)
at 
org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:92)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.logError(CuratorFrameworkImpl.java:708)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:874)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:990)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:943)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:66)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:346)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException:
 KeeperErrorCode = ConnectionLoss
at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:862)
... 10 more
{code}


was (Author: foxss):
Ran load testing on the PR; found the following exception when the job restarts:

{code:java}
2021-04-21 18:24:44,639 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Fatal error 
occurred in TaskExecutor akka.tcp://flink@xxx:33435/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: Unhandled error in 
ZooKeeperLeaderRetrievalService:Background operation retry gave up
at 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.unhandledError(ZooKeeperLeaderRetrievalService.java:208)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:713)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:709)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:100)
at 
org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:92)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.logError(CuratorFrameworkImpl.java:708)
at 

[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2021-04-23 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17330858#comment-17330858
 ] 

Chen Qin commented on FLINK-10052:
--

Ran load testing on the PR; found the following exception when the job restarts:

{code:java}
2021-04-21 18:24:44,639 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Fatal error 
occurred in TaskExecutor akka.tcp://flink@xxx:33435/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: Unhandled error in 
ZooKeeperLeaderRetrievalService:Background operation retry gave up
at 
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.unhandledError(ZooKeeperLeaderRetrievalService.java:208)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:713)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$6.apply(CuratorFrameworkImpl.java:709)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer$1.run(ListenerContainer.java:100)
at 
org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:92)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.logError(CuratorFrameworkImpl.java:708)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:874)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:990)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:943)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:66)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:346)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException:
 KeeperErrorCode = ConnectionLoss
at 
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
at 
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:862)
... 10 more
{code}

> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0, 1.8.1
>Reporter: Till Rohrmann
>Assignee: Zili Chen
>Priority: Major
>  Labels: pull-request-available, stale-assigned
> Fix For: 1.13.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature in case that the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achievable by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-6113) Implement split/select with Side Outputs

2021-04-14 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321877#comment-17321877
 ] 

Chen Qin commented on FLINK-6113:
-

It seems the community has already removed the split and select 
transformations on the master branch, so this Jira no longer makes sense.

Attaching the patch for curious minds:

[^split_select.patch]
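
For reference, the OutputTag-based side-output pattern that replaces 
split/select (standard DataStream API; the tag name, routing predicate, and a 
given DataStream<String> input are illustrative):

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Anonymous OutputTag subclass keeps the element type information.
final OutputTag<String> rejectedTag = new OutputTag<String>("rejected") {};

SingleOutputStreamOperator<String> mainStream =
        input.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (value.startsWith("reject:")) {  // illustrative routing predicate
                    ctx.output(rejectedTag, value); // routed to the side output
                } else {
                    out.collect(value);             // stays on the main output
                }
            }
        });

// What used to be a select("rejected") branch is now the side-output stream.
DataStream<String> rejectedStream = mainStream.getSideOutput(rejectedTag);
{code}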

 

> Implement split/select with Side Outputs
> 
>
> Key: FLINK-6113
> URL: https://issues.apache.org/jira/browse/FLINK-6113
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.3.0
>Reporter: Chen Qin
>Priority: Minor
>  Labels: stale-minor
> Attachments: split_select.patch
>
>
> With completion of FLINK-4460 (side outputs), this is one of the follow-up 
> items toward deprecating string-tag-based split/select in favor of 
> OutputTag-based split/select.
> In Flink 2.0, we might consider eventually deprecating split/select.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-6113) Implement split/select with Side Outputs

2021-04-14 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin closed FLINK-6113.
---
Resolution: Fixed

> Implement split/select with Side Outputs
> 
>
> Key: FLINK-6113
> URL: https://issues.apache.org/jira/browse/FLINK-6113
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.3.0
>Reporter: Chen Qin
>Priority: Minor
>  Labels: stale-minor
> Attachments: split_select.patch
>
>
> With completion of FLINK-4460 (side outputs), this is one of the follow-up 
> items toward deprecating string-tag-based split/select in favor of 
> OutputTag-based split/select.
> In Flink 2.0, we might consider eventually deprecating split/select.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-6113) Implement split/select with Side Outputs

2021-04-14 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-6113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-6113:

Attachment: split_select.patch

> Implement split/select with Side Outputs
> 
>
> Key: FLINK-6113
> URL: https://issues.apache.org/jira/browse/FLINK-6113
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.3.0
>Reporter: Chen Qin
>Priority: Minor
>  Labels: stale-minor
> Attachments: split_select.patch
>
>
> With completion of FLINK-4460 (side outputs), this is one of the follow-up 
> items toward deprecating string-tag-based split/select in favor of 
> OutputTag-based split/select.
> In Flink 2.0, we might consider eventually deprecating split/select.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin

2021-03-31 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17312867#comment-17312867
 ] 

Chen Qin commented on FLINK-22081:
--

[~AHeise] could you assign this Jira to me and help review the PR?

> Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
> ---
>
> Key: FLINK-22081
> URL: https://issues.apache.org/jira/browse/FLINK-22081
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Reporter: Chen Qin
>Assignee: Prem Santosh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, 
> 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3
>
> Attachments: image (13).png
>
>
> Using flink 1.11.2
> I added the flink-s3-fs-hadoop jar in the plugins dir, but I am seeing 
> checkpoint paths like 
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}},
> which means the entropy injection key is not being resolved. After some 
> debugging I found that the 
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
> method getEntropyFs checks whether the given fileSystem is a 
> {{SafetyNetWrapperFileSystem}} (and then unwraps its delegate), but doesn't 
> check for a 
> [ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]
> directly, which would be the wrapper type if the S3 file system dependencies 
> are added as a plugin.
>  
> Repro steps: 
> Run Flink 1.11.2 with flink-s3-fs-hadoop as a plugin and turn on entropy 
> injection with key _entropy_.
> Observe the checkpoint dir with the entropy marker not removed:
> s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/
> compared to it being removed when running Flink 1.9.1:
> s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/
> Add some logging to getEntropyFs and observe that it returns null, because 
> the passed-in parameter is not a {{SafetyNetWrapperFileSystem}} but a 
> {{ClassLoaderFixingFileSystem}}.
> Apply the patch, build a release, and run the same job; the issue is 
> resolved, as the attachment shows.
>  
>  
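
A minimal sketch of the fix the report implies (the accessor on the plugin 
wrapper and its visibility are assumptions for illustration, not the exact 
Flink internals):

{code:java}
import org.apache.flink.core.fs.EntropyInjectingFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.PluginFileSystemFactory;
import org.apache.flink.core.fs.SafetyNetWrapperFileSystem;

// Unwrap ClassLoaderFixingFileSystem the same way SafetyNetWrapperFileSystem
// is already unwrapped, so plugin-loaded S3 file systems resolve entropy too.
private static EntropyInjectingFileSystem getEntropyFs(FileSystem fs) {
    if (fs instanceof EntropyInjectingFileSystem) {
        return (EntropyInjectingFileSystem) fs;
    }
    if (fs instanceof SafetyNetWrapperFileSystem) {
        // Recurse on the delegate in case wrappers are nested.
        return getEntropyFs(((SafetyNetWrapperFileSystem) fs).getWrappedDelegate());
    }
    if (fs instanceof PluginFileSystemFactory.ClassLoaderFixingFileSystem) {
        // The case missing from the original check: the plugin wrapper.
        // (Accessor name assumed here for the sketch.)
        return getEntropyFs(
                ((PluginFileSystemFactory.ClassLoaderFixingFileSystem) fs).getWrappedDelegate());
    }
    return null;
}
{code}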



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin

2021-03-31 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-22081:
-
Description: 
Using flink 1.11.2

I added the flink-s3-fs-hadoop jar in the plugins dir, but I am seeing 
checkpoint paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}},
which means the entropy injection key is not being resolved. After some 
debugging I found that the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
method getEntropyFs checks whether the given fileSystem is a 
{{SafetyNetWrapperFileSystem}} (and then unwraps its delegate), but doesn't 
check for a 
[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]
directly, which would be the wrapper type if the S3 file system dependencies 
are added as a plugin.

 

Repro steps: 

Run Flink 1.11.2 with flink-s3-fs-hadoop as a plugin and turn on entropy 
injection with key _entropy_.
Observe the checkpoint dir with the entropy marker not removed:
s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/
compared to it being removed when running Flink 1.9.1:
s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/

Add some logging to getEntropyFs and observe that it returns null, because the 
passed-in parameter is not a {{SafetyNetWrapperFileSystem}} but a 
{{ClassLoaderFixingFileSystem}}.

Apply the patch, build a release, and run the same job; the issue is resolved, 
as the attachment shows.

 

 

  was:
Using flink 1.11.2

I added the flink-s3-fs-hadoop jar in the plugins dir, but I am seeing 
checkpoint paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}},
which means the entropy injection key is not being resolved. After some 
debugging I found that the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
method getEntropyFs checks whether the given fileSystem is a 
{{SafetyNetWrapperFileSystem}} (and then unwraps its delegate), but doesn't 
check for a 
[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]
directly, which would be the wrapper type if the S3 file system dependencies 
are added as a plugin.

 

Repro steps: 

Run Flink 1.11.2 with flink-s3-fs-hadoop as a plugin and turn on entropy 
injection with key _entropy_.

Observe the checkpoint dir with the entropy marker not removed:

s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/

compared to it being removed when running Flink 1.9.1:

s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/

Add some logging to getEntropyFs and observe that it returns null, because the 
passed-in parameter is not a {{SafetyNetWrapperFileSystem}} but a 
{{ClassLoaderFixingFileSystem}}.

Apply the patch, build a release, and run the same job; the issue is resolved, 
as the attachment shows.

 

 


> Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
> ---
>
> Key: FLINK-22081
> URL: https://issues.apache.org/jira/browse/FLINK-22081
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Reporter: Chen Qin
>Assignee: Prem Santosh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, 
> 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3
>
> Attachments: image (13).png
>
>
> Using flink 1.11.2
> I added the flink-s3-fs-hadoop jar in the plugins dir, but I am seeing 
> checkpoint paths like 
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}},
> which means the entropy injection key is not being resolved. After some 
> debugging I found that the 
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
> method getEntropyFs checks whether the given fileSystem is a 
> {{SafetyNetWrapperFileSystem}} (and then unwraps its delegate), but doesn't 
> check for a 
> [ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]
> directly, which would be the wrapper type if the S3 file system 
> dependencies are added as a 

[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin

2021-03-31 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-22081:
-
Description: 
Using flink 1.11.2

I added the flink-s3-fs-hadoop jar in the plugins dir, but I am seeing 
checkpoint paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}},
which means the entropy injection key is not being resolved. After some 
debugging I found that the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
method getEntropyFs checks whether the given fileSystem is a 
{{SafetyNetWrapperFileSystem}} (and then unwraps its delegate), but doesn't 
check for a 
[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]
directly, which would be the wrapper type if the S3 file system dependencies 
are added as a plugin.

 

Repro steps: 

Run Flink 1.11.2 with flink-s3-fs-hadoop as a plugin and turn on entropy 
injection with key _entropy_.

Observe the checkpoint dir with the entropy marker not removed:

s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/

compared to it being removed when running Flink 1.9.1:

s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/

Add some logging to getEntropyFs and observe that it returns null, because the 
passed-in parameter is not a {{SafetyNetWrapperFileSystem}} but a 
{{ClassLoaderFixingFileSystem}}.

Apply the patch, build a release, and run the same job; the issue is resolved, 
as the attachment shows.

 

 

  was:
Using flink 1.11.2

I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
checkpoints paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
 which means the entropy injection key is not being resolved. After some 
debugging I found that in the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} (and 
then its delegate), but don't check for 
{{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} 
directly in the getEntropyFs method, which would be the wrapper type if S3 file 
system dependencies are added as a plugin.

 

Repro steps: 

Flink 1.11.2 with flink-s3-fs-hadoop as a plugin and the entropy injection key 
_entropy_ turned on.

Observe the checkpoint dir with the entropy marker not removed:

s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/

Compare to the marker being removed when running Flink 1.9.1:

s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/

 

Add some logging to getEntropyFs; observe it returns null because the passed-in 
parameter is not {{SafetyNetWrapperFileSystem}} but 
{{ClassLoaderFixingFileSystem}}

 

Apply the patch, build a release and run the same job.

 

 


> Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
> ---
>
> Key: FLINK-22081
> URL: https://issues.apache.org/jira/browse/FLINK-22081
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Reporter: Chen Qin
>Assignee: Prem Santosh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, 
> 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3
>
> Attachments: image (13).png
>
>
> Using flink 1.11.2
> I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
> checkpoints paths like 
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
>  which means the entropy injection key is not being resolved. After some 
> debugging I found that in the 
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
> we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} (and 
> then its delegate), but don't check for 
> {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} 
> directly in the getEntropyFs method, which would be the wrapper type if S3 file 
> system dependencies are added as a plugin.
>  
> Repro steps: 

[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin

2021-03-31 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-22081:
-
Attachment: image (13).png

> Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
> ---
>
> Key: FLINK-22081
> URL: https://issues.apache.org/jira/browse/FLINK-22081
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Reporter: Chen Qin
>Assignee: Prem Santosh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, 
> 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3
>
> Attachments: image (13).png
>
>
> Using flink 1.11.2
> I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
> checkpoints paths like 
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
>  which means the entropy injection key is not being resolved. After some 
> debugging I found that in the 
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
> we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} (and 
> then its delegate), but don't check for 
> {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} 
> directly in the getEntropyFs method, which would be the wrapper type if S3 file 
> system dependencies are added as a plugin.
>  
> Repro steps: 
> Flink 1.11.2 with flink-s3-fs-hadoop as a plugin and the entropy injection key 
> _entropy_ turned on.
> Observe the checkpoint dir with the entropy marker not removed:
> s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/
> Compare to the marker being removed when running Flink 1.9.1:
> s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/
>  
> Add some logging to getEntropyFs; observe it returns null because the passed-in 
> parameter is not {{SafetyNetWrapperFileSystem}} but 
> {{ClassLoaderFixingFileSystem}}
>  
> Apply the patch, build a release and run the same job.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin

2021-03-31 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-22081:
-
Description: 
Using flink 1.11.2

I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
checkpoints paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
 which means the entropy injection key is not being resolved. After some 
debugging I found that in the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} (and 
then its delegate), but don't check for 
{{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} 
directly in the getEntropyFs method, which would be the wrapper type if S3 file 
system dependencies are added as a plugin.

 

Repro steps: 

Flink 1.11.2 with flink-s3-fs-hadoop as a plugin and the entropy injection key 
_entropy_ turned on.

Observe the checkpoint dir with the entropy marker not removed:

s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/

Compare to the marker being removed when running Flink 1.9.1:

s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/

 

Add some logging to getEntropyFs; observe it returns null because the passed-in 
parameter is not {{SafetyNetWrapperFileSystem}} but 
{{ClassLoaderFixingFileSystem}}

 

Apply the patch, build a release and run the same job.

 

 

  was:
Using flink 1.11.2

I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
checkpoints paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
 which means the entropy injection key is not being resolved. After some 
debugging I found that in the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} (and 
then its delegate), but don't check for 
{{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} 
directly in the getEntropyFs method, which would be the wrapper type if S3 file 
system dependencies are added as a plugin.

 

Current behavior when using hadoop-s3

s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/6da165c7b3c8422125abbfdb97ca9c04/chk-5/
   

 


> Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
> ---
>
> Key: FLINK-22081
> URL: https://issues.apache.org/jira/browse/FLINK-22081
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Reporter: Chen Qin
>Assignee: Prem Santosh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, 
> 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3
>
> Attachments: image (13).png
>
>
> Using flink 1.11.2
> I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
> checkpoints paths like 
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
>  which means the entropy injection key is not being resolved. After some 
> debugging I found that in the 
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
> we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} (and 
> then its delegate), but don't check for 
> {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} 
> directly in the getEntropyFs method, which would be the wrapper type if S3 file 
> system dependencies are added as a plugin.
>  
> Repro steps: 
> Flink 1.11.2 with flink-s3-fs-hadoop as a plugin and the entropy injection key 
> _entropy_ turned on.
> Observe the checkpoint dir with the entropy marker not removed:
> s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/jobid/chk-5/
> Compare to the marker being removed when running Flink 1.9.1:
> s3a://xxx/dev/checkpoints/xenon/event-stream-splitter/jobid/chk-5/
>  
> Add some logging to getEntropyFs; observe it returns null because the passed-in 
> parameter is not {{SafetyNetWrapperFileSystem}} but 
> {{ClassLoaderFixingFileSystem}}

[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin

2021-03-31 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-22081:
-
Description: 
Using flink 1.11.2

I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
checkpoints paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
 which means the entropy injection key is not being resolved. After some 
debugging I found that in the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} (and 
then its delegate), but don't check for 
{{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} 
directly in the getEntropyFs method, which would be the wrapper type if S3 file 
system dependencies are added as a plugin.

 

Current behavior when using hadoop-s3

s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/6da165c7b3c8422125abbfdb97ca9c04/chk-5/
   

 

  was:
Using flink 1.11.2

I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
checkpoints paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
 which means the entropy injection key is not being resolved. After some 
debugging I found that in the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} (and 
then its delegate), but don't check for 
{{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} 
directly in the getEntropyFs method, which would be the wrapper type if S3 file 
system dependencies are added as a plugin.


> Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
> ---
>
> Key: FLINK-22081
> URL: https://issues.apache.org/jira/browse/FLINK-22081
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Reporter: Chen Qin
>Assignee: Prem Santosh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, 
> 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3
>
>
> Using flink 1.11.2
> I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
> checkpoints paths like 
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
>  which means the entropy injection key is not being resolved. After some 
> debugging I found that in the 
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
> we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} (and 
> then its delegate), but don't check for 
> {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} 
> directly in the getEntropyFs method, which would be the wrapper type if S3 file 
> system dependencies are added as a plugin.
>  
> Current behavior when using hadoop-s3
> s3a://xxx/dev/checkpoints/_entropy_/xenon/event-stream-splitter/6da165c7b3c8422125abbfdb97ca9c04/chk-5/
>    
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin

2021-03-31 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-22081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-22081:
-
Fix Version/s: 1.10.2
   1.10.3
   1.11.1
   1.11.2
   1.11.3
   1.12.0
   1.12.1
   1.12.2
   1.12.3
   1.13.0
   1.11.4
   1.10.4
  Description: 
Using flink 1.11.2

I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
checkpoints paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
 which means the entropy injection key is not being resolved. After some 
debugging I found that in the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} (and 
then its delegate), but don't check for 
{{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} 
directly in the getEntropyFs method, which would be the wrapper type if S3 file 
system dependencies are added as a plugin.

  was:
Using flink 1.10

I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
checkpoints paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
 which means the entropy injection key is not being resolved. After some 
debugging I found that in the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
 we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} and 
if so we check if the delegate is of type {{EntropyInjectingFileSystem}} but 
don't check for {{[ClassLoaderFixingFileSystem 
|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}}
 which would be the type if S3 file system dependencies are added as a plugin.

 Priority: Minor  (was: Major)

> Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin
> ---
>
> Key: FLINK-22081
> URL: https://issues.apache.org/jira/browse/FLINK-22081
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Reporter: Chen Qin
>Assignee: Prem Santosh
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.10.2, 1.10.3, 1.10.4, 1.11.0, 1.11.1, 1.11.2, 
> 1.11.3, 1.11.4, 1.12.0, 1.12.1, 1.12.2, 1.13.0, 1.12.3
>
>
> Using flink 1.11.2
> I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
> checkpoints paths like 
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
>  which means the entropy injection key is not being resolved. After some 
> debugging I found that in the 
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
> we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} (and 
> then its delegate), but don't check for 
> {{[ClassLoaderFixingFileSystem|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}} 
> directly in the getEntropyFs method, which would be the wrapper type if S3 file 
> system dependencies are added as a plugin.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin

2021-03-31 Thread Chen Qin (Jira)
Chen Qin created FLINK-22081:


 Summary: Entropy key not resolved if flink-s3-fs-hadoop is added 
as a plugin
 Key: FLINK-22081
 URL: https://issues.apache.org/jira/browse/FLINK-22081
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
Reporter: Chen Qin
Assignee: Prem Santosh
 Fix For: 1.10.1, 1.11.0


Using flink 1.10

I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
checkpoints paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
 which means the entropy injection key is not being resolved. After some 
debugging I found that in the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
 we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} and 
if so we check if the delegate is of type {{EntropyInjectingFileSystem}} but 
don't check for {{[ClassLoaderFixingFileSystem 
|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}}
 which would be the type if S3 file system dependencies are added as a plugin.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17359) Entropy key is not resolved if flink-s3-fs-hadoop is added as a plugin

2021-03-30 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311956#comment-17311956
 ] 

Chen Qin commented on FLINK-17359:
--

[https://github.com/apache/flink/pull/15442]  

> Entropy key is not resolved if flink-s3-fs-hadoop is added as a plugin
> --
>
> Key: FLINK-17359
> URL: https://issues.apache.org/jira/browse/FLINK-17359
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Reporter: Prem Santosh
>Assignee: Prem Santosh
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>
> Using flink 1.10
> I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
> checkpoints paths like 
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
>  which means the entropy injection key is not being resolved. After some 
> debugging I found that in the 
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
>  we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} 
> and if so we check if the delegate is of type {{EntropyInjectingFileSystem}} 
> but don't check for {{[ClassLoaderFixingFileSystem 
> |https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}}
>  which would be the type if S3 file system dependencies are added as a plugin.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17359) Entropy key is not resolved if flink-s3-fs-hadoop is added as a plugin

2021-03-24 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17307991#comment-17307991
 ] 

Chen Qin commented on FLINK-17359:
--

Hi [~premsantosh], we see a regression of this bug with 1.11.2.

Details: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Flink-job-cannot-find-recover-path-after-using-entropy-injection-for-s3-file-systems-tp49527p49656.html

Do you have cycles to review a small fix PR?

> Entropy key is not resolved if flink-s3-fs-hadoop is added as a plugin
> --
>
> Key: FLINK-17359
> URL: https://issues.apache.org/jira/browse/FLINK-17359
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystems
>Reporter: Prem Santosh
>Assignee: Prem Santosh
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>
> Using flink 1.10
> I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
> checkpoints paths like 
> {{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
>  which means the entropy injection key is not being resolved. After some 
> debugging I found that in the 
> [EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
>  we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} 
> and if so we check if the delegate is of type {{EntropyInjectingFileSystem}} 
> but don't check for {{[ClassLoaderFixingFileSystem 
> |https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}}
>  which would be the type if S3 file system dependencies are added as a plugin.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-18158) Add a utility to create a DDL statement from avro schema

2020-07-22 Thread Chen Qin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-18158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17162956#comment-17162956
 ] 

Chen Qin commented on FLINK-18158:
--

[~twalthr] what if the user has a nested struct definition in a protobuf/thrift schema?

struct {
   map<string, list<string>> property;
}
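
For illustration, a minimal Java sketch of how such a utility might map nested 
avro types (including a nested map case like the one above) to Flink SQL DDL 
types; this is an assumption about the mapping, not an existing Flink API:

{code:java}
import org.apache.avro.Schema;

class AvroToDdlSketch {
    // Recursively map an avro schema to a Flink SQL type string. Avro map keys
    // are always strings, so MAP<STRING, ...> is safe for the MAP case.
    static String toDdlType(Schema s) {
        switch (s.getType()) {
            case STRING:  return "STRING";
            case INT:     return "INT";
            case LONG:    return "BIGINT";
            case BOOLEAN: return "BOOLEAN";
            case DOUBLE:  return "DOUBLE";
            case MAP:     return "MAP<STRING, " + toDdlType(s.getValueType()) + ">";
            case ARRAY:   return "ARRAY<" + toDdlType(s.getElementType()) + ">";
            case RECORD: {
                // Nested struct -> ROW<fieldName fieldType, ...>
                StringBuilder row = new StringBuilder("ROW<");
                for (Schema.Field f : s.getFields()) {
                    if (row.length() > 4) row.append(", ");
                    row.append(f.name()).append(' ').append(toDdlType(f.schema()));
                }
                return row.append('>').toString();
            }
            default:
                throw new UnsupportedOperationException(s.getType().toString());
        }
    }
}
{code}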

> Add a utility to create a DDL statement from avro schema
> 
>
> Key: FLINK-18158
> URL: https://issues.apache.org/jira/browse/FLINK-18158
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> User asked if there is a way to create a TableSchema/Table originating from 
> avro schema. 
> https://lists.apache.org/thread.html/r9bd43449314230fad0b627a170db05284c9727371092fc275fc05b74%40%3Cuser.flink.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (FLINK-16392) oneside sorted cache in intervaljoin

2020-03-13 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin closed FLINK-16392.

Resolution: Feedback Received

> oneside sorted cache in intervaljoin
> 
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: Chen Qin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The native intervaljoin relies on the statebackend (e.g. rocksdb) to 
> insert/fetch the left and right buffers. This design choice minimizes heap 
> memory footprint while bounding the processing throughput of a single 
> taskmanager to rocksdb access speed. Here at Pinterest, we have some large use 
> cases where developers join a large, slowly evolving data stream (e.g. post 
> updates in the last 28 days) with a web traffic datastream (e.g. post views up 
> to 28 days after a given update).
> This poses some challenges to the current implementation of intervaljoin:
>  * partitioned rocksdb needs to keep both updates and views for 28 days; a 
> large buffer (especially on the view stream side) causes rocksdb to slow down 
> and leads to overall interval join performance degrading quickly as state 
> builds up.
>  * the view stream is web scale; even after setting a large parallelism it can 
> put a lot of pressure on each subtask and backpressure the entire job.
> In the proposed implementation, we plan to introduce two changes:
>  * support ProcessJoinFunction settings to opt in to an earlier cleanup time 
> of the right stream (e.g. the view stream doesn't have to stay in the buffer 
> for 28 days waiting for the update stream to join, since related post views 
> happen after the update in event time semantics). This optimization can reduce 
> state size to improve rocksdb throughput. In the extreme case, the user can 
> opt in to an in-flight join and skip writing into the right view stream buffer 
> to save iops budget on each subtask.
>  * support ProcessJoinFunction settings to expedite keyed lookup of the 
> slow-changing stream. Instead of every post view pulling post updates from 
> rocksdb, the user can opt in to having a one-side buffer cache available in 
> memory. On a given post update, the cache loads recent views from the right 
> buffer and uses a sortedMap to find buckets. On a given post view, the cache 
> loads recent updates from the left buffer into memory. When another view for 
> that post arrives, flink saves the cost of a rocksdb access.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
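
For illustration, a minimal Java sketch of the one-side sorted cache idea from 
the proposal above; it is a toy model under stated assumptions (per-key TreeMap, 
caller-supplied state loader), not Flink's IntervalJoinOperator:

{code:java}
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

class OneSideSortedCacheSketch<V> {
    // joinKey -> (timestamp -> elements), rebuilt lazily from the state backend.
    private final Map<String, TreeMap<Long, List<V>>> cache = new HashMap<>();

    // The cached side saw a new element: the cache for this key is now stale,
    // so drop it and let the next lookup rebuild it from state.
    void onCachedSideInsert(String key) {
        cache.remove(key);
    }

    // Range lookup against the cached side's buffer; 'loader' pulls the buffer
    // from the state backend (e.g. rocksdb) only on a cache miss, so repeated
    // lookups for a hot key avoid rocksdb entirely.
    Collection<List<V>> lookup(String key, long lowerTs, long upperTs,
                               Function<String, TreeMap<Long, List<V>>> loader) {
        TreeMap<Long, List<V>> sorted = cache.computeIfAbsent(key, loader);
        return sorted.subMap(lowerTs, true, upperTs, true).values();
    }
}
{code}

On restore, the cache starts empty and is repopulated per key on first access, 
which matches the checkpoint/restore answer in the open discussion below.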


[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin

2020-03-05 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-16392:
-
Description: 
The native intervaljoin relies on the statebackend (e.g. rocksdb) to insert/fetch 
the left and right buffers. This design choice minimizes heap memory footprint 
while bounding the processing throughput of a single taskmanager to rocksdb 
access speed. Here at Pinterest, we have some large use cases where developers 
join a large, slowly evolving data stream (e.g. post updates in the last 28 days) 
with a web traffic datastream (e.g. post views up to 28 days after a given update).

This poses some challenges to the current implementation of intervaljoin:
 * partitioned rocksdb needs to keep both updates and views for 28 days; a large 
buffer (especially on the view stream side) causes rocksdb to slow down and leads 
to overall interval join performance degrading quickly as state builds up.

 * the view stream is web scale; even after setting a large parallelism it can 
put a lot of pressure on each subtask and backpressure the entire job.

In the proposed implementation, we plan to introduce two changes:
 * support ProcessJoinFunction settings to opt in to an earlier cleanup time for 
the right stream (e.g. the view stream doesn't have to stay in the buffer for 28 
days waiting for the update stream to join, since related post views happen after 
the update in event time semantics). This optimization can reduce state size to 
improve rocksdb throughput. In the extreme case, the user can opt in to an 
in-flight join and skip writing into the right view stream buffer to save iops 
budget on each subtask.

 * support ProcessJoinFunction settings to expedite keyed lookup of the 
slow-changing stream. Instead of every post view pulling post updates from 
rocksdb, the user can opt in to having a one-side buffer cache available in 
memory. On a given post update, the cache loads recent views from the right 
buffer and uses a sortedMap to find buckets. On a given post view, the cache 
loads recent updates from the left buffer into memory. When another view for that 
post arrives, flink saves the cost of a rocksdb access.

  was:
IntervalJoin is getting lots of use cases on our side. Those use cases share the 
following similar pattern:
 * the left stream is pulled from a low QPS source
 * the lookup time range from the right stream to the left stream is very large 
(days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
fetch and update get more expensive, performance takes a hit, blocking large use 
cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in to more aggressive right buffer cleanup

 ** allow overriding cleanup of the right stream earlier than the interval 
upper bound
 * leverage a RAM cache to build a sortedMap on demand from the other side's 
buffer for each join key; in our use cases, it helps

 ** expedite right stream lookup of left buffers without accessing rocksdb every 
time (disk -> sorted memory cache)
 ** if a key sees an event from the left side, it cleans up the cache and loads 
the cache from the right side
 ** in the worst case scenario, we only see two streams with round-robin 
processElement1 and processElement2 of the same set of keys at the same 
frequency. Performance is expected to be similar to the current implementation; 
the memory footprint will be bounded by 1/2 the state size.

 

Open discussion
 * how to control cache size?
 ** by default the cache size is set to 1 key
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be 
cleared for that key and rebuilt.
 * what happens on checkpoint/restore
 ** state still persists in the statebackend; clear the cache and rebuild for 
each new key seen.
 * how is performance
 ** Given the assumption that RAM is an order of magnitude faster than disk, 
populating the cache is a small overhead (<5%); with the current rocksdb 
implementation we need to do a full loop at every event, so the cache saves on 
bucket scan logic. If a key recurs with more than one access in the same 
direction on the cache, we expect a significant perf gain.

 


> oneside sorted cache in intervaljoin
> 
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: Chen Qin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The native intervaljoin relies on the statebackend (e.g. rocksdb) to 
> insert/fetch the left and right buffers. This design choice minimizes heap 
> memory footprint while bounding the processing throughput of a single 
> taskmanager to rocksdb access speed. Here at Pinterest, we have some large use 
> cases where developers join a large, slowly evolving data stream (e.g. post 
> updates in the last 28 days) with a web traffic datastream (e.g. post views up 
> to 28 days after a given update).

[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin

2020-03-03 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-16392:
-
Description: 
IntervalJoin is getting lots of use cases on our side. Those use cases share the 
following similar pattern:
 * the left stream is pulled from a low QPS source
 * the lookup time range from the right stream to the left stream is very large 
(days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
fetch and update get more expensive, performance takes a hit, blocking large use 
cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in to more aggressive right buffer cleanup

 ** allow overriding cleanup of the right stream earlier than the interval 
upper bound
 * leverage a RAM cache to build a sortedMap on demand from the other side's 
buffer for each join key; in our use cases, it helps

 ** expedite right stream lookup of left buffers without accessing rocksdb every 
time (disk -> sorted memory cache)
 ** if a key sees an event from the left side, it cleans up the cache and loads 
the cache from the right side
 ** in the worst case scenario, we only see two streams with round-robin 
processElement1 and processElement2 of the same set of keys at the same 
frequency. Performance is expected to be similar to the current implementation; 
the memory footprint will be bounded by 1/2 the state size.

 

Open discussion
 * how to control cache size?
 ** by default the cache size is set to 1 key
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be 
cleared for that key and rebuilt.
 * what happens on checkpoint/restore
 ** state still persists in the statebackend; clear the cache and rebuild for 
each new key seen.
 * how is performance
 ** Given the assumption that RAM is an order of magnitude faster than disk, 
populating the cache is a small overhead (<5%); with the current rocksdb 
implementation we need to do a full loop at every event, so the cache saves on 
bucket scan logic. If a key recurs with more than one access in the same 
direction on the cache, we expect a significant perf gain.

 

  was:
IntervalJoin is getting lots of use cases on our side. Those use cases share the 
following similar pattern:
 * the left stream is pulled periodically from a slowly evolving static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
fetch and update get more expensive, performance takes a hit, blocking large use 
cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in to use cases like the above by customizing and 
inheriting from ProcessJoinFunction
 ** whether to skip the triggered scan from left events (static data set)
 ** allow setting cleanup of the right stream earlier than the interval 
upper bound
 * leverage a RAM cache to build a sortedMap on demand from the other side's 
buffer for each join key; in our use cases, it helps
 ** expedite right stream lookup of left buffers without accessing rocksdb every 
time (disk -> sorted memory cache)
 ** if a key sees an event from the left side, it cleans up the cache and loads 
the cache from the right side
 ** in the worst case scenario, we only see two streams with round-robin 
processElement1 and processElement2 of the same set of keys at the same 
frequency. Performance is expected to be similar to the current implementation; 
the memory footprint will be bounded by 1/2 the state size.

 

Open discussion
 * how to control cache size?
 ** by default the cache size is set to 1 key
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be 
cleared for that key and rebuilt.
 * what happens on checkpoint/restore
 ** state still persists in the statebackend; clear the cache and rebuild for 
each new key seen.
 * how is performance
 ** Given the assumption that RAM is an order of magnitude faster than disk, 
populating the cache is a small overhead (<5%); with the current rocksdb 
implementation we need to do a full loop at every event, so the cache saves on 
bucket scan logic. If a key recurs with more than one access in the same 
direction on the cache, we expect a significant perf gain.

 


> oneside sorted cache in intervaljoin
> 
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: Chen Qin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> IntervalJoin is getting lots of use cases on our side. Those use cases share 
> the following similar pattern:
>  * the left stream is pulled from a low QPS source
>  * the lookup time range from the right stream to the left stream is very 
> large (days to weeks)

[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin

2020-03-03 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-16392:
-
Description: 
IntervalJoin is getting lots of use cases on our side. Those use cases share the 
following similar pattern:
 * the left stream is pulled periodically from a slowly evolving static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
fetch and update get more expensive, performance takes a hit, blocking large use 
cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in to use cases like the above by customizing and 
inheriting from ProcessJoinFunction
 ** whether to skip the triggered scan from left events (static data set)
 ** allow setting cleanup of the right stream earlier than the interval 
upper bound
 * leverage a RAM cache to build a sortedMap on demand from the other side's 
buffer for each join key; in our use cases, it helps
 ** expedite right stream lookup of left buffers without accessing rocksdb every 
time (disk -> sorted memory cache)
 ** if a key sees an event from the left side, it cleans up the cache and loads 
the cache from the right side
 ** in the worst case scenario, we only see two streams with round-robin 
processElement1 and processElement2 of the same set of keys at the same 
frequency. Performance is expected to be similar to the current implementation; 
the memory footprint will be bounded by 1/2 the state size.

 

Open discussion
 * how to control cache size?
 ** by default the cache size is set to 1 key
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be 
cleared for that key and rebuilt.
 * what happens on checkpoint/restore
 ** state still persists in the statebackend; clear the cache and rebuild for 
each new key seen.
 * how is performance
 ** Given the assumption that RAM is an order of magnitude faster than disk, 
populating the cache is a small overhead (<5%); with the current rocksdb 
implementation we need to do a full loop at every event, so the cache saves on 
bucket scan logic. If a key recurs with more than one access in the same 
direction on the cache, we expect a significant perf gain.

 

  was:
IntervalJoin is getting lots of use cases on our side. Those use cases share the 
following similar pattern:
 * the left stream is pulled periodically from a slowly evolving static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
fetch and update get more expensive, performance takes a hit, blocking large use 
cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in to use cases like the above by inheriting from 
ProcessJoinFunction
 ** whether to skip the triggered scan from left events (static data set)
 ** allow setting cleanup of the right stream earlier than the interval 
upper bound
 * leverage a RAM cache to build a sortedMap on demand from the other side's 
buffer for each join key; in our use cases, it helps
 ** expedite right stream lookup of left buffers without accessing rocksdb every 
time (disk -> sorted memory cache)
 ** if a key sees an event from the left side, it cleans up the cache and loads 
the cache from the right side
 ** in the worst case scenario, we only see two streams with round-robin 
processElement1 and processElement2 of the same set of keys at the same 
frequency. Performance is expected to be similar to the current implementation; 
the memory footprint will be bounded by 1/2 the state size.

 

Open discussion
 * how to control cache size?
 ** by default the cache size is set to 1 key
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be 
cleared for that key and rebuilt.
 * what happens on checkpoint/restore
 ** state still persists in the statebackend; clear the cache and rebuild for 
each new key seen.
 * how is performance
 ** Given the assumption that RAM is an order of magnitude faster than SSD and 
far more than spinning disk, populating the cache is a small overhead (1% - 5%); 
with the current rocksdb implementation we need to do a full loop at every 
event, so the cache saves on bucket scan logic. If a key recurs with more than 
one access in the same direction on the cache, we expect a significant perf 
gain.

 


> oneside sorted cache in intervaljoin
> 
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: Chen Qin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> IntervalJoin is getting lots of use cases on our side. Those use cases share 
> the following similar pattern:

[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin

2020-03-03 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-16392:
-
Description: 
IntervalJoin is getting lots of use cases on our side. Those use cases share the 
following similar pattern:
 * the left stream is pulled periodically from a slowly evolving static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
fetch and update get more expensive, performance takes a hit, blocking large use 
cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in to use cases like the above by inheriting from 
ProcessJoinFunction
 ** whether to skip the triggered scan from left events (static data set)
 ** allow setting cleanup of the right stream earlier than the interval 
upper bound
 * leverage a RAM cache to build a sortedMap on demand from the other side's 
buffer for each join key; in our use cases, it helps
 ** expedite right stream lookup of left buffers without accessing rocksdb every 
time (disk -> sorted memory cache)
 ** if a key sees an event from the left side, it cleans up the cache and loads 
the cache from the right side
 ** in the worst case scenario, we only see two streams with round-robin 
processElement1 and processElement2 of the same set of keys at the same 
frequency. Performance is expected to be similar to the current implementation; 
the memory footprint will be bounded by 1/2 the state size.

 

Open discussion
 * how to control cache size?
 ** by default the cache size is set to 1 key
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be 
cleared for that key and rebuilt.
 * what happens on checkpoint/restore
 ** state still persists in the statebackend; clear the cache and rebuild for 
each new key seen.
 * how is performance
 ** Given the assumption that RAM is an order of magnitude faster than SSD and 
far more than spinning disk, populating the cache is a small overhead (1% - 5%); 
with the current rocksdb implementation we need to do a full loop at every 
event, so the cache saves on bucket scan logic. If a key recurs with more than 
one access in the same direction on the cache, we expect a significant perf 
gain.

 

  was:
IntervalJoin is getting lots of use cases on our side. Those use cases share the 
following similar pattern:
 * the left stream is pulled periodically from a slowly evolving static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
fetch and update get more expensive, performance takes a hit, blocking large use 
cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in to use cases like the above by inheriting from 
ProcessJoinFunction
 ** whether to skip the triggered scan from left events (static data set)
 ** allow setting cleanup of the right stream earlier than the interval 
upper bound
 * leverage a RAM cache to build a sortedMap on demand from the other side's 
buffer for each join key; in our use cases, it helps
 ** expedite right stream lookup of left buffers without accessing rocksdb every 
time (disk -> sorted memory cache)
 ** if a key sees an event from the left side, it cleans up the cache and loads 
the cache from the right side
 ** in the worst case scenario, we only see two streams with round-robin 
processElement1 and processElement2 of the same set of keys at the same 
frequency. Performance is expected to be similar to the current implementation; 
the memory footprint will be bounded by 1/2 the state size.

 

Open discussion
 * how to control cache size?
 ** by default the cache size is set to 1 key
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be 
cleared for that key and rebuilt.
 * what happens on checkpoint/restore
 ** state still persists in the statebackend; clear the cache and rebuild for 
each new key seen.
 * how is performance
 ** Given the assumption that RAM is an order of magnitude faster than disk, 
populating the cache is a small overhead (<5%); with the current rocksdb 
implementation we need to do a full loop at every event, so the cache saves on 
bucket scan logic. If a key recurs with more than one access in the same 
direction on the cache, we expect a significant perf gain.

 


> oneside sorted cache in intervaljoin
> 
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: Chen Qin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> IntervalJoin is getting lots of use cases on our side. Those use cases share 
> the following similar pattern:
> 

[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin

2020-03-03 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-16392:
-
Description: 
IntervalJoin is getting lots of use cases on our side. Those use cases share the 
following similar pattern:
 * the left stream is pulled periodically from a slowly evolving static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
fetch and update get more expensive, performance takes a hit, blocking large use 
cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in to use cases like the above by inheriting from 
ProcessJoinFunction
 ** whether to skip the triggered scan from left events (static data set)
 ** allow setting cleanup of the right stream earlier than the interval 
upper bound
 * leverage a RAM cache to build a sortedMap on demand from the other side's 
buffer for each join key; in our use cases, it helps
 ** expedite right stream lookup of left buffers without accessing rocksdb every 
time (disk -> sorted memory cache)
 ** if a key sees an event from the left side, it cleans up the cache and loads 
the cache from the right side
 ** in the worst case scenario, we only see two streams with round-robin 
processElement1 and processElement2 of the same set of keys at the same 
frequency. Performance is expected to be similar to the current implementation; 
the memory footprint will be bounded by 1/2 the state size.

 

Open discussion
 * how to control cache size?
 ** by default the cache size is set to 1 key
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be 
cleared for that key and rebuilt.
 * what happens on checkpoint/restore
 ** state still persists in the statebackend; clear the cache and rebuild for 
each new key seen.
 * how is performance
 ** Given the assumption that RAM is an order of magnitude faster than disk, 
populating the cache is a small overhead (<5%); with the current rocksdb 
implementation we need to do a full loop at every event, so the cache saves on 
bucket scan logic. If a key recurs with more than one access in the same 
direction on the cache, we expect a significant perf gain.

 

  was:
IntervalJoin is getting lots of use cases on our side. Those use cases share the 
following similar pattern:
 * the left stream is pulled periodically from a slowly evolving static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
fetch and update get more expensive, performance takes a hit, blocking large use 
cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in by inheriting from ProcessJoinFunction
 ** if they want to skip the triggered scan from left events (static data set)
 * build a sortedMap on demand from the other side's buffer for each join key; in 
our use cases, it helps
 ** expedite right stream lookup of left buffers without accessing rocksdb every 
time (disk -> sorted memory cache)
 ** if a key sees an event from the left side, it cleans up the cache and loads 
the cache from the right side
 ** in the worst case scenario, we only see two streams with round-robin 
processElement1 and processElement2 of the same set of keys at the same 
frequency. Performance is expected to be similar to the current implementation; 
the memory footprint will be bounded by 1/2 the state size.

 

Open discussion
 * how to control cache size?
 ** TBD
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be 
cleared for that key and rebuilt. This is a small overhead to populate the 
cache; with the current rocksdb implementation we need to do a full loop at 
every event, so the cache saves on bucket scan logic.
 * what happens on checkpoint/restore
 ** state still persists in the statebackend; clear the cache and rebuild for 
each new key seen.

 


> oneside sorted cache in intervaljoin
> 
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: Chen Qin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> IntervalJoin is getting lots of use cases on our side. Those use cases share 
> the following similar pattern:
>  * the left stream is pulled periodically from a slowly evolving static dataset
>  * the lookup time range is very large (days to weeks)
>  * the right stream is web traffic with high QPS
> In the current interval join implementation, we treat both streams equally and 
> pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
> fetch and update get more expensive, performance takes a hit, blocking large 
> use cases.

[jira] [Updated] (FLINK-16392) oneside sorted cache in intervaljoin

2020-03-02 Thread Chen Qin (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-16392:
-
Description: 
IntervalJoin is getting lots of use cases on our side. Those use cases share the 
following similar pattern:
 * the left stream is pulled periodically from a slowly evolving static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally and 
pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
fetch and update get more expensive, performance takes a hit, blocking large use 
cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in by inheriting from ProcessJoinFunction
 ** if they want to skip the triggered scan from left events (static data set)
 * build a sortedMap on demand from the other side's buffer for each join key; in 
our use cases, it helps
 ** expedite right stream lookup of left buffers without accessing rocksdb every 
time (disk -> sorted memory cache)
 ** if a key sees an event from the left side, it cleans up the cache and loads 
the cache from the right side
 ** in the worst case scenario, we only see two streams with round-robin 
processElement1 and processElement2 of the same set of keys at the same 
frequency. Performance is expected to be similar to the current implementation; 
the memory footprint will be bounded by 1/2 the state size.

 

Open discussion
 * how to control cache size?
 ** TBD
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be 
cleared for that key and rebuilt. This is a small overhead to populate the 
cache; with the current rocksdb implementation we need to do a full loop at 
every event, so the cache saves on bucket scan logic.
 * what happens on checkpoint/restore
 ** state still persists in the statebackend; clear the cache and rebuild for 
each new key seen.

 

  was:
IntervalJoin is getting lots of use cases. Those use cases share the following 
similar pattern:
 * the left stream is pulled periodically from a static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally. 
Specifically, as rocksdb fetch and update get more expensive, performance takes a 
hit, blocking large use cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in via ProcessJoinFunction if they want to skip the scan 
when the intervaljoin operator receives events from the left stream (static data 
set)
 * build a sortedMap from the other side's buffer at per-seen-key granularity
 ** expedite right stream lookup of left buffers without accessing rocksdb every 
time
 ** if a key sees an event from the left side, it cleans up the buffer and loads 
the buffer from the right side

 

Open discussion
 * how to control cache size?
 ** TBD
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be 
cleared for that key and rebuilt. This is a small overhead to populate the 
cache; with the current rocksdb implementation we need to do a full loop at 
every event, so the cache saves on bucket scan logic.
 * what happens on checkpoint/restore
 ** state still persists in the statebackend; clear the cache and rebuild for 
each new key seen.

 


> oneside sorted cache in intervaljoin
> 
>
> Key: FLINK-16392
> URL: https://issues.apache.org/jira/browse/FLINK-16392
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream
>Affects Versions: 1.10.0
>Reporter: Chen Qin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> IntervalJoin is getting lots of use cases on our side. Those use cases share 
> the following similar pattern:
>  * the left stream is pulled periodically from a slowly evolving static dataset
>  * the lookup time range is very large (days to weeks)
>  * the right stream is web traffic with high QPS
> In the current interval join implementation, we treat both streams equally and 
> pull/scan keyed state on demand from the backend (rocksdb here). When rocksdb 
> fetch and update get more expensive, performance takes a hit, blocking large 
> use cases.
> In the proposed implementation, we plan to introduce two changes:
>  * allow the user to opt in by inheriting from ProcessJoinFunction
>  ** if they want to skip the triggered scan from left events (static data set)
>  * build a sortedMap on demand from the other side's buffer for each join key; 
> in our use cases, it helps
>  ** expedite right stream lookup of left buffers without accessing rocksdb 
> every time (disk -> sorted memory cache)
>  ** if a key sees an event from the left side, it cleans up the cache and 
> loads the cache from the right side
>  ** in the worst case scenario, we only see two streams with round-robin 
> processElement1 and processElement2 of the same set of keys at the same 
> frequency.

[jira] [Created] (FLINK-16392) oneside sorted cache in intervaljoin

2020-03-02 Thread Chen Qin (Jira)
Chen Qin created FLINK-16392:


 Summary: oneside sorted cache in intervaljoin
 Key: FLINK-16392
 URL: https://issues.apache.org/jira/browse/FLINK-16392
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.10.0
Reporter: Chen Qin
 Fix For: 1.11.0


IntervalJoin is attracting many use cases. These use cases share a similar 
pattern:
 * the left stream is pulled periodically from a static dataset
 * the lookup time range is very large (days to weeks)
 * the right stream is web traffic with high QPS

The current interval join implementation treats both streams equally. As 
RocksDB fetches and updates grow more expensive, performance takes a hit, 
which blocks large use cases.

In the proposed implementation, we plan to introduce two changes (see the 
sketch after the open-discussion list below):
 * allow users to opt in via ProcessJoinFunction if they want to skip the scan 
when the interval join operator receives events from the left (static) stream
 * build a sortedMap from the other side's buffer at per-key granularity
 ** this expedites right-stream lookups of left buffers without hitting 
RocksDB every time
 ** if a key sees an event from the left side, its buffer is cleared and 
reloaded from the right side

 

Open discussion
 * how to control cache size?
 ** TBD
 * how to avoid a dirty cache?
 ** if a given key sees an insertion from the other side, the cache is cleared 
for that key and rebuilt. Populating the cache is a small overhead compared 
with the current RocksDB implementation, which does a full scan on every 
event; the cache saves the bucket-scan work.
 * what happens on checkpoint/restore?
 ** state still persists in the state backend; the cache is cleared and 
rebuilt for each newly seen key.
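
A minimal sketch of the proposed per-key sorted cache, assuming a hypothetical 
SortedBufferCache helper (the class and its methods are illustrative, not part 
of Flink):

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the one-side sorted cache described above.
public class SortedBufferCache<K, V> {

    // Per join key: the other side's buffered elements, sorted by timestamp.
    private final Map<K, TreeMap<Long, List<V>>> cache = new HashMap<>();

    /** Serve a lookup from memory if present; otherwise do one full load
     *  from the state backend instead of a scan per event. */
    public TreeMap<Long, List<V>> lookup(K key) {
        return cache.computeIfAbsent(key, this::loadFromStateBackend);
    }

    /** An insertion from the other side invalidates the key's cache;
     *  it is rebuilt lazily on the next lookup. */
    public void invalidate(K key) {
        cache.remove(key);
    }

    /** On restore, state still lives in the state backend, so start with an
     *  empty cache and rebuild it per key as new events arrive. */
    public void clearAll() {
        cache.clear();
    }

    private TreeMap<Long, List<V>> loadFromStateBackend(K key) {
        // Placeholder: in the real operator this would range-scan the key's
        // buffer from RocksDB once and materialize it in sorted order.
        return new TreeMap<>();
    }
}
{code}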

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-8437) SideOutput() API is ambiguous

2018-01-22 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin reassigned FLINK-8437:
---

Assignee: Chen Qin

> SideOutput() API is ambiguous
> -
>
> Key: FLINK-8437
> URL: https://issues.apache.org/jira/browse/FLINK-8437
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chen Qin
>Priority: Minor
>
> The current API for retrieving Side Outputs is a bit ambiguous. Consider the 
> program below:
> {code:java}
> sideOutput = stream
>     .process(...)
>     .filter(...)
>     .getSideOutput(tag)
> {code}
> This may be the sideOutput of the process function that is passed through the 
> API for convenience, or the sideOutput of the filter function (which would 
> always be empty).
> Given that only process functions can have sideOutputs we may want to change 
> the return type so that getSideOutput can only be called after a process 
> function.
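
One way to avoid the ambiguity with today's API is to capture the result of 
process() before chaining, so getSideOutput is unambiguously called on the 
process function's output (a sketch; MyEvent, MyError, errorTag, and 
MyProcessFunction are illustrative):

{code:java}
// Capture the process() result first, so it is clear whose side output we read.
SingleOutputStreamOperator<MyEvent> processed =
    stream.process(new MyProcessFunction());

// Side output of the process function, not of any downstream operator.
DataStream<MyError> errors = processed.getSideOutput(errorTag);

// Further transformations continue from the main output.
DataStream<MyEvent> filtered = processed.filter(e -> e.isValid());
{code}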



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7954) sideoutput in async function

2017-11-20 Thread Chen Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16260265#comment-16260265
 ] 

Chen Qin commented on FLINK-7954:
-

[~phoenixjiangnan] Thanks for the quick response. I think we can always start 
with RichAsyncFunction, where users expect more control when building their own 
logic on top. I will leave additional comments to [~aljoscha], since I haven't 
looked into the details of how asyncIO is implemented. My concern is whether 
side output should be queued along with the main output when the user calls 
orderedWait.
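
A loudly hypothetical sketch of the API question above, assuming ResultFuture 
were extended with a side-output method; no such method exists in Flink, and 
Request, Response, and LookupClient are placeholders:

{code:java}
// HYPOTHETICAL: ResultFuture#completeSideOutput does not exist in Flink.
// This only illustrates whether side outputs should share the queue that
// orderedWait() uses for the main output.
public class EnrichFunction extends RichAsyncFunction<Request, Response> {

    private static final OutputTag<Request> FAILED =
        new OutputTag<Request>("failed-requests") {};

    private transient LookupClient client; // placeholder async client

    @Override
    public void asyncInvoke(Request input, ResultFuture<Response> resultFuture) {
        client.lookup(input).whenComplete((response, err) -> {
            if (err != null) {
                // Open question: queue this together with the main output so
                // that orderedWait() ordering also covers side outputs?
                resultFuture.completeSideOutput(FAILED, input); // hypothetical
            } else {
                resultFuture.complete(Collections.singleton(response));
            }
        });
    }
}
{code}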

> sideoutput in async function
> 
>
> Key: FLINK-7954
> URL: https://issues.apache.org/jira/browse/FLINK-7954
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.3.2
> Environment: similar to FLINK-7635, adding support of side output to 
> AsyncFunction
>Reporter: Chen Qin
>Assignee: Bowen Li
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7954) sideoutput in async function

2017-10-31 Thread Chen Qin (JIRA)
Chen Qin created FLINK-7954:
---

 Summary: sideoutput in async function
 Key: FLINK-7954
 URL: https://issues.apache.org/jira/browse/FLINK-7954
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.3.2
 Environment: similar to FLINK-7635, adding support of side output to 
AsyncFunction

Reporter: Chen Qin
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7635) support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction

2017-09-18 Thread Chen Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16170922#comment-16170922
 ] 

Chen Qin commented on FLINK-7635:
-

[~phoenixjiangnan] there are good references in ProcessFunction.Context, 
especially in the unit tests and comments. Also consider how to surface the 
OutputTag error message to the user where late-arriving events could conflict 
with ones emitted from ProcessWindowFunction.


> support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction
> --
>
> Key: FLINK-7635
> URL: https://issues.apache.org/jira/browse/FLINK-7635
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Chen Qin
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0, 1.3.3
>
>
> [FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only 
> implemented output on the ProcessFunction Context. It would be nice to add 
> support to the ProcessWindow and ProcessAllWindow functions as well. [email 
> threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]
> [~aljoscha] I thought this is a good warm-up task for people to learn how 
> window functions work in general. Otherwise feel free to assign it back to me.
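
For reference, a sketch of the shape this took: ProcessWindowFunction.Context 
gained an output(...) method mirroring ProcessFunction (the Event type and the 
alert threshold below are illustrative):

{code:java}
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class CountWithAlerts
        extends ProcessWindowFunction<Event, Long, String, TimeWindow> {

    // Illustrative tag for windows that exceed a threshold.
    private static final OutputTag<String> ALERTS =
        new OutputTag<String>("alerts") {};

    @Override
    public void process(String key, Context ctx, Iterable<Event> events,
                        Collector<Long> out) {
        long count = 0;
        for (Event ignored : events) {
            count++;
        }
        if (count > 1000) {
            // Side output from the window context, mirroring ProcessFunction.
            ctx.output(ALERTS, "hot key: " + key);
        }
        out.collect(count);
    }
}
{code}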



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7635) support sideOutput in ProcessWindowFunciton & ProcessAllWindowFunction

2017-09-17 Thread Chen Qin (JIRA)
Chen Qin created FLINK-7635:
---

 Summary: support sideOutput in ProcessWindowFunciton & 
ProcessAllWindowFunction
 Key: FLINK-7635
 URL: https://issues.apache.org/jira/browse/FLINK-7635
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Scala API
Reporter: Chen Qin
Priority: Minor
 Fix For: 1.4.0, 1.3.3


[FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only implemented 
output on the ProcessFunction Context. It would be nice to add support to the 
ProcessWindow and ProcessAllWindow functions as well. [email 
threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]

[~aljoscha] I thought this is a good warm-up task for people to learn how 
window functions work in general. Otherwise feel free to assign it back to me.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7106) Make SubmittedJobGraphStore implementation configurable

2017-07-06 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin reassigned FLINK-7106:
---

Assignee: Chen Qin

> Make SubmittedJobGraphStore implementation configurable
> ---
>
> Key: FLINK-7106
> URL: https://issues.apache.org/jira/browse/FLINK-7106
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib, Local Runtime
>Reporter: Chen Qin
>Assignee: Chen Qin
>
> The current SubmittedJobGraphStore is hardcoded to store in ZooKeeper when 
> the user chooses HA mode. The goal of this task is to allow users to build 
> their own implementation, pass configuration via flink-conf, and define how 
> and where that information is stored (e.g. a RocksDB state backend).
> P.S. I think it would be interesting to see how Flink in HA mode could fall 
> back to standalone mode when ZooKeeper suffers a temporary outage.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7106) Make SubmittedJobGraphStore implementation configurable

2017-07-05 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-7106:

Summary: Make SubmittedJobGraphStore implementation configurable  (was: 
Make SubmittedJobGraphStore configurable)

> Make SubmittedJobGraphStore implementation configurable
> ---
>
> Key: FLINK-7106
> URL: https://issues.apache.org/jira/browse/FLINK-7106
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib, Local Runtime
>Reporter: Chen Qin
>
> The current SubmittedJobGraphStore is hardcoded to store in ZooKeeper when 
> the user chooses HA mode. The goal of this task is to allow users to build 
> their own implementation, pass configuration via flink-conf, and define how 
> and where that information is stored (e.g. a RocksDB state backend).
> P.S. I think it would be interesting to see how Flink in HA mode could fall 
> back to standalone mode when ZooKeeper suffers a temporary outage.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7106) Make SubmittedJobGraphStore configurable

2017-07-05 Thread Chen Qin (JIRA)
Chen Qin created FLINK-7106:
---

 Summary: Make SubmittedJobGraphStore configurable
 Key: FLINK-7106
 URL: https://issues.apache.org/jira/browse/FLINK-7106
 Project: Flink
  Issue Type: Improvement
  Components: flink-contrib, Local Runtime
Reporter: Chen Qin


The current SubmittedJobGraphStore is hardcoded to store in ZooKeeper when the 
user chooses HA mode. The goal of this task is to allow users to build their 
own implementation, pass configuration via flink-conf, and define how and where 
that information is stored (e.g. a RocksDB state backend).

P.S. I think it would be interesting to see how Flink in HA mode could fall 
back to standalone mode when ZooKeeper suffers a temporary outage.
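
A hedged sketch of the pluggability being proposed, assuming a hypothetical 
factory interface and an invented flink-conf key; neither exists in Flink 
today:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;

// HYPOTHETICAL factory: lets users supply their own store implementation.
interface SubmittedJobGraphStoreFactory {
    SubmittedJobGraphStore create(Configuration config) throws Exception;
}

final class JobGraphStores {

    // Invented config key, for illustration only.
    private static final String FACTORY_KEY =
        "high-availability.job-graph-store.factory";

    static SubmittedJobGraphStore resolve(Configuration config) throws Exception {
        String factoryClass = config.getString(FACTORY_KEY, null);
        if (factoryClass == null) {
            return createZooKeeperStore(config); // today's hardcoded behavior
        }
        SubmittedJobGraphStoreFactory factory = (SubmittedJobGraphStoreFactory)
            Class.forName(factoryClass).getDeclaredConstructor().newInstance();
        return factory.create(config);
    }

    private static SubmittedJobGraphStore createZooKeeperStore(Configuration config) {
        // The existing ZooKeeper-backed construction would go here.
        throw new UnsupportedOperationException("sketch only");
    }
}
{code}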





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-6085) flink as micro service

2017-03-28 Thread Chen Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945991#comment-15945991
 ] 

Chen Qin edited comment on FLINK-6085 at 3/29/17 3:04 AM:
--

To unlock scenarios like blocking RPC calls or async callbacks, I am currently 
thinking of a way to connect the web front end directly to the pipeline in the 
normal case, using some kind of durable buffer to store requests since the last 
successful checkpoint to guard against failure scenarios.

I think what we can do at this point is to assume the client will retry after a 
connection failure, so Flink as a micro service maintains at-least-once 
semantics. The problem then simplifies to implementing a web-front source, a 
feedback loop from sink to source, and a way to locate the pending connection 
for the response.

What do you think, [~till.rohrmann] [~tudandan]?

 


was (Author: foxss):
To unlock scenarios like blocking RPC calls or async callbacks, I am currently 
thinking of a way to get rid of the distributed queue by connecting the web 
front end directly to the pipeline.

I have put a bit more thought into this topic; exactly-once seems really hard 
to achieve through an RPC source. It is, in fact, the same issue as using a web 
front end to ingest into a distributed queue: clients can retry arbitrarily 
within a long time span.

I think what we can do at this point is to assume the client will retry after a 
connection failure, so Flink as a micro service maintains at-least-once 
semantics. The problem then simplifies to implementing a web-front source, a 
feedback loop from sink to source, and a way to locate the pending connection 
for the response.

What do you think, [~till.rohrmann] [~tudandan]?

 

> flink as micro service
> --
>
> Key: FLINK-6085
> URL: https://issues.apache.org/jira/browse/FLINK-6085
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, JobManager
>Reporter: Chen Qin
>Priority: Minor
> Attachments: Untitled document.jpg
>
>
> Track discussion around running Flink as a micro service, including but not 
> limited to:
> - an RPC (web service endpoint) source: the web service endpoint accepts RPC 
> calls and ingests into the (single) streaming job
> - a callback mechanism
> - task assignment should honor the deployment group (web-tier hosts should 
> be isolated from the rest of task assignment)
> https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6085) flink as micro service

2017-03-28 Thread Chen Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15945991#comment-15945991
 ] 

Chen Qin commented on FLINK-6085:
-

To unlock scenarios like blocking RPC calls or async callbacks, I am currently 
thinking of a way to get rid of the distributed queue by connecting the web 
front end directly to the pipeline.

I have put a bit more thought into this topic; exactly-once seems really hard 
to achieve through an RPC source. It is, in fact, the same issue as using a web 
front end to ingest into a distributed queue: clients can retry arbitrarily 
within a long time span.

I think what we can do at this point is to assume the client will retry after a 
connection failure, so Flink as a micro service maintains at-least-once 
semantics. The problem then simplifies to implementing a web-front source, a 
feedback loop from sink to source, and a way to locate the pending connection 
for the response. A sketch of the last piece follows below.

What do you think, [~till.rohrmann] [~tudandan]?
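
A hedged sketch of the "locate pending connection" piece, assuming a per-host 
registry that correlates request ids with open connections (all names are 
invented; at-least-once comes from the client retrying on failure):

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Invented helper: the web-front source registers each request before
// ingesting it; the sink's feedback loop completes it with the result.
public class PendingConnectionRegistry<R> {

    private final ConcurrentMap<String, CompletableFuture<R>> pending =
        new ConcurrentHashMap<>();

    /** Source side: register the request before it enters the pipeline. */
    public CompletableFuture<R> register(String requestId) {
        return pending.computeIfAbsent(requestId, id -> new CompletableFuture<>());
    }

    /** Sink side: complete the waiting connection if it is still on this
     *  host; otherwise the client's retry will re-register elsewhere. */
    public void complete(String requestId, R result) {
        CompletableFuture<R> future = pending.remove(requestId);
        if (future != null) {
            future.complete(result);
        }
    }
}
{code}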

 

> flink as micro service
> --
>
> Key: FLINK-6085
> URL: https://issues.apache.org/jira/browse/FLINK-6085
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, JobManager
>Reporter: Chen Qin
>Priority: Minor
> Attachments: Untitled document.jpg
>
>
> Track discussion around running Flink as a micro service, including but not 
> limited to:
> - an RPC (web service endpoint) source: the web service endpoint accepts RPC 
> calls and ingests into the (single) streaming job
> - a callback mechanism
> - task assignment should honor the deployment group (web-tier hosts should 
> be isolated from the rest of task assignment)
> https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6085) flink as micro service

2017-03-24 Thread Chen Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15939209#comment-15939209
 ] 

Chen Qin edited comment on FLINK-6085 at 3/24/17 8:32 PM:
--

I would like to see if we can agree on the high level first. A service is 
primarily RPC interaction with horizontal-scalability and latency requirements.

The current way of bridging a service with a streaming pipeline via a 
distributed queue provides failure resilience and topic reuse at the cost of 
extra hardware/software and latency, and it offers no callback support.

[~till.rohrmann]

Updates:

I briefly chatted offline with Maxim; it seems hard to work around the 
distributed queue, considering the pipeline can restart and rewind offsets at 
any time, and loss of insertion events is not acceptable (queries might be 
fine, and Flink already addresses that with queryable state).

To echo Till's comments: yes, custom code could track those requests. The 
follow-up question is whether we can have a specific sink implementation that 
can reroute results to specific RPC hosts (e.g. an HTTP response or callback).




was (Author: foxss):
I would like to see if we can agree on the high level first. A service is 
primarily RPC interaction with horizontal-scalability and latency requirements.

The current way of bridging a service with a streaming pipeline via a 
distributed queue provides failure resilience and topic reuse at the cost of 
extra hardware/software and latency, and it offers no callback support.

[~till.rohrmann]


> flink as micro service
> --
>
> Key: FLINK-6085
> URL: https://issues.apache.org/jira/browse/FLINK-6085
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, JobManager
>Reporter: Chen Qin
>Priority: Minor
> Attachments: Untitled document.jpg
>
>
> Track discussion around running Flink as a micro service, including but not 
> limited to:
> - an RPC (web service endpoint) source: the web service endpoint accepts RPC 
> calls and ingests into the (single) streaming job
> - a callback mechanism
> - task assignment should honor the deployment group (web-tier hosts should 
> be isolated from the rest of task assignment)
> https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6085) flink as micro service

2017-03-24 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-6085:

Description: 
Track discussion around running Flink as a micro service, including but not 
limited to:
- an RPC (web service endpoint) source: the web service endpoint accepts RPC 
calls and ingests into the (single) streaming job
- a callback mechanism
- task assignment should honor the deployment group (web-tier hosts should be 
isolated from the rest of task assignment)


https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing

  was:
Track discussion around running Flink as a micro service, including but not 
limited to:
- an RPC (web service endpoint) source: the web service endpoint accepts RPC 
calls and ingests into the (single) streaming job
- a callback mechanism
- task assignment should honor the deployment group (web-tier hosts should be 
isolated from the rest of task assignment)



> flink as micro service
> --
>
> Key: FLINK-6085
> URL: https://issues.apache.org/jira/browse/FLINK-6085
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, JobManager
>Reporter: Chen Qin
>Priority: Minor
> Attachments: Untitled document.jpg
>
>
> Track discussion around running Flink as a micro service, including but not 
> limited to:
> - an RPC (web service endpoint) source: the web service endpoint accepts RPC 
> calls and ingests into the (single) streaming job
> - a callback mechanism
> - task assignment should honor the deployment group (web-tier hosts should 
> be isolated from the rest of task assignment)
> https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6085) flink as micro service

2017-03-23 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-6085:

Attachment: Untitled document.jpg

I would like to see if we can agree on the high level first. A service is 
primarily RPC interaction with horizontal-scalability and latency requirements.

The current way of bridging a service with a streaming pipeline via a 
distributed queue provides failure resilience and topic reuse at the cost of 
extra hardware/software and latency, and it offers no callback support.

[~till.rohrmann]


> flink as micro service
> --
>
> Key: FLINK-6085
> URL: https://issues.apache.org/jira/browse/FLINK-6085
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, JobManager
>Reporter: Chen Qin
>Priority: Minor
> Attachments: Untitled document.jpg
>
>
> Track discussion around running Flink as a micro service, including but not 
> limited to:
> - an RPC (web service endpoint) source: the web service endpoint accepts RPC 
> calls and ingests into the (single) streaming job
> - a callback mechanism
> - task assignment should honor the deployment group (web-tier hosts should 
> be isolated from the rest of task assignment)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6113) Implement split/select with OutputTag

2017-03-18 Thread Chen Qin (JIRA)
Chen Qin created FLINK-6113:
---

 Summary: Implement split/select with OutputTag
 Key: FLINK-6113
 URL: https://issues.apache.org/jira/browse/FLINK-6113
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.3.0
Reporter: Chen Qin
Priority: Minor


With the completion of FLINK-4460 (side outputs), this is one of the follow-up 
items toward replacing string-tag-based split/select with OutputTag-based 
split/select.

In Flink 2.0, we might consider eventually deprecating split/select.
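
The OutputTag-based replacement looks roughly like this, using the side-output 
API from FLINK-4460 (a sketch; input is assumed to be an existing 
DataStream<Integer>):

{code:java}
// Replacing split("even")/select("even") with typed OutputTags.
final OutputTag<Integer> evens = new OutputTag<Integer>("evens") {};
final OutputTag<Integer> odds = new OutputTag<Integer>("odds") {};

SingleOutputStreamOperator<Integer> routed = input
    .process(new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(Integer value, Context ctx,
                                   Collector<Integer> out) {
            ctx.output(value % 2 == 0 ? evens : odds, value);
        }
    });

DataStream<Integer> evenStream = routed.getSideOutput(evens);
DataStream<Integer> oddStream = routed.getSideOutput(odds);
{code}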



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6085) flink as micro service

2017-03-16 Thread Chen Qin (JIRA)
Chen Qin created FLINK-6085:
---

 Summary: flink as micro service
 Key: FLINK-6085
 URL: https://issues.apache.org/jira/browse/FLINK-6085
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, JobManager
Reporter: Chen Qin
Priority: Minor


Track discussion around run flink as a micro service, includes but not limited 
to 
- RPC (web service endpoint) source
  as web service endpoint accept RPC call, ingest to the streaming job(only one)
- callback mechanism 
- task assignment should honor deployment group (web tier hosts should be 
isolated from rest of task assignment)




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4266) Remote Database Statebackend

2017-02-07 Thread Chen Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15856972#comment-15856972
 ] 

Chen Qin commented on FLINK-4266:
-

When this JIRA was filed, there were multiple issues around incremental 
checkpointing support as well as scaling the non-partitioned state of 
long-running jobs.

With the completion of dynamic scaling of non-partitioned state (FLINK-4379), 
scaling large state dynamically is no longer a blocker. Flink is also working 
on an incremental checkpointing design that will likely address large-state 
checkpointing performance: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/States-split-over-to-external-storage-td15344.html#none

In light of all these efforts, I would like to limit the scope of this JIRA to 
serving as a split-over for the RocksDB state backend: updated state is batched 
into writes to the remote DB during the checkpointing phase. When jobs run in 
multiple data centers in parallel, the remote split-over would enable source 
failover without loss of state and without the manual file movements that 
introduce latency and errors.
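
A minimal sketch of the write-batching idea, assuming an invented RemoteDb 
client, with the checkpoint id serving as the monotonically increasing version:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch: merge updates between checkpoints, flush one batch per checkpoint.
public class BatchingRemoteState<K, V> {

    /** Invented remote client interface (e.g. backed by Cassandra). */
    public interface RemoteDb<K, V> {
        void batchWrite(long version, Map<K, V> updates);
        V read(K key, long uptoVersion);
    }

    private final Map<K, V> updateLog = new HashMap<>(); // merged update log
    private final RemoteDb<K, V> remote;

    public BatchingRemoteState(RemoteDb<K, V> remote) {
        this.remote = remote;
    }

    /** Hot path: merge into the in-memory log; no remote round trip. */
    public void update(K key, V value) {
        updateLog.put(key, value);
    }

    /** Checkpoint hook: one batch write, stamped with the checkpoint id. */
    public void snapshot(long checkpointId) {
        remote.batchWrite(checkpointId, new HashMap<>(updateLog));
        updateLog.clear();
    }

    /** Cold read: fall back to a versioned read for "cold keys". */
    public V get(K key, long uptoCheckpointId) {
        V v = updateLog.get(key);
        return v != null ? v : remote.read(key, uptoCheckpointId);
    }
}
{code}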


> Remote Database Statebackend
> 
>
> Key: FLINK-4266
> URL: https://issues.apache.org/jira/browse/FLINK-4266
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Chen Qin
>Assignee: Chen Qin
>Priority: Minor
>
> The current FileSystem state backend limits total state size to available 
> disk space. To deal with checkpoint state that scales beyond local disk, 
> such as a long-running task holding window contents for a long period of 
> time, pipelines need to split state out to durable remote storage, possibly 
> even replicated to different data centers.
> We drafted a design that leverages the checkpoint id as a monotonically 
> increasing logical timestamp and performs range queries to fetch evicted 
> state k/v pairs. We also introduce a checkpoint-time commit and an eviction 
> threshold that keep "hot states" from hitting the remote DB on every update 
> between adjacent checkpoints: update logs are tracked and merged, and batch 
> updates happen only at checkpoint time. Lastly, we are looking for an 
> eviction policy that can identify "hot keys" in k/v state and lazily load 
> the "cold keys" from remote storage (e.g. Cassandra).
> For now, we don't have a good story regarding data retirement. We might have 
> to keep state forever until a command is run manually to clean up per-job 
> state data. Some of these features may relate to the incremental 
> checkpointing feature; we hope to align with that effort as well.
> Comments are welcome; I will try to put up a draft design doc after 
> gathering some feedback.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-4266) Cassandra SplitOver Statebackend

2017-02-07 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-4266:

Summary: Cassandra SplitOver Statebackend  (was: Remote Database 
Statebackend)

> Cassandra SplitOver Statebackend
> 
>
> Key: FLINK-4266
> URL: https://issues.apache.org/jira/browse/FLINK-4266
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Chen Qin
>Assignee: Chen Qin
>Priority: Minor
>
> The current FileSystem state backend limits total state size to available 
> disk space. To deal with checkpoint state that scales beyond local disk, 
> such as a long-running task holding window contents for a long period of 
> time, pipelines need to split state out to durable remote storage, possibly 
> even replicated to different data centers.
> We drafted a design that leverages the checkpoint id as a monotonically 
> increasing logical timestamp and performs range queries to fetch evicted 
> state k/v pairs. We also introduce a checkpoint-time commit and an eviction 
> threshold that keep "hot states" from hitting the remote DB on every update 
> between adjacent checkpoints: update logs are tracked and merged, and batch 
> updates happen only at checkpoint time. Lastly, we are looking for an 
> eviction policy that can identify "hot keys" in k/v state and lazily load 
> the "cold keys" from remote storage (e.g. Cassandra).
> For now, we don't have a good story regarding data retirement. We might have 
> to keep state forever until a command is run manually to clean up per-job 
> state data. Some of these features may relate to the incremental 
> checkpointing feature; we hope to align with that effort as well.
> Comments are welcome; I will try to put up a draft design doc after 
> gathering some feedback.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-4266) Remote Database Statebackend

2017-02-07 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin reassigned FLINK-4266:
---

 Assignee: Chen Qin
Affects Version/s: (was: 1.2.0)
   1.3.0

> Remote Database Statebackend
> 
>
> Key: FLINK-4266
> URL: https://issues.apache.org/jira/browse/FLINK-4266
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Chen Qin
>Assignee: Chen Qin
>Priority: Minor
>
> The current FileSystem state backend limits total state size to available 
> disk space. To deal with checkpoint state that scales beyond local disk, 
> such as a long-running task holding window contents for a long period of 
> time, pipelines need to split state out to durable remote storage, possibly 
> even replicated to different data centers.
> We drafted a design that leverages the checkpoint id as a monotonically 
> increasing logical timestamp and performs range queries to fetch evicted 
> state k/v pairs. We also introduce a checkpoint-time commit and an eviction 
> threshold that keep "hot states" from hitting the remote DB on every update 
> between adjacent checkpoints: update logs are tracked and merged, and batch 
> updates happen only at checkpoint time. Lastly, we are looking for an 
> eviction policy that can identify "hot keys" in k/v state and lazily load 
> the "cold keys" from remote storage (e.g. Cassandra).
> For now, we don't have a good story regarding data retirement. We might have 
> to keep state forever until a command is run manually to clean up per-job 
> state data. Some of these features may relate to the incremental 
> checkpointing feature; we hope to align with that effort as well.
> Comments are welcome; I will try to put up a draft design doc after 
> gathering some feedback.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (FLINK-4460) Side Outputs in Flink

2017-02-07 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin reassigned FLINK-4460:
---

Assignee: Chen Qin

> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>Assignee: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-4266) Remote Database Statebackend

2017-01-16 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-4266:

Affects Version/s: (was: 1.0.3)

> Remote Database Statebackend
> 
>
> Key: FLINK-4266
> URL: https://issues.apache.org/jira/browse/FLINK-4266
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Chen Qin
>Priority: Minor
>
> The current FileSystem state backend limits total state size to available 
> disk space. To deal with checkpoint state that scales beyond local disk, 
> such as a long-running task holding window contents for a long period of 
> time, pipelines need to split state out to durable remote storage, possibly 
> even replicated to different data centers.
> We drafted a design that leverages the checkpoint id as a monotonically 
> increasing logical timestamp and performs range queries to fetch evicted 
> state k/v pairs. We also introduce a checkpoint-time commit and an eviction 
> threshold that keep "hot states" from hitting the remote DB on every update 
> between adjacent checkpoints: update logs are tracked and merged, and batch 
> updates happen only at checkpoint time. Lastly, we are looking for an 
> eviction policy that can identify "hot keys" in k/v state and lazily load 
> the "cold keys" from remote storage (e.g. Cassandra).
> For now, we don't have a good story regarding data retirement. We might have 
> to keep state forever until a command is run manually to clean up per-job 
> state data. Some of these features may relate to the incremental 
> checkpointing feature; we hope to align with that effort as well.
> Comments are welcome; I will try to put up a draft design doc after 
> gathering some feedback.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4460) Side Outputs in Flink

2016-12-02 Thread Chen Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15717166#comment-15717166
 ] 

Chen Qin commented on FLINK-4460:
-

Ongoing implementation reflecting feedback; the window stream implementation is 
still missing:
https://github.com/apache/flink/compare/master...chenqin:flip

> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>  Labels: latearrivingevents, sideoutput
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4460) Side Outputs in Flink

2016-10-24 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-4460:

Affects Version/s: 1.1.3
   Labels: latearrivingevents sideoutput  (was: )
 Priority: Major  (was: Minor)
Fix Version/s: (was: 1.2.0)
   1.1.4
  Description: 
https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing
   Issue Type: New Feature  (was: Improvement)
  Summary: Side Outputs in Flink  (was: Expose Late Arriving Events)

> Side Outputs in Flink
> -
>
> Key: FLINK-4460
> URL: https://issues.apache.org/jira/browse/FLINK-4460
> Project: Flink
>  Issue Type: New Feature
>  Components: Core, DataStream API
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Chen Qin
>  Labels: latearrivingevents, sideoutput
> Fix For: 1.1.4
>
>
> https://docs.google.com/document/d/1vg1gpR8JL4dM07Yu4NyerQhhVvBlde5qdqnuJv4LcV4/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4460) Expose Late Arriving Events

2016-08-23 Thread Chen Qin (JIRA)
Chen Qin created FLINK-4460:
---

 Summary: Expose Late Arriving Events
 Key: FLINK-4460
 URL: https://issues.apache.org/jira/browse/FLINK-4460
 Project: Flink
  Issue Type: Improvement
  Components: Core, DataStream API
Affects Versions: 1.2.0
Reporter: Chen Qin
Priority: Minor
 Fix For: 1.2.0






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4266) Remote Database Statebackend

2016-08-05 Thread Chen Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15410474#comment-15410474
 ] 

Chen Qin commented on FLINK-4266:
-

https://docs.google.com/document/d/1diHQyOPZVxgmnmYfiTa6glLf-FlFjSHcL8J3YR2xLdk/edit?usp=sharing


> Remote Database Statebackend
> 
>
> Key: FLINK-4266
> URL: https://issues.apache.org/jira/browse/FLINK-4266
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.0.3, 1.2.0
>Reporter: Chen Qin
>Priority: Minor
>
> The current FileSystem state backend limits total state size to available 
> disk space. To deal with checkpoint state that scales beyond local disk, 
> such as a long-running task holding window contents for a long period of 
> time, pipelines need to split state out to durable remote storage, possibly 
> even replicated to different data centers.
> We drafted a design that leverages the checkpoint id as a monotonically 
> increasing logical timestamp and performs range queries to fetch evicted 
> state k/v pairs. We also introduce a checkpoint-time commit and an eviction 
> threshold that keep "hot states" from hitting the remote DB on every update 
> between adjacent checkpoints: update logs are tracked and merged, and batch 
> updates happen only at checkpoint time. Lastly, we are looking for an 
> eviction policy that can identify "hot keys" in k/v state and lazily load 
> the "cold keys" from remote storage (e.g. Cassandra).
> For now, we don't have a good story regarding data retirement. We might have 
> to keep state forever until a command is run manually to clean up per-job 
> state data. Some of these features may relate to the incremental 
> checkpointing feature; we hope to align with that effort as well.
> Comments are welcome; I will try to put up a draft design doc after 
> gathering some feedback.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4266) Remote Storage Statebackend

2016-08-05 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-4266:

Description: 
The current FileSystem state backend limits total state size to available disk 
space. To deal with checkpoint state that scales beyond local disk, such as a 
long-running task holding window contents for a long period of time, pipelines 
need to split state out to durable remote storage, possibly even replicated to 
different data centers.

We drafted a design that leverages the checkpoint id as a monotonically 
increasing logical timestamp and performs range queries to fetch evicted state 
k/v pairs. We also introduce a checkpoint-time commit and an eviction threshold 
that keep "hot states" from hitting the remote DB on every update between 
adjacent checkpoints: update logs are tracked and merged, and batch updates 
happen only at checkpoint time. Lastly, we are looking for an eviction policy 
that can identify "hot keys" in k/v state and lazily load the "cold keys" from 
remote storage (e.g. Cassandra).

For now, we don't have a good story regarding data retirement. We might have to 
keep state forever until a command is run manually to clean up per-job state 
data. Some of these features may relate to the incremental checkpointing 
feature; we hope to align with that effort as well.

Comments are welcome; I will try to put up a draft design doc after gathering 
some feedback.







  was:
The current FileSystem state backend limits total state size to available disk 
space. For a long-running task that holds window contents for a long period of 
time, state needs to be split out to durable remote storage and replicated 
across data centers.

We are looking into an implementation that leverages the checkpoint timestamp 
as a version and does range queries to get the current state; we also want to 
keep "hot states" from hitting the remote DB on every update between adjacent 
checkpoints by tracking update logs and merging them, doing batch updates only 
at checkpoint time; lastly, we are looking for an eviction policy that can 
identify "hot keys" in k/v state and lazily load the "cold keys" from 
Cassandra.

For now, we don't have a good story regarding data retirement. We might have to 
keep state forever until a command is run manually to clean up per-job state 
data. Some of these features may relate to the incremental checkpointing 
feature; we hope to align with that effort as well.

Comments are welcome; I will try to put up a draft design doc after gathering 
some feedback.








> Remote Storage Statebackend
> ---
>
> Key: FLINK-4266
> URL: https://issues.apache.org/jira/browse/FLINK-4266
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.0.3, 1.2.0
>Reporter: Chen Qin
>Priority: Minor
>
> The current FileSystem state backend limits total state size to available 
> disk space. To deal with checkpoint state that scales beyond local disk, 
> such as a long-running task holding window contents for a long period of 
> time, pipelines need to split state out to durable remote storage, possibly 
> even replicated to different data centers.
> We drafted a design that leverages the checkpoint id as a monotonically 
> increasing logical timestamp and performs range queries to fetch evicted 
> state k/v pairs. We also introduce a checkpoint-time commit and an eviction 
> threshold that keep "hot states" from hitting the remote DB on every update 
> between adjacent checkpoints: update logs are tracked and merged, and batch 
> updates happen only at checkpoint time. Lastly, we are looking for an 
> eviction policy that can identify "hot keys" in k/v state and lazily load 
> the "cold keys" from remote storage (e.g. Cassandra).
> For now, we don't have a good story regarding data retirement. We might have 
> to keep state forever until a command is run manually to clean up per-job 
> state data. Some of these features may relate to the incremental 
> checkpointing feature; we hope to align with that effort as well.
> Comments are welcome; I will try to put up a draft design doc after 
> gathering some feedback.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4266) Remote Database Statebackend

2016-08-05 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-4266:

Summary: Remote Database Statebackend  (was: Remote Storage Statebackend)

> Remote Database Statebackend
> 
>
> Key: FLINK-4266
> URL: https://issues.apache.org/jira/browse/FLINK-4266
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.0.3, 1.2.0
>Reporter: Chen Qin
>Priority: Minor
>
> The current FileSystem state backend limits total state size to available 
> disk space. To deal with checkpoint state that scales beyond local disk, 
> such as a long-running task holding window contents for a long period of 
> time, pipelines need to split state out to durable remote storage, possibly 
> even replicated to different data centers.
> We drafted a design that leverages the checkpoint id as a monotonically 
> increasing logical timestamp and performs range queries to fetch evicted 
> state k/v pairs. We also introduce a checkpoint-time commit and an eviction 
> threshold that keep "hot states" from hitting the remote DB on every update 
> between adjacent checkpoints: update logs are tracked and merged, and batch 
> updates happen only at checkpoint time. Lastly, we are looking for an 
> eviction policy that can identify "hot keys" in k/v state and lazily load 
> the "cold keys" from remote storage (e.g. Cassandra).
> For now, we don't have a good story regarding data retirement. We might have 
> to keep state forever until a command is run manually to clean up per-job 
> state data. Some of these features may relate to the incremental 
> checkpointing feature; we hope to align with that effort as well.
> Comments are welcome; I will try to put up a draft design doc after 
> gathering some feedback.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4266) Remote Storage Statebackend

2016-08-05 Thread Chen Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen Qin updated FLINK-4266:

Summary: Remote Storage Statebackend  (was: Cassandra StateBackend)

> Remote Storage Statebackend
> ---
>
> Key: FLINK-4266
> URL: https://issues.apache.org/jira/browse/FLINK-4266
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.0.3, 1.2.0
>Reporter: Chen Qin
>Priority: Minor
>
> The current FileSystem state backend limits total state size to available 
> disk space.
> For a long-running task that holds window contents for a long period of 
> time, state needs to be split out to durable remote storage and replicated 
> across data centers.
> We are looking into an implementation that leverages the checkpoint 
> timestamp as a version and does range queries to get the current state; we 
> also want to keep "hot states" from hitting the remote DB on every update 
> between adjacent checkpoints by tracking update logs and merging them, doing 
> batch updates only at checkpoint time; lastly, we are looking for an 
> eviction policy that can identify "hot keys" in k/v state and lazily load 
> the "cold keys" from Cassandra.
> For now, we don't have a good story regarding data retirement. We might have 
> to keep state forever until a command is run manually to clean up per-job 
> state data. Some of these features may relate to the incremental 
> checkpointing feature; we hope to align with that effort as well.
> Comments are welcome; I will try to put up a draft design doc after 
> gathering some feedback.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)