[jira] [Created] (FLINK-1893) Add Scala support for Flink on Tez

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1893:
-

 Summary: Add Scala support for Flink on Tez
 Key: FLINK-1893
 URL: https://issues.apache.org/jira/browse/FLINK-1893
 Project: Flink
  Issue Type: Improvement
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas


Create Scala versions of LocalTezEnvironment and RemoteTezEnvironment





[jira] [Created] (FLINK-1898) Add support for self-joins to Flink on Tez

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1898:
-

 Summary: Add support for self-joins to Flink on Tez
 Key: FLINK-1898
 URL: https://issues.apache.org/jira/browse/FLINK-1898
 Project: Flink
  Issue Type: Bug
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas


Self-joins are currently not supported by Flink on Tez due to 
[TEZ-1190|https://issues.apache.org/jira/browse/TEZ-1190]. We should find a 
workaround (e.g., creating a dummy node).
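
For illustration only, a minimal user-level sketch of the dummy-node idea (not the 
actual fix, and the small job below is made up): a no-op map gives the join two 
distinct inputs instead of a literal self-join.

{code:scala}
import org.apache.flink.api.scala._

object SelfJoinWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val edges = env.fromElements((1, 2), (2, 3), (1, 3))

    // the no-op map acts as the "dummy node": the join now has two distinct inputs
    val edgesCopy = edges.map(x => x)

    // join the dataset with its (logically identical) copy on target == source
    val paths = edges.join(edgesCopy).where(1).equalTo(0)
    paths.print()
  }
}
{code}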





Re: Flink interactive Scala shell

2015-04-16 Thread Robert Metzger
I would also keep an eye on this issue from the Zeppelin project:
https://issues.apache.org/jira/browse/ZEPPELIN-44
The needed infrastructure is going to be very similar

On Thu, Apr 16, 2015 at 10:15 AM, Kostas Tzoumas ktzou...@apache.org
wrote:

 Great, let us know if you run into any issues.

 Can you create a JIRA on the REPL and link to your repository for the
 community to track the status?

 On Wed, Apr 15, 2015 at 4:23 PM, Nikolaas s 
 nikolaas.steenber...@gmail.com
 wrote:

  Thanks for the feedback guys!
  Apparently the Scala shell compiles the shell input to some kind of virtual
  directory.
  It should be possible to create a jar from its content and then hand it
  over to Flink for execution in some way.
  I will investigate further.
 
  cheers,
  Nikolaas
 
  2015-04-15 11:20 GMT+02:00 Stephan Ewen se...@apache.org:
 
   To give a bit of context for the exception:
  
   To execute a program, the classes of the user functions need to be
   available to the executing TaskManagers.
  
    - If you execute locally from the IDE, all classes are in the classpath
   anyways.
    - If you use the remote environment, you need to attach the jar file to
   the environment.
  
    - In your case (repl), you need to make sure that the generated classes
   are given to the TaskManager. In that sense, the approach is probably
   similar to the case of executing with a remote environment - only that you
   do not have a jar file up front, but need to generate it on the fly. As
   Robert mentioned, https://github.com/apache/flink/pull/35 may have a first
   solution to that. Other approaches are also possible, like simply always
   bundling all classes in the directory where the repl puts its generated
   classes.
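  
   As a rough sketch of that remote-environment route (not something from this
   thread; the host, port and jar path below are placeholders, and building the
   jar from the REPL's generated classes is exactly the missing piece):
  
   import org.apache.flink.api.scala.ExecutionEnvironment
  
   // hypothetical jar, assembled from the classes the repl generated
   val replJar = "/tmp/flink-repl-generated.jar"
  
   // attaching the jar ships the generated classes to the TaskManagers,
   // just like for any other remotely executed program
   val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-host", 6123, replJar)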
  
   Greetings,
   Stephan
  
  
   On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek 
 aljos...@apache.org
   wrote:
  
I will look into it once I have some time (end of this week, or next
week probably)
   
On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger rmetz...@apache.org
 
wrote:
 Hey Nikolaas,

 Thank you for posting on the mailing list. I've met Nikolaas today in
 person and we were talking a bit about an interactive shell for Flink,
 potentially also an integration with Zeppelin.

 Great stuff, I'm really looking forward to it :)

 We were wondering if somebody from the list has some experience with
 the Scala shell.
 I've pointed Nikolaas also to this PR:
 https://github.com/apache/flink/pull/35.

 Best,
 Robert


 On Tue, Apr 14, 2015 at 5:26 PM, nse sik 
   nikolaas.steenber...@gmail.com

 wrote:

 Hi!
 I am trying to implement a Scala shell for Flink.

 I've started with a simple Scala object whose main function will drop the
 user to the interactive Scala shell (repl) at one point:




import scala.tools.nsc.interpreter.ILoop
import scala.tools.nsc.Settings

object Job {
  def main(args: Array[String]) {

    val repl = new ILoop()
    repl.settings = new Settings()

    // enable this line to use scala in intellij
    repl.settings.usejavacp.value = true

    repl.createInterpreter()

    // start scala interpreter shell
    repl.process(repl.settings)

    repl.closeInterpreter()
  }
}




 Now I am trying to execute the word count example as in:




scala> import org.apache.flink.api.scala._

scala> val env = ExecutionEnvironment.getExecutionEnvironment

scala> val text = env.fromElements("To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")

scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)

scala> counts.print()

scala> env.execute("Flink Scala Api Skeleton")






 However, I am running into the following error:

 env.execute("Flink Scala Api Skeleton")
 org.apache.flink.runtime.client.JobExecutionException:
 java.lang.RuntimeException: The initialization of the DataSource's outputs
 caused an error: The type serializer factory could not load its parameters
 from the configuration due to missing classes.
 at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
 at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
 at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at ...

Re: Flink interactive Scala shell

2015-04-16 Thread Kostas Tzoumas
Great, let us know if you run into any issues.

Can you create a JIRA on the REPL and link to your repository for the
community to track the status?

On Wed, Apr 15, 2015 at 4:23 PM, Nikolaas s nikolaas.steenber...@gmail.com
wrote:

 Thanks for the feedback guys!
 Apparently the Scala shell compiles the shell input to some kind of virtual
 directory.
 It should be possible to create a jar from its content and then hand it
 over to Flink for execution in some way.
 I will investigate further.

 cheers,
 Nikolaas
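
 A minimal sketch of the jar-packaging step, assuming the generated classes
 have already been written to a plain directory on disk (only standard
 java.util.jar classes are used; the directory layout must mirror the packages):

 import java.io.{File, FileInputStream, FileOutputStream}
 import java.util.jar.{JarEntry, JarOutputStream}

 // package a directory of compiled .class files into a jar that can later
 // be attached to a remote environment
 def packageClassesAsJar(classDir: File, targetJar: File): Unit = {
   val jar = new JarOutputStream(new FileOutputStream(targetJar))
   def addFiles(dir: File, prefix: String): Unit = {
     for (f <- Option(dir.listFiles()).getOrElse(Array.empty[File])) {
       if (f.isDirectory) {
         addFiles(f, prefix + f.getName + "/")
       } else {
         jar.putNextEntry(new JarEntry(prefix + f.getName))
         val in = new FileInputStream(f)
         val buf = new Array[Byte](4096)
         var n = in.read(buf)
         while (n != -1) { jar.write(buf, 0, n); n = in.read(buf) }
         in.close()
         jar.closeEntry()
       }
     }
   }
   addFiles(classDir, "")
   jar.close()
 }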

 2015-04-15 11:20 GMT+02:00 Stephan Ewen se...@apache.org:

  To give a bit of context for the exception:
 
  To execute a program, the classes of the user functions need to be
  available to the executing TaskManagers.
 
   - If you execute locally from the IDE, all classes are in the classpath
  anyways.
   - If you use the remote environment, you need to attach the jar file to
  the environment.
 
   - In your case (repl), you need to make sure that the generated classes
  are given to the TaskManager. In that sense, the approach is probably
  similar to the case of executing with a remote environment - only that you
  do not have a jar file up front, but need to generate it on the fly. As
  Robert mentioned, https://github.com/apache/flink/pull/35 may have a first
  solution to that. Other approaches are also possible, like simply always
  bundling all classes in the directory where the repl puts its generated
  classes.
 
  Greetings,
  Stephan
 
 
  On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek aljos...@apache.org
  wrote:
 
   I will look into it once I have some time (end of this week, or next
   week probably)
  
   On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger rmetz...@apache.org
   wrote:
Hey Nikolaas,
   
 Thank you for posting on the mailing list. I've met Nikolaas today in
 person and we were talking a bit about an interactive shell for Flink,
 potentially also an integration with Zeppelin.

 Great stuff, I'm really looking forward to it :)

 We were wondering if somebody from the list has some experience with
 the Scala shell.
I've pointed Nikolaas also to this PR:
https://github.com/apache/flink/pull/35.
   
Best,
Robert
   
   
On Tue, Apr 14, 2015 at 5:26 PM, nse sik 
  nikolaas.steenber...@gmail.com
   
wrote:
   
Hi!
 I am trying to implement a Scala shell for Flink.

 I've started with a simple Scala object whose main function will drop the
 user to the interactive Scala shell (repl) at one point:
   
   
   
   
import scala.tools.nsc.interpreter.ILoop
import scala.tools.nsc.Settings

object Job {
  def main(args: Array[String]) {

    val repl = new ILoop()
    repl.settings = new Settings()

    // enable this line to use scala in intellij
    repl.settings.usejavacp.value = true

    repl.createInterpreter()

    // start scala interpreter shell
    repl.process(repl.settings)

    repl.closeInterpreter()
  }
}
   
   
   
   
Now I am trying to execute the word count example as in:
   
   
   
   
scala> import org.apache.flink.api.scala._

scala> val env = ExecutionEnvironment.getExecutionEnvironment

scala> val text = env.fromElements("To be, or not to be,--that is the question:--",
  "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
  "Or to take arms against a sea of troubles,")

scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)

scala> counts.print()

scala> env.execute("Flink Scala Api Skeleton")
   
   
   
   
   
   
 However, I am running into the following error:
   
 env.execute("Flink Scala Api Skeleton")
 org.apache.flink.runtime.client.JobExecutionException:
 java.lang.RuntimeException: The initialization of the DataSource's outputs
 caused an error: The type serializer factory could not load its parameters
 from the configuration due to missing classes.
 at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
 at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
 at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
 at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
 Caused by: java.lang.RuntimeException: The type serializer factory could
 not load its parameters from the configuration 

[jira] [Created] (FLINK-1897) Add accumulators and counters feature to Flink on Tez

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1897:
-

 Summary: Add accumulators and counters feature to Flink on Tez
 Key: FLINK-1897
 URL: https://issues.apache.org/jira/browse/FLINK-1897
 Project: Flink
  Issue Type: Improvement
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas








[jira] [Created] (FLINK-1895) Add task chaining to Flink on Tez

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1895:
-

 Summary: Add task chaining to Flink on Tez
 Key: FLINK-1895
 URL: https://issues.apache.org/jira/browse/FLINK-1895
 Project: Flink
  Issue Type: Improvement
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas
Priority: Minor


Currently, every runtime operator is wrapped inside a Tez processor. We should 
implement some form of task chaining and measure the performance difference.





[jira] [Created] (FLINK-1894) Add Tez execution mode to Flink command-line tools

2015-04-16 Thread Kostas Tzoumas (JIRA)
Kostas Tzoumas created FLINK-1894:
-

 Summary: Add Tez execution mode to Flink command-line tools
 Key: FLINK-1894
 URL: https://issues.apache.org/jira/browse/FLINK-1894
 Project: Flink
  Issue Type: Improvement
  Components: Flink on Tez
Reporter: Kostas Tzoumas
Assignee: Kostas Tzoumas
Priority: Minor


To run Flink programs on Tez, users currently need to
(1) specify the main class via env.registerMainClass,
(2) package the job in a fat jar, and
(3) use hadoop jar to submit the job to YARN.

This is somewhat problematic, and certainly a worse user experience than 
regular Flink jobs. Tez execution mode should be part of Flink's command-line 
tools.
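
For reference, a hedged sketch of this current workflow (the import path and the 
create() factory are assumptions; registerMainClass is the call named above):

{code:scala}
import org.apache.flink.tez.client.RemoteTezEnvironment

object MyTezJob {
  def main(args: Array[String]): Unit = {
    val env = RemoteTezEnvironment.create()
    env.fromElements("to", "be", "or", "not").writeAsText("/tmp/flink-tez-out")
    env.registerMainClass(MyTezJob.getClass)   // (1) specify the main class
    env.execute("Flink on Tez sketch")
  }
}
// (2) package as a fat jar, then (3) submit it via YARN:
//   hadoop jar target/my-tez-job-fat.jar
{code}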





TableAPI - Join on two keys

2015-04-16 Thread Felix Neutatz
Hi,

I want to join two tables in the following way:

case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

case class CommunitySumTotal(communityID: Int, sumTotal: Double)

val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable
val weightedEdgesTable = weightedEdges.toTable

val sumTotal = communitiesTable.join(weightedEdgesTable)
 .where("nodeID = src && nodeID = target")
 .groupBy('communityID)
 .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]


but I get this exception:

Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: The types of the key
fields do not match: The number of specified keys is different.
at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
Moreover, when I use the following where clause:

.where("nodeID = src || nodeID = target")

I get another error:

Exception in thread "main"
org.apache.flink.api.table.ExpressionException: Could not derive
equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target.

at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
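
One workaround that might be worth trying until the Table API can handle the OR
predicate (an untested sketch that reuses the case classes and DataSets defined
above; note that an edge whose src and target both match would contribute twice):

// two equi-joins on the underlying DataSets instead of one OR-join, then union
val bySrc = communities.join(weightedEdges).where("nodeID").equalTo("src")
  .map { pair => CommunitySumTotal(pair._1.communityID, pair._2.weight) }

val byTarget = communities.join(weightedEdges).where("nodeID").equalTo("target")
  .map { pair => CommunitySumTotal(pair._1.communityID, pair._2.weight) }

val sumTotal = bySrc.union(byTarget)
  .groupBy("communityID")
  .reduce((a, b) => CommunitySumTotal(a.communityID, a.sumTotal + b.sumTotal))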


Apart from that, the Table API seems really promising. It's a really great tool.

Thank you for your help,

Felix


Re: About Operator and OperatorBase

2015-04-16 Thread Timo Walther

I share Stephan's opinion.

By the way, we could also find a common name for operators with two 
inputs. Sometimes it's TwoInputXXX, DualInputXXX, 
BinaryInputXXX... pretty inconsistent.


On 15.04.2015 17:48, Till Rohrmann wrote:

I would also be in favour of making the distinction between the API and
common API layer more clear by using different names. This will ease the
understanding of the source code.

In the wake of a possible renaming we could also get rid of the legacy code
org.apache.flink.optimizer.dag.MatchNode and
rename org.apache.flink.runtime.operators.MatchDriver into JoinDriver to
make the naming more consistent.

On Wed, Apr 15, 2015 at 3:05 PM, Ufuk Celebi u...@apache.org wrote:


On 15 Apr 2015, at 15:01, Stephan Ewen se...@apache.org wrote:


I think we can rename the base operators.

Renaming the subclass of DataSet would be extremely API-breaking. I think
that is not worth it.

Oh, that's right. We return MapOperator for DataSet operations. Stephan's
point makes sense.




[jira] [Created] (FLINK-1899) Table API Bug

2015-04-16 Thread Felix Neutatz (JIRA)
Felix Neutatz created FLINK-1899:


 Summary: Table API Bug
 Key: FLINK-1899
 URL: https://issues.apache.org/jira/browse/FLINK-1899
 Project: Flink
  Issue Type: Bug
  Components: Expression API
Affects Versions: 0.9
Reporter: Felix Neutatz
Priority: Minor


I want to run the following program

{code:scala}
case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

case class CommunitySumTotal(communityID: Int, sumTotal: Double)

val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable 
val weightedEdgesTable = weightedEdges.toTable

val sumTotal = communitiesTable.join(weightedEdgesTable)
  .where("nodeID = src")
  .groupBy('communityID)
  .select('communityID, 'weight.sum).toSet[CommunitySumTotal]
{code}

but I get this exception. In my opinion the outputs do have the same field 
types.

{code:xml}
Exception in thread main org.apache.flink.api.table.ExpressionException: 
Expression result type org.apache.flink.api.table.Row(communityID: Integer, 
intermediate.1: Double) does not have the samefields as output type 
io.ssc.trackthetrackers.analysis.algorithms.CommunitySumTotal(communityID: 
Integer, sumTotal: Double)
at 
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:88)
at 
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at 
org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$.detectCommunities(LouvainCommunityDetection.scala:105)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$delayedInit$body.apply(LouvainCommunityDetection.scala:38)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
{code}
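
A guess at a workaround, not verified: aliasing the aggregate so that the 
intermediate field name matches the case class field (the aliasing syntax is the 
one used in the corresponding mailing-list thread):

{code:scala}
val sumTotal = communitiesTable.join(weightedEdgesTable)
  .where("nodeID = src")
  .groupBy('communityID)
  // alias the aggregate so the result row has fields (communityID, sumTotal)
  .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]
{code}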





[jira] [Created] (FLINK-1900) Table API documentation example does not work

2015-04-16 Thread Timo Walther (JIRA)
Timo Walther created FLINK-1900:
---

 Summary: Table API documentation example does not work
 Key: FLINK-1900
 URL: https://issues.apache.org/jira/browse/FLINK-1900
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Reporter: Timo Walther


Running the word count example leads to

{code}
Exception in thread main org.apache.flink.api.table.ExpressionException: 
Expression result type org.apache.flink.api.table.Row(word: String, 
intermediate.1: Integer) does not have the samefields as output type 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$WC$3(word:
 String, count: Integer)
at 
org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:88)
at 
org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
at 
org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$.detectCommunities(LouvainCommunityDetection.scala:112)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$delayedInit$body.apply(LouvainCommunityDetection.scala:38)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at 
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at 
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection$.main(LouvainCommunityDetection.scala:36)
at 
io.ssc.trackthetrackers.analysis.algorithms.LouvainCommunityDetection.main(LouvainCommunityDetection.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
{code}





Re: Rework of the window-join semantics

2015-04-16 Thread Asterios Katsifodimos
As far as I see in [1], Peter's/Gyula's suggestion is what InfoSphere
Streams does: a symmetric hash join.

From [1]:
When a tuple is received on an input port, it is inserted into the window
corresponding to the input port, which causes the window to trigger. As
part of the trigger processing, the tuple is compared against all tuples
inside the window of the opposing input port. If the tuples match, then an
output tuple will be produced for each match. If at least one output was
generated, a window punctuation will be generated after all the outputs.
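
In code, that behaviour is roughly the following (a simplified sketch with assumed
types and a count-based window, not any particular system's API):

import scala.collection.mutable

// symmetric window join: insert the arriving tuple into its own window,
// probe the opposite window, and emit one output per match
class SymmetricWindowJoin[K, L, R](windowSize: Int) {
  private val leftWindow  = mutable.Queue.empty[(K, L)]
  private val rightWindow = mutable.Queue.empty[(K, R)]

  def onLeft(key: K, value: L): Seq[(L, R)] = {
    leftWindow.enqueue((key, value))
    if (leftWindow.size > windowSize) leftWindow.dequeue()   // count-based eviction
    rightWindow.toSeq.collect { case (k, r) if k == key => (value, r) }
  }

  def onRight(key: K, value: R): Seq[(L, R)] = {
    rightWindow.enqueue((key, value))
    if (rightWindow.size > windowSize) rightWindow.dequeue()
    leftWindow.toSeq.collect { case (k, l) if k == key => (l, value) }
  }
}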

Cheers,
Asterios

[1]
http://www-01.ibm.com/support/knowledgecenter/#!/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-standard-toolkit-reference.doc/doc/join.html



On Thu, Apr 9, 2015 at 1:30 PM, Matthias J. Sax 
mj...@informatik.hu-berlin.de wrote:

 Hi Paris,

 thanks for the pointer to the Naiad paper. That is quite interesting.

 The paper I mentioned [1] does not describe the semantics in detail; it
 is more about the implementation of the stream joins. However, it uses
 the same semantics (from my understanding) as proposed by Gyula.

 -Matthias

 [1] Kang, Naughton, Viglas. Evaluating Window Joins over Unbounded
 Streams. VLDB 2002.



 On 04/07/2015 12:38 PM, Paris Carbone wrote:
  Hello Matthias,
 
   Sure, ordering guarantees are indeed a tricky thing, I recall having
  that discussion back in TU Berlin. Bear in mind though that DataStream,
  our abstract data type, represents a *partitioned* unbounded sequence of
  events. There are no *global* ordering guarantees made whatsoever in that
  model across partitions. If you see it more generally, there are many “race
  conditions” in a distributed execution graph of vertices that process
  multiple inputs asynchronously, especially when you add joins and
  iterations into the mix (how do you deal with reprocessing “old” tuples
  that iterate in the graph?). Btw, have you checked the Naiad paper [1]?
  Stephan cited it a while ago and it is quite relevant to that discussion.
 
   Also, can you cite the paper with the joining semantics you are
  referring to? That would be of good help I think.
 
   Paris
 
   [1] https://users.soe.ucsc.edu/~abadi/Papers/naiad_final.pdf
 
   On 07 Apr 2015, at 11:50, Matthias J. Sax mj...@informatik.hu-berlin.de
  wrote:
 
  Hi @all,
 
  please keep me in the loop for this work. I am highly interested and I
  want to help on it.
 
  My initial thoughts are as follows:
 
  1) Currently, system timestamps are used and the suggested approach can
  be seen as state-of-the-art (there is actually a research paper using
  the exact same join semantic). Of course, the current approach is
   inherently non-deterministic. The advantage is that there is no
   overhead in keeping track of the order of records and the latency should
   be very low. (Additionally, state-recovery is simplified. Because the
   processing is inherently non-deterministic, recovery can be done with
   relaxed guarantees.)
 
   2) The user should be able to switch on deterministic processing,
   i.e., records are timestamped (either externally when generated, or
  timestamped at the sources). Because deterministic processing adds some
  overhead, the user should decide for it actively.
  In this case, the order must be preserved in each re-distribution step
  (merging is sufficient, if order is preserved within each incoming
  channel). Furthermore, deterministic processing can be achieved by sound
  window semantics (and there is a bunch of them). Even for
  single-stream-windows it's a tricky problem; for join-windows it's even
  harder. From my point of view, it is less important which semantics are
   chosen; however, the user must be aware of how it works. The most tricky
   part for deterministic processing is to deal with duplicate timestamps
  (which cannot be avoided). The timestamping for (intermediate) result
   tuples is also an important question to be answered.
 
 
  -Matthias
 
 
  On 04/07/2015 11:37 AM, Gyula Fóra wrote:
  Hey,
 
   I agree with Kostas: if we define the exact semantics of how this works,
   this is not more ad-hoc than any other stateful operator with multiple inputs.
   (And I don't think any other system supports something similar.)
 
   We need to make some design choices that are similar to the issues we had
   for windowing. We need to choose how we want to evaluate the windowing
   policies (global or local) because that affects what kind of policies can
   be parallel, but I can work on these things.
 
  I think this is an amazing feature, so I wouldn't necessarily rush the
  implementation for 0.9 though.
 
  And thanks for helping writing these down.
 
  Gyula
 
  On Tue, Apr 7, 2015 at 11:11 AM, Kostas Tzoumas ktzou...@apache.org
  wrote:
 
  Yes, we should write these semantics down. I volunteer to help.
 
  I don't 

Re: About Operator and OperatorBase

2015-04-16 Thread Maximilian Michels
+1 for keeping the API. Even though this will not change your initial
concern much, Aljoscha :) I agree with you that it would be more consistent
to call the result of an operator OperatorDataSet.

On Thu, Apr 16, 2015 at 3:16 PM, Fabian Hueske fhue...@gmail.com wrote:

 Renaming the core operators is fine with me, but I would not touch API
 facing classes.
 A big +1 for Timo's suggestion.

 2015-04-16 6:30 GMT-05:00 Timo Walther twal...@apache.org:

  I share Stephan's opinion.
 
  By the way, we could also find a common name for operators with two
  inputs. Sometimes it's TwoInputXXX, DualInputXXX, BinaryInputXXX...
  pretty inconsistent.
 
 
  On 15.04.2015 17:48, Till Rohrmann wrote:
 
  I would also be in favour of making the distinction between the API and
  common API layer more clear by using different names. This will ease the
  understanding of the source code.
 
  In the wake of a possible renaming we could also get rid of the legacy
  code
  org.apache.flink.optimizer.dag.MatchNode and
  rename org.apache.flink.runtime.operators.MatchDriver into JoinDriver to
  make the naming more consistent.
 
  On Wed, Apr 15, 2015 at 3:05 PM, Ufuk Celebi u...@apache.org wrote:
 
   On 15 Apr 2015, at 15:01, Stephan Ewen se...@apache.org wrote:
 
   I think we can rename the base operators.
 
  Renaming the subclass of DataSet would be extremely API-breaking. I
  think
  that is not worth it.
 
  Oh, that's right. We return MapOperator for DataSet operations.
 Stephan's
  point makes sense.
 
 
 



Re: About Operator and OperatorBase

2015-04-16 Thread Fabian Hueske
Renaming the core operators is fine with me, but I would not touch API
facing classes.
A big +1 for Timo's suggestion.

2015-04-16 6:30 GMT-05:00 Timo Walther twal...@apache.org:

 I share Stephan's opinion.

 By the way, we could also find a common name for operators with two
 inputs. Sometimes it's TwoInputXXX, DualInputXXX, BinaryInputXXX...
 pretty inconsistent.


 On 15.04.2015 17:48, Till Rohrmann wrote:

 I would also be in favour of making the distinction between the API and
 common API layer more clear by using different names. This will ease the
 understanding of the source code.

 In the wake of a possible renaming we could also get rid of the legacy
 code
 org.apache.flink.optimizer.dag.MatchNode and
 rename org.apache.flink.runtime.operators.MatchDriver into JoinDriver to
 make the naming more consistent.

 On Wed, Apr 15, 2015 at 3:05 PM, Ufuk Celebi u...@apache.org wrote:

  On 15 Apr 2015, at 15:01, Stephan Ewen se...@apache.org wrote:

  I think we can rename the base operators.

  Renaming the subclass of DataSet would be extremely API-breaking. I
 think
 that is not worth it.

 Oh, that's right. We return MapOperator for DataSet operations. Stephan's
 point makes sense.





[jira] [Created] (FLINK-1901) Create sample operator for Dataset

2015-04-16 Thread Theodore Vasiloudis (JIRA)
Theodore Vasiloudis created FLINK-1901:
--

 Summary: Create sample operator for Dataset
 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis


In order to be able to implement Stochastic Gradient Descent and a number of 
other machine learning algorithms we need to have a way to take a random sample 
from a Dataset.

We need to be able to sample with or without replacement from the Dataset, 
choose the relative size of the sample, and set a seed for reproducibility.
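
A rough sketch of the desired shape (the name and placement below are not a design 
decision; Bernoulli sampling without replacement, with the caveat that reusing the 
same seed in every partition is only a placeholder for proper seed handling):

{code:scala}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import scala.reflect.ClassTag
import scala.util.Random

// keep each element independently with probability `fraction` (sampling without replacement)
def sample[T: TypeInformation: ClassTag](input: DataSet[T], fraction: Double, seed: Long): DataSet[T] =
  input.mapPartition { (elements: Iterator[T]) =>
    val rnd = new Random(seed)  // placeholder: a real operator would derive per-partition seeds
    elements.filter(_ => rnd.nextDouble() < fraction)
  }
{code}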





Re: [Gelly] Vertex-centric iteration updateVertex does not get called

2015-04-16 Thread Andra Lungu
Hello Gabor,

Yes, currently updateVertex only gets called when a new message was
received.
Could you please describe the logic behind your triangle count? The one I
know is described at the beginning of page 1643 in this article:
http://www.cc.gatech.edu/~bader/papers/GraphBSPonXMT-MTAAP2013.pdf

As you can see, each time (for all three supersteps), a message gets
sent.
Here is my suboptimal implementation of the algorithm in the paper (it's
supposed to prove that high degree nodes overload the system):
https://github.com/andralungu/gelly-partitioning/commit/224cb9b6917c2320e16a657a549b2a0313aeb300

It needs some serious rebasing. I'll get to it this weekend :).
Nevertheless, it should serve as a starting point for your implementation.

Let us know if you have further questions!
Andra

P.S. I'm not sure calling updateVertex with an empty message iterator would
be so straightforward to implement. I'll have to look into it a bit more
once I get some spare time :)



On Thu, Apr 16, 2015 at 9:44 PM, Hermann Gábor reckone...@gmail.com wrote:

 Hi all,

 I am implementing a simple triangle counting example for a workshop with
 vertex-centric iteration and I found that the updateVertex method only gets
 called if there are new messages for that vertex. Is it the expected
 behavior?

 I know that the iteration should stop for the given vertex when we
 don't change the vertex value but (at least in my case) it would be useful
 if the updateVertex got called with an empty message iterator. I guess
 receiving zero messages might have a meaning in other cases too, and the
 user would like to update the vertex value.
 Does changing the current behavior make sense?

 Cheers,
 Gabor