RE: [ANNOUNCE] Chengxiang Li added as committer
Thanks everyone, it's always great to collaborate with you guys, look forward to contribute more on Flink. Thanks Chengxiang -Original Message- From: Paris Carbone [mailto:par...@kth.se] Sent: Tuesday, January 19, 2016 9:24 PM To: dev@flink.apache.org Subject: Re: [ANNOUNCE] Chengxiang Li added as committer Congrats Chengxiang! Really pleased to have you on board > On 19 Jan 2016, at 13:16, Matthias J. Saxwrote: > > Congrats and welcome Chengxiang!! :) > > On 01/19/2016 12:56 PM, Kostas Tzoumas wrote: >> Welcome Chengxiang!! >> >> On Tue, Jan 19, 2016 at 12:31 PM, Stephan Ewen wrote: >> >>> Good to have you on board! >>> >>> On Tue, Jan 19, 2016 at 11:29 AM, Maximilian Michels >>> wrote: >>> Pleased to have you with us Chengxiang! Cheers, Max On Tue, Jan 19, 2016 at 11:13 AM, Chiwan Park wrote: > Congrats! Welcome Chengxiang Li! > >> On Jan 19, 2016, at 7:13 PM, Vasiliki Kalavri < vasilikikala...@gmail.com> wrote: >> >> Congratulations! Welcome Chengxiang Li! >> >> On 19 January 2016 at 11:02, Fabian Hueske wrote: >> >>> Hi everybody, >>> >>> I'd like to announce that Chengxiang Li accepted the PMC's offer to become >>> a committer of the Apache Flink project. >>> >>> Please join me in welcoming Chengxiang Li! >>> >>> Best, Fabian >>> > > Regards, > Chiwan Park > >>> >> >
RE: [DISCUSS] Git force pushing and deletion of branchs
+1 on the original style. Master branch disable force pushing in case of misusing and feature branch enable force pushing for flexible developing. -Original Message- From: Gyula Fóra [mailto:gyf...@apache.org] Sent: Wednesday, January 13, 2016 6:36 PM To: dev@flink.apache.org Subject: Re: [DISCUSS] Git force pushing and deletion of branchs +1 for protecting the master branch. I also don't see any reason why anyone should force push there Gyula Fabian Hueskeezt írta (időpont: 2016. jan. 13., Sze, 11:07): > Hi everybody, > > Lately, ASF Infra has changed the write permissions of all Git > repositories twice. > > Originally, it was not possible to force into the master branch. > A few weeks ago, infra disabled also force pushing into other branches. > > Now, this has changed again after the issue was discussed with the ASF > board. > The current situation is the following: > - force pushing is allowed on all branched, including master > - branches and tags can be deleted (not sure if this applies as well > for the master branch) > - "the 'protected' portions of git to primarily focus on refs/tags/rel > - thus any tags under rel, will have their entire commit history." > > I am not 100% sure which exact parts of the repository are protected > now as I am not very much into the details of Git. > However, I believe we need to create new tags under rel for our > previous releases to protect them. > > In addition, I would like to propose to ask Infra to add protection > for the master branch. I can only recall very few situations where > changes had to be reverted. I am much more in favor of a reverting > commit now and then compared to a branch that can be arbitrarily changed. > > What do you think about this? > > Best, Fabian >
RE: Effort to add SQL / StreamSQL to Flink
Very cool work, look forward to contribute. -Original Message- From: Chiwan Park [mailto:chiwanp...@apache.org] Sent: Friday, January 8, 2016 9:36 AM To: dev@flink.apache.org Subject: Re: Effort to add SQL / StreamSQL to Flink Really good! Many people want to use SQL. :) > On Jan 8, 2016, at 2:36 AM, Kostas Tzoumaswrote: > > Wow! Thanks Fabian, this looks fantastic! > > On Thu, Jan 7, 2016 at 4:35 PM, Stephan Ewen wrote: > >> Super, thanks for that detailed effort, Fabian! >> >> On Thu, Jan 7, 2016 at 3:40 PM, Matthias J. Sax wrote: >> >>> Pretty cool! >>> >>> On 01/07/2016 03:05 PM, Fabian Hueske wrote: Hi everybody, in the last days, Timo and I refined the design document for adding a >>> SQL / StreamSQL interface on top of Flink that was started by Stephan. The document proposes an architecture that is centered around Apache Calcite. Calcite is an Apache top-level project and includes a SQL >>> parser, a semantic validator for relational queries, and a rule- and cost-based relational optimizer. Calcite is used by Apache Hive and Apache Drill (among other projects). In a nutshell, the plan is to translate Table >> API and SQL queries into Calcite's relational expression trees, optimize >>> these trees, and translate them into DataSet and DataStream programs.The >>> document breaks down the work into several tasks and subtasks. Please review the design document and comment. -- > >>> >> https://docs.google.com/document/d/1TLayJNOTBle_-m1rQfgA6Ouj1oYsfqRjP >> cp1h2TVqdI/edit?usp=sharing Unless there are major concerns with the design, Timo and I want to >> start next week to move the current Table API on top of Apache Calcite (Task >> 1 >>> in the document). The goal of this task is to have the same functionality >> as currently, but with Calcite in the translation process. This is a >>> blocking task that we hope to complete soon. Afterwards, we can independently >> work on different aspects such as extending the Table API, adding a SQL interface (basically just a parser), integration with external data sources, better code generation, optimization rules, streaming support >>> for the Table API, StreamSQL, etc.. Timo and I plan to work on a WIP branch to implement Task 1 and merge >> it >>> to the master branch once the task is completed. Of course, everybody is welcome to contribute to this effort. Please let us know such that we >> can coordinate our efforts. Thanks, Fabian >>> >>> >> Regards, Chiwan Park
RE: The null in Flink
Hi, Summary of our discussion about NULL value handling in FLink: 1. Read from data source to Table/Row DataSet directly is necessary for NULL value handling. 2. NULL value representation in Row object, this would change its binary data layout, so we would need new Row Serializer/Comparator(and its dependency) which aware of this new binary data layout. Tuple and Case Class serializer/Comparator should remain the same. 3. NULL value handling in operations. We would follow the SQL standard as default, but these are not concluded yet, any more input would be welcomed. I've created an umbrella JIRA(https://issues.apache.org/jira/browse/FLINK-3139) for this, the following subtasks based on the previous 3 aspects would be created as well, so anyone interested could contribute and comment on all subtasks. And we could also move discussion on specified issues to JIRA system. Thanks Chengxiang -Original Message- From: Li, Chengxiang [mailto:chengxiang...@intel.com] Sent: Thursday, December 3, 2015 4:43 PM To: dev@flink.apache.org Subject: RE: The null in Flink Hi, Stephan Treat UNKOWN as FALSE may works if the Boolean expression is used in filter operation, but for other operations, such as select and groupBy, it does not make sense anymore, we should need UNKNOWN value(or unified as NULL) to distinguish with TRUE/FALSE . Thanks Chengxiang -Original Message- From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen Sent: Wednesday, December 2, 2015 6:27 PM To: dev@flink.apache.org Subject: Re: The null in Flink Hi Chenliang! I have to dig into this again, it was a while back. I think (vaguely) the reason why this worked was that in the end (at the root of a tree that is a logical expression) if the result is UNKNOWN, it is treated like FALSE. For example a predicate like "WHERE t.a > 10 && t.b == 'pending' ". If one boolean atom is UNKNOWN, the other is TRUE, the whole term becomes UNKNOWN and the row is filtered out (as if the predicate was false) - the result of the query contains no rows where predicate results are UNKNOWN. Stephan On Tue, Dec 1, 2015 at 4:09 AM, Li, Chengxiang <chengxiang...@intel.com> wrote: > Stephen, > For the 3rd topic, you mentioned that "If the boolean expressions are > monotonous (have no NOT), then the UNKNOWN value can be the same as > FALSE ", as UNKNOWN means it could be TRUE as well, does it a proper > way to handle it just as FALSE? > > Aljoscha, > I agree with you, Table can only be transformed from Tuple/Case Class > DataSet now, and Tuple/Case Class does not allow null field value, so > read files from data source to Row DataSet is necessary for NULL value > handling. > > -Original Message- > From: Aljoscha Krettek [mailto:aljos...@apache.org] > Sent: Friday, November 27, 2015 6:41 PM > To: dev@flink.apache.org > Subject: Re: The null in Flink > > Oh, this is probably the Jira for what I mentioned: > https://issues.apache.org/jira/browse/FLINK-2988 > > > On 27 Nov 2015, at 11:02, Aljoscha Krettek <aljos...@apache.org> wrote: > > > > Hi, > > just some information. The Table API code generator already has > preliminary support for generating code that is NULL-aware. So for > example if you have expressions like 1 + NULL the result would also be null. > > > > I think one of the missing pieces is a way to get data that contains > null values into the system. For example, right now the expected way > to read csv files is via tuples and they don’t support null values. I > think we need a way to directly read CSV files into a Row DataSet (or Table). > > > > Cheers, > > Aljoscha > >> On 26 Nov 2015, at 12:31, Stephan Ewen <se...@apache.org> wrote: > >> > >> Hi! > >> > >> Thanks for the good discussion! Here are some thoughts from my side: > >> > >> 1) > >> I would agree with Chengxiang that it helps to have as much NULL > >> handling in the table API as possible, since most SQL constructs > >> will be permitted there are well. > >> > >> 2) > >> A question that I have is whether we want to actually follow the > >> SQL standard exactly. There is a lot of criticism on NULL in the > >> SQL standard, and there have been many good proposals for more > >> meaningful semantics (for example differentiate between the > >> meanings "value missing", "value unknown", "value not applicable", etc). > >> > >> Going with the SQL way is easiest and makes SQL addition on top of > >> the table API much easier. Also, there is only one type of NULL, > >> meaning that null-values can be encoded efficiently in
RE: The null in Flink
Hi, Stephan Treat UNKOWN as FALSE may works if the Boolean expression is used in filter operation, but for other operations, such as select and groupBy, it does not make sense anymore, we should need UNKNOWN value(or unified as NULL) to distinguish with TRUE/FALSE . Thanks Chengxiang -Original Message- From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen Sent: Wednesday, December 2, 2015 6:27 PM To: dev@flink.apache.org Subject: Re: The null in Flink Hi Chenliang! I have to dig into this again, it was a while back. I think (vaguely) the reason why this worked was that in the end (at the root of a tree that is a logical expression) if the result is UNKNOWN, it is treated like FALSE. For example a predicate like "WHERE t.a > 10 && t.b == 'pending' ". If one boolean atom is UNKNOWN, the other is TRUE, the whole term becomes UNKNOWN and the row is filtered out (as if the predicate was false) - the result of the query contains no rows where predicate results are UNKNOWN. Stephan On Tue, Dec 1, 2015 at 4:09 AM, Li, Chengxiang <chengxiang...@intel.com> wrote: > Stephen, > For the 3rd topic, you mentioned that "If the boolean expressions are > monotonous (have no NOT), then the UNKNOWN value can be the same as > FALSE ", as UNKNOWN means it could be TRUE as well, does it a proper > way to handle it just as FALSE? > > Aljoscha, > I agree with you, Table can only be transformed from Tuple/Case Class > DataSet now, and Tuple/Case Class does not allow null field value, so > read files from data source to Row DataSet is necessary for NULL value > handling. > > -Original Message- > From: Aljoscha Krettek [mailto:aljos...@apache.org] > Sent: Friday, November 27, 2015 6:41 PM > To: dev@flink.apache.org > Subject: Re: The null in Flink > > Oh, this is probably the Jira for what I mentioned: > https://issues.apache.org/jira/browse/FLINK-2988 > > > On 27 Nov 2015, at 11:02, Aljoscha Krettek <aljos...@apache.org> wrote: > > > > Hi, > > just some information. The Table API code generator already has > preliminary support for generating code that is NULL-aware. So for > example if you have expressions like 1 + NULL the result would also be null. > > > > I think one of the missing pieces is a way to get data that contains > null values into the system. For example, right now the expected way > to read csv files is via tuples and they don’t support null values. I > think we need a way to directly read CSV files into a Row DataSet (or Table). > > > > Cheers, > > Aljoscha > >> On 26 Nov 2015, at 12:31, Stephan Ewen <se...@apache.org> wrote: > >> > >> Hi! > >> > >> Thanks for the good discussion! Here are some thoughts from my side: > >> > >> 1) > >> I would agree with Chengxiang that it helps to have as much NULL > >> handling in the table API as possible, since most SQL constructs > >> will be permitted there are well. > >> > >> 2) > >> A question that I have is whether we want to actually follow the > >> SQL standard exactly. There is a lot of criticism on NULL in the > >> SQL standard, and there have been many good proposals for more > >> meaningful semantics (for example differentiate between the > >> meanings "value missing", "value unknown", "value not applicable", etc). > >> > >> Going with the SQL way is easiest and makes SQL addition on top of > >> the table API much easier. Also, there is only one type of NULL, > >> meaning that null-values can be encoded efficiently in bitmaps. > >> Further more, the fact that the Table API users have the power of a > >> programming language at hand (rather than the limited set of SQL > >> operators), they should be able to easily define their own > >> constants for special meanings like "value not applicable" or so. > >> > >> Just curious if anyone has experience with some of the other > >> null-sematic proposals that have been around. > >> > >> 3) > >> One comment concerning the three-value-logic for boolean expressions: > >> > >> A while back, I worked on a SQL engine, and we were able to not > >> implement three-value logic with trick. If I recall correctly, it > >> was > like this: > >> > >> If the boolean expressions are monotonous (have no NOT), then the > >> UNKNOWN value can be the same as FALSE. So the query planner had to > >> rewrite all expression trees to have no NOT, which means pushing > >> the NOT
RE: The null in Flink
Stephen, For the 3rd topic, you mentioned that "If the boolean expressions are monotonous (have no NOT), then the UNKNOWN value can be the same as FALSE ", as UNKNOWN means it could be TRUE as well, does it a proper way to handle it just as FALSE? Aljoscha, I agree with you, Table can only be transformed from Tuple/Case Class DataSet now, and Tuple/Case Class does not allow null field value, so read files from data source to Row DataSet is necessary for NULL value handling. -Original Message- From: Aljoscha Krettek [mailto:aljos...@apache.org] Sent: Friday, November 27, 2015 6:41 PM To: dev@flink.apache.org Subject: Re: The null in Flink Oh, this is probably the Jira for what I mentioned: https://issues.apache.org/jira/browse/FLINK-2988 > On 27 Nov 2015, at 11:02, Aljoscha Krettek <aljos...@apache.org> wrote: > > Hi, > just some information. The Table API code generator already has preliminary > support for generating code that is NULL-aware. So for example if you have > expressions like 1 + NULL the result would also be null. > > I think one of the missing pieces is a way to get data that contains null > values into the system. For example, right now the expected way to read csv > files is via tuples and they don’t support null values. I think we need a way > to directly read CSV files into a Row DataSet (or Table). > > Cheers, > Aljoscha >> On 26 Nov 2015, at 12:31, Stephan Ewen <se...@apache.org> wrote: >> >> Hi! >> >> Thanks for the good discussion! Here are some thoughts from my side: >> >> 1) >> I would agree with Chengxiang that it helps to have as much NULL >> handling in the table API as possible, since most SQL constructs will >> be permitted there are well. >> >> 2) >> A question that I have is whether we want to actually follow the SQL >> standard exactly. There is a lot of criticism on NULL in the SQL >> standard, and there have been many good proposals for more meaningful >> semantics (for example differentiate between the meanings "value >> missing", "value unknown", "value not applicable", etc). >> >> Going with the SQL way is easiest and makes SQL addition on top of >> the table API much easier. Also, there is only one type of NULL, >> meaning that null-values can be encoded efficiently in bitmaps. >> Further more, the fact that the Table API users have the power of a >> programming language at hand (rather than the limited set of SQL >> operators), they should be able to easily define their own constants >> for special meanings like "value not applicable" or so. >> >> Just curious if anyone has experience with some of the other >> null-sematic proposals that have been around. >> >> 3) >> One comment concerning the three-value-logic for boolean expressions: >> >> A while back, I worked on a SQL engine, and we were able to not >> implement three-value logic with trick. If I recall correctly, it was like >> this: >> >> If the boolean expressions are monotonous (have no NOT), then the >> UNKNOWN value can be the same as FALSE. So the query planner had to >> rewrite all expression trees to have no NOT, which means pushing the >> NOT down into the leaf comparison operations (for example push NOT into == >> to become !=). >> These leaf comparison operators needed to be NULL aware to return >> FALSE on comparisons with NULL. >> >> >> Greetings, >> Stephan >> >> >> On Thu, Nov 26, 2015 at 6:41 AM, Li, Chengxiang >> <chengxiang...@intel.com> >> wrote: >> >>> Thanks, Timo. >>> We may put the NULL related function support to SQL API, but for >>> Scalar expression and Boolean expression, it already been supported >>> in Table API, without NULL value handling support, query with Scalar >>> expression and Boolean expression would fail while encounter NULL value. >>> >>> Thanks >>> Chengxiang >>> >>> -Original Message- >>> From: Timo Walther [mailto:twal...@apache.org] >>> Sent: Wednesday, November 25, 2015 7:33 PM >>> To: dev@flink.apache.org >>> Subject: Re: The null in Flink >>> >>> Hi Chengxiang, >>> >>> I totally agree that the Table API should fully support NULL values. >>> The Table API is a logical API and therefore we should be as close >>> to ANSI SQL as possible. Rows need to be nullable in the near future. >>> >>> 2. i, ii, iii and iv sound reasonable. But v, vi and vii sound to &g
RE: The null in Flink
Hi In this mail list, there are some discussions about null value handling in Flink, and I saw several related JIRAs as well(like FLINK-2203, FLINK-2210), but unfortunately, got reverted due to immature design, and no further action since then. I would like to pick this topic up here, as it's quite an important part of data analysis and many features depend on it. Hopefully, through a plenary discussion, we can generate an acceptable solution and move forward. Stephan has explained very clearly about how and why Flink handle "Null values in the Programming Language APIs", so I mainly talk about the second part of "Null values in the high-level (logical) APIs ". 1. Why should Flink support Null values handling in Table API? i. Data source may miss column value in many cases, if no Null values handling in Table API, user need to write an extra ETL to handle missing values manually. ii. Some Table API operators generate Null values on their own, like Outer Join/Cube/Rollup/Grouping Set, and so on. Null values handling in Table API is the prerequisite of these features. 2. The semantic of Null value handling in Table API. Fortunately, there are already mature DBMS standards we can follow for Null value handling, I list several semantic of Null value handling here. To be noted that, this may not cover all the cases, and the semantics may vary in different DBMSs, so it should totally open to discuss. I, NULL compare. In ascending order, NULL is smaller than any other value, and NULL == NULL return false. ii. NULL exists in GroupBy Key, all NULL values are grouped as a single group. iii. NULL exists in Aggregate columns, ignore NULL in aggregation function. iv. NULL exists in both side Join key, refer to #i, NULL == NULL return false, no output for NULL Join key. v. NULL in Scalar expression, expression within NULL(eg. 1 + NULL) return NULL. vi. NULL in Boolean expression, add an extra result: UNKNOWN, more semantic for Boolean expression in reference #1. vii. More related function support, like COALESCE, NVL, NANVL, and so on. 3. NULL value storage in Table API. Just set null to Row field value. To mark NULL value in serialized binary record data, normally it use extra flag for each field to mark whether its value is NULL, which would change the data layout of Row object. So any logic that access serialized Row data directly should updated to sync with new data layout, for example, many methods in RowComparator. Reference: 1. Nulls: Nothing to worry about: http://www.oracle.com/technetwork/issue-archive/2005/05-jul/o45sql-097727.html. 2. Null related functions: https://oracle-base.com/articles/misc/null-related-functions -Original Message- From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen Sent: Thursday, June 18, 2015 8:43 AM To: dev@flink.apache.org Subject: Re: The null in Flink Hi! I think we actually have two discussions here, both of them important: -- 1) Null values in the Programming Language APIs -- Fields in composite types may simply be null pointers. In object types: - primitives members are naturally non-nullable - all other members are nullable => If you want to avoid the overhead of nullability, go with primitive types. In Tuples, and derives types (Scala case classes): - Fields are non-nullable. => The reason here is that we initially decided to keep tuples as a very fast data type. Because tuples cannot hold primitives in Java/Scala, we would not have a way to make fast non-nullable fields. The performance of nullable fields affects the key-operations, especially on normalized keys. We can work around that with some effort, but have not one it so far. => In Scala, the Option types is a natural way of elegantly working around that. -- 2) Null values in the high-level (logial) APIs -- This is mainly what Ted was referring to, if I understood him correctly. Here, we need to figure out what form of semantical null values in the Table API and later, in SQL. Besides deciding what semantics to follow here in the logical APIs, we need to decide what these values confert to/from when switching between logical/physical APIs. On Mon, Jun 15, 2015 at 10:07 AM, Ted Dunningwrote: > On Mon, Jun 15, 2015 at 8:45 AM, Maximilian Michels > wrote: > > > Just to give an idea what null values could cause in Flink: > DataSet.count() > > returns the number of elements of all values in a Dataset (null or > > not) while #834 would ignore null values and aggregate the DataSet > > without > them. > > > > Compare R's
RE: The null in Flink
Thanks, Timo. We may put the NULL related function support to SQL API, but for Scalar expression and Boolean expression, it already been supported in Table API, without NULL value handling support, query with Scalar expression and Boolean expression would fail while encounter NULL value. Thanks Chengxiang -Original Message- From: Timo Walther [mailto:twal...@apache.org] Sent: Wednesday, November 25, 2015 7:33 PM To: dev@flink.apache.org Subject: Re: The null in Flink Hi Chengxiang, I totally agree that the Table API should fully support NULL values. The Table API is a logical API and therefore we should be as close to ANSI SQL as possible. Rows need to be nullable in the near future. 2. i, ii, iii and iv sound reasonable. But v, vi and vii sound to much like SQL magic. I think all other SQL magic (DBMS specific corner cases) should be handled by the SQL API on top of the Table API. Regards, Timo On 25.11.2015 11:31, Li, Chengxiang wrote: > Hi > In this mail list, there are some discussions about null value handling in > Flink, and I saw several related JIRAs as well(like FLINK-2203, FLINK-2210), > but unfortunately, got reverted due to immature design, and no further action > since then. I would like to pick this topic up here, as it's quite an > important part of data analysis and many features depend on it. Hopefully, > through a plenary discussion, we can generate an acceptable solution and move > forward. Stephan has explained very clearly about how and why Flink handle > "Null values in the Programming Language APIs", so I mainly talk about the > second part of "Null values in the high-level (logical) APIs ". > > 1. Why should Flink support Null values handling in Table API? > i. Data source may miss column value in many cases, if no Null values > handling in Table API, user need to write an extra ETL to handle missing > values manually. > ii. Some Table API operators generate Null values on their own, like > Outer Join/Cube/Rollup/Grouping Set, and so on. Null values handling in Table > API is the prerequisite of these features. > > 2. The semantic of Null value handling in Table API. > Fortunately, there are already mature DBMS standards we can follow for Null > value handling, I list several semantic of Null value handling here. To be > noted that, this may not cover all the cases, and the semantics may vary in > different DBMSs, so it should totally open to discuss. > I, NULL compare. In ascending order, NULL is smaller than any other > value, and NULL == NULL return false. > ii. NULL exists in GroupBy Key, all NULL values are grouped as a single > group. > iii. NULL exists in Aggregate columns, ignore NULL in aggregation > function. > iv. NULL exists in both side Join key, refer to #i, NULL == > NULL return false, no output for NULL Join key. > v. NULL in Scalar expression, expression within NULL(eg. 1 > + NULL) return NULL. > vi. NULL in Boolean expression, add an extra result: > UNKNOWN, more semantic for Boolean expression in reference #1. > vii. More related function support, like COALESCE, NVL, > NANVL, and so on. > > 3. NULL value storage in Table API. >Just set null to Row field value. To mark NULL value in serialized binary > record data, normally it use extra flag for each field to mark whether its > value is NULL, which would change the data layout of Row object. So any logic > that access serialized Row data directly should updated to sync with new data > layout, for example, many methods in RowComparator. > > Reference: > 1. Nulls: Nothing to worry about: > http://www.oracle.com/technetwork/issue-archive/2005/05-jul/o45sql-097727.html. > 2. Null related functions: > https://oracle-base.com/articles/misc/null-related-functions > > -Original Message- > From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf > Of Stephan Ewen > Sent: Thursday, June 18, 2015 8:43 AM > To: dev@flink.apache.org > Subject: Re: The null in Flink > > Hi! > > I think we actually have two discussions here, both of them important: > > -- > 1) Null values in the Programming Language APIs > -- > > Fields in composite types may simply be null pointers. > > In object types: >- primitives members are naturally non-nullable >- all other members are nullable > > => If you want to avoid the overhead of nullability, go with primitive types. > > In Tuples, and derives types (Scala case classes): >- Fields are non-nullable. > > => The reason here
RE: A proposal about skew data handling in Flink
Thanks a lot for the comments, Fabian. I agree with you on the plan mostly, just add some more thoughts about Non-Range-Equally-Splittable case here. 1. Let's assume a case which 10% data is skewed on certain key, in this case, as long as the parallelism is larger than 10, it would fit into Non-Range-Equally-Splittable case. So it should not be very corner case of skew issue. 2. In proposal, the solution of Non-Range-Equally-Splittable case is based on 2 new RangePartitioner and little optimizer logic, which has been touched already in the plan #1, #2. It does not require to change anything about the operator semantics, so if we have a good partitioner abstraction, I think it does not add much complexity for Flink to handle this kind of issue. It should not block anything, after finished the simple case, we would have more knowledge about the implementation details, then we can look back at this issue, and decide whether it's deserved to be resolved at the cost. Thanks Chengxiang -Original Message- From: Fabian Hueske [mailto:fhue...@gmail.com] Sent: Monday, October 19, 2015 7:15 PM To: dev@flink.apache.org Subject: Re: A proposal about skew data handling in Flink Hi, First of all, thanks a lot for this extensive proposal! It contains a lot of good observations and techniques how to address data skew. I have a few remarks: 1) The terms Input and Output Contract were introduced in the first scientific publications and are not used anymore. Input Contract are what we call operators or transformations today, the concept of output contract is completely gone. In the current code, we have operators like Map, Reduce, and Join that describe how data needs to be organized (by key, etc.) and UDFs that process the data. 2) I would categorize skew as follows: - UDF Call Complexity Skew: The input cardinalities of UDF calls differ (only applicable to group-based operators such as GroupReduce and CoGroup) or the computational complexity of UDF calls depends on the data and varies a lot. UDF calls are the smallest parallelizable unit. It is not possible to change that without changing the semantics. Combiners can help to reduce the effect of skew for group-based operators. - Input Partition Skew: The cardinality of parallel partitions varies. This is handled by Flink as follows: - Lazy split assignment for data sources - Operators that do not require special partitioning (Map, Filter, Cross, etc.) just consume the output partitions of the preceding operator. Rebalance() can be used to enforce round-robin partitioning to equalize size of all partitions. - Operators that require key-based partitioning use hash partitioning. Range partitioning can help address significant data skew. - UDF Call Skew: The number of UDF calls per parallel partition varies. This can be an issue for n-m joins which essentially result in Cartesian products. - UDF Call Skew is most relevant for Joins - UDF Call Skew for Map, Reduce, CoGroup, Cross can be controlled by controlling Input Partition Skew 3) I agree that we should not try to detect and automatically fix data skew (at the moment) but give users tools to manually manage skew. 4) I would focus on addressing the Input Partition Skew problem. UDF Call Complexity Skew cannot be addressed because it would change the semantics of operators. UDF Call Skew is only affecting joins and much harder to solve. 5) I wonder how much the practical gain is to address the Non-Range-Equally-Splittable case compared to the added code complexity. In general, tackling skew is a very good idea, but solving corner cases with quite complex methods might make future features more complicated to add. Hence, I would propose to focus on the common and "easy" cases first. I would address Input Partition Skew first and ignore the Non-Range-Equally-Splittable case for now. We can do this in two steps: 1) Add the "simple" range partitioner as in your pull request for unary operators (explicit range partitioning, total order, groupBy). Once the sampling happens online, this is a very good addition to Flink. 2) Add the "simple" range partitioner also for binary operators (join, coGroup). This will be a bit more tricky, because we need to do a coordinated decision for both inputs. 3) Expose range partitioning for GroupBy, Join, CoGroup to the API, maybe through optimizer hints. Since we want to have this transparently handled by the API and engine, we need to add a lot of these features into the optimizer, or JobGraphGenerator to be more precisely. Does that make sense to you? Cheers, Fabian 2015-10-16 17:13 GMT+02:00 Fabian Hueske <fhue...@gmail.com>: > Hi, > > thanks for starting a discussion about data skew! I agree, it's a > important issue that can cause a lot of problems. > I'll have a look at your proposal and add comments soon. > > Thanks, Fabian > > 201
RE: [Proposal] Create a separate sub module for benchmark test
Thanks for the correctness, Stephan. For the micro benchmark, I think it should be better to make a new maven project inside Flink. Thanks Chengxiang -Original Message- From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen Sent: Tuesday, September 22, 2015 8:14 PM To: dev@flink.apache.org Subject: Re: [Proposal] Create a separate sub module for benchmark test Sounds like a nice idea! Do you want to make this a new maven project as part of the Flink repository, or create a dedicated repository for that? BTW: We are currently not mixing microbenchmarks with test execution. The code for these benchmarks resides in the test scope of the projects (so it is not packaged), but it is not executed as part of the UnitTests or IntegrationTests. Greetings, Stephan On Tue, Sep 22, 2015 at 12:22 PM, Li, Chengxiang <chengxiang...@intel.com> wrote: > Hi, folks > During work on Flink, I found several micro benchmarks which come from > different modules, these benchmarks measure on manual, annotated with > Junit annotations, so they got executed during unit test as well. > There are some shortage on current implementation: > > 1. Benchmark test performance instead of feature, and normally, it > takes much more time than unit test. Mixed benchmark with unit test > would expand the CI check time. > > 2. With mixed with other tests, no warm up, no standalone process... > these benchmarks result may not very accurate. > Although looks easy, there are actually many pitfalls about benchmark, > so I suggest we create a new sub module for all benchmark test, and > import JMH( > http://openjdk.java.net/projects/code-tools/jmh/) as the benchmark > framework. With the help of JMH, we should get: > > 1. More available metrics. > > 2. More accurate result. > > 3. Focus on benchmark logic only, no need to worry about measure > logic. > > Thanks > Chengxiang >
[Proposal] Create a separate sub module for benchmark test
Hi, folks During work on Flink, I found several micro benchmarks which come from different modules, these benchmarks measure on manual, annotated with Junit annotations, so they got executed during unit test as well. There are some shortage on current implementation: 1. Benchmark test performance instead of feature, and normally, it takes much more time than unit test. Mixed benchmark with unit test would expand the CI check time. 2. With mixed with other tests, no warm up, no standalone process... these benchmarks result may not very accurate. Although looks easy, there are actually many pitfalls about benchmark, so I suggest we create a new sub module for all benchmark test, and import JMH(http://openjdk.java.net/projects/code-tools/jmh/) as the benchmark framework. With the help of JMH, we should get: 1. More available metrics. 2. More accurate result. 3. Focus on benchmark logic only, no need to worry about measure logic. Thanks Chengxiang
Use bloom filter to improve hybrid hash join performance
Hi, flink developers I read the flink hybrid hash join documents and implementation, very nice job. For the case of small table does not all fit into memory, I think we may able to improve the performance better. Currently in hybrid hash join, while small table does not fit into memory, part of the small table data would be spilled to disk, and the counterpart partition of big table data would be spilled to disk in probe phase as well. You can find detail description here: http://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html . If we build a bloom filter while spill small table to disk during build phase, and use it to filter the big table records which tend to be spilled to disk, this may greatly reduce the spilled big table file size, and saved the disk IO cost for writing and further reading. I have created FLINK-2240https://issues.apache.org/jira/browse/FLINK-2240 about it, I would like to contribute on this optimization if someone can assign the JIRA to me. But before that, I would like to hear your comments about this. Thanks Chengxiang