Re: [Discussion] externalize Hive connector

2023-01-19 Thread Chen Qin
Hi Yuxia,

FLINK-30667 <https://issues.apache.org/jira/browse/FLINK-30667> aims to
address the hive-connector dependency on table-planner @Internal classes. Here
is a bit of the rationale.

ParserImpl in table-planner and HiveParser in the hive connector should be able
to evolve separately after the future externalization; the Parser interface, on
the other hand, should stay consistent as the community plans for other dialect
support. This would help decouple one of the internal-class dependencies
you mentioned in FLINK-26603
<https://issues.apache.org/jira/browse/FLINK-26603>.

QueryOperation is a simple operator class shared between table-planner and
the hive connector. Keeping a copy, instead of announcing it public in
table-planner, would decouple the Hive connector from table-planner changes and
future-proof it for more customized operations.

PlannerContext acts as a utility class providing references to a group of
components; we could make it @PublicEvolving.
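
As a small illustration of the first point, here is a minimal sketch (assuming
the Parser interface in org.apache.flink.table.delegation is the surface we
keep stable) of dialect code programming only against that interface instead of
the planner-internal ParserImpl:

    import java.util.List;

    import org.apache.flink.table.delegation.Parser;
    import org.apache.flink.table.operations.Operation;

    /** Sketch only: a caller compiled against the Parser contract, so
     *  ParserImpl and HiveParser can evolve independently behind it. */
    final class DialectParsingSketch {
        static List<Operation> parse(Parser dialectParser, String statement) {
            // only the stable interface method is used, no planner internals
            return dialectParser.parse(statement);
        }
    }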

Let me know what you think.

Chen



On Tue, Jan 10, 2023 at 8:34 PM yuxia  wrote:

> Hi, Chen.
> Cool! Of course not. Much appreciated that you can help continue this work.
> If you have any question about it, please let me know.
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Chen Qin" 
> To: "dev" 
> Sent: Wednesday, January 11, 2023, 11:24:48 AM
> Subject: Re: [Discussion] externalize Hive connector
>
> Yuxia,
>
> Thanks for the detailed context; I see your point on maintainability. If
> you don't mind, I would like to help continue the work you mentioned.
>
> Chen
>
> On Tue, Jan 10, 2023 at 03:47 yuxia  wrote:
>
> > Hi, Chen.
> > Appreciate your efforts.
> > I would like to share my thoughts about it.
> >
> > I don't think it's a good time to externalize the Hive connector right
> > now. As you know, the Hive connector relies on some internal methods in
> > flink-table-planner. The Flink devs can modify them with more freedom
> > because they are internal methods without any compatibility guarantee.
> > If we externalize it now, it may well happen that some methods the Hive
> > connector relies on get removed/renamed in flink-table-planner, which
> > will then cause the Hive connector to fail.
> > It'll put a heavy burden on the connector maintainer, who will need to
> > track these changes carefully and do the adaptation. As I see it, it'll
> > often end with only some users reporting the issue before the maintainer
> > notices it. That would definitely be a bad user experience.
> >
> > It's historical technical debt that the Hive connector relies on the
> > internal methods in flink-table-planner, and we have created
> > FLINK-26603[1] to resolve it.
> > I intended to finish it in Flink 1.17, but may well fail to catch up
> > with Flink 1.17. Sorry for that; I'm busy with other stuff. If you are
> > interested in it, it would be appreciated if you could also take part in
> > this jira.
> >
> > Of course you can just create a separate Hive connector now, and then
> > cherry-pick changes from the Flink repo until the Hive connector can be
> > removed from the Flink repo. But I'm afraid that may still take much
> > effort. And IIUC, the external Hive connector can't be released as the
> > official Hive connector unless we resolve the problem I referred to
> > above.
> > So, why not wait until everything is ready before externalizing the Hive
> > connector?
> >
> > Best regards,
> > Yuxia
> >
> > - Original Message -
> > From: "Chen Qin" 
> > To: "dev" 
> > Sent: Tuesday, January 10, 2023, 11:47:02 AM
> > Subject: Re: [Discussion] externalize Hive connector
> >
> > Hi Martijn,
> >
> > Thank you for sharing your thoughts. In my opinion, FLINK-26603 is no
> > longer blocking FLINK-30064.
> > That being said, we could consider keeping FLINK-26603 as follow-up work
> > and breaking it down into smaller tasks as we proceed.
> >
> > [PR] https://github.com/apache/flink-connector-hive/pull/3
> >
> > Chen
> >
> > On Mon, Jan 9, 2023 at 7:45 AM Martijn Visser 
> > wrote:
> >
> > > Hi Chen,
> > >
> > > Thanks for bringing this up! I think it would be great if the Hive
> > > connector is externalized. We've already previously established [1]
> that
> > it
> > > should be externalized. I believe the only reason why this hasn't been
> > done
> > > yet is because it's blocked by
> > > https://issues.apache.org/jira/browse/FLINK-26603. Is that still the
> > case?
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > [1] https

[jira] [Created] (FLINK-30667) remove the planner dependency in flink-connector-hive

2023-01-12 Thread Chen Qin (Jira)
Chen Qin created FLINK-30667:


 Summary:  remove the planner dependency in flink-connector-hive
 Key: FLINK-30667
 URL: https://issues.apache.org/jira/browse/FLINK-30667
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Affects Versions: 1.17.0
Reporter: Chen Qin
 Fix For: 1.17.0


There are some classes in flink-connector-hive that rely on the planner, but
fortunately, not too many.

They mainly rely on ParserImpl, PlannerContext, PlannerQueryOperation, and so on.
The dependency is mainly required to create RelNodes.

To resolve this problem, we need more abstraction in the planner and a
public API for external dialects.
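
A rough sketch of the direction (the interface below is hypothetical and only
illustrates the kind of public abstraction meant here, not an actual proposal):

{code:java}
// Hypothetical sketch only: a planner-provided abstraction that external
// dialects could program against, instead of internals such as ParserImpl,
// PlannerContext or PlannerQueryOperation. All names are illustrative.
public interface DialectPlannerSupport {

    /** Parse a dialect-specific statement into planner-neutral operations. */
    java.util.List<org.apache.flink.table.operations.Operation> parse(String statement);

    /** Turn a query string into a QueryOperation without exposing RelNode. */
    org.apache.flink.table.operations.QueryOperation toQueryOperation(String query);
}
{code}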



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30664) [Connector/Hive] cleanup ambiguous hive/hadoop package dependencies

2023-01-12 Thread Chen Qin (Jira)
Chen Qin created FLINK-30664:


 Summary: [Connector/Hive] cleanup ambiguous hive/hadoop package
dependencies
 Key: FLINK-30664
 URL: https://issues.apache.org/jira/browse/FLINK-30664
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Affects Versions: 1.17.0
Reporter: Chen Qin
 Fix For: 1.17.0


The hive and hive-metastore combination introduces multiple versions of dependency
packages; the goal is to ensure the hive connector has deterministic dependencies.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30660) move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive e2e

2023-01-12 Thread Chen Qin (Jira)
Chen Qin created FLINK-30660:


 Summary: move SQLClientHiveITCase and TestHiveCatalogFactory to 
flink-connector-hive e2e
 Key: FLINK-30660
 URL: https://issues.apache.org/jira/browse/FLINK-30660
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive, Tests
Affects Versions: 1.17.0
Reporter: Chen Qin
 Fix For: 1.17.0


move SQLClientHiveITCase and TestHiveCatalogFactory to flink-connector-hive e2e

[https://github.com/apache/flink/pull/16532/files#]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30659) move Flink-sql-parser-hive to flink-connector-hive

2023-01-12 Thread Chen Qin (Jira)
Chen Qin created FLINK-30659:


 Summary: move Flink-sql-parser-hive to flink-connector-hive
 Key: FLINK-30659
 URL: https://issues.apache.org/jira/browse/FLINK-30659
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive, Table SQL / Planner
Affects Versions: 1.17.0
Reporter: Chen Qin
 Fix For: 1.17.0


The Hive parser should stay with the hive connector and be maintained together
with it. At runtime, those packages should load/unload together.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30658) remove Flink-sql-parser-hive dependency on table-planner

2023-01-12 Thread Chen Qin (Jira)
Chen Qin created FLINK-30658:


 Summary: remove Flink-sql-parser-hive dependency on table-planner
 Key: FLINK-30658
 URL: https://issues.apache.org/jira/browse/FLINK-30658
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Planner
Affects Versions: 1.17.0
Reporter: Chen Qin
 Fix For: 1.17.0


In order to move flink-sql-parser-hive out of flink-table, we need to remove
the flink-sql-parser-hive package dependency in flink-table-planner.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [Discussion] externalize Hive connector

2023-01-10 Thread Chen Qin
Yuxia,

Thanks for the detailed context; I see your point on maintainability. If
you don't mind, I would like to help continue the work you mentioned.

Chen

On Tue, Jan 10, 2023 at 03:47 yuxia  wrote:

> Hi, Chen.
> Appreciate your efforts.
> I would like to share my thoughts about it.
>
> I don't think it's a good time to externalize the Hive connector right now.
> As you know, the Hive connector relies on some internal methods in
> flink-table-planner. The Flink devs can modify them with more freedom
> because they are internal methods without any compatibility guarantee.
> If we externalize it now, it may well happen that some methods the Hive
> connector relies on get removed/renamed in flink-table-planner, which will
> then cause the Hive connector to fail.
> It'll put a heavy burden on the connector maintainer, who will need to
> track these changes carefully and do the adaptation. As I see it, it'll
> often end with only some users reporting the issue before the maintainer
> notices it. That would definitely be a bad user experience.
>
> It's historical technical debt that the Hive connector relies on the
> internal methods in flink-table-planner, and we have created FLINK-26603[1]
> to resolve it.
> I intended to finish it in Flink 1.17, but may well fail to catch up with
> Flink 1.17. Sorry for that; I'm busy with other stuff. If you are
> interested in it, it would be appreciated if you could also take part in
> this jira.
>
> Of course you can just create a separate Hive connector now, and then
> cherry-pick changes from the Flink repo until the Hive connector can be
> removed from the Flink repo. But I'm afraid that may still take much
> effort. And IIUC, the external Hive connector can't be released as the
> official Hive connector unless we resolve the problem I referred to above.
> So, why not wait until everything is ready before externalizing the Hive
> connector?
>
> Best regards,
> Yuxia
>
> - Original Message -
> From: "Chen Qin" 
> To: "dev" 
> Sent: Tuesday, January 10, 2023, 11:47:02 AM
> Subject: Re: [Discussion] externalize Hive connector
>
> Hi Martijn,
>
> Thank you for sharing your thoughts. In my opinion, FLINK-26603 is no
> longer blocking FLINK-30064.
> That being said, we could consider keeping FLINK-26603 as follow-up work
> and breaking it down into smaller tasks as we proceed.
>
> [PR] https://github.com/apache/flink-connector-hive/pull/3
>
> Chen
>
> On Mon, Jan 9, 2023 at 7:45 AM Martijn Visser 
> wrote:
>
> > Hi Chen,
> >
> > Thanks for bringing this up! I think it would be great if the Hive
> > connector is externalized. We've already previously established [1] that
> it
> > should be externalized. I believe the only reason why this hasn't been
> done
> > yet is because it's blocked by
> > https://issues.apache.org/jira/browse/FLINK-26603. Is that still the
> case?
> >
> > Best regards,
> >
> > Martijn
> >
> > [1] https://lists.apache.org/thread/bk9f91o6wk66zdh353j1n7sfshh262tr
> >
> > On Mon, Jan 9, 2023 at 4:22 PM Chen Qin  wrote:
> >
> > > Hi there,
> > >
> > > Following community guidance Externalized+Connector+development
> > > <
> >
> https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
> >,
> > We
> > > would like to initiate discussion on moving connector/hive to
> > > apache/flink-connector-hive
> > > <https://github.com/apache/flink-connector-hive>.
> > >
> > > Currently proposed changes includes
> > >
> > >- cleanup dependencies introduced from hive/yarn dependencies with
> > >latest package version stated in properties section in POM file
> > >- add FlinkPlannerCalciteShim to handle PlannerCalcite API signature
> > >changes from 1.16 v.s 1.17-SNAPSHOT
> > >- add PackageITTests and ProductionArchitectureTests
> > >- [bonus] adding docker e2e tests with list of supported Hive/HMS
> > >versions
> > >
> > > Risk associated with this change includes not being able to release
> until
> > > 1.17 release, so we would have to keep cherry-pick changes from
> > > flink/connectors/hive for a period of time.
> > >
> > > Looking forward to hearing community feedback.
> > >
> > > Chen
> > >
> >
>


Re: [Discussion] externalize Hive connector

2023-01-09 Thread Chen Qin
Hi Martijn,

Thank you for sharing your thoughts. In my opinion, FLINK-26603 is no
longer blocking FLINK-30064.
That being said, we could consider keeping FLINK-26603 as follow-up work
and breaking it down into smaller tasks as we proceed.

[PR] https://github.com/apache/flink-connector-hive/pull/3

Chen

On Mon, Jan 9, 2023 at 7:45 AM Martijn Visser 
wrote:

> Hi Chen,
>
> Thanks for bringing this up! I think it would be great if the Hive
> connector is externalized. We've already previously established [1] that it
> should be externalized. I believe the only reason why this hasn't been done
> yet is because it's blocked by
> https://issues.apache.org/jira/browse/FLINK-26603. Is that still the case?
>
> Best regards,
>
> Martijn
>
> [1] https://lists.apache.org/thread/bk9f91o6wk66zdh353j1n7sfshh262tr
>
> On Mon, Jan 9, 2023 at 4:22 PM Chen Qin  wrote:
>
> > Hi there,
> >
> > Following community guidance Externalized+Connector+development
> > <
> https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development>,
> We
> > would like to initiate discussion on moving connector/hive to
> > apache/flink-connector-hive
> > <https://github.com/apache/flink-connector-hive>.
> >
> > Currently proposed changes includes
> >
> >- cleanup dependencies introduced from hive/yarn dependencies with
> >latest package version stated in properties section in POM file
> >- add FlinkPlannerCalciteShim to handle PlannerCalcite API signature
> >changes from 1.16 v.s 1.17-SNAPSHOT
> >- add PackageITTests and ProductionArchitectureTests
> >- [bonus] adding docker e2e tests with list of supported Hive/HMS
> >versions
> >
> > Risk associated with this change includes not being able to release until
> > 1.17 release, so we would have to keep cherry-pick changes from
> > flink/connectors/hive for a period of time.
> >
> > Looking forward to hearing community feedback.
> >
> > Chen
> >
>


[Discussion] externalize Hive connector

2023-01-09 Thread Chen Qin
Hi there,

Following community guidance Externalized+Connector+development
<https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development>,
we would like to initiate a discussion on moving connector/hive to
apache/flink-connector-hive <https://github.com/apache/flink-connector-hive>.

The currently proposed changes include:

   - clean up dependencies introduced by the hive/yarn dependencies, with the
   latest package versions stated in the properties section of the POM file
   - add a FlinkPlannerCalciteShim to handle planner/Calcite API signature
   changes between 1.16 and 1.17-SNAPSHOT (see the sketch after this list)
   - add PackageITTests and ProductionArchitectureTests
   - [bonus] add docker e2e tests with the list of supported Hive/HMS
   versions
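
For the shim item above, a minimal sketch of the intended pattern (the class
name comes from the list; the method and its body are hypothetical and only
show how version-specific calls could be isolated in one place):

    // Hypothetical sketch: connector code calls only this class, and only this
    // class touches planner/Calcite APIs whose signatures differ between
    // Flink 1.16 and 1.17-SNAPSHOT.
    public final class FlinkPlannerCalciteShim {

        private FlinkPlannerCalciteShim() {}

        // One such method per changed API; the body is the single place that
        // needs adjusting (or swapping per build profile) when Flink moves on.
        public static org.apache.calcite.rel.RelNode translateToRelNode(Object plannerQueryOperation) {
            // version-specific delegation would live here; omitted in this sketch
            throw new UnsupportedOperationException("sketch only");
        }
    }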

The risk associated with this change is that we cannot release until the 1.17
release, so we would have to keep cherry-picking changes from
flink/connectors/hive for a period of time.

Looking forward to hearing community feedback.

Chen


[jira] [Created] (FLINK-30362) Flink-connector-hive can't build with maven 3.8

2022-12-11 Thread Chen Qin (Jira)
Chen Qin created FLINK-30362:


 Summary: Flink-connector-hive can't build with maven 3.8
 Key: FLINK-30362
 URL: https://issues.apache.org/jira/browse/FLINK-30362
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.15.3, 1.16.0, 1.17.0
 Environment: install maven 3.8.1+

git clone flink repo

run mvn clean package
Reporter: Chen Qin
 Fix For: 1.17.0, 1.16.1, 1.15.4


flink-connector-hive pulls in hive-exec, which depends on
org.pentaho:pentaho-aggdesigner-algorithm hosted on a blocked JBoss mirror.

This is a CVE-related issue that blocks the upgrade to Maven 3.8.1+.

[https://maven.apache.org/docs/3.8.1/release-notes.html#cve-2021-26291]

 
{code:java}
[ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could not 
resolve dependencies for project 
org.apache.flink:flink-connector-hive_2.12:jar:1.17-SNAPSHOT: Failed to collect 
dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> 
org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read 
artifact descriptor for 
org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer 
artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to 
maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: 
[repository.jboss.org 
(http://repository.jboss.org/nexus/content/groups/public/, default, disabled), 
conjars (http://conjars.org/repo, default, releases+snapshots), 
apache.snapshots (http://repository.apache.org/snapshots, default, snapshots)] 
-> [Help 1]{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Table API thrift support

2022-12-06 Thread Chen Qin
Hi Martijn,

We would like to propose splitting the externalization of flink-connector-hive
(the connector) from the thrift format support (a new format
encode/decode).

As we discussed in Externalized+Connector+development
<https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development>,
the Hive connector master might need a shim to stay backward compatible with
release-1.16. Do we have examples handy?
The code I am playing with is at chenqin/connector-hive
<https://github.com/chenqin/connector-hive.git>.

Chen

On Mon, Nov 28, 2022 at 3:05 AM Martijn Visser 
wrote:

> Hi Chen,
>
> Everything on connector externalization is documented at
>
> https://cwiki.apache.org/confluence/display/FLINK/Externalized+Connector+development
> ,
> including the links to the relevant discussions on that topic in the
> community.
>
> Thanks,
>
> Martijn
>
> On Mon, Nov 28, 2022 at 11:59 AM Chen Qin  wrote:
>
> > Hi Martijn,
> >
> > I feel our proposal “shading libthrift in hive connector” seems pinching
> a
> > new problem “how to externalization  connectors”. I assume there might be
> > some discussion in community already.  If so please kindly pass some
> > contexts.
> >
> > I would incline take back shading proposal at this point. If user choose
> to
> > use flink hive connector and thrift format, they should be responsible to
> > keep libthrift version in sync.
> >
> > Chen
> >
> >
> >
> > On Mon, Nov 28, 2022 at 00:27 Martijn Visser 
> > wrote:
> >
> > > Hi Chen,
> > >
> > > While I agree that Hive Metastore is a crucial component for a lot of
> > > companies, this isn't the case for all companies. Right now it sounds
> > like
> > > Flink has to take on tech debt because users of Flink are running on
> > older
> > > versions of the Hive Metastore. I don't think that's a good idea at
> all.
> > > Like I said, we want to externalize the Hive connector so there's no
> root
> > > level config then available anymore. How would it then work?
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Sun, Nov 27, 2022 at 4:02 AM Chen Qin  wrote:
> > >
> > > > Hi Martjin,
> > > >
> > > > "shading Thrift libraries from the Hive connector"
> > > > Hivemetastore is foundational software running in many companies used
> > by
> > > > Spark/Flink... etc. Upgrading the hive metastore touches many pieces
> of
> > > > data engineering. If the user updates flink job jar dependency to the
> > > > latest 0.17, it would not guarantee both HMS and jar would work
> > properly.
> > > > and yes, 0.5-p6 is unfortunate internal tech debt we would work on
> > > outside
> > > > of this FLIP work.
> > > >
> > > > "KafkaSource and KafkaSink"
> > > > sounds good, this part seems outdated.
> > > >
> > > > "explain how a Thrift schema can be compiled/used in a SQL"
> > > > I see, our approach requires extra schema gen and jar load compared
> to
> > > > proto-buf implementation.  Our internal implementation contains a
> > schema
> > > > inference patch that got moved out of this FLIP document. I agree it
> > > might
> > > > be worth removing compile requirement for ease of use.
> > > >
> > > > Chen
> > > >
> > > >
> > > > On Wed, Nov 23, 2022 at 6:42 AM Martijn Visser <
> > martijnvis...@apache.org
> > > >
> > > > wrote:
> > > >
> > > > > Hi Chen,
> > > > >
> > > > > I'm a bit skeptical of shading Thrift libraries from the Hive
> > > connector,
> > > > > especially with the plans to externalize connectors (including
> Hive).
> > > > Have
> > > > > we considered getting the versions in sync to avoid the need of any
> > > > > shading?
> > > >
> > > >
> > > > > The FLIP also shows a version of Thrift (0.5.0-p6) that I don't see
> > in
> > > > > Maven central, but the latest version there is 0.17.0. We should
> > > support
> > > > > the latest version. Do you know when Thrift expects to reach a
> major
> > > > > version? I'm not too fond of not having any major
> > version/compatibility
> > > > > guarantees.
> > > > >
> > > > > The FLIP mentions FlinkKafkaConsumer and FlinkKafka

Re: Table API thrift support

2022-11-28 Thread Chen Qin
Hi Martijn,

I feel our proposal of “shading libthrift in the hive connector” is pinching a
new problem of “how to externalize connectors”. I assume there might already be
some discussion in the community; if so, please kindly pass along some
context.

I would be inclined to take back the shading proposal at this point. If users
choose to use the flink hive connector and the thrift format, they should be
responsible for keeping the libthrift version in sync.

Chen



On Mon, Nov 28, 2022 at 00:27 Martijn Visser 
wrote:

> Hi Chen,
>
> While I agree that Hive Metastore is a crucial component for a lot of
> companies, this isn't the case for all companies. Right now it sounds like
> Flink has to take on tech debt because users of Flink are running on older
> versions of the Hive Metastore. I don't think that's a good idea at all.
> Like I said, we want to externalize the Hive connector so there's no root
> level config then available anymore. How would it then work?
>
> Best regards,
>
> Martijn
>
> On Sun, Nov 27, 2022 at 4:02 AM Chen Qin  wrote:
>
> > Hi Martjin,
> >
> > "shading Thrift libraries from the Hive connector"
> > Hivemetastore is foundational software running in many companies used by
> > Spark/Flink... etc. Upgrading the hive metastore touches many pieces of
> > data engineering. If the user updates flink job jar dependency to the
> > latest 0.17, it would not guarantee both HMS and jar would work properly.
> > and yes, 0.5-p6 is unfortunate internal tech debt we would work on
> outside
> > of this FLIP work.
> >
> > "KafkaSource and KafkaSink"
> > sounds good, this part seems outdated.
> >
> > "explain how a Thrift schema can be compiled/used in a SQL"
> > I see, our approach requires extra schema gen and jar load compared to
> > proto-buf implementation.  Our internal implementation contains a schema
> > inference patch that got moved out of this FLIP document. I agree it
> might
> > be worth removing compile requirement for ease of use.
> >
> > Chen
> >
> >
> > On Wed, Nov 23, 2022 at 6:42 AM Martijn Visser  >
> > wrote:
> >
> > > Hi Chen,
> > >
> > > I'm a bit skeptical of shading Thrift libraries from the Hive
> connector,
> > > especially with the plans to externalize connectors (including Hive).
> > Have
> > > we considered getting the versions in sync to avoid the need of any
> > > shading?
> >
> >
> > > The FLIP also shows a version of Thrift (0.5.0-p6) that I don't see in
> > > Maven central, but the latest version there is 0.17.0. We should
> support
> > > the latest version. Do you know when Thrift expects to reach a major
> > > version? I'm not too fond of not having any major version/compatibility
> > > guarantees.
> > >
> > > The FLIP mentions FlinkKafkaConsumer and FlinkKafkaProducer; these are
> > > deprecated and should not be implemented, only KafkaSource and
> KafkaSink.
> > >
> > > Can you explain how a Thrift schema can be compiled/used in a SQL
> > > application, like also is done for Protobuf?
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/formats/protobuf/
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Tue, Nov 22, 2022 at 6:44 PM Chen Qin  wrote:
> > >
> > > > Hi Yuxia, Martijin,
> > > >
> > > > Thanks for your feedback on FLIP-237!
> > > > My understanding is that FLIP-237 better focuses on thrift
> > > > encoding/decoding in Datastream/Table API/ Pyflink.
> > > > To address feedbacks, I made follow changes to FLIP-237
> > > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-237%3A+Thrift+Format+Support
> > > >
> > > >  doc
> > > >
> > > >- remove table schema section inference as flink doesn't have
> > built-in
> > > >support yet
> > > >- remove paritialser/deser given this fits better as a kafka table
> > > >source optimization that apply to various of encoding formats
> > > >- align implementation with protol-buf flink support to keep code
> > > >consistency
> > > >
> > > > Please give another pass and let me know if you have any questions.
> > > >
> > > > Chen
> > > >
> > > > On Mon, May 30, 2022 at 6:34 PM Chen Qin  wrote:
> > > >
> > > >>
> > > >>
> > >

Re: Table API thrift support

2022-11-26 Thread Chen Qin
Hi Martijn,

"shading Thrift libraries from the Hive connector"
The Hive metastore is foundational software running in many companies, used by
Spark/Flink, etc. Upgrading the hive metastore touches many pieces of
data engineering. If the user updates the flink job jar dependency to the
latest 0.17, there is no guarantee that both the HMS and the jar will work
properly. And yes, 0.5.0-p6 is unfortunate internal tech debt we will work on
outside of this FLIP work.

"KafkaSource and KafkaSink"
sounds good, this part seems outdated.

"explain how a Thrift schema can be compiled/used in a SQL"
I see; our approach requires extra schema generation and jar loading compared
to the protobuf implementation. Our internal implementation contains a schema
inference patch that was moved out of this FLIP document. I agree it might
be worth removing the compile requirement for ease of use.

Chen


On Wed, Nov 23, 2022 at 6:42 AM Martijn Visser 
wrote:

> Hi Chen,
>
> I'm a bit skeptical of shading Thrift libraries from the Hive connector,
> especially with the plans to externalize connectors (including Hive). Have
> we considered getting the versions in sync to avoid the need of any
> shading?


> The FLIP also shows a version of Thrift (0.5.0-p6) that I don't see in
> Maven central, but the latest version there is 0.17.0. We should support
> the latest version. Do you know when Thrift expects to reach a major
> version? I'm not too fond of not having any major version/compatibility
> guarantees.
>
> The FLIP mentions FlinkKafkaConsumer and FlinkKafkaProducer; these are
> deprecated and should not be implemented, only KafkaSource and KafkaSink.
>
> Can you explain how a Thrift schema can be compiled/used in a SQL
> application, like also is done for Protobuf?
>
> https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/formats/protobuf/
>
> Best regards,
>
> Martijn
>
> On Tue, Nov 22, 2022 at 6:44 PM Chen Qin  wrote:
>
> > Hi Yuxia, Martijin,
> >
> > Thanks for your feedback on FLIP-237!
> > My understanding is that FLIP-237 better focuses on thrift
> > encoding/decoding in Datastream/Table API/ Pyflink.
> > To address feedbacks, I made follow changes to FLIP-237
> > <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-237%3A+Thrift+Format+Support
> >
> >  doc
> >
> >- remove table schema section inference as flink doesn't have built-in
> >support yet
> >- remove paritialser/deser given this fits better as a kafka table
> >source optimization that apply to various of encoding formats
> >- align implementation with protol-buf flink support to keep code
> >consistency
> >
> > Please give another pass and let me know if you have any questions.
> >
> > Chen
> >
> > On Mon, May 30, 2022 at 6:34 PM Chen Qin  wrote:
> >
> >>
> >>
> >> On Mon, May 30, 2022 at 7:35 AM Martijn Visser <
> martijnvis...@apache.org>
> >> wrote:
> >>
> >>> Hi Chen,
> >>>
> >>> I think the best starting point would be to create a FLIP [1]. One of
> the
> >>> important topics from my point of view is to make sure that such
> changes
> >>> are not only available for SQL users, but are also being considered for
> >>> Table API, DataStream and/or Python. There might be reasons why not to
> do
> >>> that, but then those considerations should also be captured in the
> FLIP.
> >>>
> >>> > thanks for piointer, working on Flip-237, stay tune
> >>
> >>> Another thing that would be interesting is how Thrift translates into
> >>> Flink
> >>> connectors & Flink formats. Or is your Thrift implementation only a
> >>> connector?
> >>>
> >> > it's flink-format for most part, hope it can help with pyflink not
> sure.
> >>
> >>>
> >>> Best regards,
> >>>
> >>> Martijn
> >>>
> >>> [1]
> >>>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >>>
> >>> On Sun, May 29, 2022 at 19:06, Chen Qin  wrote:
> >>>
> >>> > Hi there,
> >>> >
> >>> > We would like to discuss and potentially upstream our thrift support
> >>> > patches to flink.
> >>> >
> >>> > For some context, we have been internally patched flink-1.11.2 to
> >>> support
> >>> > FlinkSQL jobs read/write to thrift encoded kafka source/sink. Over
> the
> >>> > course of last 12 months,

Re: Table API thrift support

2022-11-22 Thread Chen Qin
Hi Yuxia, Martijn,

Thanks for your feedback on FLIP-237!
My understanding is that FLIP-237 is better focused on thrift
encoding/decoding in the DataStream/Table API/PyFlink.
To address the feedback, I made the following changes to the FLIP-237
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-237%3A+Thrift+Format+Support>
doc:

   - remove the table schema inference section, as flink doesn't have built-in
   support for it yet
   - remove partial ser/deser, given this fits better as a kafka table
   source optimization that applies to various encoding formats
   - align the implementation with flink's protobuf support to keep the code
   consistent

Please give another pass and let me know if you have any questions.

Chen

On Mon, May 30, 2022 at 6:34 PM Chen Qin  wrote:

>
>
> On Mon, May 30, 2022 at 7:35 AM Martijn Visser 
> wrote:
>
>> Hi Chen,
>>
>> I think the best starting point would be to create a FLIP [1]. One of the
>> important topics from my point of view is to make sure that such changes
>> are not only available for SQL users, but are also being considered for
>> Table API, DataStream and/or Python. There might be reasons why not to do
>> that, but then those considerations should also be captured in the FLIP.
>>
>> > thanks for piointer, working on Flip-237, stay tune
>
>> Another thing that would be interesting is how Thrift translates into
>> Flink
>> connectors & Flink formats. Or is your Thrift implementation only a
>> connector?
>>
> > it's flink-format for most part, hope it can help with pyflink not sure.
>
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>
>> On Sun, May 29, 2022 at 19:06, Chen Qin  wrote:
>>
>> > Hi there,
>> >
>> > We would like to discuss and potentially upstream our thrift support
>> > patches to flink.
>> >
>> > For some context, we have been internally patched flink-1.11.2 to
>> support
>> > FlinkSQL jobs read/write to thrift encoded kafka source/sink. Over the
>> > course of last 12 months, those patches supports a few features not
>> > available in open source master, including
>> >
>> >- allow user defined inference thrift stub class name in table DDL,
>> >Thrift binary <-> Row
>> >- dynamic overwrite schema type information loaded from HiveCatalog
>> >(Table only)
>> >- forward compatible when kafka topic encode with new schema (adding
>> new
>> >field)
>> >- backward compatible when job with new schema handles input or state
>> >with old schema
>> >
>> > With more FlinkSQL jobs in production, we expect maintenance of
>> divergent
>> > feature sets to increase in the next 6-12 months. Specifically
>> challenges
>> > around
>> >
>> >- lack of systematic way to support inference based table/view ddl
>> >(parity with hiveql serde
>> ><
>> >
>> https://cwiki.apache.org/confluence/display/hive/serde#:~:text=SerDe%20Overview,-SerDe%20is%20short=Hive%20uses%20the%20SerDe%20interface,HDFS%20in%20any%20custom%20format
>> > .>
>> >)
>> >- lack of robust mapping from thrift field to row field
>> >- dynamic update set of table with same inference class when
>> performing
>> >schema change (e.g adding new field)
>> >- minor lack of handle UNSET case, use NULL
>> >
>> > Please kindly provide pointers around the challenges section.
>> >
>> > Thanks,
>> > Chen, Pinterest.
>> >
>>
>


Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-20 Thread Chen Qin
On Sun, Nov 20, 2022 at 7:25 AM Gyula Fóra  wrote:

> Hi Chen!
>
> I think in the long term it makes sense to provide some pluggable
> mechanisms but it's not completely trivial where exactly you would plug in
> your custom logic at this point.
>
Sounds good. More specifically, it would be great if it could accept input
features (including previous scaling decisions) and output decisions.
Folks could then keep their own secret sauce and avoid patching an OSS fork.
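
(Purely to illustrate the shape of such a plug-in contract; this interface is
hypothetical and not part of FLIP-271 or the operator:)

    // Hypothetical sketch: inputs are observed features, including previous
    // decisions; output is a per-vertex parallelism decision.
    interface ScalingPolicy {
        java.util.Map<String, Integer> decideParallelism(
                java.util.Map<String, Double> metricFeatures,        // e.g. throughput over several windows
                java.util.Map<String, Integer> previousParallelism); // last applied decision
    }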

>
> In any case the problems you mentioned should be solved robustly by the
> algorithm itself without any customization:
>  - We need to be able to detect ineffective scaling decisions, let\s say we
> scaled up (expecting better throughput with a higher parallelism) but we
> did not get a better processing capacity (this would be the external
> service bottleneck)
>
Sounds good, so we would at least try restarting the job once (the optimistic
path) as a design choice.

>  - We are evaluating metrics in windows, and we have some flexible
> boundaries to avoid scaling on minor load spikes
>
Yes, it would be great if the user could feed in throughput changes over
different time buckets (last 10s, 30s, 1 min, 5 mins) as input features.

>
> Regards,
> Gyula
>
> On Sun, Nov 20, 2022 at 12:28 AM Chen Qin  wrote:
>
> > Hi Gyula,
> >
> > Do we think the scaler could be a plugin or hard coded ?
> > We observed some cases scaler can't address (e.g async io dependency
> > service degradation or small spike that doesn't worth restarting job)
> >
> > Thanks,
> > Chen
> >
> > On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra  wrote:
> >
> > > Hi Dong!
> > >
> > > Could you please confirm that your main concerns have been addressed?
> > >
> > > Some other minor details that might not have been fully clarified:
> > >  - The prototype has been validated on some production workloads yes
> > >  - We are only planning to use metrics that are generally available and
> > are
> > > previously accepted to be standardized connector metrics (not Kafka
> > > specific). This is actually specified in the FLIP
> > >  - Even if some metrics (such as pendingRecords) are not accessible the
> > > scaling algorithm works and can be used. For source scaling based on
> > > utilization alone we still need some trivial modifications on the
> > > implementation side.
> > >
> > > Cheers,
> > > Gyula
> > >
> > > On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra 
> wrote:
> > >
> > > > Hi Dong!
> > > >
> > > > This is not an experimental feature proposal. The implementation of
> the
> > > > prototype is still in an experimental phase but by the time the FLIP,
> > > > initial prototype and review is done, this should be in a good stable
> > > first
> > > > version.
> > > > This proposal is pretty general as autoscalers/tuners get as far as I
> > > > understand and there is no history of any alternative effort that
> even
> > > > comes close to the applicability of this solution.
> > > >
> > > > Any large features that were added to Flink in the past have gone
> > through
> > > > several iterations over the years and the APIs have evolved as they
> > > matured.
> > > > Something like the autoscaler can only be successful if there is
> enough
> > > > user exposure and feedback to make it good, putting it in an external
> > > repo
> > > > will not get us anywhere.
> > > >
> > > > We have a prototype implementation ready that works well and it is
> more
> > > or
> > > > less feature complete. We proposed this FLIP based on something that
> we
> > > see
> > > > as a working solution, please do not underestimate the effort that
> went
> > > > into this proposal and the validation of the ideas. So in this sense
> > our
> > > > approach here is the same as with the Table Store and Kubernetes
> > Operator
> > > > and other big components of the past. On the other hand it's
> impossible
> > > to
> > > > sufficiently explain all the technical depth/implementation details
> of
> > > such
> > > > complex components in FLIPs to 100%, I feel we have a good overview
> of
> > > the
> > > > algorithm in the FLIP and the implementation should cover all
> remaining
> > > > questions. We will have an extended code review phase following the
> > FLIP
> > > > vote before this make it into the project.
> >

Re: [DISCUSS] FLIP-271: Autoscaling

2022-11-19 Thread Chen Qin
Hi Gyula,

Do we think the scaler could be a plugin, or will it be hard-coded?
We observed some cases a scaler can't address (e.g. async I/O dependency
service degradation, or a small spike that isn't worth restarting the job for).

Thanks,
Chen

On Fri, Nov 18, 2022 at 1:03 AM Gyula Fóra  wrote:

> Hi Dong!
>
> Could you please confirm that your main concerns have been addressed?
>
> Some other minor details that might not have been fully clarified:
>  - The prototype has been validated on some production workloads yes
>  - We are only planning to use metrics that are generally available and are
> previously accepted to be standardized connector metrics (not Kafka
> specific). This is actually specified in the FLIP
>  - Even if some metrics (such as pendingRecords) are not accessible the
> scaling algorithm works and can be used. For source scaling based on
> utilization alone we still need some trivial modifications on the
> implementation side.
>
> Cheers,
> Gyula
>
> On Thu, Nov 17, 2022 at 5:22 PM Gyula Fóra  wrote:
>
> > Hi Dong!
> >
> > This is not an experimental feature proposal. The implementation of the
> > prototype is still in an experimental phase but by the time the FLIP,
> > initial prototype and review is done, this should be in a good stable
> first
> > version.
> > This proposal is pretty general as autoscalers/tuners get as far as I
> > understand and there is no history of any alternative effort that even
> > comes close to the applicability of this solution.
> >
> > Any large features that were added to Flink in the past have gone through
> > several iterations over the years and the APIs have evolved as they
> matured.
> > Something like the autoscaler can only be successful if there is enough
> > user exposure and feedback to make it good, putting it in an external
> repo
> > will not get us anywhere.
> >
> > We have a prototype implementation ready that works well and it is more
> or
> > less feature complete. We proposed this FLIP based on something that we
> see
> > as a working solution, please do not underestimate the effort that went
> > into this proposal and the validation of the ideas. So in this sense our
> > approach here is the same as with the Table Store and Kubernetes Operator
> > and other big components of the past. On the other hand it's impossible
> to
> > sufficiently explain all the technical depth/implementation details of
> such
> > complex components in FLIPs to 100%, I feel we have a good overview of
> the
> > algorithm in the FLIP and the implementation should cover all remaining
> > questions. We will have an extended code review phase following the FLIP
> > vote before this make it into the project.
> >
> > I understand your concern regarding the stability of Flink Kubernetes
> > Operator config and metric names. We have decided to not provide
> guarantees
> > there yet but if you feel that it's time for the operator to support such
> > guarantees please open a separate discussion on that topic, I don't want
> to
> > mix the two problems here.
> >
> > Regards,
> > Gyula
> >
> > On Thu, Nov 17, 2022 at 5:07 PM Dong Lin  wrote:
> >
> >> Hi Gyula,
> >>
> >> If I understand correctly, this autopilot proposal is an experimental
> >> feature and its configs/metrics are not mature enough to provide
> backward
> >> compatibility yet. And the proposal provides high-level ideas of the
> >> algorithm but it is probably too complicated to explain it end-to-end.
> >>
> >> On the one hand, I do agree that having an auto-tuning prototype, even
> if
> >> not mature, is better than nothing for Flink users. On the other hand, I
> >> am
> >> concerned that this FLIP seems a bit too experimental, and starting with
> >> an
> >> immature design might make it harder for us to reach a production-ready
> >> and
> >> generally applicable auto-tuner in the future. And introducing too
> >> backward
> >> incompatible changes generally hurts users' trust in the Flink project.
> >>
> >> One alternative might be to develop and experiment with this feature in
> a
> >> non-Flink repo. You can iterate fast without worrying about typically
> >> backward compatibility requirement as required for most Flink public
> >> features. And once the feature is reasonably evaluated and mature
> enough,
> >> it will be much easier to explain the design and address all the issues
> >> mentioned above. For example, Jingsong implemented a Flink Table Store
> >> prototype
> >> 
> >> before
> >> proposing FLIP-188 in this thread
> >> .
> >>
> >> I don't intend to block your progress. Just my two cents. It will be
> great
> >> to hear more from other developers (e.g. in the voting thread).
> >>
> >> Thanks,
> >> Dong
> >>
> >>
> >> On Thu, Nov 17, 2022 at 1:24 AM Gyula Fóra 
> wrote:
> >>
> >> > Hi Dong,
> >> >
> >> > Let me address your comments.
> >> >
> >> > Time for scale / backlog processing 

Re: [DISCUSS] FLIP-237: Thrift Format Support in Flink

2022-06-10 Thread Chen Qin
Hi Martijn,

Thank you for the good feedback! I might not be able to address it all,
especially around the catalog table schema inference design.

> * What I'm missing overall, is the section on 'Public Interfaces'. The FLIP
> has a large Proposed Changes section.
I updated
https://cwiki.apache.org/confluence/display/FLINK/FLIP-237%3A+Thrift+Format+Support
as well as the google doc, with a public interfaces section and a summary
section outlining the areas we propose to change. Let me know if that helps. I
agree, swallowing errors should be opt-in; the user needs to add code in their
datastream code and check metrics in their code. Please check the datastream
api section under the public interfaces section. I am not aware of a way to
capture ser/deser errors with a DLQ; the best I know of is a side output, but
that's one level up in the stack.

> I think 'Proposed Changes' needs a summary because you're suggesting
> multiple things and it is easy to miss one of them.
I categorized the changes in the summary. I agree this FLIP reads more
like a journey; let me know if the summary can focus on what the audience needs
to pay attention to (aka what needs to change).

> * With regards to mapping ENUM to STRING, what is to be expected if Flink
will support ENUM in the future?
Yes, we will move to ENUM as well.

> * Is there a tight dependency between Thrift and Hive? When we externalize
> the Hive connector, can the Thrift format still work?
We plan to introduce a flink-thrift module in the flink-formats subproject,
with a proper thrift.version that users can define in the mvn build, instead of
getting the thrift dependency from the hive connector implicitly. More detail
is in the public interfaces section.

> * The FLIP mentions usage of Thrift on both DataStream and SQL jobs;
> however, the FLIP is very SQL oriented.
I split out a dedicated section in the public changes with code examples of how
datastream api users can leverage our proposed change.


> * With regards to the Table/View Inference DDL, this is only providing the
> Hive metastore as an option. I would like to understand how this could work
> with Catalogs in general, not with Hive only. What type of
> compatibility guarantees (backward, forward, full) does Thrift offer?
This part needs more work; the current implementation is more like a hack, as
I described in the proposal doc. I'm looking for more suggestions if folks are
already thinking about inference schema support. I also added a section on
schema compatibility (limited to ser/deser, instead of the catalog table
topic).

Thanks,
Chen


On Thu, Jun 9, 2022 at 12:34 AM Martijn Visser 
wrote:

> Hi Chen,
>
> Thanks for creating the FLIP and opening the discussion. I have a couple of
> questions/remarks:
>
> * What I'm missing overall, is the section on 'Public Interfaces'. The FLIP
> has a large Proposed Changes section, but it reads more like your journey
> when you implemented Thrift in your fork. For example, it mentions corrupt
> Thrift payloads causing issues, but I can't determine if you want to
> propose to deal with this upfront or not. (I would not deal with this
> upfront in the format implementation, because there will be users who want
> to job the fail while others just want it to continue and send something to
> a DLQ)
> * I think 'Proposed Changes' needs a summary because you're suggesting
> multiple things and it easy to miss one of them.
> * With regards to mapping ENUM to STRING, what is to be expected if Flink
> will support ENUM in the future?
> * Is there a tight dependency between Thrift and Hive? When we externalize
> the Hive connector, can the Thrift format still work?
> * The FLIP mentions usage of Thrift on both DataStream and SQL jobs;
> however, the FLIP is very SQL oriented.
> * With regards to the Table/View Inference DDL, this is only providing the
> Hive metastore as options. I would like to understand how this could work
> with Catalogs in general, not with Hive only. What type of
> compatibility guarantees (backward, forward, full) does Thrift offer?
>
> Best regards,
>
> Martijn
>
> On Tue, Jun 7, 2022 at 18:56, Chen Qin  wrote:
>
> > Thanks for the pointers, I moved the proposal to the wiki and updated the
> > FLIP status page.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-237%3A+Thrift+Format+Support
> >
> >
> > Looking forward to getting community feedback.
> >
> > Chen
> >
> >
> >
> > On Tue, Jun 7, 2022 at 12:12 AM Jark Wu  wrote:
> >
> > > Yes. The community recommends keeping content on the wiki page
> > > and discuss in the mailing list. Discussions on the google doc are
> > > not so visible to the community, and "If it didn’t happen on a mailing
> > > list, it didn’t happen."
> > >
> > > Best,
> > > Jark
> > >
> > > On Tue, 7 Jun 2022 at 14:15, J

Re: [DISCUSS] FLIP-237: Thrift Format Support in Flink

2022-06-07 Thread Chen Qin
Thanks for the pointers, I moved the proposal to the wiki and updated the
FLIP status page.

https://cwiki.apache.org/confluence/display/FLINK/FLIP-237%3A+Thrift+Format+Support


Looking forward to getting community feedback.

Chen



On Tue, Jun 7, 2022 at 12:12 AM Jark Wu  wrote:

> Yes. The community recommends keeping content on the wiki page
> and discuss in the mailing list. Discussions on the google doc are
> not so visible to the community, and "If it didn’t happen on a mailing
> list, it didn’t happen."
>
> Best,
> Jark
>
> On Tue, 7 Jun 2022 at 14:15, Jing Ge  wrote:
>
> > Hi Chen,
> >
> > Thanks for driving this! Afaik, the community has the consensus to *Start
> > a [DISCUSS] thread on the Apache mailing list*[1]. I just walked through
> > some existing FLIPs and didn't find any that have been using google doc
> as
> > the discussion thread. Would you like to follow the current process and
> > move the content to the FLIP Wiki page? Another question would be: could
> > anyone in the community confirm that it is also fine to use google doc to
> > discuss? We'd better clarify it before kicking off the discussion.
> Thanks!
> >
> > By the way, you might want to reserve the FLIP number 237 on [1] in the
> > table *Adopted/Accepted but unreleased FLIPs.*
> >
> > Best regards,
> > Jing
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-WhatshouldbeincludedinaFLIP
> > ?
> >
> > On Tue, Jun 7, 2022 at 6:41 AM Chen Qin  wrote:
> >
> > > Hi there,
> > >
> > > I want to kick off the first round of FLIP-237
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-237%3A+Thrift+Format+Support
> > > >:
> > > Thrift Format Support discussion. Notice for the area marked as WIP, we
> > are
> > > looking for more feedback from folks, those areas would either stay in
> > the
> > > scope of current FLIP or removed based on feedback and discussion
> > results.
> > >
> > > Google Doc
> > > <
> > >
> >
> https://docs.google.com/document/d/1EhHewAW39pm-TX6fuUZLogWHK7vtgJ616WRwH7OOrgg/edit#
> > > >
> > >
> > >
> > > Thanks,
> > > Chen Q
> > >
> >
>


[DISCUSS] FLIP-237: Thrift Format Support in Flink

2022-06-06 Thread Chen Qin
Hi there,

I want to kick off the first round of the FLIP-237
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-237%3A+Thrift+Format+Support>:
Thrift Format Support discussion. Note that for the areas marked as WIP, we are
looking for more feedback from folks; those areas will either stay in the
scope of the current FLIP or be removed based on feedback and discussion
results.

Google Doc
<https://docs.google.com/document/d/1EhHewAW39pm-TX6fuUZLogWHK7vtgJ616WRwH7OOrgg/edit#>


Thanks,
Chen Q


Re: Table API thrift support

2022-05-30 Thread Chen Qin
On Mon, May 30, 2022 at 7:35 AM Martijn Visser 
wrote:

> Hi Chen,
>
> I think the best starting point would be to create a FLIP [1]. One of the
> important topics from my point of view is to make sure that such changes
> are not only available for SQL users, but are also being considered for
> Table API, DataStream and/or Python. There might be reasons why not to do
> that, but then those considerations should also be captured in the FLIP.
>
> > Thanks for the pointer, working on FLIP-237, stay tuned.

> Another thing that would be interesting is how Thrift translates into Flink
> connectors & Flink formats. Or is your Thrift implementation only a
> connector?
>
> It's a flink format for the most part; hope it can help with pyflink, not sure.

>
> Best regards,
>
> Martijn
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
> On Sun, May 29, 2022 at 19:06, Chen Qin  wrote:
>
> > Hi there,
> >
> > We would like to discuss and potentially upstream our thrift support
> > patches to flink.
> >
> > For some context, we have been internally patched flink-1.11.2 to support
> > FlinkSQL jobs read/write to thrift encoded kafka source/sink. Over the
> > course of last 12 months, those patches supports a few features not
> > available in open source master, including
> >
> >- allow user defined inference thrift stub class name in table DDL,
> >Thrift binary <-> Row
> >- dynamic overwrite schema type information loaded from HiveCatalog
> >(Table only)
> >- forward compatible when kafka topic encode with new schema (adding
> new
> >field)
> >- backward compatible when job with new schema handles input or state
> >with old schema
> >
> > With more FlinkSQL jobs in production, we expect maintenance of divergent
> > feature sets to increase in the next 6-12 months. Specifically challenges
> > around
> >
> >- lack of systematic way to support inference based table/view ddl
> >(parity with hiveql serde
> ><
> >
> https://cwiki.apache.org/confluence/display/hive/serde#:~:text=SerDe%20Overview,-SerDe%20is%20short=Hive%20uses%20the%20SerDe%20interface,HDFS%20in%20any%20custom%20format
> > .>
> >)
> >- lack of robust mapping from thrift field to row field
> >- dynamic update set of table with same inference class when
> performing
> >schema change (e.g adding new field)
> >- minor lack of handle UNSET case, use NULL
> >
> > Please kindly provide pointers around the challenges section.
> >
> > Thanks,
> > Chen, Pinterest.
> >
>


Table API thrift support

2022-05-29 Thread Chen Qin
Hi there,

We would like to discuss and potentially upstream our thrift support
patches to flink.

For some context, we have internally patched flink-1.11.2 to support
FlinkSQL jobs that read/write thrift-encoded kafka sources/sinks. Over the
course of the last 12 months, those patches have supported a few features not
available in open source master, including:

   - allow a user-defined thrift stub class name for inference in the table
   DDL, mapping Thrift binary <-> Row (see the sketch after this list)
   - dynamically overwrite schema type information loaded from the HiveCatalog
   (Table only)
   - forward compatibility when the kafka topic is encoded with a new schema
   (adding a new field)
   - backward compatibility when a job with a new schema handles input or state
   with an old schema
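
As a rough illustration of the "Thrift binary <-> Row" direction referenced
above (a minimal sketch only; the class name is made up and the real patches
also handle schema evolution, type mapping and UNSET fields):

    import java.lang.reflect.Method;

    import org.apache.flink.types.Row;
    import org.apache.thrift.TBase;
    import org.apache.thrift.TDeserializer;
    import org.apache.thrift.protocol.TBinaryProtocol;

    // Sketch: decode a thrift-encoded payload into a Flink Row by reflecting
    // over the generated thrift stub class. Error handling is omitted.
    final class ThriftToRowSketch {

        static Row toRow(byte[] payload, Class<? extends TBase<?, ?>> stubClass, String[] fieldNames)
                throws Exception {
            TBase<?, ?> stub = stubClass.getDeclaredConstructor().newInstance();
            new TDeserializer(new TBinaryProtocol.Factory()).deserialize(stub, payload);

            Row row = new Row(fieldNames.length);
            for (int i = 0; i < fieldNames.length; i++) {
                // generated stubs expose bean-style getters; a real implementation
                // would map thrift metadata to Flink logical types instead
                Method getter = stubClass.getMethod(
                        "get" + Character.toUpperCase(fieldNames[i].charAt(0)) + fieldNames[i].substring(1));
                row.setField(i, getter.invoke(stub));
            }
            return row;
        }
    }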

With more FlinkSQL jobs in production, we expect the maintenance of this
divergent feature set to increase in the next 6-12 months, specifically
challenges around:

   - lack of a systematic way to support inference-based table/view ddl
   (parity with hiveql serde
   <https://cwiki.apache.org/confluence/display/hive/serde#:~:text=SerDe%20Overview,-SerDe%20is%20short=Hive%20uses%20the%20SerDe%20interface,HDFS%20in%20any%20custom%20format>)
   - lack of a robust mapping from thrift fields to row fields
   - dynamically updating the set of tables with the same inference class when
   performing a schema change (e.g. adding a new field)
   - minor: lack of handling for the UNSET case; use NULL

Please kindly provide pointers around the challenges section.

Thanks,
Chen, Pinterest.


[jira] [Created] (FLINK-27726) shade thrift and fb303 in hive connector

2022-05-20 Thread Chen Qin (Jira)
Chen Qin created FLINK-27726:


 Summary: shade thrift and fb303 in hive connector
 Key: FLINK-27726
 URL: https://issues.apache.org/jira/browse/FLINK-27726
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 1.14.4, 1.14.3, 1.13.6, 1.14.2, 1.13.5, 1.12.7, 1.11.6, 
1.15.0, 1.13.3, 1.13.2, 1.12.5, 1.12.4, 1.13.1, 1.12.3
Reporter: Chen Qin


The Hive connector introduces fb303 and a specific thrift version to connect to a
specific hive metastore version. If user code also pulls in a thrift version,
along with fb303, that is not the same as the one the hive connector introduced,
the user code will not be able to connect to the hive metastore.

 

This fix has been verified in a production environment as part of supporting
thrift-encoded FlinkSQL for more than 6 months.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


FLINK-11746 work

2022-04-11 Thread Chen Qin
Hi there,

I would like to reboot the discussion on the FLINK-11746
<https://issues.apache.org/jira/browse/FLINK-11746> work. Over the course
of the last two years, we have managed to run a large number of critical flink
apps (table/datastream) with an underlying thrift format. It would be great if
folks could assign this jira to me so we can kick off a discussion on the
upstreaming work.

Chen


is task reassignment possible

2022-01-09 Thread Chen Qin
Hi there,

We run multiple large-scale applications on YARN clusters; one observation is
that those jobs are often CPU-skewed due to topology or data skew on subtasks.
For better or worse, the skew leads to a few task managers consuming
large vcores while the majority of task managers consume much less. Our goal is
to save total infra budget while keeping the jobs running smoothly.

Are there any ongoing discussions in this area? Naively, if we know for sure
that a few tasks (uuids) used higher vcores in previous runs, could we request
one last batch of containers with a high-vcore resource profile and reassign
those tasks?
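
For what it's worth, a minimal sketch of one existing direction, fine-grained
resource profiles via slot sharing groups (assuming a Flink version that ships
the SlotSharingGroup builder API; this sizes slots up front rather than
reassigning tasks after the fact, and all numbers below are placeholders):

    import org.apache.flink.api.common.operators.SlotSharingGroup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Sketch: give a known-hot operator its own, larger slot profile so it can
    // be placed on a bigger slot instead of skewing a shared one.
    public class SkewedOperatorResources {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            SlotSharingGroup hotGroup = SlotSharingGroup.newBuilder("hot-operators")
                    .setCpuCores(4.0)            // assumed high-vcore profile
                    .setTaskHeapMemoryMB(2048)
                    .build();

            env.fromSequence(0, 1_000_000)
                    .map(v -> v * 2)             // stand-in for the CPU-heavy subtask
                    .slotSharingGroup(hotGroup)
                    .print();

            env.execute("skewed-operator-resources-sketch");
        }
    }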

Thanks,
Chen


Re: handle SUSPENDED in ZooKeeperLeaderRetrievalService

2021-04-23 Thread Chen Qin
Sure, I updated the jira with the exception info. We can follow up from there
for technical discussions.

https://issues.apache.org/jira/browse/FLINK-10052?focusedCommentId=17330858=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17330858

On Thu, Apr 22, 2021 at 10:17 PM tison  wrote:

> The original log (section) is preferred over rephrasing.
> Best,
> tison.
>
>
> On Fri, Apr 23, 2021 at 1:15 PM, tison  wrote:
>
> > Could you show the log about which unhandled exception was thrown?
> >
> > Best,
> > tison.
> >
> >
> > Chen Qin  于2021年4月23日周五 下午1:06写道:
> >
> >> Hi Tison,
> >>
> >> Please read my latest comments in the thread. Using SessionErrorPolicy
> >> mitigated the suspended state issue while it might trigger an unhandled
> zk
> >> client exception in some situations. We would like to get some idea of
> the
> >> root cause of that issue to avoid introducing another issue in the fix.
> >>
> >> Chen
> >>
> >>
> >> On Thu, Apr 22, 2021 at 10:04 AM tison  wrote:
> >>
> >> > > My question is can we get some insight behind this decision and
> could
> >> we
> >> > add
> >> > some tunable configuration for user to decide how long they can endure
> >> such
> >> > uncertain suspended state in their jobs.
> >> >
> >> > For the specific question, Curator provides a configure for session
> >> timeout
> >> > and a
> >> > LOST will be generated if disconnected elapsed longer then the
> >> configured
> >> > timeout.
> >> >
> >> >
> >> >
> >>
> https://github.com/apache/flink/blob/58a7c80fa35424608ad44d1d6691d1407be0092a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L101-L102
> >> >
> >> >
> >> > Best,
> >> > tison.
> >> >
> >> >
> >> > On Fri, Apr 23, 2021 at 12:57 AM, tison  wrote:
> >> >
> >> > > To be concrete, if ZK suspended and reconnected, NodeCache already
> do
> >> > > the reset work for you and if there is a leader epoch updated,
> fencing
> >> > > token
> >> > > a.k.a leader session id would be updated so you will notice it.
> >> > >
> >> > > If ZK permanently lost, I think it is a system-wise fault and you'd
> >> > better
> >> > > restart
> >> > > the job from checkpoint/savepoint with a working ZK ensemble.
> >> > >
> >> > > I am possibly concluding without more detailed investigation though.
> >> > >
> >> > > Best,
> >> > > tison.
> >> > >
> >> > >
> >> > > tison  于2021年4月23日周五 上午12:35写道:
> >> > >
> >> > >> > Unfortunately, we do not have any progress on this ticket.
> >> > >>
> >> > >> Here is a PR[1].
> >> > >>
> >> > >> Here is the base PR[2] I made about one year ago without following
> >> > review.
> >> > >>
> >> > >> qinnc...@gmail.com:
> >> > >>
> >> > >> It requires further investigation about the impact involved by
> >> > >> FLINK-18677[3].
> >> > >> I do have some comments[4] but so far regard it as a stability
> >> problem
> >> > >> instead of
> >> > >> correctness problem.
> >> > >>
> >> > >> FLINK-18677 tries to "fix" an unreasonable scenario where zk lost
> >> > FOREVER,
> >> > >> and I don't want to pay any time before reactions on FLINK-10052
> >> > otherwise
> >> > >> it is highly possibly in vain again from my perspective.
> >> > >>
> >> > >> Best,
> >> > >> tison.
> >> > >>
> >> > >> [1] https://github.com/apache/flink/pull/15675
> >> > >> [2] https://github.com/apache/flink/pull/11338
> >> > >> [3] https://issues.apache.org/jira/browse/FLINK-18677
> >> > >> [4]
> https://github.com/apache/flink/pull/13055#discussion_r615871963
> >> > >>
> >> > >>
> >> > >>
> >> > >> Chen Qin  于2021年4月23日周五 上午12:15写道:
> >> > >>
> >> > >>> Hi there,
> >> > >>>

Re: handle SUSPENDED in ZooKeeperLeaderRetrievalService

2021-04-22 Thread Chen Qin
Hi Tison,

Please read my latest comments in the thread. Using the SessionConnectionStateErrorPolicy
mitigated the suspended-state issue, while it might trigger an unhandled ZK
client exception in some situations. We would like to get some idea of the
root cause of that issue to avoid introducing another issue with the fix.

Chen


On Thu, Apr 22, 2021 at 10:04 AM tison  wrote:

> > My question is can we get some insight behind this decision and could we
> add
> some tunable configuration for user to decide how long they can endure such
> uncertain suspended state in their jobs.
>
> For the specific question, Curator provides a configure for session timeout
> and a
> LOST will be generated if disconnected elapsed longer then the configured
> timeout.
>
>
> https://github.com/apache/flink/blob/58a7c80fa35424608ad44d1d6691d1407be0092a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java#L101-L102
>
>
> Best,
> tison.
>
>
> tison  于2021年4月23日周五 上午12:57写道:
>
> > To be concrete, if ZK suspended and reconnected, NodeCache already do
> > the reset work for you and if there is a leader epoch updated, fencing
> > token
> > a.k.a leader session id would be updated so you will notice it.
> >
> > If ZK permanently lost, I think it is a system-wise fault and you'd
> better
> > restart
> > the job from checkpoint/savepoint with a working ZK ensemble.
> >
> > I am possibly concluding without more detailed investigation though.
> >
> > Best,
> > tison.
> >
> >
> > tison  于2021年4月23日周五 上午12:35写道:
> >
> >> > Unfortunately, we do not have any progress on this ticket.
> >>
> >> Here is a PR[1].
> >>
> >> Here is the base PR[2] I made about one year ago without following
> review.
> >>
> >> qinnc...@gmail.com:
> >>
> >> It requires further investigation about the impact involved by
> >> FLINK-18677[3].
> >> I do have some comments[4] but so far regard it as a stability problem
> >> instead of
> >> correctness problem.
> >>
> >> FLINK-18677 tries to "fix" an unreasonable scenario where zk lost
> FOREVER,
> >> and I don't want to pay any time before reactions on FLINK-10052
> otherwise
> >> it is highly possibly in vain again from my perspective.
> >>
> >> Best,
> >> tison.
> >>
> >> [1] https://github.com/apache/flink/pull/15675
> >> [2] https://github.com/apache/flink/pull/11338
> >> [3] https://issues.apache.org/jira/browse/FLINK-18677
> >> [4] https://github.com/apache/flink/pull/13055#discussion_r615871963
> >>
> >>
> >>
> >> Chen Qin  于2021年4月23日周五 上午12:15写道:
> >>
> >>> Hi there,
> >>>
> >>> Quick dial back here, we have been running load testing and so far
> >>> haven't
> >>> seen suspended state cause job restarts.
> >>>
> >>> Some findings, instead of curator framework capture suspended state and
> >>> active notify leader lost, we have seen task manager propagate
> unhandled
> >>> errors from zk client, most likely due to
> >>> high-availability.zookeeper.client.max-retry-attempts
> >>> were set to 3 and with 5 seconds interval. It would be great if we
> handle
> >>> this exception gracefully with a meaningful exception message. Those
> >>> error
> >>> messages happen when other task managers die due to user code
> exceptions,
> >>> we would like to know more insights on this as well.
> >>>
> >>> For more context, Lu from our team also filed [2] stating issue with
> 1.9,
> >>> so far we haven't seen regression on ongoing load testing jobs.
> >>>
> >>> Thanks,
> >>> Chen
> >>>
> >>> Caused by:
> >>> >
> >>>
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException:
> >>> > KeeperErrorCode = ConnectionLoss
> >>> > at
> >>> >
> >>>
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> >>> > at
> >>> >
> >>>
> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:862)
> >>>
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-10052
> >>> [2] https://issues.apache.org/jira/browse/FLINK-19985
> >>>
> >>>

Re: handle SUSPENDED in ZooKeeperLeaderRetrievalService

2021-04-22 Thread Chen Qin
Hi there,

Quick dial-back here: we have been running load testing, and so far we haven't
seen the suspended state cause job restarts.

Some findings: instead of the Curator framework capturing the suspended state and
actively notifying that the leader was lost, we have seen the task manager propagate unhandled
errors from the ZK client, most likely because
high-availability.zookeeper.client.max-retry-attempts
was set to 3 with a 5-second interval. It would be great if we handled
this exception gracefully with a meaningful exception message. Those error
messages happen when other task managers die due to user-code exceptions;
we would like more insight into this as well.
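
For reference, a minimal sketch of the client-side knobs involved; the keys are the real
flink-conf options (normally set in flink-conf.yaml rather than in code), and the values are
only illustrative:

import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
// How long the ZK session may be interrupted before Curator reports LOST.
conf.setString("high-availability.zookeeper.client.session-timeout", "60000");
// Background retry budget; with only 3 attempts x 5s, a longer ZK hiccup surfaces
// as the unhandled ConnectionLoss exception shown below instead of being retried.
conf.setString("high-availability.zookeeper.client.max-retry-attempts", "10");
conf.setString("high-availability.zookeeper.client.retry-wait", "5000");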

For more context, Lu from our team also filed [2] describing an issue with 1.9;
so far we haven't seen regressions in the ongoing load-testing jobs.

Thanks,
Chen

Caused by:
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$ConnectionLossException:
> KeeperErrorCode = ConnectionLoss
> at
> org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> at
> org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:862)


[1] https://issues.apache.org/jira/browse/FLINK-10052
[2] https://issues.apache.org/jira/browse/FLINK-19985


On Thu, Apr 15, 2021 at 7:27 PM Yang Wang  wrote:

> Thanks for trying the unfinished PR and sharing the testing results. Glad
> to here that it could work
> and really hope the result of more stringent load testing.
>
> After then I think we could revive this ticket.
>
>
> Best,
> Yang
>
> Chen Qin  于2021年4月16日周五 上午2:01写道:
>
>> Hi there,
>>
>> Thanks for providing points to related changes and jira. Some updates
>> from our side, we applied a path by merging FLINK-10052
>> <https://issues.apache.org/jira/browse/FLINK-10052> with master as well
>> as only handling lost state leveraging SessionConnectionStateErrorPolicy
>>   FLINK-10052 <https://issues.apache.org/jira/browse/FLINK-10052>
>>  introduced.
>>
>> Preliminary results were good, the same workload (240 TM) on the same
>> environment runs stable without frequent restarts due to suspended state
>> (seems false positive). We are working on more stringent load testing as
>> well as chaos testing (blocking zk). Will keep folks posted.
>>
>> Thanks,
>> Chen
>>
>>
>> On Tue, Apr 13, 2021 at 1:34 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Chenqin,
>>>
>>> The current rationale behind assuming a leadership loss when seeing a
>>> SUSPENDED connection is to assume the worst and to be on the safe side.
>>>
>>> Yang Wang is correct. FLINK-10052 [1] has the goal to make the behaviour
>>> configurable. Unfortunately, the community did not have enough time to
>>> complete this feature.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-10052
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Apr 13, 2021 at 8:25 AM Yang Wang  wrote:
>>>
>>> > This might be related with FLINK-10052[1].
>>> > Unfortunately, we do not have any progress on this ticket.
>>> >
>>> > cc @Till Rohrmann 
>>> >
>>> > Best,
>>> > Yang
>>> >
>>> > chenqin  于2021年4月13日周二 上午7:31写道:
>>> >
>>> >> Hi there,
>>> >>
>>> >> We observed several 1.11 job running in 1.11 restart due to job leader
>>> >> lost.
>>> >> Dig deeper, the issue seems related to SUSPENDED state handler in
>>> >> ZooKeeperLeaderRetrievalService.
>>> >>
>>> >> ASFAIK, suspended state is expected when zk is not certain if leader
>>> is
>>> >> still alive. It can follow up with RECONNECT or LOST. In current
>>> >> implementation [1] , we treat suspended state same as lost state and
>>> >> actively shutdown job. This pose stability issue on large HA setting.
>>> >>
>>> >> My question is can we get some insight behind this decision and could
>>> we
>>> >> add
>>> >> some tunable configuration for user to decide how long they can endure
>>> >> such
>>> >> uncertain suspended state in their jobs.
>>> >>
>>> >> Thanks,
>>> >> Chen
>>> >>
>>> >> [1]
>>> >>
>>> >>
>>> https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L201
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> Sent from:
>>> >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>>> >>
>>> >
>>>
>>


Re: handle SUSPENDED in ZooKeeperLeaderRetrievalService

2021-04-15 Thread Chen Qin
Hi there,

Thanks for providing pointers to the related changes and Jira. Some updates from
our side: we applied a patch by merging FLINK-10052
with master, and we only handle the LOST state, leveraging the
SessionConnectionStateErrorPolicy that FLINK-10052 introduced.

Preliminary results were good: the same workload (240 TMs) in the same
environment runs stably, without the frequent restarts caused by the suspended state
(which appear to be false positives). We are working on more stringent load testing as
well as chaos testing (blocking ZK). Will keep folks posted.

Thanks,
Chen


On Tue, Apr 13, 2021 at 1:34 AM Till Rohrmann  wrote:

> Hi Chenqin,
>
> The current rationale behind assuming a leadership loss when seeing a
> SUSPENDED connection is to assume the worst and to be on the safe side.
>
> Yang Wang is correct. FLINK-10052 [1] has the goal to make the behaviour
> configurable. Unfortunately, the community did not have enough time to
> complete this feature.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10052
>
> Cheers,
> Till
>
> On Tue, Apr 13, 2021 at 8:25 AM Yang Wang  wrote:
>
> > This might be related with FLINK-10052[1].
> > Unfortunately, we do not have any progress on this ticket.
> >
> > cc @Till Rohrmann 
> >
> > Best,
> > Yang
> >
> > chenqin  于2021年4月13日周二 上午7:31写道:
> >
> >> Hi there,
> >>
> >> We observed several 1.11 job running in 1.11 restart due to job leader
> >> lost.
> >> Dig deeper, the issue seems related to SUSPENDED state handler in
> >> ZooKeeperLeaderRetrievalService.
> >>
> >> ASFAIK, suspended state is expected when zk is not certain if leader is
> >> still alive. It can follow up with RECONNECT or LOST. In current
> >> implementation [1] , we treat suspended state same as lost state and
> >> actively shutdown job. This pose stability issue on large HA setting.
> >>
> >> My question is can we get some insight behind this decision and could we
> >> add
> >> some tunable configuration for user to decide how long they can endure
> >> such
> >> uncertain suspended state in their jobs.
> >>
> >> Thanks,
> >> Chen
> >>
> >> [1]
> >>
> >>
> https://github.com/apache/flink/blob/release-1.11/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java#L201
> >>
> >>
> >>
> >>
> >> --
> >> Sent from:
> >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> >>
> >
>


[jira] [Created] (FLINK-22081) Entropy key not resolved if flink-s3-fs-hadoop is added as a plugin

2021-03-31 Thread Chen Qin (Jira)
Chen Qin created FLINK-22081:


 Summary: Entropy key not resolved if flink-s3-fs-hadoop is added 
as a plugin
 Key: FLINK-22081
 URL: https://issues.apache.org/jira/browse/FLINK-22081
 Project: Flink
  Issue Type: Bug
  Components: FileSystems
Reporter: Chen Qin
Assignee: Prem Santosh
 Fix For: 1.10.1, 1.11.0


Using flink 1.10

I added the flink-s3-fs-hadoop jar in plugins dir but I am seeing the 
checkpoints paths like 
{{s3://my_app/__ENTROPY__/app_name-staging/flink/checkpoints/e10f47968ae74934bd833108d2272419/chk-3071}}
 which means the entropy injection key is not being resolved. After some 
debugging I found that in the 
[EntropyInjector|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/EntropyInjector.java#L97]
 we check if the given fileSystem is of type {{SafetyNetWrapperFileSystem}} and 
if so we check if the delegate is of type {{EntropyInjectingFileSystem}} but 
don't check for {{[ClassLoaderFixingFileSystem 
|https://github.com/apache/flink/blob/release-1.10.0/flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java#L65]}}
 which would be the type if S3 file system dependencies are added as a plugin.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


flink 1.11 class loading question

2021-03-13 Thread Chen Qin
Hi there,

We are using Flink 1.11.2 in production with a large setup. The job runs
fine for a couple of days and then ends up in a restart loop caused by YARN
container memory kills. This is not observed when running against 1.9.1
with the same setup.
Here is the JVM environment passed to both the 1.11 and 1.9.1 jobs:


env.java.opts.taskmanager: '-XX:+UseG1GC -XX:MaxGCPauseMillis=500
> -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5
> -XX:InitiatingHeapOccupancyPercent=45 -XX:NewRatio=1
> -XX:+PrintClassHistogram -XX:+PrintGCDateStamps -XX:+PrintGCDetails
> -XX:+PrintGCApplicationStoppedTime -Xloggc:/gc.log'
> env.java.opts.jobmanager: '-XX:+UseG1GC -XX:MaxGCPauseMillis=500
> -XX:ParallelGCThreads=20 -XX:ConcGCThreads=5
> -XX:InitiatingHeapOccupancyPercent=45 -XX:NewRatio=1
> -XX:+PrintClassHistogram -XX:+PrintGCDateStamps -XX:+PrintGCDetails
> -XX:+PrintGCApplicationStoppedTime -Xloggc:/gc.log'
>

After a preliminary investigation, we found this might not be related to JVM
heap space usage or a GC issue. Meanwhile, we observed that JVM non-heap usage on
some containers keeps rising while the job falls into the restart loop, as shown
below.
[image: image.png]

From a configuration perspective, we would like to learn how the task
manager handles class loading (and unloading?) when we set include-user-jar
to first. Are there suggestions on how we can better understand
how the new memory model introduced in 1.10 affects this issue?


cluster.evenly-spread-out-slots: true
zookeeper.sasl.disable: true
yarn.per-job-cluster.include-user-jar: first
yarn.properties-file.location: /usr/local/hadoop/etc/hadoop/
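
One knob that is often relevant when non-heap usage climbs under the 1.10+ memory model is the
metaspace budget. A hedged sketch (the key is a real option, the value is illustrative, and
whether it applies to this particular leak is an assumption):

import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
// Normally set in flink-conf.yaml. Capping metaspace makes a classloader leak fail
// fast with "OutOfMemoryError: Metaspace" (visible in a dump) rather than growing
// until YARN kills the container.
conf.setString("taskmanager.memory.jvm-metaspace.size", "512m");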


Thanks,
Chen


Hive Streaming write compaction

2020-11-18 Thread Chen Qin
Hi there,

We are testing out writing Kafka data to a Hive table in Parquet format.
Currently, we have seen that users have to accept creating lots of small files in
the minute-level partition folders to gain latency benefits. I recall that Flink Forward
Global 2020 folks mentioned implementing compaction logic at checkpointing time. Wondering
how that is going? Would love to collaborate on this topic.
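
For what it is worth, a hedged sketch of the sink-side options that later shipped for the
filesystem SQL connector ('auto-compaction' / 'compaction.file-size', with a similar mechanism
for the Hive sink); the table name, schema, and values below are made up:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

// Small files produced per checkpoint are merged up to the target size before the
// partition is committed, trading a bit of commit latency for fewer files.
tEnv.executeSql(
        "CREATE TABLE events_sink (" +
        "  id BIGINT, payload STRING, dt STRING, hm STRING" +
        ") PARTITIONED BY (dt, hm) WITH (" +
        "  'connector' = 'filesystem'," +
        "  'path' = 'hdfs:///warehouse/events'," +
        "  'format' = 'parquet'," +
        "  'auto-compaction' = 'true'," +
        "  'compaction.file-size' = '128MB'," +
        "  'sink.rolling-policy.file-size' = '32MB'" +
        ")");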

Chen
Pinterest


Re: thrift support

2020-07-22 Thread Chen Qin
Thanks, Yu, for sharing more background on this.

Jark,

We were able to sync with Yu a bit offline. I think we should reuse the Jira,
and figure out how to reuse the code when we get into the implementation
phase. To continue the discussion, maybe we can share a Google doc with a detailed
list of work items and options that folks can agree on as a first step. Please
assign FLINK-11746 to my account.

As Benchao previously pointed out, Flink SQL Thrift support seems likely to grow
beyond a single PR's worth of work:
- ser/deser: use Kryo with a customized serializer, or infer a POJO from the Thrift
source
- TableSchema and type translation: use DDL to match, or use Thrift to infer the
DDL; will nested column pruning work?
- most online services use either gRPC or Thrift as the service endpoint
definition; is there a proper way to construct a "table" that interacts
directly with those online services (vs. async I/O)?

Thanks,
Chen

On Tue, Jul 21, 2020 at 12:14 PM Yu Yang  wrote:

> Thanks for the discussion. In https://github.com/apache/flink/pull/8067 we
> made an initial version on adding thrift-format support in flink, and
> haven't got time to finish it. Feel free to take it over and make changes.
> I've also linked this discussion thread in
> https://issues.apache.org/jira/browse/FLINK-11746.
>
> Regards,
> -Yu
>
> On Tue, Jul 21, 2020 at 1:14 AM Jark Wu  wrote:
>
> > Thanks Dawid for the link. I have a glance at the PR.
> >
> > I think we can continue the thrift format based on the PR (would be
> better
> > to reach out to the author).
> >
> > Best,
> > Jark
> >
> > On Tue, 21 Jul 2020 at 15:58, Dawid Wysakowicz 
> > wrote:
> >
> > > Hi,
> > >
> > > I've just spotted this PR that might be helpful in the discussion:
> > > https://github.com/apache/flink/pull/8067
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 20/07/2020 04:30, Benchao Li wrote:
> > > > Hi Chen,
> > > >
> > > > Thanks for bringing up this discussion. We are doing something
> similar
> > > > internally recently.
> > > >
> > > > Our use case is that many services in our company are built with
> > > > thrift protocol, and we
> > > > want to support accessing these RPC services natively with Flink SQL.
> > > > Currently, there are two ways that we aim to support, they are thrift
> > RPC
> > > > Sink and thrift RPC
> > > > temporal table (dimension table).
> > > > Then our scenario is that we need to support both (de)ser with
> > > > thrift format, and accessing
> > > > the thrift RPC service.
> > > >
> > > > Jeff Zhang  于2020年7月19日周日 上午9:43写道:
> > > >
> > > >> Hi Chen,
> > > >>
> > > >> Right, this is what I mean. Could you provide more details about the
> > > >> desr/ser work ? Giving a concrete example or usage scenario would be
> > > >> helpful.
> > > >>
> > > >>
> > > >>
> > > >> Chen Qin  于2020年7月18日周六 下午11:09写道:
> > > >>
> > > >>> Jeff,
> > > >>>
> > > >>> Are you referring something like this SPIP?
> > > >>>
> > > >>>
> > > >>
> > >
> >
> https://docs.google.com/document/d/1ug4K5e2okF5Q2Pzi3qJiUILwwqkn0fVQaQ-Q95HEcJQ/edit#heading=h.x97c6tj78zo0
> > > >>> Not at this moment, we are working on desr/ser work at the moment.
> > > Would
> > > >> be
> > > >>> good to starts discussion and learn if folks working on related
> areas
> > > and
> > > >>> align.
> > > >>>
> > > >>> Chen
> > > >>>
> > > >>> On Sat, Jul 18, 2020 at 6:41 AM Jeff Zhang 
> wrote:
> > > >>>
> > > >>>> Hi Chen,
> > > >>>>
> > > >>>> Are building something like hive thrift server ?
> > > >>>>
> > > >>>> Chen Qin  于2020年7月18日周六 上午8:50写道:
> > > >>>>
> > > >>>>> Hi there,
> > > >>>>>
> > > >>>>> Here in Pinterest, we utilize thrift end to end in our tech
> stack.
> > As
> > > >>> we
> > > >>>>> have been building Flink as a service platform, the team spent
> time
> > > >>>> working
> > > >>>>> on supporting Flink jobs with thrift format and successfully
> > > >> launched a
> > > >>>>> good number of important jobs in Production in H1.
> > > >>>>>
> > > >>>>> In H2, we are looking at supporting Flink SQL with native Thrift
> > > >>> support.
> > > >>>>> We have some prototypes already running in development settings
> and
> > > >>> plan
> > > >>>> to
> > > >>>>> move forward on this approach.
> > > >>>>>
> > > >>>>> In the long run, we thought out of box thrift format support
> would
> > > >>>> benefit
> > > >>>>> other folks as well. So the question is if there is already some
> > > >> effort
> > > >>>>> around this space we can sync with?
> > > >>>>>
> > > >>>>> Chen
> > > >>>>> Pinterest Data
> > > >>>>>
> > > >>>>
> > > >>>> --
> > > >>>> Best Regards
> > > >>>>
> > > >>>> Jeff Zhang
> > > >>>>
> > > >>
> > > >> --
> > > >> Best Regards
> > > >>
> > > >> Jeff Zhang
> > > >>
> > > >
> > >
> > >
> >
>


Re: thrift support

2020-07-20 Thread Chen Qin
Jeff

A sample would be that you have a Kafka topic that stores records in Thrift format:
- Flink SQL will not work, because it doesn't support the Thrift format out of
the box;
- the table schema can't be inferred, so the user might end up handcrafting a
field-by-field mapping;
- Thrift object serialization falls back to Kryo unless the user writes their own
TSerializer/TBaseSerializer-based implementation (see the sketch below);
- Thrift RPC needs the user to do a bit more work and setup.

Bonus: JVM <-> Python can share the same data format with the same schema.
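
A minimal sketch of the serializer-registration point above, following the approach in Flink's
custom-serializer docs (assumes chill-thrift and libthrift are on the classpath; MyThriftEvent
stands in for a Thrift-generated class):

import com.twitter.chill.thrift.TBaseSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Route the Thrift-generated type through TBaseSerializer instead of Kryo's generic
// reflection path, which is slower and fragile across Thrift schema changes.
env.getConfig().addDefaultKryoSerializer(MyThriftEvent.class, TBaseSerializer.class);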

Chen

Benchao,

Sounds great! Glad to hear folks are working on this area.

On top of my head, lists of iteams could be
- adding support in flink-format (e.g flink-thrift)
- evaluate if TBaseSeralizaer (Kryo) need extra work
- derive table schema out of thrift struct (java/python or .thrift)
- Row / RowTypeInfo related transformations.
- Thrift RPC Table sink v.s Stream sink in Flink SQL
- thrift RPC temporal table (dimension table). (copy from your side)

What do you think?

Thanks,
Chen

On Sun, Jul 19, 2020 at 7:34 PM Benchao Li  wrote:

> Hi Chen,
>
> Thanks for bringing up this discussion. We are doing something similar
> internally recently.
>
> Our use case is that many services in our company are built with
> thrift protocol, and we
> want to support accessing these RPC services natively with Flink SQL.
> Currently, there are two ways that we aim to support, they are thrift RPC
> Sink and thrift RPC
> temporal table (dimension table).
> Then our scenario is that we need to support both (de)ser with
> thrift format, and accessing
> the thrift RPC service.
>
> Jeff Zhang  于2020年7月19日周日 上午9:43写道:
>
> > Hi Chen,
> >
> > Right, this is what I mean. Could you provide more details about the
> > desr/ser work ? Giving a concrete example or usage scenario would be
> > helpful.
> >
> >
> >
> > Chen Qin  于2020年7月18日周六 下午11:09写道:
> >
> > > Jeff,
> > >
> > > Are you referring something like this SPIP?
> > >
> > >
> >
> https://docs.google.com/document/d/1ug4K5e2okF5Q2Pzi3qJiUILwwqkn0fVQaQ-Q95HEcJQ/edit#heading=h.x97c6tj78zo0
> > > Not at this moment, we are working on desr/ser work at the moment.
> Would
> > be
> > > good to starts discussion and learn if folks working on related areas
> and
> > > align.
> > >
> > > Chen
> > >
> > > On Sat, Jul 18, 2020 at 6:41 AM Jeff Zhang  wrote:
> > >
> > > > Hi Chen,
> > > >
> > > > Are building something like hive thrift server ?
> > > >
> > > > Chen Qin  于2020年7月18日周六 上午8:50写道:
> > > >
> > > > > Hi there,
> > > > >
> > > > > Here in Pinterest, we utilize thrift end to end in our tech stack.
> As
> > > we
> > > > > have been building Flink as a service platform, the team spent time
> > > > working
> > > > > on supporting Flink jobs with thrift format and successfully
> > launched a
> > > > > good number of important jobs in Production in H1.
> > > > >
> > > > > In H2, we are looking at supporting Flink SQL with native Thrift
> > > support.
> > > > > We have some prototypes already running in development settings and
> > > plan
> > > > to
> > > > > move forward on this approach.
> > > > >
> > > > > In the long run, we thought out of box thrift format support would
> > > > benefit
> > > > > other folks as well. So the question is if there is already some
> > effort
> > > > > around this space we can sync with?
> > > > >
> > > > > Chen
> > > > > Pinterest Data
> > > > >
> > > >
> > > >
> > > > --
> > > > Best Regards
> > > >
> > > > Jeff Zhang
> > > >
> > >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: thrift support

2020-07-18 Thread Chen Qin
Jeff,

Are you referring to something like this SPIP?
https://docs.google.com/document/d/1ug4K5e2okF5Q2Pzi3qJiUILwwqkn0fVQaQ-Q95HEcJQ/edit#heading=h.x97c6tj78zo0
Not at this moment; we are working on the deser/ser work right now. It would be
good to start a discussion and learn whether folks are working on related areas, and
align.

Chen

On Sat, Jul 18, 2020 at 6:41 AM Jeff Zhang  wrote:

> Hi Chen,
>
> Are building something like hive thrift server ?
>
> Chen Qin  于2020年7月18日周六 上午8:50写道:
>
> > Hi there,
> >
> > Here in Pinterest, we utilize thrift end to end in our tech stack. As we
> > have been building Flink as a service platform, the team spent time
> working
> > on supporting Flink jobs with thrift format and successfully launched a
> > good number of important jobs in Production in H1.
> >
> > In H2, we are looking at supporting Flink SQL with native Thrift support.
> > We have some prototypes already running in development settings and plan
> to
> > move forward on this approach.
> >
> > In the long run, we thought out of box thrift format support would
> benefit
> > other folks as well. So the question is if there is already some effort
> > around this space we can sync with?
> >
> > Chen
> > Pinterest Data
> >
>
>
> --
> Best Regards
>
> Jeff Zhang
>


thrift support

2020-07-17 Thread Chen Qin
Hi there,

Here in Pinterest, we utilize thrift end to end in our tech stack. As we
have been building Flink as a service platform, the team spent time working
on supporting Flink jobs with thrift format and successfully launched a
good number of important jobs in Production in H1.

In H2, we are looking at supporting Flink SQL with native Thrift support.
We have some prototypes already running in development settings and plan to
move forward on this approach.

In the long run, we think out-of-the-box Thrift format support would benefit
other folks as well. So the question is whether there is already some effort
in this space that we can sync with?

Chen
Pinterest Data


Re: [Discuss] IntervalJoin one side sorted cache

2020-03-06 Thread Chen Qin
Thanks for the expedited response!

   - The cache in IntervalJoin operator maybe not necessary for statebackend
   which has the ability to cache itself

I guess it's yes and no. If we use something like the FsStateBackend, all state
is stored in memory, so we don't need this cache. But we need
incremental checkpointing support for large windows (several TB).
Ideally, we need a better way to tell the state backend what to cache
(aligned with event-time expiration) versus what should go straight to disk
(~200x read speed difference). What do you think?

   - We need a tradeoff when introducing a cache in the operator. Because
   if two input side of IntervalJoin come evenly intersecting with each other.
   e.g (left, right, left, right), the cache would be invalid and reload
   repeatedly.

It's a valid concern; this one-side sorted cache may not be that useful
for some use cases, so it should have a flag and be off by default.
I wish there were a way to tell the state backend what to cache and for how long
(in event time) without adding it to the operator. For small stateful
jobs, we can give free memory to everything. For large stateful jobs, Flink
probably needs some support/control from the state backend.


   - I'm a little confused. If user could tolerant clear up state earlier,
   why not just change [lowerbound, upbound] shorter directly?

It's asymmetric cleanup: we don't handle cleanup of the two streams in the same way as
the default. The rationale is that one stream may be offering metadata
(all social media post updates) while the other is all impressions and likes
over certain post updates. A post can be updated 10-100 times over a couple of
days and get millions of impressions. We want to optimize lookups from the
impression stream while covering the fact that the upstream might be delayed by a couple of
hours, so recent updates stay available. In the end, we still want to find
all impressions over a given post update in the last 28 days, for example.


Thanks,
Chen


On Fri, Mar 6, 2020 at 7:46 AM 张静(槿瑜)  wrote:

> Hi, chenqi.
>   I'm interested in your optimizations about IntervalJoin. And I also have
> a few questions about two points:
>   1. Introduce cache to speed up lookup operation.
>   * The cache in IntervalJoin operator maybe not necessary for
> statebackend which has the ability to cache itself.
>   * We need a tradeoff when introduce cache in operator. Because if
> two input side of IntervalJoin come evenly intersecting with each other.
> e.g (left, right, left, right), the cache would be invalid and reload
> repeatedly.
>
> 2. Add config to earlier cleanup state.
>* I'm a little confused. If user could tolerant clear up state earlier,
> why not just change [lowerbound, upbound] shorter directly?
> Looking forward to your response, thanks.
>
>
> ----------
> 发件人:Chen Qin 
> 发送时间:2020年3月6日(星期五) 10:54
> 收件人:dev 
> 主 题:[Discuss] IntervalJoin one side sorted cache
>
> Hi there,
>
> I would like kick off discussion on
> https://issues.apache.org/jira/browse/FLINK-16392 and discuss what is best
> way moving forward. Here is problem statement and proposal we have in mind.
> Please kindly provide feedback.
>
> Native intervaljoin rely on statebackend(e.g rocksdb) to insert/fetch left
> and right buffer. This design choice reduce minimize heap memory footprint
> while bounded process throughput of single taskmanager iops to rocksdb
> access speed. Here at Pinterest, we have some large use cases where
> developers join large and slow evolving data stream (e.g post updates in
> last 28 days) with web traffic datastream (e.g post views up to 28 days
> after given update).
>
> This post some challenge to current implementation of intervaljoin
>
>- partitioned rocksdb needs to keep both updates and views for 28 days,
>large buffer(especially view stream side) cause rocksdb slow down and
> lead
>to overall interval join performance degregate quickly as state build
> up.
>
>
>- view stream is web scale, even after setting large parallelism it can
>put lot of pressure on each subtask and backpressure entire job
>
> In proposed implementation, we plan to introduce two changes
>
>- support ProcessJoinFunction settings to opt-in earlier cleanup time of
>right stream(e.g view stream don't have to stay in buffer for 28 days
> and
>wait for update stream to join, related post views happens after update
> in
>event time semantic) This optimization can reduce state size to improve
>rocksdb throughput. If extreme case, user can opt-in in flight join and
>skip write into right view stream buffer to save iops budget on each
> subtask
>
>
>- support ProcessJoinFunction settings to expedite keyed lookup of slow
>changing stream. Instead of every post

[Discuss] IntervalJoin one side sorted cache

2020-03-05 Thread Chen Qin
Hi there,

I would like to kick off a discussion on
https://issues.apache.org/jira/browse/FLINK-16392 and discuss the best
way to move forward. Here is the problem statement and the proposal we have in mind.
Please kindly provide feedback.

The native interval join relies on the state backend (e.g. RocksDB) to insert/fetch the left
and right buffers. This design choice minimizes heap memory footprint,
but it bounds the processing throughput of a single task manager to the RocksDB
access speed. Here at Pinterest, we have some large use cases where
developers join a large, slowly evolving data stream (e.g. post updates in the
last 28 days) with a web traffic data stream (e.g. post views up to 28 days
after a given update).
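
To make the shape of the workload concrete, roughly the usage pattern in question (types,
fields and stream names are made up; the bounds mirror the 28-day case above):

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

// postUpdates: slowly evolving "left" stream; postViews: high-QPS "right" stream.
postUpdates.keyBy(u -> u.postId)
        .intervalJoin(postViews.keyBy(v -> v.postId))
        // a view may arrive anywhere from the update time up to 28 days after it
        .between(Time.days(0), Time.days(28))
        .process(new ProcessJoinFunction<PostUpdate, PostView, JoinedRecord>() {
            @Override
            public void processElement(PostUpdate update, PostView view, Context ctx,
                                       Collector<JoinedRecord> out) {
                out.collect(new JoinedRecord(update, view));
            }
        });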

This poses some challenges to the current implementation of the interval join:

   - partitioned RocksDB needs to keep both updates and views for 28 days; a
   large buffer (especially on the view stream side) causes RocksDB to slow down and leads
   to overall interval join performance degrading quickly as state builds up.


   - the view stream is web scale; even after setting a large parallelism it can
   put a lot of pressure on each subtask and backpressure the entire job.

In the proposed implementation, we plan to introduce two changes:

   - support ProcessJoinFunction settings to opt in to an earlier cleanup time for the
   right stream (e.g. the view stream doesn't have to stay in the buffer for 28 days and
   wait for the update stream to join, since related post views happen after the update in
   event-time semantics). This optimization can reduce state size to improve
   RocksDB throughput. In the extreme case, the user can opt in to an in-flight join and
   skip the write into the right (view stream) buffer to save the IOPS budget on each subtask.


   - support ProcessJoinFunction settings to expedite keyed lookups of the slowly
   changing stream. Instead of every post view pulling post updates from RocksDB,
   the user can opt in to having a one-side buffer cache available in memory. On a
   given post update, the cache loads recent views from the right buffer and uses a
   sorted map to find the buckets. On a given post view, the cache loads recent updates
   from the left buffer into memory. When another view for that post arrives, Flink
   saves the cost of a RocksDB access.

Thanks,
Chen Qin


[jira] [Created] (FLINK-16392) oneside sorted cache in intervaljoin

2020-03-02 Thread Chen Qin (Jira)
Chen Qin created FLINK-16392:


 Summary: oneside sorted cache in intervaljoin
 Key: FLINK-16392
 URL: https://issues.apache.org/jira/browse/FLINK-16392
 Project: Flink
  Issue Type: Improvement
  Components: API / DataStream
Affects Versions: 1.10.0
Reporter: Chen Qin
 Fix For: 1.11.0


IntervalJoin is getting lots of use cases. Those use cases share the following 
similar pattern:
 * the left stream is pulled from a static dataset periodically
 * the lookup time range is very large (days or weeks)
 * the right stream is web traffic with high QPS

In the current interval join implementation, we treat both streams equally. 
Specifically, as RocksDB fetches and updates get more expensive, performance 
takes a hit, which blocks large use cases.

In the proposed implementation, we plan to introduce two changes:
 * allow the user to opt in via ProcessJoinFunction if they want to skip the scan when the 
interval join operator receives events from the left stream (static data set)
 * build a sorted map from the other side's buffer at per-seen-key granularity
 ** expedite the right stream's lookup of left buffers without accessing RocksDB 
every time
 ** if a key sees an event from the left side, clean up the cache and reload it from the 
right side's buffer

 

Open discussion
 * how to control the cache size?
 ** TBD
 * how to avoid a dirty cache
 ** if a given key sees an insertion from the other side, the cache will be cleared for 
that key and rebuilt. This is a small overhead to populate the cache; compared with the 
current RocksDB implementation, where we need to do a full loop on every event, it saves the 
bucket scan logic.
 * what happens on checkpoint/restore
 ** state still persists in the state backend; clear the cache and rebuild it for each new 
key seen.
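
To make the cache idea above concrete, a rough standalone sketch (plain Java, not Flink API;
names, payloads and timestamps are illustrative):

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class OneSideSortedCacheSketch {
    public static void main(String[] args) {
        // In-memory mirror of the slowly changing (left) buffer, sorted by event timestamp.
        NavigableMap<Long, List<String>> leftByTimestamp = new TreeMap<>();

        // Left-side element (e.g. a post update): insert into the cache
        // (RocksDB would still be updated as today for fault tolerance).
        long updateTs = 1_000L;
        leftByTimestamp.computeIfAbsent(updateTs, ts -> new ArrayList<>()).add("post-update-payload");

        // Right-side element (e.g. a post view): a range lookup over
        // [viewTs - interval, viewTs] replaces a RocksDB scan per event.
        long viewTs = 2_000L;
        long intervalMs = 28L * 24 * 60 * 60 * 1000;
        leftByTimestamp.subMap(viewTs - intervalMs, true, viewTs, true)
                .values()
                .forEach(bucket -> bucket.forEach(update -> System.out.println("join: " + update)));
    }
}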

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: FsStateBackend vs RocksDBStateBackend

2020-02-29 Thread Chen Qin
Hi Robert,

Comments in line

> On Feb 28, 2020, at 2:51 AM, Robert Metzger  wrote:
> 
> Sorry for the late reply.
> 
> There's not much you can do at the moment, as Flink needs to sync on the 
> checkpoint barriers.
> There's something in the making for addressing the issue soon: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>  
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-76:+Unaligned+Checkpoints>
If I understand correctly, we need to make sure that when snapshotState is called, 
in-flight records between barriers from different channels are 
"materialized" (processed and pushed downstream before the snapshot is taken).

More specifically, if we honor watermark progression and the operator snapshot 
(barriers aligned), and drain out-of-order processed records before actually 
snapshotting, will it work correctly? Details here: 
https://github.com/apache/flink/pull/11267/files 
<https://github.com/apache/flink/pull/11267/files>

> Did you try out using the FsStateBackend?
It's a skewed key causing slow RocksDB state updates, as far as we know; Ran 
can probably share more at Flink Forward 2020 :)
> If you are going to stick with rocks, I would recommend to understand what 
> exactly causes the poor performance. I see the following areas:
> - serialization costs
> - disk / ssd speed
> - network speed (during checkpoint creation) (as Yu mentioned)
> - if you have asynchronous checkpoints enabled, they will also slow down the 
> processing.
> 
> 
> On Sun, Feb 23, 2020 at 8:27 PM Chen Qin  <mailto:c...@pinterest.com>> wrote:
> Just follow up on this thread, it accurately caused by key skew. Given single 
> subtask is single threaded 5% of slow processing cause entire job back 
> pressures on rocksdbstatebackend.
> 
> Robert,
> 
> What is blocking us enable multi threading in processor? I recall it has 
> something todo with barrier and record in order. Can you share more insights 
> on this?
> 
> Chen
> 
>> On Feb 21, 2020, at 4:56 AM, Robert Metzger > <mailto:rmetz...@apache.org>> wrote:
>> 
>> 
>> I would try the FsStateBackend in this scenario, as you have enough memory 
>> available.
>> 
>> On Thu, Jan 30, 2020 at 5:26 PM Ran Zhang > <mailto:ranzh...@pinterest.com>> wrote:
>> Hi Gordon,
>> 
>> Thanks for your reply! Regarding state size - we are at 200-300gb but we 
>> have 120 parallelism which will make each task handle ~2 - 3 gb state. (when 
>> we submit the job we are setting tm memory to 15g.) In this scenario what 
>> will be the best fit for statebackend? 
>> 
>> Thanks,
>> Ran
>> 
>> On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai > <mailto:tzuli...@apache.org>> wrote:
>> Hi Ran,
>> 
>> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang > <mailto:ranzh...@pinterest.com>> wrote:
>> Hi all,
>> 
>> We have a Flink app that uses a KeyedProcessFunction, and in the function it 
>> requires a ValueState(of TreeSet) and the processElement method needs to 
>> access and update it. We tried to use RocksDB as our stateBackend but the 
>> performance is not good, and intuitively we think it was because of the 
>> serialization / deserialization on each processElement call.
>> 
>> As you have already pointed out, serialization behaviour is a major 
>> difference between the 2 state backends, and will directly impact 
>> performance due to the extra runtime overhead in RocksDB.
>> If you plan to continue using the RocksDB state backend, make sure to use 
>> MapState instead of ValueState where possible, since every access to the 
>> ValueState in the RocksDB backend requires serializing / deserializing the 
>> whole value.
>> For MapState, de-/serialization happens per K-V access. Whether or not this 
>> makes sense would of course depend on your state access pattern.
>>  
>> Then we tried to switch to use FsStateBackend (which keeps the in-flight 
>> data in the TaskManager’s memory according to doc), and it could resolve the 
>> performance issue. So we want to understand better what are the tradeoffs in 
>> choosing between these 2 stateBackend. Our checkpoint size is 200 - 300 GB 
>> in stable state. For now we know one benefits of RocksDB is it supports 
>> incremental checkpoint, but would love to know what else we are losing in 
>> choosing FsStateBackend.
>> 
>> As of now, feature-wise both backends support asynchronous snapshotting, 
>> state schema evolution, and access via the State Processor API.
>> In the end, the major factor for deciding between the two state backends 
>> would be your e

Large intervaljoin related question

2019-12-13 Thread Chen Qin
Hi there,

We have seen growing interest in using large windows and the interval join operation. 
What is the recommended way of handling these use cases? (e.g. Delta Lake in Spark)
After some benchmarking, we found that performance still seems to be a bottleneck for 
supporting those use cases. 
How is the performance improvement in https://issues.apache.org/jira/browse/FLINK-7001 
going?

On the tuning side, we plan to test giving RocksDB a larger block cache, ~4 GB; 
will this help?
Otherwise, we plan to write to an external Hive table (it seems partitions are not 
supported yet) and run a frequent ETL job there.
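
For reference, a hedged sketch of the knob we plan to test; the key exists in newer Flink
versions (on 1.9 the same effect needs a custom RocksDB options factory), and 4gb is just the
value from our experiment, not a recommendation:

import org.apache.flink.configuration.Configuration;

Configuration conf = new Configuration();
// Normally set in flink-conf.yaml; a larger shared block cache keeps more of the
// interval-join buffers' SST blocks in memory and cuts read amplification.
conf.setString("state.backend.rocksdb.block.cache-size", "4gb");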


Thanks,
Chen

Re: set job level TTL

2019-11-27 Thread Chen Qin
Hi Andrey,

Yes, I think FLIP-34 helps with this use case.

https://flink.apache.org/news/2019/08/22/release-1.9.0.html#stop-with-savepoint-flip-34
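
(For the record: the stop-with-savepoint flow from FLIP-34 is exposed through the CLI roughly
as "flink stop -p <savepointDir> <jobId>", so an external scheduler or cron job could trigger
it once the 24 hours are up; the exact flags depend on the Flink version.)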

Thanks,
Chen


On Tue, Nov 26, 2019 at 7:31 AM Andrey Zagrebin 
wrote:

> Hi Chen,
>
> if I understand correctly, your question is not about the state with TTL,
> correct me if I am wrong.
>
> We could consider adding this feature to Flink: run x time => save point
> => shutdown job but it seems to me that it is something quite application
> specific, not sure how high priority it will get in community.
>
> Would it work for you to schedule an automated cron script to check your
> running job and do those actions over CLI or REST API?
>
> Cheers,
> Andrey
>
> On Mon, Nov 25, 2019 at 7:27 PM Yun Tang  wrote:
>
> > Hi Chen
> >
> > There existed several problems currently:
> > 1. State TTL cannot be applied to Flink SQL
> > 2. No  job-level state TTL
> > 3. Window state should not be limited to TTL if existed a job-level state
> >
> > If your goal is to achieve #2, we could introduce a configuration via
> > flink-conf.yaml so that we could also achieve #1 but leave window state
> not
> > limited to TTL.
> > I think this deserves an issue to track this request.
> >
> > Best
> > Yun Tang
> >
> > On 11/25/19, 8:04 AM, "Chen Qin"  wrote:
> >
> > I am a bit confused. Job level TTL is different from state TTL
> despite
> > share same name.
> >
> > What we want to achieve in Job TTL is basically "run x time => save
> > point
> > => shutdown job"
> >
> >
> > On Sun, Nov 24, 2019 at 9:57 AM Yun Tang  wrote:
> >
> > > Hi Chen
> > >
> > > Currently, state TTL setting is per-state scoped, and there is no
> > > job-level TTL setting so far. Do you want similar settings in
> > flink-conf to
> > > enable Flink SQL could also benefit from state TTL?
> > >
> > > Best
> > > Yun Tang
> > >
> > > On 11/25/19, 12:16 AM, "Chen Qin"  wrote:
> > >
> > > Hi there,
> > >
> > > We have use case of allowing ad hoc user submit a Flink job
> > (mostly
> > > SQL)
> > > and user come back and verify results after period of time (eg
> > 24hours)
> > > without manual intervention. The idea is to have something like
> > set job
> > > level TTL configuration to 24hours. Jobmanager will honor
> > 24hours of
> > > instrument save point and proper shutdown of flink job.
> > >
> > > Just want to know if we have job level TTL setting or parity
> > features
> > > JIRA.
> > >
> > >
> > > Best,
> > > Chen
> > >
> > >
> > >
> >
> >
> >
>


Re: set job level TTL

2019-11-24 Thread Chen Qin
I am a bit confused. Job-level TTL is different from state TTL, despite
sharing the same name.

What we want to achieve with job TTL is basically "run for x time => savepoint
=> shut down the job".


On Sun, Nov 24, 2019 at 9:57 AM Yun Tang  wrote:

> Hi Chen
>
> Currently, state TTL setting is per-state scoped, and there is no
> job-level TTL setting so far. Do you want similar settings in flink-conf to
> enable Flink SQL could also benefit from state TTL?
>
> Best
> Yun Tang
>
> On 11/25/19, 12:16 AM, "Chen Qin"  wrote:
>
> Hi there,
>
> We have use case of allowing ad hoc user submit a Flink job (mostly
> SQL)
> and user come back and verify results after period of time (eg 24hours)
> without manual intervention. The idea is to have something like set job
> level TTL configuration to 24hours. Jobmanager will honor 24hours of
> instrument save point and proper shutdown of flink job.
>
> Just want to know if we have job level TTL setting or parity features
> JIRA.
>
>
> Best,
> Chen
>
>
>


set job level TTL

2019-11-24 Thread Chen Qin
Hi there,

We have a use case of allowing ad hoc users to submit a Flink job (mostly SQL)
and come back to verify the results after a period of time (e.g. 24 hours)
without manual intervention. The idea is to have something like a job-level
TTL configuration set to 24 hours. The JobManager would honor the 24 hours by
triggering a savepoint and properly shutting down the Flink job.

Just want to know if we have a job-level TTL setting, or a JIRA for a comparable feature.


Best,
Chen


Re: [DISCUSS] FLIP-39: Flink ML pipeline and ML libs

2019-04-28 Thread Chen Qin
Just sharing some insights from operating SparkML at scale:
- map-reduce may not be the best way to iteratively sync partitioned workers. 
- native hardware acceleration is key to adopting the rapid changes in ML 
in the foreseeable future.

Chen

On Apr 29, 2019, at 11:02, jincheng sun  wrote:
> 
> Hi Shaoxuan,
> 
> Thanks for doing more efforts for the enhances of the scalability and the
> ease of use of Flink ML and make it one step further. Thank you for sharing
> a lot of context information.
> 
> big +1 for this proposal!
> 
> Here only one suggestion, that is, It has been a short time until the
> release of flink-1.9, so I recommend It's better to add a detailed
> implementation plan to FLIP and google doc.
> 
> What do you think?
> 
> Best,
> Jincheng
> 
> Shaoxuan Wang  于2019年4月29日周一 上午10:34写道:
> 
>> Hi everyone,
>> 
>> Weihua has proposed to rebuild Flink ML pipeline on top of TableAPI several
>> months ago in this mail thread:
>> 
>> 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>> 
>> Luogen, Becket, Xu, Weihua and I have been working on this proposal
>> offline in
>> the past a few months. Now we want to share the first phase of the entire
>> proposal with a FLIP. In this FLIP-39, we want to achieve several things
>> (and hope those can be accomplished and released in Flink-1.9):
>> 
>>   -
>> 
>>   Provide a new set of ML core interface (on top of Flink TableAPI)
>>   -
>> 
>>   Provide a ML pipeline interface (on top of Flink TableAPI)
>>   -
>> 
>>   Provide the interfaces for parameters management and pipeline/mode
>>   persistence
>>   -
>> 
>>   All the above interfaces should facilitate any new ML algorithm. We will
>>   gradually add various standard ML algorithms on top of these new
>> proposed
>>   interfaces to ensure their feasibility and scalability.
>> 
>> 
>> Part of this FLIP has been present in Flink Forward 2019 @ San Francisco by
>> Xu and Me.
>> 
>> 
>> https://sf-2019.flink-forward.org/conference-program#when-table-meets-ai--build-flink-ai-ecosystem-on-table-api
>> 
>> 
>> https://sf-2019.flink-forward.org/conference-program#high-performance-ml-library-based-on-flink
>> 
>> You can find the videos & slides at
>> https://www.ververica.com/flink-forward-san-francisco-2019
>> 
>> The design document for FLIP-39 can be found here:
>> 
>> 
>> https://docs.google.com/document/d/1StObo1DLp8iiy0rbukx8kwAJb0BwDZrQrMWub3DzsEo
>> 
>> 
>> I am looking forward to your feedback.
>> 
>> Regards,
>> 
>> Shaoxuan
>> 


Re: Hbase state backend in Flink

2018-12-27 Thread Chen Qin
Hi Yu,

Very cool! I might be out of date on what's new in Flink already… 
Just wondering if there are efforts to support seconds-level barrier alignment?

Chen

> On Dec 27, 2018, at 23:26, Yu Li  wrote:
> 
> FWIW, one major advantage of adopting HBase as Flink statebackend is to
> support direct read/write on DFS, so as to disaggregate storage and compute
> (DisAgg).  DisAgg has several benefits, such as supporting elastic
> computing in cloud, much better (order of magnitude) recovery speed when
> rescaling up/down (as Gyula also mentioned), etc. and we could eliminate
> the performance regression compared to local RW through techniques like
> adding a local L2 cache. More information please refer to our talk
> <https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf>
> at this year's Flink Forward China, and we could discuss more in another
> thread if interested.
> 
> Back to @Naveen's question here, we need to make HBase supporting embedded
> mode first before adopting it as Flink statebackend. We have done some
> initial work and please refer to HBASE-17743
> <https://issues.apache.org/jira/browse/HBASE-17743> and the design doc
> there for more details. And for sure we will upstream our work when ready
> to (smile).
> 
> Best Regards,
> Yu
> 
> 
> On Fri, 28 Dec 2018 at 13:12, Chen Qin  wrote:
> 
>> Hi Naveen,
>> 
>> AFAIK, there are two level of storage in typical statebackend
>> (local/remote). I think it kinda similar to what PC main memory and disk
>> analogy.
>> 
>> Take RocksDB Statebackend as example, window state (typical very large
>> ListState) persisted in partitioned local rocksdb files, adding element to
>> window is localized and cheap.When checkpoint starts, each of those rocksdb
>> do upload to corresponding HDFS directories separately.This is good in a
>> sense when any intermediate states between two successful checkpoints can
>> be overwritten and local snapshots can be done cheaply and asynchronously.
>> 
>> I heard folks tried to build mysqlbackend(deprecated), remote rocksdb as
>> service backend(hard to scale and performance bottleneck) , Cassandra(hard
>> to snapshot). All of which shares same trait on lack of local
>> parallelizable snapshot semantic.
>> 
>> Hope this helps!
>> Chen
>> 
>> On Thu, Dec 27, 2018 at 8:27 AM miki haiat  wrote:
>> 
>>> Did try to use rocksdb[1] as state backend?
>>> 
>>> 
>>> 1.
>>> 
>>> 
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>>> 
>>> 
>>> On Thu, 27 Dec 2018, 18:17 Naveen Kumar >> .invalid
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I am exploring if we can plugin hbase as state backend in Flink. We
>> have
>>>> need for streaming jobs with large window states, high throughput and
>>>> reliability.
>>>> 
>>>> I wanted to know if implementing Flink backend in Hbase or other
>>>> distributed KV store is possible. Any documentation or pointers will be
>>>> helpful.
>>>> 
>>>> Thanks,
>>>> Naveen
>>>> 
>>> 
>> 



Re: Hbase state backend in Flink

2018-12-27 Thread Chen Qin
Hi Naveen,

AFAIK, there are two levels of storage in a typical state backend
(local/remote). I think it's similar to the PC main-memory-and-disk
analogy.

Take the RocksDB state backend as an example: window state (typically a very large
ListState) is persisted in partitioned local RocksDB files, so adding an element to a
window is localized and cheap. When a checkpoint starts, each of those RocksDB instances
uploads to its corresponding HDFS directory separately. This is good in the
sense that any intermediate state between two successful checkpoints can
be overwritten, and local snapshots can be done cheaply and asynchronously.
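
A minimal sketch of that setup with the RocksDB backend API of this era (the HDFS path and
checkpoint interval are made up; the second constructor argument enables incremental
checkpoints):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Working state lives in local RocksDB instances on each TaskManager; snapshots
        // are uploaded to the DFS checkpoint directory, incrementally when enabled.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000); // checkpoint every 60s
    }
}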

I have heard of folks trying to build a MySQL backend (deprecated), a remote
RocksDB-as-a-service backend (hard to scale, and a performance bottleneck), and Cassandra (hard
to snapshot). All of them share the same trait: the lack of local,
parallelizable snapshot semantics.

Hope this helps!
Chen

On Thu, Dec 27, 2018 at 8:27 AM miki haiat  wrote:

> Did try to use rocksdb[1] as state backend?
>
>
> 1.
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>
>
> On Thu, 27 Dec 2018, 18:17 Naveen Kumar  .invalid
> wrote:
>
> > Hi,
> >
> > I am exploring if we can plugin hbase as state backend in Flink. We have
> > need for streaming jobs with large window states, high throughput and
> > reliability.
> >
> > I wanted to know if implementing Flink backend in Hbase or other
> > distributed KV store is possible. Any documentation or pointers will be
> > helpful.
> >
> > Thanks,
> > Naveen
> >
>


Re: [DISCUSS] Embracing Table API in Flink ML

2018-11-20 Thread Chen Qin
Hi Yun,

Very excited to see Flink ML moving forward! There are many good touch points in your
document. I couldn't agree more on the value that a (unified)
Table API could bring to the Flink ecosystem for running ML workloads. Most
ML pipelines we have observed start from single-box Python scripts or ad hoc
tools that researchers run to train a model on a powerful machine. When that proves
successful, they need to hook up with the data warehouse and extract features
(SQL kicks in). In the training phase, the landscape is very segmented: small to
medium-sized models can be trained on the JVM, while large/deep models need to
optimize the per-iteration random data shuffle (SGD-based DL), which often
ends up in JNI/C++/CUDA and custom task scheduling (gang-scheduled instead of
hacked around map-reduce).

Hope it makes sense. BTW, xgboost (the most popular ML competition framework)
has very primitive Flink support; it might be worth checking out.
https://github.com/dmlc/xgboost

Chen

On Tue, Nov 20, 2018 at 6:13 PM Weihua Jiang  wrote:

> Hi Yun,
>
> Can't wait to see your design.
>
> Thanks
> Weihua
>
> Yun Gao  于2018年11月21日周三 上午12:43写道:
>
> > Hi Weihua,
> >
> > Thanks for the exciting proposal!
> >
> > I have quickly read through it,  and I really appropriate the idea of
> > providing the ML Pipeline API similar to the commonly used library
> > scikit-learn, since it greatly reduce the learning cost for the AI
> > engineers to transfer to the Flink platform.
> >
> > Currently we are also working on a related issue, namely enhancing
> the
> > stream iteration of Flink to support both SGD and online learning, and it
> > also support batch training as a special case. we have had a rough design
> > and will start a new discussion in the next few days. I think the
> enhanced
> > stream iteration will help to implement Estimators directly in Flink, and
> > it may help to simplify the online learning pipeline by eliminating the
> > requirement to load the models from external file systems.
> >
> > I will read the design doc more carefully. Thanks again for sharing
> > the design doc!
> >
> > Yours sincerely
> > Yun Gao
> >
> >
> > --
> > 发件人:Weihua Jiang 
> > 发送时间:2018年11月20日(星期二) 20:53
> > 收件人:dev 
> > 主 题:[DISCUSS] Embracing Table API in Flink ML
> >
> > ML Pipeline is the idea brought by Scikit-learn
> > . Both Spark and Flink has borrowed
> this
> > idea and made their own implementations [Spark ML Pipeline
> > , Flink ML
> Pipeline
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/libs/ml/pipelines.html
> > >].
> >
> >
> >
> > NOTE: though I am using the term "ML", ML Pipeline shall apply to both ML
> > and DL pipelines.
> >
> >
> > ML Pipeline is quite helpful for model composition (i.e. using model(s)
> for
> > feature engineering) . And it enables logic reuse in train and inference
> > phases (via pipeline persistence and load), which is essential for AI
> > engineering. ML Pipeline can also be a good base for Flink based AI
> > engineering platform if we can make ML Pipeline have good tooling support
> > (i.e. meta data human readable).
> >
> >
> > As the Table API will be the unified high level API for both stream and
> > batch processing, I want to initiate the design discussion of new Table
> > based Flink ML Pipeline.
> >
> >
> > I drafted a design document [1] for this discussion. This design tries to
> > create a new ML Pipeline implementation so that concrete ML/DL algorithms
> > can fit to this new API to achieve interoperability.
> >
> >
> > Any feedback is highly appreciated.
> >
> >
> > Thanks
> >
> > Weihua
> >
> >
> > [1]
> >
> >
> https://docs.google.com/document/d/1PLddLEMP_wn4xHwi6069f3vZL7LzkaP0MN9nAB63X90/edit?usp=sharing
> >
> >
>


Re: [ANNOUNCE] New committer Gary Yao

2018-09-07 Thread Chen Qin
Congrats!

Chen

> On Sep 7, 2018, at 10:51, Xingcan Cui  wrote:
> 
> Congratulations, Gary!
> 
> Xingcan
> 
>> On Sep 7, 2018, at 11:20 PM, Hequn Cheng  wrote:
>> 
>> Congratulations Gary!
>> 
>> Hequn
>> 
>>> On Fri, Sep 7, 2018 at 11:16 PM Matthias J. Sax  wrote:
>>> 
>>> Congrats!
>>> 
 On 09/07/2018 08:15 AM, Timo Walther wrote:
 Congratulations, Gary!
 
 Timo
 
 
> Am 07.09.18 um 16:46 schrieb Ufuk Celebi:
> Great addition to the committers. Congrats, Gary!
> 
> – Ufuk
> 
> 
> On Fri, Sep 7, 2018 at 4:45 PM, Kostas Kloudas
>  wrote:
>> Congratulations Gary! Well deserved!
>> 
>> Cheers,
>> Kostas
>> 
>>> On Sep 7, 2018, at 4:43 PM, Fabian Hueske  wrote:
>>> 
>>> Congratulations Gary!
>>> 
>>> 2018-09-07 16:29 GMT+02:00 Thomas Weise :
>>> 
 Congrats, Gary!
 
 On Fri, Sep 7, 2018 at 4:17 PM Dawid Wysakowicz
 
 wrote:
 
> Congratulations Gary! Well deserved!
> 
>> On 07/09/18 16:00, zhangmingleihe wrote:
>> Congrats Gary!
>> 
>> Cheers
>> Minglei
>> 
>>> 在 2018年9月7日,下午9:59,Andrey Zagrebin
>>>  写道:
>>> 
>>> Congratulations Gary!
>>> 
 On 7 Sep 2018, at 15:45, Stefan Richter
  wrote:
 Congrats Gary!
 
> On 07.09.2018 at 15:14, Till Rohrmann
>  wrote:
> Hi everybody,
> 
> On behalf of the PMC I am delighted to announce Gary Yao as a
>>> new
> Flink
> committer!
> 
> Gary started contributing to the project in June 2017. He helped
 with
> the
> Flip-6 implementation, implemented many of the new REST
>>> handlers,
> fixed
> Mesos issues and initiated the Jepsen-based distributed test
> suite
> which
> uncovered several serious issues. Moreover, he actively helps
> community
> members on the mailing list and with PR reviews.
> 
> Please join me in congratulating Gary for becoming a Flink
 committer!
> Cheers,
> Till
> 
 
>>> 
>>> 
> 


Re: is Flink's recovery speed still slow?

2018-07-24 Thread Chen Qin
As far as I have learned from folks with a better understanding than myself, barrier
alignment might be the only path to get deterministic output.

Any state or outcome between barrier alignments requires a second thought (like
UDP packets from the network). Currently, alignment is only used to do heavyweight
checkpointing. Whether folks decide to improve the algorithm and use it in other ways,
like auto scaling or secondary task shadowing, is still TBD.

Chen

> On Jul 24, 2018, at 18:57, vino yang  wrote:
> 
> Hi jiaxl,
> 
> The paper you mentioned was published in 2017. I think it doesn't have much
> reference value now.
> Over time, both frameworks are constantly evolving.
> At the end of May this year, Flink has supported the major feature of local
> recovery in the latest release of version 1.5.
> This greatly improves the speed of recovery.
> Flink has not stopped the improvement of state recovery and fault
> tolerance.
> I think you can verify it yourself.
> 
> Thanks, vino.
> 
> 
> 2018-07-24 23:15 GMT+08:00 jiaxl :
> 
>> From conclusion of this paper  https://dl.acm.org/citation.cfm?id=3132750
>>   , Flink's recovery
>> speed is slower than that of Spark Streaming, which will be a problem in
>> large scale deployment where fault happens frequently.
>> I'd like to know whether this is still a problem or not. Any advices are
>> appreciated.
>> 
>> 
>> 
>> --
>> Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
>> 


Re: [ANNOUNCE] New committer: Sihua Zhou

2018-06-22 Thread Chen Qin
Congrats!

> On Jun 22, 2018, at 9:48 AM, Ted Yu  wrote:
> 
> Congratulations Sihua!
> 
>> On Fri, Jun 22, 2018 at 6:42 AM, zhangminglei <18717838...@163.com> wrote:
>> 
>> Congrats! Sihua
>> 
>> Cheers
>> Minglei.
>> 
>>> On Jun 22, 2018, at 9:17 PM, Till Rohrmann  wrote:
>>> 
>>> Hi everybody,
>>> 
>>> On behalf of the PMC I am delighted to announce Sihua Zhou as a new Flink
>>> committer!
>>> 
>>> Sihua has been an active member of our community for several months.
>> Among
>>> other things, he helped developing Flip-6, improved Flink's state
>> backends
>>> and fixed a lot of major and minor issues. Moreover, he is helping the
>>> Flink community reviewing PRs, answering users on the mailing list and
>>> proposing new features.
>>> 
>>> Please join me in congratulating Sihua for becoming a Flink committer!
>>> 
>>> Cheers,
>>> Till
>> 
>> 
>> 


Re: [ANNOUNCE] New committer: Haohui Mai

2017-11-01 Thread Chen Qin
Congratulations!

On Wed, Nov 1, 2017 at 2:41 AM, Aljoscha Krettek 
wrote:

> Congratulations! 
>
> > On 1. Nov 2017, at 10:13, Shaoxuan Wang  wrote:
> >
> > Congratulations!
> >
> > On Wed, Nov 1, 2017 at 4:36 PM, Till Rohrmann 
> wrote:
> >
> >> Congrats and welcome on board :-)
> >>
> >> On Wed, Nov 1, 2017 at 9:14 AM, Fabian Hueske 
> wrote:
> >>
> >>> Hi everybody,
> >>>
> >>> On behalf of the PMC I am delighted to announce Haohui Mai as a new
> Flink
> >>> committer!
> >>>
> >>> Haohui has been an active member of our community for several months.
> >>> Among other things, he made major contributions in ideas and code to
> the
> >>> SQL and Table APIs.
> >>>
> >>> Please join me in congratulating Haohui for becoming a Flink committer!
> >>>
> >>> Cheers,
> >>> Fabian
> >>>
> >>
>
>


[jira] [Created] (FLINK-7954) sideoutput in async function

2017-10-31 Thread Chen Qin (JIRA)
Chen Qin created FLINK-7954:
---

 Summary: sideoutput in async function
 Key: FLINK-7954
 URL: https://issues.apache.org/jira/browse/FLINK-7954
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.3.2
 Environment: similar to FLINK-7635, adding support of sideOutput to 
AsyncFunction 

Reporter: Chen Qin
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Unable to write snapshots to S3 on EMR

2017-10-07 Thread Chen Qin
Attached is my side project, verified working, to deploy the jobmanager and
taskmanager as stateless services (non YARN/Mesos); configuration here:

https://github.com/chenqin/flink-jar/tree/master/config/hadoop

more detail here
https://github.com/chenqin/flink-jar/blob/master/src/main/java/FlinkBootstrap.java#L49

On Fri, Oct 6, 2017 at 10:26 PM, Bowen Li  wrote:

> Hi Andy,
>
> I believe it's because you didn't set your s3 impl correctly. Try to set
> your core-site.xml by following https://ci.apache.org/
> projects/flink/flink-docs-release-1.4/ops/deployment/
> aws.html#s3afilesystem-
> recommended
>
> Bowen
>
> On Fri, Oct 6, 2017 at 7:59 AM, Andy M.  wrote:
>
> > Hi Till,
> >
> > Seems like everything is in line there.  hadoop-common.jar ->
> > hadoop-common-2.7.3-amzn-3.jar
> >
> > And when i decompiled that jar I see  public void
> addResource(Configuration
> > conf) in org/apache/hadoop/conf/Configuration.java
> >
> > I agree that an incorrect version of the jar is probably being run, is
> > there a way to limit the classpath for the TaskManager when starting the
> > job?
> >
> > Thank you
> >
> > On Fri, Oct 6, 2017 at 6:49 AM, Till Rohrmann 
> > wrote:
> >
> > > Hi Andy,
> > >
> > > could you check which Hadoop version this jar
> > > /usr/lib/hadoop/hadoop-common.jar is? Maybe also checking whether the
> > > contained hadoop Configuration class has the method
> > > Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V.
> > Maybe
> > > this jar is the culprit because it comes from a different Hadoop
> version.
> > >
> > > Cheers,
> > > Till
> > > ​
> > >
> > > On Thu, Oct 5, 2017 at 4:22 PM, Andy M.  wrote:
> > >
> > > > Hi Till,
> > > >
> > > > I believe this is what you are looking for, classpath is much bigger
> > for
> > > > the task manager.  I can also post the whole log file if needed:
> > > >
> > > > 2017-10-05 14:17:53,038 INFO  org.apache.flink.yarn.
> > > YarnTaskManagerRunner
> > > >  -  Classpath:
> > > > flink-consumer.jar:lib/flink-dist_2.11-1.3.2.jar:lib/flink-
> > > > python_2.11-1.3.2.jar:lib/flink-shaded-hadoop2-uber-1.3.
> > > > 2.jar:lib/log4j-1.2.17.jar:lib/slf4j-log4j12-1.7.7.jar:
> > > > log4j.properties:logback.xml:flink.jar:flink-conf.yaml::/
> > > > etc/hadoop/conf:/usr/lib/hadoop/hadoop-common-2.7.3-
> > > > amzn-3-tests.jar:/usr/lib/hadoop/hadoop-annotations-2.7.
> > > > 3-amzn-3.jar:/usr/lib/hadoop/hadoop-distcp.jar:/usr/lib/
> > > > hadoop/hadoop-auth-2.7.3-amzn-3.jar:/usr/lib/hadoop/hadoop-
> > > > nfs-2.7.3-amzn-3.jar:/usr/lib/hadoop/hadoop-streaming-2.7.3-
> > > > amzn-3.jar:/usr/lib/hadoop/hadoop-ant-2.7.3-amzn-3.jar:/
> > > > usr/lib/hadoop/hadoop-distcp-2.7.3-amzn-3.jar:/usr/lib/
> > > > hadoop/hadoop-datajoin.jar:/usr/lib/hadoop/hadoop-
> > > > streaming.jar:/usr/lib/hadoop/hadoop-common.jar:/usr/lib/
> > > > hadoop/hadoop-ant.jar:/usr/lib/hadoop/hadoop-sls.jar:/
> > > > usr/lib/hadoop/hadoop-azure-2.7.3-amzn-3.jar:/usr/lib/
> > > > hadoop/hadoop-nfs.jar:/usr/lib/hadoop/hadoop-extras-2.7.
> > > > 3-amzn-3.jar:/usr/lib/hadoop/hadoop-gridmix.jar:/usr/lib/
> > > > hadoop/hadoop-common-2.7.3-amzn-3.jar:/usr/lib/hadoop/
> > > > hadoop-annotations.jar:/usr/lib/hadoop/hadoop-openstack-2.
> > > > 7.3-amzn-3.jar:/usr/lib/hadoop/hadoop-archives-2.7.3-
> > > > amzn-3.jar:/usr/lib/hadoop/hadoop-azure.jar:/usr/lib/
> > > > hadoop/hadoop-extras.jar:/usr/lib/hadoop/hadoop-openstack.
> > > > jar:/usr/lib/hadoop/hadoop-rumen.jar:/usr/lib/hadoop/
> > > > hadoop-aws-2.7.3-amzn-3.jar:/usr/lib/hadoop/hadoop-
> > > > datajoin-2.7.3-amzn-3.jar:/usr/lib/hadoop/hadoop-
> > > > archives.jar:/usr/lib/hadoop/hadoop-aws.jar:/usr/lib/
> > > > hadoop/hadoop-auth.jar:/usr/lib/hadoop/hadoop-rumen-2.7.3-
> > > > amzn-3.jar:/usr/lib/hadoop/hadoop-sls-2.7.3-amzn-3.jar:/
> > > > usr/lib/hadoop/hadoop-gridmix-2.7.3-amzn-3.jar:/usr/lib/
> > > > hadoop/lib/jettison-1.1.jar:/usr/lib/hadoop/lib/jaxb-api-2.
> > > > 2.2.jar:/usr/lib/hadoop/lib/htrace-core-3.1.0-incubating.
> > > > jar:/usr/lib/hadoop/lib/protobuf-java-2.5.0.jar:/usr/
> > > > lib/hadoop/lib/httpclient-4.5.3.jar:/usr/lib/hadoop/lib/
> > > > httpcore-4.4.4.jar:/usr/lib/hadoop/lib/snappy-java-1.0.4.
> > > > 1.jar:/usr/lib/hadoop/lib/commons-beanutils-core-1.8.0.
> > > > jar:/usr/lib/hadoop/lib/jsp-api-2.1.jar:/usr/lib/hadoop/
> > > > lib/activation-1.1.jar:/usr/lib/hadoop/lib/jersey-server-
> > > > 1.9.jar:/usr/lib/hadoop/lib/commons-beanutils-1.7.0.jar:/
> > > > usr/lib/hadoop/lib/guava-11.0.2.jar:/usr/lib/hadoop/lib/
> > > > gson-2.2.4.jar:/usr/lib/hadoop/lib/commons-digester-1.
> > > > 8.jar:/usr/lib/hadoop/lib/jackson-xc-1.9.13.jar:/usr/
> > > > lib/hadoop/lib/paranamer-2.3.jar:/usr/lib/hadoop/lib/
> > > > apacheds-i18n-2.0.0-M15.jar:/usr/lib/hadoop/lib/commons-
> > > > httpclient-3.1.jar:/usr/lib/hadoop/lib/curator-client-2.7.
> > > > 1.jar:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar:/usr/
> > > > 

Re: partial upgrade

2017-10-03 Thread Chen Qin
Hi Timo,

Thanks for the update. Flink pipeline deployment is very ad hoc and hard to
maintain as a service platform.
Each team may update their pipeline frequently, which is completely black-box
and self-managed. The ideal situation would be to have a repo where all code
lives and is mapped to running pipelines via configuration. When new code
lands (without breaking topology compatibility), the job manager should be
notified to pick up and load the new classes.

Is there any doc I can follow to wire the user classloader up for a prototype? :)

Thanks,
Chen

On Mon, Oct 2, 2017 at 2:20 AM, Timo Walther <twal...@apache.org> wrote:

> Hi Chen,
>
> I think in a long-term perspective it makes sense to support things like
> this. The next big step is dynamic scaling without stopping the execution.
> Partial upgrades could be addressed afterwards, but I'm not aware of any
> plans.
>
> Until then, I would recommend a different architecture by using connect()
> and stream in a new logic dynamically. This is especially interesting for
> ML models etc.
>
> Regards,
> Timo
>
>
> On 10/1/17 at 3:03 AM, Chen Qin wrote:
>
>> Hi there,
>>
>> So far, flink job is interpreted and deployed during bootstrap phase. Once
>> pipeline runs, it's very hard to do partial upgrade without stop
>> execution.
>> (like savepoint is heavy) Is there any plan to allow upload annotated jar
>> package which hints which stream tasks implementation CAN BE partial
>> upgraded after next checkpoint succeed without worry about backfill.
>>
>>
>> Thanks,
>> Chen
>>
>>
>
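
For reference, a minimal sketch of the connect() approach Timo mentions, where new logic (here a rule/model object) is streamed into a running job instead of redeploying it; the Event/Rule/Alert types and the two sources are assumptions for illustration:

// Minimal sketch, assuming an event stream and a rule/model-update stream exist.
DataStream<Event> events = env.addSource(new EventSource());
DataStream<Rule> ruleUpdates = env.addSource(new RuleUpdateSource());

DataStream<Alert> alerts = events
    .connect(ruleUpdates)
    .flatMap(new CoFlatMapFunction<Event, Rule, Alert>() {
        private Rule currentRule;  // latest dynamically streamed-in logic

        @Override
        public void flatMap1(Event event, Collector<Alert> out) {
            if (currentRule != null && currentRule.matches(event)) {
                out.collect(new Alert(event));
            }
        }

        @Override
        public void flatMap2(Rule newRule, Collector<Alert> out) {
            currentRule = newRule;  // swap in the new logic without a redeploy
        }
    });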


partial upgrade

2017-09-30 Thread Chen Qin
Hi there,

So far, a Flink job is interpreted and deployed during the bootstrap phase. Once
the pipeline runs, it's very hard to do a partial upgrade without stopping execution
(and a savepoint is heavy). Is there any plan to allow uploading an annotated jar
package which hints which stream task implementations CAN BE partially
upgraded after the next checkpoint succeeds, without worrying about backfill?


Thanks,
Chen


[jira] [Created] (FLINK-7635) support sideOutput in ProcessWindowFunction & ProcessAllWindowFunction

2017-09-17 Thread Chen Qin (JIRA)
Chen Qin created FLINK-7635:
---

 Summary: support sideOutput in ProcessWindowFunction & 
ProcessAllWindowFunction
 Key: FLINK-7635
 URL: https://issues.apache.org/jira/browse/FLINK-7635
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Scala API
Reporter: Chen Qin
Priority: Minor
 Fix For: 1.4.0, 1.3.3


[FLINK-4460|https://issues.apache.org/jira/browse/FLINK-4460] only implemented 
output to ProcessFunction Context. It would be nice to add support to 
ProcessWindow and ProcessAllWindow functions as well. [email 
threads|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/question-on-sideoutput-from-ProcessWindow-function-td15500.html]

[~aljoscha] I thought this is a good warm-up task for people to learn how window 
functions work in general. Otherwise feel free to assign it back to me.
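
A rough sketch of what this could look like from the user side once supported (the ctx.output(...) call on the window Context is exactly what this ticket proposes to add; the Event type and window setup are illustrative):

final OutputTag<String> debugTag = new OutputTag<String>("window-debug") {};

SingleOutputStreamOperator<Long> counts = input
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new ProcessWindowFunction<Event, Long, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx, Iterable<Event> elements, Collector<Long> out) {
            long count = 0;
            for (Event ignored : elements) {
                count++;
            }
            // the side-output hook this ticket proposes on the window Context
            ctx.output(debugTag, "key " + key + " window had " + count + " elements");
            out.collect(count);
        }
    });

DataStream<String> debugStream = counts.getSideOutput(debugTag);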



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] Flink 1.4 and time based release

2017-08-22 Thread Chen Qin
It would be great to avoid an immediate 1.x.1 bug-fixing release. It causes confusion 
and raises quality concerns.

Also, is there already a way to communicate with Amazon EMR to make the latest release 
speedily available? I may try to find someone who works there if needed.

Thanks
Chen

> On Aug 22, 2017, at 9:32 AM, Stephan Ewen  wrote:
> 
> Hi all!
> 
> I want to bring up this discussion because we are approaching the date when
> there would be a feature freeze following the time based release schedule.
> 
> To make it short, I would suggest to not follow the time-based schedule for
> that release. There are a bunch of reasons bringing me to that view:
> 
>  - 1.3.0, which was very much pushed by the time-based schedule was not
> the best release we ever made. In fact, it had quite a few open issues that
> required an immediate 1.3.1 followup and only 1.3.2 fixed some of them.
> 
>  - 1.3.2, which is in some sense what 1.3.0 should have been is only 2
> weeks back
> 
>  - The delta since the last release is still quite small. One could argue
> to make a quick release and then soon another release after that, but
> releases still tie up quite a good amount of resources, so that would
> introduce a delay for much of the ongoing work. I am doubtful if this is a
> good idea at this point.
> 
>  - The current master has still quite a bit of "ongoing work" that is not
> in perfect shape for a release, but could use some more weeks to provide
> real value to users. Examples are the dependency reworking, network stack
> enhancements, speedier state restore efforts, flip-6, exactly-once
> sinks/side-effects, and others.
> 
> 
> Alternatively, we could do what we did for 1.1 and 1.2, which is making now
> a list of features we want in the release, and then projecting based on
> that when we fork off the 1.4 release branch.
> 
> 
> What do you think?
> 
> 
> Cheers,
> Stephan


Re: Make SubmittedJobGraphStore configurable

2017-07-25 Thread Chen Qin
Hi Till,

As far as I know there is interest in keeping job graphs recoverable across shared 
ZK hiccups, or in standalone mode with customized leader election. 

I plan to spend a bit of time prototyping backup to Amazon S3. Will keep folks 
updated as soon as I get the happy path going.

Thanks,
Chen

> On Jul 25, 2017, at 6:07 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> If there is a need for this, then we can definitely make this configurable.
> The interface SubmittedJobGraphStore is already there.
> 
> Cheers,
> Till
> 
> 
>> On Fri, Jul 7, 2017 at 6:32 AM, Chen Qin <qinnc...@gmail.com> wrote:
>> 
>> Sure,
>>  I would imagine couple of extra lines within flink.conf
>> ...
>> graphstore.type: customized/zookeeper
>> graphstore.class:
>> org
>> .
>> apache.flink.contrib
>> .MyS3SubmittedJobGraphStoreImp
>> graphstore.endpoint: s3.amazonaws.com
>> graphstore.path.root: s3://my root/
>> 
>> which overwrites initiation of
>> 
>> *org.apache.flink.runtime.highavailability.HighAvailabilityServices*
>> 
>> /**
>> * Gets the submitted job graph store for the job manager
>> *
>> * @return Submitted job graph store
>> * @throws Exception if the submitted job graph store could not be created
>> */
>> 
>> SubmittedJobGraphStore *getSubmittedJobGraphStore*() throws Exception;
>> 
>> In this case, user implemented their own s3 backed job graph store and
>> stores job graphs in s3 instead of zookeeper(high availability) or
>> never(nonha)
>> 
>> I find [1] is somehow related and more focus on life cycle and dependency
>> aspect of graph-store and checkpoint-store. FLINK-7106 in this case limited
>> to enable user implemented their own jobgraphstore instead of hardcoded to
>> zookeeper.
>> 
>> Thanks,
>> Chen
>> 
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-6626
>> 
>> 
>>> On Thu, Jul 6, 2017 at 2:47 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>> 
>>> The sample config entries are broken into multiple lines.
>>> 
>>> Can you send the config again with one config on one line ?
>>> 
>>> Cheers
>>> 
>>>> On Wed, Jul 5, 2017 at 10:19 PM, Chen Qin <qinnc...@gmail.com> wrote:
>>>> 
>>>> Hi there,
>>>> 
>>>> I would like to propose/discuss median level refactor work to make
>>>> submittedJobGraphStore configurable and extensible.
>>>> 
>>>> The rationale behind is to allow users offload those meta data to
>> durable
>>>> cross dc read after write strong consistency storage and decouple with
>> zk
>>>> quorum.
>>>> 
>>>> 
>>>> https://issues.apache.org/jira/browse/FLINK-7106
>>>> 
>>>> <https://issues.apache.org/jira/browse/FLINK-7106>
>>>> New configurable setting in flink.conf
>>>>  looks like following
>>>> 
>>>> g
>>>> raph
>>>> -s
>>>> tore:
>>>> customized/zookeeper
>>>> g
>>>> raph
>>>> -s
>>>> tore.class: xx.yy.MyS3SubmittedJobGraphStoreImp
>>>> 
>>>> g
>>>> raph
>>>> -s
>>>> tore.
>>>> endpoint
>>>> : s3.amazonaws.com
>>>> g
>>>> raph
>>>> -s
>>>> tore.path.root:
>>>> s3:/
>>>> 
>>>> /
>>>> my root/
>>>> 
>>>> Thanks,
>>>> Chen
>>>> 
>>> 
>> 


Re: Make SubmittedJobGraphStore configurable

2017-07-06 Thread Chen Qin
Sure,
I would imagine a couple of extra lines within flink.conf:
...
graphstore.type: customized/zookeeper
graphstore.class: org.apache.flink.contrib.MyS3SubmittedJobGraphStoreImp
graphstore.endpoint: s3.amazonaws.com
graphstore.path.root: s3://my root/

which overrides the instantiation in

*org.apache.flink.runtime.highavailability.HighAvailabilityServices*

/**
* Gets the submitted job graph store for the job manager
*
* @return Submitted job graph store
* @throws Exception if the submitted job graph store could not be created
*/

SubmittedJobGraphStore *getSubmittedJobGraphStore*() throws Exception;

In this case, users implement their own S3-backed job graph store and
store job graphs in S3 instead of ZooKeeper (high availability) or
nowhere (non-HA).
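
For illustration, a skeleton of such an S3-backed store might look roughly like this (method names are simplified stand-ins rather than the exact SubmittedJobGraphStore interface, and the AWS SDK v1 wiring and serialization are assumptions):

// Rough sketch only; not a drop-in SubmittedJobGraphStore implementation.
public class S3JobGraphStore {

    private final AmazonS3 s3;       // assumed AWS SDK v1 client
    private final String bucket;
    private final String rootPath;

    public S3JobGraphStore(AmazonS3 s3, String bucket, String rootPath) {
        this.s3 = s3;
        this.bucket = bucket;
        this.rootPath = rootPath;
    }

    public void putJobGraph(JobID jobId, byte[] serializedJobGraph) {
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentLength(serializedJobGraph.length);
        // read-after-write consistent storage, independent of the ZK quorum
        s3.putObject(bucket, rootPath + "/" + jobId,
                     new ByteArrayInputStream(serializedJobGraph), metadata);
    }

    public byte[] recoverJobGraph(JobID jobId) throws IOException {
        try (S3Object object = s3.getObject(bucket, rootPath + "/" + jobId)) {
            // IOUtils from commons-io
            return IOUtils.toByteArray(object.getObjectContent());
        }
    }

    public void removeJobGraph(JobID jobId) {
        s3.deleteObject(bucket, rootPath + "/" + jobId);
    }
}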

I find [1] somewhat related; it focuses more on the life cycle and dependency
aspects of the graph store and checkpoint store. FLINK-7106 in this case is limited
to enabling users to implement their own job graph store instead of it being
hardcoded to ZooKeeper.

Thanks,
Chen


[1] https://issues.apache.org/jira/browse/FLINK-6626


On Thu, Jul 6, 2017 at 2:47 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> The sample config entries are broken into multiple lines.
>
> Can you send the config again with one config on one line ?
>
> Cheers
>
> On Wed, Jul 5, 2017 at 10:19 PM, Chen Qin <qinnc...@gmail.com> wrote:
>
> > ​Hi there,
> >
> > ​I would like to propose/discuss median level refactor work to make
> > submittedJobGraphStore configurable and extensible.
> >
> > The rationale behind is to allow users offload those meta data to durable
> > cross dc read after write strong consistency storage and decouple with zk
> > quorum.
> > ​
> >
> > https://issues.apache.org/jira/browse/FLINK-7106
> >
> > <https://issues.apache.org/jira/browse/FLINK-7106>
> > New configurable setting in flink.conf
> > ​ looks like following
> >
> > g​
> > raph
> > ​-s
> > tore:
> > ​customized/zookeeper
> > g​
> > raph
> > ​-s
> > tore​.class: xx.yy.MyS3SubmittedJobGraphStore​Imp
> >
> > g​
> > raph
> > ​-s
> > tore.
> > ​endpoint
> > : s3.amazonaws.com
> > g​
> > raph
> > ​-s
> > tore.path.root:
> > ​s3:/
> > ​
> > /
> > ​my root/​
> >
> > Thanks,
> > Chen
> >
>


Make SubmittedJobGraphStore configurable

2017-07-05 Thread Chen Qin
Hi there,

I would like to propose/discuss medium-level refactoring work to make
SubmittedJobGraphStore configurable and extensible.

The rationale behind it is to allow users to offload this metadata to durable,
cross-DC, read-after-write strongly consistent storage and decouple from the ZK
quorum.
​

https://issues.apache.org/jira/browse/FLINK-7106


New configurable setting in flink.conf
​ looks like following

g​
raph
​-s
tore:
​customized/zookeeper
g​
raph
​-s
tore​.class: xx.yy.MyS3SubmittedJobGraphStore​Imp

g​
raph
​-s
tore.
​endpoint
: s3.amazonaws.com
g​
raph
​-s
tore.path.root:
​s3:/
​
/
​my root/​

Thanks,
Chen


[jira] [Created] (FLINK-7106) Make SubmittedJobGraphStore configurable

2017-07-05 Thread Chen Qin (JIRA)
Chen Qin created FLINK-7106:
---

 Summary: Make SubmittedJobGraphStore configurable
 Key: FLINK-7106
 URL: https://issues.apache.org/jira/browse/FLINK-7106
 Project: Flink
  Issue Type: Improvement
  Components: flink-contrib, Local Runtime
Reporter: Chen Qin


The current SubmittedJobGraphStore is hardcoded to store in ZooKeeper if the user 
chooses HA mode. The goal of this task is to allow users to build their own 
implementation, pass configuration from flink.conf, and define how/where 
that information is stored (e.g. a RocksDB state backend).

P.S. I think it would be interesting to see how Flink in HA mode can fall back 
to standalone when ZK suffers temporary outages.





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Flink as a Service (FaaS)

2017-03-24 Thread Chen Qin
Here is a working draft doc, feel free to comment :)

https://docs.google.com/document/d/1MSsTOz7xUu50dAf_8v3gsQFfJFFy9LKnULdIl26yj0o/edit?usp=sharing

On Thu, Mar 23, 2017 at 5:00 PM, Chen Qin <qinnc...@gmail.com> wrote:

> Quick capture comments on FLINK-6085, we want to have rpc source that
> accept requests from clients and reroute response (callback to
> corresponding rpc source)
>
> ​
>
> On Tue, Mar 21, 2017 at 10:47 PM, Chen Qin <qinnc...@gmail.com> wrote:
>
>> Hi Radu/jinkui,
>>
>> Thanks for your input!
>>
>> I filed a master task to track discussion on this front
>> https://issues.apache.org/jira/browse/FLINK-6085
>>
>> Since this is a very broad topic, I would like to kick start with a tiny
>> deployment helper project.
>> What it try to address is to leverage various of service continuous
>> deployment pipelines in various of companies (amazon/facebook/uber) and
>> deploy/update jobmanager/taskmanagers as a high available micro service
>> (via zk and aws s3)
>>
>> I run this service in prod for a month (2 dc, 2 job managers per dc, 8-64
>> task managers per dc depending on workload) for testing usage.
>> Haven't seen problem so far.
>>
>> https://github.com/chenqin/flink-jar
>>
>> Thanks,
>> Chen
>>
>>
>>
>>
>> On Thu, Mar 16, 2017 at 2:05 AM, Radu Tudoran <radu.tudo...@huawei.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I propose that we consider also the type of connectivity to be supported
>>> in the Flink API Gateway. I would propose to support a couple of calls
>>> option to ingest also events. I am thinking of:
>>> - callback mechanism
>>> - REST
>>> - RPC
>>>
>>>
>>>
>>>
>>> -Original Message-
>>> From: Chen Qin [mailto:qinnc...@gmail.com]
>>> Sent: Wednesday, March 15, 2017 7:31 PM
>>> To: dev@flink.apache.org
>>> Subject: Re: Flink as a Service (FaaS)
>>>
>>> Hi jinkui,
>>>
>>> I haven't go down to that deep yet. Sounds like you have better idea
>>> what needs to be in place.
>>> Can you try to come up with a doc and may be draw some diagram so we can
>>> discuss from there?
>>>
>>> My original intention is to discuss general function gap of running lots
>>> of micro services(like thousands of services as I observed). I feel flink
>>> low level has potential to fit in to highly critical services space and do
>>> good job fill those gaps.
>>>
>>>
>>> mobile apps
>>> -------
>>> front end request router
>>> --
>>> service A| service B  | service C
>>> database A |database B| database C
>>> ---
>>>  Flink as a service
>>> 
>>> serviceD | serviceE |service F
>>> database D | database E |database F
>>>
>>> Thanks,
>>> Chen
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Mar 14, 2017 at 12:01 AM, shijinkui <shijin...@huawei.com>
>>> wrote:
>>>
>>> > Hi, Chen Qin
>>> >
>>> > We also met your end-to-end use case. A RPC Source and Sink such as
>>> > netty source sink can fit such requirements. I’ve submit a natty
>>> > module in bahir-flink project which only a demo.
>>> > If use connector source instead of Kafka, how do we make the data
>>> > persistent? One choice is distributedlog project developed by twitter.
>>> >
>>> > The idea of micro service is very good. Playframework is better choice
>>> > to provide micro-service of Flink instead of Flink Monitor which
>>> > implemented by netty.
>>> > Submit Flink job in the Mesos cluster, at the same time deploy the
>>> > micro-service by marathon to the same Mesos cluster, and enable
>>> > mesos-dns for service discovery.
>>> >
>>> > The the micro-service can be a API Gateway for:
>>> > 1. receiving data from device
>>> > 2. Sending the data to the Flink Job Source(Netty Source with
>>> > distributedlog)
>>> > 3. At same time, the sink send the streaming result data to the API
>>> > Gateway 4. API Gateway support streaming invoke: send the sink result
>>> > data to the device channel
>>> >
>>> > So this plan can guarant

Re: Flink as a Service (FaaS)

2017-03-23 Thread Chen Qin
Quick capture of the comments on FLINK-6085: we want to have an RPC source that
accepts requests from clients and reroutes responses (callback to the
corresponding RPC source).

​

On Tue, Mar 21, 2017 at 10:47 PM, Chen Qin <qinnc...@gmail.com> wrote:

> Hi Radu/jinkui,
>
> Thanks for your input!
>
> I filed a master task to track discussion on this front
> https://issues.apache.org/jira/browse/FLINK-6085
>
> Since this is a very broad topic, I would like to kick start with a tiny
> deployment helper project.
> What it try to address is to leverage various of service continuous
> deployment pipelines in various of companies (amazon/facebook/uber) and
> deploy/update jobmanager/taskmanagers as a high available micro service
> (via zk and aws s3)
>
> I run this service in prod for a month (2 dc, 2 job managers per dc, 8-64
> task managers per dc depending on workload) for testing usage.
> Haven't seen problem so far.
>
> https://github.com/chenqin/flink-jar
>
> Thanks,
> Chen
>
>
>
>
> On Thu, Mar 16, 2017 at 2:05 AM, Radu Tudoran <radu.tudo...@huawei.com>
> wrote:
>
>> Hi,
>>
>> I propose that we consider also the type of connectivity to be supported
>> in the Flink API Gateway. I would propose to support a couple of calls
>> option to ingest also events. I am thinking of:
>> - callback mechanism
>> - REST
>> - RPC
>>
>>
>>
>>
>> -Original Message-
>> From: Chen Qin [mailto:qinnc...@gmail.com]
>> Sent: Wednesday, March 15, 2017 7:31 PM
>> To: dev@flink.apache.org
>> Subject: Re: Flink as a Service (FaaS)
>>
>> Hi jinkui,
>>
>> I haven't go down to that deep yet. Sounds like you have better idea what
>> needs to be in place.
>> Can you try to come up with a doc and may be draw some diagram so we can
>> discuss from there?
>>
>> My original intention is to discuss general function gap of running lots
>> of micro services(like thousands of services as I observed). I feel flink
>> low level has potential to fit in to highly critical services space and do
>> good job fill those gaps.
>>
>>
>> mobile apps
>> ---
>> front end request router
>> --
>> service A| service B  | service C
>> database A |database B| database C
>> -------
>>  Flink as a service
>> 
>> serviceD | serviceE |service F
>> database D | database E |database F
>>
>> Thanks,
>> Chen
>>
>>
>>
>>
>>
>> On Tue, Mar 14, 2017 at 12:01 AM, shijinkui <shijin...@huawei.com> wrote:
>>
>> > Hi, Chen Qin
>> >
>> > We also met your end-to-end use case. A RPC Source and Sink such as
>> > netty source sink can fit such requirements. I’ve submit a natty
>> > module in bahir-flink project which only a demo.
>> > If use connector source instead of Kafka, how do we make the data
>> > persistent? One choice is distributedlog project developed by twitter.
>> >
>> > The idea of micro service is very good. Playframework is better choice
>> > to provide micro-service of Flink instead of Flink Monitor which
>> > implemented by netty.
>> > Submit Flink job in the Mesos cluster, at the same time deploy the
>> > micro-service by marathon to the same Mesos cluster, and enable
>> > mesos-dns for service discovery.
>> >
>> > The the micro-service can be a API Gateway for:
>> > 1. receiving data from device
>> > 2. Sending the data to the Flink Job Source(Netty Source with
>> > distributedlog)
>> > 3. At same time, the sink send the streaming result data to the API
>> > Gateway 4. API Gateway support streaming invoke: send the sink result
>> > data to the device channel
>> >
>> > So this plan can guarantee the end-user invoke the service
>> > synchronized,
>> > and don’t care about Flink Job’s data processing.
>> >
>> > By the way, X as a Service actually is called by SAAS/PAAS in the
>> > cloud platform, such as AWS/Azure. We can call it Flink micro
>> > service.:)
>> >
>> > Best Regards
>> > Jinkui Shi
>> >
>> > 在 2017/3/14 下午2:13, "Chen Qin" <qinnc...@gmail.com> 写入:
>> >
>> > >Hi there,
>> > >
>> > >I am very happy about Flink 1.2 release. It was much more robust and
>> > >feature rich compare to previous versions. In the follo

Re: Flink as a Service (FaaS)

2017-03-21 Thread Chen Qin
Hi Radu/jinkui,

Thanks for your input!

I filed a master task to track discussion on this front
https://issues.apache.org/jira/browse/FLINK-6085

Since this is a very broad topic, I would like to kick-start it with a tiny
deployment helper project.
What it tries to address is leveraging the various service continuous-deployment
pipelines at various companies (Amazon/Facebook/Uber) to
deploy/update jobmanagers/taskmanagers as a highly available micro service
(via ZK and AWS S3).

I have run this service in prod for a month (2 DCs, 2 job managers per DC, 8-64
task managers per DC depending on workload) for testing usage.
Haven't seen problems so far.

https://github.com/chenqin/flink-jar

Thanks,
Chen




On Thu, Mar 16, 2017 at 2:05 AM, Radu Tudoran <radu.tudo...@huawei.com>
wrote:

> Hi,
>
> I propose that we consider also the type of connectivity to be supported
> in the Flink API Gateway. I would propose to support a couple of calls
> option to ingest also events. I am thinking of:
> - callback mechanism
> - REST
> - RPC
>
>
>
>
> -Original Message-
> From: Chen Qin [mailto:qinnc...@gmail.com]
> Sent: Wednesday, March 15, 2017 7:31 PM
> To: dev@flink.apache.org
> Subject: Re: Flink as a Service (FaaS)
>
> Hi jinkui,
>
> I haven't go down to that deep yet. Sounds like you have better idea what
> needs to be in place.
> Can you try to come up with a doc and may be draw some diagram so we can
> discuss from there?
>
> My original intention is to discuss general function gap of running lots
> of micro services(like thousands of services as I observed). I feel flink
> low level has potential to fit in to highly critical services space and do
> good job fill those gaps.
>
>
> mobile apps
> ---
> front end request router
> --
> service A| service B  | service C
> database A |database B| database C
> ---
>  Flink as a service
> 
> serviceD | serviceE |service F
> database D | database E |database F
>
> Thanks,
> Chen
>
>
>
>
>
> On Tue, Mar 14, 2017 at 12:01 AM, shijinkui <shijin...@huawei.com> wrote:
>
> > Hi, Chen Qin
> >
> > We also met your end-to-end use case. A RPC Source and Sink such as
> > netty source sink can fit such requirements. I’ve submit a natty
> > module in bahir-flink project which only a demo.
> > If use connector source instead of Kafka, how do we make the data
> > persistent? One choice is distributedlog project developed by twitter.
> >
> > The idea of micro service is very good. Playframework is better choice
> > to provide micro-service of Flink instead of Flink Monitor which
> > implemented by netty.
> > Submit Flink job in the Mesos cluster, at the same time deploy the
> > micro-service by marathon to the same Mesos cluster, and enable
> > mesos-dns for service discovery.
> >
> > The the micro-service can be a API Gateway for:
> > 1. receiving data from device
> > 2. Sending the data to the Flink Job Source(Netty Source with
> > distributedlog)
> > 3. At same time, the sink send the streaming result data to the API
> > Gateway 4. API Gateway support streaming invoke: send the sink result
> > data to the device channel
> >
> > So this plan can guarantee the end-user invoke the service
> > synchronized,
> > and don’t care about Flink Job’s data processing.
> >
> > By the way, X as a Service actually is called by SAAS/PAAS in the
> > cloud platform, such as AWS/Azure. We can call it Flink micro
> > service.:)
> >
> > Best Regards
> > Jinkui Shi
> >
> > On 2017/3/14 at 2:13 PM, "Chen Qin" <qinnc...@gmail.com> wrote:
> >
> > >Hi there,
> > >
> > >I am very happy about Flink 1.2 release. It was much more robust and
> > >feature rich compare to previous versions. In the following section,
> > >I would like to discuss a non typical use case in flink community.
> > >
> > >With ever increasing popularity of micro services[1] to scale out
> > >popular online services. Various aspect of source of truth is stored
> > >(a.k.a
> > >partitioned) behind various of service rpc endpoints. There is a
> > >general need of managing events traversal and enrichment throughout
> > >org SOA systems. (SOA) It is no longer part of data infrastructure
> > >scope, where traditionally known as batched slow and analytic(small %
> lossy is okay).
> > >Flink might also find a fit into core services as well.
> > >
> > >It's p

Re: [ANNOUNCE] New committer: Theodore Vasiloudis

2017-03-21 Thread Chen Qin
Congrats and looking forward to see great stuff in flink ml side!

Chen

> On Mar 21, 2017, at 19:13, 伍翀(云邪)  wrote:
> 
> Congrats, Theo!
> 
>> On Mar 22, 2017, at 8:30 AM, Chiwan Park  wrote:
>> 
>> Congratulations, Theo!
>> 
>> Regards,
>> Chiwan Park
>> 
>>> On 03/22/2017 03:06 AM, Ted Yu wrote:
>>> Congratulations !
>>> 
>>> 
 On Tue, Mar 21, 2017 at 11:00 AM, Matthias J. Sax  wrote:
 
 
 Congrats!
 
> On 3/21/17 8:59 AM, Greg Hogan wrote:
> Welcome, Theo, and great to have you onboard with Flink and ML!
> 
> 
>> On Mar 21, 2017, at 4:35 AM, Robert Metzger 
>> wrote:
>> 
>> Hi everybody,
>> 
>> On behalf of the PMC I am delighted to announce Theodore
>> Vasiloudis as a new Flink committer!
>> 
>> Theo has been a community member for a very long time and he is
>> one of the main drivers of the currently ongoing ML discussions
>> in Flink.
>> 
>> 
>> Welcome Theo and congratulations again for becoming a Flink
>> committer!
>> 
>> 
>> Regards, Robert
> 
 
> 


[jira] [Created] (FLINK-6113) Implement split/select with OutputTag

2017-03-18 Thread Chen Qin (JIRA)
Chen Qin created FLINK-6113:
---

 Summary: Implement split/select with OutputTag
 Key: FLINK-6113
 URL: https://issues.apache.org/jira/browse/FLINK-6113
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.3.0
Reporter: Chen Qin
Priority: Minor


With the completion of FLINK-4460 (side outputs), this is one of the follow-up items 
towards replacing string-tag-based split/select with OutputTag-based 
split/select.

In Flink 2.0, we might consider eventually deprecating split/select.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Machine Learning on Flink - Next steps

2017-03-17 Thread Chen Qin
Hi there,

I am not a machine learning expert :) But recently, I have seen more and more
adoption of and a trend towards TensorFlow [1], which is backed by Google and
big vendors.

If Flink were somehow compatible and could run TensorFlow pipelines (with some
modifications being fine), I think the adoption would be faster.

Thanks,
Chen

[1] https://github.com/tensorflow/tensorflow

On Fri, Mar 17, 2017 at 7:44 AM, Theodore Vasiloudis <
theodoros.vasilou...@gmail.com> wrote:

> >
> > What should be the way of work here? We could have sketches for the
> > separate projects in Gdocs, then the shepherds could make a proposal out
> of
> > it. Would that be feasible?
>
>
> That's what I was thinking as well. It's the responsibility of the shepherd
> to engage the people motivated to work
> on a project, starting with a rough Gdocs document and gradually transition
> it to a proper design doc.
>
> As an example use-case (for both online and "fast-batch") I would recommend
> an ad click scenario: Predicting CTR.
>
> The are multiple reasons I like this application:
>
>- it's a very popular application
>- it's directly tied to revenue so even small improvements are relevant,
>- it can often be a very large-scale problem in data and model size,
>- there are good systems out there already to benchmark against, like
>Vowpal Wabbit.
>- At least one one large-scale dataset exists [1],
>- We could even place a pre-processing pipeline to emulate a real
>application, and show the full benefits of using Flink as your
>one-stop-shop for an integrated prediction pipeline (up until model
> serving
>for now).
>
> We are still missing someone to take the lead on the model serving project,
> if somebody would be interested to
> coordinate that let us know.
>
> Regards,
> Theodore
>
> [1] Criteo click-through data (1TB):
> http://www.criteo.com/news/press-releases/2015/06/criteo-
> releases-industrys-largest-ever-dataset/
>
> On Thu, Mar 16, 2017 at 11:50 PM, Gábor Hermann 
> wrote:
>
> > @Theodore: thanks for bringing the discussion together.
> > I think it's reasonable to go on all the three directions, just as you
> > suggested. I agree we should concentrate our efforts, but we can do a
> > low-effort evaluation of all the three.
> >
> > I would like to volunteer for shepherding *Offline learning on
> Streaming*.
> > I am already working on related issues, and I believe I have a fairly
> good
> > overview on the streaming API and its limitations. However, we need to
> find
> > a good use-case to aim for, and I don't have one in mind yet, so please
> > help with that if you can. I absolutely agree with Theodore, that setting
> > the scope is the most important here.
> >
> > We should find a simple use-case for incremental learning. As Flink is
> > really strong in low-latency data processing, the best would be a
> use-case
> > where rapidly adapting the model to new data provides a value. We should
> > also consider low-latency serving for such a use-case, as there is not
> much
> > use in fast model updates if we cannot serve the predictions that fast.
> Of
> > course, it's okay to simply implement offline algorithms, but showcasing
> > would be easier if we could add prediction serving for the model in the
> > same system.
> >
> > What should be the way of work here? We could have sketches for the
> > separate projects in Gdocs, then the shepherds could make a proposal out
> of
> > it. Would that be feasible?
> >
> > @Stephan:
> > Thanks for your all insights. I also like the approach of aiming for new
> > and somewhat unexplored areas. I guess we can do that with both the
> > serving/evaluation and incremental training (that should be in scope of
> the
> > offline ML on streaming).
> >
> > I agree GPU acceleration is an important issue, however it might be
> > out-of-scope for the prototypes of these new ML directions. What do you
> > think?
> >
> > Regarding your comments on the other thread, I'm really glad PMC is
> > working towards growing the community. This is crucial to have anything
> > merged in Flink while keeping the code quality. However, for the
> > prototypes, I'd prefer Theodore's suggestion, to do it in a separate
> > repository, to make initial development faster. After the prototypes have
> > proven their usability we could merge them, and continue working on them
> > inside the Flink repository. But we can decide that later.
> >
> > Cheers,
> > Gabor
> >
> >
> >
> > On 2017-03-14 21:04, Stephan Ewen wrote:
> >
> >> Thanks Theo. Just wrote some comments on the other thread, but it looks
> >> like you got it covered already.
> >>
> >> Let me re-post what I think may help as input:
> >>
> >> *Concerning Model Evaluation / Serving *
> >>
> >> - My personal take is that the "model evaluation" over streams will
> be
> >> happening in any case - there
> >>   is genuine interest in that and various users have built that
> >> themselves already.
> >>   I 

[jira] [Created] (FLINK-6085) flink as micro service

2017-03-16 Thread Chen Qin (JIRA)
Chen Qin created FLINK-6085:
---

 Summary: flink as micro service
 Key: FLINK-6085
 URL: https://issues.apache.org/jira/browse/FLINK-6085
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, JobManager
Reporter: Chen Qin
Priority: Minor


Track discussion around running Flink as a micro service, including but not limited 
to:
- an RPC (web service endpoint) source:
  the web service endpoint accepts RPC calls and ingests them into the streaming job (only one); see the sketch below
- a callback mechanism 
- task assignment should honor the deployment group (web tier hosts should be 
isolated from the rest of task assignment)
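
A minimal sketch of the source part, embedding an HTTP endpoint in a source purely for illustration (the port, payload handling, and the callback path back to the caller are assumptions and not shown in full):

// Minimal sketch only: accepts HTTP requests and emits the payload into the stream.
// Callback routing of results back to the caller is not shown.
public class HttpRequestSource extends RichSourceFunction<String> {

    private transient HttpServer server;
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        server = HttpServer.create(new InetSocketAddress(8080), 0);  // fixed port, simplified
        server.createContext("/ingest", exchange -> {
            byte[] body = exchange.getRequestBody().readAllBytes();  // Java 9+
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(new String(body, StandardCharsets.UTF_8));
            }
            exchange.sendResponseHeaders(202, -1);  // accepted, processed asynchronously
            exchange.close();
        });
        server.setExecutor(null);  // default executor
        server.start();
        while (running) {
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
        if (server != null) {
            server.stop(0);
        }
    }
}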




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Flink as a Service (FaaS)

2017-03-15 Thread Chen Qin
Hi jinkui,

I haven't gone down that deep yet. Sounds like you have a better idea of what
needs to be in place.
Can you try to come up with a doc and maybe draw some diagrams so we can
discuss from there?

My original intention is to discuss the general functionality gap in running lots of
micro services (like thousands of services, as I have observed). I feel Flink's low
level has the potential to fit into the highly critical services space and do a good
job filling those gaps.


mobile apps
---
front end request router
--
service A| service B  | service C
database A |database B| database C
---
 Flink as a service

serviceD | serviceE |service F
database D | database E |database F

Thanks,
Chen





On Tue, Mar 14, 2017 at 12:01 AM, shijinkui <shijin...@huawei.com> wrote:

> Hi, Chen Qin
>
> We also met your end-to-end use case. A RPC Source and Sink such as netty
> source sink can fit such requirements. I’ve submit a natty module in
> bahir-flink project which only a demo.
> If use connector source instead of Kafka, how do we make the data
> persistent? One choice is distributedlog project developed by twitter.
>
> The idea of micro service is very good. Playframework is better choice to
> provide micro-service of Flink instead of Flink Monitor which implemented
> by netty.
> Submit Flink job in the Mesos cluster, at the same time deploy the
> micro-service by marathon to the same Mesos cluster, and enable mesos-dns
> for service discovery.
>
> The the micro-service can be a API Gateway for:
> 1. receiving data from device
> 2. Sending the data to the Flink Job Source(Netty Source with
> distributedlog)
> 3. At same time, the sink send the streaming result data to the API Gateway
> 4. API Gateway support streaming invoke: send the sink result data to the
> device channel
>
> So this plan can guarantee the end-user invoke the service synchronized,
> and don’t care about Flink Job’s data processing.
>
> By the way, X as a Service actually is called by SAAS/PAAS in the cloud
> platform, such as AWS/Azure. We can call it Flink micro service.:)
>
> Best Regards
> Jinkui Shi
>
> On 2017/3/14 at 2:13 PM, "Chen Qin" <qinnc...@gmail.com> wrote:
>
> >Hi there,
> >
> >I am very happy about Flink 1.2 release. It was much more robust and
> >feature rich compare to previous versions. In the following section, I
> >would like to discuss a non typical use case in flink community.
> >
> >With ever increasing popularity of micro services[1] to scale out popular
> >online services. Various aspect of source of truth is stored (a.k.a
> >partitioned) behind various of service rpc endpoints. There is a general
> >need of managing events traversal and enrichment throughout org SOA
> >systems. (SOA) It is no longer part of data infrastructure scope, where
> >traditionally known as batched slow and analytic(small % lossy is okay).
> >Flink might also find a fit into core services as well.
> >
> >It's part of online production services, serving directly from mobile
> >client events more importantly services database post commit logs and
> >orchestrate adhoc stream toplogies to transform and transfer between
> >online
> >services(usually backed by databases and serving request response with
> >stragent latency requirement)
> >
> >Use case:
> >user updates comes from mobile client via kafka topic, which consumed both
> >by user service as well as streaming job. When streaming job do RPC and
> >trying to enrich user information, it cause race condition which turns out
> >database persistence is not as speedy as streaming job.
> >
> >In general, streaming job should consume user service commit logs instead
> >of karfka topic which defines as source of truth in term of user
> >information. Is there a general way to couple with these issues?
> >
> >P.S I was able to build task manager as jar package and deployed to
> >production environment. Instead of using YARN to manage warehouse
> >machines.
> >Utilize same deployment environment as other online services as docker. So
> >far, it seems running smoothly.
> >
> >Thanks,
> >Chen
> >
> >
> >[1] https://en.wikipedia.org/wiki/Microservices
> >[2] https://martinfowler.com/eaaDev/EventSourcing.html
>
>


Flink as a Service (FaaS)

2017-03-14 Thread Chen Qin
Hi there,

I am very happy about the Flink 1.2 release. It was much more robust and
feature rich compared to previous versions. In the following section, I
would like to discuss a non-typical use case in the Flink community.

With the ever increasing popularity of micro services [1] to scale out popular
online services, various aspects of the source of truth are stored (a.k.a.
partitioned) behind various service RPC endpoints. There is a general
need to manage event traversal and enrichment throughout an org's SOA
systems. This is no longer part of the data infrastructure scope, which is
traditionally known as batched, slow, and analytic (a small % lossy is okay).
Flink might also find a fit in core services as well.

It's part of online production services, serving directly from mobile
client events and, more importantly, from services' database post-commit logs, and
orchestrating ad hoc stream topologies to transform and transfer between online
services (usually backed by databases and serving request/response with
stringent latency requirements).

Use case:
user updates come from the mobile client via a Kafka topic, which is consumed both
by the user service as well as the streaming job. When the streaming job does an RPC
trying to enrich the user information, it causes a race condition, since it turns out
database persistence is not as speedy as the streaming job.

In general, the streaming job should consume the user service's commit logs instead
of the Kafka topic, since the former is defined as the source of truth in terms of user
information. Is there a general way to cope with these issues?

P.S. I was able to build the task manager as a jar package and deploy it to the
production environment, instead of using YARN to manage warehouse machines,
utilizing the same deployment environment as other online services (Docker). So
far, it seems to be running smoothly.

Thanks,
Chen


[1] https://en.wikipedia.org/wiki/Microservices
[2] https://martinfowler.com/eaaDev/EventSourcing.html


Re: [DISCUSS] Side Outputs and Split/Select

2017-02-24 Thread Chen Qin
Hi Jamie,

I think it does make consuming late-arriving events more explicit! At the cost of 
a fixed, predefined OutputTag which the user has no control over nor defines, plus
an extra UDF which essentially filters out all main outputs and only lets the 
side output pass (like a FilterFunction).
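
For comparison, the OutputTag-based variant on a windowed stream could look roughly like this (assuming a sideOutputLateData-style hook ends up on WindowedStream; MyWindowFunction, Result, and the Event type are placeholders):

final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Result> result = input
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .sideOutputLateData(lateTag)          // late elements are tagged instead of dropped
    .apply(new MyWindowFunction());

// consumers opt in explicitly if they care about late data
DataStream<Event> lateEvents = result.getSideOutput(lateTag);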

Thanks,
Chen

> On Feb 24, 2017, at 1:17 PM, Jamie Grier  wrote:
> 
> I prefer the ProcessFunction and side outputs solution over split() and
> select() which I've never liked primarily due to the lack of type safety
> and it also doesn't really seem to fit with the rest of Flink's API.
> 
> On the late data question I strongly prefer the late data concept being
> explicit in the API.  Could we not also do something like:
> 
> WindowedStream<> windowedStream = input
>  .keyBy(...)
>  .window(...);
> 
> DataStream<> mainOutput = windowedStream
>  .apply(...);
> 
> DataStream<> lateOutput = windowStream
>  .lateStream();
> 
> 
> 
> 
> 
> 
> 
> On Thu, Feb 23, 2017 at 7:08 AM, Gyula Fóra  wrote:
> 
>> Hi,
>> 
>> Thanks for the nice proposal, I like the idea of side outputs, and it would
>> make a lot of topologies much simpler.
>> 
>> Regarding the API I think we should come up with a way of making side
>> otuputs accessible from all sort of operators in a similar way. For
>> instance through the RichFunction interface with a special collector that
>> we invalidate when the user should not be collecting to it. (just a quick
>> idea)
>> 
>> I personally wouldn't deprecate the "universal" Split/Select API that can
>> be used on any  DataStream in favor of functionality that is only
>> accessible trhough the process function/ or a few select operators. I think
>> the Split/Select pattern is also very nice and I use it in many different
>> contexts to get efficient multiway filtering (after map/co operators for
>> examples).
>> 
>> Regards,
>> Gyula
>> 
>> Aljoscha Krettek  ezt írta (időpont: 2017. febr. 23.,
>> Cs, 15:42):
>> 
>>> Hi Folks,
>>> Chen and I have been working for a while now on making FLIP-13 (side
>>> outputs) [1] a reality. We think we have a pretty good internal
>>> implementation and also a proposal for an API but now we need to discuss
>>> how we want to go forward with this, especially how we should deal with
>>> split/select which does some of the same things side outputs can do. I'll
>>> first quickly describe what the split/select API looks like, so that
>> we're
>>> all on the same page. Then I'll present the new proposed side output API
>>> and then I'll present new API for getting dropped late data from a
>> windowed
>>> operation, which was the original motivation for adding side outputs.
>>> 
>>> Split/select consists of two API calls: DataStream.split(OutputSelector)
>>> and SplitStream.select(). You can use it like this:
>>> 
>>> DataStreamSource input = env.fromElements(1, 2, 3);
>>> 
>>> final String EVEN_SELECTOR = "even";
>>> final String ODD_SELECTOR = "odd";
>>> 
>>> SplitStream split = input.split(
>>>new OutputSelector() {
>>>@Override
>>>public Iterable select(Integer value) {
>>>if (value % 2 == 0) {
>>>return Collections.singleton(EVEN_SELECTOR);
>>>} else {
>>>return Collections.singleton(ODD_SELECTOR);
>>>}
>>>}
>>>});
>>> 
>>> DataStream evenStream = split.select(EVEN_SELECTOR);
>>> DataStream oddStream = split.select(ODD_SELECTOR);
>>> 
>>> The stream is split according to an OutputSelector that returns an
>> Iterable
>>> of Strings. Then you can use select() to get a new stream that only
>>> contains elements with the given selector. Notice how the element type
>> for
>>> all the split streams is the same.
>>> 
>>> The new side output API proposal adds a new type OutputTag and relies
>> on
>>> extending ProcessFunction to allow emitting data to outputs besides the
>>> main output. I think it's best explained with an example as well:
>>> 
>>> DataStreamSource input = env.fromElements(1, 2, 3);
>>> 
>>> final OutputTag sideOutput1 = new OutputTag<>("side-output-1"){}
>> ;
>>> final OutputTag sideOutput2 = new OutputTag<>("side-output-2"){}
>> ;
>>> 
>>> SingleOutputStreamOperator mainOutputStream = input
>>>.process(new ProcessFunction() {
>>> 
>>>@Override
>>>public void processElement(
>>>Integer value,
>>>Context ctx,
>>>Collector out) throws Exception {
>>> 
>>>ctx.output(sideOutput1, "WE GOT: " + value);
>>>ctx.output(sideOutput2, value);
>>>out.collect("MAIN OUTPUT: " + value);
>>>}
>>> 
>>>});
>>> 
>>> DataStream sideOutputStream1 =
>>> mainOutputStream.getSideOutput(sideOutput1);
>>> DataStream sideOutputStream2 =
>>> mainOutputStream.getSideOutput(sideOutput2);
>>> 
>>> Notice how the OutputTags are anonymous inner 

Re: [ANNOUNCE] Welcome Jark Wu and Kostas Kloudas as committers

2017-02-07 Thread Chen Qin
Congrats! 

> On Feb 7, 2017, at 17:52, Zhuoluo Yang  wrote:
> 
> Congrats! Good job guys!
> 
> Thanks,
> 
> Zhuoluo 
> 
> 
> 
> 
> 
>> On Feb 8, 2017, at 4:59 AM, Greg Hogan  wrote:
>> 
>> Welcome Jark and Kostas! Thank you for your contributions and many more to
>> come.
>> 
>>> On Tue, Feb 7, 2017 at 3:16 PM, Fabian Hueske  wrote:
>>> 
>>> Hi everybody,
>>> 
>>> I'm very happy to announce that Jark Wu and Kostas Kloudas accepted the
>>> invitation of the Flink PMC to become committers of the Apache Flink
>>> project.
>>> 
>>> Jark and Kostas are longtime members of the Flink community.
>>> Both are actively driving Flink's development and contributing to its
>>> community in many ways.
>>> 
>>> Please join me in welcoming Kostas and Jark as committers.
>>> 
>>> Fabian
>>> 
> 


Re: States split over to external storage

2017-01-25 Thread Chen Qin
Hi Stephan ,Fabian, Liuxin,

Looks like it's already solved with dynamically scalable key groups along with
an incremental policy.

Our use case is like a workflow model with high-volume events (a million
TPS) and a long holding window (24 hours); a very small percentage of events will
be forwarded to the next operator. Whether to spend lots of mostly idle nodes to
hold the majority of "cold states" in key groups, or to spill over to external
storage, can be discussed. But I feel this is not a typical use scenario, so I
am fine with the first approach (as long as incremental checkpointing is in place)!

Thanks,
Chen


On Fri, Jan 20, 2017 at 3:25 AM, Stephan Ewen <se...@apache.org> wrote:

> There are works on different approaches of incremental policies underways
> (more soon in some design proposals),
> but the point raised here sounded different to me.
>
> Maybe Chen Qin can describe in some more detail what he was having in
> mind...
>
> On Fri, Jan 20, 2017 at 12:15 PM, liuxinchun <liuxinc...@huawei.com>
> wrote:
>
> > I think the current backup strategy checkpoints the while whindow
> > everytime,when the window size is very large, it's time and storage
> > consuming. An increamental policy should be consided.
> >
> > Sent from HUAWEI AnyOffice
> > *发件人:*Stephan Ewen
> > *收件人:*dev@flink.apache.org,
> > *抄送:*iuxinc...@huawei.com,Aljoscha Krettek,时金魁,
> > *时间:*2017-01-20 18:35:46
> > *主题:*Re: States split over to external storage
> >
> > Hi!
> >
> > This is an interesting suggestion.
> > Just to make sure I understand it correctly: Do you design this for cases
> > where the state per machine is larger than that machines memory/disk? And
> > in that case, you cannot solve the problem by scaling out (having more
> > machines)?
> >
> > Stephan
> >
> >
> > On Tue, Jan 17, 2017 at 6:29 AM, Chen Qin <c...@uber.com> wrote:
> >
> > > Hi there,
> > >
> > > I would like to discuss split over local states to external storage.
> The
> > > use case is NOT another external state backend like HDFS, rather just
> to
> > > expand beyond what local disk/ memory can hold when large key space
> > exceeds
> > > what task managers could handle. Realizing FLINK-4266 might be hard to
> > > tacking all-in-one, I would live give a shot to split-over first.
> > >
> > > An intuitive approach would be treat HeapStatebackend as LRU cache and
> > > split over to external key/value storage when threshold triggered. To
> > make
> > > this happen, we need minor refactor to runtime and adding set/get
> logic.
> > > One nice thing of keeping HDFS to store snapshots would be avoid
> > versioning
> > > conflicts. Once checkpoint restore happens, partial write data will be
> > > overwritten with previously checkpointed value.
> > >
> > > Comments?
> > >
> > > --
> > > -Chen Qin
> > >
> >
>



-- 
-Chen Qin


Re: 答复: States split over to external storage

2017-01-17 Thread Chen Qin
Hi liuxinchun,

Thanks for the expedited feedback!

I think if the dev community finds it makes sense to invest in this feature,
allowing users to configure the eviction strategy (2) makes sense to me. Given how
much the growth of Flink job state varies, there might be an
interface that allows the state backend to decide which state can be evicted or
restored.

Regarding (1), I see there are optimizations that can give a performance boost
immediately. I would suggest raising a JIRA and discussing it with the whole dev
community. There might be cases where it will conflict with upcoming refactors.
Note that the Flink devs are super busy releasing 1.2, so expect a late response :)

Thanks,
Chen


>
> (1) The organization form of the current sliding windows
> (SlidingProcessingTimeWindow and SlidingEventTimeWindow) has a drawback: when
> using ListState, an element may be kept in multiple windows (size / slide). This
> is time consuming and wastes storage when checkpointing.
>   Opinion: I think this is an optimization point. Elements can be organized
> according to the key and split (maybe also called a pane). When triggering
> cleanup, only the oldest split (pane) needs to be cleaned up.
> (2) Incremental backup strategy. In the original idea, we planned to back up only
> the newly arrived elements, which means a whole window may span several
> checkpoints; we have developed this idea in our private SPS. But in Flink, the
> window may not keep raw data (for example, ReducingState and FoldingState). The
> idea from Chen Qin may be a candidate strategy. We can keep in touch and exchange
> our respective strategies.
> -Original Message-
> From: Chen Qin [mailto:c...@uber.com]
> Sent: January 17, 2017 13:30
> To: dev@flink.apache.org
> Cc: iuxinc...@huawei.com; Aljoscha Krettek; shijinkui
> Subject: States split over to external storage
>
> Hi there,
>
> I would like to discuss split over local states to external storage. The
> use case is NOT another external state backend like HDFS, rather just to
> expand beyond what local disk/ memory can hold when large key space exceeds
> what task managers could handle. Realizing FLINK-4266 might be hard to
> tacking all-in-one, I would live give a shot to split-over first.
>
> An intuitive approach would be treat HeapStatebackend as LRU cache and
> split over to external key/value storage when threshold triggered. To make
> this happen, we need minor refactor to runtime and adding set/get logic.
> One nice thing of keeping HDFS to store snapshots would be avoid
> versioning conflicts. Once checkpoint restore happens, partial write data
> will be overwritten with previously checkpointed value.
>
> Comments?
>
> --
> -Chen Qin
>



-- 
-Chen Qin


States split over to external storage

2017-01-16 Thread Chen Qin
Hi there,

I would like to discuss spilling local state over to external storage. The use
case is NOT another external state backend like HDFS; rather, it is to expand
beyond what local disk/memory can hold when a large key space exceeds what the
task managers can handle. Realizing FLINK-4266 might be hard to tackle all in one,
so I would like to give spill-over a shot first.

An intuitive approach would be to treat the HeapStateBackend as an LRU cache and
spill over to an external key/value store when a threshold is triggered. To make
this happen, we need a minor refactor of the runtime plus additional set/get logic.
One nice thing about keeping HDFS for storing snapshots is that it avoids
versioning conflicts: once a checkpoint restore happens, partially written data is
overwritten with the previously checkpointed values.
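
To illustrate the direction, here is a minimal sketch of the LRU-with-spill-over
idea (purely illustrative; SpillingStateMap and ExternalKvStore are hypothetical
names, not existing Flink classes):

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Sketch only: a keyed-state map that behaves like an LRU cache and spills
    // the least recently used entries to an external key/value store once a
    // size threshold is hit; missing keys are lazily restored on access.
    public class SpillingStateMap<K, V> extends LinkedHashMap<K, V> {

        private final int maxLocalEntries;
        private final ExternalKvStore<K, V> externalStore;

        public SpillingStateMap(int maxLocalEntries, ExternalKvStore<K, V> externalStore) {
            super(16, 0.75f, true); // access-order = true gives LRU iteration order
            this.maxLocalEntries = maxLocalEntries;
            this.externalStore = externalStore;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            if (size() > maxLocalEntries) {
                externalStore.put(eldest.getKey(), eldest.getValue()); // spill cold entry
                return true;                                           // drop it locally
            }
            return false;
        }

        public V getOrRestore(K key) {
            V value = super.get(key);
            if (value == null) {
                value = externalStore.get(key); // lazy restore on cache miss
                if (value != null) {
                    put(key, value);
                }
            }
            return value;
        }

        // Hypothetical external store abstraction (e.g. backed by Cassandra/HBase).
        public interface ExternalKvStore<K, V> {
            void put(K key, V value);
            V get(K key);
        }
    }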

Comments?

-- 
-Chen Qin


Minor Cleanup of StreamGraphGenerator

2016-12-12 Thread Chen Qin
Hi There,

I am thinking of doing a minor refactor of StreamGraphGenerator.

Some transforms share logic that could potentially be reused. Also, the long list
of if/else branches is hard to read; it might be better to abstract an interface
and offer default implementations.

A bonus would be allowing configuration to override this "transformer" with a
job-level customized implementation.
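
To make the idea a bit more concrete, a rough sketch of the kind of abstraction I
have in mind (hypothetical names, not existing classes; it only illustrates the
shape of the refactor):

    // Sketch only: replace the long if/else chain in StreamGraphGenerator with a
    // per-transformation translator interface; the generator would look up the
    // translator for each transformation type from a registry map.
    public interface TransformationTranslator<T> {

        // Turns one transformation into stream-graph nodes/edges and returns the
        // ids of the nodes that downstream transformations should connect to.
        java.util.Collection<Integer> translate(T transformation, TranslationContext context);

        // Context handed to every translator; would wrap the StreamGraph being
        // built plus access to already-translated upstream transformations.
        interface TranslationContext {
            void addNode(int nodeId, String operatorName);
            void addEdge(int fromNodeId, int toNodeId);
        }
    }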

Any suggestions?

Thanks,
Chen


Re: [Discuss] FLIP-13 Side Outputs in Flink

2016-12-09 Thread Chen Qin
Dear Flink community members,

Please review and comment on https://github.com/apache/flink/pull/2982.

Thanks,
Chen





Re: [Discuss] State Backend use external HBase storage

2016-11-19 Thread Chen Qin
Hi Jinkui,
The remote state backend is still in the discussion phase; we sent out a design
some time ago. Because it will be impacted by dynamic scaling and the expected
non-partitioned state changes, we decided to revisit it after the dust settles.

Thanks,
Chen

On Thu, Nov 17, 2016 at 2:23 AM, sjk <shijinkui...@163.com> wrote:

> Hi, Chen Qin
> I found this issue. Has it kicked off?  What’s the current progress?
> https://issues.apache.org/jira/browse/FLINK-4266
>
> On Nov 16, 2016, at 19:35, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi Jinkui,
>
> the file system state backend and the RocksDB state backend can be
> configured (and usually should be) such that they store their checkpoint
> data on a reliable storage system such as HDFS. Then you also have the
> reliability guarantees.
>
> Of course, one can start adding more state backends to Flink. At some point
> in time there was the idea to write a Cassandra backed state backend [1],
> for example. Similarly, one could think about a HBase backed state backend.
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Cassandra-
> statebackend-td12690.html
>
>
> Cheers,
> Till
>
> On Wed, Nov 16, 2016 at 3:10 AM, shijinkui <shijin...@huawei.com> wrote:
>
> Hi, All
>
> At present Flink has three state backends: memory, file system, RocksDB.
> MemoryStateBackend will transfer the snapshot to the JobManager, limited to 5MB
> by default. Even when setting it bigger, it is not suitable for very big state
> storage.
> HDFS can meet the reliability guarantee, but it's slow. The file system and
> RocksDB backends are fast, but they have no reliability guarantee.
> All three state backends have no reliability guarantee.
>
> Can we have an HBase state backend, providing a reliability guarantee for
> state snapshots?
> For the user, it would only mean creating a new HbaseStateBackend object and
> providing the HBase parameters and tuning configuration.
> Maybe HBase or another distributed key-value store is heavyweight storage, so
> we would only use the HBase client to read/write asynchronously.
>
> -Jinkui Shi
>
>
>


Re: [Discuss] FLIP-13 Side Outputs in Flink

2016-11-03 Thread Chen Qin
Adding another abstract method to the Collector interface is also considerably
easier from an API backward-compatibility point of view.

The cost could be either

1) many classes with an empty implementation of the *void collect(OutputTag<S>
tag, S value)* method, or

2) splitting the StreamRecord-related classes that implement the Collector
interface from the graph-generator-related ones. For the StreamRecord ones, we
might be able to implement *collect(T out)* by calling *void collect(OutputTag<S>
tag, S value)*. For the graph generator, keep it as it is.
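
To make the proposed extension concrete, a small sketch (this is only the shape
discussed in this thread, not the side-output API Flink eventually shipped; the
OutputTag class below is a stand-in):

    // Sketch only: Collector grows a second, tag-aware collect method, and
    // record-emitting implementations route tagged values to side outputs.
    public interface RichCollector<T> extends org.apache.flink.util.Collector<T> {

        // Emit a value to the side output identified by the given tag.
        <S> void collect(OutputTag<S> tag, S value);
    }

    // Minimal stand-in for OutputTag: a named, typed handle for one side output.
    class OutputTag<S> {
        private final String id;

        OutputTag(String id) {
            this.id = id;
        }

        String getId() {
            return id;
        }
    }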


On Wed, Nov 2, 2016 at 8:14 PM, Chen Qin <c...@uber.com> wrote:

> Hi Fabian
>
> Thanks for your feedback. sorry for late reply.
> Some of comments inline. Will update FLIP-13 wiki reflect your comments.
>
>
> - Will multiple side outputs of the same type be supported?
>
> > It wasn't implemented in prototype. But should be easy to support, we
> have unique id in stream record.
>
> - If I got it right, the FLIP proposes to change the signatures of many
>
> user-defined functions (FlatMapFunction, WindowFunction, ...). Most of
>
> these interfaces/classes are annotated with @Public, which means we cannot
>
> change them in the Flink 1.x release line. What would be alternatives? I
>
> can think of
> a) casting the Collector into a RichCollector (as you do in
>
> your prototype) or
> > This is like a private magic API. Should be 100% compatible but not good
> implementation.
>
> b) retrieve the RichCollector from the RuntimeContext
>
> > It seems better option, yet many highly used Function like FlatMap will
> not get support. To get support, we need to create some redundant classes
> inherited from RichFunction( like implement RichFlatMap etc) [we might put
> these in different package and isolate impact of this change)
>
> that a RichFunction provides.
>
>
> I'm not so familiar with the internals of the DataStream API, so I leave
>
> comments on that to other.
>
>
> Best, Fabian
>
> On Tue, Oct 25, 2016 at 9:00 AM, Chen Qin <c...@uber.com> wrote:
>
>> Hey folks,
>>
>> Please give feedback on FLIP-13!
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+
>> Side+Outputs+in+Flink
>> JIRA task link to google doc https://issues.apache.org/
>> jira/browse/FLINK-4460
>>
>> Thanks,
>> Chen Qin
>>
>
>
>
> --
> -Chen Qin
>



-- 
-Chen Qin


Re: [Discuss] FLIP-13 Side Outputs in Flink

2016-11-02 Thread Chen Qin
 Hi Fabian

Thanks for your feedback, and sorry for the late reply.
Some comments are inline. I will update the FLIP-13 wiki to reflect your comments.


- Will multiple side outputs of the same type be supported?

> It wasn't implemented in the prototype, but it should be easy to support since we
have a unique id in the stream record.

- If I got it right, the FLIP proposes to change the signatures of many
user-defined functions (FlatMapFunction, WindowFunction, ...). Most of these
interfaces/classes are annotated with @Public, which means we cannot change them in
the Flink 1.x release line. What would be alternatives? I can think of
a) casting the Collector into a RichCollector (as you do in your prototype), or

> This is like a private magic API. It should be 100% compatible, but it is not a
good implementation.

b) retrieving the RichCollector from the RuntimeContext that a RichFunction
provides.

> That seems the better option, yet many heavily used functions like FlatMap would
not get support. To support them, we would need to create some redundant classes
inherited from RichFunction (e.g. implement RichFlatMap); we might put these in a
different package to isolate the impact of this change.

I'm not so familiar with the internals of the DataStream API, so I leave comments
on that to others.


Best, Fabian

On Tue, Oct 25, 2016 at 9:00 AM, Chen Qin <c...@uber.com> wrote:

> Hey folks,
>
> Please give feedback on FLIP-13!
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> 13+Side+Outputs+in+Flink
> JIRA task link to google doc https://issues.apache.org/
> jira/browse/FLINK-4460
>
> Thanks,
> Chen Qin
>



-- 
-Chen Qin


[Discuss] FLIP-13 Side Outputs in Flink

2016-10-25 Thread Chen Qin
Hey folks,

Please give feedback on FLIP-13!
https://cwiki.apache.org/confluence/display/FLINK/FLIP-13+Side+Outputs+in+Flink
JIRA task link to google doc
https://issues.apache.org/jira/browse/FLINK-4460

Thanks,
Chen Qin


Re: expose side output stream

2016-08-13 Thread Chen Qin
Stephan & Aljoscha,

What I did was an API hack without much thought about the architecture at the
beginning :-)
Patch attached. I think it would be "tricky" to achieve backward compatibility
with the current architecture.
We might need to encapsulate "Collectors" into a ProcessContext as a starting
point.

I would try to work on a FLIP next week.

Best Regards,
Chen


On Fri, Aug 12, 2016 at 2:13 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Chen,
> could you maybe share the code that you have so far?
>
> If you wan't you can start a google doc and then we can work together on
> fleshing out an API/implementation that we can present to the Flink
> community as a FLIP.
>
> Cheers,
> Aljoscha
>
> On Thu, 11 Aug 2016 at 14:40 Stephan Ewen <se...@apache.org> wrote:
>
> > Hi!
> >
> > This is a very big change, both on the semantics, the runtime classes.
> > These changes are tricky to get in, and usually work best if you document
> > the changes and all implications well.
> >
> > Something like a deep design doc, or a FLIP would be great for this.
> >
> > https://cwiki.apache.org/confluence/display/FLINK/
> Flink+Improvement+Proposals
> >
> > Greetings,
> > Stephan
> >
> >
> > On Thu, Aug 11, 2016 at 12:41 AM, Chen Qin <qinnc...@gmail.com> wrote:
> >
> > > Hi there,
> > >
> > > I am thinking of implement sideOutput into Flink which seems missing
> > > support.
> > > https://cloud.google.com/dataflow/model/par-do#side-outputs
> > >
> > > It is useful because it will help pipeline author redirect corrputed
> > input/
> > > code bug to a side stream or write to a table and reconsile afterwards.
> > >
> > > After some hack prototyping, I were able to get it works for simple
> > tests.
> > > Basically, It allows env to register a side output typeInfo which will
> be
> > > passed to configurations during graph building; Adding a new transform
> > > which similar to selection transform but holding different input type;
> > > StreamEdge will has a boolean to see if that is side output edge, if
> so,
> > > create output writer loads side output type serializer and emit record
> > only
> > > when sideOutput is called.
> > >
> > > I have some problem passing side output type as template to each data
> > > stream. It means it will have to expose any output stream with two type
> > > parameters. As you can imagine, the API interface change will be
> sizable.
> > >
> > > Any suggestion?
> > >
> > > Chen
> > >
> >
>



-- 
-Chen Qin


Re: expose side output stream

2016-08-11 Thread Chen Qin
Stephan

Hmm... I see.
Stepping back one step, how does Flink deal with corrupted input data right now,
e.g. is there something like a dead letter queue?

Thanks,
Chen

On Thu, Aug 11, 2016 at 5:40 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> This is a very big change, both on the semantics, the runtime classes.
> These changes are tricky to get in, and usually work best if you document
> the changes and all implications well.
>
> Something like a deep design doc, or a FLIP would be great for this.
> https://cwiki.apache.org/confluence/display/FLINK/
> Flink+Improvement+Proposals
>
> Greetings,
> Stephan
>
>
> On Thu, Aug 11, 2016 at 12:41 AM, Chen Qin <qinnc...@gmail.com> wrote:
>
> > Hi there,
> >
> > I am thinking of implement sideOutput into Flink which seems missing
> > support.
> > https://cloud.google.com/dataflow/model/par-do#side-outputs
> >
> > It is useful because it will help pipeline author redirect corrputed
> input/
> > code bug to a side stream or write to a table and reconsile afterwards.
> >
> > After some hack prototyping, I were able to get it works for simple
> tests.
> > Basically, It allows env to register a side output typeInfo which will be
> > passed to configurations during graph building; Adding a new transform
> > which similar to selection transform but holding different input type;
> > StreamEdge will has a boolean to see if that is side output edge, if so,
> > create output writer loads side output type serializer and emit record
> only
> > when sideOutput is called.
> >
> > I have some problem passing side output type as template to each data
> > stream. It means it will have to expose any output stream with two type
> > parameters. As you can imagine, the API interface change will be sizable.
> >
> > Any suggestion?
> >
> > Chen
> >
>



-- 
-Chen Qin


expose side output stream

2016-08-10 Thread Chen Qin
Hi there,

I am thinking of implementing sideOutput in Flink, which seems to be missing.
https://cloud.google.com/dataflow/model/par-do#side-outputs

It is useful because it will help pipeline authors redirect corrupted input or
records affected by code bugs to a side stream, or write them to a table and
reconcile afterwards.

After some hack prototyping, I was able to get it working for simple tests.
Basically, it allows the env to register a side-output TypeInformation that is
passed to the configuration during graph building; it adds a new transform similar
to the select transform but holding a different input type; and StreamEdge gets a
boolean marking whether it is a side-output edge, in which case the created output
writer loads the side-output type serializer and emits records only when
sideOutput is called.

I have some problems passing the side-output type as a type parameter to each data
stream. It means every output stream would have to expose two type parameters. As
you can imagine, the API change would be sizable.

Any suggestion?

Chen


Re: Cassandra statebackend

2016-08-08 Thread Chen Qin
Aljoscha,

Sure thing, will do after the key-group feature is in place and we have the bandwidth :)

Gyula,

That's where we started; many terms are copied over (logical timestamp,
compaction, lazy restore). We have to use Cassandra, which offers less in terms of
transactions and consistency, to gain availability and cross-data-center
replication. It leverages RocksDB as a pre-checkpoint cache and evicts to / lazily
restores from Cassandra.

Chen




On Mon, Aug 8, 2016 at 2:51 AM, Gyula Fóra <gyf...@apache.org> wrote:

> Hi,
>
> I have done something similar in the past for storing state in sharded
> MySql databases. We used this for a while for state size scaling reasons
> but have switched to RocksDB later and therefore this statebackend has been
> removed from Flink to cut some maintenance costs.
>
> You can find the initial PR here that contains the description:
> https://github.com/apache/flink/pull/1305
>
> Maybe it helps a little, I don't know :)
>
> Cheers,
> Gyula
>
> Aljoscha Krettek <aljos...@apache.org> ezt írta (időpont: 2016. aug. 8.,
> H,
> 11:41):
>
> > Hi,
> > thanks for sharing the design doc, these are valuable ideas.
> >
> > We might have to revisit the specifics once the re-sharding/key-group
> > changes are in Flink and once you actually want to start working on this.
> >
> > Cheers,
> > Aljoscha
> >
> > On Sat, 6 Aug 2016 at 07:32 Chen Qin <qinnc...@gmail.com> wrote:
> >
> > > Aljoscha
> > >
> > > Sorry about late reply.
> > >
> > > David and I drafted a design doc with some diagrams. We may not work on
> > it
> > > immediately, but we thought it would be valuable to share our thoughts
> > and
> > > hear feedbacks.
> > >
> > >
> > >
> > https://docs.google.com/document/d/1diHQyOPZVxgmnmYfiTa6glLf-
> FlFjSHcL8J3YR2xLdk/edit#heading=h.12fh7saw98iz
> > >
> > > >about sate lineage:
> > >
> > > One approach might add pointer to keep data lineage between updated key
> > in
> > > first checkpoint and t's restored checkpoint_id correspondent. It
> assume
> > > restore from a save point will not cause job manager re instrument
> > already
> > > used checkpoint id.
> > >
> > > >clean up old states,
> > >
> > > Since job manager already knew save points and latests successful
> > > checkpoint. When a save point is created, it could be good time for job
> > > manager to instrument clean up message and ask each states to move
> > > effective key/values up to current save point and delete anything
> before.
> > > That's doesn't need to be synchronized since both before and after
> > > compaction will not change states value but location where that value
> > > stored. Delete a save point / checkpoint can also trigger compaction.
> > >
> > > Thanks,
> > > Chen
> > >
> > > On Thu, Jul 28, 2016 at 6:59 AM, Aljoscha Krettek <aljos...@apache.org
> >
> > > wrote:
> > >
> > > > Hi,
> > > > thanks for opening the Jira issue. I'll continue the discussion here
> > > > instead of in the Jira, I hope that's OK.
> > > >
> > > > That last paragraph of yours is the most interesting. We will have to
> > > adapt
> > > > the way that checkpoints are stored to accommodate state backends
> that
> > > > store state in some external system, such as Cassandra. Right now,
> each
> > > > Checkpoint/Savepoint is stored in isolation and the system does not
> > know
> > > > about any relation between them. We have to introduce such a
> relation,
> > > > basically putting the checkpoints into a graph structure that shows
> the
> > > > lineage of the checkpoints. Then, when we are cleaning up old
> > checkpoints
> > > > we check the ranges of (logical) timestamps of the checkpoints that
> we
> > > can
> > > > remove and instruct the StateBackend to remove the relevant ranges.
> > > >
> > > > This leads to another interesting thing. We might need to have a
> > > > StateBackend component running in the JobManager that we can invoke
> to
> > > > delete ranges of checkpoints. Right now, a StateBackend only lives on
> > the
> > > > TaskManager, in the operators. Cleanup of time ranges, however,
> should
> > > > probably happen in some centralized location.
> > > >
> > > > Cheers,
> > > > Aljoscha
> > > >

Re: Cassandra statebackend

2016-08-05 Thread Chen Qin
Aljoscha

Sorry about the late reply.

David and I drafted a design doc with some diagrams. We may not work on it
immediately, but we thought it would be valuable to share our thoughts and
hear feedback.

https://docs.google.com/document/d/1diHQyOPZVxgmnmYfiTa6glLf-FlFjSHcL8J3YR2xLdk/edit#heading=h.12fh7saw98iz

>about state lineage:

One approach might be to add a pointer that keeps the data lineage between a key
updated in the first checkpoint and its corresponding restored checkpoint_id. It
assumes that restoring from a savepoint will not cause the job manager to reuse
already-used checkpoint ids.

>clean up old states:

Since the job manager already knows the savepoints and the latest successful
checkpoint, the creation of a savepoint could be a good time for the job manager
to issue a cleanup message asking each state to move the effective key/values up
to the current savepoint and delete anything before it. That doesn't need to be
synchronized, since compaction changes not the state values but only the location
where those values are stored. Deleting a savepoint/checkpoint can also trigger
compaction.
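
To make the versioning part a bit more concrete, here is a sketch of a read path
that uses the checkpoint timestamp as a version (the table layout and the class are
assumptions for illustration, not part of any existing design):

    import java.nio.ByteBuffer;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    // Sketch only: state rows keyed by (state_key, checkpoint_ts); a read takes
    // the newest version at or below the checkpoint we restored from, so
    // partially written newer versions are ignored after a restore.
    // Assumed table:
    //   CREATE TABLE state (state_key text, checkpoint_ts bigint, value blob,
    //                       PRIMARY KEY (state_key, checkpoint_ts))
    //   WITH CLUSTERING ORDER BY (checkpoint_ts DESC);
    public class VersionedStateReader {

        private final Session session;            // Cassandra driver session
        private final long restoredCheckpointTs;  // version we restored from

        public VersionedStateReader(Session session, long restoredCheckpointTs) {
            this.session = session;
            this.restoredCheckpointTs = restoredCheckpointTs;
        }

        public ByteBuffer get(String stateKey) {
            Row row = session.execute(
                    "SELECT value FROM state WHERE state_key = ? AND checkpoint_ts <= ? LIMIT 1",
                    stateKey, restoredCheckpointTs).one();
            return row == null ? null : row.getBytes("value");
        }
    }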

Thanks,
Chen

On Thu, Jul 28, 2016 at 6:59 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> thanks for opening the Jira issue. I'll continue the discussion here
> instead of in the Jira, I hope that's OK.
>
> That last paragraph of yours is the most interesting. We will have to adapt
> the way that checkpoints are stored to accommodate state backends that
> store state in some external system, such as Cassandra. Right now, each
> Checkpoint/Savepoint is stored in isolation and the system does not know
> about any relation between them. We have to introduce such a relation,
> basically putting the checkpoints into a graph structure that shows the
> lineage of the checkpoints. Then, when we are cleaning up old checkpoints
> we check the ranges of (logical) timestamps of the checkpoints that we can
> remove and instruct the StateBackend to remove the relevant ranges.
>
> This leads to another interesting thing. We might need to have a
> StateBackend component running in the JobManager that we can invoke to
> delete ranges of checkpoints. Right now, a StateBackend only lives on the
> TaskManager, in the operators. Cleanup of time ranges, however, should
> probably happen in some centralized location.
>
> Cheers,
> Aljoscha
>
> On Mon, 25 Jul 2016 at 22:38 Chen Qin <qinnc...@gmail.com> wrote:
>
> > Hi Aljoscha,
> >
> > Cool! I created a JIRA for this.
> > https://issues.apache.org/jira/browse/FLINK-4266
> > Some comments inline.
> >
> > Chen
> >
> > On Mon, Jul 25, 2016 at 2:41 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Hi,
> > > I thought there was a Jira for that but I looked and couldn't find it.
> If
> > > you'd like you can create one and we can discuss the design. Do you
> have
> > > any ideas yet?
> > >
> > > The tricky things I see in this are:
> > >  - Knowing which data is the current data. This will require some kind
> of
> > > timestamps or increasing IDs.
> > >
> >
> > ​We are thinking of leveraging client assigned timestamp from
> > checkpoint_timestamp.
> > ​
> >
> > >  - Knowing when you can retire data from Cassandra
> > >
> > ​That's interesting part, each state checkpoint snapshot might reference
> > t's previous snapshot​. Delete/Consolidate rows previous snapshot with
> > eventual consistency can be tricky.
> >  ​
> >
> > > Some of these might require some changes to how Flink handles
> checkpoints
> > > and it somewhat goes into the direction of incremental checkpoints.
> That
> > > last part is especially important once you deal with savepoints, which
> > can
> > > stay around indefinitely.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Mon, 25 Jul 2016 at 08:31 Tai Gordon <tzuli...@gmail.com> wrote:
> > >
> > > > Hi Chen,
> > > >
> > > > AFAIK, there currently isn’t any FLIP / JIRA / work currently for a
> > > > Cassandra state backend. I think it’ll definitely by interesting to
> > have
> > > > one in Flink.
> > > >
> > > > Regards,
> > > > Gordon
> > > >
> > > >
> > > > On July 25, 2016 at 10:24:32 AM, Chen Qin (qinnc...@gmail.com)
> wrote:
> > > >
> > > > ​Hi there,
> > > >
> > > > Is there any design docs or on going efforts there?
> > > >
> > > > Thanks,
> > > > Chen ​
> > > >
> > >
> >
>


Re: Cassandra statebackend

2016-07-25 Thread Chen Qin
Hi Aljoscha,

Cool! I created a JIRA for this.
https://issues.apache.org/jira/browse/FLINK-4266
Some comments inline.

Chen

On Mon, Jul 25, 2016 at 2:41 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> I thought there was a Jira for that but I looked and couldn't find it. If
> you'd like you can create one and we can discuss the design. Do you have
> any ideas yet?
>
> The tricky things I see in this are:
>  - Knowing which data is the current data. This will require some kind of
> timestamps or increasing IDs.
>

​We are thinking of leveraging client assigned timestamp from
checkpoint_timestamp.
​

>  - Knowing when you can retire data from Cassandra
>
​That's interesting part, each state checkpoint snapshot might reference
t's previous snapshot​. Delete/Consolidate rows previous snapshot with
eventual consistency can be tricky.
 ​

> Some of these might require some changes to how Flink handles checkpoints
> and it somewhat goes into the direction of incremental checkpoints. That
> last part is especially important once you deal with savepoints, which can
> stay around indefinitely.
>
> Cheers,
> Aljoscha
>
> On Mon, 25 Jul 2016 at 08:31 Tai Gordon <tzuli...@gmail.com> wrote:
>
> > Hi Chen,
> >
> > AFAIK, there currently isn’t any FLIP / JIRA / work currently for a
> > Cassandra state backend. I think it’ll definitely by interesting to have
> > one in Flink.
> >
> > Regards,
> > Gordon
> >
> >
> > On July 25, 2016 at 10:24:32 AM, Chen Qin (qinnc...@gmail.com) wrote:
> >
> > ​Hi there,
> >
> > Is there any design docs or on going efforts there?
> >
> > Thanks,
> > Chen ​
> >
>


[jira] [Created] (FLINK-4266) Cassandra Statebackend

2016-07-25 Thread Chen Qin (JIRA)
Chen Qin created FLINK-4266:
---

 Summary: Cassandra Statebackend
 Key: FLINK-4266
 URL: https://issues.apache.org/jira/browse/FLINK-4266
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing
Affects Versions: 1.0.3, 1.2.0
Reporter: Chen Qin
Priority: Minor


The current FileSystem state backend limits the total state size to the available 
disk space. For long-running tasks that hold window contents for a long period of 
time, state needs to be split out to durable remote storage and replicated across 
data centers.

We are looking into an implementation that leverages the checkpoint timestamp as a 
version and does a range query to get the current state; we also want to keep "hot 
state" from hitting the remote DB on every update between adjacent checkpoints, by 
tracking update logs, merging them, and doing batch updates only at checkpoint 
time; lastly, we are looking for an eviction policy that can identify "hot keys" in 
the k/v state and lazily load the "cold keys" from Cassandra. A sketch of the 
batching idea is included below.
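
A sketch of the "merge update logs, batch at checkpoint" idea (all shapes here are
hypothetical and only illustrate the intent):

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: updates between two checkpoints are merged in a local buffer
    // and flushed to the remote store as one versioned batch when the checkpoint
    // is taken, so "hot keys" do not hit the remote DB on every single update.
    public class BatchingStateWriter<K, V> {

        private final Map<K, V> pendingUpdates = new HashMap<>();
        private final RemoteStore<K, V> remoteStore; // hypothetical remote k/v client

        public BatchingStateWriter(RemoteStore<K, V> remoteStore) {
            this.remoteStore = remoteStore;
        }

        public void update(K key, V value) {
            pendingUpdates.put(key, value); // later updates overwrite earlier ones
        }

        public void onCheckpoint(long checkpointTimestamp) {
            remoteStore.batchPut(checkpointTimestamp, pendingUpdates); // one batch write
            pendingUpdates.clear();
        }

        public interface RemoteStore<K, V> {
            void batchPut(long version, Map<K, V> updates);
        }
    }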

For now, we don't have a good story regarding data retirement. We might have to 
keep data forever until a command is run manually to clean per-job state data. 
Some of these features may relate to the incremental checkpointing feature; we 
hope to align with the effort there as well.

Comments welcome; I will try to put up a draft design doc after gathering some 
feedback.











Cassandra statebackend

2016-07-24 Thread Chen Qin
​Hi there,

Is there any design docs or on going efforts there?

Thanks,
Chen ​


Re: [DISCUSS] Allowed Lateness in Flink

2016-07-18 Thread Chen Qin
BTW, do you have a rough timeline for rolling this out to production?

Thanks,
Chen


On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> Chen commented this on the doc (I'm mirroring here so everyone can follow):
> "It would be cool to be able to access last snapshot of window states
> before it get purged. Pipeline author might consider put it to external
> storage and deal with late arriving events by restore corresponding
> window."
>
> My answer:
> This is partially covered by the section called "What Happens at
> Window-Cleanup Time, Who Decides When to Purge". What I want to introduce
> is that the window can have one final emission if there is new data in the
> buffers at cleanup time.
>
> The work on this will also depend on this proposal:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> With
> this, the WindowFunction can get meta data about the window firing so it
> could be informed that this is the last firing before a cleanup and that
> there already was an earlier, on-time firing.
>
> Does this cover your concerns, Chen?
>
> Cheers,
> Aljoscha
>
> On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnc...@gmail.com> wrote:
>
> > Sure. Currently, it looks like any element assigned to a too late window
> > will be dropped silently ?
> >
> > Having a late window stream imply somehow Flink needs to add one more
> state
> > to window and split window state cleanup from window retirement.
> > I would suggest as simple as adding a function in trigger called
> > OnLateElement and always fire_purge it would enable developer aware and
> > handle this case.
> >
> > Chen
> >
> >
> >
> > On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > @Chen I added a section at the end of the document regarding access to
> > the
> > > elements that are dropped as late. Right now, the section just mentions
> > > that we have to do this but there is no real proposal yet for how to do
> > it.
> > > Only a rough sketch so that we don't forget about it.
> > >
> > > On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnc...@gmail.com> wrote:
> > >
> > > > +1 for allowedLateness scenario.
> > > >
> > > > The rationale behind is there are backfills or data issues hold
> > in-window
> > > > data till watermark pass end time. It cause sink write partial
> output.
> > > >
> > > > Allow high allowedLateness threshold makes life easier to merge those
> > > > results and overwrite partial output with correct output at sink. But
> > > yeah,
> > > > pipeline author is at risk of blow up statebackend with huge states.
> > > >
> > > > Alternatively, In some case, if sink allows read-check-merge
> operation,
> > > > window can explicit call out events ingested after allowedLateness.
> It
> > > asks
> > > > pipeline author mitigated these events in a way outside of flink
> > > ecosystem.
> > > > The catch is that since everywhere in a flink job can replay and
> > > > checkpoint, notification should somehow includes these info as well.
> > > >
> > > > Thanks
> > > > Chen
> > > >
> > > > On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas <
> > > > k.klou...@data-artisans.com
> > > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > In the effort to move the discussion to the mailing list, rather
> than
> > > the
> > > > > doc,
> > > > > there was a comment in the doc:
> > > > >
> > > > > “It seems this proposal marries the allowed lateness of events and
> > the
> > > > > discarding of window state. In most use cases this should be
> > > sufficient,
> > > > > but there are instances where having independent control of these
> may
> > > be
> > > > > useful.
> > > > >
> > > > > For instance, you may have a job that computes some aggregate,
> like a
> > > > sum.
> > > > > You may want to keep the window state around for a while, but not
> too
> > > > long.
> > > > > Yet you may want to continue processing late events after you
> > discarded
> > > > the
> > > > > window state. It is possible that your stre

Re: [DISCUSS] Allowed Lateness in Flink

2016-07-18 Thread Chen Qin
Aljoscha,

Yes, that would work for our case!

Chen


On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> Chen commented this on the doc (I'm mirroring here so everyone can follow):
> "It would be cool to be able to access last snapshot of window states
> before it get purged. Pipeline author might consider put it to external
> storage and deal with late arriving events by restore corresponding
> window."
>
> My answer:
> This is partially covered by the section called "What Happens at
> Window-Cleanup Time, Who Decides When to Purge". What I want to introduce
> is that the window can have one final emission if there is new data in the
> buffers at cleanup time.
>
> The work on this will also depend on this proposal:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
> With
> this, the WindowFunction can get meta data about the window firing so it
> could be informed that this is the last firing before a cleanup and that
> there already was an earlier, on-time firing.
>
> Does this cover your concerns, Chen?
>
> Cheers,
> Aljoscha
>
> On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnc...@gmail.com> wrote:
>
> > Sure. Currently, it looks like any element assigned to a too late window
> > will be dropped silently ?
> >
> > Having a late window stream imply somehow Flink needs to add one more
> state
> > to window and split window state cleanup from window retirement.
> > I would suggest as simple as adding a function in trigger called
> > OnLateElement and always fire_purge it would enable developer aware and
> > handle this case.
> >
> > Chen
> >
> >
> >
> > On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > @Chen I added a section at the end of the document regarding access to
> > the
> > > elements that are dropped as late. Right now, the section just mentions
> > > that we have to do this but there is no real proposal yet for how to do
> > it.
> > > Only a rough sketch so that we don't forget about it.
> > >
> > > On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnc...@gmail.com> wrote:
> > >
> > > > +1 for allowedLateness scenario.
> > > >
> > > > The rationale behind is there are backfills or data issues hold
> > in-window
> > > > data till watermark pass end time. It cause sink write partial
> output.
> > > >
> > > > Allow high allowedLateness threshold makes life easier to merge those
> > > > results and overwrite partial output with correct output at sink. But
> > > yeah,
> > > > pipeline author is at risk of blow up statebackend with huge states.
> > > >
> > > > Alternatively, In some case, if sink allows read-check-merge
> operation,
> > > > window can explicit call out events ingested after allowedLateness.
> It
> > > asks
> > > > pipeline author mitigated these events in a way outside of flink
> > > ecosystem.
> > > > The catch is that since everywhere in a flink job can replay and
> > > > checkpoint, notification should somehow includes these info as well.
> > > >
> > > > Thanks
> > > > Chen
> > > >
> > > > On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas <
> > > > k.klou...@data-artisans.com
> > > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > In the effort to move the discussion to the mailing list, rather
> than
> > > the
> > > > > doc,
> > > > > there was a comment in the doc:
> > > > >
> > > > > “It seems this proposal marries the allowed lateness of events and
> > the
> > > > > discarding of window state. In most use cases this should be
> > > sufficient,
> > > > > but there are instances where having independent control of these
> may
> > > be
> > > > > useful.
> > > > >
> > > > > For instance, you may have a job that computes some aggregate,
> like a
> > > > sum.
> > > > > You may want to keep the window state around for a while, but not
> too
> > > > long.
> > > > > Yet you may want to continue processing late events after you
> > discarded
> > > > the
> > > > > window state. It is possible that your stream sinks can make use of
> > > this

custom control messages from source

2016-07-17 Thread Chen Qin
Hi there,

So far, the checkpoint trigger is hardcoded in the CheckpointCoordinator, which
fires periodically and pushes control messages to the task managers. It is
implemented orthogonally to the business logic implemented in jobs.

Our scenario requires the master pipeline to flow control messages along with
events through a distributed queue. When a follower pipeline's source detects a
control message (checkpoint_barrier_id / watermark / restore_to_checkpoint_id), it
blocks itself, sends a message to the checkpoint coordinator, and requests a
specific checkpoint. Once the checkpoint coordinator acks (and the source acks
back), the source unblocks itself and keeps going. It would contain de-dupe logic
that always honors the first-seen control message and discards later duplicates,
given the assumption that events consumed from the distributed queue are strongly
ordered. A rough sketch of the source-side loop follows the pros/cons below.

Pros:

   - Allows the pipeline author to define customized checkpointing logic
   - Allows breaking a large pipeline down into smaller ones connected via a
   streaming queue
      - leverage data locality and run a sub-pipeline near its dependency
      - pipe late-arriving events to a side pipeline and consolidate there

Cons:

   - Overhead of blocking the follower pipeline's source
   - Overhead of distributed queue latency
   - Overhead of writing control messages to the distributed queue
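
As referenced above, a very rough sketch of the follower source's loop (the queue
client and the coordinator hook are hypothetical; Flink sources cannot request
checkpoints this way today):

    import java.util.HashSet;
    import java.util.Set;

    // Sketch only: the follower pipeline's source consumes a strongly ordered
    // queue, de-dupes control messages, and blocks on a (hypothetical)
    // coordinator hook until the requested checkpoint is acknowledged.
    public class ControlAwareSource implements Runnable {

        private final MessageQueue queue;                  // assumed queue client
        private final CheckpointRequester coordinatorHook; // hypothetical coordinator hook
        private final Set<Long> seenCheckpointIds = new HashSet<>();

        public ControlAwareSource(MessageQueue queue, CheckpointRequester coordinatorHook) {
            this.queue = queue;
            this.coordinatorHook = coordinatorHook;
        }

        @Override
        public void run() {
            while (true) {
                Message msg = queue.poll();
                if (msg == null) {
                    continue;
                }
                if (msg.isControl()) {
                    // honor only the first occurrence; the queue is strongly
                    // ordered, so later duplicates carry no new information
                    if (seenCheckpointIds.add(msg.checkpointId())) {
                        coordinatorHook.requestCheckpoint(msg.checkpointId()); // blocks until acked
                    }
                } else {
                    emit(msg); // forward regular events downstream
                }
            }
        }

        private void emit(Message msg) {
            // hand the record to the downstream operators (omitted in this sketch)
        }

        public interface MessageQueue { Message poll(); }
        public interface CheckpointRequester { void requestCheckpoint(long checkpointId); }
        public interface Message {
            boolean isControl();
            long checkpointId();
        }
    }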


Does that make sense?

Thanks,
Chen


Re: [DISCUSS] Allowed Lateness in Flink

2016-07-07 Thread Chen Qin
+1 for allowedLateness scenario.

The rationale is that backfills or data issues can hold in-window data until the
watermark passes the window end time, which causes the sink to write partial
output.

Allowing a high allowedLateness threshold makes it easier to merge those results
and overwrite the partial output with correct output at the sink. But yeah, the
pipeline author is at risk of blowing up the state backend with huge state.

Alternatively, in some cases, if the sink supports a read-check-merge operation,
the window can explicitly call out events ingested after allowedLateness. That
asks the pipeline author to mitigate these events outside of the Flink ecosystem.
The catch is that, since everything in a Flink job can replay and checkpoint, the
notification should somehow include this info as well.
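
For reference, a small sketch of the first option on the windowed API (this uses
the allowedLateness() call that was still a proposal at the time of this thread; it
assumes event-time timestamps/watermarks are already assigned upstream):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Sketch only: a long allowedLateness keeps window state around for
    // backfilled/delayed events, and every late firing re-emits an updated
    // aggregate that an idempotent/upserting sink can overwrite.
    public class AllowedLatenessSketch {

        public static DataStream<Tuple2<String, Long>> hourlySumWithLateness(
                DataStream<Tuple2<String, Long>> events) {
            return events
                    .keyBy(value -> value.f0)
                    .window(TumblingEventTimeWindows.of(Time.hours(1)))
                    .allowedLateness(Time.hours(24)) // keep window state for very late events
                    .sum(1);                         // each late firing re-emits an updated sum
        }
    }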

Thanks
Chen

On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas  wrote:

> Hi,
>
> In the effort to move the discussion to the mailing list, rather than the
> doc,
> there was a comment in the doc:
>
> “It seems this proposal marries the allowed lateness of events and the
> discarding of window state. In most use cases this should be sufficient,
> but there are instances where having independent control of these may be
> useful.
>
> For instance, you may have a job that computes some aggregate, like a sum.
> You may want to keep the window state around for a while, but not too long.
> Yet you may want to continue processing late events after you discarded the
> window state. It is possible that your stream sinks can make use of this
> data. For instance, they may be writing to a data store that returns an
> error if a row already exists, which allow the sink to read the existing
> row and update it with the new data."
>
> To which I would like to reply:
>
> If I understand your use-case correctly, I believe that the proposed
> binding of the allowed lateness to the state purging does not impose any
> problem. The lateness specifies the upper time bound, after which the state
> will be discarded. Between the start of a window and its (end +
> allowedLateness) you can write custom triggers that fire, purge the state,
> or do nothing. Given this, I suppose that, at the most extreme case, you
> can specify an allowed lateness of Long.MaxValue and do the purging of the
> state "manually". By doing this, you remove the safeguard of letting the
> system purge the state at some point in time, and you can do your own
> custom state management that fits your needs.
>
> Thanks,
> Kostas
>
> > On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek 
> wrote:
> >
> > @Vishnu Funny you should ask that because I have a design doc lying
> around.
> > I'll open a new mail thread to not hijack this one.
> >
> > On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath <
> vishnu.viswanat...@gmail.com>
> > wrote:
> >
> >> Hi,
> >>
> >> I was going through the suggested improvements in window, and I have
> >> few questions/suggestion on improvement regarding the Evictor.
> >>
> >> 1) I am having a use case where I have to create a custom Evictor that
> will
> >> evict elements from the window based on the value (e.g., if I have
> elements
> >> are of case class Item(id: Int, type:String) then evict elements that
> has
> >> type="a"). I believe this is not currently possible.
> >> 2) this is somewhat related to 1) where there should be an option to
> evict
> >> elements from anywhere in the window. not only from the beginning of the
> >> window. (e.g., apply the delta function to all elements and remove all
> >> those don't pass. I checked the code and evict method just returns the
> >> number of elements to be removed and processTriggerResult just skips
> those
> >> many elements from the beginning.
> >> 3) Add an option to enables the user to decide if the eviction should
> >> happen before the apply function or after the apply function. Currently
> it
> >> is before the apply function, but I have a use case where I need to
> first
> >> apply the function and evict afterward.
> >>
> >> I am doing these for a POC so I think I can modify the flink code base
> to
> >> make these changes and build, but I would appreciate any suggestion on
> >> whether these are viable changes or will there any performance issue if
> >> these are done. Also any pointer on where to start(e.g, do I create a
> new
> >> class similar to EvictingWindowOperator that extends WindowOperator?)
> >>
> >> Thanks and Regards,
> >> Vishnu Viswanath,
> >>
> >> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek 
> >> wrote:
> >>
> >>> I did:
> >>>
> >>>
> >>
> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3ccanmxww0abttjjg9ewdxrugxkjm7jscbenmvrzohpt2qo3pq...@mail.gmail.com%3e
> >>> ;-)
> >>>
> >>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi  wrote:
> >>>
>  On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek  >
>  wrote:
> > In the future, it might be good to to discussions directly on the ML
> >>> and
> > then 

support launch more than one pipeline in a yarn session

2016-06-27 Thread Chen Qin
Hi there,

We are researching launching more than one pipeline in a Flink YARN session. It's
sort of like a cluster mode where the user kicks off multiple pipelines on one job
manager. Is there any plan to support this use case? If not, why not? Is it
because of better isolation per pipeline, or some implementation insight? What
would be the recommended way to add this if someone plans to work on it?

As for our use case, we want to allow running an arbitrary number of pipelines in
a given Flink YARN session so that people don't need to chase rabbits and manage
lots of web dashboards at the same time.

Thanks,
Chen


Re: incremental Checkpointing , Rocksdb HA

2016-06-18 Thread Chen Qin
Thanks everyone. We had been reasoning about the expense of drawing snapshots of
large state as a major benefit of using RocksDB compared to a JDBC backend.

Our use case is money-related event processing. It requires keeping a large,
weeks-long window; the main data source's ingestion rate is around hundreds of
events per second, while another source that instruments data validation arrives
out of order and is often heavily delayed, until the payout-window cutoff forces
eviction of a relatively small amount of related messages.

Again, we are very excited to hear about the design and glad to provide feedback.

Thanks,
Chen


On Fri, Jun 10, 2016 at 7:24 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> The incremental checkpointing is still being worked upon. Aljoscha, Till
> and me have thought through this a lot and have now a pretty good
> understanding how we want to do it with respect to coordination,
> savepoints, restore, garbage collecting unneeded checkpoints, etc.
>
> We want to put this into a design doc as soon as possible, and we'd be
> happy to take input and discussion on the design. Please stay tuned for a
> little bit...
>
> Greetings,
> Stephan
>
>
> On Thu, Jun 9, 2016 at 8:42 PM, Nick Dimiduk <ndimi...@gmail.com> wrote:
>
> > IIRC, all the above support data locality from back in the MR days. Not
> > sure how much data you're planning to checkpoint though -- is locality
> > really that important for transient processor state?
> >
> > On Thu, Jun 9, 2016 at 11:06 AM, CPC <acha...@gmail.com> wrote:
> >
> > > Cassandra backend would be interesting especially  if flink could
> benefit
> > > from cassandra data locality. Cassandra/spark integration is using this
> > for
> > > information to schedule spark tasks.
> > >
> > > On 9 June 2016 at 19:55, Nick Dimiduk <ndimi...@gmail.com> wrote:
> > >
> > > > You might also consider support for a Bigtable
> > > > backend: HBase/Accumulo/Cassandra. The data model should be similar
> > > > (identical?) to RocksDB and you get HA, recoverability, and support
> for
> > > > really large state "for free".
> > > >
> > > > On Thursday, June 9, 2016, Chen Qin <qinnc...@gmail.com> wrote:
> > > >
> > > > > Hi there,
> > > > >
> > > > > What is progress on incremental checkpointing? Does flink dev has
> > plan
> > > to
> > > > > work on this or JIRA to track this? super interested to know.
> > > > >
> > > > > I also research and consider use rocksdbstatebackend without
> running
> > > HDFS
> > > > > cluster nor talk to S3. Some primitive idea is to use ZK to store /
> > > > notify
> > > > > state propagation progress and propagate via implement chain
> > > replication
> > > > on
> > > > > top of YARN provisioned storage node.
> > > > >
> > > > > Thanks,
> > > > > Chen
> > > > >
> > > >
> > >
> >
>


incremental Checkpointing , Rocksdb HA

2016-06-09 Thread Chen Qin
Hi there,

What is the progress on incremental checkpointing? Do the Flink devs plan to work
on this, or is there a JIRA to track it? Super interested to know.

I am also researching and considering using the RocksDB state backend without
running an HDFS cluster or talking to S3. One primitive idea is to use ZK to store
and notify state propagation progress, and to propagate state by implementing
chain replication on top of YARN-provisioned storage nodes.

Thanks,
Chen