Re: Tuple project method
Sorry, to be effective the project method should also take the target tuple itself as input :) Tuple3<X,X,X> reuse = tuple.project(reuse, 0, 2, 5)? On Wed, May 27, 2015 at 11:51 AM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi flinkers, it happens very often to me that I have to output a reuse tuple that is basically a subset of the data contained in the input tuple. Do you think it could be useful to add a project method to the Tuple class, so that one could write something like: Tuple3<X,X,X> reuse = tuple.project(0, 2, 5)? Best, Flavio
Re: Package multiple jobs in a single jar
Hi Matthias, I understand your point about advertising the interfaces, but there is so much stuff to be advertised :). Honestly, I think ProgramDescription doesn't add much value, although it is kind of neat. Parameters can be described in the code or by displaying a help message. However, I'm in favor of making it easier to list all executable classes in a JAR. Therefore, I like your proposed changes. I just don't see much use for the Program or ProgramDescription interface in the examples. That's just my personal opinion. Best regards, Max On Tue, May 26, 2015 at 5:10 PM, Flavio Pompermaier pomperma...@okkam.it wrote: I agree with Matthias, I didn't know about the ProgramDescription and Program interfaces because they are not advertised anywhere.. On Tue, May 26, 2015 at 5:01 PM, Matthias J. Sax mj...@informatik.hu-berlin.de wrote: I see your point. However, right now only few people are aware of the ProgramDescription interface. If we want to advertise it, it should be used (at least) in a few examples. Otherwise, people will never use it, and the changes I plan to apply are kind of useless. I would even claim that the interface should be removed completely in this case... On 05/26/2015 03:31 PM, Maximilian Michels wrote: Sorry, my bad. Yes, it is helpful to have a separate program and parameter description in ProgramDescription. I'm not sure if it adds much value to implement ProgramDescription in the examples. It introduces verbosity and might give the impression that you have to implement ProgramDescription in your Flink job. On Tue, May 26, 2015 at 12:00 PM, Matthias J. Sax mj...@informatik.hu-berlin.de wrote: Hi Max, thanks for your feedback. I guess you confused the interfaces Program and ProgramDescription. With Program, the main method is replaced by getPlan(...). However, ProgramDescription only adds the method getDescription(), which returns a string that explains the usage of the program (i.e., a short description and the expected parameters). Thus, adding ProgramDescription to the examples does not change the examples -- the main method will still be used. It only adds the ability for a program to explain itself (i.e., give meta info). Furthermore, ProgramDescription is also not related to the new ParameterTool. -Matthias On 05/26/2015 11:46 AM, Maximilian Michels wrote: I don't think `getDisplayName()` is necessary either. The class name and the description string should be fine. Adding ProgramDescription to the examples is not necessary; as already pointed out, using the main method is more convenient for most users. As far as I know, the idea of the ParameterTool was to use it only in the user code and not to automatically handle parameters. Changing the interface would be quite API-breaking, but since most programs use the main method, IMHO we could do it. On Fri, May 22, 2015 at 10:09 PM, Matthias J. Sax mj...@informatik.hu-berlin.de wrote: Makes sense to me. :) One more thing: What about extending the ProgramDescription interface to have multiple methods, as Flavio suggested (with the config(...) method that should be handled by the ParameterTool)? public interface FlinkJob { /** The name to display in the job submission UI or shell */ //e.g. "My Flink HelloWorld" String getDisplayName(); //e.g. "This program does this and that etc.." String getDescription(); //e.g.
(0, Integer, "An integer representing my first param"), (1, String, "A string representing my second param") List<Tuple3<Integer, TypeInfo, String>> paramDescription; /** Set up the Flink job in the passed ExecutionEnvironment */ ExecutionEnvironment config(ExecutionEnvironment env); } Right now, the interface is used only a couple of times in Flink's code base, so it would not be a problem to update those classes. However, it could break external code that uses the interface already (even if I doubt that the interface is well known and used often [or at all]). I personally don't think that getDisplayName() is too helpful. Splitting the program description and the parameter description seems to be useful. For example, if wrong parameters are provided, the parameter description can be included in the error message. If program and parameter description are given in a single string, this is not possible. But this is only a minor issue of course. Maybe we should also add the interface to the current Flink examples, to make people more aware of it. Is there any documentation on the web site? -Matthias On 05/22/2015 09:43 PM, Robert Metzger wrote: Thank you for working on this. My responses are inline below: (Flavio) My suggestion is to create a specific Flink interface to get also description of a job and standardize parameter passing. I've
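For reference, a minimal sketch (my own, not from the thread) of what implementing the existing ProgramDescription interface next to a regular main method could look like; the class name and the description text are made up:

import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.ExecutionEnvironment;

// Hypothetical example job: it implements ProgramDescription so that
// clients can display a usage text, but is still run via main().
public class WordCountJob implements ProgramDescription {

    @Override
    public String getDescription() {
        return "Counts word occurrences in a text file. "
                + "Parameters: <input path> <output path>";
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // ... set up sources, transformations, and sinks on 'env' ...
        env.execute("WordCountJob");
    }
}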
Tuple project method
Hi flinkers, it happens very often to me that I have to output a reuse tuple that is basically a subset of the data contained in the input tuple. Do you think it could be useful to add a project method to the Tuple class, so that one could write something like: Tuple3<X,X,X> reuse = tuple.project(0, 2, 5)? Best, Flavio
Re: [DISCUSS] Dedicated streaming mode
Hi Henry! I think the idea was to have a dedicated streaming mode as long as the default cluster mode does not support batch and streaming equally well. Once we have reached this level in the dedicated streaming mode, this will be the default cluster mode. I share your doubts about whether it is a good idea to advertise the streaming mode. It might let people think that a Flink cluster can only do one of the two modes. Best, Max On Tue, May 26, 2015 at 8:53 PM, Henry Saputra henry.sapu...@gmail.com wrote: Ah yes, technically the streaming mode could run batch jobs as well in Flink. I am thinking that it could cause confusion with users, since most systems that do batch and stream (well, pretty much Spark ^_^) do not differentiate the deployment topologies for the cluster to support different modes of applications. - Henry On Tue, May 26, 2015 at 11:44 AM, Stephan Ewen se...@apache.org wrote: The streaming mode runs batch jobs as well :-) There should be slightly reduced predictability in the memory management in the streaming mode, but otherwise there should not be a problem. So if you want to run mixed workloads, you start the streaming mode. (Note: Currently, the batch mode runs streaming jobs as well, but gives them very little memory. I am thinking of prohibiting that (separate discussion), to prevent people from not noticing and running a highly sub-optimal Flink setup.) On Tue, May 26, 2015 at 8:26 PM, Henry Saputra henry.sapu...@gmail.com wrote: One immediate concern I have is the deployment topology. With streaming having its own cluster deployment, this means that in standalone mode, if ops would like to deploy Flink, they have to know which mode to deploy Flink in, either batch or streaming. So, if the use case was to support both batch and streaming, would that mean the deployment needs two separate clusters to support the different applications running on Flink? I think this would be OK if Flink is deployed on YARN or other resource management platforms like Mesos or Apache Myriad. Maybe someone, like Robert, could confirm this is the case. - Henry On Tue, May 26, 2015 at 1:51 AM, Maximilian Michels m...@apache.org wrote: +1 great changes coming up! I like the idea that, ultimately, Flink will handle streaming and batch programs equally well, independently of the chosen cluster startup mode. What is the time frame for these changes? On Tue, May 26, 2015 at 7:34 AM, Henry Saputra henry.sapu...@gmail.com wrote: Thanks Aljoscha and Stephan, this helps - Henry On Fri, May 22, 2015 at 4:37 AM, Stephan Ewen se...@apache.org wrote: Aljoscha is right. There are plans to migrate the streaming state to the MemoryManager as well, but streaming state is not managed at this point. What is managed in streaming jobs is the data buffered and cached in the network stack. But that is a different memory pool than the memory manager. We keep those pools separate because the network stack is currently more advanced in terms of dynamically rebalancing memory, compared to the memory manager. On Fri, May 22, 2015 at 12:25 PM, Aljoscha Krettek aljos...@apache.org wrote: Hi, streaming currently does not use any memory manager. All state is kept in Java objects on the Java heap, for example an ArrayList for the window buffer. On Thu, May 21, 2015 at 11:56 PM, Henry Saputra henry.sapu...@gmail.com wrote: Hi Stephan, Gyula, Paris, How does streaming currently differ in terms of memory management? Currently we only have one MemoryManager, which is used by both modes I believe.
- Henry On Thu, May 21, 2015 at 12:34 PM, Stephan Ewen se...@apache.org wrote: I discussed a bit via Skype with Gyula and Paris. We thought about the following way to do it: - We add a dedicated streaming mode for now. The streaming mode supersedes the batch mode, so it can run both types of programs. - The streaming mode sets the memory manager to lazy allocation. - So long as it runs pure streaming jobs, the full heap will be available to window buffers and UDFs. - Batch programs can still run, so mixed workloads are not prevented. Batch programs are a bit less robust there, because the memory manager does not pre-allocate memory. UDFs can eat into Flink's memory portion. - The streaming mode starts the necessary configured components/services for state backups. Over the next versions, we want to bring these things together: - use the managed memory for window buffers - on-demand starting of the state backend Then, we deprecate the streaming mode, let both modes start the cluster in the
Re: Tuple project method
It would be an interesting addition. Such a method cannot be made fully type-safe in Java, but that might be okay, since it is user-code internal. On Wed, May 27, 2015 at 11:52 AM, Flavio Pompermaier pomperma...@okkam.it wrote: Sorry, to be effective the project method should also take the target tuple itself as input :) Tuple3<X,X,X> reuse = tuple.project(reuse, 0, 2, 5)? On Wed, May 27, 2015 at 11:51 AM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi flinkers, it happens very often to me that I have to output a reuse tuple that is basically a subset of the data contained in the input tuple. Do you think it could be useful to add a project method to the Tuple class, so that one could write something like: Tuple3<X,X,X> reuse = tuple.project(0, 2, 5)? Best, Flavio
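To make the proposal concrete, here is a minimal sketch (my own; the class and method are hypothetical, not part of Flink's Tuple) of what such a projection could look like as a standalone helper. As noted above, it cannot be made fully type-safe: the caller must ensure that the arity and the field types of the reuse tuple match.

import org.apache.flink.api.java.tuple.Tuple;

public final class TupleProjection {

    // Copies the fields at the given source positions into the reuse tuple,
    // in order, and returns the reuse tuple. Unchecked by design: a type
    // mismatch only surfaces at runtime.
    public static <T extends Tuple> T project(Tuple source, T reuse, int... fields) {
        if (fields.length != reuse.getArity()) {
            throw new IllegalArgumentException(
                    "Number of projected fields must match the arity of the reuse tuple");
        }
        for (int i = 0; i < fields.length; i++) {
            reuse.setField(source.getField(fields[i]), i);
        }
        return reuse;
    }

    private TupleProjection() {
    }
}

Usage would then look like: Tuple3<String, Integer, Long> reuse = TupleProjection.project(input, new Tuple3<String, Integer, Long>(), 0, 2, 5);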
Re: [DISCUSS] Canceling Streaming Jobs
I would also prefer the second option. The first is rather a hack, not an option. :D On May 27, 2015 9:14 AM, Márton Balassi balassi.mar...@gmail.com wrote: +1 for the second option: It would also provide the possibility to properly commit a state checkpoint after the terminate message was triggered. In some cases this can be a desirable behaviour. On Wed, May 27, 2015 at 8:46 AM, Gyula Fóra gyf...@apache.org wrote: Hey, I would also strongly prefer the second option; users need to have the option to force-cancel a program in case of unwanted behaviour. Cheers, Gyula Matthias J. Sax mj...@informatik.hu-berlin.de wrote (on Wednesday, May 27, 2015, 1:20): Hi, currently, the only way to stop a streaming job is to cancel the job. This has multiple disadvantages: 1) a clean stop is not possible (see https://issues.apache.org/jira/browse/FLINK-1929 -- I think a clean stop is a prerequisite for FLINK-1929) and 2) as a minor issue, all canceled jobs are listed as canceled in the history (which is somewhat confusing for the user -- at least it was for me when I started to work with Flink Streaming). This issue was raised a few times already; however, no final conclusion was reached (if I remember correctly). I could not find a JIRA for it either. From my understanding of the system, there would be two ways to implement clean stopping of streaming jobs: 1) Distinguish tasks between batch and streaming: canceling a batch job works as always; canceling a streaming job only sends a cancel signal to the sources and waits until the job finishes (i.e., sources stop emitting data and finish regularly, triggering the finishing of all operators). In this case, streaming jobs are stopped in a clean way (as if the input had been finite) and the job will be listed as finished in the history. This approach has the advantage that it should be simpler to implement. However, the disadvantages are (1) a hard canceling of jobs is not possible any more, and (2) Flink must be able to distinguish batch and streaming jobs (I don't think the Flink runtime can distinguish both right now?). 2) A new message, terminate (or similar), is introduced that can only be used for streaming jobs (it would be ignored for batch jobs); it stops the sources and waits until the job finishes regularly. This approach has the advantage that the current system behavior is preserved (it only adds a new feature). The disadvantage is that all clients need to be touched, and it must be clear to the user that terminate does not work for batch jobs. If an error/warning should be raised when a user tries to terminate a batch job, Flink must be able to distinguish between batch and streaming jobs, too. As an alternative, terminate on batch jobs could be interpreted as cancel, too. I personally think that the second approach is better. Please give feedback. If we can get to a conclusion on how to implement it, I would like to work on it. -Matthias
Re: [DISCUSS] Canceling Streaming Jobs
Stephan, I'm not sure what you mean by this exactly... But I guess this is an add-on that can be done later. It seems to be related to https://issues.apache.org/jira/browse/FLINK-1929 I will open a JIRA for the new terminate message and assign it to myself. -Matthias On 05/27/2015 12:36 PM, Stephan Ewen wrote: +1 for the second option. How about we allow passing a flag that indicates whether a checkpoint should be taken together with the canceling? On Wed, May 27, 2015 at 12:27 PM, Aljoscha Krettek aljos...@apache.org wrote: I would also prefer the second option. The first is rather a hack, not an option. :D On May 27, 2015 9:14 AM, Márton Balassi balassi.mar...@gmail.com wrote: +1 for the second option: It would also provide the possibility to properly commit a state checkpoint after the terminate message was triggered. In some cases this can be a desirable behaviour. On Wed, May 27, 2015 at 8:46 AM, Gyula Fóra gyf...@apache.org wrote: Hey, I would also strongly prefer the second option; users need to have the option to force-cancel a program in case of unwanted behaviour. Cheers, Gyula Matthias J. Sax mj...@informatik.hu-berlin.de wrote (on Wednesday, May 27, 2015, 1:20): Hi, currently, the only way to stop a streaming job is to cancel the job. This has multiple disadvantages: 1) a clean stop is not possible (see https://issues.apache.org/jira/browse/FLINK-1929 -- I think a clean stop is a prerequisite for FLINK-1929) and 2) as a minor issue, all canceled jobs are listed as canceled in the history (which is somewhat confusing for the user -- at least it was for me when I started to work with Flink Streaming). This issue was raised a few times already; however, no final conclusion was reached (if I remember correctly). I could not find a JIRA for it either. From my understanding of the system, there would be two ways to implement clean stopping of streaming jobs: 1) Distinguish tasks between batch and streaming: canceling a batch job works as always; canceling a streaming job only sends a cancel signal to the sources and waits until the job finishes (i.e., sources stop emitting data and finish regularly, triggering the finishing of all operators). In this case, streaming jobs are stopped in a clean way (as if the input had been finite) and the job will be listed as finished in the history. This approach has the advantage that it should be simpler to implement. However, the disadvantages are (1) a hard canceling of jobs is not possible any more, and (2) Flink must be able to distinguish batch and streaming jobs (I don't think the Flink runtime can distinguish both right now?). 2) A new message, terminate (or similar), is introduced that can only be used for streaming jobs (it would be ignored for batch jobs); it stops the sources and waits until the job finishes regularly. This approach has the advantage that the current system behavior is preserved (it only adds a new feature). The disadvantage is that all clients need to be touched, and it must be clear to the user that terminate does not work for batch jobs. If an error/warning should be raised when a user tries to terminate a batch job, Flink must be able to distinguish between batch and streaming jobs, too. As an alternative, terminate on batch jobs could be interpreted as cancel, too. I personally think that the second approach is better. Please give feedback. If we can get to a conclusion on how to implement it, I would like to work on it.
-Matthias
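To illustrate the second option, a sketch of a source that could cooperate with such a terminate message. The stop() hook is hypothetical (it is what the new message would have to invoke), and the run(SourceContext)/cancel() shape of SourceFunction assumed here may not match the streaming API of the time:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class StoppableCounterSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0;
        while (running) {
            ctx.collect(counter++);
        }
        // Returning from run() ends the source regularly: downstream
        // operators see end-of-stream and finish, so the job is listed
        // as FINISHED in the history instead of CANCELED.
    }

    // Hypothetical hook the proposed terminate message would invoke.
    public void stop() {
        running = false;
    }

    @Override
    public void cancel() {
        // The existing hard-cancel path stays available.
        running = false;
    }
}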
Greetings and help regarding my first issue
Hi! I am new to the open source world and wanted to start my journey with Flink. I found an issue that I think would be good for me: https://issues.apache.org/jira/browse/FLINK-2077 I have a few questions regarding it, and it would be great if you could help with them. 1. The issue talks about cleaning and refactoring the code. Are there any code-writing and refactoring guidelines that Apache or Flink follows that I should learn? Thanks! Best regards, Vijendra
Storm compatibility layer currently does not support Storm's SimpleJoin example
Hey everyone, I experimented with the Storm compatibility layer Matthias wrote, and ran some Storm examples on Flink. I found that Storm's SimpleJoin example does not work. I suppose it is because of the multiple input streams. I'm willing to add another example instead. Right now, I'm getting it through Aljoscha's streaming refactor. Peter
[jira] [Created] (FLINK-2098) Checkpoint barrier initiation at source is not aligned with snapshotting
Stephan Ewen created FLINK-2098:
-----------------------------------

Summary: Checkpoint barrier initiation at source is not aligned with snapshotting
Key: FLINK-2098
URL: https://issues.apache.org/jira/browse/FLINK-2098
Project: Flink
Issue Type: Bug
Components: Streaming
Affects Versions: 0.9
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
Fix For: 0.9

The stream source does not properly align the emission of checkpoint barriers with the drawing of snapshots.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2102) Add predict operation for LabeledVector
Theodore Vasiloudis created FLINK-2102:
---------------------------------------

Summary: Add predict operation for LabeledVector
Key: FLINK-2102
URL: https://issues.apache.org/jira/browse/FLINK-2102
Project: Flink
Issue Type: Improvement
Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Priority: Minor
Fix For: 0.9

Currently we can only call predict on DataSet[V <: Vector]. A lot of times though we have a DataSet[LabeledVector] that we split into a train and a test set. We should be able to make predictions on the test DataSet[LabeledVector] without having to transform it into a DataSet[Vector].

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: SQL on Flink
On 27 May 2015, at 17:05, Timo Walther twal...@apache.org wrote: It's rather passion for the future of the project than passion for SQL ;-) I always try to think like someone from industry. And IMO the people in industry are still thinking in SQL. If you want to persuade someone coming from the SQL world, you should offer a SQL interface to run legacy code first (similar to the Hadoop operators). Rewriting old queries in the Table API is not very convenient. I share Stephan's opinion. Building both APIs concurrently would act as a good source to test and extend the Table API. Currently, the Table API is half-done, but I think the goal is to have SQL functionality. I can implement an SQL operator and extend the Table API if functionality is missing. Very exciting! :-) +1 As suggested, I think the best thing is to do this hand-in-hand with the Table API. I don't think that there was any real disagreement. Everyone agrees that the SQL layer should be built on top of the Table API, which is great for both the Table API and the SQL layer. :-)
Re: SQL on Flink
very excited to see this starting! On Wed, May 27, 2015 at 6:06 PM, Ufuk Celebi u...@apache.org wrote: On 27 May 2015, at 17:05, Timo Walther twal...@apache.org wrote: It's rather passion for the future of the project than passion for SQL ;-) I always try to think like someone from industry. And IMO the people in industry are still thinking in SQL. If you want to persuade someone coming from the SQL world, you should offer a SQL interface to run legacy code first (similar to the Hadoop operators). Rewriting old queries in the Table API is not very convenient. I share Stephan's opinion. Building both APIs concurrently would act as a good source to test and extend the Table API. Currently, the Table API is half-done, but I think the goal is to have SQL functionality. I can implement an SQL operator and extend the Table API if functionality is missing. Very exciting! :-) +1 As suggested, I think the best thing is to do this hand-in-hand with the Table API. I don't think that there was any real disagreement. Everyone agrees that the SQL layer should be built on top of the Table API, which is great for both the Table API and the SQL layer. :-)
Re: SQL on Flink
IMO, it is better to have one feature that is reasonably well developed instead of two half-baked features. That's why I proposed to advance the Table API a bit further before starting the next big thing. I played around with the Table API recently, and I think it definitely needs a bit more contributor attention and more features to be actually usable. Also, since all features of the SQL interface need to be included in the Table API (given we follow the SQL-on-Table approach), it makes sense IMO to push the Table API a bit further before going for the next thing. 2015-05-27 16:06 GMT+02:00 Stephan Ewen se...@apache.org: I see no reason why a SQL interface cannot be bootstrapped concurrently. It would initially not support many operations, but would act as a good source to test and drive functionality from the Table API. @Ted: I would like to learn a bit more about the stack and internal abstractions of Drill. It may make sense to reuse some of the query execution operators from Drill. I especially like the learning-schema-on-the-fly part of Drill. Flink DataSets and Streams have a schema, but it may in several cases be a schema lower bound, like the greatest common superclass. Those cases may benefit big time from Drill's ability to refine the schema on the fly. That may be useful also in the Table API, making it again available to LINQ-like programs and SQL scripts. On Wed, May 27, 2015 at 3:49 PM, Robert Metzger rmetz...@apache.org wrote: I didn't know that paper... Thanks for sharing. I've worked on a SQL layer for Stratosphere some time ago, using Apache Calcite (called Optiq back then). I think the project provides a lot of very good tooling for creating a SQL layer. So if we decide to go for SQL on Flink, I would suggest using Calcite. I can also help you a bit with Calcite to get started with it. I agree with Fabian that it would probably make more sense for now to enhance the Table API. I think the biggest limitation right now is that it only supports POJOs. We should also support Tuples (I know that's difficult to do), data from HCatalog (that includes Parquet and ORC), JSON, ... Then, I would add filter and projection pushdown to the Table API. On Tue, May 26, 2015 at 10:03 PM, Ted Dunning ted.dunn...@gmail.com wrote: It would also be relatively simple (I think) to retarget Drill to Flink if Flink doesn't provide enough typing metadata to do traditional SQL. On Tue, May 26, 2015 at 12:52 PM, Fabian Hueske fhue...@gmail.com wrote: Hi, Flink's Table API is pretty close to what SQL provides. IMO, the best approach would be to leverage that and build a SQL parser (maybe together with a logical optimizer) on top of the Table API. The parser (and optimizer) could be built using Apache Calcite, which provides exactly this. Since the Table API is still a fairly new component and not very feature-rich, it might make sense to extend and strengthen it before putting something major on top. Cheers, Fabian 2015-05-26 21:38 GMT+02:00 Timo Walther twal...@apache.org: Hey everyone, I would be interested in having a complete SQL API in Flink. What is the status there? Is someone already working on it? If not, I would like to work on it. I found http://ijcsi.org/papers/IJCSI-12-1-1-169-174.pdf but I couldn't find anything on the mailing list or Jira. Otherwise I would open an issue and start a discussion about it there. Regards, Timo
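As a rough illustration of the Calcite suggestion (my sketch, not existing Flink code; the query and class name are made up), the first stage of such a SQL layer would parse a query string into an AST, which would then be validated and translated into Table API operations:

import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class CalciteParserSketch {

    public static void main(String[] args) throws SqlParseException {
        // Parse a SQL string into a Calcite AST; a SQL-on-Flink layer
        // would walk this tree and emit Table API calls.
        SqlParser parser = SqlParser.create(
                "SELECT name, SUM(amount) FROM orders GROUP BY name");
        SqlNode ast = parser.parseQuery();
        System.out.println(ast);
    }
}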
Re: SQL on Flink
It's rather passion for the future of the project than passion for SQL ;-) I always try to think like someone from industry. And IMO the people in industry are still thinking in SQL. If you want to persuade someone coming from the SQL world, you should offer a SQL interface to run legacy code first (similar to the Hadoop operators). Rewriting old queries in the Table API is not very convenient. I share Stephan's opinion. Building both APIs concurrently would act as a good source to test and extend the Table API. Currently, the Table API is half-done, but I think the goal is to have SQL functionality. I can implement an SQL operator and extend the Table API if functionality is missing. On 27.05.2015 16:41, Fabian Hueske wrote: +1 for committer passion! Please don't get me wrong, I think SQL on Flink would be a great feature. I just wanted to make the point that the Table API needs to mirror all SQL features if SQL is implemented on top of the Table API. 2015-05-27 16:34 GMT+02:00 Kostas Tzoumas ktzou...@apache.org: I think Fabian's arguments make a lot of sense. However, if Timo *really wants* to start SQL on top of Table, that is what he will do a great job at :-) As usual, we can keep it in beta status in flink-staging until it is mature... and it will help create issues for the Table API and give direction to its development. Perhaps we will have a feature-poor SQL for a bit, then switch to hardening the Table API to support more features, and then back to SQL. I'm just advocating for committer passion first here :-) Perhaps Timo should weigh in On Wed, May 27, 2015 at 4:19 PM, Fabian Hueske fhue...@gmail.com wrote: IMO, it is better to have one feature that is reasonably well developed instead of two half-baked features. That's why I proposed to advance the Table API a bit further before starting the next big thing. I played around with the Table API recently, and I think it definitely needs a bit more contributor attention and more features to be actually usable. Also, since all features of the SQL interface need to be included in the Table API (given we follow the SQL-on-Table approach), it makes sense IMO to push the Table API a bit further before going for the next thing. 2015-05-27 16:06 GMT+02:00 Stephan Ewen se...@apache.org: I see no reason why a SQL interface cannot be bootstrapped concurrently. It would initially not support many operations, but would act as a good source to test and drive functionality from the Table API. @Ted: I would like to learn a bit more about the stack and internal abstractions of Drill. It may make sense to reuse some of the query execution operators from Drill. I especially like the learning-schema-on-the-fly part of Drill. Flink DataSets and Streams have a schema, but it may in several cases be a schema lower bound, like the greatest common superclass. Those cases may benefit big time from Drill's ability to refine the schema on the fly. That may be useful also in the Table API, making it again available to LINQ-like programs and SQL scripts. On Wed, May 27, 2015 at 3:49 PM, Robert Metzger rmetz...@apache.org wrote: I didn't know that paper... Thanks for sharing. I've worked on a SQL layer for Stratosphere some time ago, using Apache Calcite (called Optiq back then). I think the project provides a lot of very good tooling for creating a SQL layer. So if we decide to go for SQL on Flink, I would suggest using Calcite. I can also help you a bit with Calcite to get started with it.
I agree with Fabian that it would probably make more sense for now to enhance the Table API. I think the biggest limitation right now is that it only supports POJOs. We should also support Tuples (I know that's difficult to do), data from HCatalog (that includes Parquet and ORC), JSON, ... Then, I would add filter and projection pushdown to the Table API. On Tue, May 26, 2015 at 10:03 PM, Ted Dunning ted.dunn...@gmail.com wrote: It would also be relatively simple (I think) to retarget Drill to Flink if Flink doesn't provide enough typing metadata to do traditional SQL. On Tue, May 26, 2015 at 12:52 PM, Fabian Hueske fhue...@gmail.com wrote: Hi, Flink's Table API is pretty close to what SQL provides. IMO, the best approach would be to leverage that and build a SQL parser (maybe together with a logical optimizer) on top of the Table API. The parser (and optimizer) could be built using Apache Calcite, which provides exactly this. Since the Table API is still a fairly new component and not very feature-rich, it might make sense to extend and strengthen it before putting something major on top. Cheers, Fabian 2015-05-26 21:38 GMT+02:00 Timo Walther twal...@apache.org: Hey everyone, I would be interested in having a complete SQL API in Flink. What is the status there? Is someone already working on it? If not, I would like to work on it. I found http://ijcsi.org/papers/IJCSI-12-1-1-169-174.pdf but I
[jira] [Created] (FLINK-2101) Scheme Inference doesn't work for Tuple5
Rico Bergmann created FLINK-2101:
-------------------------------------

Summary: Scheme Inference doesn't work for Tuple5
Key: FLINK-2101
URL: https://issues.apache.org/jira/browse/FLINK-2101
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: master
Reporter: Rico Bergmann

Calling

addSink(new KafkaSink<Tuple5<String, String, String, Long, Double>>(
    "localhost:9092", "webtrends.ec1601",
    new Utils.TypeInformationSerializationSchema<Tuple5<String, String, String, Long, Double>>(
        new Tuple5<String, String, String, Long, Double>(), env.getConfig())));

gives me an exception stating that the generic type infos are not given:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:396)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:339)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:318)
    at org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo.<init>(ObjectArrayTypeInfo.java:45)
    at org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo.getInfoFor(ObjectArrayTypeInfo.java:167)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1150)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1122)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForObject(TypeExtractor.java:1476)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getForObject(TypeExtractor.java:1446)
    at org.apache.flink.streaming.connectors.kafka.Utils$TypeInformationSerializationSchema.<init>(Utils.java:37)
    at de.otto.streamexample.WCExample.main(WCExample.java:132)

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
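A possible workaround (my sketch, not from the report): build the Tuple5 type information explicitly instead of letting the TypeExtractor infer it from an unparameterized instance. Whether the reporter's Utils.TypeInformationSerializationSchema can accept an explicit TypeInformation is an assumption:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class ExplicitTuple5TypeInfo {

    public static void main(String[] args) {
        // Constructs the TypeInformation for the Tuple5 field by field,
        // so no reflective extraction from a raw tuple instance is needed.
        TypeInformation<Tuple5<String, String, String, Long, Double>> typeInfo =
                new TupleTypeInfo<Tuple5<String, String, String, Long, Double>>(
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.LONG_TYPE_INFO,
                        BasicTypeInfo.DOUBLE_TYPE_INFO);
        System.out.println(typeInfo);
    }
}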
Re: SQL on Flink
+1 for committer passion! Please don't get me wrong, I think SQL on Flink would be a great feature. I just wanted to make the point that the Table API needs to mirror all SQL features if SQL is implemented on top of the Table API. 2015-05-27 16:34 GMT+02:00 Kostas Tzoumas ktzou...@apache.org: I think Fabian's arguments make a lot of sense. However, if Timo *really wants* to start SQL on top of Table, that is what he will do a great job at :-) As usual, we can keep it in beta status in flink-staging until it is mature... and it will help create issues for the Table API and give direction to its development. Perhaps we will have a feature-poor SQL for a bit, then switch to hardening the Table API to support more features, and then back to SQL. I'm just advocating for committer passion first here :-) Perhaps Timo should weigh in On Wed, May 27, 2015 at 4:19 PM, Fabian Hueske fhue...@gmail.com wrote: IMO, it is better to have one feature that is reasonably well developed instead of two half-baked features. That's why I proposed to advance the Table API a bit further before starting the next big thing. I played around with the Table API recently, and I think it definitely needs a bit more contributor attention and more features to be actually usable. Also, since all features of the SQL interface need to be included in the Table API (given we follow the SQL-on-Table approach), it makes sense IMO to push the Table API a bit further before going for the next thing. 2015-05-27 16:06 GMT+02:00 Stephan Ewen se...@apache.org: I see no reason why a SQL interface cannot be bootstrapped concurrently. It would initially not support many operations, but would act as a good source to test and drive functionality from the Table API. @Ted: I would like to learn a bit more about the stack and internal abstractions of Drill. It may make sense to reuse some of the query execution operators from Drill. I especially like the learning-schema-on-the-fly part of Drill. Flink DataSets and Streams have a schema, but it may in several cases be a schema lower bound, like the greatest common superclass. Those cases may benefit big time from Drill's ability to refine the schema on the fly. That may be useful also in the Table API, making it again available to LINQ-like programs and SQL scripts. On Wed, May 27, 2015 at 3:49 PM, Robert Metzger rmetz...@apache.org wrote: I didn't know that paper... Thanks for sharing. I've worked on a SQL layer for Stratosphere some time ago, using Apache Calcite (called Optiq back then). I think the project provides a lot of very good tooling for creating a SQL layer. So if we decide to go for SQL on Flink, I would suggest using Calcite. I can also help you a bit with Calcite to get started with it. I agree with Fabian that it would probably make more sense for now to enhance the Table API. I think the biggest limitation right now is that it only supports POJOs. We should also support Tuples (I know that's difficult to do), data from HCatalog (that includes Parquet and ORC), JSON, ... Then, I would add filter and projection pushdown to the Table API. On Tue, May 26, 2015 at 10:03 PM, Ted Dunning ted.dunn...@gmail.com wrote: It would also be relatively simple (I think) to retarget Drill to Flink if Flink doesn't provide enough typing metadata to do traditional SQL. On Tue, May 26, 2015 at 12:52 PM, Fabian Hueske fhue...@gmail.com wrote: Hi, Flink's Table API is pretty close to what SQL provides.
IMO, the best approach would be to leverage that and build a SQL parser (maybe together with a logical optimizer) on top of the Table API. The parser (and optimizer) could be built using Apache Calcite, which provides exactly this. Since the Table API is still a fairly new component and not very feature-rich, it might make sense to extend and strengthen it before putting something major on top. Cheers, Fabian 2015-05-26 21:38 GMT+02:00 Timo Walther twal...@apache.org: Hey everyone, I would be interested in having a complete SQL API in Flink. What is the status there? Is someone already working on it? If not, I would like to work on it. I found http://ijcsi.org/papers/IJCSI-12-1-1-169-174.pdf but I couldn't find anything on the mailing list or Jira. Otherwise I would open an issue and start a discussion about it there. Regards, Timo
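For a sense of how close the two are, here is a sketch of what a simple SQL query like SELECT word, SUM(frequency) FROM words GROUP BY word could map to on the Table API. The class names and the expression-string syntax are assumptions based on the staging flink-table module and may differ in detail:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.table.TableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.table.Table;

public class TableApiSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> words = env.fromElements(
                new Tuple2<String, Integer>("hello", 1),
                new Tuple2<String, Integer>("world", 2),
                new Tuple2<String, Integer>("hello", 3));

        // Register the DataSet as a Table with named fields, then express
        // the grouping and aggregation the way the SQL query would.
        TableEnvironment tableEnv = new TableEnvironment();
        Table table = tableEnv.fromDataSet(words, "word, frequency");
        Table result = table.groupBy("word").select("word, frequency.sum as total");

        // tableEnv.toDataSet(result, ...) would convert back to a DataSet.
        System.out.println(result);
    }
}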
Re: SQL on Flink
I think Fabian's arguments make a lot of sense. However, if Timo *really wants* to start SQL on top of Table, that is what he will do a great job at :-) As usual, we can keep it in beta status in flink-staging until it is mature... and it will help create issues for the Table API and give direction to its development. Perhaps we will have a feature-poor SQL for a bit, then switch to hardening the Table API to support more features, and then back to SQL. I'm just advocating for committer passion first here :-) Perhaps Timo should weigh in On Wed, May 27, 2015 at 4:19 PM, Fabian Hueske fhue...@gmail.com wrote: IMO, it is better to have one feature that is reasonably well developed instead of two half-baked features. That's why I proposed to advance the Table API a bit further before starting the next big thing. I played around with the Table API recently, and I think it definitely needs a bit more contributor attention and more features to be actually usable. Also, since all features of the SQL interface need to be included in the Table API (given we follow the SQL-on-Table approach), it makes sense IMO to push the Table API a bit further before going for the next thing. 2015-05-27 16:06 GMT+02:00 Stephan Ewen se...@apache.org: I see no reason why a SQL interface cannot be bootstrapped concurrently. It would initially not support many operations, but would act as a good source to test and drive functionality from the Table API. @Ted: I would like to learn a bit more about the stack and internal abstractions of Drill. It may make sense to reuse some of the query execution operators from Drill. I especially like the learning-schema-on-the-fly part of Drill. Flink DataSets and Streams have a schema, but it may in several cases be a schema lower bound, like the greatest common superclass. Those cases may benefit big time from Drill's ability to refine the schema on the fly. That may be useful also in the Table API, making it again available to LINQ-like programs and SQL scripts. On Wed, May 27, 2015 at 3:49 PM, Robert Metzger rmetz...@apache.org wrote: I didn't know that paper... Thanks for sharing. I've worked on a SQL layer for Stratosphere some time ago, using Apache Calcite (called Optiq back then). I think the project provides a lot of very good tooling for creating a SQL layer. So if we decide to go for SQL on Flink, I would suggest using Calcite. I can also help you a bit with Calcite to get started with it. I agree with Fabian that it would probably make more sense for now to enhance the Table API. I think the biggest limitation right now is that it only supports POJOs. We should also support Tuples (I know that's difficult to do), data from HCatalog (that includes Parquet and ORC), JSON, ... Then, I would add filter and projection pushdown to the Table API. On Tue, May 26, 2015 at 10:03 PM, Ted Dunning ted.dunn...@gmail.com wrote: It would also be relatively simple (I think) to retarget Drill to Flink if Flink doesn't provide enough typing metadata to do traditional SQL. On Tue, May 26, 2015 at 12:52 PM, Fabian Hueske fhue...@gmail.com wrote: Hi, Flink's Table API is pretty close to what SQL provides. IMO, the best approach would be to leverage that and build a SQL parser (maybe together with a logical optimizer) on top of the Table API. The parser (and optimizer) could be built using Apache Calcite, which provides exactly this. Since the Table API is still a fairly new component and not very feature-rich, it might make sense to extend and strengthen it before putting something major on top.
Cheers, Fabian 2015-05-26 21:38 GMT+02:00 Timo Walther twal...@apache.org: Hey everyone, I would be interested in having a complete SQL API in Flink. What is the status there? Is someone already working on it? If not, I would like to work on it. I found http://ijcsi.org/papers/IJCSI-12-1-1-169-174.pdf but I couldn't find anything on the mailing list or Jira. Otherwise I would open an issue and start a discussion about it there. Regards, Timo
Re: SQL on Flink
+1 to what Ufuk said. :D On May 27, 2015 6:13 PM, Kostas Tzoumas ktzou...@apache.org wrote: very excited to see this starting! On Wed, May 27, 2015 at 6:06 PM, Ufuk Celebi u...@apache.org wrote: On 27 May 2015, at 17:05, Timo Walther twal...@apache.org wrote: It's rather passion for the future of the project than passion for SQL ;-) I always try to think like someone from industry. And IMO the people in industry are still thinking in SQL. If you want to persuade someone coming from the SQL world, you should offer a SQL interface to run legacy code first (similar to the Hadoop operators). Rewriting old queries in the Table API is not very convenient. I share Stephan's opinion. Building both APIs concurrently would act as a good source to test and extend the Table API. Currently, the Table API is half-done, but I think the goal is to have SQL functionality. I can implement an SQL operator and extend the Table API if functionality is missing. Very exciting! :-) +1 As suggested, I think the best thing is to do this hand-in-hand with the Table API. I don't think that there was any real disagreement. Everyone agrees that the SQL layer should be built on top of the Table API, which is great for both the Table API and the SQL layer. :-)
Re: [DISCUSS] Canceling Streaming Jobs
Hey, I would also strongly prefer the second option; users need to have the option to force-cancel a program in case of unwanted behaviour. Cheers, Gyula Matthias J. Sax mj...@informatik.hu-berlin.de wrote (on Wednesday, May 27, 2015, 1:20): Hi, currently, the only way to stop a streaming job is to cancel the job. This has multiple disadvantages: 1) a clean stop is not possible (see https://issues.apache.org/jira/browse/FLINK-1929 -- I think a clean stop is a prerequisite for FLINK-1929) and 2) as a minor issue, all canceled jobs are listed as canceled in the history (which is somewhat confusing for the user -- at least it was for me when I started to work with Flink Streaming). This issue was raised a few times already; however, no final conclusion was reached (if I remember correctly). I could not find a JIRA for it either. From my understanding of the system, there would be two ways to implement clean stopping of streaming jobs: 1) Distinguish tasks between batch and streaming: canceling a batch job works as always; canceling a streaming job only sends a cancel signal to the sources and waits until the job finishes (i.e., sources stop emitting data and finish regularly, triggering the finishing of all operators). In this case, streaming jobs are stopped in a clean way (as if the input had been finite) and the job will be listed as finished in the history. This approach has the advantage that it should be simpler to implement. However, the disadvantages are (1) a hard canceling of jobs is not possible any more, and (2) Flink must be able to distinguish batch and streaming jobs (I don't think the Flink runtime can distinguish both right now?). 2) A new message, terminate (or similar), is introduced that can only be used for streaming jobs (it would be ignored for batch jobs); it stops the sources and waits until the job finishes regularly. This approach has the advantage that the current system behavior is preserved (it only adds a new feature). The disadvantage is that all clients need to be touched, and it must be clear to the user that terminate does not work for batch jobs. If an error/warning should be raised when a user tries to terminate a batch job, Flink must be able to distinguish between batch and streaming jobs, too. As an alternative, terminate on batch jobs could be interpreted as cancel, too. I personally think that the second approach is better. Please give feedback. If we can get to a conclusion on how to implement it, I would like to work on it. -Matthias
Adding custom Tuple to a DataSet
Hi Is there a way where I can add a custom (newly created) tuple to a new or already existing DataSet? DataSet set = env.fromElements(myCustomTuple); works fine, but only with the same datatype in the case of Tuple2 or higher. Tuple2<String, Long> creates a problem (as stated in the JavaDoc, it needs all elements to be of the same type). Is there any workaround? Or is using fromCollection the only option? Thanks for your help in advance. Thanks and Regards Amit Pawar
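One possible workaround (my sketch; assuming the problem is type inference for the heterogeneous tuple): supply the element via fromCollection together with explicit type information, so nothing has to be inferred:

import java.util.Collections;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class CustomTupleDataSet {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Tuple2<String, Long> myCustomTuple = new Tuple2<String, Long>("amit", 42L);

        // fromCollection with an explicit TupleTypeInfo avoids relying on
        // element-based type inference for the mixed-type tuple.
        DataSet<Tuple2<String, Long>> set = env.fromCollection(
                Collections.singletonList(myCustomTuple),
                new TupleTypeInfo<Tuple2<String, Long>>(
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.LONG_TYPE_INFO));

        set.print(); // depending on the version, this may additionally need env.execute()
    }
}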