[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-102366422 Hey @andralungu! Thanks a lot for looking into this! Can you please close this PR and I'll open a new one with the latest changes. Regarding returning -1 if the option is not set, I did that so that we avoid throwing an exception and also because other options like getIterationAggregator() have a similar behavior. I really don't think it's such a big deal for one to read the javadoc when using a method :P Of course, if you have any idea on how we could make this better, please let me know! (but let's continue the discussion in the new PR). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu closed the pull request at: https://github.com/apache/flink/pull/537
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-101978637

Hi @vasia, I had a look at the new branch. The changes look good, degrees are no longer exposed to the user and the current approach removes the need to subclass Vertex. :+1:

The only small remark/comment I have comes from a user perspective:
- let's say that, by mistake, I forgot to set the degrees option;
- let's also say I was too busy to read the manual :)
- result: I will get -1 instead of the expected number of degrees per vertex

I understand why you had to pass -1 there; it should be of the same type as the degrees. However, maybe we can come up with some way to hint users that they should not forget to set the corresponding options. Adding an extra line in the documentation might not suffice.
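To make the concern about the -1 return value concrete, here is a minimal sketch of the two styles side by side. It is not the Gelly API: the class `DegreeHolder` and the method `inDegreeIfSet()` are hypothetical names used only for illustration, and the sentinel getter only mirrors the behaviour described in this thread.

```java
import java.util.OptionalLong;

// Hypothetical stand-in for an update function; this is NOT the Gelly class.
class DegreeHolder {
    // -1 is the sentinel meaning "the degrees option was not set".
    private long inDegree = -1L;

    // In this sketch the framework would call this when the option is enabled.
    void setInDegree(long inDegree) {
        this.inDegree = inDegree;
    }

    // Sentinel style, as discussed in the thread: never throws,
    // but the caller has to know that -1 means "not set".
    long getInDegree() {
        return inDegree;
    }

    // Assumed alternative: make "not set" visible in the return type,
    // so that forgetting the option cannot silently yield -1.
    OptionalLong inDegreeIfSet() {
        return inDegree < 0 ? OptionalLong.empty() : OptionalLong.of(inDegree);
    }
}
```

With the second getter a caller has to handle the empty case explicitly, which is one possible way to "hint" users without throwing an exception. Whether that is worth the extra API surface is a design choice the thread leaves open.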
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-101048928 Hey @andralungu, I built on your changes and implemented what I described above. I have pushed the result to [this branch](https://github.com/vasia/flink/tree/vc-extentions). I had to manually resolve conflicts caused by #657, so I guess the easiest way to allow people to review would be for me to open a fresh PR. Since you are the one most familiar with these features, could you please take a look and let me know what you think? Thanks a lot!
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-99070508

Hey @andralungu, I'm not sure what @StephanEwen had in mind when suggesting to subclass Vertex, but the current implementation forces the user to cast the `Vertex` to `VertexWithDegrees` in order to access the degrees, which is not very user-friendly :S I guess the point was to have `VertexWithDegrees` as an argument to the update and messaging functions instead? If that's the case however, it's not quite clear to me how this will play along with the configuration option setting.

Back to my previous comment on only accessing the degrees inside the update/messaging functions instead of adding public methods to the Vertex class, a simple way to do this would be the following: add set/get vertexDegree methods in `VertexUpdateFunction` and `MessagingFunction`. These would be the methods that the user will call to access the vertex degrees, in a way similar to `getDirection()`, `getBroadcastSet()` etc.

Inside `createResultVerticesWithDegrees`, you're already creating the augmented vertex dataset with the degrees and passing it as the solution set of the delta iteration. Then, in the corresponding coGroup method of the Messaging/VertexUpdate UDFs, you iterate over this dataset (state) and call the user-defined functions for each vertex. This is where you can set the degrees inside the update/messaging functions, instead of setting them on the vertex object as you currently do.

What do you think of this solution?
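The proposal above can be pictured with a small, framework-free sketch. Everything here is an assumption made for illustration: `VertexState`, `SketchUpdateFunction` and `SketchDriver` are hypothetical names, the loop in `run()` merely stands in for the coGroup over the augmented solution set, and none of this is the actual Gelly code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record of the augmented state: vertex id, value and degrees.
class VertexState {
    final long id;
    final double value;
    final long inDegree;
    final long outDegree;

    VertexState(long id, double value, long inDegree, long outDegree) {
        this.id = id;
        this.value = value;
        this.inDegree = inDegree;
        this.outDegree = outDegree;
    }
}

// Hypothetical user-facing function: degrees are read through getters,
// never through the vertex object itself.
abstract class SketchUpdateFunction {
    private long inDegree = -1L;
    private long outDegree = -1L;

    // Package-private setters: only the driver in the same package calls them.
    void setInDegree(long inDegree) { this.inDegree = inDegree; }
    void setOutDegree(long outDegree) { this.outDegree = outDegree; }

    public long getInDegree() { return inDegree; }
    public long getOutDegree() { return outDegree; }

    // User code; returns the new vertex value.
    public abstract double updateVertex(long vertexId, double currentValue);
}

// Stand-in for the framework side: sets the degrees right before each call.
class SketchDriver {
    static List<Double> run(List<VertexState> state, SketchUpdateFunction fn) {
        List<Double> updated = new ArrayList<>();
        for (VertexState s : state) {
            fn.setInDegree(s.inDegree);
            fn.setOutDegree(s.outDegree);
            updated.add(fn.updateVertex(s.id, s.value));
        }
        return updated;
    }
}
```

In this sketch the getters return a per-vertex value only because the driver refreshes it before every invocation; called outside `updateVertex()` they would return the value of whichever vertex was processed last (or -1 before the first call).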
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-99232613 Hey @vasia, Everyone is acting as though I do not want to make these methods user-friendly. I do :), it's just not possible. Keep in mind that in the VertexUpdateFunction and in the MessagingFunction you can only access the vertex in the updateVertex() and sendMessages() methods. getDirection() and getBroadcastSet() return a single value for all the vertices. getDegree() returns one value for each of the vertices. As previously implied, you cannot access the vertex in these classes. The only very ugly solution is to give it the entire DataSet and to constantly filter it. You basically have the degree of the vertex in the Tuple3, but this information is hidden from the user.
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-97159494 Thanks for the review, @StephanEwen! PR updated.
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-96558324

Looks good in general. A few thoughts on this:

Your prior discussion involved efficiency, and Vasia suggested not carrying the degrees in all cases (as they are not needed in most cases). It seems that was not yet realized, because the Vertex class always carries the degrees.

I think we can improve the abstraction between the with-degree and without-degree case a bit. Cases where one has to throw a "not supported" exception can usually be improved with a good inheritance hierarchy. Does it work to make the VertexWithDegrees class a subclass of the Vertex class?

Also, as a bit of background: Vertex is a subclass of Tuple2. Tuples are currently the fastest data types in Flink. By adding additional fields to the Tuple, you are making it a POJO (as far as I know), which is a type that is a bit slower to serialize.

Also: primitives are usually better than boxed types. Prefer `long` over `Long` where possible.
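As a rough illustration of the inheritance idea, here is a plain-Java sketch. The classes `SketchVertex` and `SketchVertexWithDegrees` are hypothetical and do not extend Flink's `Tuple2`, so the sketch says nothing about how Flink would serialize them; it only shows the shape of the hierarchy and the use of primitive `long` fields.

```java
// Hypothetical base type: carries only what every vertex needs.
class SketchVertex {
    final long id;
    final double value;

    SketchVertex(long id, double value) {
        this.id = id;
        this.value = value;
    }
}

// Hypothetical subtype used only when the degrees option is enabled.
// The base type never has to throw "not supported" for degree access,
// because it simply does not offer those fields.
class SketchVertexWithDegrees extends SketchVertex {
    // Primitive long rather than boxed Long, as suggested in the review.
    final long inDegree;
    final long outDegree;

    SketchVertexWithDegrees(long id, double value, long inDegree, long outDegree) {
        super(id, value);
        this.inDegree = inDegree;
        this.outDegree = outDegree;
    }
}
```

Code that only holds a `SketchVertex` reference still has to downcast to reach the degrees, which is the usability cost raised elsewhere in this thread.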
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28811435

--- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.IncrementalSSSPExample;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
+
+    private String verticesPath;
+
+    private String edgesPath;
+
+    private String edgesInSSSPPath;
+
+    private String resultPath;
+
+    private String expected;
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    public IncrementalSSSPITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    @Before
+    public void before() throws Exception {
+        resultPath = tempFolder.newFile().toURI().toString();
+        File verticesFile = tempFolder.newFile();
+        Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8);
+
+        File edgesFile = tempFolder.newFile();
+        Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8);
+
+        File edgesInSSSPFile = tempFolder.newFile();
+        Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8);
+
+        verticesPath = verticesFile.toURI().toString();
+        edgesPath = edgesFile.toURI().toString();
+        edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
+    }
+
+    @Test
--- End diff --

The test for removing the non-SP-edge is doable. It's the bigger graph test case that concerns me. Of course, we can generate some randomised edges, but how do we know which of those edges are in SSSP? That can only be done if we have the actual algorithm implemented.
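For what it's worth, a small reference implementation outside Flink could serve as the oracle for such a randomised test. The sketch below is an assumption about how that might look, not code from this PR: `SpEdgeOracle` is a hypothetical helper, it treats the problem as single-source with non-negative integer weights, and it labels an edge (u, v, w) as an SP-edge when dist[u] + w == dist[v].

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical test helper; vertices are numbered 0..n-1 and each edge is
// encoded as {source, target, weight} with a non-negative weight.
class SpEdgeOracle {

    // Plain Dijkstra; unreachable vertices keep distance Long.MAX_VALUE.
    static long[] dijkstra(int n, long[][] edges, int source) {
        List<List<long[]>> adjacency = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            adjacency.add(new ArrayList<>());
        }
        for (long[] e : edges) {
            adjacency.get((int) e[0]).add(e);
        }

        long[] dist = new long[n];
        Arrays.fill(dist, Long.MAX_VALUE);
        dist[source] = 0L;

        // Queue entries are {vertex, distance at insertion time}.
        PriorityQueue<long[]> queue =
                new PriorityQueue<>(Comparator.comparingLong((long[] a) -> a[1]));
        queue.add(new long[] {source, 0L});

        while (!queue.isEmpty()) {
            long[] top = queue.poll();
            int u = (int) top[0];
            if (top[1] > dist[u]) {
                continue; // stale entry, a shorter path was already found
            }
            for (long[] e : adjacency.get(u)) {
                int v = (int) e[1];
                long candidate = dist[u] + e[2];
                if (candidate < dist[v]) {
                    dist[v] = candidate;
                    queue.add(new long[] {v, candidate});
                }
            }
        }
        return dist;
    }

    // An edge lies on some shortest path iff it is "tight" w.r.t. the distances.
    static boolean isSpEdge(long[] dist, long[] edge) {
        long du = dist[(int) edge[0]];
        return du != Long.MAX_VALUE && du + edge[2] == dist[(int) edge[1]];
    }
}
```

A test could generate random edges, run `dijkstra()` once, and then use `isSpEdge()` to split the edge list into the SP and non-SP inputs. Integer weights are used here to avoid floating-point equality issues; with `Double` weights the comparison would need a tolerance.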
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28764335

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java ---
@@ -40,7 +42,37 @@ VertexValue extends Serializable, Message> implements Serializable {
 
     private static final long serialVersionUID = 1L;
-
+
+    //
+    // Attributes that allow vertices to access their in/out degrees and the total number of vertices
+    // inside an iteration.
+    //
+
+    private long numberOfVertices = -1L;
+
+    public long getNumberOfVertices() throws Exception{
+        if (numberOfVertices == -1) {
+            throw new InaccessibleMethodException("The number of vertices option is not set");
+        }
+        return numberOfVertices;
+    }
+
+    void setNumberOfVertices(long numberOfVertices) {
+        this.numberOfVertices = numberOfVertices;
+    }
+
+    //-
+
+    private boolean optDegrees;
+
+    public boolean isOptDegrees() {
--- End diff --

Are you talking about the getters? To my knowledge, these are public. Otherwise, they would be totally unusable. If you're talking about the default setters, I needed to see them in the entire package. But they are not public.
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28785218

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java ---
@@ -138,69 +146,46 @@ public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) {
         if (this.initialVertices == null) {
--- End diff --

I made that division in order to avoid having duplicate code: the number of vertices and the direction are totally independent of the degree option, which is why they can be set in the createResult() method. Afterwards, the code does exactly what you described in this comment: it separates the creation of a delta iteration and the creation of the messaging function plus vertex update function according to the vertex type (with degrees or not). It's not just the vertex that changes; everything that uses its value afterwards changes too. I suggest you look a bit closer at the createResultVerticesWithDegrees and createResultSimpleVertex methods. I don't think their functionality can be simplified by just creating a simple vertex and a vertex with degrees.
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-94915061 Hi @vasia, I added some answers to your inline comments! I will push my latest version for this tomorrow. Regarding the suggestion that the Vertex class might not be the best place for the getDegree methods, there is a reason why implementing this took a while ^^. I wanted to make the degrees available only in the iteration. The problem is that with the current code (this one and the one in production), a vertex is only accessible in the updateVertex() method. This means that there is no way to get the vertex within the iteration: you don't have a Vertex object, and it's not something you send to the class. Putting the methods on the Vertex class was the quick workaround I found for this issue. If you know a better way, I am eager to hear your suggestions. Thanks! :)
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28769333

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java ---
@@ -40,7 +42,37 @@ VertexValue extends Serializable, Message> implements Serializable {
 
     private static final long serialVersionUID = 1L;
-
+
+    //
+    // Attributes that allow vertices to access their in/out degrees and the total number of vertices
+    // inside an iteration.
+    //
+
+    private long numberOfVertices = -1L;
+
+    public long getNumberOfVertices() throws Exception{
+        if (numberOfVertices == -1) {
+            throw new InaccessibleMethodException("The number of vertices option is not set");
+        }
+        return numberOfVertices;
+    }
+
+    void setNumberOfVertices(long numberOfVertices) {
+        this.numberOfVertices = numberOfVertices;
+    }
+
+    //-
+
+    private boolean optDegrees;
+
+    public boolean isOptDegrees() {
--- End diff --

I'm not sure what I was referring to :confused: You can probably ignore this comment, sorry!
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28643293

--- Diff: docs/gelly_guide.md ---
@@ -380,6 +380,16 @@ all aggregates globally once per superstep and makes them available in the next
 * <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables](programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.
+* <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
+
+The number of vertices can then be accessed in the vertex update function and in the messaging function using the `getNumberOfVertices()` method.
+
+* <strong>Degrees</strong>: Accessing the in/out degree for a vertex within an iteration. This property can be set using the `setOptDegrees()` method.
+
--- End diff --

same
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28644140

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java ---
@@ -40,7 +42,37 @@ VertexValue extends Serializable, Message> implements Serializable {
 
     private static final long serialVersionUID = 1L;
-
+
+    //
+    // Attributes that allow vertices to access their in/out degrees and the total number of vertices
+    // inside an iteration.
+    //
+
+    private long numberOfVertices = -1L;
+
+    public long getNumberOfVertices() throws Exception{
+        if (numberOfVertices == -1) {
+            throw new InaccessibleMethodException("The number of vertices option is not set");
+        }
+        return numberOfVertices;
+    }
+
+    void setNumberOfVertices(long numberOfVertices) {
+        this.numberOfVertices = numberOfVertices;
+    }
+
+    //-
+
+    private boolean optDegrees;
+
+    public boolean isOptDegrees() {
--- End diff --

this shouldn't be public, right?
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28643290

--- Diff: docs/gelly_guide.md ---
@@ -380,6 +380,16 @@ all aggregates globally once per superstep and makes them available in the next
 * <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables](programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.
+* <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
+
--- End diff --

delete this newline
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28643470

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java ---
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * Incremental Single Sink Shortest Paths Example.
+ *
+ * The program takes as input the resulted graph after a SSSP computation,
+ * an edge to be removed and the initial graph(i.e. before SSSP was computed).
+ *
+ * - If the removed edge does not belong to the SP-graph, no computation is necessary.
+ * The edge is simply removed from the graph.
+ * - If the removed edge is an SP-edge, then all nodes, whose shortest path contains the removed edge,
+ * potentially require re-computation.
+ * When the edge <u, v> is removed, v checks if it has another out-going SP-edge.
+ * If yes, no further computation is required.
+ * If v has no other out-going SP-edge, it invalidates its current value, by setting it to INF.
+ * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE message.
+ * When a vertex u receives an INVALIDATE message from v, it checks whether it has another out-going SP-edge.
+ * If not, it invalidates its current value and propagates the INVALIDATE message.
+ * The propagation stops when a vertex with an alternative shortest path is reached
+ * or when we reach a vertex with no SP-in-neighbors.
+ *
+ * Usage <code>IncrementalSSSPExample &lt;vertex path&gt; &lt;edge path&gt; &lt;edges in SSSP&gt;
+ * &lt;edge to be removed&gt; &lt;result path&gt; &lt;number of iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData}
+ */
+@SuppressWarnings("serial")
+public class IncrementalSSSPExample implements ProgramDescription {
+
+    public static void main(String [] args) throws Exception {
+
+        if(!parseParameters(args)) {
+            return;
+        }
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<Vertex<Long, Double>> vertices = getVerticesDataSet(env);
+
+        DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+        DataSet<Edge<Long, Double>> edgesInSSSP = getEdgesinSSSPDataSet(env);
+
+        Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
+
+        Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+        // Assumption: all minimum weight paths are kept
+        Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);
+
+        // remove the edge
+        graph.removeEdge(edgeToBeRemoved);
+
+        // configure the iteration
+        IterationConfiguration parameters = new IterationConfiguration();
+
+        if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
+
+
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28643473 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java --- @@ -0,0 +1,304 @@
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28643463 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java --- @@ -0,0 +1,304 @@
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r28643683

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java ---
@@ -24,18 +24,24 @@
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
--- End diff --

Could you please also update the description referring to the `withPlainEdges` and `withValuedEdges` methods? This is no longer the case :-)
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/537#issuecomment-94167112

Hi @andralungu! Thanks for the update :-) I left a few inline comments. Overall it looks good, I just had a little trouble following the logic in `VertexCentricIteration` with your changes. I think it could be simplified a bit, so that all configuration options are set in one place. Other than that, there are some javadocs missing; please make sure to add a description to every public method.

One concern I have is whether the `Vertex` class is the right place for the degrees fields and methods. These should be available only inside the iteration methods. In the current implementation one can use the public `setInDegree` method and then retrieve the value, regardless of whether that happens inside an iteration or whether it was set in the configuration. Could we instead create methods inside the `VertexUpdateFunction` and `MessagingFunction`? Something like `getVertexInDegree()` instead of `vertex.getInDegree()`? Or do you have a better idea?
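To make the suggestion above concrete, here is a minimal sketch of degree accessors that live on the iteration function instead of on `Vertex`. It is only an illustration under stated assumptions: `SketchUpdateFunction`, `DegreeSumFunction`, `SketchRuntime` and the `-1` sentinel for "option not set" are hypothetical names and choices, not Gelly's actual classes.

```java
// Hypothetical sketch: none of these classes are part of Gelly.
abstract class SketchUpdateFunction<K, V> {

    // -1 is an assumed sentinel meaning "the degrees option was not set".
    private long inDegree = -1L;
    private long outDegree = -1L;

    // Package-private on purpose: only the iteration runtime, which would
    // live in the same package, can set the degrees. User code cannot.
    void setDegrees(long inDegree, long outDegree) {
        this.inDegree = inDegree;
        this.outDegree = outDegree;
    }

    // User code reads the degrees from the function, not from the vertex.
    public long getVertexInDegree() {
        return inDegree;
    }

    public long getVertexOutDegree() {
        return outDegree;
    }

    public abstract V updateVertex(K id, V value);
}

// A user-defined function that only reads the degrees.
class DegreeSumFunction extends SketchUpdateFunction<Long, Long> {
    @Override
    public Long updateVertex(Long id, Long value) {
        return getVertexInDegree() + getVertexOutDegree();
    }
}

// Stand-in for the iteration runtime that populates the degrees.
class SketchRuntime {
    static long run() {
        DegreeSumFunction f = new DegreeSumFunction();
        f.setDegrees(2L, 3L);
        return f.updateVertex(1L, 0L);
    }
}
```

With this shape the vertex type stays a plain key-value pair, and the degrees are only reachable from code that runs inside the iteration.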
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28643437 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.example; + +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.EdgeDirection; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.example.utils.IncrementalSSSPData; +import org.apache.flink.graph.spargel.IterationConfiguration; +import org.apache.flink.graph.spargel.MessageIterator; +import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexUpdateFunction; + +/** + * Incremental Single Sink Shortest Paths Example. --- End diff -- hmm this might be a bit misleading, as this program only incrementally updates the shortest paths when removing an edge from the initial graph. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r28643520

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java ---
@@ -189,4 +199,33 @@ public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
     public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
         return this.bcVarsMessaging;
     }
+
+    // --
+    // The direction, degrees and the total number of vertices should be optional.
+    // The user can access them by setting the direction, degrees or the numVertices options.
+    // --
--- End diff --

All parameters in this class are optional :-) Can you add javadocs to the new methods, like in the methods above?
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r28643695

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java ---
@@ -86,7 +92,9 @@
     private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;

     private IterationConfiguration configuration;
-
+
+    private DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees;
--- End diff --

It seems that this isn't used anywhere?
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r28644150

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java ---
@@ -38,25 +42,57 @@
  * @param <Message> The type of the message sent between vertices along the edges.
  * @param <EdgeValue> The type of the values that are associated with the edges.
  */
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable,
-    VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
+public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable,
+        VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {

     private static final long serialVersionUID = 1L;
-
+
+    //
+    // Attributes that allow vertices to access their in/out degrees and the total number of vertices
+    // inside an iteration.
+    //
+
+    private long numberOfVertices = -1L;
+
+    public long getNumberOfVertices() throws Exception{
+        if (numberOfVertices == -1) {
+            throw new InaccessibleMethodException("The number of vertices option is not set");
+        }
+        return numberOfVertices;
+    }
+
+    void setNumberOfVertices(long numberOfVertices) {
+        this.numberOfVertices = numberOfVertices;
+    }
+
+    //
+    // Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run
+    // the vertex centric iteration.
+    //
+
+    private EdgeDirection direction;
+
+    public EdgeDirection getDirection() {
+        return direction;
+    }
+
+    public void setDirection(EdgeDirection direction) {
--- End diff --

These shouldn't be public either.
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r28643344

--- Diff: docs/gelly_guide.md ---
@@ -380,6 +380,16 @@ all aggregates globally once per superstep and makes them available in the next
 * <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables](programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.

+* <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method.
+
+The number of vertices can then be accessed in the vertex update function and in the messaging function using the `getNumberOfVertices()` method.
+
+* <strong>Degrees</strong>: Accessing the in/out degree for a vertex within an iteration. This property can be set using the `setOptDegrees()` method.
+
+The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex using `vertex.getInDegree()` or `vertex.getOutDegree()`.
+
+* <strong>Messaging Direction</strong>: The direction in which messages are sent. This can be either EdgeDirection.IN, EdgeDirection.OUT, EdgeDirection.ALL. The messaging direction also dictates the update direction which would be EdgeDirection.OUT, EdgeDirection.IN and EdgeDirection.ALL, respectively. This property can be set using the `setDirection()` method.
+
--- End diff --

First, I would highlight the `EdgeDirection.IN` etc. Then, maybe we can rephrase this to make it a bit clearer. I would write something like the following: "By default, a vertex sends messages to its out-neighbors and thus updates its value based on messages received from its in-neighbors. This configuration option allows changing the messaging direction and setting it to [...]. The messaging direction also dictates the update direction."

We could also add a couple of figures to illustrate the messaging-update directions. I can create those, so that they're in the same format as the rest of the figures in the guide.
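The relationship between messaging direction and update direction described in the quoted guide text can be written down as a small mapping. This is only a sketch of that rule: `SketchDirection` and `DirectionSketch` are hypothetical stand-ins and not Gelly's `EdgeDirection` or any real Gelly method.

```java
// Hypothetical stand-in for an edge direction enum; not Gelly's EdgeDirection.
enum SketchDirection { IN, OUT, ALL }

final class DirectionSketch {

    private DirectionSketch() {
    }

    // The update direction is the mirror image of the messaging direction:
    // messages sent along in-edges are consumed via out-edges and vice versa,
    // while ALL stays ALL.
    static SketchDirection updateDirectionFor(SketchDirection messaging) {
        switch (messaging) {
            case IN:
                return SketchDirection.OUT;
            case OUT:
                return SketchDirection.IN;
            default:
                return SketchDirection.ALL;
        }
    }
}
```

For the default case in the comment above (messages go to out-neighbors), `updateDirectionFor(SketchDirection.OUT)` yields `IN`, i.e. a vertex updates its value from messages sent by its in-neighbors.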
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28644120 --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.IncrementalSSSPExample;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
+
+    private String verticesPath;
+
+    private String edgesPath;
+
+    private String edgesInSSSPPath;
+
+    private String resultPath;
+
+    private String expected;
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    public IncrementalSSSPITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    @Before
+    public void before() throws Exception {
+        resultPath = tempFolder.newFile().toURI().toString();
+        File verticesFile = tempFolder.newFile();
+        Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8);
+
+        File edgesFile = tempFolder.newFile();
+        Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8);
+
+        File edgesInSSSPFile = tempFolder.newFile();
+        Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8);
+
+        verticesPath = verticesFile.toURI().toString();
+        edgesPath = edgesFile.toURI().toString();
+        edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
+    }
+
+    @Test
--- End diff --

I would add a few more tests here, maybe with a bigger dataset. The example algorithm is quite complex. You only check what happens if you remove an SP-edge in a very small graph. We should at least test what happens when you remove a non-SP-edge and whether the INVALIDATE messages are propagated correctly in the case of a bigger graph.
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r28643494

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java ---
@@ -77,7 +77,7 @@ public Double map(Vertex<K, Double> value) {
             extends VertexUpdateFunction<K, Double, Double> {
--- End diff --

I really like what the examples look like now! Much better having the Vertex object instead of separated key-value ^^
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r28643926 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java --- @@ -138,69 +146,46 @@ public void setInput(DataSetVertexVertexKey, VertexValue inputData) { if (this.initialVertices == null) { throw new IllegalStateException(The input data set has not been set.); } - + // prepare some type information - TypeInformationVertexVertexKey, VertexValue vertexTypes = initialVertices.getType(); TypeInformationVertexKey keyType = ((TupleTypeInfo?) initialVertices.getType()).getTypeAt(0); TypeInformationTuple2VertexKey, Message messageTypeInfo = new TupleTypeInfoTuple2VertexKey,Message(keyType, messageType); - final int[] zeroKeyPos = new int[] {0}; - - final DeltaIterationVertexVertexKey, VertexValue, VertexVertexKey, VertexValue iteration = - this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos); + // create a graph + GraphVertexKey, VertexValue, EdgeValue graph = + Graph.fromDataSet(initialVertices, edgesWithValue, ExecutionEnvironment.getExecutionEnvironment()); - // set up the iteration operator - if (this.configuration != null) { + // check whether the numVertices option is set and, if so, compute the total number of vertices + // and set it within the messaging and update functions - iteration.name(this.configuration.getName( - Vertex-centric iteration ( + updateFunction + | + messagingFunction + ))); - iteration.parallelism(this.configuration.getParallelism()); - iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory()); - - // register all aggregators - for (Map.EntryString, Aggregator? 
entry : this.configuration.getAggregators().entrySet()) { - iteration.registerAggregator(entry.getKey(), entry.getValue()); + if (this.configuration != null this.configuration.isOptNumVertices()) { + try { + long numberOfVertices = graph.numberOfVertices(); + messagingFunction.setNumberOfVertices(numberOfVertices); + updateFunction.setNumberOfVertices(numberOfVertices); + } catch (Exception e) { + e.printStackTrace(); } } - else { - // no configuration provided; set default name - iteration.name(Vertex-centric iteration ( + updateFunction + | + messagingFunction + )); - } - - // build the messaging function (co group) - CoGroupOperator?, ?, Tuple2VertexKey, Message messages; - MessagingUdfWithEdgeValuesVertexKey, VertexValue, Message, EdgeValue messenger = new MessagingUdfWithEdgeValuesVertexKey, VertexValue, Message, EdgeValue(messagingFunction, messageTypeInfo); - messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger); - - // configure coGroup message function with name and broadcast variables - messages = messages.name(Messaging); - if (this.configuration != null) { - for (Tuple2String, DataSet? e : this.configuration.getMessagingBcastVars()) { - messages = messages.withBroadcastSet(e.f1, e.f0); - } + if(this.configuration != null) { + messagingFunction.setDirection(this.configuration.getDirection()); + } else { + messagingFunction.setDirection(EdgeDirection.OUT); } - - VertexUpdateUdfVertexKey, VertexValue, Message updateUdf = new VertexUpdateUdfVertexKey, VertexValue, Message(updateFunction, vertexTypes); - - // build the update function (co group) - CoGroupOperator?, ?, VertexVertexKey, VertexValue updates = - messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf); - - // configure coGroup update function with name and broadcast variables - updates = updates.name(Vertex State Updates); - if
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r28643920

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java ---
@@ -138,69 +146,46 @@ public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) {
         if (this.initialVertices == null) {
--- End diff --

I find the new logic of this method a bit confusing. Why do you set some configuration parameters here (degrees and direction) and the rest in the helper methods? As far as I understand, you have 2 cases that matter here: the degrees option is set or not. This defines whether you create a simple vertex or you augment it with degree information, right?
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/537#issuecomment-92301058

Hey @andralungu! Now that #547 is merged, the degrees, total number of vertices and direction can be added as configuration options. This should also simplify writing the tests. Take a look at `VertexCentricConfigurationITCase`. You should be able to cover some of the cases by extending it. I hope rebasing won't be a big pain :-)

If an option is not set and the user tries to call one of the methods that isn't computed, then we could display a message to inform them that the corresponding option is not set and how to enable it.
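One way to read the last suggestion is a getter that fails with a message naming the missing option. The sketch below assumes a `-1` sentinel and uses the standard `IllegalStateException`; `SketchMessagingFunction` is a hypothetical class, and the message text only refers to the `setOptNumVertices()` method name mentioned in the guide changes of this PR.

```java
// Hypothetical sketch, not the actual Gelly MessagingFunction.
class SketchMessagingFunction {

    // Assumed sentinel: -1 means the "number of vertices" option was not set.
    private long numberOfVertices = -1L;

    // Package-private so that only the iteration runtime can populate it.
    void setNumberOfVertices(long numberOfVertices) {
        this.numberOfVertices = numberOfVertices;
    }

    // Fails loudly and tells the user which option to enable.
    public long getNumberOfVertices() {
        if (numberOfVertices == -1L) {
            throw new IllegalStateException(
                "The number of vertices option is not set. "
                + "Enable it on the iteration configuration with setOptNumVertices().");
        }
        return numberOfVertices;
    }
}
```

The trade-off against silently returning the sentinel is that a forgotten option surfaces at the first call instead of producing wrong results later.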
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/537#issuecomment-90728415

Hi @vasia, I simply assumed that if the previous examples work, the iteration also works as before. Would you like a specific test suite for that?

For the degree option not being set: the `getInDegree()` and `getOutDegree()` methods are located inside the Vertex class. I'm not sure that their availability can be changed by setting the option on the iteration. They are not computed when the degree option is not set, but you can still see the methods. Do you have any idea how to make the degrees inaccessible? Once this becomes clear, I can add the tests :)
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/537#issuecomment-90233701

Hey @andralungu! Thanks a lot for your work on this :) I only took a quick look at your changes for now, but it seems that all issues are fixed. This PR introduces a lot of changes, so I would like to look into it more carefully. Also, it'd be great if one more person could review.

Next, I think we should add a few tests to make sure we don't break existing functionality and that the newly introduced options work as expected. More specifically, I would test the following:
- if no direction is given, the iteration works as before, i.e. collecting messages from in-neighbors and sending messages to out-neighbors.
- if direction is set to IN / OUT / ALL
- if the degrees option is not set, the vertex cannot access the degree, i.e. no method is available or an error is produced
- if the degrees option is set, the degrees can be accessed in all supersteps and their values are correct
- combination of the above, e.g. set direction to ALL and degrees.

I guess adding those tests will be easier after we merge #547 and make the new options part of the configuration.
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/537#issuecomment-90061271

@vasia, PR updated :)
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on the pull request:

    https://github.com/apache/flink/pull/537#issuecomment-87425953

Hi @vasia,

1). I will add the documentation once we reach a consensus. No use documenting something that might change :)
2). For the public setter: I still couldn't think of a workaround. Waiting for suggestions.
3). Most of the comments were addressed in this final commit, however, when speaking about avoiding unnecessary overhead, the only way I could do that is to add another VertexCentricIteration class that operates on Tuple3 and have the user decide from the constructor which class he/she wants to use. To me that seems to introduce a ton of duplicate code which will look very bad. Keeping in mind that I need to store the degrees in the vertex value, any suggestions on how to eliminate this overhead would be very useful.

Thanks!
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/537#issuecomment-87462312

Hi @andralungu! Regarding (2), I think you could do that inside VertexCentricIteration instead of inside a method of Graph. Regarding (3), I agree that adding another VertexCentricIteration wouldn't be a nice solution. Why don't you create the Tuple3 (or whatever you need based on the options) vertex dataset inside `createResult()`? I don't see the need to change VertexCentricIteration's types / output.
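The idea of building the degree-augmented dataset internally, without changing the iteration's public types, could look roughly like the following. This is a sketch with plain Java maps standing in for Flink DataSets; `DegreeWrappingSketch`, `ValueWithDegrees`, `attachDegrees` and `stripDegrees` are hypothetical names, and `ValueWithDegrees` only plays the role of the `Tuple3` discussed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: plain maps stand in for Flink DataSets.
final class DegreeWrappingSketch {

    private DegreeWrappingSketch() {
    }

    // Internal-only carrier, playing the role of Tuple3<VertexValue, Long, Long>.
    static final class ValueWithDegrees<V> {
        final V value;
        final long inDegree;
        final long outDegree;

        ValueWithDegrees(V value, long inDegree, long outDegree) {
            this.value = value;
            this.inDegree = inDegree;
            this.outDegree = outDegree;
        }
    }

    // Done inside the iteration setup, only when the degrees option is on.
    // Vertices without an entry in a degree map get degree 0.
    static <K, V> Map<K, ValueWithDegrees<V>> attachDegrees(
            Map<K, V> vertices, Map<K, Long> inDegrees, Map<K, Long> outDegrees) {
        Map<K, ValueWithDegrees<V>> wrapped = new HashMap<>();
        for (Map.Entry<K, V> e : vertices.entrySet()) {
            long in = inDegrees.getOrDefault(e.getKey(), 0L);
            long out = outDegrees.getOrDefault(e.getKey(), 0L);
            wrapped.put(e.getKey(), new ValueWithDegrees<>(e.getValue(), in, out));
        }
        return wrapped;
    }

    // Done before returning the result, so callers still see plain values.
    static <K, V> Map<K, V> stripDegrees(Map<K, ValueWithDegrees<V>> wrapped) {
        Map<K, V> plain = new HashMap<>();
        for (Map.Entry<K, ValueWithDegrees<V>> e : wrapped.entrySet()) {
            plain.put(e.getKey(), e.getValue().value);
        }
        return plain;
    }
}
```

Because wrapping and unwrapping both happen inside the iteration, the input and output types seen by the caller stay the same whether or not the degrees option is enabled.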
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user andralungu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/537#discussion_r27344502

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java ---
@@ -38,11 +41,41 @@
  * @param <Message> The type of the message sent between vertices along the edges.
  * @param <EdgeValue> The type of the values that are associated with the edges.
  */
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable,
-    VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
+public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable,
+        VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {

     private static final long serialVersionUID = 1L;
-
+
+    //
+    // Attributes that allow vertices to access their in/out degrees and the total number of vertices
+    // inside an iteration.
+    //
+
+    private long numberOfVertices;
+
+    public long getNumberOfVertices() {
+        return numberOfVertices;
+    }
+
+    public void setNumberOfVertices(long numberOfVertices) {
--- End diff --

Normally it should be private. However, I need to access that setter in Graph's createVertexCentricIteration. If you can think of an alternative, let me know.
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/537#issuecomment-86727594
Hey @andralungu! Thanks for this PR. I know it took more effort than you might have thought initially :-) I have a few comments:
- In my opinion, the degrees availability and the total number of vertices should be *optional*. Otherwise, we are introducing unnecessary overhead for the majority of cases, i.e. when these are not actually used by an algorithm.
- The edge direction option and the degrees / numOfVertices should be *separate extensions* and not tied together, i.e. we should allow a user to choose the edge direction option independently of whether they also want to access the degrees. To be consistent with the rest of the VertexCentricIteration options, we could introduce methods `setMessagingDirection()`, `setInDegreesAvailable()` and so on.
- The change should be transparent to user code, i.e. if a user has turned on the option for accessing inDegrees, they should be able to call `vertex.getInDegree()` inside the messaging/update functions, but `vertex.getValue()` should still return the vertex value. In other words, this Tuple3 you're using to store the degrees should be hidden from the public methods.
- Finally, we should add documentation for these options, including a description of how they work and some illustrative examples.
Let me know what you think about the above and whether you have any questions!
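To make the points above concrete, here is a minimal standalone sketch, not the code from this PR. `setMessagingDirection()` and `setInDegreesAvailable()` are the method names proposed in the comment; `Direction`, `IterationOptions`, `VertexView` and `OptionsSketch` are hypothetical names, and throwing when the option is off is just one choice made for the sketch.

```java
import java.util.List;

// Hypothetical, self-contained sketch; none of these types are the actual Gelly classes.
enum Direction { IN, OUT, ALL }

/** Independent, opt-in switches; nothing extra is computed unless the user asks for it. */
final class IterationOptions {
    private Direction messagingDirection = Direction.OUT; // assumed default for the sketch
    private boolean inDegreesAvailable = false;

    IterationOptions setMessagingDirection(Direction direction) {
        this.messagingDirection = direction;
        return this;
    }

    IterationOptions setInDegreesAvailable(boolean available) {
        this.inDegreesAvailable = available;
        return this;
    }

    Direction getMessagingDirection() { return messagingDirection; }

    boolean isInDegreesAvailable() { return inDegreesAvailable; }
}

/** What user code sees: getValue() is the plain value, the degree has its own accessor. */
final class VertexView<K, V> {
    private final K id;
    private final V value;
    private final Long inDegree; // null when the option is off

    VertexView(K id, V value, Long inDegree) {
        this.id = id;
        this.value = value;
        this.inDegree = inDegree;
    }

    K getId() { return id; }

    V getValue() { return value; }

    long getInDegree() {
        if (inDegree == null) {
            throw new IllegalStateException("in-degrees option was not set");
        }
        return inDegree;
    }
}

final class OptionsSketch {
    /** Builds the view for one vertex; each edge is a {source, target} pair. */
    static <V> VertexView<Long, V> view(long id, V value, List<long[]> edges,
                                        IterationOptions options) {
        Long inDegree = null;
        if (options.isInDegreesAvailable()) {
            // The degree is only computed when the user opted in.
            inDegree = edges.stream().filter(e -> e[1] == id).count();
        }
        return new VertexView<>(id, value, inDegree);
    }
}
```

The two setters do not depend on each other, so a user can change the messaging direction without paying for degrees, and the internal representation of the degree never leaks through `getValue()`.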
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r27256989
--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java ---
@@ -38,11 +41,41 @@
  * @param <Message> The type of the message sent between vertices along the edges.
  * @param <EdgeValue> The type of the values that are associated with the edges.
  */
-public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable,
-    VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
+public abstract class MessagingFunction<VertexKey extends Comparable<VertexKey> & Serializable,
+        VertexValue extends Serializable, Message, EdgeValue extends Serializable> implements Serializable {
     private static final long serialVersionUID = 1L;
-
+
+    //
+    // Attributes that allow vertices to access their in/out degrees and the total number of vertices
+    // inside an iteration.
+    //
+
+    private long numberOfVertices;
+
+    public long getNumberOfVertices() {
+        return numberOfVertices;
+    }
+
+    public void setNumberOfVertices(long numberOfVertices) {
--- End diff --
I guess this one shouldn't be public.
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r27257390
--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java ---
@@ -40,7 +41,22 @@ VertexValue extends Serializable, Message> implements Serializable {
     private static final long serialVersionUID = 1L;
-
+
+    //
+    // Attributes that allow vertices to access their in/out degrees and the total number of vertices
+    // inside an iteration.
+    //
+
+    private long numberOfVertices;
+
+    public long getNumberOfVertices() {
+        return numberOfVertices;
+    }
+
+    public void setNumberOfVertices(long numberOfVertices) {
--- End diff --
same here
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r27255825
--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
@@ -1118,17 +1118,61 @@ public boolean filter(Edge<K, EV> edge) {
     }
     /**
-     * Create a Vertex-Centric iteration on the graph.
-     *
+     * Create a Vertex-Centric iteration on the graph. inDegrees, outDegrees
+     * and the total number of vertices are set here in order to be retrieved
+     * later on within the iteration.
+     *
+     * Also, The user can define the direction in which the updates are performed
+     * and in which the messages are sent. The function below tackles the regular
+     * use-case where updates of the in-neighbors are used to calculate state
+     * and messages are sent to out-neighbors.
+     *
      * @param vertexUpdateFunction the vertex update function
      * @param messagingFunction the messaging function
      * @param maximumNumberOfIterations maximum number of iterations to perform
      * @return
      */
-    public <M> VertexCentricIteration<K, VV, M, EV> createVertexCentricIteration(
+    public <M, NV extends Serializable> VertexCentricIteration<K, VV, M, EV> createVertexCentricIteration(
--- End diff --
NV is never used?
[GitHub] flink pull request: [FLINK-1523][gelly] Vertex centric iteration e...
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/537#discussion_r27256784
--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java ---
@@ -68,12 +70,12 @@ public LabelPropagation(int maxIterations) {
     public static final class UpdateVertexLabel<K extends Comparable<K> & Serializable>
             extends VertexUpdateFunction<K, Long, Long> {
-        public void updateVertex(K vertexKey, Long vertexValue,
+        public void updateVertex(Vertex<K, Tuple3<Long,Long,Long>> vertex,
--- End diff --
as in the comment below, this should be `Vertex<K, Long>`