[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-05-15 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-102366422
  
Hey @andralungu! Thanks a lot for looking into this!
Could you please close this PR? I'll open a new one with the latest 
changes.

Regarding returning -1 if the option is not set: I did that to avoid 
throwing an exception, and also because other methods such as 
getIterationAggregator() behave similarly. I really don't think it's 
such a big deal for one to read the javadoc when using a method :P 
Of course, if you have any idea on how we could make this better, please 
let me know! (but let's continue the discussion in the new PR).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-05-15 Thread andralungu
Github user andralungu closed the pull request at:

https://github.com/apache/flink/pull/537




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-05-14 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-101978637
  
Hi @vasia, 

I had a look at the new branch. The changes look good: degrees are no 
longer exposed to the user, and the current approach removes the need to 
subclass Vertex. :+1: 

The only small remark I have comes from a user's perspective: 
- let's say that, by mistake, I forgot to set the degrees option;
- let's also say I was too busy to read the manual :)
- result: I will get -1 instead of the expected degree of each vertex

I understand why you had to return -1 there; it has to be of the same type as 
the degrees. However, maybe we can come up with some way to remind users that 
they should not forget to set the corresponding options. Adding an extra line 
in the documentation might not suffice.
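
To make the trade-off concrete, here is a minimal sketch that contrasts the two behaviours: returning the -1 sentinel versus failing fast when the option was never set. The class and method names (`DegreeOptionSketch`, `getInDegreeOrSentinel`, `getInDegreeOrThrow`) are hypothetical stand-ins and not part of Gelly.

```java
public class DegreeOptionSketch {

    // -1 marks "option not set", mirroring the sentinel discussed above.
    private long inDegree = -1L;

    // Hypothetical setter that a framework would call when the option is enabled.
    public void setInDegree(long inDegree) {
        this.inDegree = inDegree;
    }

    // Variant 1: silently return the sentinel; the caller has to check for -1.
    public long getInDegreeOrSentinel() {
        return inDegree;
    }

    // Variant 2: fail fast with a message that names the missing option.
    public long getInDegreeOrThrow() {
        if (inDegree == -1L) {
            throw new IllegalStateException(
                "Degrees are not available: the degrees option was not set.");
        }
        return inDegree;
    }

    public static void main(String[] args) {
        DegreeOptionSketch unset = new DegreeOptionSketch();
        System.out.println(unset.getInDegreeOrSentinel());
        try {
            unset.getInDegreeOrThrow();
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }

        DegreeOptionSketch configured = new DegreeOptionSketch();
        configured.setInDegree(3L);
        System.out.println(configured.getInDegreeOrThrow());
    }
}
```

The second variant surfaces the mistake at the first call instead of letting -1 flow into later arithmetic, at the cost of an exception that the first variant avoids.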




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-05-11 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-101048928
  
Hey @andralungu ,

I built on your changes and implemented what I described above. I have 
pushed the result to [this branch](https://github.com/vasia/flink/tree/vc-extentions).
I had to manually resolve conflicts caused by #657, so I guess the easiest 
way to allow people to review would be for me to open a fresh PR.
Since you are the one most familiar with these features, could you please 
take a look and let me know what you think?

Thanks a lot!





[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-05-05 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-99070508
  
Hey @andralungu ,

I'm not sure what @StephanEwen had in mind when suggesting to subclass 
Vertex, but the current implementation forces the user to cast the `Vertex` to 
`VertexWithDegrees` in order to access the degrees, which is not very 
user-friendly :S
I guess the point was to have `VertexWithDegrees` as an argument to the 
update and messaging functions instead? If that's the case, however, it's not 
quite clear to me how this would work together with the configuration option 
setting.

Back to my previous comment on only accessing the degrees inside the 
update/messaging functions instead of adding public methods to the Vertex 
class, a simple way to do this would be the following:
- Add set/get methods for the vertex degrees in `VertexUpdateFunction` and 
`MessagingFunction`. The getters are the methods that the user will call to 
access the vertex degrees, in a way similar to `getDirection()`, 
`getBroadcastSet()` etc.
- Inside `createResultVerticesWithDegrees`, you're already creating the 
augmented vertex dataset with the degrees and passing it as the solution set 
of the delta iteration.
- Then, in the corresponding coGroup method of the Messaging/VertexUpdate UDFs, 
you iterate over this dataset (state) and call the user-defined functions for 
each vertex. This is where you can set the degrees inside the update/messaging 
functions, instead of setting them on the vertex object as you currently do.

What do you think of this solution?
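
A rough sketch of the idea in plain Java, under stated assumptions: `UpdateFunctionSketch`, `VertexState` and `runSuperstep` are hypothetical stand-ins for the update function, the augmented solution-set records and the coGroup loop; none of them are Gelly classes. The driver sets the degrees on the function object right before invoking the user code for each vertex, so the user reads them through getters and never touches the vertex type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DegreeInjectionSketch {

    // Simplified stand-in for an update function; NOT the Gelly class.
    public abstract static class UpdateFunctionSketch {
        private long inDegree = -1L;
        private long outDegree = -1L;

        // Package-private setters: only the iteration driver calls these.
        void setInDegree(long inDegree) { this.inDegree = inDegree; }
        void setOutDegree(long outDegree) { this.outDegree = outDegree; }

        // Public getters: user code reads the degrees of the current vertex.
        public long getInDegree() { return inDegree; }
        public long getOutDegree() { return outDegree; }

        // User-defined logic; returns the new vertex value.
        public abstract double updateVertex(long vertexId, double value);
    }

    // Hypothetical record of the augmented state: id, value, in/out degree.
    public static final class VertexState {
        final long id;
        final double value;
        final long inDegree;
        final long outDegree;

        public VertexState(long id, double value, long inDegree, long outDegree) {
            this.id = id;
            this.value = value;
            this.inDegree = inDegree;
            this.outDegree = outDegree;
        }
    }

    // Driver loop: set the per-vertex degrees on the function, then call user code.
    public static List<Double> runSuperstep(List<VertexState> state, UpdateFunctionSketch fn) {
        List<Double> results = new ArrayList<>();
        for (VertexState v : state) {
            fn.setInDegree(v.inDegree);
            fn.setOutDegree(v.outDegree);
            results.add(fn.updateVertex(v.id, v.value));
        }
        return results;
    }

    // Example user function: divide the vertex value by its out-degree.
    public static List<Double> demo() {
        List<VertexState> state = Arrays.asList(
            new VertexState(1L, 6.0, 0L, 2L),
            new VertexState(2L, 9.0, 1L, 3L));
        UpdateFunctionSketch divideByOutDegree = new UpdateFunctionSketch() {
            @Override
            public double updateVertex(long vertexId, double value) {
                return value / getOutDegree();
            }
        };
        return runSuperstep(state, divideByOutDegree);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The getters stay public while the setters keep default (package) visibility, which matches the split between what users call and what the iteration code calls.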




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-05-05 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-99232613
  
Hey @vasia ,

Everyone is acting as though I do not want to make these methods 
user-friendly. I do :), it's just not possible. 

Keep in mind that in the VertexUpdateFunction and in the MessagingFunction 
you can only access the vertex in the updateVertex() and sendMessages() 
methods. 

getDirection() and getBroadcastSet() return a single value for all the 
vertices, whereas getDegree() returns one value for each vertex. As 
I said before, you cannot access the vertex in these classes. The only, 
very ugly, solution is to give them the entire DataSet and to constantly filter 
it. You basically have the degree of the vertex in the Tuple3, but this 
information is hidden from the user. 






[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-28 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-97159494
  
Thanks for the review, @StephanEwen! 
PR updated.




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-27 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-96558324
  
Looks good in general. A few thoughts on this:

Your prior discussion involved efficiency, and Vasia suggested not carrying 
the degrees in all cases (as they are not needed in most cases). It seems that 
this has not been realized yet, because the Vertex class always carries the degrees.

I think we can improve the abstraction between the with-degree and 
without-degree case a bit. Cases where one has to throw a "not supported" 
exception can usually be improved with a good inheritance hierarchy. Does it 
work to make the VertexWithDegrees class a subclass of the Vertex class?

Also, as a bit of background: Vertex is a subclass of Tuple2. Tuples are 
currently the fastest data types in Flink. By adding additional fields to the 
Tuple, you are making it a POJO (as far as I know), which is a bit slower to 
serialize.

Also: primitives are usually better than boxed types. Prefer `long` over 
`Long` where possible.
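
As an illustration only, the inheritance idea could look like the sketch below. `SimpleVertex` and `SimpleVertexWithDegrees` are hypothetical plain-Java stand-ins, not Flink's `Vertex` or `Tuple2`, and the sketch says nothing about how Flink would serialize such types. It only shows that code written against the base type keeps working for both cases, and that the degrees are held as primitive `long` fields.

```java
public class VertexHierarchySketch {

    // Stand-in for a two-field vertex (id, value); NOT Flink's Vertex/Tuple2.
    public static class SimpleVertex {
        public final long id;
        public final double value;

        public SimpleVertex(long id, double value) {
            this.id = id;
            this.value = value;
        }
    }

    // The subclass adds the degrees as primitive longs: no boxing, no null checks.
    public static class SimpleVertexWithDegrees extends SimpleVertex {
        public final long inDegree;
        public final long outDegree;

        public SimpleVertexWithDegrees(long id, double value, long inDegree, long outDegree) {
            super(id, value);
            this.inDegree = inDegree;
            this.outDegree = outDegree;
        }
    }

    // Code that only needs id and value accepts the base type and works for both.
    public static String label(SimpleVertex v) {
        return v.id + ":" + v.value;
    }

    // Code that needs degrees asks for the subclass, so no "not supported" path exists.
    public static long totalDegree(SimpleVertexWithDegrees v) {
        return v.inDegree + v.outDegree;
    }

    public static void main(String[] args) {
        SimpleVertex plain = new SimpleVertex(1L, 2.5);
        SimpleVertexWithDegrees withDegrees = new SimpleVertexWithDegrees(2L, 4.0, 2L, 3L);
        System.out.println(label(plain));
        System.out.println(label(withDegrees));
        System.out.println(totalDegree(withDegrees));
    }
}
```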




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-21 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28811435
  
--- Diff: 
flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.IncrementalSSSPExample;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
+
+   private String verticesPath;
+
+   private String edgesPath;
+
+   private String edgesInSSSPPath;
+
+   private String resultPath;
+
+   private String expected;
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   public IncrementalSSSPITCase(TestExecutionMode mode) {
+       super(mode);
+   }
+
+   @Before
+   public void before() throws Exception {
+       resultPath = tempFolder.newFile().toURI().toString();
+       File verticesFile = tempFolder.newFile();
+       Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8);
+
+       File edgesFile = tempFolder.newFile();
+       Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8);
+
+       File edgesInSSSPFile = tempFolder.newFile();
+       Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8);
+
+       verticesPath = verticesFile.toURI().toString();
+       edgesPath = edgesFile.toURI().toString();
+       edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
+   }
+
+   @Test
--- End diff --

The test for removing the non-SP-edge is doable. It's the bigger graph test 
case that concerns me. Of course, we can generate some randomised edges, but 
how do we know which of those edges are in SSSP? That can only be done if we 
have the actual algorithm implemented.
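
One possible way around this, sketched under assumptions: a small sequential reference implementation could label the SP-edges of a generated graph for the test. The class below is hypothetical and independent of Gelly; it uses Bellman-Ford towards a single sink, integer weights (to avoid floating-point equality checks) and assumes non-negative weights. An edge (u, v) counts as an SP-edge when dist(u) == weight + dist(v).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReferenceSsspSketch {

    static final long INF = Long.MAX_VALUE;

    // Bellman-Ford distances from every vertex to the sink.
    // Each edge is {source, target, weight}; weights are assumed non-negative.
    public static long[] distancesToSink(int numVertices, long[][] edges, int sink) {
        long[] dist = new long[numVertices];
        Arrays.fill(dist, INF);
        dist[sink] = 0L;
        for (int round = 0; round < numVertices - 1; round++) {
            boolean changed = false;
            for (long[] e : edges) {
                int u = (int) e[0];
                int v = (int) e[1];
                // Relax u through v only if v already reaches the sink.
                if (dist[v] != INF && dist[v] + e[2] < dist[u]) {
                    dist[u] = dist[v] + e[2];
                    changed = true;
                }
            }
            if (!changed) {
                break;
            }
        }
        return dist;
    }

    // Returns the edges that lie on at least one shortest path to the sink.
    public static List<long[]> spEdges(int numVertices, long[][] edges, int sink) {
        long[] dist = distancesToSink(numVertices, edges, sink);
        List<long[]> result = new ArrayList<>();
        for (long[] e : edges) {
            int u = (int) e[0];
            int v = (int) e[1];
            if (dist[v] != INF && dist[u] == dist[v] + e[2]) {
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] edges = { {0, 1, 1}, {1, 3, 1}, {0, 2, 5}, {2, 3, 1}, {0, 3, 10} };
        System.out.println(Arrays.toString(distancesToSink(4, edges, 3)));
        for (long[] e : spEdges(4, edges, 3)) {
            System.out.println(e[0] + " -> " + e[1]);
        }
    }
}
```

Randomised edges could be passed through `spEdges` to produce the "edges in SSSP" input, although that does mean maintaining a second implementation of the algorithm in the test code.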




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-21 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28764335
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
 ---
@@ -40,7 +42,37 @@
VertexValue extends Serializable, Message> implements Serializable {
 
private static final long serialVersionUID = 1L;
-   
+
+   // --------------------------------------------------------------------------------------------
+   //  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+   //  inside an iteration.
+   // --------------------------------------------------------------------------------------------
+
+   private long numberOfVertices = -1L;
+
+   public long getNumberOfVertices() throws Exception{
+       if (numberOfVertices == -1) {
+           throw new InaccessibleMethodException("The number of vertices option is not set");
+       }
+       return numberOfVertices;
+   }
+
+   void setNumberOfVertices(long numberOfVertices) {
+       this.numberOfVertices = numberOfVertices;
+   }
+
+   //---------------------------------------------------------------------------------------------
+
+   private boolean optDegrees;
+
+   public boolean isOptDegrees() {
--- End diff --

Are you talking about the getters? To my knowledge, these have to be public; 
otherwise, they would be totally unusable. 
If you're talking about the setters, they have default (package-private) 
access: I needed them to be visible throughout the package, but they are not public. 




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-21 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28785218
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
 ---
@@ -138,69 +146,46 @@ public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) {
if (this.initialVertices == null) {
--- End diff --

I made that division in order to avoid duplicate code: the number of 
vertices and the direction are totally independent of the degree option, which 
is why they can be set in the createResult() method. Afterwards, the code does 
exactly what you described in this comment: it separates the creation of the 
delta iteration and the creation of the messaging function plus vertex update 
function according to the vertex type (with degrees or not). It's not just the 
vertex that changes, but everything that uses its value afterwards changes too. 
I suggest you look a bit closer at the createResultVerticesWithDegrees and 
createResultSimpleVertex methods. I don't think their functionality can be 
simplified by just creating a simple vertex and a vertex with degrees. 




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-21 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-94915061
  
Hi @vasia ,

I added some answers to your inline comments! I will push my latest version 
of this tomorrow. 

Regarding the suggestion that the Vertex class might not be the best place 
for the getDegree methods, there is a reason why implementing this took a while 
^^. I wanted to make the degrees available only in the iteration. The problem 
is that with the current code (this one and the one in production), a vertex 
is only accessible in the updateVertex() method. This means that there is no 
other way to get the vertex within the iteration: you don't have a Vertex 
object, since it's not something you pass to the class. Putting the methods on 
the Vertex class was the quick workaround I found for this issue. If you know 
a better way, I am eager to hear your suggestions. 

Thanks! :) 




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-21 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28769333
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
 ---
@@ -40,7 +42,37 @@
VertexValue extends Serializable, Message> implements Serializable {
 
private static final long serialVersionUID = 1L;
-   
+
+   // --------------------------------------------------------------------------------------------
+   //  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+   //  inside an iteration.
+   // --------------------------------------------------------------------------------------------
+
+   private long numberOfVertices = -1L;
+
+   public long getNumberOfVertices() throws Exception{
+       if (numberOfVertices == -1) {
+           throw new InaccessibleMethodException("The number of vertices option is not set");
+       }
+       return numberOfVertices;
+   }
+
+   void setNumberOfVertices(long numberOfVertices) {
+       this.numberOfVertices = numberOfVertices;
+   }
+
+   //---------------------------------------------------------------------------------------------
+
+   private boolean optDegrees;
+
+   public boolean isOptDegrees() {
--- End diff --

I'm not sure what I was referring to :confused: You can probably ignore 
this comment, sorry!




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643293
  
--- Diff: docs/gelly_guide.md ---
@@ -380,6 +380,16 @@ all aggregates globally once per superstep and makes them available in the next
 
 * <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables](programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.
 
+* <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
+
+The number of vertices can then be accessed in the vertex update function and in the messaging function using the `getNumberOfVertices()` method.
+
+* <strong>Degrees</strong>: Accessing the in/out degree for a vertex within an iteration. This property can be set using the `setOptDegrees()` method.
+
--- End diff --

Same here: delete this newline.




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28644140
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
 ---
@@ -40,7 +42,37 @@
VertexValue extends Serializable, Message> implements Serializable {
 
private static final long serialVersionUID = 1L;
-   
+
+   // --------------------------------------------------------------------------------------------
+   //  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+   //  inside an iteration.
+   // --------------------------------------------------------------------------------------------
+
+   private long numberOfVertices = -1L;
+
+   public long getNumberOfVertices() throws Exception{
+       if (numberOfVertices == -1) {
+           throw new InaccessibleMethodException("The number of vertices option is not set");
+       }
+       return numberOfVertices;
+   }
+
+   void setNumberOfVertices(long numberOfVertices) {
+       this.numberOfVertices = numberOfVertices;
+   }
+
+   //---------------------------------------------------------------------------------------------
+
+   private boolean optDegrees;
+
+   public boolean isOptDegrees() {
--- End diff --

this shouldn't be public, right?




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643290
  
--- Diff: docs/gelly_guide.md ---
@@ -380,6 +380,16 @@ all aggregates globally once per superstep and makes them available in the next
 
 * <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables](programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.
 
+* <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
+
--- End diff --

delete this newline




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643470
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
 ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * Incremental Single Sink Shortest Paths Example.
+ *
+ * The program takes as input the resulted graph after a SSSP computation,
+ * an edge to be removed and the initial graph(i.e. before SSSP was computed).
+ *
+ * - If the removed edge does not belong to the SP-graph, no computation is necessary.
+ * The edge is simply removed from the graph.
+ * - If the removed edge is an SP-edge, then all nodes, whose shortest path contains the removed edge,
+ * potentially require re-computation.
+ * When the edge u, v is removed, v checks if it has another out-going SP-edge.
+ * If yes, no further computation is required.
+ * If v has no other out-going SP-edge, it invalidates its current value, by setting it to INF.
+ * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE message.
+ * When a vertex u receives an INVALIDATE message from v, it checks whether it has another out-going SP-edge.
+ * If not, it invalidates its current value and propagates the INVALIDATE message.
+ * The propagation stops when a vertex with an alternative shortest path is reached
+ * or when we reach a vertex with no SP-in-neighbors.
+ *
+ * Usage <code>IncrementalSSSPExample &lt;vertex path&gt; &lt;edge path&gt; &lt;edges in SSSP&gt;
+ * &lt;edge to be removed&gt; &lt;result path&gt; &lt;number of iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData}
+ */
+@SuppressWarnings("serial")
+public class IncrementalSSSPExample implements ProgramDescription {
+
+   public static void main(String [] args) throws Exception {
+
+       if(!parseParameters(args)) {
+           return;
+       }
+
+       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+       DataSet<Vertex<Long, Double>> vertices = getVerticesDataSet(env);
+
+       DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+       DataSet<Edge<Long, Double>> edgesInSSSP = getEdgesinSSSPDataSet(env);
+
+       Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
+
+       Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+       // Assumption: all minimum weight paths are kept
+       Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);
+
+       // remove the edge
+       graph.removeEdge(edgeToBeRemoved);
+
+       // configure the iteration
+       IterationConfiguration parameters = new IterationConfiguration();
+
+       if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
+
+   

[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643473
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
 ---

[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643463
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
 ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * Incremental Single Sink Shortest Paths Example.
+ *
+ * The program takes as input the resulted graph after a SSSP computation,
+ * an edge to be removed and the initial graph(i.e. before SSSP was computed).
+ *
+ * - If the removed edge does not belong to the SP-graph, no computation is necessary.
+ * The edge is simply removed from the graph.
+ * - If the removed edge is an SP-edge, then all nodes, whose shortest path contains the removed edge,
+ * potentially require re-computation.
+ * When the edge <u, v> is removed, v checks if it has another out-going SP-edge.
+ * If yes, no further computation is required.
+ * If v has no other out-going SP-edge, it invalidates its current value, by setting it to INF.
+ * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE message.
+ * When a vertex u receives an INVALIDATE message from v, it checks whether it has another out-going SP-edge.
+ * If not, it invalidates its current value and propagates the INVALIDATE message.
+ * The propagation stops when a vertex with an alternative shortest path is reached
+ * or when we reach a vertex with no SP-in-neighbors.
+ *
+ * Usage <code>IncrementalSSSPExample &lt;vertex path&gt; &lt;edge path&gt; &lt;edges in SSSP&gt;
+ * &lt;edge to be removed&gt; &lt;result path&gt; &lt;number of iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData}
+ */
+@SuppressWarnings("serial")
+public class IncrementalSSSPExample implements ProgramDescription {
+
+    public static void main(String [] args) throws Exception {
+
+        if(!parseParameters(args)) {
+            return;
+        }
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<Vertex<Long, Double>> vertices = getVerticesDataSet(env);
+
+        DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+        DataSet<Edge<Long, Double>> edgesInSSSP = getEdgesinSSSPDataSet(env);
+
+        Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
+
+        Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+        // Assumption: all minimum weight paths are kept
+        Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);
+
+        // remove the edge
+        graph.removeEdge(edgeToBeRemoved);
+
+        // configure the iteration
+        IterationConfiguration parameters = new IterationConfiguration();
+
+        if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
+
+   

[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643683
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
 ---
@@ -24,18 +24,24 @@
 
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
 
--- End diff --

Could you please also update the description referring to the 
`withPlainEdges` and `withValuedEdges` methods? This is no longer the case :-)




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-94167112
  
Hi @andralungu! Thanks for the update :-)

I left a few inline comments. Overall it looks good; I just had a little 
trouble following the logic in `VertexCentricIteration` with your changes. I 
think it could be simplified a bit, so that all configuration options are set 
in one place. Other than that, there are some javadocs missing; please make 
sure to add a description to every public method.

One concern I have is whether the `Vertex` class is the right place for the 
degrees fields and methods. These should be available only inside the iteration 
methods. In the current implementation one can use the public `setInDegree` 
method and then retrieve the value, regardless of whether that happens inside 
an iteration or whether it was set in the configuration. Could we instead 
create methods inside the `VertexUpdateFunction` and `MessagingFunction`? 
Something like `getVertexInDegree()` instead of `vertex.getInDegree()`? Or do 
you have a better idea?
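
To make the suggestion a bit more concrete, here is a rough sketch of what I have in mind. All names in it (`UpdateFunctionSketch`, `setDegrees`, `DegreeAccessSketch`) are made up for illustration and are not the actual Gelly classes; returning -1 for an unset option is just one possible choice.

```java
// Hypothetical sketch: illustration-only names, not the Gelly classes.
abstract class UpdateFunctionSketch<VV> {

    // -1 signals that the degrees option was not enabled for this iteration.
    private long inDegree = -1L;
    private long outDegree = -1L;

    // Package-private on purpose: only the iteration runtime (same package)
    // can fill in the degrees; user code has no setter to call.
    void setDegrees(long inDegree, long outDegree) {
        this.inDegree = inDegree;
        this.outDegree = outDegree;
    }

    public long getVertexInDegree() {
        return inDegree;
    }

    public long getVertexOutDegree() {
        return outDegree;
    }

    public abstract VV updateVertex(VV currentValue);
}

class DegreeAccessSketch {

    // Stands in for the runtime: injects the degrees, then runs the user function.
    static double runOnce(double value, long inDegree, long outDegree) {
        UpdateFunctionSketch<Double> userFunction = new UpdateFunctionSketch<Double>() {
            @Override
            public Double updateVertex(Double currentValue) {
                // The degree is read from the function, not from the vertex.
                return currentValue / getVertexOutDegree();
            }
        };
        userFunction.setDegrees(inDegree, outDegree);
        return userFunction.updateVertex(value);
    }
}
```

This way the `Vertex` class stays untouched and the degrees are only reachable from inside the iteration functions.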




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643437
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
 ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * Incremental Single Sink Shortest Paths Example.
--- End diff --

Hmm, this might be a bit misleading, as this program only incrementally 
updates the shortest paths when an edge is removed from the initial graph.




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643520
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java
 ---
@@ -189,4 +199,33 @@ public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
     public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
         return this.bcVarsMessaging;
     }
+
+    // --
+    // The direction, degrees and the total number of vertices should be optional.
+    // The user can access them by setting the direction, degrees or the numVertices options.
+    // --
--- End diff --

all parameters in this class are optional :-)
Can you add javadocs to the new methods, like in the methods above?




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643695
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
 ---
@@ -86,7 +92,9 @@
     private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;
 
     private IterationConfiguration configuration;
-    
+
+    private DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees;
--- End diff --

it seems that this isn't used anywhere?




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28644150
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
 ---
@@ -38,25 +42,57 @@
  * @param <Message> The type of the message sent between vertices along the edges.
  * @param <EdgeValue> The type of the values that are associated with the edges.
  */
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable, 
-        VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
+public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable,
+        VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
 
private static final long serialVersionUID = 1L;
-   
+
+   // 

+   //  Attributes that allow vertices to access their in/out degrees and 
the total number of vertices
+   //  inside an iteration.
+   // 

+
+   private long numberOfVertices = -1L;
+
+   public long getNumberOfVertices() throws Exception{
+   if (numberOfVertices == -1) {
+   throw new InaccessibleMethodException("The number of vertices option is not set");
+   }
+   return numberOfVertices;
+   }
+
+   void setNumberOfVertices(long numberOfVertices) {
+   this.numberOfVertices = numberOfVertices;
+   }
+
+   // 

+   //  Attribute that allows the user to choose the neighborhood 
type(in/out/all) on which to run
+   //  the vertex centric iteration.
+   // 

+
+   private EdgeDirection direction;
+
+   public EdgeDirection getDirection() {
+   return direction;
+   }
+
+   public void setDirection(EdgeDirection direction) {
--- End diff --

these shouldn't be public either




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643344
  
--- Diff: docs/gelly_guide.md ---
@@ -380,6 +380,16 @@ all aggregates globally once per superstep and makes them available in the next
 
 * <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables](programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.
 
+* <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
+
+The number of vertices can then be accessed in the vertex update function and in the messaging function using the `getNumberOfVertices()` method.
+
+* <strong>Degrees</strong>: Accessing the in/out degree for a vertex within an iteration. This property can be set using the `setOptDegrees()` method.
+
+The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex using `vertex.getInDegree()` or `vertex.getOutDegree()`.
+
+* <strong>Messaging Direction</strong>: The direction in which messages are sent. This can be either EdgeDirection.IN, EdgeDirection.OUT, EdgeDirection.ALL. The messaging direction also dictates the update direction which would be EdgeDirection.OUT, EdgeDirection.IN and EdgeDirection.ALL, respectively. This property can be set using the `setDirection()` method.
+
--- End diff --

First, I would highlight the `EdgeDirection.IN` etc. Then, maybe we can 
rephrase this to make it a bit clearer. I would write something like the 
following: "By default, a vertex sends messages to its out-neighbors and thus 
updates its value based on the messages it receives from its in-neighbors. This 
configuration option allows changing the messaging direction and setting it to 
[...]. The messaging direction also dictates the update direction."
We could also add a couple of figures to illustrate the messaging-update 
directions. I can create those, so that they're in the same format as the rest 
of the figures in the guide.
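
Until there are figures, the relationship between the two directions can be sketched in a few lines. This is only an illustration under my own assumptions: `DirectionSketch`, `recipients` and `updateDirection` are invented names, and edges are plain `{source, target}` pairs rather than Gelly `Edge` objects.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: shows who receives a vertex's messages for each
// messaging direction, and which update direction follows from it.
class DirectionSketch {

    enum Direction { IN, OUT, ALL }

    // Each edge is a {source, target} pair.
    static Set<Long> recipients(long vertex, long[][] edges, Direction messaging) {
        Set<Long> result = new TreeSet<>();
        for (long[] edge : edges) {
            // OUT and ALL: send along out-going edges, i.e. to out-neighbors.
            if (messaging != Direction.IN && edge[0] == vertex) {
                result.add(edge[1]);
            }
            // IN and ALL: send against in-coming edges, i.e. to in-neighbors.
            if (messaging != Direction.OUT && edge[1] == vertex) {
                result.add(edge[0]);
            }
        }
        return result;
    }

    // A vertex updates from the neighbors that message it, so the update
    // direction is the opposite of the messaging direction (ALL stays ALL).
    static Direction updateDirection(Direction messaging) {
        switch (messaging) {
            case IN:
                return Direction.OUT;
            case OUT:
                return Direction.IN;
            default:
                return Direction.ALL;
        }
    }
}
```

For a vertex 1 with edges 1->2 and 3->1, messaging direction OUT reaches vertex 2, IN reaches vertex 3, and ALL reaches both.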




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28644120
  
--- Diff: 
flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.IncrementalSSSPExample;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
+
+   private String verticesPath;
+
+   private String edgesPath;
+
+   private String edgesInSSSPPath;
+
+   private String resultPath;
+
+   private String expected;
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   public IncrementalSSSPITCase(TestExecutionMode mode) {
+   super(mode);
+   }
+
+   @Before
+   public void before() throws Exception {
+   resultPath = tempFolder.newFile().toURI().toString();
+   File verticesFile = tempFolder.newFile();
+   Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8);
+
+   File edgesFile = tempFolder.newFile();
+   Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8);
+
+   File edgesInSSSPFile = tempFolder.newFile();
+   Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8);
+
+   verticesPath = verticesFile.toURI().toString();
+   edgesPath = edgesFile.toURI().toString();
+   edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
+   }
+
+   @Test
--- End diff --

I would add a few more tests here, maybe with a bigger dataset. The example 
algorithm is quite complex, and you only check what happens if you remove an 
SP-edge in a very small graph. We should at least test what happens when you 
remove a non-SP-edge and whether the INVALIDATE messages are propagated 
correctly in the case of a bigger graph.
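
To work out the expected values for such tests, a small sequential reference of the INVALIDATE propagation might help. The sketch below is my own reading of the javadoc, not the example's code: it assumes that a vertex keeps its value as long as at least one of its out-going SP-edges leads to a vertex that has not been invalidated, and `InvalidateSketch` and its methods are invented names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical reference implementation of the INVALIDATE propagation,
// simulated sequentially on plain {source, target} SP-edge pairs.
class InvalidateSketch {

    // spEdges: the SP-edges that remain after the removal.
    // start:   the vertex that lost an out-going SP-edge.
    // Returns the vertices whose value would be reset to INF.
    static Set<Long> invalidated(long[][] spEdges, long start) {
        Set<Long> invalid = new TreeSet<>();
        Deque<Long> pending = new ArrayDeque<>();
        pending.add(start);
        while (!pending.isEmpty()) {
            long vertex = pending.poll();
            // Stop here if the vertex is already handled or still has an
            // alternative shortest path through a valid neighbor.
            if (invalid.contains(vertex) || hasValidOutEdge(spEdges, vertex, invalid)) {
                continue;
            }
            invalid.add(vertex);
            // Send INVALIDATE to all SP-in-neighbors.
            for (long[] edge : spEdges) {
                if (edge[1] == vertex) {
                    pending.add(edge[0]);
                }
            }
        }
        return invalid;
    }

    static boolean hasValidOutEdge(long[][] spEdges, long vertex, Set<Long> invalid) {
        for (long[] edge : spEdges) {
            if (edge[0] == vertex && !invalid.contains(edge[1])) {
                return true;
            }
        }
        return false;
    }
}
```

For example, with remaining SP-edges 1->3, 2->3, 2->4 and 4->5 (the edge 3->5 having been removed), starting from vertex 3 invalidates 3 and then 1, while 2 keeps its value because of its edge to 4.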




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643494
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
 ---
@@ -77,7 +77,7 @@ public Double map(Vertex<K, Double> value) {
         extends VertexUpdateFunction<K, Double, Double> {
 
--- End diff --

I really like what the examples look like now! Much better having the 
Vertex object instead of separated key-value ^^




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643926
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
 ---
@@ -138,69 +146,46 @@ public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) {
         if (this.initialVertices == null) {
             throw new IllegalStateException("The input data set has not been set.");
         }
-        
+
         // prepare some type information
-        TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
         TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
         TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
 
-        final int[] zeroKeyPos = new int[] {0};
-    
-        final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
-            this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
+        // create a graph
+        Graph<VertexKey, VertexValue, EdgeValue> graph =
+                Graph.fromDataSet(initialVertices, edgesWithValue, ExecutionEnvironment.getExecutionEnvironment());
 
-        // set up the iteration operator
-        if (this.configuration != null) {
+        // check whether the numVertices option is set and, if so, compute the total number of vertices
+        // and set it within the messaging and update functions
 
-            iteration.name(this.configuration.getName(
-                    "Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"));
-            iteration.parallelism(this.configuration.getParallelism());
-            iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
-
-            // register all aggregators
-            for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
-                iteration.registerAggregator(entry.getKey(), entry.getValue());
+        if (this.configuration != null && this.configuration.isOptNumVertices()) {
+            try {
+                long numberOfVertices = graph.numberOfVertices();
+                messagingFunction.setNumberOfVertices(numberOfVertices);
+                updateFunction.setNumberOfVertices(numberOfVertices);
+            } catch (Exception e) {
+                e.printStackTrace();
             }
         }
-        else {
-            // no configuration provided; set default name
-            iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
-        }
-        
-        // build the messaging function (co group)
-        CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
-        MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
-        messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-        
-        // configure coGroup message function with name and broadcast variables
-        messages = messages.name("Messaging");
 
-        if (this.configuration != null) {
-            for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
-                messages = messages.withBroadcastSet(e.f1, e.f0);
-            }
+        if(this.configuration != null) {
+            messagingFunction.setDirection(this.configuration.getDirection());
+        } else {
+            messagingFunction.setDirection(EdgeDirection.OUT);
         }
-        
-        VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
-        
-        // build the update function (co group)
-        CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
-                messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-        
-        // configure coGroup update function with name and broadcast variables
-        updates = updates.name("Vertex State Updates");
 
-        if 

[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-18 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r28643920
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
 ---
@@ -138,69 +146,46 @@ public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) {
if (this.initialVertices == null) {
--- End diff --

I find the new logic of this method a bit confusing. Why do you set some 
configuration parameters here (degrees and direction) and the rest in the 
helper methods?
As far as I understand, you have 2 cases that matter here: the degrees 
option is set or not. This defines whether you create a simple vertex or you 
augment it with degree information, right?




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-13 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-92301058
  
Hey @andralungu! 
Now that #547 is merged, the degrees, total number of vertices and 
direction can be added as configuration options. This should also simplify 
writing the tests. Take a look at `VertexCentricConfigurationITCase`. You 
should be able to cover some of the cases by extending it.
I hope rebasing won't be a big pain :-)

If an option is not set and the user tries to call one of the methods that 
isn't computed, then we could display a message to inform them that the 
corresponding option is not set and how to enable it.
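
Something along these lines, as a minimal sketch. The class name `NumVerticesSketch` is invented, and I use the standard `IllegalStateException` here only to keep the snippet self-contained; the wording of the message is just an example.

```java
// Hypothetical sketch of a getter that tells the user how to enable the option.
class NumVerticesSketch {

    // -1 means the number of vertices was never computed for this iteration.
    private long numberOfVertices = -1L;

    // Package-private: meant to be called by the iteration runtime only.
    void setNumberOfVertices(long numberOfVertices) {
        this.numberOfVertices = numberOfVertices;
    }

    public long getNumberOfVertices() {
        if (numberOfVertices == -1L) {
            throw new IllegalStateException(
                    "The number of vertices option is not set. "
                    + "Enable it with setOptNumVertices() on the iteration configuration.");
        }
        return numberOfVertices;
    }
}
```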


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-07 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-90728415
  
Hi @vasia ,

I simply assumed that if the previous examples work, the iteration also 
works as before. Would you like a specific test suite for that?

For the degree option not being set: the `getInDegree()` and `getOutDegree()` 
methods are located inside the `Vertex` class, so I'm not sure their availability 
can be changed by setting the option on the iteration. They are not computed 
when the degree option is not set, but you can still see the methods. Do you 
have any idea how to make the degrees inaccessible?

Once this becomes clear, I can add the tests :)




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-06 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-90233701
  
Hey @andralungu! Thanks a lot for your work on this :)

I only took a quick look at your changes for now, but it seems that all 
issues are fixed.
This PR introduces a lot of changes, so I would like to look into it more 
carefully. Also, it'd be great if one more person could review.

Next, I think we should add a few tests to make sure we don't break 
existing functionality and that the newly introduced options work as expected.

More specifically, I would test the following:
- if no direction is given, the iteration works as before, i.e. collecting 
messages from in-neighbors and sending messages to out-neighbors.
- if direction is set to IN / OUT / ALL
- if the degrees option is not set, the vertex cannot access the degree, 
i.e. no method is available or an error is produced
- if the degrees option is set, the degrees can be accessed in all 
supersteps and their values are correct
- combination of the above, e.g. set direction to ALL and degrees.

I guess adding those tests will be easier after we merge #547 and make the 
new options part of the configuration.




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-04-06 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-90061271
  
@vasia ,

PR updated :)




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-03-29 Thread andralungu
Github user andralungu commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-87425953
  
Hi @vasia ,

1). I will add the documentation once we reach a consensus. No use
documenting something that might change :)
2). For the public setter: I still couldn't think of a workaround. Waiting
for suggestions.
3). Most of the comments were addressed in this final commit. However, as
for avoiding unnecessary overhead, the only way I could do that is to add
another VertexCentricIteration class that operates on Tuple3 and have the
user decide from the constructor which class they want to use. To me, that
seems to introduce a ton of duplicate code, which will look very bad.

Keeping in mind that I need to store the degrees in the vertex value, any
suggestions on how to eliminate this overhead would be very useful.

Thanks! 





[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-03-29 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-87462312
  
Hi @andralungu!
Regarding (2), I think you could call the setter inside VertexCentricIteration
instead of inside a method of Graph.
Regarding (3), I agree that adding another VertexCentricIteration wouldn't
be a nice solution. Why don't you create the Tuple3 (or whatever you need based
on the options) vertex dataset inside `createResult()`? I don't see the need to
change VertexCentricIteration's types / output.
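
A minimal sketch of the idea in (2), assuming the iteration class and the messaging function live in the same package: the setter can then drop its `public` modifier and be called from `createResult()`. The class names below are simplified, hypothetical stand-ins, not the actual Gelly classes.

```java
public class SetterVisibilitySketch {

    public static void main(String[] args) {
        MessagingFunctionSketch fn = new MessagingFunctionSketch() {
            @Override
            public void sendMessages() {
                // User code can read the value but cannot set it from another package.
                System.out.println("vertices: " + getNumberOfVertices());
            }
        };
        new IterationSketch(fn, 5L).createResult();
    }
}

// Simplified, hypothetical stand-in for MessagingFunction.
abstract class MessagingFunctionSketch {

    private long numberOfVertices;

    public long getNumberOfVertices() {
        return numberOfVertices;
    }

    // No `public` modifier: callable only from classes in the same package.
    void setNumberOfVertices(long numberOfVertices) {
        this.numberOfVertices = numberOfVertices;
    }

    public abstract void sendMessages();
}

// Simplified, hypothetical stand-in for VertexCentricIteration.
final class IterationSketch {

    private final MessagingFunctionSketch messagingFunction;
    private final long vertexCount;

    IterationSketch(MessagingFunctionSketch messagingFunction, long vertexCount) {
        this.messagingFunction = messagingFunction;
        this.vertexCount = vertexCount;
    }

    // The iteration sets the count itself, so Graph never needs the setter.
    void createResult() {
        messagingFunction.setNumberOfVertices(vertexCount);
        messagingFunction.sendMessages();
    }
}
```

Whether this works in the PR depends on both classes actually sharing a package. The diffs in this thread place MessagingFunction under `org.apache.flink.graph.spargel`.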




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-03-28 Thread andralungu
Github user andralungu commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r27344502
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
 ---
@@ -38,11 +41,41 @@
  * @param <Message> The type of the message sent between vertices along the edges.
  * @param <EdgeValue> The type of the values that are associated with the edges.
  */
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable, 
-   VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
+public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable,
+   VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
 
    private static final long serialVersionUID = 1L;
-   
+
+   // --------------------------------------------------------------------------------------------
+   //  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+   //  inside an iteration.
+   // --------------------------------------------------------------------------------------------
+
+   private long numberOfVertices;
+
+   public long getNumberOfVertices() {
+       return numberOfVertices;
+   }
+
+   public void setNumberOfVertices(long numberOfVertices) {
--- End diff --

Normally it should be private. However, I need to access that setter in 
Graph's createVertexCentricIteration. If you can think of an alternative, let 
me know. 




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-03-26 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/537#issuecomment-86727594
  
Hey @andralungu! Thanks for this PR. I know it took more effort than you 
might have thought initially :-)

I have a few comments:
- In my opinion, the degrees availability and the total number of vertices
should be *optional*. Otherwise, we are introducing unnecessary overhead for
the majority of cases, i.e. when these are not actually used by an algorithm.
- The edge direction option and the degrees / numOfVertices should be
*separate extensions* and not tied together, i.e. we should allow a user to
choose the edge direction option independently of whether they also want to
access the degrees. To be consistent with the rest of the
VertexCentricIteration options, we could introduce methods
`setMessagingDirection()`, `setInDegreesAvailable()` and so on.
- The change should be transparent to user code, i.e. if a user has turned
on the option for accessing inDegrees, they should be able to call
`vertex.getInDegree()` inside the messaging/update functions, but
`vertex.getValue()` should still return the vertex value. In other words, the
Tuple3 you're using to store the degrees should be hidden from the public
methods.
- Finally, we should add documentation for these options, including a
description of how they work and some illustrative examples.

Let me know what you think about the above and whether you have any 
questions!
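
The points above could look roughly like the following self-contained Java sketch. Only the method names `setMessagingDirection()`, `setInDegreesAvailable()`, `getInDegree()` and `getValue()` come from the comment. Everything else (`OptionsSketch`, `IterationOptions`, `wrap`, the `-1` placeholder for a disabled option) is a hypothetical illustration, not the Gelly API.

```java
public class OptionsSketch {

    enum Direction { IN, OUT, ALL }

    // Hypothetical option holder: each option is independent and off by default.
    static final class IterationOptions {
        private Direction messagingDirection = Direction.OUT;
        private boolean inDegreesAvailable = false;

        IterationOptions setMessagingDirection(Direction direction) {
            this.messagingDirection = direction;
            return this;
        }

        IterationOptions setInDegreesAvailable(boolean available) {
            this.inDegreesAvailable = available;
            return this;
        }

        Direction getMessagingDirection() {
            return messagingDirection;
        }

        boolean isInDegreesAvailable() {
            return inDegreesAvailable;
        }
    }

    // Vertex view handed to user functions: the degree is stored next to the
    // value internally, but getValue() still returns only the user's value.
    static final class Vertex<K, VV> {
        private final K id;
        private final VV value;
        private final long inDegree;

        Vertex(K id, VV value, long inDegree) {
            this.id = id;
            this.value = value;
            this.inDegree = inDegree;
        }

        K getId() { return id; }

        VV getValue() { return value; }

        // Assumption: -1 signals that the option was not enabled.
        long getInDegree() { return inDegree; }
    }

    // Attaches the degree only when the option is on, so the default path
    // carries no extra information.
    static <K, VV> Vertex<K, VV> wrap(K id, VV value, long computedInDegree, IterationOptions options) {
        long inDegree = options.isInDegreesAvailable() ? computedInDegree : -1L;
        return new Vertex<>(id, value, inDegree);
    }

    public static void main(String[] args) {
        IterationOptions options = new IterationOptions()
                .setMessagingDirection(Direction.ALL)
                .setInDegreesAvailable(true);
        Vertex<Long, String> v = wrap(1L, "label", 3L, options);
        System.out.println(v.getValue() + " " + v.getInDegree());
    }
}
```

In an actual implementation the degree would be computed only when the option is enabled. Here it is passed in to keep the sketch short.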




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-03-26 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r27256989
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
 ---
@@ -38,11 +41,41 @@
  * @param <Message> The type of the message sent between vertices along the edges.
  * @param <EdgeValue> The type of the values that are associated with the edges.
  */
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable, 
-   VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
+public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable,
+   VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
 
    private static final long serialVersionUID = 1L;
-   
+
+   // --------------------------------------------------------------------------------------------
+   //  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+   //  inside an iteration.
+   // --------------------------------------------------------------------------------------------
+
+   private long numberOfVertices;
+
+   public long getNumberOfVertices() {
+       return numberOfVertices;
+   }
+
+   public void setNumberOfVertices(long numberOfVertices) {
--- End diff --

I guess this one shouldn't be public.




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-03-26 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r27257390
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
 ---
@@ -40,7 +41,22 @@
    VertexValue extends Serializable, Message> implements Serializable {
 
    private static final long serialVersionUID = 1L;
-   
+
+   // --------------------------------------------------------------------------------------------
+   //  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+   //  inside an iteration.
+   // --------------------------------------------------------------------------------------------
+
+   private long numberOfVertices;
+
+   public long getNumberOfVertices() {
+       return numberOfVertices;
+   }
+
+   public void setNumberOfVertices(long numberOfVertices) {
--- End diff --

same here




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-03-26 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r27255825
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
@@ -1118,17 +1118,61 @@ public boolean filter(Edge<K, EV> edge) {
    }
 
    /**
-    * Create a Vertex-Centric iteration on the graph.
-    * 
+    * Create a Vertex-Centric iteration on the graph. inDegrees, outDegrees
+    * and the total number of vertices are set here in order to be retrieved
+    * later on within the iteration.
+    *
+    * Also, The user can define the direction in which the updates are performed
+    * and in which the messages are sent. The function below tackles the regular
+    * use-case where updates of the in-neighbors are used to calculate state
+    * and messages are sent to out-neighbors.
+    *
     * @param vertexUpdateFunction the vertex update function
     * @param messagingFunction the messaging function
     * @param maximumNumberOfIterations maximum number of iterations to perform
     * @return
     */
-   public <M> VertexCentricIteration<K, VV, M, EV> createVertexCentricIteration(
+   public <M, NV extends Serializable> VertexCentricIteration<K, VV, M, EV> createVertexCentricIteration(
--- End diff --

NV is never used?




[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...

2015-03-26 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/537#discussion_r27256784
  
--- Diff: 
flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
 ---
@@ -68,12 +70,12 @@ public LabelPropagation(int maxIterations) {
    public static final class UpdateVertexLabel<K extends Comparable<K> & Serializable>
            extends VertexUpdateFunction<K, Long, Long> {
 
-       public void updateVertex(K vertexKey, Long vertexValue,
+       public void updateVertex(Vertex<K, Tuple3<Long,Long,Long>> vertex,
--- End diff --

as in the comment below, this should be `Vertex<K, Long>`

