[jira] [Created] (FLINK-1893) Add Scala support for Flink on Tez
Kostas Tzoumas created FLINK-1893:
-------------------------------------

             Summary: Add Scala support for Flink on Tez
                 Key: FLINK-1893
                 URL: https://issues.apache.org/jira/browse/FLINK-1893
             Project: Flink
          Issue Type: Improvement
          Components: Flink on Tez
            Reporter: Kostas Tzoumas
            Assignee: Kostas Tzoumas

Create Scala versions of LocalTezEnvironment and RemoteTezEnvironment.
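For reference, a rough sketch of what such a wrapper could look like, following the pattern of the existing Scala ExecutionEnvironment that wraps its Java counterpart. The TezExecutionEnvironment object is hypothetical, and the create() factory on LocalTezEnvironment is an assumption based on the Java flink-tez API:

{code:scala}
// Hypothetical sketch: a Scala-friendly factory wrapping the Java
// LocalTezEnvironment, mirroring org.apache.flink.api.scala.ExecutionEnvironment.
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.tez.client.LocalTezEnvironment

object TezExecutionEnvironment {
  // Wrap the Java Tez environment so Scala programs can use the Scala DataSet API.
  def createLocalEnvironment(): ExecutionEnvironment =
    new ExecutionEnvironment(LocalTezEnvironment.create())
}
{code}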
[jira] [Created] (FLINK-1898) Add support for self-joins to Flink on Tez
Kostas Tzoumas created FLINK-1898:
-------------------------------------

             Summary: Add support for self-joins to Flink on Tez
                 Key: FLINK-1898
                 URL: https://issues.apache.org/jira/browse/FLINK-1898
             Project: Flink
          Issue Type: Bug
          Components: Flink on Tez
            Reporter: Kostas Tzoumas
            Assignee: Kostas Tzoumas

Self-joins are currently not supported by Flink on Tez due to [TEZ-1190|https://issues.apache.org/jira/browse/TEZ-1190]. We should find a workaround (e.g., create a dummy node).
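To illustrate the dummy-node idea at the user level (the real fix belongs in the Flink-on-Tez translation layer), routing one side of the self-join through an identity map gives the plan two distinct vertices instead of a Tez self-edge. A minimal sketch with the standard Scala DataSet API:

{code:scala}
import org.apache.flink.api.scala._

// Sketch: join an edge list with itself without producing a Tez self-edge.
def selfJoinWorkaround(env: ExecutionEnvironment): Unit = {
  val edges: DataSet[(Int, Int)] = env.fromElements((1, 2), (2, 3), (1, 3))
  // The identity map acts as the "dummy node": it yields a separate plan
  // vertex, so the join's two inputs come from two distinct Tez vertices.
  val edgesCopy = edges.map(e => e)
  val twoHopPaths = edges.join(edgesCopy).where(1).equalTo(0)
  twoHopPaths.print()
}
{code}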
Re: Flink interactive Scala shell
I would also keep an eye on this issue from the Zeppelin project: https://issues.apache.org/jira/browse/ZEPPELIN-44
The needed infrastructure is going to be very similar.

On Thu, Apr 16, 2015 at 10:15 AM, Kostas Tzoumas ktzou...@apache.org wrote:

Great, let us know if you run into any issues. Can you create a JIRA on the REPL and link to your repository for the community to track the status?

On Wed, Apr 15, 2015 at 4:23 PM, Nikolaas s nikolaas.steenber...@gmail.com wrote:

Thanks for the feedback guys! Apparently the Scala shell compiles the shell input to some kind of virtual directory. It should be possible to create a jar from its content and then hand it over to Flink for execution in some way. I will investigate further.

cheers, Nikolaas

2015-04-15 11:20 GMT+02:00 Stephan Ewen se...@apache.org:

To give a bit of context for the exception: to execute a program, the classes of the user functions need to be available to the executing TaskManagers.

- If you execute locally from the IDE, all classes are in the classpath anyway.
- If you use the remote environment, you need to attach the jar file to the environment.
- In your case (the REPL), you need to make sure that the generated classes are given to the TaskManagers.

In that sense, the approach is probably similar to the case of executing with a remote environment - only that you do not have a jar file up front, but need to generate it on the fly. As Robert mentioned, https://github.com/apache/flink/pull/35 may have a first solution to that. Other approaches are also possible, like simply always bundling all classes in the directory where the REPL puts its generated classes.

Greetings, Stephan

On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek aljos...@apache.org wrote:

I will look into it once I have some time (end of this week, or next week probably).

On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger rmetz...@apache.org wrote:

Hey Nikolaas, thank you for posting on the mailing list. I've met Nikolaas today in person and we were talking a bit about an interactive shell for Flink, potentially also an integration with Zeppelin. Great stuff I'm really looking forward to :) We were wondering if somebody on the list has some experience with the Scala shell. I've pointed Nikolaas also to this PR: https://github.com/apache/flink/pull/35.

Best, Robert

On Tue, Apr 14, 2015 at 5:26 PM, nse sik nikolaas.steenber...@gmail.com wrote:

Hi! I am trying to implement a Scala shell for Flink.
I've started with a simple Scala object whose main function drops the user into the interactive Scala shell (REPL) at one point:

import scala.tools.nsc.interpreter.ILoop
import scala.tools.nsc.Settings

object Job {
  def main(args: Array[String]) {
    val repl = new ILoop()
    repl.settings = new Settings()
    // enable this line to use scala in intellij
    repl.settings.usejavacp.value = true
    repl.createInterpreter()
    // start scala interpreter shell
    repl.process(repl.settings)
    repl.closeInterpreter()
  }
}

Now I am trying to execute the word count example as in:

scala> import org.apache.flink.api.scala._
scala> val env = ExecutionEnvironment.getExecutionEnvironment
scala> val text = env.fromElements("To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
scala> counts.print()
scala> env.execute("Flink Scala Api Skeleton")

However I am running into the following error:

env.execute("Flink Scala Api Skeleton")
org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: The type serializer factory could not load its parameters from the configuration due to missing classes.
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
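A rough sketch of the direction Stephan describes above - building a jar from the REPL's generated classes on the fly and shipping it via a remote environment. The helper name and all paths are illustrative; createRemoteEnvironment is the standard Flink API:

import java.io.{File, FileInputStream, FileOutputStream}
import java.util.jar.{JarEntry, JarOutputStream}
import org.apache.flink.api.scala.ExecutionEnvironment

// Illustrative helper: zip a directory of generated class files into a jar.
def jarFromClassDir(classDir: File, jarFile: File): File = {
  val out = new JarOutputStream(new FileOutputStream(jarFile))
  def add(prefix: String, f: File): Unit =
    if (f.isDirectory) f.listFiles().foreach(c => add(prefix + f.getName + "/", c))
    else {
      out.putNextEntry(new JarEntry(prefix + f.getName))
      val in = new FileInputStream(f)
      val buf = new Array[Byte](4096)
      Iterator.continually(in.read(buf)).takeWhile(_ != -1).foreach(out.write(buf, 0, _))
      in.close()
      out.closeEntry()
    }
  classDir.listFiles().foreach(c => add("", c))
  out.close()
  jarFile
}

// Usage sketch: point the REPL's output at a real directory (e.g. via the
// -Yrepl-outdir compiler setting), then register the jar with a remote
// environment so the TaskManagers can load the generated classes:
// val jar = jarFromClassDir(new File("/tmp/repl-classes"), new File("/tmp/repl.jar"))
// val env = ExecutionEnvironment.createRemoteEnvironment("host", 6123, jar.getPath)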
Re: Flink interactive Scala shell
Great, let us know if you run into any issues. Can you create a JIRA on the REPL and link to your repository for the community to track the status?

On Wed, Apr 15, 2015 at 4:23 PM, Nikolaas s nikolaas.steenber...@gmail.com wrote:

Thanks for the feedback guys! Apparently the Scala shell compiles the shell input to some kind of virtual directory. It should be possible to create a jar from its content and then hand it over to Flink for execution in some way. I will investigate further.

cheers, Nikolaas

2015-04-15 11:20 GMT+02:00 Stephan Ewen se...@apache.org:

To give a bit of context for the exception: to execute a program, the classes of the user functions need to be available to the executing TaskManagers.

- If you execute locally from the IDE, all classes are in the classpath anyway.
- If you use the remote environment, you need to attach the jar file to the environment.
- In your case (the REPL), you need to make sure that the generated classes are given to the TaskManagers.

In that sense, the approach is probably similar to the case of executing with a remote environment - only that you do not have a jar file up front, but need to generate it on the fly. As Robert mentioned, https://github.com/apache/flink/pull/35 may have a first solution to that. Other approaches are also possible, like simply always bundling all classes in the directory where the REPL puts its generated classes.

Greetings, Stephan

On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek aljos...@apache.org wrote:

I will look into it once I have some time (end of this week, or next week probably).

On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger rmetz...@apache.org wrote:

Hey Nikolaas, thank you for posting on the mailing list. I've met Nikolaas today in person and we were talking a bit about an interactive shell for Flink, potentially also an integration with Zeppelin. Great stuff I'm really looking forward to :) We were wondering if somebody on the list has some experience with the Scala shell. I've pointed Nikolaas also to this PR: https://github.com/apache/flink/pull/35.

Best, Robert

On Tue, Apr 14, 2015 at 5:26 PM, nse sik nikolaas.steenber...@gmail.com wrote:

Hi! I am trying to implement a Scala shell for Flink.
I've started with a simple Scala object whose main function drops the user into the interactive Scala shell (REPL) at one point:

import scala.tools.nsc.interpreter.ILoop
import scala.tools.nsc.Settings

object Job {
  def main(args: Array[String]) {
    val repl = new ILoop()
    repl.settings = new Settings()
    // enable this line to use scala in intellij
    repl.settings.usejavacp.value = true
    repl.createInterpreter()
    // start scala interpreter shell
    repl.process(repl.settings)
    repl.closeInterpreter()
  }
}

Now I am trying to execute the word count example as in:

scala> import org.apache.flink.api.scala._
scala> val env = ExecutionEnvironment.getExecutionEnvironment
scala> val text = env.fromElements("To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")
scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
scala> counts.print()
scala> env.execute("Flink Scala Api Skeleton")

However I am running into the following error:

env.execute("Flink Scala Api Skeleton")
org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: The type serializer factory could not load its parameters from the configuration due to missing classes.
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
Caused by: java.lang.RuntimeException: The type serializer factory could not load its parameters from the configuration
[jira] [Created] (FLINK-1897) Add accumulators and counters feature to Flink on Tez
Kostas Tzoumas created FLINK-1897:
-------------------------------------

             Summary: Add accumulators and counters feature to Flink on Tez
                 Key: FLINK-1897
                 URL: https://issues.apache.org/jira/browse/FLINK-1897
             Project: Flink
          Issue Type: Improvement
          Components: Flink on Tez
            Reporter: Kostas Tzoumas
            Assignee: Kostas Tzoumas
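For context, this is the accumulator API on the Flink side that the Tez runner would need to wire through - standard DataSet API usage, shown purely to illustrate the feature being requested (class name made up):

{code:scala}
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Standard Flink accumulator usage; on Tez, the collected values would have
// to be reported back to the client together with the job result.
class CountingMapper extends RichMapFunction[String, String] {
  private val counter = new IntCounter()

  override def open(parameters: Configuration): Unit = {
    // Register the accumulator under a name the client can query afterwards.
    getRuntimeContext.addAccumulator("num-records", counter)
  }

  override def map(value: String): String = {
    counter.add(1)
    value
  }
}
{code}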
[jira] [Created] (FLINK-1895) Add task chaining to Flink on Tez
Kostas Tzoumas created FLINK-1895:
-------------------------------------

             Summary: Add task chaining to Flink on Tez
                 Key: FLINK-1895
                 URL: https://issues.apache.org/jira/browse/FLINK-1895
             Project: Flink
          Issue Type: Improvement
          Components: Flink on Tez
            Reporter: Kostas Tzoumas
            Assignee: Kostas Tzoumas
            Priority: Minor

Currently, every runtime operator is wrapped inside a Tez processor. We should implement some form of task chaining, and measure the performance difference.
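Purely as an illustration of the idea (the class name is hypothetical): chaining fuses consecutive record-at-a-time operators into one function, so a single Tez processor runs both and no serialization or network shipping happens between them:

{code:scala}
import org.apache.flink.api.common.functions.MapFunction

// Hypothetical sketch: two user map functions fused into one, so the pair
// can run inside a single Tez processor instead of two connected vertices.
class ChainedMap[A, B, C](f: MapFunction[A, B], g: MapFunction[B, C])
    extends MapFunction[A, C] {
  override def map(value: A): C = g.map(f.map(value))
}
{code}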
[jira] [Created] (FLINK-1894) Add Tez execution mode to Flink command-line tools
Kostas Tzoumas created FLINK-1894:
-------------------------------------

             Summary: Add Tez execution mode to Flink command-line tools
                 Key: FLINK-1894
                 URL: https://issues.apache.org/jira/browse/FLINK-1894
             Project: Flink
          Issue Type: Improvement
          Components: Flink on Tez
            Reporter: Kostas Tzoumas
            Assignee: Kostas Tzoumas
            Priority: Minor

To run Flink programs on Tez, users currently need to:
(1) specify the main class via env.registerMainClass,
(2) package the job in a fat jar, and
(3) use hadoop jar to submit the job to YARN.

This is somewhat problematic, and certainly a worse user experience than regular Flink jobs. Tez execution mode should be part of Flink's command-line tools.
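The current three-step workflow, sketched with the 0.9-era flink-tez API (RemoteTezEnvironment.create() and registerMainClass appear in the Flink-on-Tez documentation; the job class itself is hypothetical):

{code:scala}
import org.apache.flink.tez.client.RemoteTezEnvironment

object WordCountOnTez {
  def main(args: Array[String]): Unit = {
    val env = RemoteTezEnvironment.create()
    // ... assemble the program on env ...
    env.registerMainClass(getClass) // (1) register the main class
    env.execute("WordCount on Tez")
  }
}
// (2) package the job and its dependencies into a fat jar, then
// (3) submit it to YARN via: hadoop jar wordcount-fat.jar WordCountOnTez
{code}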
TableAPI - Join on two keys
Hi,

I want to join two tables in the following way:

case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)
case class CommunitySumTotal(communityID: Int, sumTotal: Double)

val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable
val weightedEdgesTable = weightedEdges.toTable

val sumTotal = communitiesTable.join(weightedEdgesTable)
  .where('nodeID === 'src && 'nodeID === 'target)
  .groupBy('communityID)
  .select('communityID, 'weight.sum as 'sumTotal).toSet[CommunitySumTotal]

but I get this exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The types of the key fields do not match: The number of specified keys is different.
    at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
    at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
    at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
    at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)

Moreover, when I use the following where clause:

  .where('nodeID === 'src || 'nodeID === 'target)

I get another error:

Exception in thread "main" org.apache.flink.api.table.ExpressionException: Could not derive equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target.
    at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
    at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)

Apart from that, the Table API seems really promising. It's a really great tool.

Thank you for your help,
Felix
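A possible interim workaround with the plain DataSet API, sketched against the case classes above (not a Table API fix): the conjunctive join can be expressed with composite key selectors, and the disjunctive one as a union of two equi-joins:

// nodeID == src && nodeID == target, via tuple-valued key selectors
val both = communities.join(weightedEdges)
  .where(c => (c.nodeID, c.nodeID))
  .equalTo(e => (e.src, e.target))

// nodeID == src || nodeID == target, as a union of two equi-joins
// (a pair appears twice when both predicates hold)
val either = communities.join(weightedEdges).where("nodeID").equalTo("src")
  .union(communities.join(weightedEdges).where("nodeID").equalTo("target"))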
Re: About Operator and OperatorBase
I share Stephan's opinion. By the way, we could also find a common name for operators with two inputs. Sometimes it's TwoInputXXX, DualInputXXX, BinaryInputXXX... pretty inconsistent.

On 15.04.2015 17:48, Till Rohrmann wrote:

I would also be in favour of making the distinction between the API and the common API layer clearer by using different names. This would ease the understanding of the source code. In the wake of a possible renaming, we could also get rid of the legacy code org.apache.flink.optimizer.dag.MatchNode and rename org.apache.flink.runtime.operators.MatchDriver into JoinDriver to make the naming more consistent.

On Wed, Apr 15, 2015 at 3:05 PM, Ufuk Celebi u...@apache.org wrote:

On 15 Apr 2015, at 15:01, Stephan Ewen se...@apache.org wrote:

I think we can rename the base operators. Renaming the subclass of DataSet would be extremely API-breaking. I think that is not worth it.

Oh, that's right. We return MapOperator for DataSet operations. Stephan's point makes sense.
[jira] [Created] (FLINK-1899) Table API Bug
Felix Neutatz created FLINK-1899:
------------------------------------

             Summary: Table API Bug
                 Key: FLINK-1899
                 URL: https://issues.apache.org/jira/browse/FLINK-1899
             Project: Flink
          Issue Type: Bug
          Components: Expression API
    Affects Versions: 0.9
            Reporter: Felix Neutatz
            Priority: Minor

I want to run the following program:

{code:scala}
case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)
case class CommunitySumTotal(communityID: Int, sumTotal: Double)

val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable
val weightedEdgesTable = weightedEdges.toTable

val sumTotal = communitiesTable.join(weightedEdgesTable)
  .where('nodeID === 'src)
  .groupBy('communityID)
  .select('communityID, 'weight.sum).toSet[CommunitySumTotal]
{code}

but I get this exception, although in my opinion the outputs do have the same field types:

{code:xml}
Exception in thread "main" org.apache.flink.api.table.ExpressionException: Expression result type org.apache.flink.api.table.Row(communityID: Integer, intermediate.1: Double) does not have the samefields as output type io.ssc.trackthetrackers.analysis.algorithms.CommunitySumTotal(communityID: Integer, sumTotal: Double)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:88)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
    at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
    at io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$.detectCommunities(LouvainCommunityDetection.scala:105)
    at io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$delayedInit$body.apply(LouvainCommunityDetection.scala:38)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
{code}
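A possible workaround, if the mismatch comes from the auto-generated intermediate field name (a sketch, untested against this version): alias the aggregate so the result row's fields line up with the case class:

{code:scala}
val sumTotal = communitiesTable.join(weightedEdgesTable)
  .where('nodeID === 'src)
  .groupBy('communityID)
  // naming the aggregate 'sumTotal should make the row type match
  // CommunitySumTotal(communityID, sumTotal)
  .select('communityID, 'weight.sum as 'sumTotal)
  .toSet[CommunitySumTotal]
{code}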
[jira] [Created] (FLINK-1900) Table API documentation example does not work
Timo Walther created FLINK-1900:
-----------------------------------

             Summary: Table API documentation example does not work
                 Key: FLINK-1900
                 URL: https://issues.apache.org/jira/browse/FLINK-1900
             Project: Flink
          Issue Type: Bug
          Components: Documentation
            Reporter: Timo Walther

Running the word count example leads to

{code}
Exception in thread "main" org.apache.flink.api.table.ExpressionException: Expression result type org.apache.flink.api.table.Row(word: String, intermediate.1: Integer) does not have the samefields as output type io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$WC$3(word: String, count: Integer)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:88)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
    at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
    at io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$.detectCommunities(LouvainCommunityDetection.scala:112)
    at io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$delayedInit$body.apply(LouvainCommunityDetection.scala:38)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.App$$anonfun$main$1.apply(App.scala:71)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
    at scala.App$class.main(App.scala:71)
    at io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$.main(LouvainCommunityDetection.scala:36)
    at io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection.main(LouvainCommunityDetection.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
{code}
Re: Rework of the window-join semantics
As far as I can see in [1], Peter's/Gyula's suggestion is what InfoSphere Streams does: a symmetric hash join. From [1]: "When a tuple is received on an input port, it is inserted into the window corresponding to the input port, which causes the window to trigger. As part of the trigger processing, the tuple is compared against all tuples inside the window of the opposing input port. If the tuples match, then an output tuple will be produced for each match. If at least one output was generated, a window punctuation will be generated after all the outputs."

Cheers, Asterios

[1] http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html

On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax mj...@informatik.hu-berlin.de wrote:

Hi Paris,

thanks for the pointer to the Naiad paper. That is quite interesting. The paper I mentioned [1] does not describe the semantics in detail; it is more about the implementation of the stream joins. However, it uses the same semantics (from my understanding) as proposed by Gyula.

-Matthias

[1] Kang, Naughton, Viglas. Evaluating Window Joins over Unbounded Streams. VLDB 2002.

On 04/07/2015 12:38 PM, Paris Carbone wrote:

Hello Matthias,

Sure, ordering guarantees are indeed a tricky thing; I recall having that discussion back in TU Berlin. Bear in mind though that DataStream, our abstract data type, represents a *partitioned* unbounded sequence of events. There are no *global* ordering guarantees made whatsoever in that model across partitions. If you see it more generally, there are many "race conditions" in a distributed execution graph of vertices that process multiple inputs asynchronously, especially when you add joins and iterations into the mix (how do you deal with reprocessing "old" tuples that iterate in the graph?). Btw, have you checked the Naiad paper [1]? Stephan cited it a while ago and it is quite relevant to this discussion. Also, can you cite the paper with the joining semantics you are referring to? That would be of good help, I think.

Paris

[1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf

On 07 Apr 2015, at 11:50, Matthias J. Sax mj...@informatik.hu-berlin.de wrote:

Hi @all,

please keep me in the loop for this work. I am highly interested and I want to help on it. My initial thoughts are as follows:

1) Currently, system timestamps are used, and the suggested approach can be seen as state-of-the-art (there is actually a research paper using the exact same join semantics). Of course, the current approach is inherently non-deterministic. The advantage is that there is no overhead in keeping track of the order of records, and the latency should be very low. (Additionally, state recovery is simplified: because the processing is inherently non-deterministic, recovery can be done with relaxed guarantees.)

2) The user should be able to switch on deterministic processing, i.e., records are timestamped (either externally when generated, or timestamped at the sources). Because deterministic processing adds some overhead, the user should decide for it actively. In this case, the order must be preserved in each re-distribution step (merging is sufficient if order is preserved within each incoming channel). Furthermore, deterministic processing can be achieved by sound window semantics (and there is a bunch of them).
Even for single-stream windows it's a tricky problem; for join windows it's even harder. From my point of view, it is less important which semantics are chosen; however, the user must be aware of how they work. The trickiest part of deterministic processing is dealing with duplicate timestamps (which cannot be avoided). The timestamping of (intermediate) result tuples is also an important question to be answered.

-Matthias

On 04/07/2015 11:37 AM, Gyula Fóra wrote:

Hey,

I agree with Kostas: if we define the exact semantics of how this works, this is not more ad hoc than any other stateful operator with multiple inputs. (And I don't think any other system supports something similar.) We need to make some design choices that are similar to the issues we had for windowing. We need to choose how we want to evaluate the windowing policies (global or local) because that affects what kind of policies can be parallel, but I can work on these things. I think this is an amazing feature, so I wouldn't necessarily rush the implementation for 0.9 though. And thanks for helping write these down.

Gyula

On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas ktzou...@apache.org wrote:

Yes, we should write these semantics down. I volunteer to help. I don't
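For the archive, a compact sketch of the symmetric window join described in the InfoSphere Streams excerpt above (single-threaded and count-based, for illustration only; all names are made up):

import scala.collection.mutable

// On arrival, a tuple enters its own side's window and is probed against
// the opposite side's window; matches are emitted immediately.
class SymmetricWindowJoin[L, R, K](leftKey: L => K, rightKey: R => K, windowSize: Int) {
  private val left  = mutable.Queue.empty[L]
  private val right = mutable.Queue.empty[R]

  def onLeft(l: L): Seq[(L, R)] = {
    left.enqueue(l)
    if (left.size > windowSize) left.dequeue() // evict oldest (count window)
    right.toSeq.filter(r => rightKey(r) == leftKey(l)).map(r => (l, r))
  }

  def onRight(r: R): Seq[(L, R)] = {
    right.enqueue(r)
    if (right.size > windowSize) right.dequeue()
    left.toSeq.filter(l => leftKey(l) == rightKey(r)).map(l => (l, r))
  }
}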
Re: About Operator and OperatorBase
+1 for keeping the API, even though this will not change your initial concern much, Aljoscha :) I agree with you that it would be more consistent to call the result of an operator OperatorDataSet.

On Thu, Apr 16, 2015 at 3:16 PM, Fabian Hueske fhue...@gmail.com wrote:

Renaming the core operators is fine with me, but I would not touch API-facing classes. A big +1 for Timo's suggestion.

2015-04-16 6:30 GMT-05:00 Timo Walther twal...@apache.org:

I share Stephan's opinion. By the way, we could also find a common name for operators with two inputs. Sometimes it's TwoInputXXX, DualInputXXX, BinaryInputXXX... pretty inconsistent.

On 15.04.2015 17:48, Till Rohrmann wrote:

I would also be in favour of making the distinction between the API and the common API layer clearer by using different names. This would ease the understanding of the source code. In the wake of a possible renaming, we could also get rid of the legacy code org.apache.flink.optimizer.dag.MatchNode and rename org.apache.flink.runtime.operators.MatchDriver into JoinDriver to make the naming more consistent.

On Wed, Apr 15, 2015 at 3:05 PM, Ufuk Celebi u...@apache.org wrote:

On 15 Apr 2015, at 15:01, Stephan Ewen se...@apache.org wrote:

I think we can rename the base operators. Renaming the subclass of DataSet would be extremely API-breaking. I think that is not worth it.

Oh, that's right. We return MapOperator for DataSet operations. Stephan's point makes sense.
Re: About Operator and OperatorBase
Renaming the core operators is fine with me, but I would not touch API-facing classes. A big +1 for Timo's suggestion.

2015-04-16 6:30 GMT-05:00 Timo Walther twal...@apache.org:

I share Stephan's opinion. By the way, we could also find a common name for operators with two inputs. Sometimes it's TwoInputXXX, DualInputXXX, BinaryInputXXX... pretty inconsistent.

On 15.04.2015 17:48, Till Rohrmann wrote:

I would also be in favour of making the distinction between the API and the common API layer clearer by using different names. This would ease the understanding of the source code. In the wake of a possible renaming, we could also get rid of the legacy code org.apache.flink.optimizer.dag.MatchNode and rename org.apache.flink.runtime.operators.MatchDriver into JoinDriver to make the naming more consistent.

On Wed, Apr 15, 2015 at 3:05 PM, Ufuk Celebi u...@apache.org wrote:

On 15 Apr 2015, at 15:01, Stephan Ewen se...@apache.org wrote:

I think we can rename the base operators. Renaming the subclass of DataSet would be extremely API-breaking. I think that is not worth it.

Oh, that's right. We return MapOperator for DataSet operations. Stephan's point makes sense.
[jira] [Created] (FLINK-1901) Create sample operator for Dataset
Theodore Vasiloudis created FLINK-1901:
--------------------------------------

             Summary: Create sample operator for Dataset
                 Key: FLINK-1901
                 URL: https://issues.apache.org/jira/browse/FLINK-1901
             Project: Flink
          Issue Type: Improvement
          Components: Core
            Reporter: Theodore Vasiloudis

In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms, we need a way to take a random sample from a DataSet. We need to be able to sample with or without replacement from the DataSet, choose the relative size of the sample, and set a seed for reproducibility.
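Until such an operator exists, a minimal sketch of what Bernoulli sampling (an approximation of sampling without replacement) looks like as a plain user function; the class name is made up, and the per-partition seeding is one possible design:

{code:scala}
import java.util.Random
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.configuration.Configuration

class BernoulliSampler[T](fraction: Double, seed: Long) extends RichFilterFunction[T] {
  @transient private var rnd: Random = _

  override def open(parameters: Configuration): Unit = {
    // Mix the subtask index into the seed: partitions draw independently,
    // but the overall sample stays reproducible for a fixed seed.
    rnd = new Random(seed + getRuntimeContext.getIndexOfThisSubtask)
  }

  // Keep each element independently with the given probability.
  override def filter(value: T): Boolean = rnd.nextDouble() < fraction
}

// Usage sketch: data.filter(new BernoulliSampler[Point](0.1, seed = 42L))
{code}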
Re: [Gelly] Vertex-centric iteration updateVertex does not get called
Hello Gabor,

Yes, currently updateVertex only gets called when a new message is received. Could you please describe the logic behind your triangle count? The one I know is described at the beginning of page 1643 of this article: http://www.cc.gatech.edu/~bader/papers/GraphBSPonXMT-MTAAP2013.pdf As you can see, each time (in all three supersteps) a message gets sent.

Here is my suboptimal implementation of the algorithm in the paper (it's supposed to prove that high-degree nodes overload the system): https://github.com/andralungu/gelly-partitioning/commit/224cb9b6917c2320e16a657a549b2a0313aeb300 It needs some serious rebasing. I'll get to it this weekend :). Nevertheless, it should serve as a starting point for your implementation.

Let us know if you have further questions!

Andra

P.S. I'm not sure calling updateVertex with an empty message iterator would be so straightforward to implement. I'll have to look into it a bit more once I get some spare time :)

On Thu, Apr 16, 2015 at 9:44 PM, Hermann Gábor reckone...@gmail.com wrote:

Hi all,

I am implementing a simple triangle counting example for a workshop with a vertex-centric iteration, and I found that the updateVertex method only gets called if there are new messages for that vertex. Is this the expected behavior?

I know that the iteration should stop for a given vertex when we don't change the vertex value, but (at least in my case) it would be useful if updateVertex got called with an empty message iterator. I guess receiving zero messages might have a meaning in other cases too, and the user might want to update the vertex value. Does changing the current behavior make sense?

Cheers, Gabor
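A sketch of the usual workaround in the meantime (Gelly's 0.9-era spargel API; the method signature here is from memory and should be treated as an assumption, and the class is hypothetical): have the messaging function send a message in every superstep, so every vertex receives at least one message and updateVertex keeps firing:

import org.apache.flink.graph.spargel.MessagingFunction
import org.apache.flink.types.NullValue

// Hypothetical: vertex keys and values are java.lang.Long, edges carry no value.
class KeepAliveMessenger
    extends MessagingFunction[java.lang.Long, java.lang.Long, java.lang.Long, NullValue] {
  override def sendMessages(vertexKey: java.lang.Long, vertexValue: java.lang.Long): Unit = {
    // Messaging all neighbors each superstep guarantees updateVertex is
    // invoked on every vertex, even when its value did not change.
    sendMessageToAllNeighbors(vertexValue)
  }
}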