[GitHub] flink pull request: [FLINK-2634] [gelly] [WIP] Vertex Centric Tria...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1291#issuecomment-172458727 Hi @vasia, This PR contained a bugfix that could have been validated, but I don't mind closing it! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2634] [gelly] [WIP] Vertex Centric Tria...
Github user andralungu closed the pull request at: https://github.com/apache/flink/pull/1291
[GitHub] flink pull request: [FLINK-2879] [docs] Fixed broken links on the ...
GitHub user andralungu opened a pull request: https://github.com/apache/flink/pull/1348

[FLINK-2879] [docs] Fixed broken links on the architecture page

Fixed the links on the general architecture page. They were pointing to non-existent HTML pages.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andralungu/flink brokenLinks

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1348.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1348

commit 04058f78c3d161f28cd3c90f3c10ad31b495490f
Author: andralungu <lungu.an...@gmail.com>
Date: 2015-11-11T20:00:36Z

[FLINK-2879] [docs] Fixed broken links on the architecture page
[GitHub] flink pull request: [FLINK-2634] [gelly] [WIP] Vertex Centric Tria...
GitHub user andralungu opened a pull request: https://github.com/apache/flink/pull/1291

[FLINK-2634] [gelly] [WIP] Vertex Centric Triangle Count

This PR builds on the code presented in #1105. Basically, the reduceOn* calls are replaced with groupReduceOn* calls. As discussed back then, I made the library method accept any kind of key. While doing so, I found a bug (which is why I marked this as WIP): the groupReduceOnNeighbors function has a version that takes a type argument, and coGroup tries to build the output type before the call to `returns()`, which means it cannot infer the type from the information it has. I'll explain this in more detail in a JIRA issue.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andralungu/flink trianglecount-vertexcentric

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1291.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1291

commit 1fbc09ca35e80f61f31cf7a6166f27f2a6f142c1
Author: andralungu <lungu.an...@gmail.com>
Date: 2015-10-22T06:18:43Z

[FLINK-2634] [gelly] Vertex Centric Triangle Count

[FLINK-2634] [gelly] Fixed Type Erasure
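The `returns()` problem described in the PR text is a consequence of Java type erasure: generic type parameters do not exist at runtime, so an operator that builds its output type before the hint arrives has nothing to infer from. A minimal, Flink-free sketch of erasure itself (the class and method names are illustrative, not from the PR):

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    // At runtime both lists have the exact same class object:
    // the generic parameters String and Integer are erased.
    public static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass()); // prints "true"
    }
}
```

This is why Flink needs an explicit type hint at all: if, as described above, coGroup constructs its output type before `returns()` is called, the erased generics leave it nothing to work with.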
[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...
Github user andralungu closed the pull request at: https://github.com/apache/flink/pull/1105
[GitHub] flink pull request: [FLINK-1520] [gelly] Create a Graph from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/1149#discussion_r39947900

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * A class to build a Graph using path(s) provided to CSV file(s) with optional vertex and edge data.
+ * The class also configures the CSV readers used to read edge and vertex data such as the field types,
+ * the delimiters (row and field), the fields that should be included or skipped, and other flags,
+ * such as whether to skip the initial line as the header.
+ * The configuration is done using the functions provided in the {@link org.apache.flink.api.java.io.CsvReader} class.
+ */
+
+public class GraphCsvReader {
+
+	@SuppressWarnings("unused")
+	private final Path vertexPath, edgePath;
+	private final ExecutionEnvironment executionContext;
+	protected CsvReader edgeReader;
+	protected CsvReader vertexReader;
+	protected MapFunction mapper;
+	protected Class vertexKey;
+	protected Class vertexValue;
+	protected Class edgeValue;
+
+	//
+	public GraphCsvReader(Path vertexPath, Path edgePath, ExecutionEnvironment context) {
+		this.vertexPath = vertexPath;
+		this.edgePath = edgePath;
+		this.vertexReader = new CsvReader(vertexPath, context);
+		this.edgeReader = new CsvReader(edgePath, context);
+		this.mapper = null;
+		this.executionContext = context;
+	}
+
+	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
+		this.vertexPath = null;
+		this.edgePath = edgePath;
+		this.edgeReader = new CsvReader(edgePath, context);
+		this.vertexReader = null;
+		this.mapper = null;
+		this.executionContext = context;
+	}
+
+	public <K, VV> GraphCsvReader(Path edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+		this.vertexPath = null;
+		this.edgePath = edgePath;
+		this.edgeReader = new CsvReader(edgePath, context);
+		this.vertexReader = null;
+		this.mapper = mapper;
+		this.executionContext = context;
+	}
+
+	public GraphCsvReader(String edgePath, ExecutionEnvironment context) {
+		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
+	}
+
+	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
+		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),
+				new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
+	}
+
+	public <K, VV> GraphCsvReader(String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), mapper, context);
+	}
+
+	/**
+	 * Creates a Graph from CSV input with vertex values and edge values.
+	 *
[GitHub] flink pull request: [FLINK-1520] [gelly] Create a Graph from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/1149#discussion_r39948297

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java ---
@@ -110,24 +105,20 @@ public static void main(String [] args) throws Exception {
 		// Emit results
 		if(fileOutput) {
 			resultedVertices.writeAsCsv(outputPath, "\n", ",");
-
-			// since file sinks are lazy, we trigger the execution explicitly
-			env.execute("Incremental SSSP Example");
 		} else {
 			resultedVertices.print();
 		}
+		env.execute("Incremental SSSP Example");

--- End diff --

I'm not sure whether I am missing something... Why do you add `env.execute()` after `print()`? It's no longer needed. Have a look here: https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/graph/PageRankBasic.java
[GitHub] flink pull request: [FLINK-1520] [gelly] Create a Graph from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1149#issuecomment-141905104

Hi @vasia, As you said, I already reviewed this :P. I left a couple of comments inline. Please re-verify the forwarded fields annotations: if you put them there for one mapper, add them for the others too. Apart from that, it's good to merge.
[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1105#issuecomment-141756735 I've created the two JIRA issues: FLINK-2714 and FLINK-2715. @vasia, if no other objections, can you have another look at this and merge it?
[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1105#issuecomment-140133538 Hey you two :) I suggest that we add this one. Let the JIRA that @vasia opened handle the keys. Then add the DataSet example (another JIRA) and a JIRA for benchmarks. I think that would more or less make everyone happy.
[GitHub] flink pull request: [FLINK-2661] Added Node Splitting Methods
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1124#issuecomment-140131099

Hi @vasia, There was no clear rejection in the discussion, so I thought I'd try my luck :) This technique could be enabled by a flag only if we introduce a node-split version of all the library methods. Otherwise, for newly written code, users would still have to pass a combine function according to their algorithm. They'd also have to pass the threshold that differentiates high-degree vertices from low-degree vertices. What's more, they still need to split and aggregate before and after each step. These are aspects that cannot be guessed and that highly depend on the algorithm. Also, what I had in mind regarding this PR was that we would discuss and try to improve the code a bit, rather than immediately discarding it :(
[GitHub] flink pull request: [FLINK-2661] Added Node Splitting Methods
Github user andralungu closed the pull request at: https://github.com/apache/flink/pull/1124
[GitHub] flink pull request: [FLINK-2661] Added Node Splitting Methods
GitHub user andralungu opened a pull request: https://github.com/apache/flink/pull/1124

[FLINK-2661] Added Node Splitting Methods

Social media graphs, citation networks and even protein networks have a common property: their degree distribution follows a power-law curve. This structure raises challenges for both the vertex-centric and GSA/GAS models because they process vertices uniformly, regardless of their degree. This leads to large execution-time stalls: vertices wait for skewed nodes to finish their computation [synchronous]. This PR aims to diminish the impact of high-degree nodes by proposing four main functions: `determineSkewedVertices`, `treeDeAggregate` (splits a node into subnodes, recursively, in levels), `propagateValuesToSplitVertices` (useful when the algorithm performs more than one superstep), and `treeAggregate` (brings the graph back to its initial state). These functions modify a graph at a high level, making its degree distribution more uniform. The method does not need any special partitioning or runtime modification and (for skewed networks and computationally intensive algorithms) can speed up the run time by a factor of two.

I added an example, NodeSplittingJaccardSimilarityMeasure, for which I needed to split the overall sequence of operations into two functions to be able to test the result. Calling the entire main method would have resulted in the "Too few memory segments" exception - too many operations called within one test, in other words. For more info, please consult the additional entry in the documentation.

If we reach a common point and this PR gets merged, I will also follow @fhueske's suggestion from the mailing list - adding a Split version of each of the library methods to allow users to verify whether their run time improves.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andralungu/flink splitJaccardFlink

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1124.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1124

commit b02d0917edcd5f3c8846fe01044afd7444a58c08
Author: Andra Lungu <lungu.an...@gmail.com>
Date: 2015-09-12T10:25:20Z

[FLINK-2661] Added Node Splitting Methods

[FLINK-2661] Minor modifications in the docs

[FLINK-2661] pdf to png
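As a rough illustration of one level of the tree de-aggregation idea described in the PR text: any vertex whose degree exceeds a threshold gets split into subnodes that each take a slice of its neighbors, so no subnode is skewed afterwards. The method name `splitSkewedVertex`, the string subnode ids, and the Map-based graph encoding below are my own simplifications, not the PR's API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NodeSplitDemo {
    /**
     * Splits a vertex's neighbor list into chunks of at most maxDegree,
     * producing subnode ids "v_0", "v_1", ... After the split, each
     * subnode's degree is bounded by maxDegree.
     */
    public static Map<String, List<String>> splitSkewedVertex(
            String vertex, List<String> neighbors, int maxDegree) {
        Map<String, List<String>> subnodes = new HashMap<>();
        int part = 0;
        for (int i = 0; i < neighbors.size(); i += maxDegree) {
            subnodes.put(vertex + "_" + part++,
                    new ArrayList<>(neighbors.subList(i,
                            Math.min(i + maxDegree, neighbors.size()))));
        }
        return subnodes;
    }

    public static void main(String[] args) {
        // A degree-5 vertex with threshold 2 becomes three subnodes:
        // {a,b}, {c,d}, {e}.
        System.out.println(splitSkewedVertex(
                "v", List.of("a", "b", "c", "d", "e"), 2));
    }
}
```

The PR's `treeDeAggregate` applies this recursively (in levels) and `treeAggregate` merges the partial results back; the sketch only shows the single split step.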
[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1105#issuecomment-139820404 If it's okay with you, I'd like to see what @vasia has to say about adding the Triangle Count example from the DataSet API + a reduce as a library method. IMO, it's a better addition, but for some reason, we preferred the Pregel/GSA implementations at a certain point (it was because in my thesis, I take Pregel as a baseline). Also the generic K instead of Long makes perfect sense. However, if we decide to change it, I'll have to open a JIRA to revise the entire set of library methods because apart from PageRank, I think they all restrict themselves to one common key type. I would be kind of scared of type erasure in the K implements Key case :-S
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-139822550 I think it's safe to open a fresh PR... fixing this one would be overkill. You can also update and rebase #923 so that we can review them.
[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/1105#discussion_r39276971

--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleCount.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.Iterator;
+import java.util.TreeMap;
+
+/**
+ * Triangle Count Algorithm.
+ *
+ * This algorithm operates in three phases. First, vertices select neighbors with id greater than theirs
+ * and send messages to them. Each received message is then propagated to neighbors with higher id.
+ * Finally, if a node encounters the target id in the list of received messages, it increments the number
+ * of triangles found.
+ *
+ * For skewed graphs, we recommend calling the GSATriangleCount library method as it uses the more restrictive
+ * `reduceOnNeighbors` function which internally makes use of combiners to speed up computation.
+ *
+ * This implementation is non-iterative.
+ *
+ * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet of
+ * Tuple1 which contains a single integer representing the number of triangles.
+ */
+public class TriangleCount implements
+		GraphAlgorithm<Long, NullValue, NullValue, DataSet<Tuple1>> {

--- End diff --

For the Tuple1 comment: I wanted to be consistent with the other examples. If you run it on a cluster, you'd like the result printed to a file. You could print it with regular Java functions, but `writeAsCsv` and `print` were more appealing (to me at least) :)

For the key type: at some point, you check that your id is lower than the neighbors' ids... How would that comparison be made if you don't know the type? If, say, you received a String, you'd need to do `Long.parseLong`. If you know a way, I'd gladly change it, but I don't think this is the only library method that has the key type problem... Even ConnectedComponents uses Long. Maybe we can open a JIRA for that...
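The three phases described in the Javadoc above can be sketched without Flink: only look at neighbors with a higher id, propagate one step further upward, and close the triangle when the third edge exists, so each triangle u < v < w is counted exactly once. The adjacency-map representation and method name below are illustrative, not the Gelly implementation:

```java
import java.util.Map;
import java.util.Set;

public class TriangleCountDemo {
    /**
     * Counts triangles in an undirected graph given as an adjacency map
     * (every vertex must appear as a key). Follows the ordering trick
     * from the library method's Javadoc: only look "upward" (u < v < w).
     */
    public static int countTriangles(Map<Integer, Set<Integer>> adj) {
        int triangles = 0;
        for (int u : adj.keySet()) {
            for (int v : adj.get(u)) {
                if (v <= u) continue;              // phase 1: higher-id neighbors only
                for (int w : adj.get(v)) {
                    if (w <= v) continue;          // phase 2: propagate upward
                    if (adj.get(u).contains(w)) {  // phase 3: close the triangle
                        triangles++;
                    }
                }
            }
        }
        return triangles;
    }

    public static void main(String[] args) {
        // Triangle 1-2-3 plus a dangling edge 3-4: exactly one triangle.
        Map<Integer, Set<Integer>> adj = Map.of(
                1, Set.of(2, 3),
                2, Set.of(1, 3),
                3, Set.of(1, 2, 4),
                4, Set.of(3));
        System.out.println(countTriangles(adj)); // prints "1"
    }
}
```

Note how the `v <= u` check is exactly the id comparison discussed in the key-type comment: it needs an ordering on the key, which is trivial for Long but not for an arbitrary K.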
[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1105#issuecomment-139564327

The only argument I have for this particular code (as opposed to the one in the DataSet API) is that this one simulates what Pregel is doing... But since it's a library method, it should be used as is, without caring about the model. IMO, the discussion in #1054 asked for this particular version, but if there is a consensus I can change it to use the more efficient one... or even close the PR and point people to the DataSet API...
[GitHub] flink pull request: [FLINK-2570] [gelly] Added a Triangle Count Li...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1054#issuecomment-138497145 Since the bugfix version was released days ago, I guess this is safe to merge...
[GitHub] flink pull request: [FLINK-2634] [gelly] Added a vertex-centric Tr...
GitHub user andralungu opened a pull request: https://github.com/apache/flink/pull/1105

[FLINK-2634] [gelly] Added a vertex-centric Triangle Count Lib Method

This PR adds a vertex-centric version of the TriangleCount algorithm as discussed in #1054. The main difference is that the reduceOn* calls have been replaced with groupReduceOn* calls. Depending on the input graph, one may need to use the GAS version (skewed networks) or the vertex-centric version.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andralungu/flink tcVertexCentric

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1105.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1105

commit 6f24c6918d7de4cd5c8fdced7248836a4704ceb3
Author: Andra Lungu <lungu.an...@gmail.com>
Date: 2015-09-08T12:15:23Z

[FLINK-2634] [gelly] Added a vertex-centric Triangle Count Library Method
[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1058#issuecomment-138692441 It seems the issue was already fixed via #1075
[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...
Github user andralungu closed the pull request at: https://github.com/apache/flink/pull/1058
[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1058#issuecomment-135407258

So you mean something like `List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariableWithInitializer(counts, new ArrayList<>());`?
[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1058#issuecomment-135396805

Yup, that's what I thought as well. However, I only tested `zipWithUniqueId` on a cluster. A student came to me and said that `zipWithIndex` did not work in a cluster environment (I believed him) and claimed some days later, via the JIRA issue, that this was the fix that worked for him. That's all the input I have.
[GitHub] flink pull request: [FLINK-2570] [gelly] Added a Triangle Count Li...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1054#issuecomment-134847386 okay...
[GitHub] flink pull request: [FLINK-2512] [bugfix] Used a concurrent list i...
GitHub user andralungu opened a pull request: https://github.com/apache/flink/pull/1058

[FLINK-2512] [bugfix] Used a concurrent list in the open method

This PR fixes the concurrency issue raised by the elements in the broadcast set. I instantiated the list to be a CopyOnWriteArrayList. Hope that fixes the issue!

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andralungu/flink zipWithIndexBugFix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1058.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1058

commit 8e81256d6d10140d180762c118ef4bd575300dd3
Author: Andra Lungu <lungu.an...@gmail.com>
Date: 2015-08-26T11:26:32Z

[FLINK-2512] [bugfix] Used a concurrent list in the open method
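The behavioral difference the fix relies on can be shown in plain Java: mutating an `ArrayList` while iterating it fails fast with a `ConcurrentModificationException`, while a `CopyOnWriteArrayList` iterator works on a snapshot and never throws. This is only a sketch of the list semantics, not the Flink `open()` method itself (the demo class and method names are mine):

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ConcurrentListDemo {
    /** Returns true if mutating the list mid-iteration makes it throw. */
    public static boolean throwsOnConcurrentMutation(List<Integer> list) {
        list.add(1);
        list.add(2);
        try {
            for (Integer ignored : list) {
                list.add(3); // mutate while the iterator is live
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Fail-fast iterator: throws.
        System.out.println(throwsOnConcurrentMutation(new ArrayList<>()));
        // Snapshot iterator: completes over the original two elements.
        System.out.println(throwsOnConcurrentMutation(new CopyOnWriteArrayList<>()));
    }
}
```

The trade-off is that `CopyOnWriteArrayList` copies the backing array on every write, which is acceptable for a small broadcast list that is written once and read many times.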
[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1058#issuecomment-135042510 I guess because more parallel instances are trying to access that broadcast variable.
[GitHub] flink pull request: [FLINK-2512] [bugfix] Used a concurrent list i...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1058#issuecomment-135027923

An excerpt from the JIRA:

> This can be fixed by wrapping a concurrent list around the counts variable, e.g. CopyOnWriteArrayList from java.util.concurrent (in the open method).

It stood there open for a while, so I thought I'd fix it. I don't really know what test should be written. The use case is the same; the environment just goes from local to cluster.
[GitHub] flink pull request: [FLINK-2152] [bugfix] Used a concurrent list i...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1058#issuecomment-135035150 Yes, I am sorry about that. It was a propagated typo. Problem solved!
[GitHub] flink pull request: [FLINK-2570] [gelly] Added a Triangle Count Li...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1054#issuecomment-134590137 You're right @tillrohrmann. I updated the JIRA to also contain a small description :)
[GitHub] flink pull request: [FLINK-2563] [gelly] changed Graph's run() to ...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1042#issuecomment-134516708 Merging...
[GitHub] flink pull request: [FLINK-2570] [gelly] Added a Triangle Count Li...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1054#issuecomment-134702560 Hi @vasia, I clarified the type of input expected. The graph should be undirected. Without the distinct, you get duplicate edges there (and an erroneous number of triangles). The second bullet point is again not an issue because the graph is undirected; the result should be fine. For the SNAP data sets, I got a number equal to theirs on a cluster. Concerning the runtime, you are right; it's only true for some cases (generally faster by a factor of two), but it highly depends on the data set. So, once this gets merged, I'll go ahead and propose the vertex-centric version as well. That way, the user can choose. Hope I clarified everything! Let me know if you still have questions :)
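Why the distinct step matters can be illustrated with a plain-Java sketch (hypothetical class and method names, not the Gelly library code): an undirected graph stored with both edge directions contains each edge twice, and normalizing to canonical (min, max) pairs before intersecting neighbor sets keeps the triangle count correct.

```java
import java.util.*;

public class TriangleCountSketch {

    // Count triangles by checking, for each distinct edge (u, v), the
    // intersection of the neighbor sets of u and v. Each triangle is found
    // once per edge, so the raw count is divided by 3.
    static long countTriangles(int[][] edges) {
        Map<Integer, Set<Integer>> adj = new HashMap<>();
        Set<List<Integer>> distinct = new HashSet<>();
        for (int[] e : edges) {
            // Normalize (u, v) and (v, u) to one canonical edge -- the
            // "distinct" step discussed in the comment above.
            int a = Math.min(e[0], e[1]), b = Math.max(e[0], e[1]);
            if (a == b || !distinct.add(Arrays.asList(a, b))) continue;
            adj.computeIfAbsent(a, k -> new HashSet<>()).add(b);
            adj.computeIfAbsent(b, k -> new HashSet<>()).add(a);
        }
        long count = 0;
        for (List<Integer> e : distinct) {
            Set<Integer> nu = adj.get(e.get(0)), nv = adj.get(e.get(1));
            for (int w : nu) {
                if (nv.contains(w)) count++;
            }
        }
        return count / 3;
    }

    public static void main(String[] args) {
        // Undirected triangle 1-2-3 plus a pendant edge 3-4, stored with both
        // edge directions, as an undirected graph's edge set would be.
        int[][] edges = {{1, 2}, {2, 1}, {2, 3}, {3, 2}, {1, 3}, {3, 1}, {3, 4}, {4, 3}};
        System.out.println(countTriangles(edges)); // prints 1
    }
}
```

Skipping the deduplication would double-count every edge and inflate the intersection counts, which is exactly the "erroneous number of triangles" mentioned above.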
[GitHub] flink pull request: [FLINK-2570] [gelly] Added a Triangle Count Li...
GitHub user andralungu opened a pull request: https://github.com/apache/flink/pull/1054 [FLINK-2570] [gelly] Added a Triangle Count Library Method This PR adds a Triangle Count library method to Gelly. I decided to add the GAS version because it is faster than vertex-centric for most graph data sets. This implementation has been extensively tested on a 30-node cluster with 16GB of RAM per machine. You can merge this pull request into a Git repository by running: $ git pull https://github.com/andralungu/flink triangleCount Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1054.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1054 commit 99ef9720f5c70ab2029b21017d6b9d3100c827e8 Author: Andra Lungu lungu.an...@gmail.com Date: 2015-08-25T12:07:18Z [FLINK-2570] [gelly] Added a Triangle Count Library Method
[GitHub] flink pull request: [FLINK-2563] [gelly] changed Graph's run() to ...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1042#issuecomment-133881425 Exactly what I had in mind. +1 to merge
[GitHub] flink pull request: [FLINK-2451] [gelly] examples and library clea...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/1000#issuecomment-130002982 Apart from the minor comment, this looks good to merge.
[GitHub] flink pull request: [FLINK-2451] [gelly] examples and library clea...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/1000#discussion_r36779494 --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java ---

    @@ -86,26 +72,35 @@ public void testConnectedComponents() throws Exception {
     	@Test
     	public void testSingleSourceShortestPaths() throws Exception {
    -		GSASingleSourceShortestPaths.main(new String[]{"1", edgesPath, resultPath, "16"});
    -		expectedResult = "1 0.0\n" +
    -				"2 12.0\n" +
    -				"3 13.0\n" +
    -				"4 47.0\n" +
    -				"5 48.0\n" +
    -				"6 Infinity\n" +
    -				"7 Infinity\n";
    +		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    +		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
    +				SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
    +				new InitMapperSSSP(), env);
    +
    +		List<Vertex<Long, Double>> result = inputGraph.run(new GSASingleSourceShortestPaths<Long>(1l, 16))
    +				.getVertices().collect();

--- End diff -- Is there a specific reason for which we decided to use `collect()` in the tests? It does not seem to be a consistency issue. The other tests (in Flink) are still using `compareResultsByLinesInMemory()`. Do we gain anything?
[GitHub] flink pull request: [Flink-Gelly] [example] added missing assumpti...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/883#issuecomment-130003492 I guess the only request that was not fulfilled for this PR was to squash the commits. Then it should be fine.
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-130014300 Saw this. I will also update the documentation afterwards... Sorry!
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-130013958 Hi @vasia, Not sure whether this comment was issued for me... Nevertheless, I left some suggestions inline. All in all, it covers the problems discussed in the 73(!) comments here. You forgot to properly document the edgeTypes(K, EV), etc. methods.
[GitHub] flink pull request: Hits
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/765#issuecomment-130016029 @vasia, I guess the guys already finished the IMPRO3 course. What is the status of this PR @mfahimazizi? Do you have time to address the comments? Are you still interested in contributing? We would be happy to answer your questions :)
[GitHub] flink pull request: [FLINK-2452] [Gelly] adds a playcount threshol...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/968#issuecomment-128456801 Nope, no comment.
[GitHub] flink pull request: [FLINK-2375] Add Approximate Adamic Adar Simil...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/923#issuecomment-122868758 Could you also do a `git commit --amend` to reference the correct JIRA issue in the commit?
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-123153943 Hi @shghatge, Did you also run the two examples on the cluster to make sure that the approximate version is faster? Then you could also add some numbers to the two PRs.
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-121215203 Hmmm :-? but can you pass NullValue to types... it expects Something.class. Can it be overwritten without type erasure getting in the way? Anyway... I will let @shghatge take over from here :)
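The `Something.class` point can be illustrated with a small, hypothetical plain-Java sketch of the class-token pattern (the class `TypeTokens` and method `typedList` below are made up for illustration, not Flink API): since generics are erased at runtime, APIs like `types(...)` take explicit `Class` objects, and any concrete class, including a marker type such as Gelly's `NullValue`, can be passed as the token.

```java
import java.util.ArrayList;
import java.util.List;

public class TypeTokens {

    // T is erased at runtime, so the only way to know the element type here
    // is the explicit Class<T> token the caller passes in ("Something.class").
    static <T> List<T> typedList(Class<T> type) {
        System.out.println("building a list of " + type.getSimpleName());
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        List<String> strings = typedList(String.class);
        strings.add("ok");
        System.out.println(strings.get(0)); // prints "ok"
    }
}
```

This is why a `types(...)`-style method can accept `NullValue.class` like any other class literal; erasure only prevents recovering the type parameter implicitly, not passing it explicitly.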
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-120856171 Hi, I just had a closer look at this PR and it made me seriously question the utility of a `Graph.fromCSV` method. Why? First of all, because it's more limited than the regular `env.readCsvFile()` in the sense that it does not allow POJOs, and it would be a bit tedious to support that. There would be a need for methods with 2 to n fields, according to the number of attributes present in the POJO. Second, because, and I am speaking strictly as a user here, I would rather write:

    private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
    	if (fileOutput) {
    		return env.readCsvFile(edgeInputPath)
    				.ignoreComments("#")
    				.fieldDelimiter("\t")
    				.lineDelimiter("\n")
    				.types(Long.class, Long.class, Double.class)
    				.map(new Tuple3ToEdgeMap<Long, Double>());
    	} else {
    		return CommunityDetectionData.getDefaultEdgeDataSet(env);
    	}
    }

than...

    private static Graph<Long, Long, Double> getGraph(ExecutionEnvironment env) {
    	Graph<Long, Long, Double> graph;
    	if (!fileOutput) {
    		DataSet<Edge<Long, Double>> edges = CommunityDetectionData.getDefaultEdgeDataSet(env);
    		graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
    			public Long map(Long label) {
    				return label;
    			}
    		}, env);
    	} else {
    		graph = Graph.fromCsvReader(edgeInputPath, new MapFunction<Long, Long>() {
    			public Long map(Long label) {
    				return label;
    			}
    		}, env).ignoreCommentsEdges("#")
    				.fieldDelimiterEdges("\t")
    				.lineDelimiterEdges("\n")
    				.typesEdges(Long.class, Double.class)
    				.typesVertices(Long.class, Long.class);
    	}
    	return graph;
    }

Maybe it's just a preference thing... but I believe it's at least worth a discussion. On the other hand, the utility of such a method should have been questioned from its early JIRA days, so I guess that's my mistake. I would like to hear your thoughts on this. Thanks!
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-121032514 Hi @vasia, I also saw the types issue, but I had a feeling that this is the way it was decided in the previous comment. I would rather have different names for 2 and 3 than to force a call to `typeVertices` if it's not needed.
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-121039562 Yes, but then you would have the following methods: `types`, `typesNoEdgeValue`, `typesNoVertexValue` and again `types`. So, even if it's not 100% needed, I'd try to keep it consistent. We could also make it more graph-oriented (the name `types` was generic). The following is just an example: 1) keyType(K) 2) keyAndVertexTypes(K, VV) 3) keyAndEdgeTypes(K, EV) 4) keyVertexAndEdgeTypes(K, VV, EV) With nice documentation, I think I'd understand what these are for :)
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r34504887 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java --- @@ -0,0 +1,462 @@

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.flink.graph;

    import com.google.common.base.Preconditions;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.io.CsvReader;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.types.NullValue;

    /**
     * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertex) data.
     * The class also configures the CSV readers used to read the edge (vertex) data, such as the field types,
     * the delimiters (row and field), the fields that should be included or skipped, and other flags,
     * such as whether to skip the initial line as the header.
     * The configuration is done using the functions provided in the {@link org.apache.flink.api.java.io.CsvReader} class.
     */
    @SuppressWarnings({"unused", "unchecked"})
    public class GraphCsvReader<K, VV, EV> {

    	private final Path vertexPath, edgePath;
    	private final ExecutionEnvironment executionContext;
    	protected CsvReader EdgeReader;
    	protected CsvReader VertexReader;
    	protected MapFunction<K, VV> mapper;
    	protected Class<K> vertexKey;
    	protected Class<VV> vertexValue;
    	protected Class<EV> edgeValue;

    	public GraphCsvReader(Path vertexPath, Path edgePath, ExecutionEnvironment context) {
    		this.vertexPath = vertexPath;
    		this.edgePath = edgePath;
    		this.VertexReader = new CsvReader(vertexPath, context);
    		this.EdgeReader = new CsvReader(edgePath, context);
    		this.mapper = null;
    		this.executionContext = context;
    	}

    	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
    		this.vertexPath = null;
    		this.edgePath = edgePath;
    		this.EdgeReader = new CsvReader(edgePath, context);
    		this.VertexReader = null;
    		this.mapper = null;
    		this.executionContext = context;
    	}

    	public GraphCsvReader(Path edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    		this.vertexPath = null;
    		this.edgePath = edgePath;
    		this.EdgeReader = new CsvReader(edgePath, context);
    		this.VertexReader = null;
    		this.mapper = mapper;
    		this.executionContext = context;
    	}

    	public GraphCsvReader(String edgePath, ExecutionEnvironment context) {
    		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    	}

    	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
    		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),
    			new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    	}

    	public GraphCsvReader(String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), mapper, context);
    	}

    	/**
    	 * Specifies the types for the edge fields and returns this instance of GraphCsvReader
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r34504919 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java --- @@ -0,0 +1,462 @@

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.flink.graph;

    import com.google.common.base.Preconditions;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.io.CsvReader;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.types.NullValue;

    /**
     * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertex) data.
     * The class also configures the CSV readers used to read the edge (vertex) data, such as the field types,
     * the delimiters (row and field), the fields that should be included or skipped, and other flags,
     * such as whether to skip the initial line as the header.
     * The configuration is done using the functions provided in the {@link org.apache.flink.api.java.io.CsvReader} class.
     */
    @SuppressWarnings({"unused", "unchecked"})
    public class GraphCsvReader<K, VV, EV> {

    	private final Path vertexPath, edgePath;
    	private final ExecutionEnvironment executionContext;
    	protected CsvReader EdgeReader;
    	protected CsvReader VertexReader;
    	protected MapFunction<K, VV> mapper;
    	protected Class<K> vertexKey;
    	protected Class<VV> vertexValue;
    	protected Class<EV> edgeValue;

    	public GraphCsvReader(Path vertexPath, Path edgePath, ExecutionEnvironment context) {
    		this.vertexPath = vertexPath;
    		this.edgePath = edgePath;
    		this.VertexReader = new CsvReader(vertexPath, context);
    		this.EdgeReader = new CsvReader(edgePath, context);
    		this.mapper = null;
    		this.executionContext = context;
    	}

    	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
    		this.vertexPath = null;
    		this.edgePath = edgePath;
    		this.EdgeReader = new CsvReader(edgePath, context);
    		this.VertexReader = null;
    		this.mapper = null;
    		this.executionContext = context;
    	}

    	public GraphCsvReader(Path edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    		this.vertexPath = null;
    		this.edgePath = edgePath;
    		this.EdgeReader = new CsvReader(edgePath, context);
    		this.VertexReader = null;
    		this.mapper = mapper;
    		this.executionContext = context;
    	}

    	public GraphCsvReader(String edgePath, ExecutionEnvironment context) {
    		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    	}

    	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
    		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),
    			new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
    	}

    	public GraphCsvReader(String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
    		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), mapper, context);
    	}

    	/**
    	 * Specifies the types for the edge fields and returns this instance of GraphCsvReader
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/877#issuecomment-120654787 Merging...
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/892#discussion_r34189267 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/AdamicAdarSimilarityMeasure.java --- @@ -0,0 +1,243 @@

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.flink.graph.example;

    import org.apache.flink.api.common.ProgramDescription;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.graph.Edge;
    import org.apache.flink.graph.EdgeDirection;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.ReduceNeighborsFunction;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.graph.Triplet;
    import org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData;
    import java.util.HashSet;

    /**
     * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
     * to the Adamic-Adar similarity coefficient, which is given as the sum of the weights of the
     * common neighbors of the source and destination vertex.
     * The weights are given as 1/log(nK), where nK is the degree of the vertex.
     * See <a href="http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf">Friends and neighbors on the Web</a>
     *
     * <p>
     * Input files are plain text files and must be formatted as follows:
     * <br>
     * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
     * Edges themselves are separated by newlines.
     * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3.
     * </p>
     *
     * Usage <code>AdamicAdarSimilarityMeasure &lt;edge path&gt; &lt;result path&gt;</code><br>
     * If no parameters are provided, the program is run with default data from
     * {@link org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData}
     */
    @SuppressWarnings("serial")
    public class AdamicAdarSimilarityMeasure implements ProgramDescription {

    	public static void main(String[] args) throws Exception {

    		if (!parseParameters(args)) {
    			return;
    		}

    		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    		DataSet<Edge<Long, Double>> edges = AdamicAdarSimilarityMeasure.getEdgesDataSet(env);

    		/* The graph is generated without vertex weights. Vertices will have a Tuple2 as value,
    		 * where the first field is the weight of the vertex, 1/log(kn), kn being the degree of the vertex.
    		 * The second field is a HashSet whose elements are again Tuple2s, of which the first field is
    		 * the vertex ID of the neighbor and the second field is the weight of that neighbor.
    		 */
    		Graph<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Double> graph =
    				Graph.fromDataSet(edges, new mapVertices(), env);

    		DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

    		// vertices are given weights in graph
    		graph = graph.joinWithVertices(degrees, new AssignWeightToVertices());

    		// neighbors are computed for all the vertices
    		DataSet<Tuple2<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>>> computedNeighbors =
    				graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);

    		// graph is updated with the new vertex values
    		Graph<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Double> graphWithVertexValues =
    				graph.joinWithVertices(computedNeighbors, new UpdateGraphVertices());

    		// edges are given the Adamic-Adar coefficient as value
    		DataSet<Edge<Long, Double>> edgesWithAdamicValues
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/892#discussion_r34189351 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/AdamicAdarSimilarityMeasure.java --- @@ -0,0 +1,243 @@

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements. See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership. The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License. You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.flink.graph.example;

    import org.apache.flink.api.common.ProgramDescription;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.graph.Edge;
    import org.apache.flink.graph.EdgeDirection;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.ReduceNeighborsFunction;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.graph.Triplet;
    import org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData;
    import java.util.HashSet;

    /**
     * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
     * to the Adamic-Adar similarity coefficient, which is given as the sum of the weights of the
     * common neighbors of the source and destination vertex.
     * The weights are given as 1/log(nK), where nK is the degree of the vertex.
     * See <a href="http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf">Friends and neighbors on the Web</a>
     *
     * <p>
     * Input files are plain text files and must be formatted as follows:
     * <br>
     * Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
     * Edges themselves are separated by newlines.
     * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3.
     * </p>
     *
     * Usage <code>AdamicAdarSimilarityMeasure &lt;edge path&gt; &lt;result path&gt;</code><br>
     * If no parameters are provided, the program is run with default data from
     * {@link org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData}
     */
    @SuppressWarnings("serial")
    public class AdamicAdarSimilarityMeasure implements ProgramDescription {

    	public static void main(String[] args) throws Exception {

    		if (!parseParameters(args)) {
    			return;
    		}

    		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    		DataSet<Edge<Long, Double>> edges = AdamicAdarSimilarityMeasure.getEdgesDataSet(env);

    		/* The graph is generated without vertex weights. Vertices will have a Tuple2 as value,
    		 * where the first field is the weight of the vertex, 1/log(kn), kn being the degree of the vertex.
    		 * The second field is a HashSet whose elements are again Tuple2s, of which the first field is
    		 * the vertex ID of the neighbor and the second field is the weight of that neighbor.
    		 */
    		Graph<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Double> graph =
    				Graph.fromDataSet(edges, new mapVertices(), env);

    		DataSet<Tuple2<Long, Long>> degrees = graph.getDegrees();

    		// vertices are given weights in graph
    		graph = graph.joinWithVertices(degrees, new AssignWeightToVertices());

    		// neighbors are computed for all the vertices
    		DataSet<Tuple2<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>>> computedNeighbors =
    				graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);

    		// graph is updated with the new vertex values
    		Graph<Long, Tuple2<Double, HashSet<Tuple2<Long, Double>>>, Double> graphWithVertexValues =
    				graph.joinWithVertices(computedNeighbors, new UpdateGraphVertices());

    		// edges are given the Adamic-Adar coefficient as value
    		DataSet<Edge<Long, Double>> edgesWithAdamicValues
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/892#issuecomment-119708667 Code-style freak says everything else is fine --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2310] Add an Adamic Adar Similarity exa...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/892#discussion_r34189005 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/AdamicAdarSimilarityMeasure.java ---
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.AdamicAdarSimilarityMeasureData;
+
+import java.util.HashSet;
+
+/**
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Adamic-Adar similarity coefficient, i.e. the sum of the weights of the common
+ * neighbors of the source and destination vertex.
+ * The weights are given as 1/log(nK), where nK is the degree of the vertex.
+ * See a href=http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf;Friends and neighbors on the Web/a
--- End diff --

@see, not just see :)
[GitHub] flink pull request: [FLINK-1745] [ml] [WIP] Add exact k-nearest-ne...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/696#discussion_r33924163 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/KNN.scala ---
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.DataSetUtils._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.{DistanceMetric, EuclideanDistanceMetric}
+import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor}
+import org.apache.flink.util.Collector
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/** Implements a k-nearest neighbor join.
+  *
+  * This algorithm calculates the `k` nearest neighbor points in the training set for each
+  * point in the testing set.
+  *
+  * @example
+  * {{{
+  *   val trainingDS: DataSet[Vector] = ...
+  *   val testingDS: DataSet[Vector] = ...
+  *
+  *   val knn = KNN()
+  *     .setK(10)
+  *     .setBlocks(5)
+  *     .setDistanceMetric(EuclideanDistanceMetric())
+  *
+  *   knn.fit(trainingDS)
+  *
+  *   val predictionDS: DataSet[(Vector, Array[Vector])] = knn.predict(testingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[org.apache.flink.ml.classification.KNN.K]]
+  *   Sets the K, which is the number of selected points as neighbors. (Default value: '''None''')
+  *
+  * - [[org.apache.flink.ml.classification.KNN.Blocks]]
+  *   Sets the number of blocks into which the input data will be split. This number should be set
+  *   at least to the degree of parallelism. If no value is specified, then the parallelism of the
+  *   input [[DataSet]] is used as the number of blocks. (Default value: '''None''')
+  *
+  * - [[org.apache.flink.ml.classification.KNN.DistanceMetric]]
+  *   Sets the distance metric to calculate the distance between two points. If no metric is
+  *   specified, then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used.
+  *   (Default value: '''EuclideanDistanceMetric()''')
+  */
+class KNN extends Predictor[KNN] {
+
+  import KNN._
+
+  var trainingSet: Option[DataSet[Block[Vector]]] = None
+
+  /** Sets K
+    * @param k the number of selected points as neighbors
+    */
+  def setK(k: Int): KNN = {
+    require(k >= 1, "K must be positive.")
+    parameters.add(K, k)
+    this
+  }
+
+  /** Sets the distance metric
+    * @param metric the distance metric to calculate distance between two points
+    */
+  def setDistanceMetric(metric: DistanceMetric): KNN = {
+    parameters.add(DistanceMetric, metric)
+    this
+  }
+
+  /** Sets the number of data blocks/partitions
+    * @param n the number of data blocks
+    */
+  def setBlocks(n: Int): KNN = {
+    require(n >= 1, "Number of blocks must be positive.")
+    parameters.add(Blocks, n)
+    this
+  }
+}
+
+object KNN {
+
+  case object K extends Parameter[Int] {
+    val defaultValue: Option[Int] = None
+  }
+
+  case object DistanceMetric extends Parameter[DistanceMetric] {
+    val defaultValue: Option[DistanceMetric] = Some(EuclideanDistanceMetric())
+  }
+
+  case object Blocks extends Parameter[Int] {
+    val defaultValue: Option[Int] = None
+  }
+
+  def apply(): KNN = {
+    new KNN()
+  }
+
+  /** [[FitOperation]] which trains a KNN based on the given training data set.
+    * @tparam T Subtype of [[Vector]]
+    */
+  implicit def fitKNN[T <: Vector : TypeInformation] = new FitOperation[KNN, T
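Setting aside Flink's blocking and cross joins, the computation at the heart of this kNN join is simply, per test point, a sort of the training points by distance. A minimal single-machine sketch in Java (Euclidean distance; the class and method names are illustrative, not the PR's API):

```java
import java.util.*;

public class BruteForceKnn {

    // Euclidean distance between two points of equal dimension.
    static double dist(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            s += d * d;
        }
        return Math.sqrt(s);
    }

    // Returns the k training points closest to the query point.
    static double[][] knn(double[][] training, double[] query, int k) {
        double[][] sorted = training.clone();
        Arrays.sort(sorted, Comparator.comparingDouble(p -> dist(p, query)));
        return Arrays.copyOfRange(sorted, 0, k);
    }

    public static void main(String[] args) {
        double[][] training = {{0, 0}, {1, 0}, {5, 5}, {0.5, 0.5}};
        // the two nearest points to the origin
        double[][] nearest = knn(training, new double[]{0, 0}, 2);
        System.out.println(Arrays.deepToString(nearest));
    }
}
```

The Flink version avoids materializing all pairwise distances on one machine by splitting the training set into blocks and crossing each block with the test set, which is why `setBlocks` should be at least the degree of parallelism.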
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/877#issuecomment-118891421 I see the requirements have been fulfilled here. If no objections, I'd like to merge this by the end of the week :)
[GitHub] flink pull request: [FLINK-2150] Added zipWithUniqueIds
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/801#issuecomment-118171270 Merging...
[GitHub] flink pull request: My branch
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/881#issuecomment-118177927 I am a bit out of context here, but I believe this PR should be deleted :)
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/877#issuecomment-118179979 Hihi! Shivani is updating the PR on the fly :) Reminds me of myself back in the day ;) If I may tease: If these GSA parameters did not have an example, I would have asked for one. So my two cents: shouldn't we, instead of deleting completely, replace the example with a better one? IncrementalGSASSSP :))
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/877#issuecomment-117540482 Hi @shghatge , This PR looks very nice; I added some minor inline comments to make it look even nicer :) One major problem: you have an example and a test for it; however, you don't have tests for the GSAConfiguration itself; check https://github.com/apache/flink/blob/master/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java You need to make sure that, given no direction, the iteration works as before, and if you give it direction IN or ALL, it behaves as it is supposed to. Apart from that, this looks almost spotless. One other minor thing: when you make the PR, in order to have it uber-clean you should squash your commits. In this case, you have 3 commits, so: git rebase -i HEAD~3 should do the trick. Then leave pick for the first commit, squash for the others, and let the instructions guide you (Ctrl-x, Y, etc...). Then force push. The common practice afterwards (i.e. after you address my comments here) is to leave it as a new commit (don't squash!). So, first squash these 3, then fix the minor issues and commit again. Awesome work!
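The squash recipe above can be exercised non-interactively: `GIT_SEQUENCE_EDITOR` scripts what you would otherwise type into the rebase todo list (the sed expression marks commits 2 and 3 for squashing). The throwaway repository below is illustrative, and GNU sed is assumed for `sed -i`:

```shell
set -e
# throwaway repo with an initial commit plus the 3 commits to be squashed
repo=$(mktemp -d)
cd "$repo"
git init -q
git -c user.email=a@b -c user.name=ci commit -q --allow-empty -m "initial"
for i in 1 2 3; do
  git -c user.email=a@b -c user.name=ci commit -q --allow-empty -m "wip $i"
done

# "pick" the first of the last three commits, "squash" the other two;
# GIT_EDITOR=true accepts the combined commit message unchanged
GIT_SEQUENCE_EDITOR="sed -i '2,3s/^pick/squash/'" \
GIT_EDITOR=true \
git -c user.email=a@b -c user.name=ci rebase -i HEAD~3

git log --oneline   # "initial" plus the one squashed commit remain
```

After this, `git push --force` updates the PR branch, exactly as described in the comment.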
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/877#discussion_r33657016 --- Diff: docs/libs/gelly_guide.md ---
@@ -693,6 +693,9 @@
 Currently, the following parameters can be specified:
 
 * <strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property can be set using the `setOptNumVertices()` method. The number of vertices can then be accessed in the gather, sum and/or apply functions by using the `getNumberOfVertices()` method. If the option is not set in the configuration, this method will return -1.
 
+* <strong>Neighbor Direction</strong>: By Default values are gathered from the out neighbors of the Vertex. This can be modified
--- End diff --

default without capital D, set direction should be written without capital s and in between inverted commas (i.e. `setDirection()`) It would be nice to have an example in the docs, showing users how to play with this option; check the number of vertices example for inspiration :)
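The IN/OUT/ALL semantics discussed in this review can be illustrated without Gelly. The class below is a made-up, self-contained sketch of which neighbor ids a gather over a vertex would visit for each direction; it is not the Gelly API:

```java
import java.util.*;

public class NeighborDirectionSketch {

    enum EdgeDirection { IN, OUT, ALL }

    // Collects the neighbor ids that a gather over `vertex` would visit,
    // given directed edges as {source, target} pairs.
    static Set<Long> neighbors(long[][] edges, long vertex, EdgeDirection dir) {
        Set<Long> result = new HashSet<>();
        for (long[] e : edges) {
            if (dir != EdgeDirection.IN && e[0] == vertex) result.add(e[1]);  // out-neighbor
            if (dir != EdgeDirection.OUT && e[1] == vertex) result.add(e[0]); // in-neighbor
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] edges = {{1, 2}, {3, 1}, {1, 4}};
        System.out.println(neighbors(edges, 1L, EdgeDirection.OUT)); // out-neighbors 2 and 4
        System.out.println(neighbors(edges, 1L, EdgeDirection.IN));  // in-neighbor 3
        System.out.println(neighbors(edges, 1L, EdgeDirection.ALL)); // all three
    }
}
```

In the real configuration, `setDirection()` selects which of these neighbor sets the gather phase iterates over, which is exactly why the reviewer asks for tests covering each direction.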
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/877#discussion_r33657118 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAExistenceOfPaths.java ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.PathExistenceData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.GSAConfiguration;
+
+
+import java.util.HashSet;
+
+/**
+ * This example implements a program in which we find the vertices for which there exists a path from a given vertex.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs and long values.
+ * The vertices input file is expected to contain one vertex per line with long IDs and no value.
+ * If no arguments are provided, the example runs with default data for this example.
+ */
+public class GSAExistenceOfPaths implements ProgramDescription {
+
+	@SuppressWarnings("serial")
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Long, Long, Long>> edges = getEdgesDataSet(env);
+		DataSet<Tuple2<Long, HashSet<Long>>> vertices = getVerticesDataSet(env);
+
+		Graph<Long, HashSet<Long>, Long> graph = Graph.fromTupleDataSet(vertices, edges, env);
+
+
+		GSAConfiguration parameters = new GSAConfiguration();
+		parameters.setDirection(EdgeDirection.IN);
+		// Execute the GSA iteration
+		Graph<Long, HashSet<Long>, Long> result =
+				graph.runGatherSumApplyIteration(new GetReachableVertices(),
+						new FindAllReachableVertices(),
--- End diff --

leave just one blank line here
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/877#discussion_r33657394 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAExistenceOfPaths.java ---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.PathExistenceData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.GSAConfiguration;
+
+
+import java.util.HashSet;
+
+/**
+ * This example implements a program in which we find the vertices for which there exists a path from a given vertex.
+ *
+ * The edges input file is expected to contain one edge per line, with long IDs and long values.
+ * The vertices input file is expected to contain one vertex per line with long IDs and no value.
+ * If no arguments are provided, the example runs with default data for this example.
+ */
+public class GSAExistenceOfPaths implements ProgramDescription {
+
+	@SuppressWarnings("serial")
--- End diff --

not sure you need this annotation... could you run the example without it and see whether you get a warning?
[GitHub] flink pull request: [FLINK-2141] Allow GSA's Gather to perform thi...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/877#issuecomment-117785914 LGTM :) @vasia ?
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-117787551 Nice and rebased. +1
[GitHub] flink pull request: [FLINK-2150] Added zipWithUniqueIds
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/801#issuecomment-117112526 Okay, can I get some +1s to merge this :)?
[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex
Github user andralungu closed the pull request at: https://github.com/apache/flink/pull/832
[GitHub] flink pull request: [FLINK-2150] Added zipWithUniqueIds
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/801#issuecomment-116287334 Hi @thvasilo , Sorry for the late reply... I stated in the `zipWithIndex` PR that I am waiting for that to be merged before I update this one :). So, here it is!
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33354071 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
@@ -282,6 +282,54 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
 	}
 
 	/**
+	 * Creates a graph from CSV files.
+	 *
+	 * Vertices with value are created from a CSV file with 2 fields
+	 * Edges with value are created from a CSV file with 3 fields
+	 * from Tuple3.
+	 *
+	 * @param verticesPath path to a CSV file with the Vertices data.
+	 * @param edgesPath path to a CSV file with the Edges data
+	 * @param context the flink execution environment.
+	 * @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the
--- End diff --

on which calling not which on ... or which on calling the types method specifies (not to specify)
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33354597 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.types.NullValue;
+/**
+ * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
+ * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
+ * the delimiters (row and field), the fields that should be included or skipped, and other flags
+ * such as whether to skip the initial line as the header.
+ * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
+ */
+public class GraphCsvReader<K,VV,EV>{
+
+	private final Path path1,path2;
+	private final ExecutionEnvironment executionContext;
+	protected CsvReader EdgeReader;
+	protected CsvReader VertexReader;
+	protected MapFunction<K, VV> mapper;
+
+//
+
+	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
+	{
+		this.path1 = path1;
+		this.path2 = path2;
+		this.VertexReader = new CsvReader(path1,context);
+		this.EdgeReader = new CsvReader(path2,context);
+		this.mapper=null;
+		this.executionContext=context;
+	}
+
+	public GraphCsvReader(Path path2, ExecutionEnvironment context)
+	{
--- End diff --

again the bracket issue :)
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33354535 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.types.NullValue;
+/**
+ * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
+ * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
+ * the delimiters (row and field), the fields that should be included or skipped, and other flags
+ * such as whether to skip the initial line as the header.
+ * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
+ */
+public class GraphCsvReader<K,VV,EV>{
+
+	private final Path path1,path2;
+	private final ExecutionEnvironment executionContext;
+	protected CsvReader EdgeReader;
+	protected CsvReader VertexReader;
+	protected MapFunction<K, VV> mapper;
+
+//
+
+	public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context)
+	{
--- End diff --

Also, @vasia was talking about some general coding style rules in one of the previous comments... The way we add the opening block brackets must be consistent. So here, after public GraphCsvReader(...) { //open the bracket on the same line. Please look in the rest of the document for similar issues...
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33353966 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java ---
@@ -282,6 +282,54 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
 	}
 
 	/**
+	 * Creates a graph from CSV files.
+	 *
+	 * Vertices with value are created from a CSV file with 2 fields
+	 * Edges with value are created from a CSV file with 3 fields
+	 * from Tuple3.
--- End diff --

there is a trailing from Tuple3 here...
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33354571 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.CsvReader; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.Path; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.types.NullValue; +/** + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data + * The class also configures the CSV readers used to read edges(vertices) data such as the field types, + * the delimiters (row and field), the fields that should be included or skipped, and other flags + * such as whether to skip the initial line as the header. + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class. 
+ */ +public class GraphCsvReader<K,VV,EV>{ + + private final Path path1,path2; + private final ExecutionEnvironment executionContext; + protected CsvReader EdgeReader; + protected CsvReader VertexReader; + protected MapFunction<K, VV> mapper; + +// + + public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context) + { + this.path1 = path1; --- End diff -- again, the path1, path2 issue --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33354654 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.CsvReader; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.Path; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.types.NullValue; +/** + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data + * The class also configures the CSV readers used to read edges(vertices) data such as the field types, + * the delimiters (row and field), the fields that should be included or skipped, and other flags + * such as whether to skip the initial line as the header. + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class. 
+ */ +public class GraphCsvReader<K,VV,EV>{ + + private final Path path1,path2; + private final ExecutionEnvironment executionContext; + protected CsvReader EdgeReader; + protected CsvReader VertexReader; + protected MapFunction<K, VV> mapper; + +// + + public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context) + { + this.path1 = path1; + this.path2 = path2; + this.VertexReader = new CsvReader(path1,context); + this.EdgeReader = new CsvReader(path2,context); + this.mapper=null; + this.executionContext=context; + } + + public GraphCsvReader(Path path2, ExecutionEnvironment context) + { + this.path1=null; + this.path2 = path2; + this.EdgeReader = new CsvReader(path2,context); + this.VertexReader = null; + this.mapper = null; + this.executionContext=context; + } + + public GraphCsvReader(Path path2,final MapFunction<K, VV> mapper, ExecutionEnvironment context) + { + this.path1=null; --- End diff -- here it's `this.path1 = null;` for consistency with the rest.
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33355251 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java --- @@ -150,20 +149,15 @@ private static boolean parseParameters(String[] args) { } @SuppressWarnings("serial") - private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { - if (fileOutput) { - return env.readCsvFile(edgesInputPath) - .lineDelimiter("\n").fieldDelimiter("\t") - .types(Long.class, Long.class).map( - new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { - - public Edge<Long, NullValue> map(Tuple2<Long, Long> value) { - return new Edge<Long, NullValue>(value.f0, value.f1, - NullValue.getInstance()); - } - }); - } else { - return ExampleUtils.getRandomEdges(env, NUM_VERTICES); + private static Graph<Long, NullValue, NullValue> getGraph(ExecutionEnvironment env) { + if(fileOutput) { + return Graph.fromCsvReader(edgesInputPath, env).lineDelimiterEdges("\n").fieldDelimiterEdges("\t") + .types(Long.class); + + } + else + { + return Graph.fromDataSet(ExampleUtils.getRandomEdges(env, NUM_VERTICES), env); --- End diff -- Yup... I like how this looks better than the previous rewrites...
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-115689621 Hi @shghatge , I left my set of comments inline. They are mostly related to coding style issues. I guess you should revisit the previous comments here. Also, don't forget to rebase. It seems like there are some merge conflicts that need to be fixed :)
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33354695 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.CsvReader; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.Path; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.types.NullValue; +/** + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data + * The class also configures the CSV readers used to read edges(vertices) data such as the field types, + * the delimiters (row and field), the fields that should be included or skipped, and other flags + * such as whether to skip the initial line as the header. + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class. 
+ */ +public class GraphCsvReader<K,VV,EV>{ + + private final Path path1,path2; + private final ExecutionEnvironment executionContext; + protected CsvReader EdgeReader; + protected CsvReader VertexReader; + protected MapFunction<K, VV> mapper; + +// + + public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context) + { + this.path1 = path1; + this.path2 = path2; + this.VertexReader = new CsvReader(path1,context); + this.EdgeReader = new CsvReader(path2,context); + this.mapper=null; + this.executionContext=context; + } + + public GraphCsvReader(Path path2, ExecutionEnvironment context) + { + this.path1=null; + this.path2 = path2; + this.EdgeReader = new CsvReader(path2,context); + this.VertexReader = null; + this.mapper = null; + this.executionContext=context; + } + + public GraphCsvReader(Path path2,final MapFunction<K, VV> mapper, ExecutionEnvironment context) + { + this.path1=null; + this.path2 = path2; + this.EdgeReader = new CsvReader(path2,context); + this.VertexReader = null; + this.mapper = mapper; + this.executionContext=context; + } + + public GraphCsvReader (String path2,ExecutionEnvironment context) + { + this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context); + + } + + public GraphCsvReader(String path1, String path2, ExecutionEnvironment context) + { + this(new Path(Preconditions.checkNotNull(path1, "The file path may not be null.")),new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")), context); + } + + + public GraphCsvReader (String path2, final MapFunction<K, VV> mapper, ExecutionEnvironment context) + { + this(new Path(Preconditions.checkNotNull(path2, "The file path may not be null.")),mapper, context); + } + + public CsvReader getEdgeReader() + { + return this.EdgeReader; + } + + public CsvReader getVertexReader() + { + return this.VertexReader; + } + // + + /** +* Specifies the types for the Graph fields and returns a Graph with those field types +* +* This method
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33354136 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java --- @@ -282,6 +282,54 @@ public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) { } /** + * Creates a graph from CSV files. + * + * Vertices with value are created from a CSV file with 2 fields + * Edges with value are created from a CSV file with 3 fields + * from Tuple3. + * + * @param verticesPath path to a CSV file with the Vertices data. + * @param edgesPath path to a CSV file with the Edges data + * @param context the flink execution environment. + * @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , which on calling types() method to specify types of the + *Vertex ID, Vertex Value and Edge value returns a Graph + */ + public static GraphCsvReader fromCsvReader(String verticesPath, String edgesPath, ExecutionEnvironment context) { + return new GraphCsvReader(verticesPath, edgesPath, context); + } + /** Creates a graph from a CSV file for Edges., Vertices are --- End diff -- ... Edges. \n (right now it's ".,") Vertices
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33355373 --- Diff: flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java --- @@ -156,4 +181,17 @@ public DummyCustomType map(Long vertexId) { return new DummyCustomType(vertexId.intValue()-1, false); } } + + private FileInputSplit createTempFile(String content) throws IOException { + File tempFile = File.createTempFile("test_contents", "tmp"); + tempFile.deleteOnExit(); --- End diff -- `deleteOnExit()`... nice!
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33354309 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph; +import com.google.common.base.Preconditions; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.io.CsvReader; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.Path; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.types.NullValue; +/** + * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data + * The class also configures the CSV readers used to read edges(vertices) data such as the field types, + * the delimiters (row and field), the fields that should be included or skipped, and other flags + * such as whether to skip the initial line as the header. + * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class. 
+ */ +public class GraphCsvReader<K,VV,EV>{ + + private final Path path1,path2; + private final ExecutionEnvironment executionContext; + protected CsvReader EdgeReader; + protected CsvReader VertexReader; + protected MapFunction<K, VV> mapper; + +// + + public GraphCsvReader(Path path1,Path path2, ExecutionEnvironment context) --- End diff -- let's not call these path1 and path2. I suggest we use better names like edgePath, vertexPath... This is valid for the methods underneath too...
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/847#discussion_r33355180 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java --- @@ -119,23 +113,32 @@ private static boolean parseParameters(String [] args) { return true; } - @SuppressWarnings("serial") - private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(edgeInputPath) - .ignoreComments("#") - .fieldDelimiter("\t") - .lineDelimiter("\n") - .types(Long.class, Long.class) - .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() { - @Override - public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception { - return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance()); + private static Graph<Long, Long, NullValue> getGraph(ExecutionEnvironment env) + { + Graph<Long, Long, NullValue> graph; + if(!fileOutput) + { + DataSet<Edge<Long, NullValue>> edges = ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env); --- End diff -- Let's also keep this consistent. In Single Source Shortest Paths you read fromDataSet(getDefault..., env). Maybe we could do that for all the examples.
[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/832#issuecomment-115134406 Hey Theo, Thanks a lot for finding my bug there ^^ PR updated to address the Java issues and to contain a pimped Scala version of `zipWithIndex` :)
[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/832#issuecomment-114883882 Uhmmm... flink.api.scala is imported. That's not the issue.
[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/832#issuecomment-114878728 Actually, I get a weird compile error: it says missing Type parameter for the map in DataSet.scala... I think this is because the map is overridden... and I haven't found the workaround just yet... (The error is reproducible by calling `testZipWithIndex`)
[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/832#issuecomment-114481973 Hey @tillrohrmann , Sorry for the incredibly late reply. The last weeks have been very hectic. Nevertheless, I'd like to properly finish and polish this issue very soon. For that: I have addressed the Java comments, but I still have to provide support for Scala. I love this task because it really takes me out of my comfort zone: Gelly and Java. It's no secret that Scala is not my strongest point. Therefore, I'd like to use this thread to ask some rather trivial questions: Before defining implicit methods and using pimp-my-lib, I need to wrap the Java function, which should be easy, right? Since there is a `wrap` method. This being said, in org.apache.flink.api.scala, I created a DataSetUtils class and wanted to call wrap(ju.countElements...). Apparently it does not let me. Can someone help me out with that? Thanks!
[GitHub] flink pull request: [FLINK-1962] Add Gelly Scala API
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/808#issuecomment-114420568 Thanks @aljoscha for reminding me... This slipped my mind for a sec. I agree that the methods should have Scaladoc (this should be straightforward to add starting from the existing Javadoc). What would also be nice, IMO, would be to update the docs so that they also contain a Scala version of the example code snippets, similar to the programming guide, for example (http://ci.apache.org/projects/flink/flink-docs-release-0.8/programming_guide.html). Furthermore, the newly added methods (e.g. `addVertices`) are missing. However, that should be a separate Jira. Apart from the formatting issue and the documentation issue, everything looks nice and clean :)
[GitHub] flink pull request: [FLINK-2149][gelly] Simplified Jaccard Example
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/770#issuecomment-113752297 Merging...
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-113483077 Mea culpa. No, the mapper test is fine. For the examples comment, I meant to go through the classes in the example folder and to modify the way the graph is currently read. Right now, we fetch the edges via `env.fromCsv` and then use `Graph.fromDataSet` to create the graph. We should do it directly via Graph.fromCsv. The example in the docs is fine, because it explains how fromDataSet works. That is still available.
[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/818#issuecomment-113149078 Hi @shghatge , Don't forget to remove the definition for the public removeVertices(DataSet) from the documentation. Up for discussion: should we keep the name removeVertices for the private, helper method or should we call it something else, like removeVerticesAndEdges... Names are not my strongest point, but I guess you got the idea :) Personally, I am fine with the current name!
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-113213190 This looks very nice! Someone deserves a virtual :ice_cream: ! There are some tests missing: - test `fromCSV` with a Mapper - you just test `types`, `ignoreFirstLines` and `ignoreComments`; let's at least add tests for the `lineDelimiter*` and the `fieldDelimiter*` methods. I'm sure they work, but tests are written to guarantee that the functionality will also be there (at the same quality) in the future (i.e. some exotic code addition will not break it) :) I saw an outdated Vasia comment on an unused import; always hit mvn verify before pushing - it would have caught that :D
[GitHub] flink pull request: [FLINK-1520]Read edges and vertices from CSV f...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/847#issuecomment-113217602 Ah! And I just remembered! Maybe it makes sense to update the examples to use `fromCSV` when creating the Graph instead of `getEdgesDataSet`.
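To make the suggestion concrete, here is a hedged, plain-Java sketch of the manual parsing step the examples currently perform (read tab-separated edge lines, then build the graph from the resulting edge set), i.e. the step `fromCSV` would fold into the Graph API. The class and method names below are hypothetical illustrations, not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: parse "source\ttarget" lines into (source, target)
// edge pairs, mirroring what env.readCsvFile(...).types(Long.class, Long.class)
// does before Graph.fromDataSet is called.
public class EdgeCsvSketch {

    // Each returned long[] holds {source, target}.
    static List<long[]> parseEdges(List<String> lines) {
        List<long[]> edges = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            edges.add(new long[]{Long.parseLong(fields[0]), Long.parseLong(fields[1])});
        }
        return edges;
    }
}
```

With a Graph.fromCsv-style entry point, this boilerplate disappears from each example.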
[GitHub] flink pull request: [FLINK-1520] Read edges and vertices from CSV ...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/846#issuecomment-112783592 Hi @shghatge , It seems we have a bit of a mess in this PR. Nothing that cannot be fixed. Let's take it offline.
[GitHub] flink pull request: [FLINK-2178][gelly] Fixed groupReduceOnNeighbo...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/799#issuecomment-112373119 PR updated to test the same issue in ApplyCoGroupFunctionOnAllEdges and ApplyCoGroupFunction. Also added tests.
[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/832#discussion_r32410253 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/utils/DataSetUtils.java --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.utils; + +import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; + +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** + * This class provides simple utility methods for zipping elements in a file with an index. + * + * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet. + */ +public class DataSetUtils<T> { + + /** +* Method that goes over all the elements in each partition in order to retrieve +* the total number of elements. +* +* @param input the DataSet received as input +* @return a data set containing tuples of subtask index, number of elements mappings.
+*/ + public DataSet<Tuple2<Integer, Long>> countElements(DataSet<T> input) { + return input.mapPartition(new RichMapPartitionFunction<T, Tuple2<Integer, Long>>() { + @Override + public void mapPartition(Iterable<T> values, Collector<Tuple2<Integer, Long>> out) throws Exception { + long counter = 0; + for(T value: values) { + counter ++; + } + + out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), counter)); + } + }); + } + + /** +* Method that takes a set of subtask index, total number of elements mappings +* and assigns ids to all the elements from the input data set. +* +* @param input the input data set +* @return a data set of tuple 2 consisting of consecutive ids and initial values. +*/ + public DataSet<Tuple2<Long, T>> zipWithIndex(DataSet<T> input) { --- End diff -- Yes, I thought about making the method static. However, if we move it to DataSet, we would need to call it from an instance. Would you do DataSet.zipWithIndex()?
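The two-phase idea behind the patch above (count the elements of each partition, then give partition i a starting offset equal to the sum of the counts of partitions 0..i-1, so the ids are consecutive across partitions) can be sketched with plain Java collections. This is an illustrative, single-process analogue with hypothetical names; it deliberately ignores Flink's distributed execution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ZipWithIndexSketch {

    // Phase 1 (mirrors the countElements() idea): one element count per partition.
    static long[] countPerPartition(List<List<String>> partitions) {
        long[] counts = new long[partitions.size()];
        for (int i = 0; i < partitions.size(); i++) {
            counts[i] = partitions.get(i).size();
        }
        return counts;
    }

    // Phase 2: partition i starts at the sum of the counts of partitions 0..i-1,
    // so the assigned ids are consecutive across all partitions.
    static List<String> zipWithIndex(List<List<String>> partitions) {
        long[] counts = countPerPartition(partitions);
        List<String> zipped = new ArrayList<>();
        long offset = 0;
        for (int i = 0; i < partitions.size(); i++) {
            long id = offset;
            for (String value : partitions.get(i)) {
                zipped.add(id + ":" + value); // "id:value" stands in for Tuple2<Long, T>
                id++;
            }
            offset += counts[i];
        }
        return zipped;
    }
}
```

In the real operator, phase 2 runs as a mapPartition that receives the broadcast counts; the sketch just inlines that handoff.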
[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/818#discussion_r32415140 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java --- @@ -1151,6 +1151,23 @@ public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Vertex<K, VV>> vert } } + + public Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved){ --- End diff -- Yes, this is definitely the idea, but right now you are duplicating a lot of code. Can we find a smarter way (i.e. one that requires as little code duplication as possible) :)?
[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/818#discussion_r32415176 --- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java --- @@ -1151,6 +1151,23 @@ public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Vertex<K, VV>> vert } } + --- End diff -- Always add Javadoc to new methods.
[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method
Github user andralungu commented on a diff in the pull request: https://github.com/apache/flink/pull/818#discussion_r32414981

--- Diff: docs/libs/gelly_guide.md ---
@@ -240,6 +240,7 @@ Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(v
<img alt="Union Transformation" width="50%" src="fig/gelly-union.png"/>
</p>
+* <strong>Difference</strong>: Gelly's `difference()` method performs a difference on the vertex and edge sets of the input graphs. The resultant graph is formed by removing the common vertices and edges from the graph.
--- End diff --

I think @vasia also wanted you to update the description for union ^^
Now, this still looks a bit unclear. It seems that there are two input graphs. You should make it obvious that the current graph gets differentiated with an input graph. That way, you won't leave room for comments.
[GitHub] flink pull request: [FLINK-2093][gelly] Added difference Method
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/818#issuecomment-112099290

Hi @vasia,

In essence, the `difference` method is just a fancy way of removing vertices, right? When you remove a vertex, you also remove the edges for which it was a source/target.

Since the add/remove vertices methods work just for lists and collect is unsafe, we mutually agreed to overload `removeVertices` to work for data sets. This way you would duplicate the least amount of code. Otherwise, you would take the exact code in the DataSet removeVertices and duplicate it in difference. That's not very practical, IMO.

Also, it may occur that a user has a DataSet of elements to remove. An extra removeVertices won't really hurt then, would it?

But if you have suggestions on how to improve this, we are more than eager to hear them :)

-Andra
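To make the semantics described in the comment concrete, here is a minimal plain-Java sketch (not the Gelly API) of what `difference` reduces to when expressed through `removeVertices`: drop every vertex that also appears in the other graph, plus every edge whose source or target was dropped. Vertex ids are plain Longs and edges are two-element arrays; all names here are illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class GraphDifferenceSketch {

    // Illustrative removeVertices: delete the given vertices and every
    // edge that has a removed vertex as its source or target.
    static void removeVertices(Set<Long> vertices, List<long[]> edges, Set<Long> toRemove) {
        vertices.removeAll(toRemove);
        edges.removeIf(e -> toRemove.contains(e[0]) || toRemove.contains(e[1]));
    }

    public static void main(String[] args) {
        Set<Long> vertices = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        List<long[]> edges = new ArrayList<>(Arrays.asList(
                new long[]{1, 2}, new long[]{2, 3}, new long[]{1, 3}));
        // difference with a graph containing vertex 2 boils down to:
        removeVertices(vertices, edges, new HashSet<>(Arrays.asList(2L)));
        System.out.println(vertices);     // [1, 3]
        System.out.println(edges.size()); // 1 (only the 1 -> 3 edge survives)
    }
}
```

This also shows why overloading `removeVertices` for data sets avoids duplication: `difference(other)` is just `removeVertices(other.getVertices())` plus the edge cleanup that removal already performs.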
[GitHub] flink pull request: [FLINK-2149][gelly] Simplified Jaccard Example
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/770#issuecomment-111807874

PR updated.
[GitHub] flink pull request: [FLINK-2178][gelly] Fixed groupReduceOnNeighbo...
Github user andralungu commented on the pull request: https://github.com/apache/flink/pull/799#issuecomment-111761198

Yup, exactly! The use case was: I modified something in the edge data set, called groupReduceOnNeighbors on the result, and got an NPE, an exception that could have been avoided with this check, just as was the case for the Degree NPE.

Making sure that the iterator is not null can save some people lots of headaches, IMO :) And it doesn't hurt anyone who has a correct data set.
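A hedged sketch of the kind of defensive check being argued for (the names and signature below are illustrative, not the actual Gelly code): guard against a null neighbor iterable before iterating, so a malformed edge set degrades gracefully instead of throwing an NPE deep inside the reduce function.

```java
public class NeighborGuardSketch {

    // Count a vertex's neighbors, tolerating a null iterable. Without the
    // guard, the for-each loop below would throw a NullPointerException
    // whenever the upstream edge data set was modified inconsistently.
    static long countNeighbors(Iterable<Long> neighbors) {
        if (neighbors == null) {
            return 0L;
        }
        long count = 0;
        for (Long ignored : neighbors) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countNeighbors(null));                            // 0
        System.out.println(countNeighbors(java.util.Arrays.asList(4L, 7L))); // 2
    }
}
```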
[GitHub] flink pull request: [FLINK-2152] Added zipWithIndex
GitHub user andralungu opened a pull request: https://github.com/apache/flink/pull/832

[FLINK-2152] Added zipWithIndex

This PR adds the zipWithIndex utility method to Flink's DataSetUtils, as described in the mailing list discussion: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/The-correct-location-for-zipWithIndex-and-zipWithUniqueId-td6310.html. The method could, in the future, be moved to DataSet.

@fhueske, @tillrohrmann, once we reach a conclusion for this one, I will also update #801 (I wouldn't like to fix unnecessary merge conflicts). Once zipWithUniqueId is added, I could also explain the difference in the docs.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andralungu/flink zipWithIndex

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/832.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #832

commit fdbf0167cc10e952faddc2a7d71e73e7f1f2d03f
Author: andralungu <lungu.an...@gmail.com>
Date: 2015-06-12T18:37:27Z

    [FLINK-2152] zipWithIndex implementation

    [FLINK-2152] Added zipWithIndex utility method
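On the zipWithIndex/zipWithUniqueId distinction raised above: one common scheme (sketched here as an assumption, not as a description of Flink's exact implementation) lets a zipWithUniqueId-style method skip the counting pass entirely by striding each subtask's local counter across the id space, trading consecutive ids for coordination-free uniqueness.

```java
public class UniqueIdSketch {

    // Hypothetical coordination-free id scheme: subtask i with parallelism p
    // emits i, i + p, i + 2p, ... Ids are globally unique but not
    // consecutive, unlike zipWithIndex, which needs an extra counting pass
    // to compute per-partition offsets first.
    static long uniqueId(long localCounter, int subtaskIndex, int parallelism) {
        return localCounter * parallelism + subtaskIndex;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // the first two ids emitted by subtasks 0 and 1 never collide
        System.out.println(uniqueId(0, 0, parallelism)); // 0
        System.out.println(uniqueId(1, 0, parallelism)); // 4
        System.out.println(uniqueId(0, 1, parallelism)); // 1
        System.out.println(uniqueId(1, 1, parallelism)); // 5
    }
}
```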