RE: The null in Flink
Hi, Summary of our discussion about NULL value handling in Flink: 1. Reading from a data source directly into a Table/Row DataSet is necessary for NULL value handling. 2. NULL value representation in the Row object; this would change its binary data layout, so we would need a new Row Serializer/Comparator (and their dependents) that is aware of this new binary data layout. The Tuple and Case Class serializers/comparators should remain the same. 3. NULL value handling in operations. We would follow the SQL standard as the default, but this is not concluded yet; any more input would be welcome. I've created an umbrella JIRA (https://issues.apache.org/jira/browse/FLINK-3139) for this; the subtasks based on the previous 3 aspects will be created as well, so anyone interested can contribute and comment on all subtasks. We could also move discussion of specific issues to the JIRA system. Thanks Chengxiang -Original Message- From: Li, Chengxiang [mailto:chengxiang...@intel.com] Sent: Thursday, December 3, 2015 4:43 PM To: dev@flink.apache.org Subject: RE: The null in Flink Hi, Stephan Treating UNKNOWN as FALSE may work if the Boolean expression is used in a filter operation, but for other operations, such as select and groupBy, it does not make sense anymore; we need the UNKNOWN value (or unified as NULL) to distinguish it from TRUE/FALSE. Thanks Chengxiang -Original Message- From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen Sent: Wednesday, December 2, 2015 6:27 PM To: dev@flink.apache.org Subject: Re: The null in Flink Hi Chengxiang! I have to dig into this again, it was a while back. I think (vaguely) the reason why this worked was that in the end (at the root of a tree that is a logical expression), if the result is UNKNOWN, it is treated like FALSE. For example, take a predicate like "WHERE t.a > 10 && t.b == 'pending'".
If one boolean atom is UNKNOWN and the other is TRUE, the whole term becomes UNKNOWN and the row is filtered out (as if the predicate were false) - the result of the query contains no rows for which the predicate evaluates to UNKNOWN. Stephan On Tue, Dec 1, 2015 at 4:09 AM, Li, Chengxiang wrote: > Stephan, > For the 3rd topic, you mentioned that "If the boolean expressions are > monotonous (have no NOT), then the UNKNOWN value can be the same as > FALSE". As UNKNOWN means it could be TRUE as well, is it proper > to handle it just as FALSE? > > Aljoscha, > I agree with you. A Table can only be transformed from a Tuple/Case Class > DataSet now, and Tuple/Case Class fields do not allow null values, so > reading files from a data source into a Row DataSet is necessary for NULL value > handling. > > -Original Message- > From: Aljoscha Krettek [mailto:aljos...@apache.org] > Sent: Friday, November 27, 2015 6:41 PM > To: dev@flink.apache.org > Subject: Re: The null in Flink > > Oh, this is probably the Jira for what I mentioned: > https://issues.apache.org/jira/browse/FLINK-2988 > > > On 27 Nov 2015, at 11:02, Aljoscha Krettek wrote: > > > > Hi, > > just some information. The Table API code generator already has > preliminary support for generating code that is NULL-aware. So, for > example, if you have an expression like 1 + NULL, the result would also be null. > > > > I think one of the missing pieces is a way to get data that contains > null values into the system. For example, right now the expected way > to read CSV files is via tuples, and they don’t support null values. I > think we need a way to directly read CSV files into a Row DataSet (or Table). > > > > Cheers, > > Aljoscha > >> On 26 Nov 2015, at 12:31, Stephan Ewen wrote: > >> > >> Hi! > >> > >> Thanks for the good discussion!
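The filter behavior Stephan describes follows from SQL's three-valued (Kleene) logic, which can be sketched like this. This is illustrative only; `TriBool` and its methods are hypothetical names, not Flink code:

```java
// Sketch of SQL's three-valued (Kleene) logic. Hypothetical type, not Flink code.
enum TriBool {
    TRUE, FALSE, UNKNOWN;

    TriBool and(TriBool other) {
        if (this == FALSE || other == FALSE) return FALSE;
        if (this == TRUE && other == TRUE) return TRUE;
        return UNKNOWN;
    }

    TriBool or(TriBool other) {
        if (this == TRUE || other == TRUE) return TRUE;
        if (this == FALSE && other == FALSE) return FALSE;
        return UNKNOWN;
    }

    // At the root of a WHERE predicate only TRUE keeps the row,
    // so UNKNOWN is filtered out exactly as FALSE is.
    boolean keepsRow() {
        return this == TRUE;
    }
}
```

This is why UNKNOWN can collapse to FALSE at the root of a filter predicate, yet must stay distinct from FALSE inside the expression tree (e.g. under NOT, or in select/groupBy as Chengxiang points out).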
Here are some thoughts from my side: > >> > >> 1) > >> I would agree with Chengxiang that it helps to have as much NULL > >> handling in the Table API as possible, since most SQL constructs > >> will be permitted there as well. > >> > >> 2) > >> A question that I have is whether we want to actually follow the > >> SQL standard exactly. There is a lot of criticism of NULL in the > >> SQL standard, and there have been many good proposals for more > >> meaningful semantics (for example, differentiating between the > >> meanings "value missing", "value unknown", "value not applicable", etc.). > >> > >> Going with the SQL way is easiest and makes an SQL addition on top of > >> the Table API much easier. Also, there is only one type of NULL, > >> meaning that null values can be encoded efficiently in bitmaps. > >> Furthermore, since Table API users have the power of a > >> programming language at hand (rather than the limited set of SQL > >> operators), they should be able to easily define their own > >> constants for special meanings like "value not applicable" or so. > >> > >> Just curious if anyone has experience with some of the other > >> null-semantic proposals that have
[jira] [Created] (FLINK-3140) NULL value data layout in Row Serializer/Comparator
Chengxiang Li created FLINK-3140: Summary: NULL value data layout in Row Serializer/Comparator Key: FLINK-3140 URL: https://issues.apache.org/jira/browse/FLINK-3140 Project: Flink Issue Type: Sub-task Components: Table API Reporter: Chengxiang Li To store/materialize NULL values in Row objects, we need a new Row Serializer/Comparator that is aware of NULL value fields. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
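A null-aware binary layout is commonly built by prepending a null mask to the serialized fields, so NULL fields cost one bit instead of a full value. The sketch below is a simplified illustration of that idea under assumed constraints (int fields only, at most 8 fields); it is not Flink's actual Row serializer:

```java
import java.nio.ByteBuffer;

// Simplified sketch of a null-aware row layout: a leading bit mask marks
// which fields are NULL, then only the non-null field values follow.
// Illustration only; assumes int fields and at most 8 fields per row.
class NullAwareRowFormat {

    static byte[] serialize(Integer[] fields) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 * fields.length);
        int mask = 0;
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) mask |= (1 << i); // bit i set => field i is NULL
        }
        buf.put((byte) mask);
        for (Integer f : fields) {
            if (f != null) buf.putInt(f); // NULL fields write no value bytes
        }
        byte[] out = new byte[buf.position()];
        buf.flip();
        buf.get(out);
        return out;
    }

    static Integer[] deserialize(byte[] data, int arity) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        int mask = buf.get() & 0xFF;
        Integer[] fields = new Integer[arity];
        for (int i = 0; i < arity; i++) {
            fields[i] = ((mask & (1 << i)) != 0) ? null : buf.getInt();
        }
        return fields;
    }
}
```

A real serializer would generalize the mask to arbitrary arity and field types, and the matching comparator would have to agree on how NULL sorts, which is the subject of the operations design discussion.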
[jira] [Created] (FLINK-3139) NULL values handling in Table API
Chengxiang Li created FLINK-3139: Summary: NULL values handling in Table API Key: FLINK-3139 URL: https://issues.apache.org/jira/browse/FLINK-3139 Project: Flink Issue Type: Task Components: Table API Reporter: Chengxiang Li This is an umbrella task for NULL value handling in the Table API. As the logical API for queries, the Table API should support handling NULL values; NULL values are quite common in logical queries, for example: # A data source may miss column values in many cases ("value missing", "value unknown", "value not applicable", etc.). # Some operators generate NULL values on their own, like Outer Join/Cube/Rollup/Grouping Set, and so on. NULL value handling in the Table API is a prerequisite of these features. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3142) CheckpointCoordinatorTest fails
Matthias J. Sax created FLINK-3142: -- Summary: CheckpointCoordinatorTest fails Key: FLINK-3142 URL: https://issues.apache.org/jira/browse/FLINK-3142 Project: Flink Issue Type: Bug Components: Tests Reporter: Matthias J. Sax Priority: Critical https://travis-ci.org/mjsax/flink/jobs/95439203 {noformat} Tests run: 14, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 63.713 sec <<< FAILURE! - in org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest testMaxConcurrentAttempsWithSubsumption(org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest) Time elapsed: 60.145 sec <<< FAILURE! java.lang.AssertionError: null at org.junit.Assert.fail(Assert.java:86) at org.junit.Assert.assertTrue(Assert.java:41) at org.junit.Assert.assertNotNull(Assert.java:621) at org.junit.Assert.assertNotNull(Assert.java:631) at org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.testMaxConcurrentAttempsWithSubsumption(CheckpointCoordinatorTest.java:946) {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3141) Design of NULL values handling in operation
Chengxiang Li created FLINK-3141: Summary: Design of NULL values handling in operation Key: FLINK-3141 URL: https://issues.apache.org/jira/browse/FLINK-3141 Project: Flink Issue Type: Sub-task Reporter: Chengxiang Li We discuss and finalize here how NULL values are handled in specific cases. This is the first proposal: # NULL comparison. In ascending order, NULL is smaller than any other value, and NULL == NULL returns false. # NULL exists in a GroupBy key: all NULL values are grouped as a single group. # NULL exists in aggregate columns: NULL is ignored by the aggregation function. # NULL exists in both sides' join keys: per #1, NULL == NULL returns false, so there is no output for NULL join keys. # NULL in a scalar expression: an expression containing NULL (e.g. 1 + NULL) returns NULL. # NULL in a Boolean expression: add an extra result, UNKNOWN; more semantics for Boolean expressions in reference #1. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
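The ordering and aggregation rules of this proposal can be illustrated with a small sketch. The helper names are hypothetical, not Flink code. Note the asymmetry in the proposal: for ordering/grouping, NULLs sort first and collapse into a single group, while for join equality NULL == NULL is false:

```java
import java.util.List;

// Illustrative sketch of the proposed NULL rules; hypothetical helpers, not Flink code.
class NullRules {

    // Rule 1: in ascending order, NULL is smaller than any non-null value.
    // Returning 0 for NULL vs. NULL is for sort/group ordering only;
    // for join equality the proposal treats NULL == NULL as false.
    static int compareAsc(Integer a, Integer b) {
        if (a == null && b == null) return 0;
        if (a == null) return -1;
        if (b == null) return 1;
        return Integer.compare(a, b);
    }

    // Rule 3: aggregation functions ignore NULL inputs.
    static int sumIgnoringNulls(List<Integer> values) {
        int sum = 0;
        for (Integer v : values) {
            if (v != null) sum += v;
        }
        return sum;
    }
}
```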
JobManagerCheckpointRecoveryITCase
Hi, I had a failing build due to "Maven produced no output for 300 seconds." It seems that JobManagerCheckpointRecoveryITCase.testCheckpointedStreamingSumProgram() got stuck. Did anyone see this before? Should I open an "instable-test" JIRA? https://travis-ci.org/mjsax/flink/jobs/95439206 -Matthias
[jira] [Created] (FLINK-3129) Add tooling to ensure interface stability
Robert Metzger created FLINK-3129: - Summary: Add tooling to ensure interface stability Key: FLINK-3129 URL: https://issues.apache.org/jira/browse/FLINK-3129 Project: Flink Issue Type: Sub-task Components: Build System Reporter: Robert Metzger Assignee: Robert Metzger I would like to use this maven plugin: https://github.com/siom79/japicmp to automatically ensure interface stability across minor releases. Ideally we have the plugin in place after Flink 1.0 is out, so that maven builds break if a breaking change has been made. The plugin already supports downloading a reference release, checking the build and breaking it. Not yet supported are class/method inclusions based on annotations, but I've opened a pull request for adding it. There are also issues with the resolution of the dependency with the annotations, but I'm working on resolving those issues. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3131) Expose checkpoint metrics
Ufuk Celebi created FLINK-3131: -- Summary: Expose checkpoint metrics Key: FLINK-3131 URL: https://issues.apache.org/jira/browse/FLINK-3131 Project: Flink Issue Type: Improvement Components: Webfrontend Affects Versions: 0.10.1 Reporter: Ufuk Celebi Assignee: Ufuk Celebi Metrics about checkpoints are only accessible via the job manager logs and only show information about the completed checkpoints. The checkpointing metrics should be exposed in the web frontend, including: - number - duration - state size -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [jira] [Created] (FLINK-3132) Restructure streaming guide
Hi All, I am really thrilled with the work the Apache Flink community is doing. Can we have PDF versions of these web pages so that offline users can use them for reference? Cheers, Nyamath Ulla Khan On Mon, Dec 7, 2015 at 7:23 PM, Ufuk Celebi (JIRA) wrote: > Ufuk Celebi created FLINK-3132: > -- > > Summary: Restructure streaming guide > Key: FLINK-3132 > URL: https://issues.apache.org/jira/browse/FLINK-3132 > Project: Flink > Issue Type: Improvement > Components: Documentation > Affects Versions: 0.10.1 > Reporter: Ufuk Celebi > Assignee: Ufuk Celebi > Fix For: 1.0.0 > > > The streaming guide is in parts very cluttered and overloaded. > > I think it needs a restructuring and de-cluttering by splitting it up into > multiple pages. > > This will make it easier to navigate to the relevant parts of it. It will > also improve the custom search. At the moment you often end up on the > single long streaming guide page, where you still need to search manually. > > Some introductory notes about the main concepts are missing as well. > > > > > -- > This message was sent by Atlassian JIRA > (v6.3.4#6332) > -- Thanks and Regards Nyamath Ulla Khan
[jira] [Created] (FLINK-3130) Visualize throughput of tasks
Ufuk Celebi created FLINK-3130: -- Summary: Visualize throughput of tasks Key: FLINK-3130 URL: https://issues.apache.org/jira/browse/FLINK-3130 Project: Flink Issue Type: Improvement Components: Webfrontend Affects Versions: 0.10.1 Reporter: Ufuk Celebi Assignee: Ufuk Celebi Priority: Minor Currently, the web runtime monitor only shows the number of transferred bytes and records per task. From this you can manually infer the throughput of the system, but it is very cumbersome. It should be possible to display/plot the throughput over time as well. This will improve the understanding of the system's runtime behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3132) Restructure streaming guide
Ufuk Celebi created FLINK-3132: -- Summary: Restructure streaming guide Key: FLINK-3132 URL: https://issues.apache.org/jira/browse/FLINK-3132 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 0.10.1 Reporter: Ufuk Celebi Assignee: Ufuk Celebi Fix For: 1.0.0 The streaming guide is in parts very cluttered and overloaded. I think it needs a restructuring and de-cluttering by splitting it up into multiple pages. This will make it easier to navigate to the relevant parts of it. It will also improve the custom search. At the moment you often end up on the single long streaming guide page, where you still need to search manually. Some introductory notes about the main concepts are missing as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
Maximilian Michels created FLINK-3133: - Summary: Introduce collect()/count()/print() methods in DataStream API Key: FLINK-3133 URL: https://issues.apache.org/jira/browse/FLINK-3133 Project: Flink Issue Type: Improvement Components: DataStream API, Streaming Affects Versions: 0.10.1, 0.10.0, 1.0.0 Reporter: Maximilian Michels Fix For: 1.0.0 The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should be mirrored to the DataStream API. The semantics of the calls are different. We need to be able to sample parts of a stream, e.g. by supplying a time period in the arguments to the methods. Collect/count/print should be lazily evaluated. Users should use the {{StreamEnvironment}} to retrieve the results. {code:java} StreamExecutionEnvironment env = StreamEnvironment.getStreamExecutionEnvironment(); DataStream printSink = env.addSource(..).print(); ResultQueryable queryObject = env.executeWithResultQueryable(); List sampled = queryObject.retrieve(printSink, Time.seconds(5)); {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3134) Make YarnJobManager's allocate call asynchronous
Maximilian Michels created FLINK-3134: - Summary: Make YarnJobManager's allocate call asynchronous Key: FLINK-3134 URL: https://issues.apache.org/jira/browse/FLINK-3134 Project: Flink Issue Type: Bug Components: YARN Client Affects Versions: 0.10.1, 0.10.0, 1.0.0 Reporter: Maximilian Michels Assignee: Maximilian Michels Fix For: 1.0.0 The {{allocate()}} call is used in the {{YarnJobManager}} to send a heartbeat to the YARN resource manager. This call may block the JobManager actor system for arbitrary time, e.g. if retry handlers are set up within the call to allocate. I propose to use the {{AMRMClientAsync}}'s callback methods to send heartbeats and update the container information. The API is available for our supported Hadoop versions (2.3.0 and above). https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3135) Add chainable driver for UNARY_NO_OP strategy
Fabian Hueske created FLINK-3135: Summary: Add chainable driver for UNARY_NO_OP strategy Key: FLINK-3135 URL: https://issues.apache.org/jira/browse/FLINK-3135 Project: Flink Issue Type: Improvement Components: Local Runtime Affects Versions: 1.0.0 Reporter: Fabian Hueske Priority: Minor A chainable driver for UNARY_NO_OP strategy would decrease the serialization overhead in certain situations. Should be fairly easy to implement. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Monitoring backpressure
Hey guys, Is there any way to monitor the backpressure in the Flink job? I find it hard to debug slow operators because of the backpressure mechanism so it would be good to get some info out of the network layer on what exactly caused the backpressure. For example: task1 -> task2 -> task3 -> task4 I want to figure out whether task 2 or task 3 is slow. Any ideas? Thanks, Gyula
[jira] [Created] (FLINK-3127) Measure backpressure in Flink jobs
Maximilian Michels created FLINK-3127: - Summary: Measure backpressure in Flink jobs Key: FLINK-3127 URL: https://issues.apache.org/jira/browse/FLINK-3127 Project: Flink Issue Type: Improvement Components: Streaming, TaskManager, Webfrontend Affects Versions: 0.10.1, 0.10.0 Reporter: Maximilian Michels Fix For: 1.0.0 Multiple users have asked for a simple way to display the backpressure in a Flink job. Backpressure could be determined by monitoring throughput and allocated network buffers at the task managers. Subsequently, a reliable measure of backpressure could be used to adjust the read behavior of sources, e.g. adjust the offset of the Kafka consumer. Also, the web frontend should display the backpressure measure to allow users to find bottlenecks in their jobs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Monitoring backpressure
I discussed this quite a bit with other people. It is not totally straightforward. One could try to measure the exhaustion of the output buffer pools, but that fluctuates a lot - it would need some work to get a stable metric from that... If you have a profiler that you can attach to the processes, you could check whether a lot of time is spent within the "requestBufferBlocking()" method of the buffer pool... Stephan On Mon, Dec 7, 2015 at 9:45 AM, Gyula Fóra wrote: > Hey guys, > > Is there any way to monitor the backpressure in a Flink job? I find it > hard to debug slow operators because of the backpressure mechanism, so it > would be good to get some info out of the network layer on what exactly > caused the backpressure. > > For example: > > task1 -> task2 -> task3 -> task4 > > I want to figure out whether task 2 or task 3 is slow. > > Any ideas? > > Thanks, > Gyula >
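As a rough alternative to attaching an external profiler, one could sample thread stacks from inside the JVM and count how many threads are currently parked in the buffer-request method. This is only an illustrative sketch; the method name to look for ("requestBufferBlocking") comes from Stephan's mail, and the sampler class itself is hypothetical, not a Flink feature:

```java
// Rough sketch: count how many threads are currently inside a given method
// (e.g. "requestBufferBlocking" of the buffer pool). If repeated samples show
// a high count for a task's threads, that task is likely backpressured.
// Illustrative only; hypothetical helper, not part of Flink.
class StackSampler {
    static int countThreadsIn(String methodName) {
        int hits = 0;
        for (StackTraceElement[] stack : Thread.getAllStackTraces().values()) {
            for (StackTraceElement frame : stack) {
                if (frame.getMethodName().equals(methodName)) {
                    hits++;
                    break; // count each thread at most once
                }
            }
        }
        return hits;
    }
}
```

Sampling like this is cheap enough to repeat periodically, which smooths out the fluctuation Stephan mentions for single buffer-pool snapshots.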
Re: Monitoring backpressure
Thanks Stephan, I will try with the profiler for now. Gyula Stephan Ewen wrote (on Mon, Dec 7, 2015, 10:51): > I discussed this quite a bit with other people. > > It is not totally straightforward. One could try to measure the exhaustion of > the output buffer pools, but that fluctuates a lot - it would need some > work to get a stable metric from that... > > If you have a profiler that you can attach to the processes, you could > check whether a lot of time is spent within the "requestBufferBlocking()" > method of the buffer pool... > > Stephan > > > On Mon, Dec 7, 2015 at 9:45 AM, Gyula Fóra wrote: > > > Hey guys, > > > > Is there any way to monitor the backpressure in a Flink job? I find it > > hard to debug slow operators because of the backpressure mechanism, so it > > would be good to get some info out of the network layer on what exactly > > caused the backpressure. > > > > For example: > > > > task1 -> task2 -> task3 -> task4 > > > > I want to figure out whether task 2 or task 3 is slow. > > > > Any ideas? > > > > Thanks, > > Gyula > > >
Re: Lack of review on PRs
Hi Sachin, I can understand your dissatisfaction with the review process of your ML PRs. They are open for a long time without much activity from a committer even though you've spent a lot of effort writing them. I'm sorry for this, since it's probably mainly because of me lacking the time resources to review them. I agree with Stephan that reviewing ML algorithms is particularly time consuming because you not only have to verify the parallel implementation but first of all you have to understand the algorithm. This usually requires reading the respective literature. Furthermore, a close examination is often necessary because a single wrong sign renders the math and also the algorithm directly wrong. Figuring these things out when it's still fresh is usually easier. I personally prefer not to rush things and rather merge only things which are working correctly, ideally supported by exhaustive tests, and efficiently implemented. At the moment I think the biggest problem is that there are too few committers working on ML. Currently, it's hard for me to allocate time for it and I guess so for others too. It would be great if the review process wouldn't solely depend on the committers, though. However, I don't see an immediate solution for this problem. Therefore, I would ask you to not lose patience with the community, even though this might not be a satisfying answer for you. Cheers, Till On Sun, Dec 6, 2015 at 8:59 PM, Stephan Ewen wrote: > Hi Sachin! > > Thank you for honestly speaking your mind on this issue. > I agree that handling ML pull requests is particularly challenging right > now for various reasons. > > There are currently very few active committers that have the background to > actually review these pull requests. The whole work basically hinges on > Till and Chiwan. As far as I know, both of them work on many things, and > have only a certain time they can spend on ML. With regard to ML, Flink is > simply a bit understaffed.
> > The ML parts are hard to develop. They require people who understand math, > know how to write good code, and can think deeply about the parallel systems > aspects of Flink. A lot of ML pull requests had to be reviewed multiple times > because they did not bring these aspects together. > > Finally, the ML pull requests are very time consuming to review, because > the reviewer usually has to learn the implemented algorithm (or the > parallel adaption) before being able to properly review it. As a result, > while most other pull requests can be reviewed with a bit of time next to > another weekly agenda, ML pull requests need a lot of dedicated time. > > > The things I see that can really improve the situation are these: > > 1) Try to get more people in the community to help bring these pull > requests into shape. While the group of ML-savvy committers is still small, > it would be good if contributors could help each other out there as well, > by reviewing the pull requests and helping to get them into shape. Pull > requests that are high quality by the time one of the committers looks over > them can be merged relatively quickly. > > 2) Honestly reject some pull requests. Focus the time on the promising > ML pull requests and reject earlier those pull requests that are far from > good shape. Give the rejected pull requests some coarser feedback about > what categories to improve, rather than detailed comments on each part. > While it is nice to try and shepherd as many pull requests in as possible > (and the community really tries to do that), it may not be possible in that > area, as long as there are still not enough ML people. > > Maybe Till and Chiwan can share their thoughts on this. > > > Greetings, > Stephan > > > On Sun, Dec 6, 2015 at 5:44 PM, Chiwan Park wrote: > > > Hi Sachin, > > > > I’m sorry about your unsatisfying experience with the lack of review.
> > > > As you know, because there are few committers (including me) whose > > interest is ML, the review of PRs can proceed slowly. But I admit my > > fault in leaving the PRs unreviewed for 3-5 months. > > > > I'll try to review your PRs as soon as possible. > > > > > On Dec 6, 2015, at 1:00 AM, Sachin Goel > > wrote: > > > > > > Hi all > > > Sorry about a weekend email. > > > > > > This email is to express my displeasure over the lack of any review on my > > > PRs on extending the ML library. Five of my PRs have been without any > > > review for times varying from 3-5 months now. > > > When I took up the task of extending the ML library by implementing core > > > algorithms such as Decision Tree and k-means clustering [with several > > > initialization schemes], I had hoped that the community would be actively > > > involved in it, since ML is a very important component of any big data > > > system these days. However, it appears I have been wrong. > > > Surely, the initial reviews required a lot of changes from
[jira] [Created] (FLINK-3128) Add Isotonic Regression To ML Library
Fridtjof Sander created FLINK-3128: -- Summary: Add Isotonic Regression To ML Library Key: FLINK-3128 URL: https://issues.apache.org/jira/browse/FLINK-3128 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Fridtjof Sander Priority: Minor Isotonic regression fits a monotonically increasing function (also called an isotonic function) to a set of data points. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3136) Scala Closure Cleaner uses wrong ASM import
Stephan Ewen created FLINK-3136: --- Summary: Scala Closure Cleaner uses wrong ASM import Key: FLINK-3136 URL: https://issues.apache.org/jira/browse/FLINK-3136 Project: Flink Issue Type: Bug Components: Scala API Affects Versions: 0.10.1 Reporter: Stephan Ewen Assignee: Stephan Ewen Priority: Critical Fix For: 1.0.0, 0.10.2 The closure cleaner uses Kryo's ReflectASM's ASM. That is currently in version 4.0, which is incompatible with later Scala versions. Using ASM directly gives version 5.0. Flink also shades that ASM version correctly away in the end. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3137) Missing break in OutputEmitter ctor
Ted Yu created FLINK-3137: - Summary: Missing break in OutputEmitter ctor Key: FLINK-3137 URL: https://issues.apache.org/jira/browse/FLINK-3137 Project: Flink Issue Type: Bug Reporter: Ted Yu Here is related code: {code} switch (strategy) { case PARTITION_CUSTOM: extractedKeys = new Object[1]; case FORWARD: case PARTITION_HASH: {code} Looks like a break is missing following assignment to extractedKeys -- This message was sent by Atlassian JIRA (v6.3.4#6332)
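Whether this fall-through is a bug or deliberate (PARTITION_CUSTOM may be meant to also run the shared setup of the later cases) depends on the surrounding code; if it is unintended, the fix is a break statement. Below is a simplified sketch of the pattern with hypothetical names, not the actual OutputEmitter/ShipStrategyType code:

```java
// Simplified sketch of the reported switch fall-through; EmitterSketch and
// Strategy are hypothetical stand-ins, not the actual OutputEmitter code.
class EmitterSketch {
    enum Strategy { PARTITION_CUSTOM, FORWARD, PARTITION_HASH }

    Object[] extractedKeys;
    boolean sharedInitDone;

    EmitterSketch(Strategy strategy) {
        switch (strategy) {
            case PARTITION_CUSTOM:
                extractedKeys = new Object[1];
                break; // without this break, control falls into the next case
            case FORWARD:
            case PARTITION_HASH:
                sharedInitDone = true;
                break;
            default:
                throw new IllegalArgumentException("Unsupported: " + strategy);
        }
    }
}
```

If the fall-through turns out to be intentional, the idiomatic fix is instead a `// fall through` comment on the PARTITION_CUSTOM case so static analyzers and future readers know it is deliberate.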