RE: [ANNOUNCE] Chengxiang Li added as committer

2016-01-19 Thread Li, Chengxiang
Thanks everyone, it's always great to collaborate with you guys. I look forward 
to contributing more to Flink.

Thanks
Chengxiang 

-Original Message-
From: Paris Carbone [mailto:par...@kth.se] 
Sent: Tuesday, January 19, 2016 9:24 PM
To: dev@flink.apache.org
Subject: Re: [ANNOUNCE] Chengxiang Li added as committer

Congrats Chengxiang! Really pleased to have you on board

> On 19 Jan 2016, at 13:16, Matthias J. Sax  wrote:
> 
> Congrats and welcome Chengxiang!! :)
> 
> On 01/19/2016 12:56 PM, Kostas Tzoumas wrote:
>> Welcome Chengxiang!!
>> 
>> On Tue, Jan 19, 2016 at 12:31 PM, Stephan Ewen  wrote:
>> 
>>> Good to have you on board!
>>> 
>>> On Tue, Jan 19, 2016 at 11:29 AM, Maximilian Michels 
>>> wrote:
>>> 
 Pleased to have you with us Chengxiang!
 
 Cheers,
 Max
 
 On Tue, Jan 19, 2016 at 11:13 AM, Chiwan Park 
 wrote:
> Congrats! Welcome Chengxiang Li!
> 
>> On Jan 19, 2016, at 7:13 PM, Vasiliki Kalavri <
 vasilikikala...@gmail.com> wrote:
>> 
>> Congratulations! Welcome Chengxiang Li!
>> 
>> On 19 January 2016 at 11:02, Fabian Hueske  wrote:
>> 
>>> Hi everybody,
>>> 
>>> I'd like to announce that Chengxiang Li accepted the PMC's offer to
 become
>>> a committer of the Apache Flink project.
>>> 
>>> Please join me in welcoming Chengxiang Li!
>>> 
>>> Best, Fabian
>>> 
> 
> Regards,
> Chiwan Park
> 
 
>>> 
>> 
> 



RE: [DISCUSS] Git force pushing and deletion of branchs

2016-01-13 Thread Li, Chengxiang
+1 on the original style: disable force pushing on the master branch to prevent 
misuse, and keep force pushing enabled on feature branches for flexible development.

-Original Message-
From: Gyula Fóra [mailto:gyf...@apache.org] 
Sent: Wednesday, January 13, 2016 6:36 PM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Git force pushing and deletion of branchs

+1 for protecting the master branch.

I also don't see any reason why anyone should force push there

Gyula

Fabian Hueske  ezt írta (időpont: 2016. jan. 13., Sze,
11:07):

> Hi everybody,
>
> Lately, ASF Infra has changed the write permissions of all Git 
> repositories twice.
>
> Originally, it was not possible to force into the master branch.
> A few weeks ago, infra disabled also force pushing into other branches.
>
> Now, this has changed again after the issue was discussed with the ASF 
> board.
> The current situation is the following:
> - force pushing is allowed on all branches, including master
> - branches and tags can be deleted (not sure if this applies as well 
> for the master branch)
> - "the 'protected' portions of git to primarily focus on refs/tags/rel 
> - thus any tags under rel, will have their entire commit history."
>
> I am not 100% sure which exact parts of the repository are protected 
> now as I am not very much into the details of Git.
> However, I believe we need to create new tags under rel for our 
> previous releases to protect them.
>
> In addition, I would like to propose to ask Infra to add protection 
> for the master branch. I can only recall very few situations where 
> changes had to be reverted. I am much more in favor of a reverting 
> commit now and then compared to a branch that can be arbitrarily changed.
>
> What do you think about this?
>
> Best, Fabian
>


RE: Effort to add SQL / StreamSQL to Flink

2016-01-07 Thread Li, Chengxiang
Very cool work, I look forward to contributing.

-Original Message-
From: Chiwan Park [mailto:chiwanp...@apache.org] 
Sent: Friday, January 8, 2016 9:36 AM
To: dev@flink.apache.org
Subject: Re: Effort to add SQL / StreamSQL to Flink

Really good! Many people want to use SQL. :)

> On Jan 8, 2016, at 2:36 AM, Kostas Tzoumas  wrote:
> 
> Wow! Thanks Fabian, this looks fantastic!
> 
> On Thu, Jan 7, 2016 at 4:35 PM, Stephan Ewen  wrote:
> 
>> Super, thanks for that detailed effort, Fabian!
>> 
>> On Thu, Jan 7, 2016 at 3:40 PM, Matthias J. Sax  wrote:
>> 
>>> Pretty cool!
>>> 
>>> On 01/07/2016 03:05 PM, Fabian Hueske wrote:
 Hi everybody,
 
 in the last days, Timo and I refined the design document for adding 
 a
>>> SQL /
 StreamSQL interface on top of Flink that was started by Stephan.
 
 The document proposes an architecture that is centered around 
 Apache Calcite. Calcite is an Apache top-level project and includes 
 a SQL
>>> parser,
 a semantic validator for relational queries, and a rule- and 
 cost-based relational optimizer. Calcite is used by Apache Hive and 
 Apache Drill (among other projects). In a nutshell, the plan is to 
 translate Table
>> API
 and SQL queries into Calcite's relational expression trees, 
 optimize
>>> these
 trees, and translate them into DataSet and DataStream programs.The
>>> document
 breaks down the work into several tasks and subtasks.
 
 Please review the design document and comment.
 
 -- >
 
>>> 
>> https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjP
>> cp1h2TVqdI/edit?usp=sharing
 
 Unless there are major concerns with the design, Timo and I want to
>> start
 next week to move the current Table API on top of Apache Calcite 
 (Task
>> 1
>>> in
 the document). The goal of this task is to have the same 
 functionality
>> as
 currently, but with Calcite in the translation process. This is a
>>> blocking
 task that we hope to complete soon. Afterwards, we can 
 independently
>> work
 on different aspects such as extending the Table API, adding a SQL 
 interface (basically just a parser), integration with external data 
 sources, better code generation, optimization rules, streaming 
 support
>>> for
 the Table API, StreamSQL, etc..
 
 Timo and I plan to work on a WIP branch to implement Task 1 and 
 merge
>> it
>>> to
 the master branch once the task is completed. Of course, everybody 
 is welcome to contribute to this effort. Please let us know such 
 that we
>> can
 coordinate our efforts.
 
 Thanks,
 Fabian
 
>>> 
>>> 
>> 

Regards,
Chiwan Park




RE: The null in Flink

2015-12-07 Thread Li, Chengxiang
Hi, 
Here is a summary of our discussion about NULL value handling in Flink:
1. Reading from a data source directly into a Table/Row DataSet is necessary for 
NULL value handling.
2. NULL value representation in the Row object: this would change its binary data 
layout, so we would need a new Row serializer/comparator (and their dependencies) 
that is aware of the new layout (a small sketch follows below). The Tuple and case 
class serializers/comparators should remain the same.
3. NULL value handling in operations. We would follow the SQL standard by default, 
but this is not concluded yet; any further input is welcome.

I've created an umbrella JIRA (https://issues.apache.org/jira/browse/FLINK-3139) 
for this; subtasks based on the three aspects above will be created as well, so 
anyone interested can contribute and comment on them. We can also move the 
discussion of specific issues to the JIRA system.

Thanks
Chengxiang 

-Original Message-
From: Li, Chengxiang [mailto:chengxiang...@intel.com] 
Sent: Thursday, December 3, 2015 4:43 PM
To: dev@flink.apache.org
Subject: RE: The null in Flink

Hi, Stephan
Treat UNKOWN as FALSE may works if the Boolean expression is used in filter 
operation, but for other operations, such as select and groupBy, it does not 
make sense anymore, we should need UNKNOWN value(or unified as NULL) to 
distinguish with TRUE/FALSE .

Thanks
Chengxiang 

-Original Message-
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: Wednesday, December 2, 2015 6:27 PM
To: dev@flink.apache.org
Subject: Re: The null in Flink

Hi Chenliang!

I have to dig into this again, it was a while back. I think (vaguely) the 
reason why this worked was that in the end (at the root of a tree that is a 
logical expression) if the result is UNKNOWN, it is treated like FALSE.

For example a predicate like "WHERE t.a > 10 && t.b == 'pending' ". If one 
boolean atom is UNKNOWN, the other is TRUE, the whole term becomes UNKNOWN and 
the row is filtered out (as if the predicate was false) - the result of the 
query contains no rows where predicate results are UNKNOWN.

Stephan



On Tue, Dec 1, 2015 at 4:09 AM, Li, Chengxiang <chengxiang...@intel.com>
wrote:

> Stephen,
> For the 3rd topic, you mentioned that "If the boolean expressions are 
> monotonous (have no NOT), then the UNKNOWN value can be the same as 
> FALSE ", as UNKNOWN means it could be TRUE as well, does it a proper 
> way to handle it just as FALSE?
>
> Aljoscha,
> I agree with you, Table can only be transformed from Tuple/Case Class 
> DataSet now, and Tuple/Case Class does not allow null field value, so 
> read files from data source to Row DataSet is necessary for NULL value 
> handling.
>
> -Original Message-
> From: Aljoscha Krettek [mailto:aljos...@apache.org]
> Sent: Friday, November 27, 2015 6:41 PM
> To: dev@flink.apache.org
> Subject: Re: The null in Flink
>
> Oh, this is probably the Jira for what I mentioned:
> https://issues.apache.org/jira/browse/FLINK-2988
>
> > On 27 Nov 2015, at 11:02, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > Hi,
> > just some information. The Table API code generator already has
> preliminary support for generating code that is NULL-aware. So for 
> example if you have expressions like 1 + NULL the result would also be null.
> >
> > I think one of the missing pieces is a way to get data that contains
> null values into the system. For example, right now the expected way 
> to read csv files is via tuples and they don’t support null values. I 
> think we need a way to directly read CSV files into a Row DataSet (or Table).
> >
> > Cheers,
> > Aljoscha
> >> On 26 Nov 2015, at 12:31, Stephan Ewen <se...@apache.org> wrote:
> >>
> >> Hi!
> >>
> >> Thanks for the good discussion! Here are some thoughts from my side:
> >>
> >> 1)
> >> I would agree with Chengxiang that it helps to have as much NULL 
> >> handling in the table API as possible, since most SQL constructs 
> >> will be permitted there are well.
> >>
> >> 2)
> >> A question that I have is whether we want to actually follow the 
> >> SQL standard exactly. There is a lot of criticism on NULL in the 
> >> SQL standard, and there have been many good proposals for more 
> >> meaningful semantics (for example differentiate between the 
> >> meanings "value missing", "value unknown", "value not applicable", etc).
> >>
> >> Going with the SQL way is easiest and makes SQL addition on top of 
> >> the table API much easier. Also, there is only one type of NULL, 
> >> meaning that null-values can be encoded efficiently in

RE: The null in Flink

2015-12-03 Thread Li, Chengxiang
Hi Stephan,
Treating UNKNOWN as FALSE may work if the Boolean expression is used in a filter 
operation, but for other operations, such as select and groupBy, it no longer makes 
sense; we need an UNKNOWN value (or unify it with NULL) to distinguish it from 
TRUE/FALSE.
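
To make the distinction concrete, here is a minimal three-valued-logic sketch 
(illustrative Java only, not existing Flink or Table API code): a filter can 
coarsen UNKNOWN to FALSE at the root of the predicate tree, but an operator that 
outputs the Boolean itself, like select or a groupBy key, has to keep UNKNOWN/NULL 
distinct from FALSE.

    // SQL-style three-valued logic, illustrative only.
    enum Tri {
        TRUE, FALSE, UNKNOWN;

        Tri and(Tri other) {
            if (this == FALSE || other == FALSE) return FALSE;
            if (this == TRUE && other == TRUE) return TRUE;
            return UNKNOWN;
        }

        Tri not() {
            if (this == UNKNOWN) return UNKNOWN;
            return this == TRUE ? FALSE : TRUE;
        }

        // A filter may coarsen UNKNOWN to FALSE at the root of the predicate tree:
        // rows with an UNKNOWN predicate are simply dropped.
        boolean keepRow() {
            return this == TRUE;
        }

        // A select or groupBy that outputs the Boolean itself cannot do that:
        // UNKNOWN must surface as NULL, otherwise it is indistinguishable from FALSE.
        Boolean asNullableBoolean() {
            return this == UNKNOWN ? null : (this == TRUE);
        }
    }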

Thanks
Chengxiang 

-Original Message-
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: Wednesday, December 2, 2015 6:27 PM
To: dev@flink.apache.org
Subject: Re: The null in Flink

Hi Chenliang!

I have to dig into this again, it was a while back. I think (vaguely) the 
reason why this worked was that in the end (at the root of a tree that is a 
logical expression) if the result is UNKNOWN, it is treated like FALSE.

For example a predicate like "WHERE t.a > 10 && t.b == 'pending' ". If one 
boolean atom is UNKNOWN, the other is TRUE, the whole term becomes UNKNOWN and 
the row is filtered out (as if the predicate was false) - the result of the 
query contains no rows where predicate results are UNKNOWN.

Stephan



On Tue, Dec 1, 2015 at 4:09 AM, Li, Chengxiang <chengxiang...@intel.com>
wrote:

> Stephen,
> For the 3rd topic, you mentioned that "If the boolean expressions are 
> monotonous (have no NOT), then the UNKNOWN value can be the same as 
> FALSE ", as UNKNOWN means it could be TRUE as well, does it a proper 
> way to handle it just as FALSE?
>
> Aljoscha,
> I agree with you, Table can only be transformed from Tuple/Case Class 
> DataSet now, and Tuple/Case Class does not allow null field value, so 
> read files from data source to Row DataSet is necessary for NULL value 
> handling.
>
> -Original Message-
> From: Aljoscha Krettek [mailto:aljos...@apache.org]
> Sent: Friday, November 27, 2015 6:41 PM
> To: dev@flink.apache.org
> Subject: Re: The null in Flink
>
> Oh, this is probably the Jira for what I mentioned:
> https://issues.apache.org/jira/browse/FLINK-2988
>
> > On 27 Nov 2015, at 11:02, Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> > Hi,
> > just some information. The Table API code generator already has
> preliminary support for generating code that is NULL-aware. So for 
> example if you have expressions like 1 + NULL the result would also be null.
> >
> > I think one of the missing pieces is a way to get data that contains
> null values into the system. For example, right now the expected way 
> to read csv files is via tuples and they don’t support null values. I 
> think we need a way to directly read CSV files into a Row DataSet (or Table).
> >
> > Cheers,
> > Aljoscha
> >> On 26 Nov 2015, at 12:31, Stephan Ewen <se...@apache.org> wrote:
> >>
> >> Hi!
> >>
> >> Thanks for the good discussion! Here are some thoughts from my side:
> >>
> >> 1)
> >> I would agree with Chengxiang that it helps to have as much NULL 
> >> handling in the table API as possible, since most SQL constructs 
> >> will be permitted there are well.
> >>
> >> 2)
> >> A question that I have is whether we want to actually follow the 
> >> SQL standard exactly. There is a lot of criticism on NULL in the 
> >> SQL standard, and there have been many good proposals for more 
> >> meaningful semantics (for example differentiate between the 
> >> meanings "value missing", "value unknown", "value not applicable", etc).
> >>
> >> Going with the SQL way is easiest and makes SQL addition on top of 
> >> the table API much easier. Also, there is only one type of NULL, 
> >> meaning that null-values can be encoded efficiently in bitmaps.
> >> Further more, the fact that the Table API users have the power of a 
> >> programming language at hand (rather than the limited set of SQL 
> >> operators), they should be able to easily define their own 
> >> constants for special meanings like "value not applicable" or so.
> >>
> >> Just curious if anyone has experience with some of the other 
> >> null-sematic proposals that have been around.
> >>
> >> 3)
> >> One comment concerning the three-value-logic for boolean expressions:
> >>
> >> A while back, I worked on a SQL engine, and we were able to not 
> >> implement three-value logic with trick. If I recall correctly, it 
> >> was
> like this:
> >>
> >> If the boolean expressions are monotonous (have no NOT), then the 
> >> UNKNOWN value can be the same as FALSE. So the query planner had to 
> >> rewrite all expression trees to have no NOT, which means pushing 
> >> the NOT

RE: The null in Flink

2015-11-30 Thread Li, Chengxiang
Stephan,
For the 3rd topic, you mentioned that "If the boolean expressions are monotonous 
(have no NOT), then the UNKNOWN value can be the same as FALSE". Since UNKNOWN 
means the value could be TRUE as well, is it proper to handle it simply as FALSE?

Aljoscha, 
I agree with you. A Table can currently only be created from a Tuple/case class 
DataSet, and Tuples/case classes do not allow null field values, so reading files 
from a data source into a Row DataSet is necessary for NULL value handling. 
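
A rough sketch of what such a CSV-to-Row mapping could look like (illustrative 
only; the Row class location and the type information wiring the DataSet would 
need depend on the Flink version and are omitted here):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.types.Row;   // assumption: Row location/name may differ by version

    // Parses CSV lines into Rows, mapping empty fields to real nulls instead of
    // failing as Tuples would. CSV quoting/escaping is ignored for brevity.
    public class CsvLineToRow implements MapFunction<String, Row> {

        @Override
        public Row map(String line) {
            String[] parts = line.split(",", -1);   // keep trailing empty fields
            Row row = new Row(parts.length);
            for (int i = 0; i < parts.length; i++) {
                row.setField(i, parts[i].isEmpty() ? null : parts[i]);
            }
            return row;
        }
    }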

-Original Message-
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Friday, November 27, 2015 6:41 PM
To: dev@flink.apache.org
Subject: Re: The null in Flink

Oh, this is probably the Jira for what I mentioned: 
https://issues.apache.org/jira/browse/FLINK-2988

> On 27 Nov 2015, at 11:02, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> just some information. The Table API code generator already has preliminary 
> support for generating code that is NULL-aware. So for example if you have 
> expressions like 1 + NULL the result would also be null.
> 
> I think one of the missing pieces is a way to get data that contains null 
> values into the system. For example, right now the expected way to read csv 
> files is via tuples and they don’t support null values. I think we need a way 
> to directly read CSV files into a Row DataSet (or Table).
> 
> Cheers,
> Aljoscha
>> On 26 Nov 2015, at 12:31, Stephan Ewen <se...@apache.org> wrote:
>> 
>> Hi!
>> 
>> Thanks for the good discussion! Here are some thoughts from my side:
>> 
>> 1)
>> I would agree with Chengxiang that it helps to have as much NULL 
>> handling in the table API as possible, since most SQL constructs will 
>> be permitted there are well.
>> 
>> 2)
>> A question that I have is whether we want to actually follow the SQL 
>> standard exactly. There is a lot of criticism on NULL in the SQL 
>> standard, and there have been many good proposals for more meaningful 
>> semantics (for example differentiate between the meanings "value 
>> missing", "value unknown", "value not applicable", etc).
>> 
>> Going with the SQL way is easiest and makes SQL addition on top of 
>> the table API much easier. Also, there is only one type of NULL, 
>> meaning that null-values can be encoded efficiently in bitmaps. 
>> Further more, the fact that the Table API users have the power of a 
>> programming language at hand (rather than the limited set of SQL 
>> operators), they should be able to easily define their own constants 
>> for special meanings like "value not applicable" or so.
>> 
>> Just curious if anyone has experience with some of the other 
>> null-sematic proposals that have been around.
>> 
>> 3)
>> One comment concerning the three-value-logic for boolean expressions:
>> 
>> A while back, I worked on a SQL engine, and we were able to not 
>> implement three-value logic with trick. If I recall correctly, it was like 
>> this:
>> 
>> If the boolean expressions are monotonous (have no NOT), then the 
>> UNKNOWN value can be the same as FALSE. So the query planner had to 
>> rewrite all expression trees to have no NOT, which means pushing the 
>> NOT down into the leaf comparison operations (for example push NOT into == 
>> to become !=).
>> These leaf comparison operators needed to be NULL aware to return 
>> FALSE on comparisons with NULL.
>> 
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> On Thu, Nov 26, 2015 at 6:41 AM, Li, Chengxiang 
>> <chengxiang...@intel.com>
>> wrote:
>> 
>>> Thanks, Timo.
>>> We may put the NULL related function support to SQL API, but for 
>>> Scalar expression and Boolean expression, it already been supported 
>>> in Table API, without NULL value handling support, query with Scalar 
>>> expression and Boolean expression would fail while encounter NULL value.
>>> 
>>> Thanks
>>> Chengxiang
>>> 
>>> -Original Message-
>>> From: Timo Walther [mailto:twal...@apache.org]
>>> Sent: Wednesday, November 25, 2015 7:33 PM
>>> To: dev@flink.apache.org
>>> Subject: Re: The null in Flink
>>> 
>>> Hi Chengxiang,
>>> 
>>> I totally agree that the Table API should fully support NULL values. 
>>> The Table API is a logical API and therefore we should be as close 
>>> to ANSI SQL as possible. Rows need to be nullable in the near future.
>>> 
>>> 2. i, ii, iii and iv sound reasonable. But v, vi and vii sound to 
>>> much like SQL magic.

RE: The null in Flink

2015-11-25 Thread Li, Chengxiang
Hi,
On this mailing list there have been some discussions about NULL value handling in 
Flink, and I saw several related JIRAs as well (like FLINK-2203 and FLINK-2210), 
but unfortunately the work got reverted due to an immature design and there has 
been no further action since then. I would like to pick this topic up here, as it's 
quite an important part of data analysis and many features depend on it. Hopefully, 
through a plenary discussion, we can arrive at an acceptable solution and move 
forward. Stephan has explained very clearly how and why Flink handles "Null values 
in the Programming Language APIs", so I will mainly talk about the second part, 
"Null values in the high-level (logical) APIs".

1. Why should Flink support NULL value handling in the Table API?
i.  Data sources may miss column values in many cases; without NULL value handling 
in the Table API, users need to write an extra ETL step to handle missing values 
manually.
ii. Some Table API operators generate NULL values on their own, like Outer 
Join/Cube/Rollup/Grouping Set, and so on. NULL value handling in the Table API is a 
prerequisite for these features.

2. The semantics of NULL value handling in the Table API.
Fortunately, there are already mature DBMS standards we can follow for NULL value 
handling; I list several semantics here. Note that this may not cover all cases, 
and the semantics may vary between DBMSs, so it should be totally open for 
discussion (a small sketch of semantics i and iv follows after point 3).
i.  NULL comparison. In ascending order, NULL is smaller than any other value, 
and NULL == NULL returns false. 
ii. NULL in a GroupBy key: all NULL values are grouped into a single group.
iii. NULL in aggregate columns: NULL is ignored in the aggregation function.
iv. NULL on both sides of a Join key: as in #i, NULL == NULL returns false, so 
there is no output for NULL join keys.
v.  NULL in a scalar expression: an expression involving NULL (e.g. 1 + NULL) 
returns NULL. 
vi. NULL in a Boolean expression: add an extra result, UNKNOWN; more on the 
semantics of Boolean expressions in reference #1.
vii. More related function support, like COALESCE, NVL, NANVL, and so on.

3. NULL value storage in the Table API.
  Just set the Row field value to null. To mark NULL values in serialized binary 
record data, normally an extra flag per field is used to mark whether its value is 
NULL, which would change the data layout of the Row object. So any logic that 
accesses serialized Row data directly needs to be updated to the new data layout, 
for example many methods in RowComparator.
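
As referenced above, here is a small illustrative sketch of semantics i and iv for 
a single nullable field (plain Java only, not Flink's RowComparator):

    import java.util.Comparator;

    public class NullSemantics {

        // Semantic i: in ascending order NULL sorts before any non-null value.
        // For a total sort order two NULLs have to tie here, even though they are
        // not "equal" in the join-key sense below.
        public static <T extends Comparable<T>> Comparator<T> nullsFirst() {
            return (a, b) -> {
                if (a == null && b == null) return 0;
                if (a == null) return -1;
                if (b == null) return 1;
                return a.compareTo(b);
            };
        }

        // Semantic iv: a NULL join key never matches anything, not even another NULL,
        // so rows with NULL keys produce no join output.
        public static boolean joinKeyMatches(Object a, Object b) {
            if (a == null || b == null) {
                return false;
            }
            return a.equals(b);
        }
    }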

Reference:
1. Nulls: Nothing to worry about: 
http://www.oracle.com/technetwork/issue-archive/2005/05-jul/o45sql-097727.html.
2. Null related functions: 
https://oracle-base.com/articles/misc/null-related-functions

-Original Message-
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: Thursday, June 18, 2015 8:43 AM
To: dev@flink.apache.org
Subject: Re: The null in Flink

Hi!

I think we actually have two discussions here, both of them important:

--
1) Null values in the Programming Language APIs
--

Fields in composite types may simply be null pointers.

In object types:
  - primitives members are naturally non-nullable
  - all other members are nullable

=> If you want to avoid the overhead of nullability, go with primitive types.

In Tuples, and derives types (Scala case classes):
  - Fields are non-nullable.

=> The reason here is that we initially decided to keep tuples as a very fast 
data type. Because tuples cannot hold primitives in Java/Scala, we would not 
have a way to make fast non-nullable fields. The performance of nullable fields 
affects the key-operations, especially on normalized keys.
We can work around that with some effort, but have not one it so far.

=> In Scala, the Option types is a natural way of elegantly working around that.


--
2) Null values in the high-level (logial) APIs
--

This is mainly what Ted was referring to, if I understood him correctly.

Here, we need to figure out what form of semantical null values in the Table 
API and later, in SQL.

Besides deciding what semantics to follow here in the logical APIs, we need to 
decide what these values confert to/from when switching between 
logical/physical APIs.






On Mon, Jun 15, 2015 at 10:07 AM, Ted Dunning  wrote:

> On Mon, Jun 15, 2015 at 8:45 AM, Maximilian Michels 
> wrote:
>
> > Just to give an idea what null values could cause in Flink:
> DataSet.count()
> > returns the number of elements of all values in a Dataset (null or 
> > not) while #834 would ignore null values and aggregate the DataSet 
> > without
> them.
> >
>
> Compare R's 

RE: The null in Flink

2015-11-25 Thread Li, Chengxiang
Thanks, Timo. 
We may put the NULL-related function support into the SQL API, but scalar 
expressions and Boolean expressions are already supported in the Table API; without 
NULL value handling, queries with scalar or Boolean expressions would fail when 
they encounter a NULL value.
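
For illustration, null propagation for a scalar expression like 1 + NULL could be 
as simple as the following (illustrative Java only, not the Table API code 
generator):

    public class NullSafeArithmetic {
        // If any operand is NULL, the result is NULL instead of a NullPointerException.
        public static Integer plus(Integer left, Integer right) {
            if (left == null || right == null) {
                return null;
            }
            return left + right;
        }
    }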

Thanks
Chengxiang 

-Original Message-
From: Timo Walther [mailto:twal...@apache.org] 
Sent: Wednesday, November 25, 2015 7:33 PM
To: dev@flink.apache.org
Subject: Re: The null in Flink

Hi Chengxiang,

I totally agree that the Table API should fully support NULL values. The Table 
API is a logical API and therefore we should be as close to ANSI SQL as 
possible. Rows need to be nullable in the near future.

2. i, ii, iii and iv sound reasonable. But v, vi and vii sound to much like SQL 
magic. I think all other SQL magic (DBMS specific corner cases) should be 
handled by the SQL API on top of the Table API.

Regards,
Timo


On 25.11.2015 11:31, Li, Chengxiang wrote:
> Hi
> In this mail list, there are some discussions about null value handling in 
> Flink, and I saw several related JIRAs as well(like FLINK-2203, FLINK-2210), 
> but unfortunately, got reverted due to immature design, and no further action 
> since then. I would like to pick this topic up here, as it's quite an 
> important part of data analysis and many features depend on it. Hopefully, 
> through a plenary discussion, we can generate an acceptable solution and move 
> forward. Stephan has explained very clearly about how and why Flink handle 
> "Null values in the Programming Language APIs", so I mainly talk about the 
> second part of "Null values in the high-level (logical) APIs ".
>
> 1. Why should Flink support Null values handling in Table API?
>   i.  Data source may miss column value in many cases, if no Null values 
> handling in Table API, user need to write an extra ETL to handle missing 
> values manually.
>   ii. Some Table API operators generate Null values on their own, like 
> Outer Join/Cube/Rollup/Grouping Set, and so on. Null values handling in Table 
> API is the prerequisite of these features.
>
> 2. The semantic of Null value handling in Table API.
> Fortunately, there are already mature DBMS  standards we can follow for Null 
> value handling, I list several semantic of Null value handling here. To be 
> noted that, this may not cover all the cases, and the semantics may vary in 
> different DBMSs, so it should totally open to discuss.
>   I,  NULL compare. In ascending order, NULL is smaller than any other 
> value, and NULL == NULL return false.
>   ii. NULL exists in GroupBy Key, all NULL values are grouped as a single 
> group.
>   iii. NULL exists in Aggregate columns, ignore NULL in aggregation 
> function.
>  iv. NULL exists in both side Join key, refer to #i, NULL == 
> NULL return false, no output for NULL Join key.
>  v.  NULL in Scalar expression, expression within NULL(eg. 1 
> + NULL) return NULL.
>  vi. NULL in Boolean expression, add an extra result: 
> UNKNOWN, more semantic for Boolean expression in reference #1.
>  vii. More related function support, like COALESCE, NVL, 
> NANVL, and so on.
>
> 3. NULL value storage in Table API.
>Just set null to Row field value. To mark NULL value in serialized binary 
> record data, normally it use extra flag for each field to mark whether its 
> value is NULL, which would change the data layout of Row object. So any logic 
> that access serialized Row data directly should updated to sync with new data 
> layout, for example, many methods in RowComparator.
>
> Reference:
> 1. Nulls: Nothing to worry about: 
> http://www.oracle.com/technetwork/issue-archive/2005/05-jul/o45sql-097727.html.
> 2. Null related functions: 
> https://oracle-base.com/articles/misc/null-related-functions
>
> -Original Message-
> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf 
> Of Stephan Ewen
> Sent: Thursday, June 18, 2015 8:43 AM
> To: dev@flink.apache.org
> Subject: Re: The null in Flink
>
> Hi!
>
> I think we actually have two discussions here, both of them important:
>
> --
> 1) Null values in the Programming Language APIs
> --
>
> Fields in composite types may simply be null pointers.
>
> In object types:
>- primitives members are naturally non-nullable
>- all other members are nullable
>
> => If you want to avoid the overhead of nullability, go with primitive types.
>
> In Tuples, and derives types (Scala case classes):
>- Fields are non-nullable.
>
> => The reason here 

RE: A proposal about skew data handling in Flink

2015-10-19 Thread Li, Chengxiang
Thanks a lot for the comments, Fabian. I mostly agree with you on the plan; let me 
just add some more thoughts about the Non-Range-Equally-Splittable case here.
1. Assume a case in which 10% of the data is skewed on a certain key; as long as 
the parallelism is larger than 10, it falls into the Non-Range-Equally-Splittable 
case. So it is not a very rare corner case of the skew issue.
2. In the proposal, the solution for the Non-Range-Equally-Splittable case is based 
on two new RangePartitioners and a little optimizer logic, which are already touched 
by plan items #1 and #2. It does not require changing the operator semantics, so if 
we have a good partitioner abstraction, I think it does not add much complexity for 
Flink to handle this kind of issue. 
It should not block anything: after finishing the simple case we will have more 
knowledge about the implementation details, and can then look back at this issue 
and decide whether it is worth resolving at that cost.

Thanks
Chengxiang 
-Original Message-
From: Fabian Hueske [mailto:fhue...@gmail.com] 
Sent: Monday, October 19, 2015 7:15 PM
To: dev@flink.apache.org
Subject: Re: A proposal about skew data handling in Flink

Hi,

First of all, thanks a lot for this extensive proposal! It contains a lot of 
good observations and techniques how to address data skew.

I have a few remarks:

1) The terms Input and Output Contract were introduced in the first scientific 
publications and are not used anymore. Input Contract are what we call 
operators or transformations today, the concept of output contract is 
completely gone.
In the current code, we have operators like Map, Reduce, and Join that describe 
how data needs to be organized (by key, etc.) and UDFs that process the data.

2) I would categorize skew as follows:

- UDF Call Complexity Skew: The input cardinalities of UDF calls differ (only 
applicable to group-based operators such as GroupReduce and CoGroup) or the 
computational complexity of UDF calls depends on the data and varies a lot. UDF 
calls are the smallest parallelizable unit. It is not possible to change that 
without changing the semantics. Combiners can help to reduce the effect of skew 
for group-based operators.

- Input Partition Skew: The cardinality of parallel partitions varies. This is 
handled by Flink as follows:
- Lazy split assignment for data sources
- Operators that do not require special partitioning (Map, Filter, Cross, 
etc.) just consume the output partitions of the preceding operator.
Rebalance() can be used to enforce round-robin partitioning to equalize size of 
all partitions.
- Operators that require key-based partitioning use hash partitioning.
Range partitioning can help address significant data skew.

- UDF Call Skew: The number of UDF calls per parallel partition varies.
This can be an issue for n-m joins which essentially result in Cartesian 
products.
- UDF Call Skew is most relevant for Joins
- UDF Call Skew for Map, Reduce, CoGroup, Cross can be controlled by 
controlling Input Partition Skew

3) I agree that we should not try to detect and automatically fix data skew (at 
the moment) but give users tools to manually manage skew.

4) I would focus on addressing the Input Partition Skew problem. UDF Call 
Complexity Skew cannot be addressed because it would change the semantics of 
operators. UDF Call Skew is only affecting joins and much harder to solve.

5) I wonder how much the practical gain is to address the 
Non-Range-Equally-Splittable case compared to the added code complexity. In 
general, tackling skew is a very good idea, but solving corner cases with quite 
complex methods might make future features more complicated to add.
Hence, I would propose to focus on the common and "easy" cases first.

I would address Input Partition Skew first and ignore the 
Non-Range-Equally-Splittable case for now. We can do this in two steps:

1) Add the "simple" range partitioner as in your pull request for unary 
operators (explicit range partitioning, total order, groupBy). Once the 
sampling happens online, this is a very good addition to Flink.
2) Add the "simple" range partitioner also for binary operators (join, 
coGroup). This will be a bit more tricky, because we need to do a coordinated 
decision for both inputs.
3) Expose range partitioning for GroupBy, Join, CoGroup to the API, maybe 
through optimizer hints.

Since we want to have this transparently handled by the API and engine, we need 
to add a lot of these features into the optimizer, or JobGraphGenerator to be 
more precisely.

Does that make sense to you?

Cheers, Fabian

2015-10-16 17:13 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> Hi,
>
> thanks for starting a discussion about data skew! I agree, it's a 
> important issue that can cause a lot of problems.
> I'll have a look at your proposal and add comments soon.
>
> Thanks, Fabian
>
> 201

RE: [Proposal] Create a separate sub module for benchmark test

2015-09-22 Thread Li, Chengxiang
Thanks for the correction, Stephan.
For the micro benchmarks, I think it would be better to make them a new Maven 
project inside the Flink repository. 

Thanks
Chengxiang 

-Original Message-
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: Tuesday, September 22, 2015 8:14 PM
To: dev@flink.apache.org
Subject: Re: [Proposal] Create a separate sub module for benchmark test

Sounds like a nice idea!

Do you want to make this a new maven project as part of the Flink repository, 
or create a dedicated repository for that?

BTW: We are currently not mixing microbenchmarks with test execution. The code 
for these benchmarks resides in the test scope of the projects (so it is not 
packaged), but it is not executed as part of the UnitTests or IntegrationTests.

Greetings,
Stephan


On Tue, Sep 22, 2015 at 12:22 PM, Li, Chengxiang <chengxiang...@intel.com>
wrote:

> Hi, folks
> During work on Flink, I found several micro benchmarks which come from 
> different modules, these benchmarks measure on manual, annotated with 
> Junit annotations, so they got executed during unit test as well. 
> There are some shortage on current implementation:
>
> 1.   Benchmark test performance instead of feature, and normally, it
> takes much more time than unit test. Mixed benchmark with unit test 
> would expand the CI check time.
>
> 2.   With mixed with other tests, no warm up, no standalone process...
> these benchmarks result may not very accurate.
> Although looks easy, there are actually many pitfalls about benchmark, 
> so I suggest we create a new sub module for all benchmark test, and 
> import JMH(
> http://openjdk.java.net/projects/code-tools/jmh/) as the benchmark 
> framework. With the help of JMH, we should get:
>
> 1.   More available metrics.
>
> 2.   More accurate result.
>
> 3.   Focus on benchmark logic only, no need to worry about measure
> logic.
>
> Thanks
> Chengxiang
>


[Proposal] Create a separate sub module for benchmark test

2015-09-22 Thread Li, Chengxiang
Hi folks,
While working on Flink, I found several micro benchmarks spread across different 
modules. These benchmarks are measured manually and annotated with JUnit 
annotations, so they also get executed during the unit tests. There are some 
shortcomings in the current setup:

1.   Benchmarks test performance instead of features, and normally take much more 
time than unit tests. Mixing benchmarks with unit tests would inflate the CI check 
time.

2.   Mixed with other tests, with no warm-up and no standalone process, the 
benchmark results may not be very accurate.
Although it looks easy, there are actually many pitfalls around benchmarking, so I 
suggest we create a new sub module for all benchmark tests and import 
JMH (http://openjdk.java.net/projects/code-tools/jmh/) as the benchmark framework 
(a minimal benchmark skeleton follows below). With the help of JMH, we would get:

1.   More available metrics.

2.   More accurate results.

3.   Focus on the benchmark logic only, with no need to worry about the measurement logic.
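
A minimal JMH benchmark skeleton of the kind the proposed module would contain 
could look like this (the benchmarked logic is a placeholder, not an existing Flink 
micro benchmark; it would be run via the JMH Maven plugin or a generated runner):

    import java.util.concurrent.TimeUnit;

    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.BenchmarkMode;
    import org.openjdk.jmh.annotations.Measurement;
    import org.openjdk.jmh.annotations.Mode;
    import org.openjdk.jmh.annotations.OutputTimeUnit;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.Setup;
    import org.openjdk.jmh.annotations.State;
    import org.openjdk.jmh.annotations.Warmup;

    // JMH handles warm-up, forked processes, and measurement; the benchmark class
    // only contains the logic under test.
    @State(Scope.Thread)
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.NANOSECONDS)
    @Warmup(iterations = 5)
    @Measurement(iterations = 10)
    public class ExampleBenchmark {

        private long[] values;

        @Setup
        public void setup() {
            values = new long[1_000_000];
            for (int i = 0; i < values.length; i++) {
                values[i] = i;
            }
        }

        @Benchmark
        public long sum() {
            long sum = 0;
            for (long v : values) {
                sum += v;
            }
            return sum;
        }
    }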

Thanks
Chengxiang


Use bloom filter to improve hybrid hash join performance

2015-06-18 Thread Li, Chengxiang
Hi Flink developers,

I read the Flink hybrid hash join documentation and implementation; very nice job. 
For the case where the small table does not entirely fit into memory, I think we 
may be able to improve the performance further. Currently in the hybrid hash join, 
when the small table does not fit into memory, part of the small table data is 
spilled to disk, and the counterpart partition of the big table data is spilled to 
disk in the probe phase as well. You can find a detailed description here: 
http://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
 . If we build a bloom filter while spilling the small table to disk during the 
build phase, and use it to filter the big-table records that would otherwise be 
spilled to disk, this may greatly reduce the size of the spilled big-table files 
and save the disk IO cost for writing and later reading them. I have created 
FLINK-2240 (https://issues.apache.org/jira/browse/FLINK-2240) about this, and I 
would like to contribute this optimization if someone can assign the JIRA to me. 
But before that, I would like to hear your comments.
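
As a rough illustration of the idea (a sketch only, not Flink's hash join code): 
build a small bloom filter over the join-key hashes of a spilled build-side 
partition, and during the probe phase skip spilling probe records whose key 
definitely has no match in that partition.

    import java.util.BitSet;

    // Tiny bloom filter over spilled build-side join keys, two hash functions for brevity.
    public class SpillBloomFilter {

        private final BitSet bits;
        private final int numBits;

        public SpillBloomFilter(int numBits) {
            this.numBits = numBits;
            this.bits = new BitSet(numBits);
        }

        private int hash1(int keyHash) {
            return Math.abs(keyHash % numBits);
        }

        private int hash2(int keyHash) {
            // a second, differently mixed hash
            int h = keyHash * 0x9E3779B9;
            return Math.abs((h ^ (h >>> 16)) % numBits);
        }

        // Called in the build phase for every record of a spilled build-side partition.
        public void addBuildKey(int keyHash) {
            bits.set(hash1(keyHash));
            bits.set(hash2(keyHash));
        }

        // Called in the probe phase: if this returns false, the probe record cannot
        // have a join partner in the spilled partition and need not be spilled itself.
        public boolean mightContain(int keyHash) {
            return bits.get(hash1(keyHash)) && bits.get(hash2(keyHash));
        }
    }

False positives only cost an unnecessary spill of a probe record; they never lose 
join matches, so correctness is unaffected.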

Thanks
Chengxiang